Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011 #5977

Closed
wants to merge 4 commits into from

Conversation

pnowojski
Copy link
Contributor

Previously if there were two completely independent FlinkKafkaProducer011 data sinks in the job graph, their transactional.id would collide with one another. Fix is to use operator's unique ID as well along task name and subtask id.

In order to do that, operator's unique ID has to be exposed to UDF via RuntimeContext.

This change is backward compatible for recovering from older savepoints, since transactional.ids generated by the old generator still will be used after restoring from state.

Brief change log

Please check individual commit messages

Verifying this change

This change adds a new FlinkKafaProducer011 test case that covers bug fix.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

public MockStreamConfig() {
super(new Configuration());

setTypeSerializerIn1(IntSerializer.INSTANCE);
Copy link
Contributor

@zentol zentol May 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about this one. I would expect a MockStreamConfig to be generally usable, but this one can (unexpectedly) mess things up in regards to serializers. I would remove this line and replace the usage in AsyncWaitOperatorTestwith

StreamConfig streamConfig = new MockStreamConfig();
streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixed.

@pnowojski pnowojski force-pushed the f9295 branch 2 times, most recently from 8ed2ecd to 7cb5f5a Compare May 11, 2018 11:53
Copy link
Contributor

@StephanEwen StephanEwen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix, with two comments. Otherwise good to merge...

@@ -837,7 +837,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
transactionalIdsGenerator = new TransactionalIdsGenerator(
getRuntimeContext().getTaskName(),
getRuntimeContext().getTaskName() + "-" + getRuntimeContext().getOperatorUniqueID(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably use only the Operator ID here - the task name does not add to the uniqueness. Unless the purpose of the task name is "human readability" in log files or metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I wanted to keep the task name for the readability reasons.

* @return String representation of the operator's unique id.
*/
@PublicEvolving
String getOperatorUniqueID();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than adding this here and failing for DataSet programs, hoe about adding this to StreamingRuntimeContext and casting inside the Kafka Producer? Not super pretty, but nicer than having something that looks like a pretty generic concept (operator id) throwing an exception in a whole class of programs (batch jobs). This problem should go away anyways with the batch / streaming unification later.

Copy link
Contributor Author

@pnowojski pnowojski May 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better solution from the perspective of this particular use case, but do the UDF's have an access to StreamingRuntimeContext? If not, casting requirement would be huge blocker for any users from using this, thus making this almost a private API.

However I personally do not mind one way or the other.

PS, There are already operations/methods that do not work in streaming/batch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm slightly leaning towards Stephan's suggestion, which I also agree is the better solution for this case.

It might be ok to have this as a "hidden" API for now anyways, since 1) it is marked @PublicEvolving, and 2) the API was added in quite a short timeframe.
If we want this fix in 1.5, I wouldn't suggest "fully" exposing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly prefer the current approach. Looking at other methods, this seems to be the "established" way for methods that do not exist in batch. Why mixing up two different ways of dealing with them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different point here, I think we would usually like to return OperatorID here and not string, but it is probably in a wrong module. When moving OperatorID seems to much, I would suggest to call the method something like getOperatorIDAsString to make this a bit more explicit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still much in favor of not exposing this in the RuntimeContext:

  • Having the state accesses in the RuntimeContext was a necessity of that moment, because there was no initializeState() and it is crucial to be exposed to users.

  • This operatorID is not crucial to be exposed to users, hence a very different case to me.

  • It is super easy to expose it later, it is much harder (even if marked as PublicEvolving) to hide it later. For a quick move, not exposing an addition publicly should always be the default choice, also beyond this specific case here.

@StefanRRichter
Copy link
Contributor

Overall LGTM 👍

…grams

This allows to uniquely and stably across multiple job submissions identify operators.
Previously two different operators that were executed by tasks that had the same name
were indistinguishable.
…ducer011

Previously if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.id would collide with one another. Fix is to
use operator's unique ID as well along task name and subtask id.

This change is backward compatible for recovering from older savepoints,
since transactional.ids generated by the old generator still will be used
after restoring from state.
@pnowojski
Copy link
Contributor Author

I have removed getOperatorUniqueID() from RuntimeContext and our FlinkKafkaProducer011 is now casting RuntimeContext to StreamingRuntimeContext.

@tzulitai
Copy link
Contributor

Thanks for the update @pnowojski.
Changes LGTM, +1.

Merging this ..

@asfgit asfgit closed this in 9b7493d May 22, 2018
asfgit pushed a commit that referenced this pull request May 22, 2018
…ducer011

Previously if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.id would collide with one another. Fix is to
use operator's unique ID as well along task name and subtask id.

This change is backward compatible for recovering from older savepoints,
since transactional.ids generated by the old generator still will be used
after restoring from state.

This closes #5977.
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
…ducer011

Previously if there were two completely independent FlinkKafkaProducer011 data sinks
in the job graph, their transactional.id would collide with one another. Fix is to
use operator's unique ID as well along task name and subtask id.

This change is backward compatible for recovering from older savepoints,
since transactional.ids generated by the old generator still will be used
after restoring from state.

This closes apache#5977.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants