Skip to content

Commit

Permalink
[FLINK-13059][Cassandra Connector] Use getMaxConcurrentRequestsTimeou…
Browse files Browse the repository at this point in the history
…t() also for flush
  • Loading branch information
mchro committed Jul 6, 2019
1 parent 5f06981 commit 7fe8595
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Expand Up @@ -164,8 +164,17 @@ private void checkAsyncErrors() throws Exception {
}
}

private void flush() {
semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
private void flush() throws InterruptedException, TimeoutException {
if (!semaphore.tryAcquire(config.getMaxConcurrentRequests(), config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException(
String.format(
"Failed to acquire all permits (%s) for flush in %s, only %s available",
config.getMaxConcurrentRequests(),
config.getMaxConcurrentRequestsTimeout(),
semaphore.availablePermits()
)
);
}
semaphore.release(config.getMaxConcurrentRequests());
}

Expand Down
Expand Up @@ -181,7 +181,7 @@ public void go() throws Exception {
}
};
t.start();
while (t.getState() != Thread.State.WAITING) {
while (t.getState() != Thread.State.TIMED_WAITING) {
Thread.sleep(5);
}

Expand Down Expand Up @@ -213,7 +213,7 @@ public void go() throws Exception {
}
};
t.start();
while (t.getState() != Thread.State.WAITING) {
while (t.getState() != Thread.State.TIMED_WAITING) {
Thread.sleep(5);
}

Expand Down

0 comments on commit 7fe8595

Please sign in to comment.