Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,4 @@ public String getType()
{
return TYPE;
}

@Override
public boolean supportsQueries()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,4 @@ public Set<ResourceAction> getInputSourceResources()
{
return INPUT_SOURCE_RESOURCES;
}

@Override
public boolean supportsQueries()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ public void testRunAfterDataInserted() throws Exception
Duration.standardHours(2).getStandardMinutes()
)
);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down Expand Up @@ -1296,7 +1295,6 @@ public void testKafkaRecordEntityInputFormat() throws Exception
Duration.standardHours(2).getStandardMinutes()
)
);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down Expand Up @@ -1369,7 +1367,6 @@ public void testKafkaInputFormat() throws Exception
Duration.standardHours(2).getStandardMinutes()
)
);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down Expand Up @@ -3095,7 +3092,6 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
)
);

Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Expand Down Expand Up @@ -3168,7 +3164,6 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception
)
);

Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit
Expand Down Expand Up @@ -3243,7 +3238,6 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception
)
);

Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);

// Wait for task to exit. Should fail and trip up with the first two bad messages in the stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,6 @@ public Set<ResourceAction> getInputSourceResources()
return INPUT_SOURCE_RESOURCES;
}

@Override
public boolean supportsQueries()
{
return true;
}

@VisibleForTesting
AWSCredentialsConfig getAwsCredentialsConfig()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void injectsProperAwsCredentialsConfig() throws Exception
Assert.assertEquals(ACCESS_KEY, awsCredentialsConfig.getAccessKey().getPassword());
Assert.assertEquals(SECRET_KEY, awsCredentialsConfig.getSecretKey().getPassword());
Assert.assertEquals(FILE_SESSION_CREDENTIALS, awsCredentialsConfig.getFileSessionCredentials());
Assert.assertNotNull(target.getPeonProcessingModuleConfig());
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ public void testRunAfterDataInserted() throws Exception
ImmutableMap.of(SHARD_ID1, "2"),
ImmutableMap.of(SHARD_ID1, "4")
);
Assert.assertTrue(task.supportsQueries());

final ListenableFuture<TaskStatus> future = runTask(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ private List<String> generateCommand(Task task)
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
if (task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private Collection<EnvVar> getEnv(Task task) throws IOException
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
.withValue(Boolean.toString(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()))
.build()
);
if (!shouldUseDeepStorageForTaskPayload(task)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,6 @@ public void test_fromTask_taskSupportsQueries() throws IOException
);

Task task = EasyMock.mock(Task.class);
EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
Expand Down Expand Up @@ -456,7 +455,6 @@ public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOExcep
);

Task task = EasyMock.mock(Task.class);
EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ spec:
- name: "TASK_ID"
value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ spec:
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ spec:
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
value: "ALL"
value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ public QueryProcessingPool getProcessingExecutorPool(
Lifecycle lifecycle
)
{
if (task.supportsQueries()) {
if (task.getPeonProcessingModuleConfig().hasProcessingThreads()) {
return DruidProcessingModule.createProcessingExecutorPool(config, executorServiceMonitor, lifecycle);
} else {
if (config.isNumThreadsConfigured()) {
log.warn(
"Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not support queries",
"Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not need processing threads",
config.getNumThreads(),
task.getId(),
task.getType()
Expand All @@ -108,7 +108,7 @@ public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(
RuntimeInfo runtimeInfo
)
{
if (task.supportsQueries()) {
if (task.getPeonProcessingModuleConfig().hasProcessingBuffers()) {
return DruidProcessingModule.createIntermediateResultsPool(config, runtimeInfo);
} else {
return DummyNonBlockingPool.instance();
Expand All @@ -120,13 +120,13 @@ public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(
@Merging
public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingConfig config, RuntimeInfo runtimeInfo)
{
if (task.supportsQueries()) {
if (task.getPeonProcessingModuleConfig().hasMergeBuffers()) {
return DruidProcessingModule.createMergeBufferPool(config, runtimeInfo);
} else {
if (config.isNumMergeBuffersConfigured()) {
log.warn(
"Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not support queries",
config.getNumThreads(),
"Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not need merge buffers",
config.getNumMergeBuffers(),
task.getId(),
task.getType()
);
Expand All @@ -145,4 +145,47 @@ public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
{
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
}

/**
* Returned by {@link Task#getPeonProcessingModuleConfig()} to declare which resources the task actually needs.
*/
public static class Config
{
private boolean processingBuffers;
private boolean processingThreads;
private boolean mergeBuffers;

public Config withProcessingBuffers()
{
this.processingBuffers = true;
return this;
}

public Config withProcessingThreads()
{
this.processingThreads = true;
return this;
}

public Config withMergeBuffers()
{
this.mergeBuffers = true;
return this;
}

public boolean hasProcessingBuffers()
{
return processingBuffers;
}

public boolean hasProcessingThreads()
{
return processingThreads;
}

public boolean hasMergeBuffers()
{
return mergeBuffers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,6 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return null;
}

@Override
public boolean supportsQueries()
{
return false;
}

@Override
public String getClasspathPrefix()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -146,4 +148,16 @@ public static NoopTask ofPriority(int priority)
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(null, null, null, 0, 0, context);
}

@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}

@Override
public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.indexer.TaskIdStatus;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
Expand Down Expand Up @@ -176,16 +177,13 @@ default Set<ResourceAction> getInputSourceResources() throws UOE
<T> QueryRunner<T> getQueryRunner(Query<T> query);

/**
* True if this task type embeds a query stack, and therefore should preload resources (like broadcast tables)
* that may be needed by queries. Tasks supporting queries are also allocated processing buffers, processing threads
* and merge buffers. Those which do not should not assume that these resources are present and must explicitly allocate
* any direct buffers or processing pools if required.
*
* If true, {@link #getQueryRunner(Query)} does not necessarily return nonnull query runners. For example,
* MSQWorkerTask returns true from this method (because it embeds a query stack for running multi-stage queries)
* even though it is not directly queryable via HTTP.
* Declares which resources provided by {@link PeonProcessingModule} this task actually needs. The default
* implementation has all the optional items disabled.
*/
boolean supportsQueries();
default PeonProcessingModule.Config getPeonProcessingModuleConfig()
{
return new PeonProcessingModule.Config();
}

/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public TaskStatus call()
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
if (task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Suppliers;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
Expand Down Expand Up @@ -335,4 +336,13 @@ public Appenderator getAppenderator()
{
return runnerSupplier.get();
}

@Override
public PeonProcessingModule.Config getPeonProcessingModuleConfig()
{
return new PeonProcessingModule.Config()
.withProcessingBuffers()
.withProcessingThreads()
.withMergeBuffers();
}
}
Loading
Loading