Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-5101] Refactor CassandraConnectorITCase #2866

Closed
wants to merge 6 commits into from

Conversation

zentol
Copy link
Contributor

@zentol zentol commented Nov 25, 2016

This PR refactors the CassandraConnectorITCase to be a bit more stable and easier to debug.

The following changes were made:

  • we no longer run actual flink jobs; all tests directly interact with the sink to save resources
  • every test uses a different table, preventing race conditions related to truncating the table
  • the at-least-once sinks were modified to track pending updates
    => the pojo sink was modified to use a method that returns an actually useful Future
    => since the sink waits in close() for pending updates it can no longer occur that a test checks a condition prematurely, improving stability
  • the initial connection is established across a time-span of 30 seconds, increasing the chance that cassandra has started before the tests are run

try {
updatesPending.wait();
} catch (InterruptedException e) {
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good practice is to set the interruption flag back: Thread.currentThread().interrupt();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better here even: Throw an exception, because closing is incomplete when interrupted (and cannot guarantee correctness)

@@ -94,5 +122,9 @@ public void close() {
} catch (Exception e) {
LOG.error("Error while closing cluster.", e);
}
Throwable e = exception.get();
if (e != null) {
LOG.error("Error while sending value.", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be rethrown, otherwise close() may complete without an exception, but the result may be incomplete.

try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) { //give cassandra a few seconds to start up
// give cassandra a few seconds to start up
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we skip this and simply rely on the loop below that tries to connect?

throw e;
}
try {
Thread.sleep(2000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about reducing this to, say 500ms, to make it react a bit sooner once the connection can be established.

cluster = builder.getCluster();
session = cluster.connect();
// start establishing a connection within 30 seconds
start = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to use System.nanoTime(), because System.curremtTimeMillis is not stable. Seems especially unstable on Travis.


ResultSet rs = session.execute(SELECT_DATA_QUERY);
synchronized (sink.updatesPending) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed when close() already waits for pending requests?


sink.close();

synchronized (sink.updatesPending) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

@StephanEwen
Copy link
Contributor

Good set of changes. Some comments for small adjustments, otherwise +1 for the approch

@zentol
Copy link
Contributor Author

zentol commented Jan 26, 2017

@StephanEwen I've addressed your comments and Travis is passing :)

} catch (InterruptedException e) { //give cassandra a few seconds to start up
// start establishing a connection within 30 seconds
long start = System.nanoTime();
long deadline = start + 1000 * 30;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be long deadline = start + 30_000_000_000 (because its nanos)

session = cluster.connect();
break;
} catch (Exception e) {
if (System.currentTimeMillis() > deadline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be System.nanoTime()

@@ -40,26 +42,42 @@
protected transient Cluster cluster;
protected transient Session session;

protected transient Throwable exception = null;
protected transient AtomicReference<Throwable> exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does exception have to be an AtomicReference, or does a volatile variable suffice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be volatile.

@StephanEwen
Copy link
Contributor

Looks much better, one mixup is still in the tests.
The comment about the Atomic Reference is a minor improvement comment.

@zentol
Copy link
Contributor Author

zentol commented Feb 22, 2017

@StephanEwen I've addressed your comments and rebased the branch.

@zentol
Copy link
Contributor Author

zentol commented May 10, 2017

Will start merging this; it's blocking #3810.

zentol added a commit to zentol/flink that referenced this pull request May 11, 2017
zentol added a commit to zentol/flink that referenced this pull request May 12, 2017
zentol added a commit to zentol/flink that referenced this pull request May 12, 2017
@asfgit asfgit closed this in 0be04b4 May 12, 2017
fanyon pushed a commit to fanyon/flink that referenced this pull request May 15, 2017
@zentol zentol deleted the 4177_cass_test branch November 6, 2017 14:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants