Skip to content

Commit

Permalink
Add JoinableFactory interface and use it in the query stack. (#9247)
Browse files Browse the repository at this point in the history
* Add JoinableFactory interface and use it in the query stack.

Also includes InlineJoinableFactory, which enables joining against
inline datasources. This is the first patch where a basic join query
actually works. It includes integration tests.

* Fix test issues.

* Adjustments from code review.
  • Loading branch information
gianm committed Jan 24, 2020
1 parent 3daf0f8 commit 19b427e
Show file tree
Hide file tree
Showing 63 changed files with 1,120 additions and 96 deletions.
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/druid/utils/JvmUtils.java
Expand Up @@ -29,6 +29,8 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -80,13 +82,29 @@ public static long safeGetThreadCpuTime()
* @return total CPU time for the current thread in nanoseconds.
*
* @throws UnsupportedOperationException if the Java virtual machine does not support CPU time measurement for
* the current thread.
* the current thread.
*/
public static long getCurrentThreadCpuTime()
{
  // Plain delegation to the thread MX bean; no fallback is applied here, so whatever the bean throws
  // reaches the caller.
  final long cpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime();
  return cpuTime;
}

/**
 * Runs {@code function} and returns whatever it returns. The change in thread CPU time across the call, measured
 * as the difference between two {@link #safeGetThreadCpuTime()} readings, is added to {@code accumulator}.
 *
 * The accumulation happens in a {@code finally} block, so it is also performed when {@code function} throws; the
 * exception then propagates to the caller.
 *
 * @param accumulator counter that receives the measured CPU time delta
 * @param function    work to execute on the calling thread
 *
 * @return the value produced by {@code function}
 */
public static <T> T safeAccumulateThreadCpuTime(final AtomicLong accumulator, final Supplier<T> function)
{
  final long cpuTimeBefore = safeGetThreadCpuTime();

  try {
    return function.get();
  }
  finally {
    // Take the second reading only after the function has finished (normally or exceptionally).
    final long elapsedCpuTime = safeGetThreadCpuTime() - cpuTimeBefore;
    accumulator.addAndGet(elapsedCpuTime);
  }
}

public static List<URL> systemClassPath()
{
List<URL> jobURLs;
Expand Down
Expand Up @@ -105,6 +105,7 @@
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
Expand Down Expand Up @@ -2652,6 +2653,7 @@ public void close()
handoffNotifierFactory,
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
Expand Down
Expand Up @@ -97,6 +97,7 @@
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
Expand Down Expand Up @@ -2888,6 +2889,7 @@ public void close()
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
Expand Down Expand Up @@ -86,6 +87,7 @@ public class TaskToolbox
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
private final ObjectMapper jsonMapper;
private final File taskWorkDir;
Expand Down Expand Up @@ -116,6 +118,7 @@ public TaskToolbox(
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper jsonMapper,
Expand Down Expand Up @@ -146,6 +149,7 @@ public TaskToolbox(
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.jsonMapper = jsonMapper;
Expand Down Expand Up @@ -229,6 +233,11 @@ public ExecutorService getQueryExecutorService()
return queryExecutorService;
}

/**
 * Returns the {@link JoinableFactory} that was passed to this toolbox's constructor.
 */
public JoinableFactory getJoinableFactory()
{
  return this.joinableFactory;
}

public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class TaskToolboxFactory
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final MonitorScheduler monitorScheduler;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper jsonMapper;
Expand Down Expand Up @@ -102,6 +104,7 @@ public TaskToolboxFactory(
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
SegmentLoaderFactory segmentLoaderFactory,
@Json ObjectMapper jsonMapper,
Expand Down Expand Up @@ -131,6 +134,7 @@ public TaskToolboxFactory(
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.segmentLoaderFactory = segmentLoaderFactory;
this.jsonMapper = jsonMapper;
Expand Down Expand Up @@ -164,6 +168,7 @@ public TaskToolbox build(Task task)
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
joinableFactory,
monitorScheduler,
segmentLoaderFactory.manufacturate(taskWorkDir),
jsonMapper,
Expand Down
Expand Up @@ -772,6 +772,7 @@ private Appenderator newAppenderator(
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()
Expand Down
Expand Up @@ -341,6 +341,7 @@ public String getVersion(final Interval interval)
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
toolbox.getCache(),
Expand Down
Expand Up @@ -206,6 +206,7 @@ public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox t
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
Expand Down Expand Up @@ -109,6 +110,7 @@ public void setUp() throws IOException
mockHandoffNotifierFactory,
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
NoopJoinableFactory.INSTANCE,
mockMonitorScheduler,
mockSegmentLoaderFactory,
ObjectMapper,
Expand Down
Expand Up @@ -113,6 +113,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
Expand Down Expand Up @@ -1591,6 +1592,7 @@ public List<StorageLocationConfig> getLocations()
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
Expand Down Expand Up @@ -877,6 +878,7 @@ public List<StorageLocationConfig> getLocations()
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
loader,
objectMapper,
Expand Down
Expand Up @@ -108,6 +108,7 @@
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -1274,6 +1275,7 @@ private static class TestTaskToolbox extends TaskToolbox
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
null,
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
Expand Down Expand Up @@ -306,6 +307,7 @@ public ListenableFuture<TaskStatus> run(Task task)
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
null,
objectMapper,
Expand Down
Expand Up @@ -100,6 +100,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.FireDepartment;
Expand Down Expand Up @@ -973,6 +974,7 @@ public List<StorageLocationConfig> getLocations()
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
Expand Down Expand Up @@ -60,6 +61,7 @@ public Appenderator createRealtimeAppenderatorForTask(
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
Expand All @@ -78,6 +80,7 @@ public Appenderator createRealtimeAppenderatorForTask(
segmentAnnouncer,
emitter,
queryExecutorService,
joinableFactory,
cache,
cacheConfig,
cachePopulatorStats
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
Expand Down Expand Up @@ -301,6 +302,7 @@ public File getStorageDirectory()
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
newSegmentLoader(temporaryFolder.newFolder()),
getObjectMapper(),
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void setup() throws IOException
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
utils.getTestObjectMapper(),
Expand Down
Expand Up @@ -117,6 +117,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
Expand Down Expand Up @@ -663,6 +664,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
Execs.directExecutor(), // query executor service
NoopJoinableFactory.INSTANCE,
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(null, new DefaultObjectMapper()),
MAPPER,
Expand Down Expand Up @@ -1329,6 +1331,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception

UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
exec,
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(2048),
new CacheConfig(),
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
Expand Down Expand Up @@ -120,6 +121,7 @@ public List<StorageLocationConfig> getLocations()
notifierFactory,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, jsonMapper),
jsonMapper,
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.IndexerZkConfig;
Expand Down Expand Up @@ -170,7 +171,18 @@ private WorkerTaskMonitor createTaskMonitor()
taskConfig,
null,
taskActionClientFactory,
null, null, null, null, null, null, null, notifierFactory, null, null, null,
null,
null,
null,
null,
null,
null,
null,
notifierFactory,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentLoaderFactory(null, jsonMapper),
jsonMapper,
indexIO,
Expand Down

0 comments on commit 19b427e

Please sign in to comment.