Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2098] Ensure checkpoints and element emission are in order #742

Merged
merged 2 commits into from Jun 4, 2015

Conversation

aljoscha
Copy link
Contributor

Before, it could happen that a streaming source would keep emitting
elements while a checkpoint is being performed. This can lead to
inconsistencies in the checkpoints with other operators.

This also adds a test that checks whether only one checkpoint is
executed at a time and the serial behaviour of checkpointing and
emission.

@aljoscha
Copy link
Contributor Author

This also enables the exactly once checkpointing test added earlier by @StephanEwen.

package org.apache.flink.streaming.runtime.tasks;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trivial: two licenses.

@aljoscha aljoscha force-pushed the checkpoint-hardening branch 6 times, most recently from b575070 to 80753dd Compare May 29, 2015 17:24
@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 1, 2015

As per the discussion on the mailing list I'm rewriting the Source interface to only have the run()/cancel() variant.

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 1, 2015

I reworked the sources now. Could someone please have another pass over this. I think this is very critical code.

@StephanEwen
Copy link
Contributor

I'll make a pass


import java.io.Serializable;

/**
* Base interface for all stream data sources in Flink. The contract of a stream source
* is similar to an iterator - it is consumed as in the following pseudo code:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe streaming data sources, instead of stream data sources. Not to confuse with file streams are intermediate results.

@StephanEwen
Copy link
Contributor

Looks good all in all. I am preparing a followup pull request that cleans up a few things, adds comments, and addresses Marton's comments.

One thing I noticed is that all the non-checkpointed sources have a checkpoint lock in the signature as well.

Should we offer two source interfaces: SourceFunction and CheckpointedSourceFunction ?

@StephanEwen
Copy link
Contributor

Some important comments:

  • Exceptions should always propagate, and exceptions during cancelling can be thrown. The Task class filters out exceptions that come after the transition to the "canceling" state. Don't try to be super smart there, propagate your exceptions, and the context will decide whether they should be logged.
  • The RabbitMQ source is not doing any proper failure handling (critical!). I opened a separate JIRA issue for that.
  • Just commenting out the twitter sources is bad style, in my opinion. There is actually no chance of getting this pull request in, and this one here is a release blocker, while the Twitter Source one is not a release blocker.
  • All functions set their running flag to true at the beginning of the run() method. In the case of races between invoking and canceling the source, it can mean that the flag is set to false in the cancel() method and then to true in the run() method, resulting in a non-canceled source. I fixed some cases in my follow up pull-request, but many are still in. Please make another careful pass with respect to that.
  • In general, the streaming sources are extremely badly commented. This needs big improvements!

@StephanEwen
Copy link
Contributor

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 2, 2015

I will fix the remaining cancel() races. The twitter stuff I just commented out because I assumed that the new TwitterSource could be merged right away and I was waiting for that.

As for two Source interfaces. We can certainly do that. The reason I didn't do it is because there would be a lot of duplication because we have SourceFunction, ParallelSourceFunction, RichSourceFunction and ParallelRichSourceFunction. With the new Source interface this would go up to 8 interfaces for the sources. (That's also the reason why I didn't have Kafka derived from the RichParallelSourceFunction, I thought that maybe we could get rid of the special interfaces for parallel sources.)

Also, I realize there are many more problems. I just can't address them all in a single PR. 😅

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 2, 2015

I addressed the problem with the race conditions and re-enabled the twitter source.

Which exceptions are you referring to? I don't think I touched any of the exception handling or the general way that the steam tasks work in this PR.

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 3, 2015

I thought about what @StephanEwen said about uncheckpointed sources also having the locking object in the signature of the run() method and also about extensibility.

We might have to tweak the source interface a little bit more. What I propose is to have this run method:

void run(SourceContext context);

Then the source context would have methods to retrieve the locking object (for checkpointed sources) and for emitting elements. Part of my motivation for this is that this can be extended in the future without breaking existing sources. If we introduce proper timestamps at some point we can extend the SourceContext with a method for emitting elements with a timestamp. Then, if we want to have watermarks the context can have methods for activating automatically generated watermarks and for emitting watermarks. And so on...

I think we should fix this now, before the release. What do you think?

@StephanEwen
Copy link
Contributor

I generally like that idea. Especially the extensibility with respect to timestamps and watermark generation is a good point.

Retrieving the lock object from the context is not very obvious, but then again, someone who implements a fault tolerant exactly-once source should ready the javadocs and have a look at an example.

@StephanEwen
Copy link
Contributor

How about we still merge this now, to make sure we have a good version in to start testing? The change you propose is API only, and would not affect internals/timings...

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 3, 2015

Yes, the change can basically be done by a regex so I also propose merging this as early as possible now.

By the way, we could ensure that the source is actually holding the monitor lock with Thread.holdsLock(obj). Not sure about the performance impact, though.

@StephanEwen
Copy link
Contributor

I was thinking the same thing, about Thread.holdsLock(obj). That call probably costs way more than the lock itself, though. Would be nice to have a debug mode, to activate it there.

@aljoscha
Copy link
Contributor Author

aljoscha commented Jun 4, 2015

I will run some benchmarks, then we can decide about that.

@mbalassi
Copy link
Contributor

mbalassi commented Jun 4, 2015

Big +1 for the run(SourceContext) interface and experimenting with Thread.holdslock(obj).

@aljoscha aljoscha force-pushed the checkpoint-hardening branch 6 times, most recently from ebea2fe to 37fa18c Compare June 4, 2015 14:29
aljoscha and others added 2 commits June 4, 2015 17:51
Before, it could happen that a streaming source would keep emitting
elements while a checkpoint is being performed. This can lead to
inconsistencies in the checkpoints with other operators.

This also adds a test that checks whether only one checkpoint is
executed at a time and the serial behaviour of checkpointing and
emission.

This changes the SourceFunction interface to have run()/cancel() methods
where the run() method takes a lock object on which it needs to
synchronize updates to state and emission of elements.
@asfgit asfgit merged commit f4195ac into apache:master Jun 4, 2015
@aljoscha aljoscha deleted the checkpoint-hardening branch June 4, 2015 16:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants