Skip to content

Commit

Permalink
Merge c7e1784 into 8f10b18
Browse files Browse the repository at this point in the history
  • Loading branch information
rpeach-sag committed Sep 21, 2018
2 parents 8f10b18 + c7e1784 commit fc87be3
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions src/rx/operators/internals/PipeOn.mon
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,24 @@ event Spawned {
string channel;
}

/*
* Spawns a worker context on first connection and discards after the subscriber count reaches 0, recreating if anyone connects later
*/
/** @private */
event PipeOnOnConnection {
action<IObserver> returns ISubscription parentOnConnection;
string upstreamChannel;
string downstreamChannel;
context downstreamContext;
context otherContext;
sequence<action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription> pipeModifiers;
optional<IDisposable> upstream;
boolean downstreamCreated;
boolean downstreamBeingCreated;
boolean spawned;
boolean spawning;
integer subscriberCount;

static action create(action<IObserver> returns ISubscription parentOnConnection, context downstreamContext, sequence<action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription> pipeModifiers) returns action<IObserver> returns ISubscription {
static action create(action<IObserver> returns ISubscription parentOnConnection, context otherContext, sequence<action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription> pipeModifiers) returns action<IObserver> returns ISubscription {
string uniqueId := integer.getUnique().toString();
PipeOnOnConnection o := PipeOnOnConnection(parentOnConnection, "PipeOnUpstreamChannel" + uniqueId, "PipeOnDownstreamChannel" + uniqueId, downstreamContext, pipeModifiers, new optional<IDisposable>, false, false, 0);
PipeOnOnConnection o := PipeOnOnConnection(parentOnConnection, "PipeOnUpstreamChannel" + uniqueId, "PipeOnDownstreamChannel" + uniqueId, otherContext, pipeModifiers, new optional<IDisposable>, false, false, 0);
return o.onConnection;
}

Expand All @@ -58,17 +61,15 @@ event PipeOnOnConnection {
subscriber.onUnsubscribe(decrementSubscriberCountAndDisposeIfNecessary);
subscriberCount := subscriberCount + 1;

string downstreamChannel := "PipeOnUpstreamChannel" + integer.getUnique().toString();

if not downstreamCreated and not downstreamBeingCreated {
downstreamBeingCreated := true;
if not spawned and not spawning {
spawning := true;
spawnToContext(downstreamChannel);
}
if downstreamBeingCreated {
if spawning {
monitor.subscribe(downstreamChannel);
on Spawned(channel = downstreamChannel) {
downstreamBeingCreated := false;
downstreamCreated := true;
spawning := false;
spawned := true;
ISubscription s := (<action<string> returns IObservable> any.newInstance("com.industry.rx_epl.Observable").getAction("observeFromChannel"))(downstreamChannel).connectObserver(subscriber);
}
} else {
Expand All @@ -84,7 +85,7 @@ event PipeOnOnConnection {
}

action disposeIfNecessary() {
if downstreamBeingCreated {
if spawning {
on wait(1.0) {
disposeIfNecessary();
}
Expand All @@ -96,15 +97,15 @@ event PipeOnOnConnection {
}
upstream := new IDisposable;

if downstreamCreated {
if spawned {
send Dispose(downstreamChannel) to downstreamChannel;
}
downstreamCreated := false;
spawned := false;
}
}

action spawnToContext(string downstreamChannel) {
spawn runOnOtherContext(downstreamChannel) to downstreamContext;
spawn runOnOtherContext(downstreamChannel) to otherContext;
}

action runOnOtherContext(string downstreamChannel) {
Expand Down

0 comments on commit fc87be3

Please sign in to comment.