Skip to content
This repository has been archived by the owner on Apr 3, 2018. It is now read-only.

Commit

Permalink
Refactor ExternalProcessHandler. Refactor and fix DataUpdateFuture.
Browse files Browse the repository at this point in the history
Fix CompositeCancelMonitor
  • Loading branch information
bruno-medeiros committed Jul 21, 2016
1 parent a3d0a19 commit 42ffd9b
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 172 deletions.
Expand Up @@ -14,13 +14,12 @@
import static melnorme.utilbox.core.Assert.AssertNamespace.assertTrue; import static melnorme.utilbox.core.Assert.AssertNamespace.assertTrue;


import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import melnorme.lang.tooling.common.ops.IOperationMonitor; import melnorme.lang.tooling.common.ops.IOperationMonitor;
import melnorme.utilbox.concurrency.AsyncSupplier;
import melnorme.utilbox.concurrency.CancellableTask; import melnorme.utilbox.concurrency.CancellableTask;
import melnorme.utilbox.concurrency.NonCancellableFuture;
import melnorme.utilbox.concurrency.OperationCancellation; import melnorme.utilbox.concurrency.OperationCancellation;
import melnorme.utilbox.core.fntypes.CallableX; import melnorme.utilbox.core.fntypes.CallableX;
import melnorme.utilbox.fields.ListenerListHelper; import melnorme.utilbox.fields.ListenerListHelper;
Expand Down Expand Up @@ -171,48 +170,43 @@ protected final void doRun() {


/* ----------------- ----------------- */ /* ----------------- ----------------- */



protected final DataUpdateFuture asFuture = new DataUpdateFuture();

/** /**
* @return a {@link Future} that can be used to wait for the first non-stale data that becomes available. * @return a {@link AsyncSupplier} that can be used to wait for the first non-stale data that becomes available.
*/ */
public DataUpdateFuture asFuture() { public DataUpdateFuture getSupplierForNextUpdate() {
return asFuture; return new DataUpdateFuture();
} }


public DATA awaitUpdatedData() throws InterruptedException { public DATA awaitUpdatedData() throws InterruptedException {
return asFuture().awaitResult(); return getSupplierForNextUpdate().awaitResult();
} }


public DATA awaitUpdatedData(IOperationMonitor om) throws OperationCancellation { public DATA awaitUpdatedData(IOperationMonitor om) throws OperationCancellation {
return asFuture().awaitData(om); return getSupplierForNextUpdate().awaitResult(om);
} }


public class DataUpdateFuture implements NonCancellableFuture<DATA> { public class DataUpdateFuture implements AsyncSupplier<DATA> {
@Override
public boolean isTerminated() {
return !isStale();
}


@Override public void awaitTermination() throws InterruptedException {
public DATA awaitResult() throws InterruptedException {
getLatchForUpdateTask().await(); getLatchForUpdateTask().await();
return data;
} }


@Override public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
public DATA awaitResult(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
boolean success = getLatchForUpdateTask().await(timeout, unit); boolean success = getLatchForUpdateTask().await(timeout, unit);
if(!success) { if(!success) {
throw new TimeoutException(); throw new TimeoutException();
} }
}

@Override
public DATA awaitResult() throws InterruptedException {
awaitTermination();
return data; return data;
} }


@Override @Override
public DATA getResult_forSuccessfulyCompleted() { public DATA awaitResult(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
assertTrue(isCompletedSuccessfully()); awaitTermination(timeout, unit);
return data; return data;
} }


Expand Down
Expand Up @@ -50,14 +50,13 @@ protected void handleCancellation() {
} }


@Override @Override
public RET awaitResult() throws InterruptedException, OperationCancellation { public void awaitTermination() throws InterruptedException {
return completableResult.awaitResult(); completableResult.awaitTermination();
} }


@Override @Override
public RET awaitResult(long timeout, TimeUnit unit) public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, OperationCancellation { completableResult.awaitTermination(timeout, unit);
return completableResult.awaitResult(timeout, unit);
} }


@Override @Override
Expand Down
Expand Up @@ -11,7 +11,7 @@
package melnorme.utilbox.concurrency; package melnorme.utilbox.concurrency;


public abstract class AbstractRunnableFuture2<RET> extends AbstractFuture2<RET> public abstract class AbstractRunnableFuture2<RET> extends AbstractFuture2<RET>
implements IRunnableFuture2<RET> implements IRunnableFuture2<RET>
{ {


public AbstractRunnableFuture2() { public AbstractRunnableFuture2() {
Expand Down Expand Up @@ -50,6 +50,12 @@ protected void internalTaskRun() {


protected abstract RET internalInvoke(); protected abstract RET internalInvoke();



public void completeWithResult(RET result) {
cancellableTask.markExecuted();
completableResult.setResult(result);
}

/* ----------------- ----------------- */ /* ----------------- ----------------- */


@Override @Override
Expand Down
@@ -0,0 +1,58 @@
/*******************************************************************************
* Copyright (c) 2016 Bruno Medeiros and other Contributors.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Bruno Medeiros - initial API and implementation
*******************************************************************************/
package melnorme.utilbox.concurrency;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Interface for a blocking/asynchrounous supplier.
* There is no guarantee, with this interface alone, that the supplier is the same for multiple calls
* of awaitResult
*
* @param <RESULT>
*/
public interface AsyncSupplier<RESULT> {

RESULT awaitResult()
throws OperationCancellation, InterruptedException;

RESULT awaitResult(long timeout, TimeUnit unit)
throws OperationCancellation, InterruptedException, TimeoutException;

/** Same as {@link #awaitResult()},
* but throw InterruptedException as an OperationCancellation. */
default RESULT awaitResult2() throws OperationCancellation {
try {
return awaitResult();
} catch(InterruptedException e) {
throw new OperationCancellation();
}
}

/* ----------------- ----------------- */

default RESULT awaitResult(ICancelMonitor cm) throws OperationCancellation {

while(true) {
cm.checkCancellation();

try {
return this.awaitResult(100, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
throw new OperationCancellation();
} catch(TimeoutException e) {
continue;
}
}
}

}
Expand Up @@ -32,7 +32,7 @@
* @see Future2 * @see Future2
* *
*/ */
public interface BasicFuture<RESULT> { public interface BasicFuture<RESULT> extends AsyncSupplier<RESULT> {


/** @return true if execution of this future has terminated already. */ /** @return true if execution of this future has terminated already. */
boolean isTerminated(); boolean isTerminated();
Expand All @@ -47,22 +47,10 @@ default boolean isCompletedSuccessfully() {




/* ----------------- ----------------- */ /* ----------------- ----------------- */

RESULT awaitResult()
throws OperationCancellation, InterruptedException;

RESULT awaitResult(long timeout, TimeUnit unit)
throws OperationCancellation, InterruptedException, TimeoutException;


/** Same as {@link #awaitResult()}, void awaitTermination() throws InterruptedException;
* but throw InterruptedException as an OperationCancellation. */
default RESULT awaitResult2() throws OperationCancellation { void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
try {
return awaitResult();
} catch(InterruptedException e) {
throw new OperationCancellation();
}
}


/** /**
* It is only legal to call this method if the future has terminated already. * It is only legal to call this method if the future has terminated already.
Expand All @@ -84,4 +72,20 @@ default RESULT getResult_forTerminated() throws OperationCancellation {
*/ */
RESULT getResult_forSuccessfulyCompleted(); RESULT getResult_forSuccessfulyCompleted();


/* ----------------- ----------------- */

@Override
default RESULT awaitResult()
throws OperationCancellation, InterruptedException {
awaitTermination();
return getResult_forTerminated();
}

@Override
default RESULT awaitResult(long timeout, TimeUnit unit)
throws OperationCancellation, InterruptedException, TimeoutException {
awaitTermination(timeout, unit);
return getResult_forTerminated();
}

} }
Expand Up @@ -70,7 +70,7 @@ public final void run() {
} }


// Can only run once // Can only run once
checkExecuted(); markExecuted();
assertTrue(runningThread == null); assertTrue(runningThread == null);
runningThread = Thread.currentThread(); runningThread = Thread.currentThread();
} }
Expand All @@ -96,7 +96,7 @@ public final void run() {
} }
} }


protected void checkExecuted() { public void markExecuted() {
assertTrue(executed == false); // Check that run is only called once assertTrue(executed == false); // Check that run is only called once
executed = true; executed = true;
} }
Expand Down
Expand Up @@ -37,11 +37,9 @@
* as opposed to getting silently swallowed/ignored by an executor worker thread. * as opposed to getting silently swallowed/ignored by an executor worker thread.
* *
*/ */
public class CompletableResult<DATA> public class CompletableResult<DATA> implements BasicFuture<DATA> {
implements BasicFuture<DATA>
{


protected final CountDownLatch completionLatch = new CountDownLatch(1); protected final CountDownLatch terminationLatch = new CountDownLatch(1);
protected final Object lock = new Object(); protected final Object lock = new Object();


protected volatile ResultStatus status = ResultStatus.NOT_TERMINATED; protected volatile ResultStatus status = ResultStatus.NOT_TERMINATED;
Expand All @@ -59,17 +57,32 @@ public boolean isTerminated() {
return status != ResultStatus.NOT_TERMINATED; return status != ResultStatus.NOT_TERMINATED;
} }


public boolean isCompleted() {
return isTerminated();
}

@Override @Override
public boolean isCancelled() { public boolean isCancelled() {
return status == ResultStatus.CANCELLED; return status == ResultStatus.CANCELLED;
} }


public CountDownLatch getCompletionLatch() { protected CountDownLatch getTerminationLatch() {
return completionLatch; return terminationLatch;
}

@Override
public void awaitTermination() throws InterruptedException {
if(isTerminated()) {
return; // Early check so that InterruptedException is not thrown
}
getTerminationLatch().await();
}

@Override
public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
if(isTerminated()) {
return; // Early check so that InterruptedException is not thrown
}
boolean success = getTerminationLatch().await(timeout, unit);
if(!success) {
throw new TimeoutException();
}
} }


public void setResultFromCallable(CallableX<DATA, RuntimeException> resultCallable) { public void setResultFromCallable(CallableX<DATA, RuntimeException> resultCallable) {
Expand Down Expand Up @@ -104,7 +117,7 @@ protected void doSetResult(DATA result, RuntimeException re) {
this.result = result; this.result = result;
this.resultRuntimeException = re; this.resultRuntimeException = re;
status = ResultStatus.RESULT_SET; status = ResultStatus.RESULT_SET;
completionLatch.countDown(); terminationLatch.countDown();
} }
} }


Expand All @@ -121,7 +134,7 @@ public boolean setCancelledResult() {
return false; return false;
} else { } else {
status = ResultStatus.CANCELLED; status = ResultStatus.CANCELLED;
completionLatch.countDown(); terminationLatch.countDown();
return true; return true;
} }
} }
Expand All @@ -131,37 +144,6 @@ protected void handleReSetResult() {
throw assertFail(); throw assertFail();
} }


public void awaitCompletion() throws InterruptedException {
if(isCompleted()) {
return; // Early check so that InterruptedException is not thrown
}
completionLatch.await();
}

public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
if(isCompleted()) {
return; // Early check so that InterruptedException is not thrown
}
boolean success = completionLatch.await(timeout, unit);
if(!success) {
throw new TimeoutException();
}
}

@Override
public DATA awaitResult()
throws OperationCancellation, InterruptedException {
awaitCompletion();
return getResult_forTerminated();
}

@Override
public DATA awaitResult(long timeout, TimeUnit unit)
throws OperationCancellation, InterruptedException, TimeoutException {
awaitCompletion(timeout, unit);
return getResult_forTerminated();
}

@Override @Override
public DATA getResult_forSuccessfulyCompleted() { public DATA getResult_forSuccessfulyCompleted() {
assertTrue(isCompletedSuccessfully()); assertTrue(isCompletedSuccessfully());
Expand Down
Expand Up @@ -66,7 +66,7 @@ public CompositeCancelMonitor(ICancelMonitor parentCancelMonitor) {


@Override @Override
public boolean isCancelled() { public boolean isCancelled() {
return super.isCancelled() && parentCancelMonitor.isCancelled(); return super.isCancelled() || parentCancelMonitor.isCancelled();
} }
} }


Expand Down

0 comments on commit 42ffd9b

Please sign in to comment.