Skip to content

Commit

Permalink
Issue #64: Added a future object for AnalyzerResult
Browse files Browse the repository at this point in the history
  • Loading branch information
kaspersorensen committed Dec 8, 2014
1 parent 4a5d6cf commit e5078fc
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 2 deletions.
@@ -0,0 +1,233 @@
/**
* AnalyzerBeans
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.eobjects.analyzer.result;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.metamodel.util.HasName;
import org.apache.metamodel.util.Ref;
import org.apache.metamodel.util.SharedExecutorService;
import org.eobjects.analyzer.beans.api.Analyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Represents an {@link AnalyzerResult} that is still being produced.
*
* Usually {@link AnalyzerResult}s are produced immediately by the
* {@link Analyzer#getResult()} method, but in cases where this may take a long
* time, an {@link Analyzer} can instead return a result of this type and
* thereby indicate that some process is still going on, but the rest of the job
* is ready to return.
*
* @param <R>
* the wrapped {@link AnalyzerResult} type.
*/
public class AnalyzerResultFuture<R extends AnalyzerResult> implements AnalyzerResult, HasName, Ref<R> {

/**
* Listener interface for objects that want to be notified when the wrapped
* {@link AnalyzerResult} is ready.
*
* @param <R>
*/
public static interface Listener<R extends AnalyzerResult> {

public void onSuccess(R result);

public void onError(RuntimeException error);
}

private static final Logger logger = LoggerFactory.getLogger(AnalyzerResultFuture.class);

private static final long serialVersionUID = 1L;

private transient final CountDownLatch _countDownLatch;
private transient List<Listener<R>> _listeners;

private final String _name;
private R _result;
private RuntimeException _error;

/**
* Constructs an {@link AnalyzerResultFuture}
*
* @param name
* a name/label to use for presenting and distinguishing this
* result from others.
* @param resultRef
* a reference for the result being processed.
*/
public AnalyzerResultFuture(String name, final Ref<? extends R> resultRef) {
_name = name;
_countDownLatch = new CountDownLatch(1);
_result = null;
_error = null;

SharedExecutorService.get().submit(new Runnable() {

@Override
public void run() {
try {
_result = resultRef.get();
onSuccess();
} catch (RuntimeException e) {
_error = e;
onError();
} finally {
_countDownLatch.countDown();
}
}
});
}

/**
* Adds a {@link Listener} to this {@link AnalyzerResultFuture}.
*
* @param listener
*/
public synchronized void addListener(Listener<R> listener) {
// it might be we add a listener AFTER the result is actually produced,
// in which case we simply inform the listener immediately.
if (isReady()) {
if (_error != null) {
listener.onError(_error);
} else {
listener.onSuccess(_result);
}
return;
}

if (_listeners == null) {
_listeners = new LinkedList<>();
}
_listeners.add(listener);
}

/**
* Removes a {@link Listener} from this {@link AnalyzerResultFuture}.
*
* @param listener
*/
public synchronized void removeListener(Listener<R> listener) {
if (_listeners == null) {
return;
}
_listeners.remove(listener);
}

private synchronized void onSuccess() {
if (_listeners == null) {
return;
}
try {
for (final Listener<R> listener : _listeners) {
try {
listener.onSuccess(_result);
} catch (Exception e) {
logger.warn("Unexpected exception while informing listener of success: {}", listener, e);
}
}
} catch (Exception e) {
logger.warn("Unexpected exception while iterating listeners on success", e);
} finally {
_listeners = null;
}
}

private synchronized void onError() {
if (_listeners == null) {
return;
}
try {
for (final Listener<R> listener : _listeners) {
try {
listener.onError(_error);
} catch (Exception e) {
logger.warn("Unexpected exception while informing listener on error: {}", listener, e);
}
}
} catch (Exception e) {
logger.warn("Unexpected exception while iterating listeners on error", e);
} finally {
_listeners = null;
}
}

/**
* Determines if the wrapped {@link AnalyzerResult} is ready or if
* processing is still going on to produce it.
*
* Once ready, call {@link #get()} to get it.
*
* @return true if the wrapped {@link AnalyzerResult} is ready or false if
* it is not.
*/
public boolean isReady() {
if (_countDownLatch == null) {
return true;
}

return _countDownLatch.getCount() == 0;
}

@Override
public R get() {
if (_countDownLatch != null) {
try {
_countDownLatch.await();
} catch (InterruptedException e) {
// do nothing
}
}
if (_error != null) {
throw _error;
}
return _result;
}

@Override
public String toString() {
return "AnalyzerResultFuture[" + _name + "]";
}

@Override
public String getName() {
return _name;
}

/**
* Method invoked when serialization takes place. Makes sure that we await
* the loading of the result reference before writing any data.
*
* @param out
* @throws IOException
*/
private void writeObject(ObjectOutputStream out) throws IOException {
logger.info("Serialization requested, awaiting reference to load.");
get();
out.defaultWriteObject();
logger.info("Serialization finished!");
}
}
Expand Up @@ -26,8 +26,6 @@
* KPI or something like that).
*
* Mostly used for testing purposes.
*
*
*/
public class NumberResult implements AnalyzerResult {

Expand Down
@@ -0,0 +1,131 @@
/**
* AnalyzerBeans
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.eobjects.analyzer.result;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import junit.framework.TestCase;

import org.apache.metamodel.util.ImmutableRef;
import org.apache.metamodel.util.LazyRef;
import org.apache.metamodel.util.Ref;
import org.eobjects.analyzer.result.AnalyzerResultFuture.Listener;

public class AnalyzerResultFutureTest extends TestCase {

public void testAddListenerWhenResultIsReady() throws Exception {
final NumberResult result1 = new NumberResult(42);

final AnalyzerResultFuture<NumberResult> future = new AnalyzerResultFuture<>("foo",
new ImmutableRef<NumberResult>(result1));

final NumberResult result2 = future.get();

assertEquals(result1, result2);

final AtomicBoolean b = new AtomicBoolean(false);

future.addListener(new Listener<NumberResult>() {
@Override
public void onSuccess(NumberResult result) {
assertEquals(result1, result);
b.set(true);
}

@Override
public void onError(RuntimeException error) {
fail("This should never happen");
}
});

assertTrue(b.get());

assertEquals("AnalyzerResultFuture[foo]", future.toString());
}

public void testMultiThreadedListenerScenario() throws Exception {
final int threadCount = 10;

final Thread[] threads = new Thread[threadCount];
@SuppressWarnings({ "unchecked" })
final Listener<NumberResult>[] listeners = new Listener[threadCount];
final Queue<Object> resultQueue = new ArrayBlockingQueue<>(threadCount);

for (int i = 0; i < listeners.length; i++) {
listeners[i] = new Listener<NumberResult>() {
@Override
public void onSuccess(NumberResult result) {
resultQueue.add(result);
}

@Override
public void onError(RuntimeException error) {
resultQueue.add(error);
}
};
}

final Ref<NumberResult> resultRef = new LazyRef<NumberResult>() {
@Override
protected NumberResult fetch() throws Throwable {
long randomSleepTime = (long) (1000 * Math.random());
Thread.sleep(randomSleepTime);
return new NumberResult(43);
}
};

final AnalyzerResultFuture<NumberResult> future = new AnalyzerResultFuture<>("foo", resultRef);

for (int i = 0; i < threads.length; i++) {
final Listener<NumberResult> listener = listeners[i];
threads[i] = new Thread() {
@Override
public void run() {
future.addListener(listener);
}
};
}

final int halfOfTheThreads = threads.length / 2;
for (int i = 0; i < halfOfTheThreads; i++) {
threads[i].start();
}
for (int i = 0; i < halfOfTheThreads; i++) {
threads[i].join();
}

future.get();

assertEquals("[43, 43, 43, 43, 43]", resultQueue.toString());
assertEquals(halfOfTheThreads, resultQueue.size());

for (int i = halfOfTheThreads; i < threads.length; i++) {
threads[i].start();
}
for (int i = halfOfTheThreads; i < threads.length; i++) {
threads[i].join();
}

assertEquals("[43, 43, 43, 43, 43, 43, 43, 43, 43, 43]", resultQueue.toString());
assertEquals(threads.length, resultQueue.size());
}
}

0 comments on commit e5078fc

Please sign in to comment.