Skip to content

Commit

Permalink
fixes #722 Replace ListenableFuture with CompletableFuture (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 authored and keith-turner committed Dec 11, 2017
1 parent 185a1cb commit ca63aaf
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.data.ConditionalMutation;
Expand All @@ -36,11 +33,10 @@
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Limit;

public class AsyncConditionalWriter
implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
public class AsyncConditionalWriter {

private final ConditionalWriter cw;
private final ListeningExecutorService les;
private final ExecutorService es;
private final Limit semaphore;


Expand All @@ -50,55 +46,38 @@ public AsyncConditionalWriter(Environment env, ConditionalWriter cw) {
FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
this.les =
MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
// the conditional writer currently has not memory limits... give it too much and it blows out
// memory.. need to fix this in conditional writer
// for now this needs to be memory based
this.semaphore = new Limit(permits);
}

private class IterTask implements Callable<Iterator<Result>> {

private Iterator<Result> input;
private int permitsAcquired;

public IterTask(Iterator<Result> iter, int permitsAcquired) {
this.input = iter;
this.permitsAcquired = permitsAcquired;
public CompletableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
if (input.size() == 0) {
return CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
}

@Override
public Iterator<Result> call() throws Exception {
semaphore.acquire(input.size());
Iterator<Result> iter = cw.write(input.iterator());
return CompletableFuture.supplyAsync(() -> {
try {
Builder<Result> imlb = ImmutableList.builder();
while (input.hasNext()) {
Result result = input.next();
while (iter.hasNext()) {
Result result = iter.next();
imlb.add(result);
}
return imlb.build().iterator();
} finally {
semaphore.release(permitsAcquired);
semaphore.release(input.size());
}
}

}

@Override
public ListenableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
if (input.size() == 0) {
return Futures.immediateFuture(Collections.<Result>emptyList().iterator());
}

semaphore.acquire(input.size());
Iterator<Result> iter = cw.write(input.iterator());
return les.submit(new IterTask(iter, input.size()));
}, es);
}

public void close() {
les.shutdownNow();
es.shutdownNow();
try {
les.awaitTermination(5, TimeUnit.SECONDS);
es.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
Expand All @@ -42,13 +41,15 @@ public class SharedBatchWriter {

private AtomicLong asyncBatchesAdded = new AtomicLong(0);
private long asyncBatchesProcessed = 0;
// added to avoid findbugs false positive
private static final Supplier<Void> NULLS = () -> null;

private static class MutationBatch {

private Collection<Mutation> mutations;
private CountDownLatch cdl;
private boolean isAsync = false;
private ListenableFutureTask<Void> lf;
private CompletableFuture<Void> cf;

public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
this.mutations = mutations;
Expand All @@ -58,9 +59,9 @@ public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
}
}

public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) {
public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
this.mutations = mutations;
this.lf = lf;
this.cf = cf;
this.cdl = null;
this.isAsync = false;
}
Expand All @@ -70,8 +71,8 @@ public void countDown() {
cdl.countDown();
}

if (lf != null) {
lf.run();
if (cf != null) {
cf.complete(NULLS.get());
}
}
}
Expand Down Expand Up @@ -170,27 +171,22 @@ void writeMutations(Collection<Mutation> ml) {
}
}

private static final Runnable DO_NOTHING = new Runnable() {
@Override
public void run() {}
};

ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
if (ml.size() == 0) {
return Futures.immediateFuture(null);
return CompletableFuture.completedFuture(NULLS.get());
}

ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, null);
CompletableFuture<Void> cf = new CompletableFuture<>();
try {
MutationBatch mb = new MutationBatch(ml, lf);
MutationBatch mb = new MutationBatch(ml, cf);
mutQueue.put(mb);
return lf;
return cf;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
return writeMutationsAsyncFuture(Collections.singleton(m));
}

Expand Down
Loading

0 comments on commit ca63aaf

Please sign in to comment.