Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
Browse files Browse the repository at this point in the history
  • Loading branch information
somu-imply committed Mar 27, 2024
2 parents c3e2c29 + a65b2d4 commit 735b621
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 155 deletions.
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to hand off a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of the `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|

If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
implements Supervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";

private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
Expand Down Expand Up @@ -403,11 +405,13 @@ public boolean equals(Object obj)
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
ServiceEmitter emitter;
private static final String TYPE = "dynamic_allocation_tasks_notice";

DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
this.scaleAction = scaleAction;
this.emitter = emitter;
}

/**
Expand Down Expand Up @@ -448,17 +452,35 @@ public void handle()
return;
}
}
final Integer desiredTaskCount = scaleAction.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired task count is [%s], active task count is [%s]",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
dataSource
dataSource,
desiredTaskCount,
getActiveTaskGroupsCount()
);

if (desiredTaskCount > 0) {
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"minTriggerScaleActionFrequencyMillis not elapsed yet"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
return;
}
final Integer desriedTaskCount = scaleAction.call();
boolean allocationSuccess = changeTaskCount(desriedTaskCount);

if (desiredTaskCount > 0) {
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}

boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
dynamicTriggerLastRunTime = nowTime;
}
Expand Down Expand Up @@ -1208,9 +1230,9 @@ public void tryInit()
}
}

public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter));
}

private Runnable buildRunTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
}
return new NoopTaskAutoScaler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
Expand All @@ -38,6 +39,6 @@ public interface AutoScalerConfig
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter);
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -45,11 +48,17 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
private final SupervisorSpec spec;
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;

private static final ReentrantLock LOCK = new ReentrantLock(true);

public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
public LagBasedAutoScaler(
SeekableStreamSupervisor supervisor,
String dataSource,
LagBasedAutoScalerConfig autoScalerConfig,
SupervisorSpec spec,
ServiceEmitter emitter
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
Expand All @@ -62,6 +71,10 @@ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
this.spec = spec;
this.supervisor = supervisor;
this.emitter = emitter;
metricBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream());
}

@Override
Expand Down Expand Up @@ -93,7 +106,7 @@ public void start()
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction),
supervisor.buildDynamicAllocationTask(scaleAction, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
Expand Down Expand Up @@ -214,6 +227,12 @@ private int computeDesiredTaskCount(List<Long> lags)
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at max task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
Expand All @@ -228,6 +247,12 @@ private int computeDesiredTaskCount(List<Long> lags)
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -154,9 +155,9 @@ public int getTaskCountMin()
}

@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec, emitter);
}

@JsonProperty
Expand Down

0 comments on commit 735b621

Please sign in to comment.