Flink: Allow setting slot sharing group for fine-grained resource management in DynamicSink #16065
Conversation
Currently all operators created by the dynamic sink are part of the default slot sharing group, and thus getting an equal share of the resources on taskmanagers. However, it is usually the case that the sink and the generator operators are far more resource-heavy than the rest of the operators, making the default resource allocation inefficient. Flink already supports fine-grained resource management mechanism to support use cases exactly like this. This change adds support to wire the dynamic sink into that system, by allowing the users to set slot sharing groups for 1. the shuffle writer 2. the generator+the forward writer -- they need to share the same slot sharing group to enable operator chaining.
|
CC: @mxm, @Guosmilesmile |
mxm
left a comment
There was a problem hiding this comment.
How important is it for the slot sharing groups to be set explicitly? Could we add an option like disableSlotSharing() to put the components into different slots?
The goal is to allow tailoring the resources allocated to each operator using Flink fine-grained resource management, so the user needs to pass in an SSG like this sinkBuilder
.generatorSlotSharingGroup(
SlotSharingGroup.newBuilder('generator-ssg')
.setCpuCores(1)
.setTaskHeapMemoryMB(512)
.build())
.otherSinkBuilderOptions(...)
... |
|
@mxm I also think it's a bit confusing that Flink uses "slot sharing group" which seems to imply some sort of resource isolation mechanism to manage resources, but here we are. :-) |
|
I was curious because the common use case is to just disable slot sharing for certain operators. In that case, we don't need to pass an explicit SlotSharingGroup, but we can generate one behind the scenes. |
|
@mxm How would the user specify resource specs for those operators in that case? |
|
You wouldn't be able to do that. I'm assuming uniform resources across the TaskManagers. |
|
@mxm I see your point. The current topology allows neither the generator/forward-writer nor the shuffle-writer to slot share with other tasks anyway. But I can definitely see the value in explicitly specifying that the tasks be split up. I'll add that |
|
Thanks! Now the question is, whether the option to disable slot sharing would be sufficient. Do you need explicit control over the slot sharing groups? |
|
@mxm Yes, I do need explicit control over the slot sharing groups. My use case is to assign different resource spec for 1. generator 2. sink 3. other operators in my pipeline. |
|
Hi @mxm ! Just wondering if you got the chance to take another pass at this. |
| if (generatorSlotSharingGroup != null) { | ||
| forwardWriteResults.slotSharingGroup(generatorSlotSharingGroup); | ||
| } | ||
|
|
There was a problem hiding this comment.
Instead of this, could we default to StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP?
The code would be a bit nicer.
There was a problem hiding this comment.
Unfortunately no, that is not equivalent. If unset, Flink tries to inherit the slot sharing group from the input operators; if set to DEFAULT_SLOT_SHARING_GROUP, that behavior is bypassed.
There was a problem hiding this comment.
Thanks for the info!
This means that we should behave similarly with TableMaintenance.slotSharingGroup.
This is for another PR though.
CC: @Guosmilesmile
There was a problem hiding this comment.
Thanks for pinging me on this. I'll open a PR to update the behavior in TableMaintenance so that it aligns with the standard Flink behavior.
| | `shuffeSinkSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. | | ||
| | `generatorSlotSharingGroup(SlotSharingGroup ssg)` | Set the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and forward sink chained to it). | |
There was a problem hiding this comment.
Would it make sense to add these configs to FlinkDynamicSinkConf and FlinkDynamicSinkOptions too?
There was a problem hiding this comment.
I think it's probably going to be too messy, because we'll have to make a config translation layer to set cpu/heap/off heap/managed, and even external resource if we want to support everything Flink offers.
There was a problem hiding this comment.
Should we set only the name and rely on env.registerSlotSharingGroup(ssgWithResource) for registering the resource?
There was a problem hiding this comment.
That is cleaner but I am still a bit hesitant. If the ssg names aren't registered, these configs silently take no effect. When we do correctly register the SSGs, this will split the config into two places (sounds like a hidden footgun). I am slightly negative, but if you still think it's valuable I'll make the change.
There was a problem hiding this comment.
I would like to hear what @mxm and @Guosmilesmile think about this.
The TableMaintenance has a precedent to use the String as a config which makes it easier to use in SQL too (if the SSG is already defined in the env), but I fully understand your point too.
There was a problem hiding this comment.
Here's my immature take - if we go with the SlotSharingGroup class approach, we'd need to split it into a bunch of extra configs for SQL/config integration, while a simple string would be much easier to plug into SQL. For users who need custom slot group resources, they can just define them upfront in the env, which keeps the config simple. If more users ask for it later, we can always add slot group resource configs then. WDYT?
There was a problem hiding this comment.
Hi @Guosmilesmile thanks for looking. Just to confirm I understand your proposal, you are agreeing with what pvary suggests; and then in the future, we can add the full-fledged SSG resource config (like CPU/memory) if there's user interest. Am I understanding you correctly?
There was a problem hiding this comment.
Configuring SlotSharingGroups is sort of an special case. Most users, especially SQL users, will never do this. I would support the case of going with Java only config. If in the future, if is requested for SQL, we can still come up with a way to serialize the SlotSharingGroup into a string.
There was a problem hiding this comment.
I don't really like special cases. In the long run it is usually better to handle things consistently.
I would still prefer to have a string setting for ssc and also we should add it to FlinkDynamicSinkConf.
There was a problem hiding this comment.
Changed to using a string setting instead.
|
Merged to main. |

Currently all operators created by the dynamic sink are part of the default slot sharing group, and thus getting an equal share of the resources on taskmanagers. However, it is usually the case that the sink and the generator operators are far more resource-heavy than the rest of the operators, making the default resource allocation inefficient.
Flink already supports fine-grained resource management mechanism to support use cases exactly like this. This change adds support to wire the dynamic sink into that system, by allowing the users to set slot sharing groups for 1. the shuffle writer 2. the generator+the forward writer -- they need to share the same slot sharing group to enable operator chaining.