[FLINK-8005] [runtime] Set user code class loader before snapshot #4980

Closed
GJL wants to merge 6 commits into apache:master from GJL:FLINK-8005-2

Conversation

@GJL GJL commented Nov 8, 2017

What is the purpose of the change

During checkpointing, user code may dynamically load classes from the user code
jar. This is a problem if the thread invoking the snapshot callbacks does not
have the user code class loader set as its context class loader. This commit
makes sure that the correct class loader is set.

Brief change log

  • Set user code class loader in ThreadFactory of Task#asyncCallDispatcher
  • Clean up TaskAsyncCallTest
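
To illustrate the first item, here is a minimal sketch of a thread factory that sets a given class loader as the context class loader of every thread it creates. The class names `ContextClassLoaderDemo` and `ClassLoaderSettingThreadFactory` and the empty `URLClassLoader` are stand-ins made up for this example; this is not the actual `DispatcherThreadFactory` code.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class ContextClassLoaderDemo {

    /** Hypothetical factory for this sketch; not Flink's DispatcherThreadFactory. */
    static final class ClassLoaderSettingThreadFactory implements ThreadFactory {
        private final String threadName;
        private final ClassLoader classLoader;

        ClassLoaderSettingThreadFactory(String threadName, ClassLoader classLoader) {
            this.threadName = threadName;
            this.classLoader = classLoader;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            // Code running on this thread that consults the context class loader
            // will now see the supplied loader instead of the creator's loader.
            t.setContextClassLoader(classLoader);
            return t;
        }
    }

    /** Runs one task on a thread from the factory and returns the loader it observed. */
    static ClassLoader observedLoader(ClassLoader userCodeLoader) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                new ClassLoaderSettingThreadFactory("async-calls", userCodeLoader));
        try {
            Callable<ClassLoader> task = () -> Thread.currentThread().getContextClassLoader();
            return executor.submit(task).get(10, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the user code class loader (an assumption for this sketch).
        ClassLoader userCodeLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderDemo.class.getClassLoader());
        System.out.println(observedLoader(userCodeLoader) == userCodeLoader);
    }
}
```

Setting the loader in the factory rather than inside each callback means every call dispatched on that thread sees the same loader without per-call set and restore logic.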

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests to verify that context class loader is set
  • Started job with FlinkKafkaProducer011 and verified that snapshotting works

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

for (int i = 1; i <= numCalls; i++) {
    task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
}
catch (Exception e) {
Member Author

Semantics of existing tests did not change: I removed the try-catch and simplified the assertion:

if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
    fail("Task should be RUNNING or FINISHED, but is " + currentState);
}

to

assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));

Contributor

Yep, the diff on GitHub is a bit hard to read but I figured it out. 😅

aljoscha commented Nov 8, 2017

These changes look good! 👍

I'll wait for Travis and then merge.

@GJL GJL closed this Nov 8, 2017
@GJL GJL reopened this Nov 8, 2017
}

@Test(timeout = 20000)
public void testStopExecution() throws Exception {
Member Author

@GJL GJL Nov 8, 2017

This only tested that stop is invoked. Should be covered by testSetsUserCodeClassLoader now.

}

@Test(expected = RuntimeException.class)
public void testStopExecutionFail() throws Exception {
Member Author

This is now covered by testThrowExceptionIfStopInvokedWithNotStoppableTask.

fail("Task should be RUNNING or FINISHED, but is " + currentState);
}

task.cancelExecution();
Member Author

@GJL GJL Nov 8, 2017

I moved this to the AutoCloseable TaskCleaner to avoid duplication.

}

// assert after task is canceled and executing thread is stopped to avoid race conditions
assertThat(classLoaders, hasSize(greaterThanOrEqualTo(3)));
Contributor

Are we guaranteed that all three calls have been made at this point or could this be flaky due to race conditions?

Contributor

As @aljoscha suggested, I think that there is no guarantee that the 3 calls have finished by the time we check, right?

Member Author

I believe you are right. I introduced another latch to counter this.
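
For readers following along, the general idea of waiting on a latch before asserting can be sketched as below. This uses `java.util.concurrent.CountDownLatch` rather than the latch class used in the actual test, and `AwaitAsyncCallsDemo` and `runCallsAndCollect` are hypothetical names for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitAsyncCallsDemo {

    /** Dispatches numCalls asynchronous calls and returns only after all have run. */
    static List<String> runCallsAndCollect(int numCalls) throws InterruptedException {
        List<String> recorded = new CopyOnWriteArrayList<>();
        CountDownLatch allCallsDone = new CountDownLatch(numCalls);
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < numCalls; i++) {
                final int callId = i;
                dispatcher.execute(() -> {
                    recorded.add("call-" + callId);
                    // Signal completion only after the side effect is visible.
                    allCallsDone.countDown();
                });
            }
            // Without this wait, a size assertion could race with the dispatcher thread.
            if (!allCallsDone.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("async calls did not finish in time");
            }
            return recorded;
        } finally {
            dispatcher.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runCallsAndCollect(3).size());
    }
}
```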

Contributor

@kl0u kl0u left a comment

Nice work, @GJL! I have some minor comments and a question about a test. As soon as these are addressed, I think this is good to merge.

* @param classLoader The {@link ClassLoader} to be set as context class loader.
*/
public DispatcherThreadFactory(
ThreadGroup group,
Contributor

This is a code style preference rather than an issue, but I would suggest indenting the arguments by a tab to separate them from the body of the method.

Member Author

@GJL GJL Nov 9, 2017

It is indented now.


@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException("Should not be called");
Contributor

Why was this removed?

Member Author

Not sure anymore but I decided to add it again.


@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("Should not be called");
Contributor

Why was this removed?

Member Author

@GJL GJL Nov 9, 2017

Not sure anymore but I decided to add it again.

GJL added 3 commits November 9, 2017 13:59
Throw UnsupportedOperationException when
CheckpointsInOrderInvokable#triggerCheckpointOnBarrier() and
CheckpointsInOrderInvokable#abortCheckpointOnBarrier() are called.
GJL commented Nov 9, 2017

I addressed the comments. Let's wait for Travis and let me know if something else needs to be changed.

@aljoscha @kl0u

aljoscha commented Nov 9, 2017

I think waiting on the stop latch might not be enough in 100% of cases because the other two calls are also asynchronous.

GJL commented Nov 9, 2017

As it is now, it should be enough as there is only one thread dispatching the calls:

executor = Executors.newSingleThreadExecutor(
		new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
this.asyncCallDispatcher = executor;

The tasks cannot overtake each other. I could make the test stricter and wait additionally on triggerLatch in case somebody decides to have multiple threads.
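
A small sketch of the ordering argument, assuming a plain `Executors.newSingleThreadExecutor()`: tasks run one at a time in submission order, so waiting for the last one implies the earlier ones have finished. `SingleThreadOrderingDemo` and `executionOrder` are hypothetical names, and this illustrates only the executor behavior, not the Flink test itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderingDemo {

    /** Submits numTasks tasks and returns the order in which they actually ran. */
    static List<Integer> executionOrder(int numTasks) throws Exception {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> last = null;
            for (int i = 0; i < numTasks; i++) {
                final int id = i;
                Runnable task = () -> order.add(id);
                last = executor.submit(task);
            }
            // With one worker thread, completion of the last task implies
            // that every earlier task has already completed.
            if (last != null) {
                last.get(10, TimeUnit.SECONDS);
            }
            return new ArrayList<>(order);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executionOrder(5));
    }
}
```

The guarantee comes from the single worker thread, so it stops holding as soon as the executor is replaced by a pool with more than one thread.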

@aljoscha

aljoscha commented Nov 9, 2017

Yes, but I think this makes an assumption about the internal implementation. If someone changes that, the test could break or no longer test the right thing.

triggerLatch.trigger();
if (error != null) {
    // exit method prematurely due to error but make sure that the tests can finish
    triggerLatch.trigger();
Contributor

For all latches, it should also have: if (!latch.isTriggered()) { latch.await(); }

Member Author

@GJL GJL Nov 9, 2017

Why is that? I think at this point the latch might not get triggered at all (except here).

Contributor

Sorry, I was just looking in the IDE and missed the lines. This check should come before every call to await on the latch.

Member Author

@GJL GJL Nov 9, 2017

I think it doesn't matter because the latch already checks for the flag:

	public void await() throws InterruptedException {
		synchronized (lock) {
			while (!triggered) {
				lock.wait();
			}
		}
	}
	public void trigger() {
		synchronized (lock) {
			triggered = true;
			lock.notifyAll();
		}
	}

Contributor

Yes, a latch that was already triggered will simply return immediately, so there is no need for an additional check.
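
To make this concrete, here is a self-contained sketch built around the `await()` and `trigger()` bodies quoted above. The wrapper class `OneShotLatchDemo`, the nested latch class, and the `isTriggered()` accessor are assumptions written for this example and may differ from the real latch class.

```java
class OneShotLatchDemo {

    /** Minimal latch mirroring the quoted await()/trigger() logic. */
    static final class OneShotLatch {
        private final Object lock = new Object();
        private boolean triggered;

        void trigger() {
            synchronized (lock) {
                triggered = true;
                lock.notifyAll();
            }
        }

        void await() throws InterruptedException {
            synchronized (lock) {
                // If the flag is already set, the loop body never runs
                // and the call returns without blocking.
                while (!triggered) {
                    lock.wait();
                }
            }
        }

        boolean isTriggered() {
            synchronized (lock) {
                return triggered;
            }
        }
    }

    /** Triggers first, then awaits twice; both awaits return without blocking. */
    static boolean awaitReturnsAfterTrigger() throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        latch.trigger();
        latch.await();
        latch.await();
        return latch.isTriggered();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitReturnsAfterTrigger());
    }
}
```

Because the flag is checked under the same lock that `trigger()` holds while setting it, an extra `isTriggered()` guard before `await()` adds nothing.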

aljoscha commented Nov 9, 2017

Thanks, I think this is excellent now. 👌

I'll merge as soon as Travis is green.

kl0u commented Nov 9, 2017

I agree! +1 to merge as soon as Travis gives us the green light.

@aljoscha
Contributor

Thanks again for this fix! 👍

Could you please close if GitHub doesn't auto-close?
