From 8314965c8f208e82e29746133acb5b176038fd4a Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 4 Nov 2025 23:08:33 +0530 Subject: [PATCH] refactor: optimize OpenSearch query building in merge-suggestions --- .../src/activities/memberMergeSuggestions.ts | 429 +++++++++--------- .../organizationMergeSuggestions.ts | 252 ++++++---- 2 files changed, 381 insertions(+), 300 deletions(-) diff --git a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts index b93a8c3eae..326f7d0703 100644 --- a/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/memberMergeSuggestions.ts @@ -19,6 +19,7 @@ import { import { svc } from '../main' import MemberSimilarityCalculator from '../memberSimilarityCalculator' import { ISimilarMemberOpensearchResult, ISimilarityFilter } from '../types' +import { chunkArray } from '../utils' import { EMAIL_AS_USERNAME_PLATFORMS } from './common' @@ -46,234 +47,248 @@ export async function getMemberMergeSuggestions( const qx = pgpQx(svc.postgres.reader.connection()) const fullMember = await buildFullMemberForMergeSuggestions(qx, member) - const identitiesPartialQuery: any = { - should: [ - { - term: { - [`keyword_displayName`]: fullMember.displayName, - }, - }, - ], - minimum_should_match: 1, - must_not: [ - { - term: { - uuid_memberId: fullMember.id, - }, - }, - ], - must: [ - { - term: { - uuid_tenantId: tenantId, - }, - }, - ], - } - - // deduplicate identities, sort verified first + // Deduplicate and sort verified identities first const identities = uniqBy(fullMember.identities, (i) => `${i.platform}:${i.value}`).sort( (a, b) => (a.verified === b.verified ? 0 : a.verified ? -1 : 1), ) - if (identities && identities.length > 0) { - // push nested search scaffold for strong identities - identitiesPartialQuery.should.push({ - nested: { - path: 'nested_identities', - query: { - bool: { - should: [], - boost: 1, - minimum_should_match: 1, - }, - }, - }, - }) + // Early return if no identities to match against + if (identities.length === 0) { + return [] + } + + // Get members that should not be merged + const noMergeIds = await memberMergeSuggestionsRepo.findNoMergeIds(member.id) + const excludeIds = [fullMember.id] + if (noMergeIds && noMergeIds.length > 0) { + excludeIds.push(...noMergeIds) + } + + // Categorize identities into different query types + const verifiedExactMatches = [] + const verifiedEmailUsernameMatches = [] + const verifiedUsernameEmailMatches = [] + const verifiedFuzzyMatches = [] + + const unverifiedExactMatches = [] + const unverifiedEmailUsernameMatches = [] + const unverifiedUsernameEmailMatches = [] + + // Process up to 100 identities + // This is a safety limit to prevent OpenSearch max clause errors + for (const identity of identities.slice(0, 100)) { + if (identity.value && identity.value.length > 0) { + if (identity.verified) { + // Verified identities: exact match on unverified identities + verifiedExactMatches.push({ + value: identity.value, + platform: identity.platform, + }) - // prevent processing more than 100 identities because of opensearch limits (maxClauseCount = 1024) - for (const identity of identities.slice(0, 75)) { - if (identity.value && identity.value.length > 0) { - // For verified identities (either email or username) - // 1. Exact search the identity in other unverified identities - // 2. Fuzzy search the identity in other verified identities - if (identity.verified) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - match: { - [`nested_identities.string_platform`]: identity.platform, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: false, - }, - }, - ], - }, + // Email-as-username: verified email matching unverified username + if (identity.type === MemberIdentityType.EMAIL) { + verifiedEmailUsernameMatches.push({ + value: identity.value, }) + } + + // Email-as-username: verified username matching unverified email + if ( + identity.type === MemberIdentityType.USERNAME && + EMAIL_AS_USERNAME_PLATFORMS.includes(identity.platform as PlatformType) + ) { + verifiedUsernameEmailMatches.push({ + value: identity.value, + }) + } - // handle email as username platforms: email identity matching username identity - if (identity.type === MemberIdentityType.EMAIL) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - terms: { - [`nested_identities.string_platform`]: EMAIL_AS_USERNAME_PLATFORMS, - }, - }, - { - term: { - [`nested_identities.keyword_type`]: MemberIdentityType.USERNAME, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: false, - }, - }, - ], - }, - }) - } - - // handle email as username platforms: username identity matching email identity - if ( - identity.type === MemberIdentityType.USERNAME && - EMAIL_AS_USERNAME_PLATFORMS.includes(identity.platform as PlatformType) - ) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - term: { - [`nested_identities.keyword_type`]: MemberIdentityType.EMAIL, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: false, - }, - }, - ], - }, - }) - } - - // some identities have https? in the beginning, resulting in false positive suggestions - // remove these when making fuzzy and wildcard searches + // Fuzzy search for verified identities (non-numeric only) + if (Number.isNaN(Number(identity.value))) { const cleanedIdentityName = identity.value.replace(/^https?:\/\//, '') + verifiedFuzzyMatches.push({ + value: identity.value, + cleanedValue: cleanedIdentityName, + }) + } + } else { + // Unverified identities: exact match on verified identities + unverifiedExactMatches.push({ + value: identity.value, + platform: identity.platform, + }) - if (Number.isNaN(Number(identity.value))) { - // fuzzy search for identities - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { - match: { - [`nested_identities.string_value`]: { - query: cleanedIdentityName, - prefix_length: 1, - fuzziness: 'auto', - }, - }, - }, - ], - }, - }) - } - } else { - // exact search the unverified identity in other verified identities - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - match: { - [`nested_identities.string_platform`]: identity.platform, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: true, - }, - }, - ], - }, + // Email-as-username: unverified email matching verified username + if (identity.type === MemberIdentityType.EMAIL) { + unverifiedEmailUsernameMatches.push({ + value: identity.value, }) + } - // handle email as username platforms: unverified email matching verified username - if (identity.type === MemberIdentityType.EMAIL) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - terms: { - [`nested_identities.string_platform`]: EMAIL_AS_USERNAME_PLATFORMS, - }, - }, - { - term: { - [`nested_identities.keyword_type`]: MemberIdentityType.USERNAME, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: true, - }, - }, - ], - }, - }) - } - - // handle email as username platforms: unverified username matching verified email - if ( - identity.type === MemberIdentityType.USERNAME && - EMAIL_AS_USERNAME_PLATFORMS.includes(identity.platform as PlatformType) - ) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ - bool: { - must: [ - { term: { [`nested_identities.keyword_value`]: identity.value } }, - { - term: { - [`nested_identities.keyword_type`]: MemberIdentityType.EMAIL, - }, - }, - { - term: { - [`nested_identities.bool_verified`]: true, - }, - }, - ], - }, - }) - } + // Email-as-username: unverified username matching verified email + if ( + identity.type === MemberIdentityType.USERNAME && + EMAIL_AS_USERNAME_PLATFORMS.includes(identity.platform as PlatformType) + ) { + unverifiedUsernameEmailMatches.push({ + value: identity.value, + }) } } } } - const noMergeIds = await memberMergeSuggestionsRepo.findNoMergeIds(member.id) + // Build OpenSearch query clauses + const identitiesShould = [] + const CHUNK_SIZE = 20 // Split queries into chunks to avoid OpenSearch limits + + // Query 1: Verified -> Unverified exact matches + for (const { value, platform } of verifiedExactMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { match: { [`nested_identities.string_platform`]: platform } }, + { term: { [`nested_identities.bool_verified`]: false } }, + ], + }, + }) + } - if (noMergeIds && noMergeIds.length > 0) { - for (const noMergeId of noMergeIds) { - identitiesPartialQuery.must_not.push({ - term: { - uuid_memberId: noMergeId, + // Query 2: Verified email -> Unverified username (email-as-username platforms) + for (const { value } of verifiedEmailUsernameMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { terms: { [`nested_identities.string_platform`]: EMAIL_AS_USERNAME_PLATFORMS } }, + { term: { [`nested_identities.keyword_type`]: MemberIdentityType.USERNAME } }, + { term: { [`nested_identities.bool_verified`]: false } }, + ], + }, + }) + } + + // Query 3: Verified username -> Unverified email (email-as-username platforms) + for (const { value } of verifiedUsernameEmailMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { term: { [`nested_identities.keyword_type`]: MemberIdentityType.EMAIL } }, + { term: { [`nested_identities.bool_verified`]: false } }, + ], + }, + }) + } + + // Query 4: Verified -> Verified fuzzy matches (chunked) + if (verifiedFuzzyMatches.length > 0) { + const uniqueFuzzyValues = [ + ...new Set(verifiedFuzzyMatches.map(({ cleanedValue }) => cleanedValue)), + ] + const fuzzyChunks = chunkArray(uniqueFuzzyValues, CHUNK_SIZE) + + for (const chunk of fuzzyChunks) { + const fuzzyShouldClauses = chunk.map((cleanedValue) => ({ + match: { + [`nested_identities.string_value`]: { + query: cleanedValue, + prefix_length: 1, + fuzziness: 'auto', + }, + }, + })) + + identitiesShould.push({ + bool: { + should: fuzzyShouldClauses, + minimum_should_match: 1, }, }) } } + // Query 5: Unverified -> Verified exact matches + for (const { value, platform } of unverifiedExactMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { match: { [`nested_identities.string_platform`]: platform } }, + { term: { [`nested_identities.bool_verified`]: true } }, + ], + }, + }) + } + + // Query 6: Unverified email -> Verified username (email-as-username platforms) + for (const { value } of unverifiedEmailUsernameMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { terms: { [`nested_identities.string_platform`]: EMAIL_AS_USERNAME_PLATFORMS } }, + { term: { [`nested_identities.keyword_type`]: MemberIdentityType.USERNAME } }, + { term: { [`nested_identities.bool_verified`]: true } }, + ], + }, + }) + } + + // Query 7: Unverified username -> Verified email (email-as-username platforms) + for (const { value } of unverifiedUsernameEmailMatches) { + identitiesShould.push({ + bool: { + must: [ + { term: { [`nested_identities.keyword_value`]: value } }, + { term: { [`nested_identities.keyword_type`]: MemberIdentityType.EMAIL } }, + { term: { [`nested_identities.bool_verified`]: true } }, + ], + }, + }) + } + + // Wrap all identity queries in a nested query (identities are nested documents) + const nestedIdentityQuery = { + nested: { + path: 'nested_identities', + query: { + bool: { + should: identitiesShould, + boost: 1, + minimum_should_match: 1, + }, + }, + }, + } + + // Main query: match by display name OR any of the identity queries + const identitiesPartialQuery: any = { + should: [ + { + term: { + [`keyword_displayName`]: fullMember.displayName, + }, + }, + ...(identitiesShould.length > 0 ? [nestedIdentityQuery] : []), + ], + minimum_should_match: 1, + must_not: [ + { + terms: { + uuid_memberId: excludeIds, + }, + }, + ], + filter: [ + { + term: { + uuid_tenantId: tenantId, + }, + }, + ], + } + const similarMembersQueryBody = { query: { bool: identitiesPartialQuery, @@ -299,7 +314,7 @@ export async function getMemberMergeSuggestions( }) ).body?.hits?.hits || [] } catch (e) { - svc.log.info( + svc.log.error( { error: e, query: identitiesPartialQuery }, 'Error while searching for similar members!', ) diff --git a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts index 5ad0ccfa07..d6e2a2bc3b 100644 --- a/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts +++ b/services/apps/merge_suggestions_worker/src/activities/organizationMergeSuggestions.ts @@ -21,7 +21,7 @@ import { import { svc } from '../main' import OrganizationSimilarityCalculator from '../organizationSimilarityCalculator' import { ISimilarOrganizationOpensearchResult, ISimilarityFilter } from '../types' -import { prefixLength } from '../utils' +import { chunkArray, prefixLength } from '../utils' export async function getOrganizations( tenantId: string, @@ -86,8 +86,7 @@ export async function getOrganizationMergeSuggestions( tenantId: string, organization: IOrganizationBaseForMergeSuggestions, ): Promise { - svc.log.debug(`Getting merge suggestions for ${organization.id}!`) - + // Helper to convert OpenSearch organization response to organization object function opensearchToFullOrg( organization: IOrganizationOpensearch, ): IOrganizationFullAggregatesOpensearch { @@ -115,13 +114,16 @@ export async function getOrganizationMergeSuggestions( svc.log, ) + // Build full organization with all identities and attributes const qx = pgpQx(svc.postgres.reader.connection()) const fullOrg = await buildFullOrgForMergeSuggestions(qx, organization) + // Early return if no identities to match against if (fullOrg.identities.length === 0) { return [] } + // Get organizations that should not be merged const noMergeIds = await organizationMergeSuggestionsRepo.findNoMergeIds(fullOrg.id) const excludeIds = [fullOrg.id] @@ -129,7 +131,151 @@ export async function getOrganizationMergeSuggestions( excludeIds.push(...noMergeIds) } + // Deduplicate and sort verified identities first + const identities = uniqBy(fullOrg.identities, (i) => `${i.platform}:${i.value}`).sort((a, b) => + a.verified === b.verified ? 0 : a.verified ? -1 : 1, + ) + + // Categorize identities into different query types + const weakIdentities = [] + const identitiesForFuzzy = [] + const identitiesForPrefix = [] + + // Process up to 100 identities + // This is a safety limit to prevent OpenSearch max clause errors + for (const identity of identities.slice(0, 100)) { + if (identity.value.length > 0) { + // All identities go into weak query (exact match on unverified identities) + weakIdentities.push({ + value: identity.value, + platform: identity.platform, + }) + + // Clean identity value (remove protocol, handle LinkedIn URLs) + let cleanedIdentityName = identity.value.replace(/^https?:\/\//, '') + + if (identity.platform === 'linkedin') { + cleanedIdentityName = cleanedIdentityName.split(':').pop() || cleanedIdentityName + } + + // Skip numeric-only values (these don't work well with fuzzy/prefix matching) + if (Number.isNaN(Number(identity.value))) { + identitiesForFuzzy.push({ + value: identity.value, + cleanedValue: cleanedIdentityName, + }) + + // Prefix matching only for longer values without spaces + if (identity.value.length > 5 && identity.value.indexOf(' ') === -1) { + identitiesForPrefix.push({ + value: identity.value, + cleanedValue: cleanedIdentityName, + prefix: cleanedIdentityName.slice(0, prefixLength(cleanedIdentityName)), + }) + } + } + } + } + + // Build OpenSearch query clauses const identitiesShould = [] + const CHUNK_SIZE = 20 // Split queries into chunks to avoid OpenSearch limits + + // Query 1: Weak identities - exact match on unverified identities + for (const { value, platform } of weakIdentities) { + const weakQuery = { + bool: { + must: [ + { match: { [`nested_identities.string_value`]: value } }, + { match: { [`nested_identities.string_platform`]: platform } }, + { term: { [`nested_identities.bool_verified`]: false } }, + ], + }, + } + identitiesShould.push(weakQuery) + } + + // Query 2: Fuzzy matching - find similar values with typos/variations (verified only) + if (identitiesForFuzzy.length > 0) { + // Deduplicate cleaned values + const uniqueFuzzyValues = [ + ...new Set(identitiesForFuzzy.map(({ cleanedValue }) => cleanedValue)), + ] + const fuzzyChunks = chunkArray(uniqueFuzzyValues, CHUNK_SIZE) + + for (const chunk of fuzzyChunks) { + const fuzzyShouldClauses = chunk.map((cleanedValue) => ({ + match: { + [`nested_identities.string_value`]: { + query: cleanedValue, + prefix_length: 1, + fuzziness: 'auto', + }, + }, + })) + + const fuzzyQuery = { + bool: { + should: fuzzyShouldClauses, + minimum_should_match: 1, + filter: [ + { + term: { + [`nested_identities.bool_verified`]: true, + }, + }, + ], + }, + } + identitiesShould.push(fuzzyQuery) + } + } + + // Query 3: Prefix matching - find values that start with our prefix (verified only) + if (identitiesForPrefix.length > 0) { + // Deduplicate prefixes + const uniquePrefixes = [...new Set(identitiesForPrefix.map(({ prefix }) => prefix))] + const prefixChunks = chunkArray(uniquePrefixes, CHUNK_SIZE) + for (const chunk of prefixChunks) { + const prefixShouldClauses = chunk.map((prefix) => ({ + prefix: { + [`nested_identities.string_value`]: { + value: prefix, + }, + }, + })) + + const prefixQuery = { + bool: { + should: prefixShouldClauses, + minimum_should_match: 1, + filter: [ + { + term: { + [`nested_identities.bool_verified`]: true, + }, + }, + ], + }, + } + identitiesShould.push(prefixQuery) + } + } + + // Wrap all identity queries in a nested query (identities are nested documents) + const nestedIdentityQuery = { + nested: { + path: 'nested_identities', + query: { + bool: { + should: identitiesShould, + minimum_should_match: 1, + }, + }, + }, + } + + // Main query: match by display name OR any of the identity queries const identitiesPartialQuery = { should: [ { @@ -137,17 +283,7 @@ export async function getOrganizationMergeSuggestions( [`keyword_displayName`]: fullOrg.displayName, }, }, - { - nested: { - path: 'nested_identities', - query: { - bool: { - should: identitiesShould, - minimum_should_match: 1, - }, - }, - }, - }, + ...(identitiesShould.length > 0 ? [nestedIdentityQuery] : []), ], minimum_should_match: 1, must_not: [ @@ -157,7 +293,7 @@ export async function getOrganizationMergeSuggestions( }, }, ], - must: [ + filter: [ { term: { uuid_tenantId: tenantId, @@ -165,84 +301,8 @@ export async function getOrganizationMergeSuggestions( }, ], } - let hasFuzzySearch = false - - // deduplicate identities, sort verified first - const identities = uniqBy(fullOrg.identities, (i) => `${i.platform}:${i.value}`).sort((a, b) => - a.verified === b.verified ? 0 : a.verified ? -1 : 1, - ) - - // limit to prevent exceeding OpenSearch maxClauseCount (1024) - for (const identity of identities.slice(0, 60)) { - if (identity.value.length > 0) { - // weak identity search - identitiesShould.push({ - bool: { - must: [ - { match: { [`nested_identities.string_value`]: identity.value } }, - { match: { [`nested_identities.string_platform`]: identity.platform } }, - { term: { [`nested_identities.bool_verified`]: false } }, - ], - }, - }) - - // some identities have https? in the beginning, resulting in false positive suggestions - // remove these when making fuzzy, wildcard and prefix searches - let cleanedIdentityName = identity.value.replace(/^https?:\/\//, '') - - // linkedin identities now have prefixes in them, like `school:` or `company:` - // we should remove these prefixes when searching for similar identities - if (identity.platform === 'linkedin') { - cleanedIdentityName = cleanedIdentityName.split(':').pop() - } - - // only do fuzzy/wildcard/partial search when identity name is not all numbers (like linkedin organization profiles) - if (Number.isNaN(Number(identity.value))) { - hasFuzzySearch = true - // fuzzy search for identities - identitiesShould.push({ - bool: { - must: [ - { - match: { - [`nested_identities.string_value`]: { - query: cleanedIdentityName, - prefix_length: 1, - fuzziness: 'auto', - }, - }, - }, - { term: { [`nested_identities.bool_verified`]: true } }, - ], - }, - }) - - // also check for prefix for identities that has more than 5 characters and no whitespace - if (identity.value.length > 5 && identity.value.indexOf(' ') === -1) { - identitiesShould.push({ - bool: { - must: [ - { - prefix: { - [`nested_identities.string_value`]: { - value: cleanedIdentityName.slice(0, prefixLength(cleanedIdentityName)), - }, - }, - }, - { term: { [`nested_identities.bool_verified`]: true } }, - ], - }, - }) - } - } - } - } - - // check if we have any actual identity searches, if not remove it from the query - if (!hasFuzzySearch) { - identitiesPartialQuery.should.pop() - } + // Build final OpenSearch query with fields to return const similarOrganizationsQueryBody = { query: { bool: identitiesPartialQuery, @@ -261,12 +321,14 @@ export async function getOrganizationMergeSuggestions( ], } + // Check if primary org has LFX membership (used to filter suggestions) const primaryOrgWithLfxMembership = await hasLfxMembership(qx, { organizationId: fullOrg.id, }) let organizationsToMerge: ISimilarOrganizationOpensearchResult[] + // Execute OpenSearch query to find similar organizations try { organizationsToMerge = ( @@ -276,27 +338,31 @@ export async function getOrganizationMergeSuggestions( }) ).body?.hits?.hits || [] } catch (e) { - svc.log.info( + svc.log.error( { error: e, query: identitiesPartialQuery }, 'Error while searching for similar organizations!', ) throw e } + // Process each candidate organization and calculate similarity for (const organizationToMerge of organizationsToMerge) { const secondaryOrgWithLfxMembership = await hasLfxMembership(qx, { organizationId: organizationToMerge._source.uuid_organizationId, }) + // Skip if both organizations have LFX membership (don't merge LFX organizations) if (primaryOrgWithLfxMembership && secondaryOrgWithLfxMembership) { continue } + // Calculate similarity score between organizations const similarityConfidenceScore = OrganizationSimilarityCalculator.calculateSimilarity( fullOrg, opensearchToFullOrg(organizationToMerge._source), ) + // Sort organizations: primary has more identities/activity, secondary is the one to merge const organizationsSorted = [fullOrg, opensearchToFullOrg(organizationToMerge._source)].sort( (a, b) => { if (