Skip to content

Commit

Permalink
Dynamic auto scale Kafka-Stream ingest tasks (#10524)
Browse files Browse the repository at this point in the history
* druid task auto scale based on kafka lag

* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig

* druid task auto scale based on kafka lag

* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig

* test dynamic auto scale done

* auto scale tasks tested on prd cluster

* auto scale tasks tested on prd cluster

* modify code style to solve 29055.10 29055.9 29055.17 29055.18 29055.19 29055.20

* rename test fiel function

* change codes and add docs based on capistrant reviewed

* midify test docs

* modify docs

* modify docs

* modify docs

* merge from master

* Extract the autoScale logic out of SeekableStreamSupervisor to minimize putting more stuff inside there &&  Make autoscaling algorithm configurable and scalable.

* fix ci failed

* revert msic.xml

* add uts to test autoscaler create && scale out/in and kafka ingest with scale enable

* add more uts

* fix inner class check

* add IT for kafka ingestion with autoscaler

* add new IT in groups=kafka-index named testKafkaIndexDataWithWithAutoscaler

* review change

* code review

* remove unused imports

* fix NLP

* fix docs and UTs

* revert misc.xml

* use jackson to build autoScaleConfig with default values

* add uts

* use jackson to init AutoScalerConfig in IOConfig instead of Map<>

* autoscalerConfig interface and provide a defaultAutoScalerConfig

* modify uts

* modify docs

* fix checkstyle

* revert misc.xml

* modify uts

* reviewed code change

* reviewed code change

* code reviewed

* code review

* log changed

* do StringUtils.encodeForFormat when create allocationExec

* code review && limit taskCountMax to partitionNumbers

* modify docs

* code review

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
  • Loading branch information
zhangyue19921010 and yuezhang committed Mar 6, 2021
1 parent 16acd66 commit bddacbb
Show file tree
Hide file tree
Showing 36 changed files with 2,671 additions and 23 deletions.
109 changes: 109 additions & 0 deletions docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|

### Task Autoscaler Properties

> Note that Task AutoScaler is currently designated as experimental.
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or ignored here will disable `autoScaler` even though `autoScalerConfig` is not null| no (default == false) |
| `taskCountMax` | Maximum value of task count. Make Sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks would be equal to `{numKafkaPartitions}` and `taskCountMax` would be ignored. | yes |
| `taskCountMin` | Minimum value of task count. When enable autoscaler, the value of taskCount in `IOConfig` will be ignored, and `taskCountMin` will be the number of tasks that ingestion starts going up to `taskCountMax`| yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |

### Lag Based AutoScaler Strategy Related Properties
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor starts when first check scale logic. | no (default == 300000) |
| `scaleActionPeriodMillis` | The frequency of checking whether to do scale action in millis | no (default == 60000) |
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |

A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [

],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 6000000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT1H"
},
"tuningConfig":{
"type":"kafka",
"maxRowsPerSegment":5000000
}
}
```

#### More on consumerProperties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
Expand Down Expand Up @@ -282,6 +283,18 @@ public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
// do nothing
}

@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public int getActiveTaskGroupsCount()
{
throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor");
}

/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
Expand All @@ -50,6 +52,7 @@
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.concurrent.Callable;

public class MaterializedViewSupervisorSpecTest
{
Expand Down Expand Up @@ -155,6 +158,85 @@ public void testSupervisorSerialization() throws IOException
Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
}

@Test
public void testMaterializedViewSupervisorSpecCreated()
{
Exception ex = null;

try {
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
),
null,
null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
Supervisor supervisor = spec.createSupervisor();
Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);

SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);

try {
supervisor.computeLagStats();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

try {
int count = supervisor.getActiveTaskGroupsCount();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
{
return -1;
}
};

}
catch (Exception e) {
ex = e;
}

Assert.assertNull(ex);
}

@Test
public void testSuspendResuume() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
Expand All @@ -58,6 +59,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -330,6 +332,17 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
return false;
}

@Override
public LagStats computeLagStats()
{
Map<Integer, Long> partitionRecordLag = getPartitionRecordLag();
if (partitionRecordLag == null) {
return new LagStats(0, 0, 0);
}

return computeLags(partitionRecordLag);
}

@Override
protected void updatePartitionLagFromStream()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;

import javax.annotation.Nullable;
import java.util.Map;

public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
Expand All @@ -51,6 +53,7 @@ public KafkaSupervisorIOConfig(
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
Expand All @@ -73,6 +76,7 @@ public KafkaSupervisorIOConfig(
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
);

Expand Down Expand Up @@ -117,6 +121,7 @@ public String toString()
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
", autoScalerConfig=" + getAutoscalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public void testSample()
null,
null,
null,
null,
true,
null,
null,
Expand Down
Loading

0 comments on commit bddacbb

Please sign in to comment.