Skip to content

Commit ca63aaf

Browse files
jkosh44keith-turner
authored andcommitted
fixes #722 Replace ListenableFuture with CompletableFuture (#975)
1 parent 185a1cb commit ca63aaf

4 files changed

Lines changed: 86 additions & 162 deletions

File tree

modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,12 @@
1919
import java.util.Collections;
2020
import java.util.Iterator;
2121
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutorService;
2224
import java.util.concurrent.TimeUnit;
2325

2426
import com.google.common.collect.ImmutableList;
2527
import com.google.common.collect.ImmutableList.Builder;
26-
import com.google.common.util.concurrent.AsyncFunction;
27-
import com.google.common.util.concurrent.Futures;
28-
import com.google.common.util.concurrent.ListenableFuture;
29-
import com.google.common.util.concurrent.ListeningExecutorService;
30-
import com.google.common.util.concurrent.MoreExecutors;
3128
import org.apache.accumulo.core.client.ConditionalWriter;
3229
import org.apache.accumulo.core.client.ConditionalWriter.Result;
3330
import org.apache.accumulo.core.data.ConditionalMutation;
@@ -36,11 +33,10 @@
3633
import org.apache.fluo.core.util.FluoExecutors;
3734
import org.apache.fluo.core.util.Limit;
3835

39-
public class AsyncConditionalWriter
40-
implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
36+
public class AsyncConditionalWriter {
4137

4238
private final ConditionalWriter cw;
43-
private final ListeningExecutorService les;
39+
private final ExecutorService es;
4440
private final Limit semaphore;
4541

4642

@@ -50,55 +46,38 @@ public AsyncConditionalWriter(Environment env, ConditionalWriter cw) {
5046
FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
5147
int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
5248
FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
53-
this.les =
54-
MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
49+
this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
5550
// the conditional writer currently has not memory limits... give it too much and it blows out
5651
// memory.. need to fix this in conditional writer
5752
// for now this needs to be memory based
5853
this.semaphore = new Limit(permits);
5954
}
6055

61-
private class IterTask implements Callable<Iterator<Result>> {
62-
63-
private Iterator<Result> input;
64-
private int permitsAcquired;
65-
66-
public IterTask(Iterator<Result> iter, int permitsAcquired) {
67-
this.input = iter;
68-
this.permitsAcquired = permitsAcquired;
56+
public CompletableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
57+
if (input.size() == 0) {
58+
return CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
6959
}
7060

71-
@Override
72-
public Iterator<Result> call() throws Exception {
61+
semaphore.acquire(input.size());
62+
Iterator<Result> iter = cw.write(input.iterator());
63+
return CompletableFuture.supplyAsync(() -> {
7364
try {
7465
Builder<Result> imlb = ImmutableList.builder();
75-
while (input.hasNext()) {
76-
Result result = input.next();
66+
while (iter.hasNext()) {
67+
Result result = iter.next();
7768
imlb.add(result);
7869
}
7970
return imlb.build().iterator();
8071
} finally {
81-
semaphore.release(permitsAcquired);
72+
semaphore.release(input.size());
8273
}
83-
}
84-
85-
}
86-
87-
@Override
88-
public ListenableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
89-
if (input.size() == 0) {
90-
return Futures.immediateFuture(Collections.<Result>emptyList().iterator());
91-
}
92-
93-
semaphore.acquire(input.size());
94-
Iterator<Result> iter = cw.write(input.iterator());
95-
return les.submit(new IterTask(iter, input.size()));
74+
}, es);
9675
}
9776

9877
public void close() {
99-
les.shutdownNow();
78+
es.shutdownNow();
10079
try {
101-
les.awaitTermination(5, TimeUnit.SECONDS);
80+
es.awaitTermination(5, TimeUnit.SECONDS);
10281
} catch (InterruptedException e) {
10382
throw new RuntimeException(e);
10483
}

modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.concurrent.ArrayBlockingQueue;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.atomic.AtomicLong;
27+
import java.util.function.Supplier;
2628

27-
import com.google.common.util.concurrent.Futures;
28-
import com.google.common.util.concurrent.ListenableFuture;
29-
import com.google.common.util.concurrent.ListenableFutureTask;
3029
import org.apache.accumulo.core.client.BatchWriter;
3130
import org.apache.accumulo.core.client.MutationsRejectedException;
3231
import org.apache.accumulo.core.data.Mutation;
@@ -42,13 +41,15 @@ public class SharedBatchWriter {
4241

4342
private AtomicLong asyncBatchesAdded = new AtomicLong(0);
4443
private long asyncBatchesProcessed = 0;
44+
// added to avoid findbugs false positive
45+
private static final Supplier<Void> NULLS = () -> null;
4546

4647
private static class MutationBatch {
4748

4849
private Collection<Mutation> mutations;
4950
private CountDownLatch cdl;
5051
private boolean isAsync = false;
51-
private ListenableFutureTask<Void> lf;
52+
private CompletableFuture<Void> cf;
5253

5354
public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
5455
this.mutations = mutations;
@@ -58,9 +59,9 @@ public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
5859
}
5960
}
6061

61-
public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) {
62+
public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
6263
this.mutations = mutations;
63-
this.lf = lf;
64+
this.cf = cf;
6465
this.cdl = null;
6566
this.isAsync = false;
6667
}
@@ -70,8 +71,8 @@ public void countDown() {
7071
cdl.countDown();
7172
}
7273

73-
if (lf != null) {
74-
lf.run();
74+
if (cf != null) {
75+
cf.complete(NULLS.get());
7576
}
7677
}
7778
}
@@ -170,27 +171,22 @@ void writeMutations(Collection<Mutation> ml) {
170171
}
171172
}
172173

173-
private static final Runnable DO_NOTHING = new Runnable() {
174-
@Override
175-
public void run() {}
176-
};
177-
178-
ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
174+
CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
179175
if (ml.size() == 0) {
180-
return Futures.immediateFuture(null);
176+
return CompletableFuture.completedFuture(NULLS.get());
181177
}
182178

183-
ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, null);
179+
CompletableFuture<Void> cf = new CompletableFuture<>();
184180
try {
185-
MutationBatch mb = new MutationBatch(ml, lf);
181+
MutationBatch mb = new MutationBatch(ml, cf);
186182
mutQueue.put(mb);
187-
return lf;
183+
return cf;
188184
} catch (Exception e) {
189185
throw new RuntimeException(e);
190186
}
191187
}
192188

193-
ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
189+
CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
194190
return writeMutationsAsyncFuture(Collections.singleton(m));
195191
}
196192

0 commit comments

Comments
 (0)