-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-7880][QS] Wait for proper resource cleanup after each ITCase. #5062
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
f7f8d2a to
43a2af6
Compare
|
This is a follow-up to #4993 |
|
Your follow-up reference points to this very PR :P |
|
Fixed @zentol ! |
zentol
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review for the first 3 commits.
| }); | ||
|
|
||
| // check again if in the meantime another thread completed the future | ||
| if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can never be true. We can only arrive here if clientShutdownFuture.compareAndSet(null, newShutdownFuture) succeeds, and there is no other code path that ever sets it to null again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right!
| // don't want any lingering connections after shut down, | ||
| // which can happen if we don't check this here. | ||
| if (shutDown.get()) { | ||
| if (!clientShutdownFuture.compareAndSet(null, null)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for clarity we should call get() instead of compareAndSet since the setting part is a no-op anyway.
| failureCause = cause; | ||
| } | ||
|
|
||
| closed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this field should be redundant, as (connectionShutdownFuture.get() != null) == closed should always hold.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I will remove that.
| private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); | ||
| @Before | ||
| public void setUp() throws Exception { | ||
| nioGroup = new NioEventLoopGroup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method isn't strictly necessary; you can also do private final NioEventLoopGroup nioGroup = new NioEventLoopGroup(). Then you would also no longer need the null-check in @After.
| } catch (Exception e) { | ||
| e.printStackTrace(); | ||
| } | ||
| Assert.assertTrue(client.isEventGroupShutdown()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer if we would throw e in this and similar cases.
| } | ||
| public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException { | ||
| CompletableFuture<Void> shutdownFuture = new CompletableFuture<>(); | ||
| if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calling shutdown twice in a row will result in a future being returned that never completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the idea is that we atomically set the future the first time (serverShutdownFuture.compareAndSet(null, shutdownFuture)), and then, at each subsequent call we return that exact future (serverShutdownFuture.get();). So I cannot see how this can lead to returning a future that never completes.
| TimestampedCompletableFuture pending = pendingRequests.remove(requestId); | ||
| if (pending != null && pending.completeExceptionally(cause)) { | ||
| stats.reportFailedRequest(); | ||
| if (connectionShutdownFuture.compareAndSet(null, shutdownFuture) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can result in odd race conditions, where one thread has set the shutdownFuture, but another one the failureCause. I would suggest to create a container for the future and exception and update them atomically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also log other shutdown attempts as DEBUG.
| // requests will be handled by this failed channel. | ||
| establishedConnections.remove(serverAddress, this); | ||
| } | ||
| close(cause).thenAccept(cancelled -> establishedConnections.remove(serverAddress, this)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we remove the connection in any case, since if we can't close something is probably wrong with it anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
|
|
||
| Await.ready(cancellation, deadline.timeLeft()); | ||
| // we are not waiting for the cancellation to happen because the | ||
| // job has actually failed, as tested above. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can't guarantee this in the finally block. (for example if submitJobDetached failed but the job is actually running)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. So I will just remove the finally block and check if the status of the job is FAILED.
| * contains a wrong jobId or wrong queryable state name. | ||
| */ | ||
| @Test | ||
| @Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
43a2af6 to
9ba9f6f
Compare
| if (throwable != null) { | ||
| newShutdownFuture.completeExceptionally(throwable); | ||
| } else if (bootstrap != null) { | ||
| LOG.warn("Problem while shutting down the connections at the {}: {}", clientName, throwable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.warn("Problem while shutting down the connections at the {}", clientName, throwable); instead?
|
|
||
| server2.start(); | ||
| List<Integer> occupiedPortList = new ArrayList<>(); | ||
| occupiedPortList.add(server1.getServerAddress().getPort()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could use Collections.singletonList() here
|
|
||
| private CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs( | ||
| /** | ||
| * A wrapper of the job graph that makes sure to cancell the job and wait for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: cancell -> cancel
9ba9f6f to
7ae3e16
Compare
| assertEquals(expectedRequests, stats.getNumSuccessful()); | ||
| assertEquals(expectedRequests, stats.getNumFailed()); | ||
| } finally { | ||
| Exception exc = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could be moved into the if block.
| } catch (Exception e) { | ||
| e.printStackTrace(); | ||
| exc = e; | ||
| LOG.error(ExceptionUtils.stringifyException(e)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use LOG.error("An exception occurred while shutting down netty.", e) instead
| try { | ||
| final TestingJobManagerMessages.JobStatusIs jobStatus = | ||
| failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); | ||
| assertEquals(JobStatus.FAILED, jobStatus.state()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this always true if the future did not time out? (In which case get() throws a TimeoutException)
6282a77 to
e11e56a
Compare
|
👍 |
e11e56a to
8e3c4f3
Compare
8e3c4f3 to
a3fd548
Compare
|
Thanks @zentol ! I will let another travis run, and then merge. |
R @aljoscha