Skip to content

Commit

Permalink
Removed BThreadJSProxy. More cleaups
Browse files Browse the repository at this point in the history
  • Loading branch information
michbarsinai committed Dec 25, 2018
1 parent 8a0801a commit 1a64722
Show file tree
Hide file tree
Showing 15 changed files with 62 additions and 131 deletions.
3 changes: 3 additions & 0 deletions README.md
Expand Up @@ -47,6 +47,9 @@ a link to this page somewhere in the documentation/system about section.
* :sparkles: Removed dependencies (closes #69)
* :bug: fixed a crash in `PrioritizedBThreadsEventSelectionStrategy` when all the events were blocked (#70).
* :sparkles: more tests
* :sparkles: Cleanups
* :arrows_counterclockwise: Using a fixed thread pool, rather than a cached one (so preventing cases where too many threads from running at once).
* :put_litter_in_its_place: Setting interrupt handlers, the last BPjs task that was not under `bp` object, is now a `bp` method. So `setInterruptHandler` is now `bp.setInterruptHandler`.

### 2018-12-23
* :put_litter_in_its_place: `BProgram` cannot evaluate resources anymore. This change also removes this ability from running JavaScript b-program. It made no sense anyway, and was not used.
Expand Down
2 changes: 1 addition & 1 deletion docs/source/BPjsTutorial/code/interrupts-handler.js
Expand Up @@ -18,7 +18,7 @@ bp.registerBThread("Oven", function(){

// Make me a cake as fast as you can
bp.registerBThread("Baker", function() {
setInterruptHandler( function(evt) {
bp.setInterruptHandler( function(evt) {
bp.log.warn("Error making cake: " + evt);
bp.enqueueExternalEvent(bp.Event("No cake for you!"));
bp.enqueueExternalEvent(bp.Event("Come back - 1 month!"));
Expand Down
Expand Up @@ -28,7 +28,6 @@
import il.ac.bgu.cs.bp.bpjs.model.SyncStatement;
import il.ac.bgu.cs.bp.bpjs.model.BThreadSyncSnapshot;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BProgramJsProxy;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BThreadJsProxy;
import il.ac.bgu.cs.bp.bpjs.model.BEvent;
import il.ac.bgu.cs.bp.bpjs.model.FailedAssertion;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -198,16 +197,12 @@ private BThreadSyncSnapshot readBThreadSnapshot(ScriptableInputStream sis, Scrip
String name = (String) sis.readObject();
byte[] contBytes = (byte[]) sis.readObject();

final BThreadJsProxy btProxy = new BThreadJsProxy();
final BProgramJsProxy bpProxy = new BProgramJsProxy(bprogram);

StubProvider stubPrv = (StreamObjectStub stub) -> {
if (stub == StreamObjectStub.BP_PROXY) {
return bpProxy;
}
if (stub == StreamObjectStub.BT_PROXY) {
return btProxy;
}
throw new IllegalArgumentException("Unknown stub " + stub);
};

Expand All @@ -220,7 +215,6 @@ private BThreadSyncSnapshot readBThreadSnapshot(ScriptableInputStream sis, Scrip
Object cont = bssis.readObject();
final BThreadSyncSnapshot bThreadSyncSnapshot = new BThreadSyncSnapshot(name, entryPoint, interruptHandler, btScope, cont, stmt);

btProxy.setBThread(bThreadSyncSnapshot);
return bThreadSyncSnapshot;
}

Expand Down
Expand Up @@ -23,9 +23,8 @@
*/
package il.ac.bgu.cs.bp.bpjs.bprogramio;

import il.ac.bgu.cs.bp.bpjs.model.BThreadSyncSnapshot;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BProgramJsProxy;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BThreadJsProxy;
import il.ac.bgu.cs.bp.bpjs.model.BThreadSyncSnapshot;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
Expand All @@ -50,11 +49,7 @@ public BThreadSyncSnapshotOutputStream(OutputStream out, Scriptable scope) throw

@Override
protected Object replaceObject(Object obj) throws IOException {
if ( obj instanceof BThreadJsProxy ) {
stubs.add(StreamObjectStub.BT_PROXY);
return StreamObjectStub.BT_PROXY;

} else if ( obj instanceof BProgramJsProxy ) {
if ( obj instanceof BProgramJsProxy ) {
stubs.add(StreamObjectStub.BP_PROXY);
return StreamObjectStub.BP_PROXY;

Expand Down
Expand Up @@ -33,7 +33,6 @@
public class StreamObjectStub implements java.io.Serializable {

public static final StreamObjectStub BP_PROXY = new StreamObjectStub("bp-proxy");
public static final StreamObjectStub BT_PROXY = new StreamObjectStub("bt-proxy");

private final String name;

Expand Down Expand Up @@ -61,7 +60,6 @@ public int hashCode() {

private Object readResolve() {
if ( name.equals(BP_PROXY.name) ) return BP_PROXY;
if ( name.equals(BT_PROXY.name) ) return BT_PROXY;
return this;
}

Expand Down
Expand Up @@ -79,11 +79,10 @@ public void run() {
cur.getBThreadSnapshots().forEach(sn->listeners.forEach( l -> l.bthreadAdded(bprog, sn)) );

// start it
listeners.forEach(l -> l.started(bprog));
cur = cur.start(execSvc);

go.set(true);
halted = false;
listeners.forEach(l -> l.started(bprog));
cur = cur.start(execSvc);

if ( ! cur.isStateValid() ) {
failedAssertion = cur.getFailedAssertion();
Expand All @@ -93,11 +92,11 @@ public void run() {

// while snapshot not empty, select an event and get the next snapshot.
while ( (!cur.noBThreadsLeft()) && go.get() ) {

// see which events are selectable
Set<BEvent> possibleEvents = bprog.getEventSelectionStrategy().selectableEvents(cur.getStatements(), cur.getExternalEvents());
if ( possibleEvents.isEmpty() ) {
// Superstep done: No events available or selection.
// Superstep done: No events available for selection.

if ( bprog.isWaitForExternalEvents() ) {
listeners.forEach( l->l.superstepDone(bprog) );
Expand Down
Expand Up @@ -45,6 +45,16 @@
public class BProgramJsProxy extends SyncStatementBuilder
implements java.io.Serializable {

private static final ThreadLocal<BThreadSyncSnapshot> CURRENT_BTHREAD = new ThreadLocal<>();


public static void setCurrentBThread( BThreadSyncSnapshot bss ) {
CURRENT_BTHREAD.set(bss);
}

public static void clearCurrentBThread(){
CURRENT_BTHREAD.remove();
}

private final BProgram program;

Expand Down Expand Up @@ -150,6 +160,11 @@ public void fork() throws ContinuationPending {
}


public void setInterruptHandler( Object aPossibleHandler ) {
CURRENT_BTHREAD.get().setInterruptHandler(
(aPossibleHandler instanceof Function) ? (Function) aPossibleHandler: null );
}

////////////////////////
// sync ("bsync") related code

Expand Down

This file was deleted.

@@ -1,6 +1,7 @@
package il.ac.bgu.cs.bp.bpjs.execution.tasks;

import il.ac.bgu.cs.bp.bpjs.bprogramio.BThreadSyncSnapshotOutputStream;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BProgramJsProxy;
import il.ac.bgu.cs.bp.bpjs.model.BProgram;
import il.ac.bgu.cs.bp.bpjs.model.SyncStatement;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -48,6 +49,7 @@ public BThreadSyncSnapshot call() {

Context jsContext = Context.enter();
try {
BProgramJsProxy.setCurrentBThread(bss);
return callImpl( jsContext );

} catch (ContinuationPending cbs) {
Expand All @@ -67,6 +69,7 @@ public BThreadSyncSnapshot call() {
if (Context.getCurrentContext() != null) {
Context.exit();
}
BProgramJsProxy.clearCurrentBThread();
}

}
Expand All @@ -84,7 +87,6 @@ private BThreadSyncSnapshot handleContinuationPending(ContinuationPending cbs, C

if ( capturedStatement instanceof SyncStatement ) {
final SyncStatement syncStatement = (SyncStatement) cbs.getApplicationState();
syncStatement.setBthread(bss);
return bss.copyWith(cbs.getContinuation(), syncStatement);

} else if ( capturedStatement instanceof ForkStatement ) {
Expand All @@ -105,21 +107,20 @@ private BThreadSyncSnapshot handleContinuationPending(ContinuationPending cbs, C
}

listener.addFork(forkStmt);
return continueParentOfFork(forkStmt, cbs, jsContext);
return continueParentOfFork(cbs, jsContext);

} else {
throw new IllegalStateException("Captured a statement of an unknown type: " + capturedStatement);
}
}

private BThreadSyncSnapshot continueParentOfFork( ForkStatement forkStmt, ContinuationPending cbs, Context jsContext){
private BThreadSyncSnapshot continueParentOfFork( ContinuationPending cbs, Context jsContext){
try {
System.out.println("Fork: " + forkStmt);
jsContext.resumeContinuation(cbs.getContinuation(),
(Scriptable)cbs.getContinuation(), Undefined.instance);
return null;

} catch (ContinuationPending cbs2) {
} catch ( ContinuationPending cbs2 ) {
return handleContinuationPending(cbs2, jsContext);

} catch ( WrappedException wfae ) {
Expand Down
Expand Up @@ -44,7 +44,7 @@ public static ExecutorService makeWithName( String threadNameTemplate ) {
retVal.setName(threadNameTemplate + "#" + threadCoutner.incrementAndGet() );
return retVal;
};
return Executors.newCachedThreadPool(tf);
return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), tf);
}

private ExecutorServiceMaker(){
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/il/ac/bgu/cs/bp/bpjs/model/BProgram.java
Expand Up @@ -293,10 +293,12 @@ public BProgramSyncSnapshot setup() {
// evaluate code in order
if (prependedCode != null) {
prependedCode.forEach(s -> evaluate(s, "prependedCode"));
prependedCode = null;
}
setupProgramScope(programScope);
if (appendedCode != null) {
appendedCode.forEach(s -> evaluate(s, "appendedCode"));
appendedCode = null;
}

// setup registered b-threads
Expand Down
21 changes: 8 additions & 13 deletions src/main/java/il/ac/bgu/cs/bp/bpjs/model/BProgramSyncSnapshot.java
Expand Up @@ -6,7 +6,6 @@
import il.ac.bgu.cs.bp.bpjs.bprogramio.StubProvider;
import il.ac.bgu.cs.bp.bpjs.exceptions.BPjsRuntimeException;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BProgramJsProxy;
import il.ac.bgu.cs.bp.bpjs.execution.jsproxy.BThreadJsProxy;
import il.ac.bgu.cs.bp.bpjs.execution.tasks.ResumeBThread;
import il.ac.bgu.cs.bp.bpjs.execution.tasks.StartBThread;

Expand Down Expand Up @@ -60,18 +59,20 @@ public class BProgramSyncSnapshot {
* A listener that populates the {@link #violationRecord} field, and
* shuts down the {@link ExecutorService} running the tasks.
*/
private class HaltOnAssertion implements BPEngineTask.Listener {
private static class HaltOnAssertion implements BPEngineTask.Listener {
private final ExecutorService exSvc;
private final BProgram bprogram;
private final AtomicReference<FailedAssertion> vioRec;

public HaltOnAssertion(ExecutorService exSvc, BProgram bprogram) {
public HaltOnAssertion(ExecutorService exSvc, BProgram bprogram, AtomicReference<FailedAssertion> aViolationRecord) {
this.exSvc = exSvc;
this.bprogram = bprogram;
vioRec = aViolationRecord;
}

@Override
public void assertionFailed(FailedAssertion fa) {
violationRecord.compareAndSet(null, fa);
vioRec.compareAndSet(null, fa);
exSvc.shutdownNow();
}

Expand Down Expand Up @@ -103,13 +104,12 @@ public BProgramSyncSnapshot copyWith( List<BEvent> updatedExternalEvents ) {
*/
public BProgramSyncSnapshot start( ExecutorService exSvc ) throws InterruptedException {
Set<BThreadSyncSnapshot> nextRound = new HashSet<>(threadSnapshots.size());
BPEngineTask.Listener halter = new HaltOnAssertion(exSvc, bprog);
BPEngineTask.Listener halter = new HaltOnAssertion(exSvc, bprog, violationRecord);
nextRound.addAll(exSvc.invokeAll(threadSnapshots.stream()
.map(bt -> new StartBThread(bt, halter))
.collect(toList())
).stream().map(f -> safeGet(f) ).collect(toList())
);
// FIXME test for assertion failures
executeAllAddedBThreads(nextRound, exSvc, halter);
List<BEvent> nextExternalEvents = new ArrayList<>(getExternalEvents());
nextExternalEvents.addAll( bprog.drainEnqueuedExternalEvents() );
Expand Down Expand Up @@ -148,7 +148,7 @@ public BProgramSyncSnapshot triggerEvent(BEvent anEvent, ExecutorService exSvc,
Context.exit();
}

BPEngineTask.Listener halter = new HaltOnAssertion(exSvc, bprog);
BPEngineTask.Listener halter = new HaltOnAssertion(exSvc, bprog, violationRecord);

try {
// add the run results of all those who advance this stage
Expand Down Expand Up @@ -195,7 +195,7 @@ private void handleInterrupts(BEvent anEvent, Iterable<BProgramRunnerListener> l
try {
ctxt.callFunctionWithContinuations(func, scope, new Object[]{anEvent});
} catch ( ContinuationPending ise ) {
throw new BPjsRuntimeException("Cannot call bsync from a break-upon handler. Please consider pushing an external event.");
throw new BPjsRuntimeException("Cannot call bsync or fork from a break-upon handler. Please consider pushing an external event.");
}
});
});
Expand Down Expand Up @@ -298,16 +298,12 @@ Stream<StartFork> convertToTasks(ForkStatement fkStmt, BPEngineTask.Listener lis

// read continuation
Object cont=null;
final BThreadJsProxy btProxy = new BThreadJsProxy();
final BProgramJsProxy bpProxy = new BProgramJsProxy(bprog);

StubProvider stubPrv = (StreamObjectStub stub) -> {
if (stub == StreamObjectStub.BP_PROXY) {
return bpProxy;
}
if (stub == StreamObjectStub.BT_PROXY) {
return btProxy;
}
throw new IllegalArgumentException("Unknown stub " + stub);
};

Expand All @@ -328,7 +324,6 @@ Stream<StartFork> convertToTasks(ForkStatement fkStmt, BPEngineTask.Listener lis
cont,
null
);
btProxy.setBThread(btss);

// duplicate snapshot and register the copy with the b-program
try {
Expand Down

0 comments on commit 1a64722

Please sign in to comment.