Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9784] Fix inconsistent use of 'static' in AsyncIOExample.java #6298

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -116,10 +116,8 @@ public void cancel() {
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
private static final long serialVersionUID = 2098635244857937717L;

private static ExecutorService executorService;
private static Random random;

private int counter;
private ExecutorService executorService;
private Random random;

/**
* The result of multiplying sleepFactor with a random float is used to pause
Expand All @@ -145,41 +143,28 @@ private static class SampleAsyncFunction extends RichAsyncFunction<Integer, Stri
public void open(Configuration parameters) throws Exception {
super.open(parameters);

synchronized (SampleAsyncFunction.class) {
if (counter == 0) {
executorService = Executors.newFixedThreadPool(30);

random = new Random();
}

++counter;
}
executorService = Executors.newFixedThreadPool(30);
random = new Random();
}

@Override
public void close() throws Exception {
super.close();

synchronized (SampleAsyncFunction.class) {
--counter;
executorService.shutdown();

if (counter == 0) {
executorService.shutdown();

try {
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
try {
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}

@Override
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
this.executorService.submit(new Runnable() {
executorService.submit(new Runnable() {
@Override
public void run() {
// wait for while to simulate async operation here
Expand Down