Skip to content

Commit

Permalink
Replace ExecutionList with linked list of RunnableExecutionPair (#1287)
Browse files Browse the repository at this point in the history
Patch does CAS loop and XCHG instead of synchronized which should
be faster.

Patch also reduces per-future memory consumption.
  • Loading branch information
stepancheg authored and slandelle committed Oct 27, 2016
1 parent 84d1278 commit 671378f
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 174 deletions.
Expand Up @@ -29,12 +29,13 @@
package org.asynchttpclient.future; package org.asynchttpclient.future;


import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;


import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.ListenableFuture;


/** /**
* An abstract base implementation of the listener support provided by {@link ListenableFuture}. This class uses an {@link ExecutionList} to guarantee that all registered listeners * An abstract base implementation of the listener support provided by {@link ListenableFuture}.
* will be executed. Listener/Executor pairs are stored in the execution list and executed in the order in which they were added, but because of thread scheduling issues there is * Listener/Executor pairs are stored in the {@link RunnableExecutorPair} linked list in the order in which they were added, but because of thread scheduling issues there is
* no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added * no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added
* listeners have not yet been executed. * listeners have not yet been executed.
* *
Expand All @@ -43,41 +44,49 @@
*/ */
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> { public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {


private volatile boolean hasRun; /**
private volatile boolean executionListInitialized; * Marks that execution is already done, and new runnables
private volatile ExecutionList executionList; * should be executed right away instead of begin added to the list.
*/
private static final RunnableExecutorPair executedMarker = new RunnableExecutorPair();


private ExecutionList executionList() { /**
ExecutionList localExecutionList = executionList; * Linked list of executions or a {@link #executedMarker}.
if (localExecutionList == null) { */
synchronized (this) { private volatile RunnableExecutorPair executionList;
localExecutionList = executionList; private static final AtomicReferenceFieldUpdater<AbstractListenableFuture, RunnableExecutorPair> executionListField =
if (localExecutionList == null) { AtomicReferenceFieldUpdater.newUpdater(AbstractListenableFuture.class, RunnableExecutorPair.class, "executionList");
localExecutionList = new ExecutionList();
executionList = localExecutionList;
executionListInitialized = true;
}
}
}
return localExecutionList;
}


@Override @Override
public ListenableFuture<V> addListener(Runnable listener, Executor exec) { public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
executionList().add(listener, exec); for (;;) {
if (hasRun) { RunnableExecutorPair executionListLocal = this.executionList;
runListeners(); if (executionListLocal == executedMarker) {
RunnableExecutorPair.executeListener(listener, exec);
return this;
}

RunnableExecutorPair pair = new RunnableExecutorPair(listener, exec, executionListLocal);
if (executionListField.compareAndSet(this, executionListLocal, pair)) {
return this;
}
} }
return this;
} }


/** /**
* Execute the execution list. * Execute the execution list.
*/ */
protected void runListeners() { protected void runListeners() {
hasRun = true; RunnableExecutorPair execution = executionListField.getAndSet(this, executedMarker);
if (executionListInitialized) { if (execution == executedMarker) {
executionList().execute(); return;
}

RunnableExecutorPair reversedList = RunnableExecutorPair.reverseList(execution);

while (reversedList != null) {
RunnableExecutorPair.executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
} }
} }
} }
148 changes: 0 additions & 148 deletions client/src/main/java/org/asynchttpclient/future/ExecutionList.java

This file was deleted.

@@ -0,0 +1,70 @@
package org.asynchttpclient.future;

import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.asynchttpclient.util.Assertions;

/**
* Linked list of runnables with executors.
*/
final class RunnableExecutorPair {
private static final Logger log = Logger.getLogger(RunnableExecutorPair.class.getPackage().getName());

final Runnable runnable;
final Executor executor;
RunnableExecutorPair next;

RunnableExecutorPair() {
runnable = null;
executor = null;
}

RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
Assertions.assertNotNull(runnable, "runnable");

this.runnable = runnable;
this.executor = executor;
this.next = next;
}

/**
* Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor.
*/
static void executeListener(Runnable runnable, Executor executor) {
try {
if (executor != null) {
executor.execute(runnable);
} else {
runnable.run();
}
} catch (RuntimeException e) {
// Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if
// we're given a bad one. We only catch RuntimeException because we want Errors to propagate
// up.
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
}
}

static RunnableExecutorPair reverseList(RunnableExecutorPair list) {
// The pairs in the stack are in the opposite order from how they were added
// so we need to reverse the list to fulfill our contract.
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
// could drop the contract on the method that enforces this queue like behavior since depending
// on it is likely to be a bug anyway.

// N.B. All writes to the list and the next pointers must have happened before the above
// synchronized block, so we can iterate the list without the lock held here.
RunnableExecutorPair prev = null;

while (list != null) {
RunnableExecutorPair next = list.next;
list.next = prev;
prev = list;
list = next;
}

return prev;
}
}
@@ -0,0 +1,36 @@
package org.asynchttpclient.future;

import java.util.ArrayList;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
* @author Stepan Koltsov
*/
public class RunnableExecutorPairTest {

@Test
public void testReverseList() {
// empty
{
Assert.assertNull(RunnableExecutorPair.reverseList(null));
}

for (int len = 1; len < 5; ++len) {
ArrayList<RunnableExecutorPair> list = new ArrayList<>();
for (int i = 0; i < len; ++i) {
RunnableExecutorPair prev = i != 0 ? list.get(i - 1) : null;
list.add(new RunnableExecutorPair(() -> {}, null, prev));
}

RunnableExecutorPair reversed = RunnableExecutorPair.reverseList(list.get(list.size() - 1));
for (int i = 0; i < len; ++i) {
Assert.assertSame(reversed, list.get(i));
Assert.assertSame(i != len - 1 ? list.get(i + 1) : null, reversed.next);
reversed = reversed.next;
}
}
}

}

0 comments on commit 671378f

Please sign in to comment.