
Dynamic auto scale Kafka-Stream ingest tasks #10524

Merged

Conversation

Contributor

@zhangyue19921010 zhangyue19921010 commented Oct 21, 2020

Description

In Druid, users need to set `taskCount` when submitting a Kafka ingestion supervisor. This has a few limitations:

  1. While a supervisor is running, we can't modify the task count. We may see data lag during a sudden traffic peak, and users have to re-submit the supervisor with a larger task count to catch up with the Kafka delay. If there are many supervisors, this re-submit operation is very cumbersome. In addition, users must scale in manually after the sudden traffic peak passes.
  2. To avoid Kafka lag during the regular traffic peak, users have to set a large task count in supervisors, which wastes resources during regular off-peak periods.
    For example,

(figure: traffic pattern)

Here is our traffic pattern. I have to set `taskCount` to 8 to avoid Kafka lag during the traffic peak, while at other times 4 tasks are enough. This PR adds the ability to auto scale the number of Kafka ingest tasks based on lag metrics while supervisors are running. With this feature enabled, ingest tasks scale out during traffic peaks and scale in during off-peak periods.

Design

Here are the designs of this PR:
The workflow of the supervisor controller, based on the Druid source code:
(screenshot: supervisor controller workflow)
As the picture shows, SupervisorManager controls all the supervisors in the Overlord service. Each Kafka supervisor serially consumes notices from a LinkedBlockingQueue. Notice is an interface; RunNotice, ShutdownNotice and ResetNotice are implementations of it. I designed a new implementation named DynamicAllocationTasksNotice. I created a new timer (lagComputationExec) to collect Kafka lag at a fixed rate, and another timer (allocationExec) to check and perform scale actions at a fixed rate, as shown below:
(screenshot: lag collection and scale-check timers)
For allocationExec details,
(screenshot: allocationExec workflow)
Furthermore, I extended the ioConfig spec with new parameters that control the scaling behavior, for example:

"ioConfig": {
      "topic": "dummy_topic",
      "inputFormat": null,
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "xxx,xxx,xxx"
      },
      "autoScalerConfig": {
        "enableTaskAutoScaler": true,
        "lagCollectionIntervalMillis": 30000,
        "lagCollectionRangeMillis": 600000,
        "scaleOutThreshold": 6000000,
        "triggerScaleOutThresholdFrequency": 0.3,
        "scaleInThreshold": 1000000,
        "triggerScaleInThresholdFrequency": 0.9,
        "scaleActionStartDelayMillis": 300000,
        "scaleActionPeriodMillis": 60000,
        "taskCountMax": 6,
        "taskCountMin": 2,
        "scaleInStep": 1,
        "scaleOutStep": 2,
        "minTriggerScaleActionFrequencyMillis": 600000
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "dummy_topic",
      "useEarliestSequenceNumber": false
    }
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether to enable this feature. If set to false or omitted, the `autoScaler` is disabled even when `autoScalerConfig` is not null | no (default == false) |
| `lagCollectionIntervalMillis` | The period of lag point collection | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection. Used with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, collect one lag metric point every `lagCollectionIntervalMillis` | no (default == 600000) |
| `scaleOutThreshold` | The lag threshold for a scale out action | no (default == 6000000) |
| `triggerScaleOutThresholdFrequency` | If at least this fraction of the lag points are higher than `scaleOutThreshold`, trigger a scale out action | no (default == 0.3) |
| `scaleInThreshold` | The lag threshold for a scale in action | no (default == 1000000) |
| `triggerScaleInThresholdFrequency` | If at least this fraction of the lag points are lower than `scaleInThreshold`, trigger a scale in action | no (default == 0.9) |
| `scaleActionStartDelayMillis` | Number of milliseconds to delay after supervisor start before the first scale-logic check | no (default == 300000) |
| `scaleActionPeriodMillis` | The period, in milliseconds, of checking whether to perform a scale action | no (default == 60000) |
| `taskCountMax` | Maximum task count. Make sure `taskCountMax >= taskCountMin` | yes |
| `taskCountMin` | Minimum task count. When the autoscaler is enabled, the `taskCount` value in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax` | yes |
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The autoscaler algorithm. Only `lagBased` is supported for now | no (default == `lagBased`) |

Effect evaluation :

I have deployed this feature in our production environment.

Figure 1: Kafka ingestion lag

Figure 2: Running task count per datasource

Figure 3: Ingest speed per datasource

A Druid ingestion task goes through two states: reading and writing. When Druid scales out at 10:38, it launches 3 new tasks in the reading state and moves the old task from reading to writing, and the old task finishes writing within a few minutes. This is why Figure 2 shows a peak of 4 from 10:38 to 10:42 (3 new reading tasks and one writing task) and a peak of 5 from 11:06 to 11:08. What we really care about are the tasks in the reading state. In other words, the real peak task number is 3 the whole time, scaling out at 10:39 due to Kafka lag, with no gap between the traffic peak and the task peak.

Conclusion
Here are the benefits of Druid auto scaling:

  1. Improved data SLA: whenever there is heavy traffic, Druid can scale out automatically and promptly to provide more consuming power, so there is no delay for downstream consumers.
  2. Resource saving:
    • Cost saving: because the task count of each datasource is reduced, fewer task slots are needed than before.
    • Operational saving: the entire process, from scale out to scale in, requires no human intervention.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • SeekableStreamSupervisor.java
  • KafkaSupervisor.java
  • KafkaSupervisorIOConfig.java

@zhangyue19921010 zhangyue19921010 changed the title Kafka dynamic scale ingest tasks Dynamic scale ingest tasks Oct 23, 2020
@zhangyue19921010 zhangyue19921010 changed the title Dynamic scale ingest tasks Dynamic scale Kafka-Stream ingest tasks Oct 23, 2020
@zhangyue19921010 zhangyue19921010 changed the title Dynamic scale Kafka-Stream ingest tasks Dynamic auto scale Kafka-Stream ingest tasks Oct 23, 2020
@nishantmonu51 nishantmonu51 self-requested a review October 30, 2020 13:05
Contributor

@capistrant capistrant left a comment

This is a super cool idea. I have begun a review and included comments/questions/suggestions.

Initial takeaways:

  • Docs: let's get all the new IOConfig stuff documented in .md files.
  • Javadocs: it would be great to get Javadocs created for all the new methods, especially the more critical/complex ones.
  • Logging: lots of the logging does not provide information about which supervisor is logging. Also, I think we can scale back on what gets info level; some logs seemed better off as debug.
  • Logging 2: in places where we catch errors and just continue on, should we log warn instead of error?
  • Configuration: for the new config block in IOConfig, is there any way to have a POJO structure for it? If not, I think at least extracting the default values out to final variables in the classes they live in would help make things easier to follow.

I'll try to continue reviewing during the rest of the week, but I wanted to submit what I have so far, as I need to take a break to work on other things.

@zhangyue19921010
Contributor Author

@capistrant Thanks for your review! I will make changes as soon as possible :)

@zhangyue19921010
Contributor Author

Hi @pjain1 Thanks a lot for your review and approval.

@zhangyue19921010
Contributor Author

Hi @capistrant and @himanshug, sorry to bother you. All your suggested changes have been completed and tested, and CI has passed. Could you please +1 this PR? If there's any further suggestion, please let me know and I will try my best to get it done! Thanks.

@pjain1
Member

pjain1 commented Mar 1, 2021

@zhangyue19921010 I was wondering if you have thought about the case where the desired task count becomes greater than the number of topic partitions. In that case, the number of actual tasks will remain equal to the number of topic partitions and will not grow beyond that. If the overall lag is still higher than the threshold, increasing the desired task count will not help, and I think, per the logic, every time the dynamic allocate task notice runs it will increase the desired task count by scaleOutStep. Do you see this as a problem?

@zhangyue19921010
Contributor Author

Hi @pjain1, thanks for asking. Yes, if the desired task count becomes greater than the number of topic partitions, it will increase the task number every dynamicCheckPeriod until it reaches taskCountMax, rather than growing without limit, so the increase is controllable. Also, to avoid this unnecessary scenario, maybe we can document the relationship between taskCountMax and the Kafka partition count, like taskCount in IOConfig does? For example: the number of reading tasks will be less than taskCountMax if taskCountMax > {numKafkaPartitions}.
(screenshot: existing taskCount documentation)

@pjain1
Member

pjain1 commented Mar 1, 2021

I think we can add this to the documentation. I was just wondering if we can be more defensive in the computeDesiredTaskCount method and not return a desired task count greater than the number of topic partitions, to prevent unnecessary deletion and creation of tasks in case taskCountMax is set to a big number.

Contributor

@himanshug himanshug left a comment

thanks, we are getting much closer now.

As a user, I find it hard to understand the autoscaling behavior based on what is documented and had to read the code. But hopefully this can be improved/refined as this feature gets more adoption.

Also, for the first release in which this shows up, I think we should call it an experimental feature, mostly so that we can slightly change the naming of various fields in the documented autoscaler configuration based on user feedback if needed.

| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. | no (default == `lagBased`) |
Contributor

Can we make the distinction that the following properties are common to any autoscaler and the rest are specific to the lagBased autoscaler? Maybe have two tables.

autoScalerStrategy
enableTaskAutoScaler
taskCountMin
taskCountMax
minTriggerScaleActionFrequencyMillis

Contributor Author

Sure. Done

| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or ignored here will disable `autoScaler` even though `autoScalerConfig` is not null| no (default == false) |
| `lagCollectionIntervalMillis` | Define the frequency of lag points collection. | no (default == 30000) |
Contributor

Suggested change
| `lagCollectionIntervalMillis` | Define the frequency of lag points collection. | no (default == 30000) |
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |

Contributor Author

Thanks && changed.

lagMetricsQueue.offer(0L);
} else {
long totalLags = lagStats.getTotalLag();
lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
Contributor

why shouldn't we expect lagStats.getTotalLag() to return a value >= 0 ?

Contributor Author

Because we occasionally get negative lags in practice. Something like https://stackoverflow.com/questions/60847952/how-to-get-rid-of-negative-consumer-lag-in-kafka

Negative lag values are unnecessary and poison our lag metrics, so we just filter them out here.
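The filtering discussed here can be sketched as a bounded window that clamps negative totals at zero. All names below are hypothetical; this is not the PR's code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch (hypothetical names) of the lag window discussed above:
// totals are clamped at zero before being recorded, and only the most recent
// lagCollectionRangeMillis worth of points is kept.
public class LagWindowSketch
{
  private final Deque<Long> lagPoints = new ArrayDeque<>();
  private final int capacity;

  public LagWindowSketch(long lagCollectionRangeMillis, long lagCollectionIntervalMillis)
  {
    this.capacity = (int) (lagCollectionRangeMillis / lagCollectionIntervalMillis);
  }

  public void offer(long totalLag)
  {
    if (lagPoints.size() == capacity) {
      lagPoints.pollFirst(); // slide the window: drop the oldest point
    }
    // Consumers can transiently report negative lag; clamp to zero so such
    // points do not poison the collected series.
    lagPoints.addLast(Math.max(totalLag, 0L));
  }

  public long latest()
  {
    return lagPoints.peekLast();
  }

  public int size()
  {
    return lagPoints.size();
  }
}
```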

* @param lags the lag metrics of Stream(Kafka/Kinesis)
* @return Integer. target number of tasksCount, -1 means skip scale action.
*/
private Integer computeDesiredTaskCount(List<Long> lags)
Contributor

Suggested change
private Integer computeDesiredTaskCount(List<Long> lags)
private int computeDesiredTaskCount(List<Long> lags)

Contributor Author

Thanks && changed.

);

int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
if (currentActiveTaskCount < 0) {
Contributor

is it legitimate for supervisor.getActiveTaskGroupsCount() to return a negative value? if not, then supervisor.getActiveTaskGroupsCount() should always return a value >= 0 and this check shouldn't be needed.

Contributor Author

Thanks && removed.

Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
currentActiveTaskCount = activeTaskGroups.size();

if (desiredActiveTaskCount == -1 || desiredActiveTaskCount == currentActiveTaskCount) {
Contributor

Suggested change
if (desiredActiveTaskCount == -1 || desiredActiveTaskCount == currentActiveTaskCount) {
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {

Contributor Author

Thanks && changed.

allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
Contributor

not sure why lagCollectionRangeMillis was added to scaleActionStartDelayMillis .

Contributor Author

@zhangyue19921010 zhangyue19921010 Mar 2, 2021

When scaleActionStartDelayMillis elapses, lagComputationExec starts collecting metrics. allocationExec then needs to wait another lagCollectionRangeMillis, i.e. wait for lagComputationExec to collect enough lag metrics.
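That two-timer arrangement can be sketched with `ScheduledExecutorService`. This is a minimal illustration under hypothetical names; the PR itself schedules `supervisor.buildDynamicAllocationTask(...)` rather than a plain `Runnable`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the two timers described above (hypothetical names).
public class TwoTimerSketch
{
  // The allocation timer waits an extra lagCollectionRangeMillis so a full
  // window of lag points exists before the first scale decision.
  public static long allocationInitialDelay(long scaleActionStartDelayMillis, long lagCollectionRangeMillis)
  {
    return scaleActionStartDelayMillis + lagCollectionRangeMillis;
  }

  public static void schedule(
      Runnable collectLag,
      Runnable checkScale,
      long scaleActionStartDelayMillis,
      long lagCollectionIntervalMillis,
      long lagCollectionRangeMillis,
      long scaleActionPeriodMillis
  )
  {
    ScheduledExecutorService lagComputationExec = Executors.newSingleThreadScheduledExecutor();
    ScheduledExecutorService allocationExec = Executors.newSingleThreadScheduledExecutor();

    // Lag collection starts after the configured start delay.
    lagComputationExec.scheduleAtFixedRate(
        collectLag, scaleActionStartDelayMillis, lagCollectionIntervalMillis, TimeUnit.MILLISECONDS
    );
    // Scale checks start only after a full lag window has been collected.
    allocationExec.scheduleAtFixedRate(
        checkScale,
        allocationInitialDelay(scaleActionStartDelayMillis, lagCollectionRangeMillis),
        scaleActionPeriodMillis,
        TimeUnit.MILLISECONDS
    );
  }
}
```

With the defaults (scaleActionStartDelayMillis 300000, lagCollectionRangeMillis 600000), the first scale check happens 900000 ms after supervisor start.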

| `taskCountMin` | Minimum value of task count. When enable autoscaler, the value of taskCount in `IOConfig` will be ignored, and `taskCountMin` will be the number of tasks that ingestion starts going up to `taskCountMax`| yes |
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
Contributor

Wouldn't the time interval between two scale actions always be greater than or equal to scaleActionPeriodMillis?

Contributor Author

Actually, scaleActionPeriodMillis controls the frequency of detection, while minTriggerScaleActionFrequencyMillis sets a cool-down time between two scale actions. There is no hard association between the two parameters. For example, users can set scaleActionPeriodMillis == 10min and minTriggerScaleActionFrequencyMillis == 5min. It means Druid will check lag every 10 minutes; once a scale action is triggered, it cannot scale again within 5 minutes.
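The distinction between the detection period and the cool-down can be sketched as follows (hypothetical class, not the PR's code): the check may run every `scaleActionPeriodMillis`, but an action is only permitted when the previous action is at least `minTriggerScaleActionFrequencyMillis` old.

```java
// Hypothetical sketch of the cool-down described above.
public class CoolDownSketch
{
  private long lastScaleActionMillis = Long.MIN_VALUE / 2; // "never scaled yet"

  // Called on each detection tick; returns whether a scale action may fire now.
  public boolean mayScale(long nowMillis, long minTriggerScaleActionFrequencyMillis)
  {
    if (nowMillis - lastScaleActionMillis < minTriggerScaleActionFrequencyMillis) {
      return false; // still cooling down from the previous scale action
    }
    lastScaleActionMillis = nowMillis;
    return true;
  }
}
```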

| `lagCollectionIntervalMillis` | Define the frequency of lag points collection. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
| `triggerScaleOutThresholdFrequency` | If `triggerScaleOutThresholdFrequency` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
Contributor

not sure if it is a "frequency". maybe triggerScaleOutFractionThreshold

Contributor Author

Thanks && changed.

| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
| `triggerScaleOutThresholdFrequency` | If `triggerScaleOutThresholdFrequency` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
| `triggerScaleInThresholdFrequency` | If `triggerScaleInThresholdFrequency` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
Contributor

Suggested change
| `triggerScaleInThresholdFrequency` | If `triggerScaleInThresholdFrequency` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
| `triggerScaleInFractionThreshold` | If `triggerScaleInThresholdFrequency` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |

Contributor Author

Thanks && changed.

@himanshug
Contributor

@bananaaggle saw your message earlier. It is hard to associate any timelines, especially with PRs that are more involved and consequently take more time to get through the review process. But hopefully this is getting closer to being merged.

@himanshug
Contributor

@pjain1 @zhangyue19921010

> I think we can add this to the documentation. I was just wondering if we can be more defensive in computeDesiredTaskCount method and do not return desired task count greater than num topic partitions to prevent unnecessary deletion and creation of tasks in case task count max is set to a big number.

We should definitely make sure that the returned desiredTaskCount is always less than or equal to the number of partitions in the topic.

@zhangyue19921010
Contributor Author

Thanks guys, will get it done ASAP.

@zhangyue19921010
Contributor Author

zhangyue19921010 commented Mar 2, 2021

Hi @pjain1 and @himanshug, the changes are done, including the new condition between desiredTaskCount, the partition count, and taskCountMax, and they are documented. I also added a new UT covering the scenario taskCountMin = 1, taskCountMax = 3, scaleOutStep = 2, partitionNumbers = 2, where the scale action expands the task number from 1 to 2 as expected.
Could you please take a look at your convenience?
The CI jobs related to this PR have passed; the other failed jobs may succeed after re-running.
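The unit-test scenario above can be sketched with a hypothetical clamp helper, assuming the convention from the PR description that -1 means "skip scale action" (the helper name and signature are illustrative, not the PR's code):

```java
// Hypothetical clamp matching the test scenario above: the desired task
// count is capped first by the topic's partition count, then by
// taskCountMax; -1 ("skip scale action") passes through unchanged.
public class PartitionClampSketch
{
  public static int clampToPartitions(int desiredTaskCount, int partitionCount, int taskCountMax)
  {
    if (desiredTaskCount < 0) {
      return desiredTaskCount;
    }
    // Reading tasks beyond the partition count would sit idle, so never ask
    // for more tasks than there are partitions.
    return Math.min(Math.min(desiredTaskCount, partitionCount), taskCountMax);
  }
}
```

In the scenario above, scaling from 1 by scaleOutStep = 2 requests 3 tasks, which the 2-partition topic caps at 2.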

@pjain1
Member

pjain1 commented Mar 4, 2021

Restarted the Travis jobs. I just looked at the partitionNumber task-limitation part and that looks good. Thanks 👍

@zhangyue19921010
Contributor Author

Thanks @pjain1 appreciate it.

@@ -1901,6 +2058,11 @@ protected boolean supportsPartitionExpiration()
return false;
}

public int getPartitionNumbers()
Contributor

nit:

Suggested change
public int getPartitionNumbers()
public int getPartitionsCount()

Contributor Author

Done. Thanks a lot for your review and approval!

Contributor

@capistrant capistrant left a comment

I'm not able to give a full detailed review right now, so I will just comment. I left 3 comments. The only one that would be a blocker for merge is the licenses comment; I want to make sure we handle that correctly according to the document I linked in the comment.

Overall the code looks good to me, and I think the idea is sound and the implementation looks logical and extensible.

@zhangyue19921010
Contributor Author

zhangyue19921010 commented Mar 5, 2021

> Not able to give a full detailed review right now so I will just comment. Left 3 comments. The only one that would be a blocker for merge is the licenses comment. I want to make sure we handle that correctly according to the document I linked in the comment.
>
> Overall the code looks good to me and I think the idea is sound and implementation looks logical and extensible

Hi @capistrant, thanks for your review. All the changes are done.

As for licenses, <artifactId>commons-collections4</artifactId> version 4.2 was already added to licenses.yaml earlier. If I am correct, it shouldn't be a blocker :)

Contributor

@capistrant capistrant left a comment

Thanks for the updates. My bad on missing that this was already in licenses!

@pjain1 pjain1 merged commit bddacbb into apache:master Mar 6, 2021
@zhangyue19921010
Contributor Author

Hi @pjain1, @himanshug and @capistrant, thanks a lot for your help!
