[FLINK-5480] Introduce user-provided hash for JobVertexes #3117

Status: Closed · wants to merge 4 commits into base: master
Conversation

@StefanRRichter
Contributor

StefanRRichter commented Jan 13, 2017

This PR allows users to provide (alternative) hashes for operators in a StreamGraph. This can make migration between Flink versions easier in case the automatically produced hashes are incompatible between versions. For example, users could simply copy the old hashes from the web UI into their job.

@StefanRRichter

Contributor

StefanRRichter commented Jan 13, 2017

cc @uce

@uce

Changes look good. Thanks! With your recently introduced framework for providing additional hashes, this becomes a quite straightforward implementation. 😉

I had some inline comments.

Furthermore:

  • For the API JavaDocs I would add an additional paragraph explaining when you would want to call the method.
  • Naming this in a good way seems to be the hardest problem for me. I had an inline comment about set vs. provide, but the bigger question I have is whether we want to keep the node hash or replace it with something else. The node hash refers to the fact that internally the JobVertexID is used to identify the state of an operator for checkpoints/savepoints. Is this something we want to expose at the API level? With the uid(String) method we tried to not do it (e.g. provide some String identifier that is unique and the rest is done by the system). Of course, I don't have a better idea. 😉 Just throwing it out there. @aljoscha or @StephanEwen do you have any input on this?
Outdated inline comments on:
  • .../java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java (3)
  • .../main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
  • ...rg/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
  • ...java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
public void setUserHash(String userHash) {
    this.userHash = userHash;
}

@uce

uce Jan 16, 2017

Contributor

Since this is executed on the client before submitting the job, it might be helpful to do some early sanity checking here. The expected String is a hex representation of a JobVertexID.
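Such an early client-side check could look like the following stand-alone sketch (the class and method names are made up for illustration; it assumes a JobVertexID serializes to 16 bytes, i.e. 32 hex characters):

```java
public class UidHashCheck {

    /**
     * Returns true iff the given string could be the hex form of a
     * JobVertexID (assumed here to be 16 bytes, i.e. 32 hex characters).
     */
    static boolean isValidUidHash(String s) {
        if (s == null || s.length() != 32) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            boolean hex = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'f')
                    || (c >= 'A' && c <= 'F');
            if (!hex) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidUidHash("5c69d4ad31a844978a740c1d24297b68")); // prints true
        System.out.println(isValidUidHash("not-a-hash")); // prints false
    }
}
```

A setter like setUserHash could call such a check and throw an IllegalArgumentException on the client, instead of failing much later on the cluster.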


Outdated inline comments on:
  • ...org/apache/flink/streaming/api/transformations/StreamTransformation.java
  • ...ala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
  • .../java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@StefanRRichter

Contributor

StefanRRichter commented Jan 18, 2017

Thanks a lot for the reviews @zentol and @uce. I made another commit that addresses the review comments.

@uce

Contributor

uce commented Jan 20, 2017

Thanks for addressing the comments.

I had a quick offline chat with Stephan regarding the naming of setAdditionalNodeHash. He also agrees that the name moves internal implementation details into the user API. Furthermore, he raised the point that there is no way for the user to see the relation between uid and setAdditionalNodeHash.

He proposed the following, which I liked:

  • Rename setAdditionalNodeHash to setUidHash. Add a big fat comment that this should only be used as a workaround or for troubleshooting.
  • Add a comment to uid saying that the given String will be hashed to the ID that corresponds to the JobVertexID in the logs/web UI, so users know what's happening.
  • Make clear in both Javadocs how the two methods relate to each other.

What do you think?

@StefanRRichter

Contributor

StefanRRichter commented Jan 23, 2017

OK, I changed the method names and added the comments as proposed.

@uce

Contributor

uce commented Jan 23, 2017

Thanks! Looks good. I just tested this with a job where I did not set any UIDs. After changing the job in an incompatible way (with respect to auto-generated IDs), I was able to restore my state with setUidHash. I further played around with it by adding a completely new operator, which also restored the state of an operator in addition to the regular one. I think this will be very powerful once we provide some less low-level plumbing around it. The biggest pain point right now (independent of this PR) is figuring out the old IDs.

I will go ahead and merge this for master and release-1.2.

@StefanRRichter

Contributor

StefanRRichter commented Jan 23, 2017

Thanks a lot for the review and such intensive testing @uce!

asfgit pushed a commit that referenced this pull request Jan 23, 2017

@asfgit asfgit closed this in 0de2bc3 Jan 23, 2017

joseprupi added a commit to joseprupi/flink that referenced this pull request Feb 12, 2017

@ShashwatRastogi-Reflektion

ShashwatRastogi-Reflektion commented Apr 3, 2018

Hey @uce, @StefanRRichter

I too have a Flink job running in production where the operators don't have uids. Now I have to add new operators, and the job doesn't restart from the savepoint. I came across this PR and tried setting the uid hash of the operator chain before adding a new operator, but loading from the savepoint still doesn't work.

I'm sorry to bother you on a long-closed PR, but could you help me figure out how to restart from a savepoint when uids were not set initially?

Thank you so much. :)

@zentol

Contributor

zentol commented Apr 3, 2018

@ShashwatRastogi-Reflektion

  1. For each task to load from the savepoint S1:
  • Determine the existing hash by searching for the task ID in the logs/UI.
  • Set the uidHash for that task to that value via SingleOutputStreamOperator#setUidHash.
  2. For each task, set the uid to whatever value you wish to use in the future via SingleOutputStreamOperator#setUid.
  3. Resume the job from the savepoint S1.
  4. Create a new savepoint S2, and remove all calls to setUidHash.
  5. Resume the job from the savepoint S2.
@ShashwatRastogi-Reflektion

ShashwatRastogi-Reflektion commented Apr 3, 2018

Hey @zentol

Thank you for replying. I was trying to do exactly that, but I think I am messing something up, which is why it isn't working in my case.

In my logs, I get a task description like:

2018-04-03 12:24:45,876 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Filter -> events-with-timestamp -> filtered-events-with-timestamp -> events-mapped-to-session (1/1) (5c69d4ad31a844978a740c1d24297b68) switched from CREATED to SCHEDULED

Is the hash in this log statement the uid hash? It is not the same as the hash shown in the web UI.

[screenshot of the web UI]

I have tried both, and neither of them works.

Also, my operators are chained together, so I will get one uid hash for the entire chain, right? And I would set the same uid hash for all operators in the chain?
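As an aside, the 32-character hex ID in parentheses in such a log line can be pulled out mechanically. A minimal stand-alone sketch (class and method names made up for illustration; it only extracts whatever ID is printed — whether that ID matches the savepoint's uid hash is exactly the question above):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VertexIdFromLog {

    // The ID is printed as 32 lowercase hex characters in parentheses;
    // the subtask index "(1/1)" is too short to match this pattern.
    private static final Pattern VERTEX_ID = Pattern.compile("\\(([0-9a-f]{32})\\)");

    /** Returns the first 32-character hex ID found in the line, or null if absent. */
    static String extractVertexId(String logLine) {
        Matcher m = VERTEX_ID.matcher(logLine);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String line = "... events-mapped-to-session (1/1) "
                + "(5c69d4ad31a844978a740c1d24297b68) switched from CREATED to SCHEDULED";
        System.out.println(extractVertexId(line)); // prints 5c69d4ad31a844978a740c1d24297b68
    }
}
```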

@zentol

Contributor

zentol commented Apr 3, 2018

@ShashwatRastogi-Reflektion It's a bit odd that different IDs are shown; I will have to look into that. It may be that one display accounts for the uid while the other does not; in any case, one of them should be the task ID.

I should have asked earlier: which version of Flink are you using?

If it is 1.2 or below, then I don't know at the moment what the problem could be.
If it is 1.3 or above, the steps I mentioned have to be done for each operator, not for each task. That said, I'm not sure we actually expose the ID of each operator anywhere in 1.3 in a nice way, so you may have to resort to trial and error: if a state cannot be assigned to an operator, you should get an exception containing the ID of that state (unless you explicitly allow non-restored state), which you can then use as the uid hash. If you are already on 1.4, you can figure the IDs out with the metric system (for example, with the JMXReporter).

@ShashwatRastogi-Reflektion

ShashwatRastogi-Reflektion commented Apr 3, 2018

@zentol I am using Flink 1.3.2.
Normally I don't use --allowNonRestoredState, and I get an error while restarting the job. With --allowNonRestoredState the job starts up fine, but there is data loss because of the lost state, which is what I want to avoid.
I think the problem with my implementation is that I am assigning some wrong uid hashes to the operators. Is there a sure-shot way of finding out the uid or uid hash that Flink generates for each operator (with/without chaining)?

@zentol

Contributor

zentol commented Apr 3, 2018

@ShashwatRastogi-Reflektion I'm not entirely sure, but one thing you could try is to explicitly disable chaining (StreamExecutionEnvironment#disableOperatorChaining). This way, the ID of each operator (which now runs as a separate task) should be logged and visible in the UI.
