From 2f6066ff7e8804db1b3f608a4fa979bfe19fa4af Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 3 May 2024 13:48:19 +0530 Subject: [PATCH 01/17] Disable loading lookups by default in CompactionTask --- .../indexing/IndexerControllerContext.java | 14 ++-- .../druid/msq/indexing/MSQWorkerTask.java | 27 -------- .../druid/msq/sql/MSQTaskQueryMaker.java | 4 +- .../msq/indexing/MSQControllerTaskTest.java | 5 +- .../druid/msq/indexing/MSQWorkerTaskTest.java | 13 ++-- .../apache/druid/msq/test/MSQTestBase.java | 17 +---- .../indexing/common/task/CompactionTask.java | 13 ++++ .../druid/indexing/common/task/Task.java | 6 +- .../common/task/CompactionTaskTest.java | 15 +++++ .../druid/indexing/common/task/TaskTest.java | 7 ++ .../lookup/cache/LookupLoadingSpec.java | 55 +++++++++++++++ .../lookup/cache/LookupLoadingSpecTest.java | 67 ++++++++++++++++++- .../sql/calcite/planner/PlannerContext.java | 4 +- 13 files changed, 182 insertions(+), 65 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index e8fba09ddc51..17ac82d736ba 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -55,7 +55,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.DruidNode; -import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -271,16 +271,16 @@ public static Map makeTaskContext( .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages()); // Put the lookup loading info in the task context to facilitate selective loading of lookups. - if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != null) { + if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) { taskContextOverridesBuilder.put( - PlannerContext.CTX_LOOKUP_LOADING_MODE, - controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, + controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) ); } - if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) { + if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) != null) { taskContextOverridesBuilder.put( - PlannerContext.CTX_LOOKUPS_TO_LOAD, - controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, + controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index a23c62881a00..b4d18ea390e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -38,13 +37,9 @@ import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerImpl; -import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; -import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nonnull; -import java.util.Collection; -import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -190,26 +185,4 @@ public int hashCode() { return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, retryCount, worker); } - - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE); - if (lookupModeValue == null) { - return LookupLoadingSpec.ALL; - } - - final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); - if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { - return LookupLoadingSpec.NONE; - } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - Collection lookupsToLoad = (Collection) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { - throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); - } - return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); - } else { - return LookupLoadingSpec.ALL; - } - } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 533010c30575..4debe4d9d43e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -285,9 +285,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec); final Map context = new HashMap<>(); - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); + context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, plannerContext.getLookupLoadingSpec().getMode()); if (plannerContext.getLookupLoadingSpec().getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, plannerContext.getLookupLoadingSpec().getLookupsToLoad()); } final MSQControllerTask controllerTask = new MSQControllerTask( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index a5001fb58ebd..9de14610f19c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -38,7 +38,6 @@ import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -114,8 +113,8 @@ public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() .dataSource("target") .context( ImmutableMap.of( - PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), - PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED) ) .build() ) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 5e79b129f3bd..3672e9d1c299 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Assert; import org.junit.Test; @@ -125,7 +124,7 @@ public void testGetDefaultLookupLoadingSpec() @Test public void testGetLookupLoadingWithModeNoneInContext() { - final ImmutableMap context = ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); + final ImmutableMap context = ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.NONE, msqWorkerTask.getLookupLoadingSpec()); } @@ -134,8 +133,8 @@ public void testGetLookupLoadingWithModeNoneInContext() public void testGetLookupLoadingSpecWithLookupListInContext() { final ImmutableMap context = ImmutableMap.of( - PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), - PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", "lookupName2"), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, msqWorkerTask.getLookupLoadingSpec().getMode()); Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad()); @@ -145,10 +144,10 @@ public void testGetLookupLoadingSpecWithLookupListInContext() public void testGetLookupLoadingSpecWithInvalidInput() { final HashMap context = new HashMap<>(); - context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); // Setting CTX_LOOKUPS_TO_LOAD as null - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, null); MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); DruidException exception = Assert.assertThrows( @@ -160,7 +159,7 @@ public void testGetLookupLoadingSpecWithInvalidInput() exception.getMessage()); // Setting CTX_LOOKUPS_TO_LOAD as empty list - context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); + context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Collections.emptyList()); MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); exception = Assert.assertThrows( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index cdaafc75c60e..25213cc11a7c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -854,7 +854,7 @@ public abstract class MSQTester> protected CompactionState expectedLastCompactionState = null; protected Set expectedTombstoneIntervals = null; protected List expectedResultRows = null; - protected LookupLoadingSpec expectedLookupLoadingSpec = null; + protected LookupLoadingSpec expectedLookupLoadingSpec = LookupLoadingSpec.NONE; protected Matcher expectedValidationErrorMatcher = null; protected List, String>> adhocReportAssertionAndReasons = new ArrayList<>(); protected Matcher expectedExecutionErrorMatcher = null; @@ -1020,19 +1020,8 @@ public void verifyPlanningErrors() protected void verifyLookupLoadingInfoInTaskContext(Map context) { - String lookupLoadingMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString(); - List lookupsToLoad = (List) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD); - if (expectedLookupLoadingSpec != null) { - Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), lookupLoadingMode); - if (expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED)) { - Assert.assertEquals(new ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad); - } else { - Assert.assertNull(lookupsToLoad); - } - } else { - Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), lookupLoadingMode); - Assert.assertNull(lookupsToLoad); - } + LookupLoadingSpec specFromContext = LookupLoadingSpec.getSpecFromContext(context, LookupLoadingSpec.ALL); + Assert.assertEquals(expectedLookupLoadingSpec, specFromContext); } protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 833bcdd2fed9..126fe2ed4a4b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -92,6 +92,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; @@ -249,6 +250,12 @@ public CompactionTask( this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); this.segmentCacheManagerFactory = segmentCacheManagerFactory; + + // Unless context has been overridden to load lookups differently, we want to load no lookups by default in any task spawned + // up by the CompactionTask. We achieve this by populating this info in the context which is passed to the spawned tasks. + if (context == null || !context.containsKey(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)) { + addToContext(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + } } @VisibleForTesting @@ -1522,4 +1529,10 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) ); } } + + @Override + public LookupLoadingSpec getLookupLoadingSpec() + { + return LookupLoadingSpec.getSpecFromContext(getContext(), LookupLoadingSpec.NONE); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cdf7cea7e3f2..5380ba0aaab9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -334,9 +334,13 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo granularities) { SettableSupplier queryGranularity = new SettableSupplier<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java index c2957f6688c7..33502ecb3fb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.junit.Assert; import org.junit.Test; @@ -128,4 +129,10 @@ public void testGetInputSourceResources() TASK::getInputSourceResources ); } + + @Test + public void testGetLookupLoadingSpec() + { + Assert.assertEquals(LookupLoadingSpec.ALL, TASK.getLookupLoadingSpec()); + } } diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 88524fe27f96..63499cb26d6e 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -22,6 +22,10 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.InvalidInput; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -39,6 +43,10 @@ */ public class LookupLoadingSpec { + + public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; + public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; + public enum Mode { ALL, NONE, ONLY_REQUIRED @@ -80,6 +88,34 @@ public ImmutableSet getLookupsToLoad() return lookupsToLoad; } + public static LookupLoadingSpec getSpecFromContext(Map context, LookupLoadingSpec defaultSpec) + { + if (context == null) { + return defaultSpec; + } + + final Object lookupModeValue = context.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE); + if (lookupModeValue == null) { + return defaultSpec; + } + + final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + + if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { + return LookupLoadingSpec.NONE; + } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ALL) { + return LookupLoadingSpec.ALL; + } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { + Collection lookupsToLoad = (Collection) context.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD); + if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { + throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); + } + return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad)); + } else { + return defaultSpec; + } + } + @Override public String toString() { @@ -88,4 +124,23 @@ public String toString() ", lookupsToLoad=" + lookupsToLoad + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupLoadingSpec that = (LookupLoadingSpec) o; + return mode == that.mode && Objects.equals(lookupsToLoad, that.lookupsToLoad); + } + + @Override + public int hashCode() + { + return Objects.hash(mode, lookupsToLoad); + } } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 8d0a7a5518a3..b7acce8543a2 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -19,11 +19,18 @@ package org.apache.druid.server.lookup.cache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; import java.util.Set; public class LookupLoadingSpecTest @@ -59,4 +66,62 @@ public void testLoadingOnlyRequiredLookupsWithNullList() DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.loadOnly(null)); Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); } + + @MethodSource("provideParamsForTestGetSpecFromContext") + @ParameterizedTest + public void testGetLookupLoadingSpecFromContext(Map context, LookupLoadingSpec defaultSpec, LookupLoadingSpec expectedSpec) + { + LookupLoadingSpec specFromContext = LookupLoadingSpec.getSpecFromContext(context, defaultSpec); + Assert.assertEquals(expectedSpec.getMode(), specFromContext.getMode()); + Assert.assertEquals(expectedSpec.getLookupsToLoad(), specFromContext.getLookupsToLoad()); + } + + public static Collection provideParamsForTestGetSpecFromContext() + { + ImmutableSet lookupsToLoad = ImmutableSet.of("lookupName1", "lookupName2"); + final ImmutableMap contextWithModeOnlyRequired = ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, new ArrayList<>(lookupsToLoad), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); + final ImmutableMap contextWithModeNone = ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); + final ImmutableMap contextWithModeAll = ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ALL); + final ImmutableMap contextWithoutLookupKeys = ImmutableMap.of(); + + // Return params: + Object[][] params = new Object[][]{ + // Default spec is returned in the case of context not having the lookup keys. + { + contextWithoutLookupKeys, + LookupLoadingSpec.ALL, + LookupLoadingSpec.ALL + }, + // Default spec is returned in the case of context not having the lookup keys. + { + contextWithoutLookupKeys, + LookupLoadingSpec.NONE, + LookupLoadingSpec.NONE + }, + // Only required lookups are returned in the case of context having the lookup keys. + { + contextWithModeOnlyRequired, + LookupLoadingSpec.ALL, + LookupLoadingSpec.loadOnly(lookupsToLoad) + }, + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + { + contextWithModeAll, + LookupLoadingSpec.NONE, + LookupLoadingSpec.ALL + }, + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. + { + contextWithModeNone, + LookupLoadingSpec.ALL, + LookupLoadingSpec.NONE + } + }; + + return Arrays.asList(params); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 99f721bffaa7..78995817b7e6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -80,8 +80,6 @@ public class PlannerContext public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp"; public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone"; public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm"; - public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode"; - public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad"; private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = JoinAlgorithm.BROADCAST; /** @@ -357,7 +355,7 @@ public void addLookupToLoad(String lookupName) } /** - * Returns the lookup to load for a given task. + * Returns the lookup loading spec for a given task. */ public LookupLoadingSpec getLookupLoadingSpec() { From d22ee896f0ca5a5d66532b932c45c70364722cdc Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 12:19:35 +0530 Subject: [PATCH 02/17] Improve jacoco branch coverage --- .../druid/server/lookup/cache/LookupLoadingSpecTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index b7acce8543a2..b78b73d67420 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -72,8 +72,7 @@ public void testLoadingOnlyRequiredLookupsWithNullList() public void testGetLookupLoadingSpecFromContext(Map context, LookupLoadingSpec defaultSpec, LookupLoadingSpec expectedSpec) { LookupLoadingSpec specFromContext = LookupLoadingSpec.getSpecFromContext(context, defaultSpec); - Assert.assertEquals(expectedSpec.getMode(), specFromContext.getMode()); - Assert.assertEquals(expectedSpec.getLookupsToLoad(), specFromContext.getLookupsToLoad()); + Assert.assertEquals(expectedSpec, specFromContext); } public static Collection provideParamsForTestGetSpecFromContext() @@ -119,6 +118,12 @@ public static Collection provideParamsForTestGetSpecFromContext() contextWithModeNone, LookupLoadingSpec.ALL, LookupLoadingSpec.NONE + }, + // Default spec is returned in the case of context=null. + { + null, + LookupLoadingSpec.NONE, + LookupLoadingSpec.NONE } }; From 6c5420ed0e8fb7b9546756252fce0122198c4235 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 12:41:14 +0530 Subject: [PATCH 03/17] Address review comments --- .../test/java/org/apache/druid/msq/test/MSQTestBase.java | 2 +- .../druid/indexing/common/task/CompactionTask.java | 9 +++------ .../java/org/apache/druid/indexing/common/task/Task.java | 7 ++++--- .../druid/server/lookup/cache/LookupLoadingSpec.java | 2 +- .../druid/server/lookup/cache/LookupLoadingSpecTest.java | 6 +++--- .../apache/druid/sql/calcite/planner/PlannerContext.java | 2 +- 6 files changed, 13 insertions(+), 15 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 25213cc11a7c..64597e41b98d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1020,7 +1020,7 @@ public void verifyPlanningErrors() protected void verifyLookupLoadingInfoInTaskContext(Map context) { - LookupLoadingSpec specFromContext = LookupLoadingSpec.getSpecFromContext(context, LookupLoadingSpec.ALL); + LookupLoadingSpec specFromContext = LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL); Assert.assertEquals(expectedLookupLoadingSpec, specFromContext); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 126fe2ed4a4b..ffbff0e453cf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -251,11 +251,8 @@ public CompactionTask( this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); this.segmentCacheManagerFactory = segmentCacheManagerFactory; - // Unless context has been overridden to load lookups differently, we want to load no lookups by default in any task spawned - // up by the CompactionTask. We achieve this by populating this info in the context which is passed to the spawned tasks. - if (context == null || !context.containsKey(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)) { - addToContext(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); - } + // By default, do not load any lookups in sub-tasks launched by compaction task. + addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); } @VisibleForTesting @@ -1533,6 +1530,6 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) @Override public LookupLoadingSpec getLookupLoadingSpec() { - return LookupLoadingSpec.getSpecFromContext(getContext(), LookupLoadingSpec.NONE); + return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.NONE); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5380ba0aaab9..18d2ac7d6960 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -335,12 +335,13 @@ static TaskInfo toTaskIdentifierInfo(TaskInfo getLookupsToLoad() return lookupsToLoad; } - public static LookupLoadingSpec getSpecFromContext(Map context, LookupLoadingSpec defaultSpec) + public static LookupLoadingSpec createFromContext(Map context, LookupLoadingSpec defaultSpec) { if (context == null) { return defaultSpec; diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index b78b73d67420..d1c2bceb20d1 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -67,15 +67,15 @@ public void testLoadingOnlyRequiredLookupsWithNullList() Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); } - @MethodSource("provideParamsForTestGetSpecFromContext") + @MethodSource("provideParamsForTestCreateFromContext") @ParameterizedTest public void testGetLookupLoadingSpecFromContext(Map context, LookupLoadingSpec defaultSpec, LookupLoadingSpec expectedSpec) { - LookupLoadingSpec specFromContext = LookupLoadingSpec.getSpecFromContext(context, defaultSpec); + LookupLoadingSpec specFromContext = LookupLoadingSpec.createFromContext(context, defaultSpec); Assert.assertEquals(expectedSpec, specFromContext); } - public static Collection provideParamsForTestGetSpecFromContext() + public static Collection provideParamsForTestCreateFromContext() { ImmutableSet lookupsToLoad = ImmutableSet.of("lookupName1", "lookupName2"); final ImmutableMap contextWithModeOnlyRequired = ImmutableMap.of( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java index 78995817b7e6..b7e2de3e66b2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java @@ -355,7 +355,7 @@ public void addLookupToLoad(String lookupName) } /** - * Returns the lookup loading spec for a given task. + * Lookup loading spec used if this context corresponds to an MSQ task. */ public LookupLoadingSpec getLookupLoadingSpec() { From 149721db3c0a09f26d948e64e134e84f4fbfbab0 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 14:59:34 +0530 Subject: [PATCH 04/17] Fix ClientCompactionTaskQuerySerdeTest --- .../ClientCompactionTaskQuerySerdeTest.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index fc581d4954b9..2b1258f0e1d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.joda.time.Duration; @@ -71,6 +72,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Map; public class ClientCompactionTaskQuerySerdeTest { @@ -82,6 +84,7 @@ public class ClientCompactionTaskQuerySerdeTest @Test public void testClientCompactionTaskQueryToCompactionTask() throws IOException { + Map context = ImmutableMap.of("key", "value"); final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( "id", @@ -127,7 +130,7 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)), - ImmutableMap.of("key", "value") + context ); final byte[] json = mapper.writeValueAsBytes(query); @@ -220,7 +223,6 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException query.getIoConfig().isDropExisting(), task.getIoConfig().isDropExisting() ); - Assert.assertEquals(query.getContext(), task.getContext()); Assert.assertEquals( query.getDimensionsSpec().getDimensions(), task.getDimensionsSpec().getDimensions() @@ -233,6 +235,11 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException query.getMetricsSpec(), task.getMetricsSpec() ); + + for (String key : context.keySet()) { + Assert.assertEquals(context.get(key), task.getContext().get(key)); + } + Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), task.getContext().get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)); } @Test @@ -351,7 +358,16 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException final byte[] json = mapper.writeValueAsBytes(task); final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class); - Assert.assertEquals(expected, actual); + Assert.assertEquals(expected.getDataSource(), actual.getDataSource()); + Assert.assertEquals(expected.getId(), actual.getId()); + Assert.assertEquals(expected.getType(), actual.getType()); + Assert.assertEquals(expected.getGranularitySpec(), actual.getGranularitySpec()); + Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); + Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig()); + Assert.assertEquals(expected.getTransformSpec(), actual.getTransformSpec()); + Assert.assertEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); + Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); + Assert.assertEquals(ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()), actual.getContext()); } private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) From 170e59eec714bd7246c407e14d04c34df8c6969c Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 17:36:25 +0530 Subject: [PATCH 05/17] Use assertArrayEquals instead of assertEquals --- .../common/task/ClientCompactionTaskQuerySerdeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 2b1258f0e1d2..6391cfdbbfe9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -365,7 +365,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig()); Assert.assertEquals(expected.getTransformSpec(), actual.getTransformSpec()); - Assert.assertEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); + Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); Assert.assertEquals(ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()), actual.getContext()); } From c0e9a90ce9b90f5ea9da668ddb917a4d16722f72 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 9 May 2024 22:15:38 +0530 Subject: [PATCH 06/17] Address review comments --- .../common/task/ClientCompactionTaskQuerySerdeTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 6391cfdbbfe9..f4993861a916 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -84,7 +84,7 @@ public class ClientCompactionTaskQuerySerdeTest @Test public void testClientCompactionTaskQueryToCompactionTask() throws IOException { - Map context = ImmutableMap.of("key", "value"); + final Map context = ImmutableMap.of("key", "value"); final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( "id", @@ -236,9 +236,11 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException task.getMetricsSpec() ); + // Verify values of context keys originally present in the ClientCompactionTaskQuery for (String key : context.keySet()) { Assert.assertEquals(context.get(key), task.getContext().get(key)); } + // Verify values of context parameters added by the CompactionTask Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), task.getContext().get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)); } From 948eb01e8590967dfd3e70876a51fbc429790957 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 10 May 2024 08:04:46 +0530 Subject: [PATCH 07/17] Address review comments --- .../ClientCompactionTaskQuerySerdeTest.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index f4993861a916..c6bc89b0408d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -310,6 +310,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) .build(); + Map expectedContext = new HashMap<>(); final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( task.getId(), "datasource", @@ -354,22 +355,17 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)), - new HashMap<>() + expectedContext ); final byte[] json = mapper.writeValueAsBytes(task); final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class); - Assert.assertEquals(expected.getDataSource(), actual.getDataSource()); - Assert.assertEquals(expected.getId(), actual.getId()); - Assert.assertEquals(expected.getType(), actual.getType()); - Assert.assertEquals(expected.getGranularitySpec(), actual.getGranularitySpec()); - Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); - Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig()); - Assert.assertEquals(expected.getTransformSpec(), actual.getTransformSpec()); - Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); - Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); - Assert.assertEquals(ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()), actual.getContext()); + // Verify that CompactionTask has added new parameters into the context + Assert.assertNotEquals(expected, actual); + + expectedContext.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + Assert.assertEquals(expected, actual); } private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) From 49193cdc3e7eb550409299eb22e8f56efd41c61e Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 10 May 2024 10:39:45 +0530 Subject: [PATCH 08/17] Remove parameterization from test --- .../lookup/cache/LookupLoadingSpecTest.java | 110 +++++++++--------- 1 file changed, 53 insertions(+), 57 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index d1c2bceb20d1..c8ff703d5258 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -24,13 +24,8 @@ import org.apache.druid.error.DruidException; import org.junit.Assert; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Map; import java.util.Set; public class LookupLoadingSpecTest @@ -67,66 +62,67 @@ public void testLoadingOnlyRequiredLookupsWithNullList() Assert.assertEquals("Expected non-null set of lookups to load.", exception.getMessage()); } - @MethodSource("provideParamsForTestCreateFromContext") - @ParameterizedTest - public void testGetLookupLoadingSpecFromContext(Map context, LookupLoadingSpec defaultSpec, LookupLoadingSpec expectedSpec) - { - LookupLoadingSpec specFromContext = LookupLoadingSpec.createFromContext(context, defaultSpec); - Assert.assertEquals(expectedSpec, specFromContext); - } - - public static Collection provideParamsForTestCreateFromContext() + @Test + public void testCreateLookupLoadingSpecFromContext() { ImmutableSet lookupsToLoad = ImmutableSet.of("lookupName1", "lookupName2"); - final ImmutableMap contextWithModeOnlyRequired = ImmutableMap.of( - LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, new ArrayList<>(lookupsToLoad), - LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED); - final ImmutableMap contextWithModeNone = ImmutableMap.of( - LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE); - final ImmutableMap contextWithModeAll = ImmutableMap.of( - LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ALL); - final ImmutableMap contextWithoutLookupKeys = ImmutableMap.of(); - // Return params: - Object[][] params = new Object[][]{ - // Default spec is returned in the case of context not having the lookup keys. - { - contextWithoutLookupKeys, - LookupLoadingSpec.ALL, + // Default spec is returned in the case of context not having the lookup keys. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), LookupLoadingSpec.ALL - }, - // Default spec is returned in the case of context not having the lookup keys. - { - contextWithoutLookupKeys, - LookupLoadingSpec.NONE, + ) + ); + + // Default spec is returned in the case of context not having the lookup keys. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(), LookupLoadingSpec.NONE - }, - // Only required lookups are returned in the case of context having the lookup keys. - { - contextWithModeOnlyRequired, - LookupLoadingSpec.ALL, - LookupLoadingSpec.loadOnly(lookupsToLoad) - }, - // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. - { - contextWithModeAll, - LookupLoadingSpec.NONE, + ) + ); + + // Only required lookups are returned in the case of context having the lookup keys. + Assert.assertEquals( + LookupLoadingSpec.loadOnly(lookupsToLoad), + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, new ArrayList<>(lookupsToLoad), + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED + ), LookupLoadingSpec.ALL - }, - // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. - { - contextWithModeNone, - LookupLoadingSpec.ALL, + ) + ); + + // No lookups are returned in the case of context having mode=NONE, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE), + LookupLoadingSpec.ALL + ) + ); + + // All lookups are returned in the case of context having mode=ALL, irrespective of the default spec. + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ALL), LookupLoadingSpec.NONE - }, - // Default spec is returned in the case of context=null. - { + ) + ); + + // Default spec is returned in the case of context=null. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( null, - LookupLoadingSpec.NONE, LookupLoadingSpec.NONE - } - }; - - return Arrays.asList(params); + ) + ); } } From 4907acf917ca98c424b9667a52366ed0e3336f1d Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 10 May 2024 10:41:38 +0530 Subject: [PATCH 09/17] Revert the change in junit import --- .../apache/druid/server/lookup/cache/LookupLoadingSpecTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index c8ff703d5258..bdfcd3d4089c 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.junit.Assert; -import org.junit.jupiter.api.Test; +import org.junit.Test; import java.util.ArrayList; import java.util.Set; From b39ce254372d799523bc7a5a0eda400fb8432c27 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 10 May 2024 12:56:11 +0530 Subject: [PATCH 10/17] Ensure backward compatibility by loading all lookups if transformSpec is present, unless overridden by context --- .../indexing/common/task/CompactionTask.java | 11 +- .../ClientCompactionTaskQuerySerdeTest.java | 467 +++++++++++------- .../common/task/CompactionTaskTest.java | 15 + 3 files changed, 298 insertions(+), 195 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ffbff0e453cf..4034103a2b0f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -251,8 +251,13 @@ public CompactionTask( this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); this.segmentCacheManagerFactory = segmentCacheManagerFactory; - // By default, do not load any lookups in sub-tasks launched by compaction task. - addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + // Do not load any lookups in sub-tasks launched by compaction task, unless transformSpec is present. + // If transformSpec is present, we will not modify the context so that the sub-tasks can make the + // decision based on context values, loading all lookups by default. + // This is done to ensure backward compatibility since transformSpec can reference lookups. + if (transformSpec == null) { + addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + } } @VisibleForTesting @@ -1530,6 +1535,6 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) @Override public LookupLoadingSpec getLookupLoadingSpec() { - return LookupLoadingSpec.createFromContext(getContext(), LookupLoadingSpec.NONE); + return LookupLoadingSpec.createFromContext(getContext(), transformSpec == null ? LookupLoadingSpec.NONE : LookupLoadingSpec.ALL); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index c6bc89b0408d..67f3db4ffa57 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -35,6 +35,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; @@ -76,44 +77,155 @@ public class ClientCompactionTaskQuerySerdeTest { + static { + NullHandling.initializeForTests(); + } + private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils().getRowIngestionMetersFactory(); private static final CoordinatorClient COORDINATOR_CLIENT = new NoopCoordinatorClient(); private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager(); + private static final ObjectMapper MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); + + private static final IndexSpec INDEX_SPEC = IndexSpec.builder() + .withDimensionCompression(CompressionStrategy.LZ4) + .withMetricCompression(CompressionStrategy.LZF) + .withLongEncoding(LongEncodingStrategy.LONGS) + .build(); + private static final IndexSpec INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS = IndexSpec.builder() + .withDimensionCompression(CompressionStrategy.LZ4) + .withMetricCompression(CompressionStrategy.UNCOMPRESSED) + .withLongEncoding(LongEncodingStrategy.AUTO) + .build(); + private static final ClientCompactionIOConfig CLIENT_COMPACTION_IO_CONFIG = new ClientCompactionIOConfig( + new ClientCompactionIntervalSpec( + Intervals.of("2019/2020"), + "testSha256OfSortedSegmentIds"), + true + ); + private static final ClientCompactionTaskGranularitySpec CLIENT_COMPACTION_TASK_GRANULARITY_SPEC = + new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true); + private static final ClientCompactionTaskDimensionsSpec CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC = + new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))); + private static final AggregatorFactory[] METRICS_SPEC = new AggregatorFactory[] {new CountAggregatorFactory("cnt")}; + private static final ClientCompactionTaskTransformSpec CLIENT_COMPACTION_TASK_TRANSFORM_SPEC = + new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)); + private static final DynamicPartitionsSpec DYNAMIC_PARTITIONS_SPEC = new DynamicPartitionsSpec(100, 30000L); + private static final SegmentsSplitHintSpec SEGMENTS_SPLIT_HINT_SPEC = new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10); + + private static final CompactionTask.Builder COMPACTION_TASK_BUILDER = new CompactionTask.Builder( + "datasource", + new SegmentCacheManagerFactory(MAPPER), + new RetryPolicyFactory(new RetryPolicyConfig()) + ) + .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) + .tuningConfig( + new ParallelIndexTuningConfig( + null, + null, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + null, + null, + null, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + null, + null, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + null, + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + null, + null, + null, + 2, + null, + null, + null + ) + ) + .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC) + .dimensionsSpec( + DimensionsSpec.builder() + .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))) + .setDimensionExclusions(ImmutableList.of("__time", "val")) + .build() + ) + .metricsSpec(METRICS_SPEC) + .transformSpec(CLIENT_COMPACTION_TASK_TRANSFORM_SPEC); @Test public void testClientCompactionTaskQueryToCompactionTask() throws IOException { - final Map context = ImmutableMap.of("key", "value"); - final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( "id", "datasource", - new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec( - Intervals.of("2019/2020"), - "testSha256OfSortedSegmentIds" - ), - true + CLIENT_COMPACTION_IO_CONFIG, + new ClientCompactionTaskQueryTuningConfig( + null, + null, + 40000, + 2000L, + null, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + 2 ), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, + METRICS_SPEC, + CLIENT_COMPACTION_TASK_TRANSFORM_SPEC, + ImmutableMap.of("key", "value") + ); + + final byte[] json = MAPPER.writeValueAsBytes(query); + final CompactionTask task = (CompactionTask) MAPPER.readValue(json, Task.class); + + assertQueryToTask(query, task); + } + + @Test + public void testClientCompactionTaskQueryToCompactionTaskWithoutTransformSpec() throws IOException + { + Map context = new HashMap<>(); + context.put("key", "value"); + final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( + "id", + "datasource", + CLIENT_COMPACTION_IO_CONFIG, new ClientCompactionTaskQueryTuningConfig( null, null, 40000, 2000L, null, - new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10), - new DynamicPartitionsSpec(100, 30000L), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build(), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.UNCOMPRESSED) - .withLongEncoding(LongEncodingStrategy.AUTO) - .build(), + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, 2, 1000L, TmpFileSegmentWriteOutMediumFactory.instance(), @@ -126,16 +238,150 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException 100, 2 ), - new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), - new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), - new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, - new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, + METRICS_SPEC, + null, context ); - final byte[] json = mapper.writeValueAsBytes(query); - final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class); + final byte[] json = MAPPER.writeValueAsBytes(query); + final CompactionTask task = (CompactionTask) MAPPER.readValue(json, Task.class); + + // Verify that CompactionTask has added new parameters into the context because transformSpec was null. + Assert.assertNotEquals(query.getContext(), task.getContext()); + context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + assertQueryToTask(query, task); + } + + @Test + public void testCompactionTaskToClientCompactionTaskQuery() throws IOException + { + final CompactionTask task = COMPACTION_TASK_BUILDER.build(); + + final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( + task.getId(), + "datasource", + CLIENT_COMPACTION_IO_CONFIG, + new ClientCompactionTaskQueryTuningConfig( + 100, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + 30000L, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + 2 + ), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, + METRICS_SPEC, + CLIENT_COMPACTION_TASK_TRANSFORM_SPEC, + new HashMap<>() + ); + + final byte[] json = MAPPER.writeValueAsBytes(task); + final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) MAPPER.readValue(json, ClientTaskQuery.class); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testCompactionTaskToClientCompactionTaskQueryWithoutTransformSpec() throws IOException + { + final CompactionTask task = COMPACTION_TASK_BUILDER.transformSpec(null).build(); + Map expectedContext = new HashMap<>(); + final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( + task.getId(), + "datasource", + CLIENT_COMPACTION_IO_CONFIG, + new ClientCompactionTaskQueryTuningConfig( + 100, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + 30000L, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + 2 + ), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, + METRICS_SPEC, + null, + expectedContext + ); + + final byte[] json = MAPPER.writeValueAsBytes(task); + final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) MAPPER.readValue(json, ClientTaskQuery.class); + + // Verify that CompactionTask has added new parameters into the context + Assert.assertNotEquals(expected, actual); + + expectedContext.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + Assert.assertEquals(expected, actual); + } + + private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) + { + final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); + objectMapper.setAnnotationIntrospectors( + new AnnotationIntrospectorPair( + guiceIntrospector, + objectMapper.getSerializationConfig().getAnnotationIntrospector() + ), + new AnnotationIntrospectorPair( + guiceIntrospector, + objectMapper.getDeserializationConfig().getAnnotationIntrospector() + ) + ); + GuiceInjectableValues injectableValues = new GuiceInjectableValues( + GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + binder -> { + binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); + binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); + binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); + binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); + binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); + binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); + binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient()); + } + ) + ) + ); + objectMapper.setInjectableValues(injectableValues); + objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel")); + return objectMapper; + } + + private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask task) + { Assert.assertEquals(query.getId(), task.getId()); Assert.assertEquals(query.getDataSource(), task.getDataSource()); Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec); @@ -223,181 +469,18 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException query.getIoConfig().isDropExisting(), task.getIoConfig().isDropExisting() ); + Assert.assertEquals(query.getContext(), task.getContext()); Assert.assertEquals( query.getDimensionsSpec().getDimensions(), task.getDimensionsSpec().getDimensions() ); Assert.assertEquals( - query.getTransformSpec().getFilter(), - task.getTransformSpec().getFilter() + query.getTransformSpec(), + task.getTransformSpec() ); Assert.assertArrayEquals( query.getMetricsSpec(), task.getMetricsSpec() ); - - // Verify values of context keys originally present in the ClientCompactionTaskQuery - for (String key : context.keySet()) { - Assert.assertEquals(context.get(key), task.getContext().get(key)); - } - // Verify values of context parameters added by the CompactionTask - Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), task.getContext().get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)); - } - - @Test - public void testCompactionTaskToClientCompactionTaskQuery() throws IOException - { - final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - final CompactionTask.Builder builder = new CompactionTask.Builder( - "datasource", - new SegmentCacheManagerFactory(mapper), - new RetryPolicyFactory(new RetryPolicyConfig()) - ); - final CompactionTask task = builder - .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) - .tuningConfig( - new ParallelIndexTuningConfig( - null, - null, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - null, - null, - null, - new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10), - new DynamicPartitionsSpec(100, 30000L), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build(), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.UNCOMPRESSED) - .withLongEncoding(LongEncodingStrategy.AUTO) - .build(), - 2, - null, - null, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - null, - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - null, - null, - null, - 2, - null, - null, - null - ) - ) - .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true)) - .dimensionsSpec( - DimensionsSpec.builder() - .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))) - .setDimensionExclusions(ImmutableList.of("__time", "val")) - .build() - ) - .metricsSpec(new AggregatorFactory[] {new CountAggregatorFactory("cnt")}) - .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) - .build(); - - Map expectedContext = new HashMap<>(); - final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( - task.getId(), - "datasource", - new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec( - Intervals.of("2019/2020"), - "testSha256OfSortedSegmentIds" - ), - true - ), - new ClientCompactionTaskQueryTuningConfig( - 100, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - 30000L, - new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10), - new DynamicPartitionsSpec(100, 30000L), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.LZF) - .withLongEncoding(LongEncodingStrategy.LONGS) - .build(), - IndexSpec.builder() - .withDimensionCompression(CompressionStrategy.LZ4) - .withMetricCompression(CompressionStrategy.UNCOMPRESSED) - .withLongEncoding(LongEncodingStrategy.AUTO) - .build(), - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true), - new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), - new AggregatorFactory[] {new CountAggregatorFactory("cnt")}, - new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)), - expectedContext - ); - - final byte[] json = mapper.writeValueAsBytes(task); - final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class); - - // Verify that CompactionTask has added new parameters into the context - Assert.assertNotEquals(expected, actual); - - expectedContext.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); - Assert.assertEquals(expected, actual); - } - - private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) - { - final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); - objectMapper.setAnnotationIntrospectors( - new AnnotationIntrospectorPair( - guiceIntrospector, - objectMapper.getSerializationConfig().getAnnotationIntrospector() - ), - new AnnotationIntrospectorPair( - guiceIntrospector, - objectMapper.getDeserializationConfig().getAnnotationIntrospector() - ) - ); - GuiceInjectableValues injectableValues = new GuiceInjectableValues( - GuiceInjectors.makeStartupInjectorWithModules( - ImmutableList.of( - binder -> { - binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); - binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); - binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY); - binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT); - binder.bind(SegmentCacheManagerFactory.class).toInstance(new SegmentCacheManagerFactory(objectMapper)); - binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER); - binder.bind(OverlordClient.class).toInstance(new NoopOverlordClient()); - } - ) - ) - ); - objectMapper.setInjectableValues(injectableValues); - objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel")); - return objectMapper; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index e632c49ffdf8..7a39b46631c0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -1753,6 +1753,21 @@ public void testGetDefaultLookupLoadingSpec() Assert.assertEquals(LookupLoadingSpec.NONE, task.getLookupLoadingSpec()); } + @Test + public void testGetDefaultLookupLoadingSpecWithTransformSpec() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY + ); + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-02")) + .transformSpec(new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null))) + .build(); + Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec()); + } + private Granularity chooseFinestGranularityHelper(List granularities) { SettableSupplier queryGranularity = new SettableSupplier<>(); From 02acf834397c27614806c8174931cfda0ed203cb Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Fri, 10 May 2024 13:05:24 +0530 Subject: [PATCH 11/17] Remove CompactionTask#getLookupLoadingSpec as it's redundant --- .../apache/druid/indexing/common/task/CompactionTask.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 4034103a2b0f..81447f3fd5eb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -1531,10 +1531,4 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) ); } } - - @Override - public LookupLoadingSpec getLookupLoadingSpec() - { - return LookupLoadingSpec.createFromContext(getContext(), transformSpec == null ? LookupLoadingSpec.NONE : LookupLoadingSpec.ALL); - } } From 0f08126accc14c26f0341b01b77d0dcdcf52f01a Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 11 May 2024 10:21:54 +0530 Subject: [PATCH 12/17] Address review comments --- .../ClientCompactionTaskQuerySerdeTest.java | 306 ++++++------------ .../lookup/cache/LookupLoadingSpecTest.java | 44 ++- 2 files changed, 134 insertions(+), 216 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 67f3db4ffa57..f519519095c5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -35,7 +35,6 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.NoopOverlordClient; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; @@ -77,10 +76,6 @@ public class ClientCompactionTaskQuerySerdeTest { - static { - NullHandling.initializeForTests(); - } - private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY = new TestUtils().getRowIngestionMetersFactory(); private static final CoordinatorClient COORDINATOR_CLIENT = new NoopCoordinatorClient(); @@ -97,109 +92,18 @@ public class ClientCompactionTaskQuerySerdeTest .withMetricCompression(CompressionStrategy.UNCOMPRESSED) .withLongEncoding(LongEncodingStrategy.AUTO) .build(); - private static final ClientCompactionIOConfig CLIENT_COMPACTION_IO_CONFIG = new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec( - Intervals.of("2019/2020"), - "testSha256OfSortedSegmentIds"), - true - ); private static final ClientCompactionTaskGranularitySpec CLIENT_COMPACTION_TASK_GRANULARITY_SPEC = new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true); - private static final ClientCompactionTaskDimensionsSpec CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC = - new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))); private static final AggregatorFactory[] METRICS_SPEC = new AggregatorFactory[] {new CountAggregatorFactory("cnt")}; private static final ClientCompactionTaskTransformSpec CLIENT_COMPACTION_TASK_TRANSFORM_SPEC = new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)); private static final DynamicPartitionsSpec DYNAMIC_PARTITIONS_SPEC = new DynamicPartitionsSpec(100, 30000L); private static final SegmentsSplitHintSpec SEGMENTS_SPLIT_HINT_SPEC = new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10); - private static final CompactionTask.Builder COMPACTION_TASK_BUILDER = new CompactionTask.Builder( - "datasource", - new SegmentCacheManagerFactory(MAPPER), - new RetryPolicyFactory(new RetryPolicyConfig()) - ) - .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) - .tuningConfig( - new ParallelIndexTuningConfig( - null, - null, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - null, - null, - null, - SEGMENTS_SPLIT_HINT_SPEC, - DYNAMIC_PARTITIONS_SPEC, - INDEX_SPEC, - INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, - 2, - null, - null, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - null, - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - null, - null, - null, - 2, - null, - null, - null - ) - ) - .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC) - .dimensionsSpec( - DimensionsSpec.builder() - .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))) - .setDimensionExclusions(ImmutableList.of("__time", "val")) - .build() - ) - .metricsSpec(METRICS_SPEC) - .transformSpec(CLIENT_COMPACTION_TASK_TRANSFORM_SPEC); - @Test public void testClientCompactionTaskQueryToCompactionTask() throws IOException { - final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( - "id", - "datasource", - CLIENT_COMPACTION_IO_CONFIG, - new ClientCompactionTaskQueryTuningConfig( - null, - null, - 40000, - 2000L, - null, - SEGMENTS_SPLIT_HINT_SPEC, - DYNAMIC_PARTITIONS_SPEC, - INDEX_SPEC, - INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, - CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, - METRICS_SPEC, - CLIENT_COMPACTION_TASK_TRANSFORM_SPEC, - ImmutableMap.of("key", "value") - ); + final ClientCompactionTaskQuery query = createCompactionTaskQuery("id", CLIENT_COMPACTION_TASK_TRANSFORM_SPEC); final byte[] json = MAPPER.writeValueAsBytes(query); final CompactionTask task = (CompactionTask) MAPPER.readValue(json, Task.class); @@ -210,87 +114,23 @@ public void testClientCompactionTaskQueryToCompactionTask() throws IOException @Test public void testClientCompactionTaskQueryToCompactionTaskWithoutTransformSpec() throws IOException { - Map context = new HashMap<>(); - context.put("key", "value"); - final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( - "id", - "datasource", - CLIENT_COMPACTION_IO_CONFIG, - new ClientCompactionTaskQueryTuningConfig( - null, - null, - 40000, - 2000L, - null, - SEGMENTS_SPLIT_HINT_SPEC, - DYNAMIC_PARTITIONS_SPEC, - INDEX_SPEC, - INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, - CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, - METRICS_SPEC, - null, - context - ); + final ClientCompactionTaskQuery query = createCompactionTaskQuery("id", null); final byte[] json = MAPPER.writeValueAsBytes(query); final CompactionTask task = (CompactionTask) MAPPER.readValue(json, Task.class); // Verify that CompactionTask has added new parameters into the context because transformSpec was null. Assert.assertNotEquals(query.getContext(), task.getContext()); - context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + query.getContext().put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); assertQueryToTask(query, task); } @Test public void testCompactionTaskToClientCompactionTaskQuery() throws IOException { - final CompactionTask task = COMPACTION_TASK_BUILDER.build(); + final CompactionTask task = createCompactionTask(CLIENT_COMPACTION_TASK_TRANSFORM_SPEC); - final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( - task.getId(), - "datasource", - CLIENT_COMPACTION_IO_CONFIG, - new ClientCompactionTaskQueryTuningConfig( - 100, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - 30000L, - SEGMENTS_SPLIT_HINT_SPEC, - DYNAMIC_PARTITIONS_SPEC, - INDEX_SPEC, - INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, - CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, - METRICS_SPEC, - CLIENT_COMPACTION_TASK_TRANSFORM_SPEC, - new HashMap<>() - ); + final ClientCompactionTaskQuery expected = createCompactionTaskQuery(task.getId(), CLIENT_COMPACTION_TASK_TRANSFORM_SPEC); final byte[] json = MAPPER.writeValueAsBytes(task); final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) MAPPER.readValue(json, ClientTaskQuery.class); @@ -301,41 +141,9 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException @Test public void testCompactionTaskToClientCompactionTaskQueryWithoutTransformSpec() throws IOException { - final CompactionTask task = COMPACTION_TASK_BUILDER.transformSpec(null).build(); + final CompactionTask task = createCompactionTask(null); - Map expectedContext = new HashMap<>(); - final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( - task.getId(), - "datasource", - CLIENT_COMPACTION_IO_CONFIG, - new ClientCompactionTaskQueryTuningConfig( - 100, - new OnheapIncrementalIndex.Spec(true), - 40000, - 2000L, - 30000L, - SEGMENTS_SPLIT_HINT_SPEC, - DYNAMIC_PARTITIONS_SPEC, - INDEX_SPEC, - INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, - 2, - 1000L, - TmpFileSegmentWriteOutMediumFactory.instance(), - 100, - 5, - 1000L, - new Duration(3000L), - 7, - 1000, - 100, - 2 - ), - CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, - CLIENT_COMPACTION_TASK_DIMENSIONS_SPEC, - METRICS_SPEC, - null, - expectedContext - ); + final ClientCompactionTaskQuery expected = createCompactionTaskQuery(task.getId(), null); final byte[] json = MAPPER.writeValueAsBytes(task); final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) MAPPER.readValue(json, ClientTaskQuery.class); @@ -343,7 +151,7 @@ public void testCompactionTaskToClientCompactionTaskQueryWithoutTransformSpec() // Verify that CompactionTask has added new parameters into the context Assert.assertNotEquals(expected, actual); - expectedContext.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); + expected.getContext().put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.NONE.toString()); Assert.assertEquals(expected, actual); } @@ -483,4 +291,102 @@ private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask t task.getMetricsSpec() ); } + + private ClientCompactionTaskQuery createCompactionTaskQuery(String id, ClientCompactionTaskTransformSpec transformSpec) + { + Map context = new HashMap<>(); + context.put("key", "value"); + return new ClientCompactionTaskQuery( + id, + "datasource", + new ClientCompactionIOConfig( + new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true + ), + new ClientCompactionTaskQueryTuningConfig( + 100, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + 30000L, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + 2 + ), + CLIENT_COMPACTION_TASK_GRANULARITY_SPEC, + new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), + METRICS_SPEC, + transformSpec, + context + ); + } + + private CompactionTask createCompactionTask(ClientCompactionTaskTransformSpec transformSpec) + { + CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder( + "datasource", + new SegmentCacheManagerFactory(MAPPER), + new RetryPolicyFactory(new RetryPolicyConfig()) + ) + .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) + .tuningConfig( + new ParallelIndexTuningConfig( + null, + null, + new OnheapIncrementalIndex.Spec(true), + 40000, + 2000L, + null, + null, + null, + SEGMENTS_SPLIT_HINT_SPEC, + DYNAMIC_PARTITIONS_SPEC, + INDEX_SPEC, + INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + 2, + null, + null, + 1000L, + TmpFileSegmentWriteOutMediumFactory.instance(), + null, + 100, + 5, + 1000L, + new Duration(3000L), + 7, + 1000, + 100, + null, + null, + null, + 2, + null, + null, + null + ) + ) + .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC) + .dimensionsSpec( + DimensionsSpec.builder() + .setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))) + .setDimensionExclusions(ImmutableList.of("__time", "val")) + .build() + ) + .metricsSpec(METRICS_SPEC) + .transformSpec(transformSpec) + .context(ImmutableMap.of("key", "value")); + + return compactionTaskBuilder.build(); + } } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index bdfcd3d4089c..8d5db9aaae2a 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -25,7 +25,7 @@ import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Set; public class LookupLoadingSpecTest @@ -63,10 +63,8 @@ public void testLoadingOnlyRequiredLookupsWithNullList() } @Test - public void testCreateLookupLoadingSpecFromContext() + public void testCreateLookupLoadingSpecFromEmptyContext() { - ImmutableSet lookupsToLoad = ImmutableSet.of("lookupName1", "lookupName2"); - // Default spec is returned in the case of context not having the lookup keys. Assert.assertEquals( LookupLoadingSpec.ALL, @@ -76,7 +74,6 @@ public void testCreateLookupLoadingSpecFromContext() ) ); - // Default spec is returned in the case of context not having the lookup keys. Assert.assertEquals( LookupLoadingSpec.NONE, LookupLoadingSpec.createFromContext( @@ -84,13 +81,37 @@ public void testCreateLookupLoadingSpecFromContext() LookupLoadingSpec.NONE ) ); + } + @Test + public void testCreateLookupLoadingSpecFromNullContext() + { + // Default spec is returned in the case of context=null. + Assert.assertEquals( + LookupLoadingSpec.NONE, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.NONE + ) + ); + Assert.assertEquals( + LookupLoadingSpec.ALL, + LookupLoadingSpec.createFromContext( + null, + LookupLoadingSpec.ALL + ) + ); + } + + @Test + public void testCreateLookupLoadingSpecFromContext() + { // Only required lookups are returned in the case of context having the lookup keys. Assert.assertEquals( - LookupLoadingSpec.loadOnly(lookupsToLoad), + LookupLoadingSpec.loadOnly(ImmutableSet.of("lookup1", "lookup2")), LookupLoadingSpec.createFromContext( ImmutableMap.of( - LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, new ArrayList<>(lookupsToLoad), + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookup1", "lookup2"), LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED ), LookupLoadingSpec.ALL @@ -115,14 +136,5 @@ public void testCreateLookupLoadingSpecFromContext() LookupLoadingSpec.NONE ) ); - - // Default spec is returned in the case of context=null. - Assert.assertEquals( - LookupLoadingSpec.NONE, - LookupLoadingSpec.createFromContext( - null, - LookupLoadingSpec.NONE - ) - ); } } From ac2d836c51de7bccdab30992337d52941c1cacd9 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Sat, 11 May 2024 17:26:14 +0530 Subject: [PATCH 13/17] Address review comment --- .../apache/druid/server/lookup/cache/LookupLoadingSpecTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 8d5db9aaae2a..e0d404aae3f7 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -82,6 +82,7 @@ public void testCreateLookupLoadingSpecFromEmptyContext() ) ); } + @Test public void testCreateLookupLoadingSpecFromNullContext() { From 262f6a15d37769a05e8e61bbb54e684041bb8d1f Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 13 May 2024 16:03:11 +0530 Subject: [PATCH 14/17] Address review comments --- .../lookup/cache/LookupLoadingSpec.java | 32 ++++++++++---- .../lookup/cache/LookupLoadingSpecTest.java | 43 +++++++++++++++++++ 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 8d95a37fade3..1f408a1fbe0a 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.error.InvalidInput; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -94,19 +95,34 @@ public static LookupLoadingSpec createFromContext(Map context, L return defaultSpec; } - final Object lookupModeValue = context.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE); + final Object lookupModeValue = context.get(CTX_LOOKUP_LOADING_MODE); if (lookupModeValue == null) { return defaultSpec; } - final LookupLoadingSpec.Mode lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + final LookupLoadingSpec.Mode lookupLoadingMode; + try { + lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString()); + } + catch (IllegalArgumentException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Allowed values are %s", + CTX_LOOKUP_LOADING_MODE, lookupModeValue.toString(), Arrays.asList(LookupLoadingSpec.Mode.values())); + } + + if (lookupLoadingMode == Mode.NONE) { + return NONE; + } else if (lookupLoadingMode == Mode.ALL) { + return ALL; + } else if (lookupLoadingMode == Mode.ONLY_REQUIRED) { + Collection lookupsToLoad; + try { + lookupsToLoad = (Collection) context.get(CTX_LOOKUPS_TO_LOAD); + } + catch (ClassCastException e) { + throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", + CTX_LOOKUPS_TO_LOAD, context.get(CTX_LOOKUPS_TO_LOAD)); + } - if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) { - return LookupLoadingSpec.NONE; - } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ALL) { - return LookupLoadingSpec.ALL; - } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) { - Collection lookupsToLoad = (Collection) context.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD); if (lookupsToLoad == null || lookupsToLoad.isEmpty()) { throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad); } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index e0d404aae3f7..745a45755378 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -21,13 +21,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.apache.druid.error.DruidException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; import java.util.Arrays; import java.util.Set; +@RunWith(JUnitParamsRunner.class) public class LookupLoadingSpecTest { @Test @@ -138,4 +142,43 @@ public void testCreateLookupLoadingSpecFromContext() ) ); } + + @Test + @Parameters( + { + "NONE1", + "A", + "Random mode", + "all", + "only required", + "none" + } + ) + public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( + ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), LookupLoadingSpec.ALL)); + Assert.assertEquals(String.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); + } + + @Test + @Parameters( + { + "foo bar", + "foo]" + } + ) + public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) + { + DruidException exception = Assert.assertThrows(DruidException.class, () -> + LookupLoadingSpec.createFromContext( + ImmutableMap.of( + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad, + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED), + LookupLoadingSpec.ALL) + ); + Assert.assertEquals(String.format("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); + } } From 0f6b09dce7ea5f1c7a59668de3de2b5c160eec7c Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 13 May 2024 16:05:03 +0530 Subject: [PATCH 15/17] Make DruidException final in the tests --- .../druid/server/lookup/cache/LookupLoadingSpecTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index 745a45755378..c3d64b1ad11a 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -156,7 +156,7 @@ public void testCreateLookupLoadingSpecFromContext() ) public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) { - DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( + final DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), LookupLoadingSpec.ALL)); Assert.assertEquals(String.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); @@ -171,7 +171,7 @@ public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) ) public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookupsToLoad) { - DruidException exception = Assert.assertThrows(DruidException.class, () -> + final DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( ImmutableMap.of( LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad, From 3f7de755990c665fbd32f50a25797d17e2effbec Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Mon, 13 May 2024 16:35:18 +0530 Subject: [PATCH 16/17] Use StringUtils.format() instead of String.format() --- .../druid/server/lookup/cache/LookupLoadingSpecTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index c3d64b1ad11a..d140e6c11d8f 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -24,6 +24,7 @@ import junitparams.JUnitParamsRunner; import junitparams.Parameters; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -158,8 +159,8 @@ public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode) { final DruidException exception = Assert.assertThrows(DruidException.class, () -> LookupLoadingSpec.createFromContext( ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), LookupLoadingSpec.ALL)); - Assert.assertEquals(String.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", - LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]", + LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage()); } @Test @@ -178,7 +179,7 @@ public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookup LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED), LookupLoadingSpec.ALL) ); - Assert.assertEquals(String.format("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); } } From e6e92a46c63efc45a1d092b378a34f47f662a865 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Tue, 14 May 2024 11:59:32 +0530 Subject: [PATCH 17/17] Address review comment --- .../apache/druid/server/lookup/cache/LookupLoadingSpec.java | 3 ++- .../druid/server/lookup/cache/LookupLoadingSpecTest.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java index 1f408a1fbe0a..4665bdd18cf4 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java @@ -119,7 +119,8 @@ public static LookupLoadingSpec createFromContext(Map context, L lookupsToLoad = (Collection) context.get(CTX_LOOKUPS_TO_LOAD); } catch (ClassCastException e) { - throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", + throw InvalidInput.exception("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "lookup names. For example: [\"lookupName1\", \"lookupName2\"]", CTX_LOOKUPS_TO_LOAD, context.get(CTX_LOOKUPS_TO_LOAD)); } diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java index d140e6c11d8f..d36aff6914cb 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java @@ -179,7 +179,8 @@ public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object lookup LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, LookupLoadingSpec.Mode.ONLY_REQUIRED), LookupLoadingSpec.ALL) ); - Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of lookup names.", - LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); + Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please provide a comma-separated list of " + + "lookup names. For example: [\"lookupName1\", \"lookupName2\"]", + LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage()); } }