Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.metadata.search.elasticsearch.query.request;

import static com.linkedin.metadata.search.utils.ESUtils.*;
import static com.linkedin.metadata.search.utils.ESUtils.NAME_SUGGESTION;
import static com.linkedin.metadata.search.utils.ESUtils.applyDefaultSearchFilters;

Expand Down Expand Up @@ -57,6 +56,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.opensearch.action.search.SearchRequest;
Expand All @@ -76,7 +76,7 @@
@Slf4j
public class SearchRequestHandler extends BaseRequestHandler {

private static final Map<List<EntitySpec>, SearchRequestHandler> REQUEST_HANDLER_BY_ENTITY_NAME =
private static final Map<SearchHandlerKey, SearchRequestHandler> REQUEST_HANDLER_BY_ENTITY_NAME =
new ConcurrentHashMap<>();
private final List<EntitySpec> entitySpecs;
private final List<String> entityNames;
Expand Down Expand Up @@ -144,16 +144,13 @@ public static SearchRequestHandler getBuilder(
@Nullable CustomSearchConfiguration customSearchConfiguration,
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
@Nonnull SearchServiceConfiguration searchServiceConfiguration) {
return REQUEST_HANDLER_BY_ENTITY_NAME.computeIfAbsent(
return getBuilder(
systemOperationContext,
ImmutableList.of(entitySpec),
k ->
new SearchRequestHandler(
systemOperationContext,
entitySpec,
configs,
customSearchConfiguration,
queryFilterRewriteChain,
searchServiceConfiguration));
configs,
customSearchConfiguration,
queryFilterRewriteChain,
searchServiceConfiguration);
}

public static SearchRequestHandler getBuilder(
Expand All @@ -164,7 +161,12 @@ public static SearchRequestHandler getBuilder(
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
@Nonnull SearchServiceConfiguration searchServiceConfiguration) {
return REQUEST_HANDLER_BY_ENTITY_NAME.computeIfAbsent(
ImmutableList.copyOf(entitySpecs),
new SearchHandlerKey(
ImmutableList.copyOf(entitySpecs),
configs,
customSearchConfiguration,
queryFilterRewriteChain,
searchServiceConfiguration),
k ->
new SearchRequestHandler(
systemOperationContext,
Expand Down Expand Up @@ -676,4 +678,22 @@ private List<SearchSuggestion> extractSearchSuggestions(@Nonnull SearchResponse
}
return searchSuggestions;
}

/**
* Enhanced cache key implementation to prevent handler cross-contamination in tests.
*
* <p>Background: Flaky tests occurred because the cache key (previously just entitySpecs) didn't
* account for all configuration variants. Identical entitySpecs with different search
* configurations would incorrectly share handlers, leading to test instability.
*
* <p>This key ensures each unique configuration combination gets its own handler instance.
*/
@Value
private static class SearchHandlerKey {
@Nonnull private final List<EntitySpec> entitySpecs;
@Nonnull private final ElasticSearchConfiguration configs;
@Nullable private final CustomSearchConfiguration customSearchConfiguration;
@Nonnull private final QueryFilterRewriteChain queryFilterRewriteChain;
@Nonnull private final SearchServiceConfiguration searchServiceConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
import static io.datahubproject.test.search.SearchTestUtils.TEST_ES_SEARCH_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG_WITH_PIT;
import static io.datahubproject.test.search.SearchTestUtils.TEST_SEARCH_SERVICE_CONFIG;
import static io.datahubproject.test.search.SearchTestUtils.createDelegatingMappingsBuilder;
import static io.datahubproject.test.search.SearchTestUtils.getTestOsSearchConfigWithPit;
import static io.datahubproject.test.search.SearchTestUtils.syncAfterWrite;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -128,9 +128,9 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
IndexConvention mockIndexConvention = mock(IndexConvention.class);
when(mockIndexConvention.isV2EntityIndex(anyString())).thenReturn(true);
settingsBuilder = new V2LegacySettingsBuilder(indexConfiguration, mockIndexConvention);
elasticSearchService = buildEntitySearchService();
elasticSearchService = buildEntitySearchService(getSearchConfiguration());
elasticSearchService.reindexAll(operationContext, Collections.emptySet());
pitElasticSearchService = buildPITEntitySearchService();
pitElasticSearchService = buildPITEntitySearchService(getSearchConfiguration());
pitElasticSearchService.reindexAll(operationContext, Collections.emptySet());
cacheManager = new ConcurrentMapCacheManager();
resetSearchService();
Expand Down Expand Up @@ -174,8 +174,9 @@ public void wipe() throws Exception {
}

@Nonnull
private ElasticSearchService buildEntitySearchService() {
ElasticSearchConfiguration esConfig = TEST_OS_SEARCH_CONFIG.toBuilder().build();
private ElasticSearchService buildEntitySearchService(SearchConfiguration searchConfiguration) {
ElasticSearchConfiguration esConfig =
TEST_OS_SEARCH_CONFIG.toBuilder().search(searchConfiguration).build();
ESSearchDAO searchDAO =
new ESSearchDAO(
getSearchClient(),
Expand Down Expand Up @@ -206,8 +207,11 @@ private ElasticSearchService buildEntitySearchService() {
}

@Nonnull
private ElasticSearchService buildPITEntitySearchService() {
ElasticSearchConfiguration esConfig = TEST_OS_SEARCH_CONFIG_WITH_PIT.toBuilder().build();
private ElasticSearchService buildPITEntitySearchService(
SearchConfiguration searchConfiguration) {
ElasticSearchConfiguration esConfig =
getTestOsSearchConfigWithPit(searchConfiguration).toBuilder().build();

ESSearchDAO searchDAO =
new ESSearchDAO(
getSearchClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public void testExplain() {
explainResponse.getId(),
"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.ca_border_wait_times,PROD)");
assertTrue(explainResponse.isExists());
assertEquals(explainResponse.getExplanation().getValue(), 18.0f);
assertEquals(explainResponse.getExplanation().getValue(), 1.25f);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,19 @@ private SearchTestUtils() {}
.build();

// Configuration with PIT enabled for search entities (for tests that specifically need PIT)
public static ElasticSearchConfiguration TEST_OS_SEARCH_CONFIG_WITH_PIT =
BASE_TEST_CONFIG.toBuilder()
.search(
BASE_TEST_CONFIG.getSearch().toBuilder()
.pointInTimeCreationEnabled(true) // Enable PIT for search entities
.graph(
BASE_TEST_CONFIG.getSearch().getGraph().toBuilder()
.pointInTimeCreationEnabled(true) // Enable graph PIT
.build())
.build())
.build();
public static ElasticSearchConfiguration getTestOsSearchConfigWithPit(
SearchConfiguration searchConfiguration) {
return BASE_TEST_CONFIG.toBuilder()
.search(
searchConfiguration.toBuilder()
.pointInTimeCreationEnabled(true) // Enable PIT for search entities
.graph(
searchConfiguration.getGraph().toBuilder()
.pointInTimeCreationEnabled(true) // Enable graph PIT
.build())
.build())
.build();
}

public static ElasticSearchConfiguration TEST_ES_SEARCH_CONFIG =
TEST_OS_SEARCH_CONFIG.toBuilder().build();
Expand Down
Loading