New operator state interfaces #747

Merged
merged 11 commits into from Jun 25, 2015

Conversation

gyfora
Contributor

@gyfora gyfora commented May 29, 2015

This PR contains the proposed changes for the streaming operator state interfaces as described in

https://docs.google.com/document/d/1nTn4Tpafsnt-TCT6L1vlHtGGgRevU90yRsUQEmkRMjk/edit?usp=sharing

Highlights:

  • Added OperatorState interface accessible from RuntimeContext
  • System-managed checkpointing/restore of the states (no implementation required for Serializable states)
  • confirmCheckpoint(..) method in the CheckpointCommitter interface has been changed to return the StateHandle for the checkpointed state
  • Updated Persistent Kafka source to work on the new interfaces
  • Updated Tests and IT cases
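
To make the first two highlights concrete, here is a minimal, self-contained sketch of how a user function might read and update a named state obtained from the runtime context. Only the `getOperatorState(name, defaultValue)` call shape is taken from this PR; the `OperatorState` method names (`getState`, `updateState`), `SketchRuntimeContext`, and `InMemoryState` are hypothetical stand-ins, not the actual Flink classes, and no checkpointing happens here.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class OperatorStateSketch {

    /** Hypothetical stand-in for the proposed OperatorState interface. */
    interface OperatorState<T extends Serializable> {
        T getState();
        void updateState(T newState);
    }

    /** Hypothetical in-memory state; a real system would checkpoint it. */
    static final class InMemoryState<T extends Serializable> implements OperatorState<T> {
        private T value;
        InMemoryState(T initial) { this.value = initial; }
        @Override public T getState() { return value; }
        @Override public void updateState(T newState) { this.value = newState; }
    }

    /** Hypothetical stand-in for the state-related part of RuntimeContext. */
    static final class SketchRuntimeContext {
        private final Map<String, OperatorState<?>> states = new HashMap<>();

        /** Returns the state registered under the name, creating it from the default if absent. */
        @SuppressWarnings("unchecked")
        <T extends Serializable> OperatorState<T> getOperatorState(String name, T defaultValue) {
            return (OperatorState<T>) states.computeIfAbsent(
                    name, k -> new InMemoryState<T>(defaultValue));
        }
    }

    /** Counts elements, keeping the running count in a named operator state. */
    static long countAll(SketchRuntimeContext ctx, String... elements) {
        OperatorState<Long> counter = ctx.getOperatorState("count", 0L);
        for (String ignored : elements) {
            counter.updateState(counter.getState() + 1);
        }
        return counter.getState();
    }

    public static void main(String[] args) {
        SketchRuntimeContext ctx = new SketchRuntimeContext();
        System.out.println(countAll(ctx, "a", "b", "c")); // prints 3
    }
}
```

Because the state lives in the context rather than in the function, a second call against the same context continues from the previous count.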

public interface CheckpointCommittingOperator {

    void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
    void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception;
}
Contributor

Is the checkpointId still needed?

Contributor

Good point. We could potentially omit it, but we kept it for now because it makes testing easier: confirmations can at least be associated with specific checkpoints.

@mbalassi
Contributor

Generally looks good. I was wondering whether we could use Flink's serialization instead of having Serializable as a bound. Do we always have Flink code on every side of the serialization?


this.lastOffsets = getRuntimeContext().getOperatorState("offset", defaultOffset);

//TODO: commit fetched offset to ZK if not default
Contributor

We cannot merge the PR with this TODO open.

@gyfora gyfora force-pushed the new_state branch 3 times, most recently from 3392117 to 8306ddf Compare June 8, 2015 16:06
@mbalassi
Contributor

mbalassi commented Jun 9, 2015

release-0.9 has been forked off, so this pull request is no longer blocked by it. Did you manage to fix the Kafka source, or is some help needed there?

@gyfora
Contributor Author

gyfora commented Jun 9, 2015

I could not fix the Kafka test yet. I also need to fix some recently added tests that are hardcoded for the previous interfaces :p

This will probably happen later this week. At some point I might ask for help getting Kafka to work.

@gyfora gyfora force-pushed the new_state branch 2 times, most recently from 288ae6f to 5767ae6 Compare June 17, 2015 08:51
@gyfora
Contributor Author

gyfora commented Jun 17, 2015

Some issues that we need to fix before merge:

Committing the checkpoint doesn't work properly. We need to commit the states one by one and also pass the name of each state.

State handles are not properly discarded. I need to add a wrapper state handle that also discards the handles it wraps.
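
A rough sketch of what such a wrapper could look like, assuming a simplified, hypothetical `StateHandle` interface with just `getState()` and `discardState()` (the real Flink interface may differ, for example in declared exceptions). `WrapperStateHandle` and `RecordingHandle` are illustrative names, not classes from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WrapperStateHandleSketch {

    /** Hypothetical minimal state handle: the state can be retrieved and discarded. */
    interface StateHandle<T> {
        T getState();
        void discardState();
    }

    /** Wraps named handles; discarding the wrapper discards every wrapped handle. */
    static final class WrapperStateHandle implements StateHandle<Map<String, StateHandle<?>>> {
        private final Map<String, StateHandle<?>> wrapped;

        WrapperStateHandle(Map<String, StateHandle<?>> wrapped) {
            // Defensive copy so later changes to the caller's map do not leak in.
            this.wrapped = new LinkedHashMap<>(wrapped);
        }

        @Override
        public Map<String, StateHandle<?>> getState() {
            return wrapped;
        }

        @Override
        public void discardState() {
            for (StateHandle<?> handle : wrapped.values()) {
                handle.discardState();
            }
        }
    }

    /** Illustrative handle that records whether it has been discarded. */
    static final class RecordingHandle implements StateHandle<String> {
        boolean discarded = false;
        @Override public String getState() { return "state"; }
        @Override public void discardState() { discarded = true; }
    }

    public static void main(String[] args) {
        RecordingHandle offset = new RecordingHandle();
        Map<String, StateHandle<?>> byName = new LinkedHashMap<>();
        byName.put("offset", offset);

        new WrapperStateHandle(byName).discardState();
        System.out.println(offset.discarded); // prints true
    }
}
```

Keying the wrapped handles by state name also fits the first issue above, since each state can then be committed individually under its name.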

@gyfora
Contributor Author

gyfora commented Jun 17, 2015

We also need to add the partitioning setting to the operators, as it is currently not exposed through the API.

@gyfora gyfora force-pushed the new_state branch 3 times, most recently from 0fe84fd to 450d5f5 Compare June 18, 2015 07:42
@gyfora
Contributor Author

gyfora commented Jun 18, 2015

Alright, I think I fixed the issues. The only thing that remains is to add the partitioning setting to the API.

State partitioning should be a property of the operator; therefore it should be set afterwards, like parallelism.

For example: stream.map(Mapper).setStatePartitioner(...)

This is quite tricky, however, as the state partitioner should affect the partitioning scheme of the input streams (otherwise it makes no sense). I see two approaches here:

1. Simply overwrite the partitioning without warning.
2. Only overwrite it in case it is not defined (forward); otherwise throw an exception stating that the partitioning cannot be different from statePartitioning.
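
The second approach could be sketched roughly as follows. The `Partitioning` enum and the `resolve` helper are hypothetical and exist only to illustrate the rule; they are not part of the API in this PR.

```java
final class StatePartitionerSketch {

    /** Hypothetical partitioning schemes; FORWARD stands for "not explicitly set". */
    enum Partitioning { FORWARD, SHUFFLE, HASH }

    /**
     * Resolves the input partitioning once a state partitioner is set:
     * an undefined (forward) input adopts the state partitioning, an
     * identical one is kept, and any other explicit setting is rejected.
     */
    static Partitioning resolve(Partitioning input, Partitioning statePartitioning) {
        if (input == Partitioning.FORWARD || input == statePartitioning) {
            return statePartitioning;
        }
        throw new IllegalStateException(
                "Input partitioning " + input
                        + " cannot be different from state partitioning " + statePartitioning);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Partitioning.FORWARD, Partitioning.HASH)); // prints HASH
        try {
            resolve(Partitioning.SHUFFLE, Partitioning.HASH);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here means a user who explicitly asked for a different partitioning learns about the conflict when the job is built, instead of having the setting silently replaced.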

@mbalassi
Contributor

I vote for the second option; it is cleaner and more in line with the current behavior of the API.

public StateForTask getState(JobVertexID jobVertexID)
{
    if(vertexToState.containsKey(jobVertexID)) {
        return vertexToState.get(jobVertexID);
Contributor

I think you can save one map lookup by calling get() immediately. It will return null on a missing key.
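
For illustration, a small standalone comparison of the two variants, assuming the truncated fallback branch in the snippet returns null. The class and method names are made up for this example and use plain `String` keys instead of `JobVertexID`.

```java
import java.util.HashMap;
import java.util.Map;

final class SingleLookupSketch {

    /** Original shape: containsKey() followed by get() hashes the key twice. */
    static String twoLookups(Map<String, String> map, String key) {
        if (map.containsKey(key)) {
            return map.get(key);
        }
        return null;
    }

    /** Suggested shape: get() alone already returns null for a missing key. */
    static String oneLookup(Map<String, String> map, String key) {
        return map.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> vertexToState = new HashMap<>();
        vertexToState.put("vertex-1", "state-1");

        System.out.println(oneLookup(vertexToState, "vertex-1")); // prints state-1
        System.out.println(oneLookup(vertexToState, "missing"));  // prints null
    }
}
```

The two methods return the same result for every key, including keys that are mapped to null.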

Contributor Author

Done.

@StephanEwen
Contributor

Let's keep the current interfaces Checkpointed and AsynchronouslyCheckpointed so that we do not fully break current programs. They are actually used in examples that have been published.

@gyfora
Contributor Author

gyfora commented Jun 19, 2015

That's a good point, Stephan. I fixed it.

@gyfora
Contributor Author

gyfora commented Jun 19, 2015

I still get these random KafkaITCase failures on Travis. This might be a timeout issue of some sort. @rmetzger, do you have any tips on debugging that?

@gyfora gyfora force-pushed the new_state branch 3 times, most recently from 8c999ba to cc50603 Compare June 20, 2015 18:30
@rmetzger
Contributor

Mh. Is it always the same test failing with the same message?
What is the failure?

*
* It creates a new {@link KeyedDataStream} that uses the provided key for partitioning
* its operator states. Mind that keyBy does not affect the partitioning of the {@link DataStream}
Contributor

But it seems to affect the partitioning of the stream, since the constructor of KeyedDataStream calls partitionByHash() on the DataStream. (This also applies to the other keyBy() methods.)

Contributor

Indeed. I removed that part from the Javadoc.

@aljoscha
Contributor

@senorcarbone You mentioned that the KeyedDataStream disallows calls to shuffle()/repartition()/groupBy() and so on. But KeyedDataStream doesn't seem to have this functionality, unless I'm mistaken.

@gyfora
Contributor Author

gyfora commented Jun 24, 2015

The setConnectionType method is overridden, which is what actually makes this happen.

@aljoscha
Contributor

Ahh I see, my bad. 😅

@senorcarbone senorcarbone force-pushed the new_state branch 2 times, most recently from 4d694af to a024a2e Compare June 24, 2015 16:24
@gyfora
Contributor Author

gyfora commented Jun 25, 2015

Should we merge this?

@StephanEwen
Contributor

I think we have no real blocker here. I would prefer that the exception issue be addressed (a message for the wrapping exception).

Everything else will probably show best once we implement sample jobs and sample backends for this new functionality.

@gyfora
Contributor Author

gyfora commented Jun 25, 2015

Okay, I will fix the exceptions and merge it afterwards.

@gyfora gyfora force-pushed the new_state branch 2 times, most recently from 4cf7971 to ce4d2ff Compare June 25, 2015 14:37
@asfgit asfgit merged commit cad8510 into apache:master Jun 25, 2015
@gyfora gyfora deleted the new_state branch June 27, 2015 18:46
@rmetzger
Contributor

Please create JIRAs for changes in the future.

nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
nltran pushed a commit to nltran/flink that referenced this pull request Jan 8, 2016