[FLINK-30593][autoscaler] Determine restart time on the fly for Autoscaler #711

Merged
5 commits merged on Nov 24, 2023

Conversation

@afedulov (Contributor) commented Nov 14, 2023

This PR does not yet contain tests since I would like to first reach consensus on the general approach.

What is the purpose of the change

Currently the autoscaler uses a preconfigured restart time for the job. This PR adds the ability to dynamically adjust it based on the observed restart times of scaling operations.

Brief change log

  • Adds ScalingTracking for tracking job-scoped scaling data.
  • Adds autoscaler properties for enabling the use of observed restart times in the rescaling logic (opt-in).

High-level approach:
The autoscaler adds a ScalingTracking object into the config map in the following format:

 scalingTracking: |
   ---
   scalingRecords:
     "2023-11-13T13:36:35.240245Z":
       endTime: null
       targetParallelism:
         e44d8435128fbb8c0406b9ae78407aec: 2

It contains a map of applied scaling decisions together with the target topology and the endTime, sorted by the time when the scaling was applied (the map's key). When the job transitions into the RUNNING state, the latest record is fetched, the current parallelism is compared against the target one, and the endTime is set to now (only if it was not previously set).
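
For illustration, a minimal sketch of the bookkeeping described above. The class and method names (ScalingTrackingSketch, onJobRunning) are hypothetical; only ScalingRecord and endTime mirror the structure shown, and this is not the PR's actual code.

import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only; names are hypothetical, not the merged implementation. */
class ScalingTrackingSketch {

    static class ScalingRecord {
        Instant endTime;
        Map<String, Integer> targetParallelism; // vertex id -> target parallelism
    }

    // Keyed by the time the scaling decision was applied, as in the config map above.
    final TreeMap<Instant, ScalingRecord> scalingRecords = new TreeMap<>();

    /** Called when the job is observed RUNNING, with its current per-vertex parallelism. */
    void onJobRunning(Instant now, Map<String, Integer> actualParallelism) {
        var latest = scalingRecords.lastEntry();
        if (latest == null) {
            return; // nothing tracked yet
        }
        ScalingRecord record = latest.getValue();
        // Set endTime only once, and only when the observed parallelism matches the target.
        if (record.endTime == null && actualParallelism.equals(record.targetParallelism)) {
            record.endTime = now;
        }
    }
}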

Verifying this change

Manual. Test implementation will follow once the general approach is approved.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., any changes to the CustomResourceDescriptors: (yes / no)
  • Core observer or reconciler logic that is regularly executed: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@gyfora (Contributor) left a comment

I like the overall approach but I would recommend the following changes:

  1. Remove the newly introduced configs, this should be automatic and always on
  2. Track the start/end times for the restart in memory and only record the observed_restart_time in the autoscaler state store. This way we add minimal extra state that is easy to implement.
  3. Instead of computing restart time from a fixed number of samples, use a simple moving average: observed_restart_time = (prev_observed + new_observed) / 2
  4. During autoscaler logic use: restart_time = min(conf_restart_time, observed_restart_time)

@mxm
Copy link
Contributor

mxm commented Nov 14, 2023

Commenting on some of the requests:

  1. Remove the newly introduced configs, this should be automatic and always on

I think it is fair to have an ON/OFF switch. It should be on by default, but we want to keep the ability to roll back to the old behavior.

  2. Track the start/end times for the restart in memory and only record the observed_restart_time in the autoscaler state store. This way we add minimal extra state that is easy to implement.

We already have the start time of the last scaling in memory via the scaling history. We can then keep note of the end time once we detect the scaling is over. That leaves a little bit of error in case of operator downtime, which would produce a long rescaling time. I think that should be fine though, since we cap at the maximum configured rescale time.

  3. Instead of computing restart time from a fixed number of samples, use a simple moving average: observed_restart_time = (prev_observed + new_observed) / 2

I think we can do an exponentially weighted average.

  4. During autoscaler logic use: restart_time = min(conf_restart_time, observed_restart_time)

+1
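
For illustration, a sketch of how points 3 and 4 could combine (an exponentially weighted average capped by the configured restart time). The class and method names are made up, and this is not what was ultimately merged.

import java.time.Duration;

/** Hypothetical sketch of the EWMA-plus-cap idea discussed above. */
class RestartTimeEstimator {

    private static final double ALPHA = 0.5; // smoothing factor; 0.5 is suggested later in this thread
    private Duration observedRestartTime;    // null until the first observation

    /** Record a newly observed restart duration. */
    void record(Duration newObservation) {
        observedRestartTime = (observedRestartTime == null)
                ? newObservation
                : Duration.ofMillis(Math.round(
                        observedRestartTime.toMillis() * ALPHA
                                + newObservation.toMillis() * (1 - ALPHA)));
    }

    /** restart_time = min(conf_restart_time, observed_restart_time). */
    Duration restartTimeForScaling(Duration configuredRestartTime) {
        return (observedRestartTime == null
                        || observedRestartTime.compareTo(configuredRestartTime) > 0)
                ? configuredRestartTime
                : observedRestartTime;
    }
}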

@afedulov (Contributor, Author) commented Nov 14, 2023

@gyfora @mxm thanks for the feedback.

We already have the start time of the last scaling in memory via the scaling history. We can then keep note of the end time once we detect the scaling is over. That leaves a little bit of error in case of downtime of the operator which will produce a long rescaling time. I think that should be fine though, since we cap at the max configured rescale time.

Are we talking about having a field in the ScalingExecutor? Because fetching scalingHistory won't be sufficient - we need some indication that a new rescaling was applied by the time we see the transition into RUNNING with the expected parallelism. This "flag" then needs to be cleaned.
General question: it feels like we are very focused on optimizing the size of this particular configmap. Can't we create a separate configmap if this is a concern? We already store so much stuff in the flink-config-*** configmap (see the amount of logging configuration alone) that obsessing over whether we store one timestamp or two for the 5-10 values we keep in state does not seem worth the significantly increased code complexity and the risk of losing restart data.

@mxm (Contributor) commented Nov 14, 2023

Are we talking about having a field in the ScalingExecutor? Because fetching scalingHistory won't be sufficient - we need some indication that a new rescaling was applied by the time we see the transition into RUNNING with the expected parallelism. This "flag" then needs to be cleaned.

I meant a flag in the ConfigMap. I guess something like startTime: endTime would be sufficient? The endTime would initially be null.

General question: it feels like we are very focused on optimizing the size of this particular configmap. Can't we create a separate configmap, if this is a concern?

In the past we ran into the 1MB size limit for large pipelines with hundreds of vertices. That's why we don't want to increase the state size too much. We also added compression because of this. Of course we can add a new configmap; we just haven't come around to doing it, and it might not be a good idea to put too much load on etcd. It would be nice to have a separate config map for the scaling metrics and the scaling history, but it is much simpler to do it in one.

@afedulov (Contributor, Author) commented Nov 14, 2023

I meant a flag in the ConfigMap. I guess something like startTime: endTime would be sufficient? The endTime would initially be null.

Got it, this is how it actually works currently (modulo additional target vertices parallelism tracking that I will attempt to fetch from the history instead) - I thought it was more about the configmap vs in-memory discussion.

In the past we ran into the 1MB size limit for large pipelines with hundreds of vertices.

I see, I'll try to keep the size to a minimum then.

@mxm (Contributor) commented Nov 14, 2023

Got it, this is how it actually works currently (modulo additional target vertices parallelism tracking that I will attempt to fetch from the history instead) - I thought it was more about the configmap vs in-memory discussion.

Yes, I saw that. I think Gyula is right though, that it's enough to use in-memory state to start tracking a scaling execution. When the scaling is completed, we then persist the duration in the ConfigMap. The reason is that keeping track of the start time when we begin tracking would only be useful if we could use that state to recover, e.g. in case of downtime. However, that's only useful when the job does not complete rescaling during the downtime. If it completes before, we don't know how long a scaling took when we come back up. For simplicity, I think it makes sense to just track the scaling start time in memory and then persist the actual rescale time in the ConfigMap.
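
A minimal sketch of the alternative described here, assuming a hypothetical tracker class: the start time lives only in memory, and only the observed duration would be persisted in the state store.

import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch; not the code that was merged. */
class InMemoryRestartTracker {

    private Instant scalingStartTime; // in-memory only; lost if the operator restarts

    void onScalingApplied(Instant now) {
        scalingStartTime = now;
    }

    /** Returns the observed restart duration to persist, or null if the start was never seen. */
    Duration onScalingCompleted(Instant now) {
        if (scalingStartTime == null) {
            return null; // e.g. the operator restarted mid-scaling; nothing to record
        }
        Duration observed = Duration.between(scalingStartTime, now);
        scalingStartTime = null;
        return observed;
    }
}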

@afedulov (Contributor, Author) commented Nov 14, 2023

Sounds reasonable. If we are optimizing state size at this level, how about we also remove RECOMMENDED_PARALLELISM and PARALLELISM from the history metrics, since they are duplicated as ScalingSummary top level fields?

@afedulov (Contributor, Author) commented Nov 14, 2023

@gyfora I'd like to propose using this format:

scalingTracking: |
  ---
  scalingRecords:
    "2023-11-14T20:20:39.250503Z":
      endTime: "2023-11-14T20:22:04.360972Z"
    "2023-11-14T20:22:04.360972Z":
      endTime: "2023-11-14T20:23:39.022220Z"

Some reasoning:

  • It allows associating the tracked restart time with the scaling history, which could be useful for debugging. Without the start timestamp, this would not be possible.
  • It lays out a data structure that is open for extension. This makes it trivial to add additional fields if we ever need some other scaling-scoped (rather than vertex-scoped) data in the future (which we currently do not anticipate). The decision to move it into a different config map in that case is a separate issue.
  • The overhead when compared with just storing the restart time is pretty minimal.

@mxm, I removed the redundant tracking of the target parallelism from the ScalingRecords. The current implementation relies entirely on the scalingHistory for this. I had to change the scaleResource signature to avoid fetching the tracking and the history twice (we need both in the JobAutoScalerImpl now). The Clock in ScalingExecutor became unnecessary.

@mxm (Contributor) commented Nov 15, 2023

Sounds reasonable. If we are optimizing state size at this level, how about we also remove RECOMMENDED_PARALLELISM and PARALLELISM from the history metrics, since they are duplicated as ScalingSummary top level fields?

We can file a JIRA for this. It requires more changes, since those metrics are also directly reported via Prometheus and removing them would be unexpected for users.

@afedulov force-pushed the 30593-restart branch 5 times, most recently from 4a01731 to 6bb5dd7, on November 16, 2023 21:54
@afedulov marked this pull request as ready for review on November 16, 2023 21:54
@afedulov (Contributor, Author) commented Nov 16, 2023

@mxm @gyfora the PR is ready for review


@gyfora Max and I briefly discussed this offline and came to the conclusion that starting by evaluating the maximum observed restart time, capped by the RESTART_TIME setting, is probably good enough for a first step. It has the benefit of giving the most "conservative" evaluation, and we can add the moving average after some baseline testing. What do you think?

@mxm (Contributor) left a comment

Thanks Alex!

Comment on lines +26 to +35
/**
* Class for tracking scaling details, including time it took for the job to transition to the
* target parallelism.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ScalingRecord {
private Instant endTime;
}
Contributor:

Could we just store Instant directly and remove this wrapper class?

@afedulov (Author) commented Nov 20, 2023

My idea was to have structures in place that would be easily extensible. If we are sure we will never need to store anything else here, we can store the Instant directly.

@gyfora (Contributor) left a comment

I think if we are going to use Math.min(maxRestartTime, restartTimeFromConfig), we should increase the default restart time value in the config from 5 minutes to at least 15 minutes (maybe even higher, like 20-30).

With 5 minutes, almost all prod jobs with many pods, slower environments, or large state will probably take longer (say 10-15 minutes) and will be cut back to 5 minutes, leading to an underestimation and potentially multiple scale-ups.

What do you think?
cc @mxm

@mxm (Contributor) commented Nov 20, 2023

What you describe is already the case. Any deployment which currently takes longer than the configured rescale time will not scale accurately. By adding the feature in its current state, we are conservatively addressing those deployments which come back up much quicker. In our environment, the typical rescale time is 1 minute or less.

I think your request requires adding a new configuration MAX_RESTART_TIME because we want to (1) have a good default when we haven't yet estimated the rescale time (e.g. first scaling) via the existing setting, and (2) cap the determined rescale time for safety reasons (e.g. unexpected cluster downtime).

I'm ok with adding MAX_RESTART_TIME. It also solves the problem of adjusting rescale times of existing deployments. Most users already have a rescale time configured explicitly.

@afedulov (Contributor, Author) commented Nov 20, 2023

@mxm @gyfora I added the proposed MAX_RESTART_TIME option with the default value of 30 minutes:
ae79219#diff-c63eb3ce6a3229c2e0e664d8032f966df8e0b90a67b5e1119d8ec1d862599348

To keep things simple for users, I decided to only cap by it when the PREFER_TRACKED_RESTART_TIME option is enabled. Otherwise, the restart time is completely governed by the current RESTART_TIME setting, as before, even if it exceeds MAX_RESTART_TIME.
ae79219#diff-2241a4e55db07ff736cd174042179254aea0ca8d6884635b1146e5b5a9c17633R57
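
For illustration, a sketch of the selection logic described above. The helper below is hypothetical; the actual implementation is in the linked commit.

import java.time.Duration;

/** Hypothetical sketch of the capping behavior described above. */
class RestartTimeSelection {

    static Duration effectiveRestartTime(
            boolean preferTrackedRestartTime,   // PREFER_TRACKED_RESTART_TIME
            Duration configuredRestartTime,     // RESTART_TIME
            Duration maxRestartTime,            // MAX_RESTART_TIME
            Duration maxObservedRestartTime) {  // null if nothing has been observed yet
        if (!preferTrackedRestartTime || maxObservedRestartTime == null) {
            // Old behavior: governed solely by RESTART_TIME, even if it exceeds MAX_RESTART_TIME.
            return configuredRestartTime;
        }
        // New behavior: use the tracked maximum, capped by MAX_RESTART_TIME.
        return maxObservedRestartTime.compareTo(maxRestartTime) > 0
                ? maxRestartTime
                : maxObservedRestartTime;
    }
}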

@mxm (Contributor) left a comment

Thanks for following up so quickly! 🙏

public static final ConfigOption<Duration> MAX_RESTART_TIME =
        autoScalerConfig("restart.time.max")
                .durationType()
                .defaultValue(Duration.ofMinutes(30))
Contributor:

We have internal overrides for all configuration defaults, but I don't think this overly pessimistic value is a good default for the general public. I would set this to no more than 15 minutes; even that is still conservative.

@afedulov (Author):

Done.

@@ -142,9 +142,11 @@ public double getMaxRestartTimeSecondsOrDefault(Configuration conf) {
}
}
}
long restartTimeFromConfig = conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
long maxRestartTimeFromConfig = conf.get(AutoScalerOptions.MAX_RESTART_TIME).toSeconds();
return maxRestartTime == -1
? restartTimeFromConfig
Contributor:

I think we should always cap the RESTART_TIME by the configured MAX_RESTART_TIME (if configured).

@afedulov (Author):

@gyfora I'd like to hear your thoughts on this

@afedulov (Author):

Synced with Max offline - we decided to rename the option to TRACKED_RESTART_TIME_LIMIT to make the scope clear.

Contributor:

sounds good!

@mxm (Contributor) left a comment

LGTM. Thank you Alex!

@mxm (Contributor) commented Nov 23, 2023 via email

@gyfora (Contributor) left a comment

Sorry I did not have time to add some of these comments earlier. I wonder why we decided to keep a history instead of the exponential moving average; that would eliminate the bookkeeping.

Comment on lines 84 to 90
<td><h5>job.autoscaler.restart.time.tracked.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to use the actually observed rescaling restart times instead of the fixed 'job.autoscaler.restart.time' configuration. If set to true, the maximum restart duration over a number of samples will be used. The value of 'job.autoscaler.restart.time' will act as an upper bound.</td>
</tr>
<tr>
<td><h5>job.autoscaler.restart.time.tracked.limit</h5></td>
Contributor:

Sorry for the late comment, but we could consider changing this to job.autoscaler.restart.time-tracking.enabled and job.autoscaler.restart.time-tracking.limit, so that we avoid having another config key match a prefix (which would make it non-YAML-compliant).

Not a huge thing, but if we want to support YAML configs like in Flink, we will have to fix it eventually.


if (targetParallelismMatchesActual(
        targetParallelism, actualParallelism)) {
    value.setEndTime(now);
Contributor:

Should we maybe log this on debug, so we have an overview if we want to debug this?

Contributor:

+1

public class ScalingTracking {

/** Details related to recent rescaling operations. */
private final TreeMap<Instant, ScalingRecord> scalingRecords = new TreeMap<>();
Contributor:

I thought we were going to keep a single record and an exponential moving average.

@afedulov (Author):

I guess this got buried in the notifications:

@gyfora Max and I briefly discussed this offline and came to the conclusion that starting by evaluating the maximum observed restart time, capped by the RESTART_TIME setting, is probably good enough for a first step. It has the benefit of giving the most "conservative" evaluation, and we can add the moving average after some baseline testing. What do you think?

Contributor:

I saw this, but it doesn't mention anything about keeping a history etc. and refers to an offline discussion :)
Combined with the other comment about the trimming issue (losing the restart info after 24h), I think the exponential moving average is a simpler and slightly more robust initial approach.

@afedulov (Author) commented Nov 23, 2023

EMA requires knowing how many previous records in the window are taken into account, because this determines the weight coefficient of the new record (the smoothing factor). The length of the observation "window" is also supposed to be fixed rather than spanning all time from the beginning, so I am not sure we are talking about the classic definition of EMA. Maybe you could sketch the calculation you have in mind?

Contributor:

estimatedEMA = estimatedEMA * x + newMeasurement * (1 - x)
We could start with x = 0.5, which is pretty aggressive smoothing but should be fine given we don't have many scalings.

@afedulov (Author):

General question: we are missing a job-scoped data structure that keeps track of past rescaling details. Should we need to add something in the future, with the current structure it is as simple as adding data fields to the ScalingRecord. I am OK with removing the map, but the question is: are we sure we won't require something similar in the future anyway?

@afedulov (Author) commented Nov 23, 2023

I guess it could be argued that we can always send statistics about previous rescalings as metrics, but why do we then keep the vertex-based scalingHistory?

var cutoffTime = now.minus(keptTimeSpan);

// Remove records older than the cutoff time
scalingRecords.headMap(cutoffTime).clear();
Contributor:

Wouldn't this clear the history if we don't scale for 24 hours? Then we would fall back to the config?

Contributor:

We should keep at least one record in the history. But given that scalings do not happen that often, the history will only ever have 1-2 records, so EMA may be more robust. cc @mxm

Contributor:

We agreed offline to keep at least one observation, to avoid having to recalibrate the restart time when the observation history expires. EMA is a good alternative as well, but for now we chose to take the max of the last observed restart times.
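
A minimal sketch of that trimming rule (drop old records but always keep the most recent observation). The method name is hypothetical and this is not necessarily the merged code.

import java.time.Duration;
import java.time.Instant;
import java.util.TreeMap;

/** Hypothetical sketch of "trim old records but keep at least the latest one". */
class ScalingRecordTrimming {

    static <V> void trimKeepingLatest(TreeMap<Instant, V> records, Instant now, Duration keptTimeSpan) {
        if (records.size() <= 1) {
            return; // always keep at least one observation
        }
        Instant cutoff = now.minus(keptTimeSpan);
        Instant latestKey = records.lastKey();
        // Remove records older than the cutoff, but never the most recent one,
        // even if it is itself older than the cutoff.
        records.headMap(cutoff.isAfter(latestKey) ? latestKey : cutoff).clear();
    }
}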

@mxm (Contributor) commented Nov 24, 2023

The docs need to be regenerated after the last config change.

public static final ConfigOption<Boolean> PREFER_TRACKED_RESTART_TIME =
        autoScalerConfig("restart.time-tracking.enabled")
                .booleanType()
                .defaultValue(false)
Contributor:

This should probably be enabled by default. We can still change this default though.

@mxm (Contributor) commented Nov 24, 2023

Great work! Thanks @afedulov!

@mxm merged commit cdf32de into apache:main on Nov 24, 2023
119 checks passed
@afedulov (Author):
Thanks @mxm and @gyfora!

mxm pushed a commit to mxm/flink-kubernetes-operator that referenced this pull request Dec 1, 2023
…aler (apache#711)
