
Conversation

@mxm (Contributor) commented Dec 1, 2023

The previous fix in #720 made the parallelism override string deterministic, but it will likely cause existing deployments to trigger a one-off, unneeded spec update.

To prevent this, and to update the spec only when actual scaling occurs, we need to compare the existing overrides with the new ones and check whether they are identical. If they are, we keep the current override string instead of replacing it with a permutation of itself.
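For illustration, a minimal, self-contained sketch of the idea (not the actual patch; the class name and parseOverrides helper are made up, and the override format key:value,key:value is assumed):

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: not the actual patch. Treat two override strings as equal when they
// contain the same key/value pairs, regardless of the order they are written in.
public class OverridePermutationCheck {

    // "a:1,b:2" -> {a=1, b=2}
    static Map<String, String> parseOverrides(String overrides) {
        return Arrays.stream(overrides.split(","))
                .map(kv -> kv.split(":", 2))
                .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
    }

    public static void main(String[] args) {
        String oldOverrides = "a:1,b:2"; // what the spec currently contains
        String newOverrides = "b:2,a:1"; // what the autoscaler produced

        // Maps compare by content, so a mere permutation of the same overrides is equal.
        boolean samePermuted = parseOverrides(oldOverrides).equals(parseOverrides(newOverrides));

        // If they are the same, keep the old string so the spec stays untouched
        // and no unnecessary upgrade is triggered.
        String effective = samePermuted ? oldOverrides : newOverrides;
        System.out.println(effective); // prints a:1,b:2
    }
}

Comparing the parsed maps rather than the raw strings is what makes a reordered-but-identical override set a no-op.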

@mxm requested review from gyfora and 1996fanrui on December 1, 2023, 13:13
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);

autoscaler.scale(autoScalerCtx);
@afedulov (Contributor) commented Dec 1, 2023

Maybe add a comment to the section below explaining why this call to scale does not lead to the unnecessary update if we only patch the new config after it. I personally don't immediately see why this is the case.

@mxm (Contributor Author):

This call to scale can still lead to a spec change, even with the code below. It just prevents spec changes caused by a non-deterministic ordering of the parallelism overrides, e.g. a:1,b:2 vs b:2,a:1. I think it is a good idea to add documentation on how the reconciliation loop works, but that doesn't feel directly related to the changes here. There is a comment on line 211 which states that we avoid changing the spec. I can try to expand it a little more.

Member:

If I understand correctly, the root cause is the boolean specChanged logic in AbstractFlinkResourceReconciler#reconcile: specChanged becomes true when the ordering of the parallelism overrides changes, and the operator then rescales the job.

Right?

@mxm (Contributor Author):

Yes, that is correct.
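For reference, a minimal, self-contained sketch of the behavior confirmed here; the class and variable names are illustrative, not the operator's actual code:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// If the last reconciled spec carries the overrides as a plain string,
// a mere permutation of the same overrides makes the spec look changed.
public class SpecChangeDemo {

    public static void main(String[] args) {
        String lastReconciled = "a:1,b:2";
        String newlyGenerated = "b:2,a:1"; // same overrides, different iteration order

        // Roughly what the specChanged check reduces to for this field:
        boolean specChanged = !lastReconciled.equals(newlyGenerated);
        System.out.println("specChanged = " + specChanged); // true -> unnecessary upgrade

        // Serializing from a sorted view (the #720 fix) makes the string deterministic:
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("b", 2);
        overrides.put("a", 1);
        System.out.println(new TreeMap<>(overrides)); // always prints {a=1, b=2}
    }
}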

Contributor:

Makes sense, thanks.


// Check that the overrides actually changed and not merely the String representation
var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration();
var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key());
Contributor:

It's not safe to access configs by key directly. We can use the ctx getDeployConfig and getObserveConfig methods to get the configs, and then we can even avoid passing the extra parameter to the method.

@mxm (Contributor Author):

I'm curious, why is it not safe?

Contributor:

In general, because of possible deprecated and other alternative keys defined on the ConfigOption.
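As a small illustration of that point, assuming Flink's Configuration/ConfigOptions API is on the classpath (the option below is made up for the example):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// A ConfigOption may declare deprecated (or fallback) keys. Configuration.get()
// resolves them; a raw lookup by the current key does not.
public class ConfigKeyLookupDemo {

    private static final ConfigOption<String> OPTION =
            ConfigOptions.key("current.key")
                    .stringType()
                    .noDefaultValue()
                    .withDeprecatedKeys("legacy.key");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("legacy.key", "value");

        System.out.println(conf.get(OPTION));                 // value (deprecated key resolved)
        System.out.println(conf.toMap().get("current.key"));  // null  (raw key lookup misses it)
    }
}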

applyAutoscaler(ctx);
applyAutoscaler(
ctx,
lastReconciledSpec != null
Contributor:

I think we should extract the configs through the ctx and avoid the extra logic and parameter passing.

@mxm (Contributor Author):

Sure, let me look into that.

@mxm (Contributor Author):

@gyfora Please check out the newest commit.

@gyfora (Contributor) commented Dec 2, 2023

@mxm I would like to take a look tomorrow or Monday morning because today I'm traveling.


autoscaler.scale(autoScalerCtx);

// Check that the overrides actually changed and not merely the String representation
Contributor:

Suggested change:
- // Check that the overrides actually changed and not merely the String representation
+ // Prevents subsequent unneeded spec updates when the `scale` operation only changes the order of the parallelism overrides (required because an unsorted map was used in the past).

@mxm (Contributor Author):

This logic has been moved. I think it is easier to understand now.

@gyfora (Contributor) left a comment:

Thanks @mxm, I think we should move this to the KubernetesScalingRealizer to remove it from the reconciler. It doesn't really belong here, and moving it to the realizer, where the problem originates, will simplify the logic further.

CR extends AbstractFlinkResource<SPEC, STATUS>,
SPEC extends AbstractFlinkSpec,
STATUS extends CommonStatus<SPEC>>
void putBackOldParallelismOverridesIfNewOnesAreMerelyAPermutation(
Contributor:

Could we move this logic to the ScalingRealizer? 158cbe2 adds the logic there and this issue comes from that change. I feel these two changes logically belong together, and it's weird that we break something in one place and fix it in another when they could simply sit next to each other.

It will likely also make the whole logic a bit simpler.

@mxm (Contributor Author):

It's not possible to add the change there because we do not have access to the current overrides.

@mxm (Contributor Author):

Also 158cbe2 does not introduce the problem. It fixes it at the cost of restarting once instead of every time we redeploy the operator.

private static <
CR extends AbstractFlinkResource<SPEC, STATUS>,
SPEC extends AbstractFlinkSpec,
STATUS extends CommonStatus<SPEC>>
Contributor:

Can we rename this to resetParallelismOverridesIfUnchanged? The current naming is unusually verbose for the codebase; it may be better to add a Javadoc comment instead. But this may be irrelevant if you check my other comment: we don't need to replace it if we don't set it in the first place, but for that we need to move the logic.

@mxm (Contributor Author):

"Reset" would imply we are discarding them, which we are not. I can try to shorten the name, but I find long method names perfectly valid, especially if they are only used once.

@mxm (Contributor Author) commented Dec 5, 2023

PTAL @gyfora

// Create resource with existing parallelism overrides
resource.getSpec()
.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");
@1996fanrui (Member) commented Dec 5, 2023

Could we test both "a:1,b:2" and "b:2,a:1"?

Also, the assertThat uses "a:1,b:2" as well; it would be better to extract a variable.

@mxm (Contributor Author):

There is no need to test any permutations, since the overrides are deterministically set to a:1,b:2 at the beginning. Then new overrides are supplied via a LinkedHashMap, which produces a deterministic b:2,a:1. The test then verifies that the config string is not changed.

Member:

Thanks for your clarification!

I prefer to test both "a:1,b:2" and "b:2,a:1" for two reasons:

  • We usually do black-box testing, meaning we don't care about the detailed logic or data structures inside the tested method; we only test its behavior.
  • In most cases the traversal order of a LinkedHashMap is determined by insertion order and the serialized result is often b:2,a:1, but I don't think the serialized result is guaranteed to be fixed. That means two LinkedHashMaps can be equal while their serialized results differ, which is exactly why we need this PR, right? (See also the sketch after this comment.)
    • So I don't think our test should assume the serialized result is fixed.
    • Also, the test would then cover more cases (for example, if the LinkedHashMap is changed to a TreeMap in the future).

BTW, the current change is fine and this comment isn't blocking for me, so I approved it. If you think it's not needed, feel free to merge. Thanks~
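To illustrate the serialization-order point above, a minimal self-contained sketch (the serialize helper here is made up, not the operator's code):

import java.util.LinkedHashMap;
import java.util.Map;

// Two maps that are equal as maps can still serialize to differently ordered
// strings, because iteration order depends on the map type and insertion order.
public class OverrideOrderingDemo {

    // Made-up serializer in the "k:v,k:v" style of parallelism overrides.
    static String serialize(Map<String, String> overrides) {
        StringBuilder sb = new StringBuilder();
        overrides.forEach((k, v) -> {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(k).append(":").append(v);
        });
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("a", "1");
        first.put("b", "2");

        Map<String, String> second = new LinkedHashMap<>();
        second.put("b", "2");
        second.put("a", "1");

        System.out.println(first.equals(second));  // true  (equal as maps)
        System.out.println(serialize(first));      // a:1,b:2
        System.out.println(serialize(second));     // b:2,a:1 (different string)
    }
}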

@mxm (Contributor Author):

Thanks for elaborating further on this. Testing is somewhat of an art; there is also an argument for striking a balance between a specific test and one which covers all possible code paths (maybe even ones which don't exist yet).

Black-box testing is one way of testing; white-box testing is also a valid approach. It depends on the situation, and it is also a matter of taste and of how confident one feels that the test covers the most important scenarios.

I completely understand your point, and I've changed the test method to address this concern.
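For what it's worth, one possible way to cover both orderings is a parameterized test; this is only a sketch with a made-up parseOverrides helper, not the actual test in the PR:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// Sketch: regardless of how the existing override string is ordered, a
// permutation of the same overrides must be recognized as "no change".
class OverridePermutationTest {

    static Map<String, String> parseOverrides(String overrides) {
        return Arrays.stream(overrides.split(","))
                .map(kv -> kv.split(":", 2))
                .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
    }

    @ParameterizedTest
    @ValueSource(strings = {"a:1,b:2", "b:2,a:1"})
    void permutationsAreConsideredEqual(String existingOverrides) {
        assertEquals(parseOverrides("b:2,a:1"), parseOverrides(existingOverrides));
    }
}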

Member:

Thank you for the feedback! ❤️

// Create resource with existing parallelism overrides
resource.getSpec()
.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");
Contributor:

Would it make sense to reverse this to "b:2,a:1" so that the test actually fails without the change?

Member:

It's a HashMap rather than a TreeMap or SortedMap, so the ordering cannot be relied upon, right?

If so, I'd prefer to test both "a:1,b:2" and "b:2,a:1", wdyt?

@mxm (Contributor Author):

The test fails without the non-test change:

org.opentest4j.AssertionFailedError: 
expected: "a:1,b:2"
 but was: "b:2,a:1"

Also see #721 (comment).

@mxm merged commit ca1d847 into apache:main on Dec 6, 2023
@mxm deleted the FLINK-33710 branch on December 6, 2023, 15:53