Skip to content

Commit

Permalink
Fixing unsubscribing issues, also including missing Range Negative te…
Browse files Browse the repository at this point in the history
…sts (#41)

* Initial test changes

* More changes
  • Loading branch information
rpeach-sag committed Sep 26, 2018
1 parent 90d3861 commit 3635bff
Show file tree
Hide file tree
Showing 452 changed files with 3,752 additions and 82 deletions.
50 changes: 25 additions & 25 deletions src/rx/operators/internals/ObserveFrom.mon
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,21 @@ event ObserveFromOnUnsubscribe {
listener nextListener;
listener errorListener;
listener completeListener;
listener disposeListener;
listener connectionTimeout;

static action create(string channel, string connectionId, listener nextListener, listener errorListener, listener completeListener) returns action<> {
return ObserveFromOnUnsubscribe(channel, connectionId, nextListener, errorListener, completeListener).unsubscribe;
static action create(string channel, string connectionId, listener nextListener, listener errorListener, listener completeListener, listener disposeListener, listener connectionTimeout) returns action<> {
return ObserveFromOnUnsubscribe(channel, connectionId, nextListener, errorListener, completeListener, disposeListener, connectionTimeout).unsubscribe;
}

action unsubscribe() {
monitor.unsubscribe(channel);
send Unsubscribe(channel, connectionId) to channel;
nextListener.quit();
errorListener.quit();
completeListener.quit();
completeListener.quit();
disposeListener.quit();
connectionTimeout.quit();
}
}

Expand All @@ -64,36 +69,31 @@ event ObserveFromDispose {
/** @private */
event ObserveFromOnConnection {
string channel;
boolean isDisposed;

static action create(string channel) returns action<IObserver> returns ISubscription {
ObserveFromOnConnection r := ObserveFromOnConnection(channel, false);
on Dispose(channel = channel) {
r.isDisposed := true;
}
ObserveFromOnConnection r := ObserveFromOnConnection(channel);
return CreateOnConnection(r.downstreamResolver).onConnection;
}

action downstreamResolver(IResolver resolver) {
monitor.subscribe(channel);
if isDisposed {
string connectionId := "Connection" + integer.getUnique().toString();
listener nextListener := on all Next(channel = channel, connectionId = connectionId) as n and not Dispose(channel = channel) {
resolver.next(n.value);
}
listener errorListener := on Error(channel = channel, connectionId = connectionId) as e and not Dispose(channel = channel) {
resolver.error(e.error);
}
listener completeListener := on Complete(channel = channel, connectionId = connectionId) as c and not Dispose(channel = channel) {
resolver.complete();
} else {
string connectionId := "Connection" + integer.getUnique().toString();
listener nextListener := on all Next(channel = channel, connectionId = connectionId) as n and not Dispose(channel = channel) {
resolver.next(n.value);
}
listener errorListener := on Error(channel = channel, connectionId = connectionId) as e and not Dispose(channel = channel) {
resolver.error(e.error);
}
listener completeListener := on Complete(channel = channel, connectionId = connectionId) as c and not Dispose(channel = channel) {
resolver.complete();
}
on Dispose(channel = channel) {
resolver.complete();
}
resolver.onUnsubscribe(ObserveFromOnUnsubscribe.create(channel, connectionId, nextListener, errorListener, completeListener));
send Connect(channel, connectionId) to channel;
}
listener disposeListener := on Dispose(channel = channel) {
resolver.complete();
}
listener connectionTimeout := on wait(10.0) and not Connected(channel = channel, connectionId = connectionId) and not Dispose(channel = channel) {
resolver.error(Exception("Unable to connect to channel: " + channel, "ConnectionError"));
}
resolver.onUnsubscribe(ObserveFromOnUnsubscribe.create(channel, connectionId, nextListener, errorListener, completeListener, disposeListener, connectionTimeout));
send Connect(channel, connectionId) to channel;
}
}
58 changes: 48 additions & 10 deletions src/rx/operators/internals/ObserveTo.mon
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ event Connect {
string connectionId;
}

/** @private */
event Connected {
string channel;
string connectionId;
}

/** @private */
event Next {
string channel;
Expand Down Expand Up @@ -89,32 +95,64 @@ event ObserveToChannelSubscriber {
}
}

/** @private */
event ObserveToChannelUnsubscribe {
dictionary<integer, IObserver> subscribers;
integer id;
listener unsubscribeListener;
string channel;

static action create(dictionary<integer, IObserver> subscribers, integer id, listener unsubscribeListener, string channel) returns action<> {
return ObserveToChannelUnsubscribe(subscribers, id, unsubscribeListener, channel).unsubscribe;
}

action unsubscribe() {
unsubscribeListener.quit();
if subscribers.hasKey(id) {
subscribers.remove(id);
}
}
}

/** @private */
event ObserveToHandler {
action<IObserver> returns ISubscription parentOnConnect;
string channel;
dictionary<integer, IObserver> subscribers;
listener connectionListener;

static action create(action<IObserver> returns ISubscription parentOnConnect, string channel) returns IDisposable {
ObserveToHandler p := ObserveToHandler(parentOnConnect, channel);
monitor.subscribe(channel);
ObserveToHandler p := ObserveToHandler(parentOnConnect, channel, new dictionary<integer, IObserver>, new listener);
return p.init();
}

action init() returns IDisposable {
on (all Connect(channel = channel) as connection) and not Dispose(channel = channel) {
monitor.subscribe(channel);
connectionListener := on (all Connect(channel = channel) as connection) and not Dispose(channel = channel) {
send Connected(channel, connection.connectionId) to channel;
integer id := integer.incrementCounter("ObserveToSubscribers");
IObserver subscriber := ObserveToChannelSubscriber.create(channel, connection.connectionId);
on Unsubscribe(channel = channel, connectionId = connection.connectionId) or Dispose(channel = channel) {
subscriber.unsubscribe();
listener unsubscribeListener := on Unsubscribe(channel = channel, connectionId = connection.connectionId) or Dispose(channel = channel) {
if subscriber.subscribed() {
subscriber.unsubscribe();
}
}
subscriber.onUnsubscribe(ObserveToChannelUnsubscribe.create(subscribers, id, unsubscribeListener, channel));
ISubscription s := parentOnConnect(subscriber);
}
on (Dispose(channel = channel) -> Connect(channel = channel) as connection) and not (Dispose(channel) -> wait(1.0)) {
sendDispose();
}
return IDisposable(sendDispose);
return IDisposable(dispose);
}

action sendDispose() {
action dispose() {
monitor.unsubscribe(channel);
IObserver o;
for o in subscribers.values() {
if o.subscribed() {
o.unsubscribe();
}
}
subscribers.clear();
send Dispose(channel) to channel;
connectionListener.quit();
}
}
3 changes: 2 additions & 1 deletion src/rx/operators/internals/PipeOn.mon
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ event PipeOnOnConnection {
spawnToContext(downstreamChannel);
}
if spawning {
monitor.subscribe(downstreamChannel);
on Spawned(channel = downstreamChannel) {
spawning := false;
spawned := true;
Expand Down Expand Up @@ -98,6 +97,8 @@ event PipeOnOnConnection {
}
upstream := new optional<IDisposable>;

monitor.unsubscribe(downstreamChannel);

if spawned {
send Dispose(downstreamChannel) to downstreamChannel;
}
Expand Down
6 changes: 4 additions & 2 deletions src/rx/operators/internals/SubscribeOn.mon
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ event SubscribeOn {
}

action doOnOtherContext() {
handleUnsubscribe();
subscriber.onUnsubscribe(sendUnsubscribed);
handleUnsubscribe();
ISubscription s := subscribe(subscriber);
send Spawned(channel) to channel;
}
Expand All @@ -57,9 +56,12 @@ event SubscribeOn {
}

action handleUnsubscribe() {
// If either the creating context or the runner context unsubscribe then we want the other to know about it
subscriber.onUnsubscribe(sendUnsubscribed);
monitor.subscribe(channel);
on Spawned(channel = channel) and Unsubscribe(channel = channel) {
subscriber.unsubscribe();
monitor.unsubscribe(channel);
}
}
}
5 changes: 4 additions & 1 deletion src/rx/operators/internals/ToStream.mon
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ event ToStreamSubscriber {

/** @private */
event ToStream {
string channelName;
IObserver subscriber;
stream<any> outputStream;

static action create(IObservable observable) returns DisposableStream {
string channelName := "__ToStream__";
integer id := integer.getUnique();
monitor.subscribe(channelName);
ToStream toStream := ToStream(new IObserver, from a in all WrappedAnyFromChannel(channelName = channelName, uniqueId = id) select a.value);
ToStream toStream := ToStream(channelName, new IObserver, from a in all WrappedAnyFromChannel(channelName = channelName, uniqueId = id) select a.value);
toStream.subscriber := ToStreamSubscriber.create(toStream.outputStream, channelName, id, toStream.onComplete);
ISubscription s := observable.connectObserver(toStream.subscriber);
return DisposableStream(toStream.outputStream, toStream.dispose);
Expand All @@ -64,6 +65,7 @@ event ToStream {
if subscriber.subscribed() {
subscriber.unsubscribe();
outputStream.quit();
monitor.unsubscribe(channelName);
log "Stream killed" at DEBUG;
}
}
Expand All @@ -74,6 +76,7 @@ event ToStream {
on wait(0.0) {
// handle completes async to give any queued values a chance to be received
outputStream.quit();
monitor.unsubscribe(channelName);
log "Stream killed" at DEBUG;
}
}
Expand Down
1 change: 1 addition & 0 deletions test/tests/Operators/Amb/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ monitor TestObservable {
integer teardownCount := 0;

action onload() {
on utils.KeepAliveUntilTerminated() {}
// Both Sync
any discard := Observable.fromValues([<any>0,1,2,3])
.amb([Observable.fromValues([<any>4,5,6,7])])
Expand Down
15 changes: 15 additions & 0 deletions test/tests/Operators/Amb/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,25 @@ def execute(self):
# wait for test to complete
self.waitForSignal('TestResult.evt', expr="TestComplete", condition="==1", timeout=10)

# Output the engine_inspect result to a file to check for any remaining subscribed channels
correlator.inspect(filename='preTerminateInspect.txt', arguments=['-x'])

correlator.sendEventStrings('utils.KeepAliveUntilTerminated()')

# Output the engine_inspect result to a file to check for non-terminated listeners
correlator.inspect(filename='postTerminateInspect.txt', raw=True, arguments=['-x'])

def validate(self):
# check the main correlator log for Errors
self.assertGrep('correlator.log', expr=' (ERROR|FATAL) ', contains=False)

# Check that the test didn't fail
self.assertGrep('TestResult.evt', expr='TestFailed', contains=False)

# Check that there are no subscribed channels left
self.assertGrep('preTerminateInspect.txt', expr='main\\s*\\d+\\s*\\d+\\s*(\\w+)', contains=False)

# Check that there is nothing keeping the correlator alive
self.assertDiff('postTerminateInspect.txt', 'terminatedEngineInspectReference.txt', filedir2=PROJECT.UTILS_DIR)


1 change: 1 addition & 0 deletions test/tests/Operators/Async/Async/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ monitor TestObservable {
Test test := Test("TestResult");

action onload() {
on utils.KeepAliveUntilTerminated() {}
any discard := TestObservables.createAsync().take(5)
.async()
.subscribe(ExpectValues.create([<any>0.0,1.0,2.0,3.0,4.0], test.complete, test.fail));
Expand Down
15 changes: 15 additions & 0 deletions test/tests/Operators/Async/Async/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,25 @@ def execute(self):
# wait for test to complete
self.waitForSignal('TestResult.evt', expr="TestComplete", condition="==1", timeout=10)

# Output the engine_inspect result to a file to check for any remaining subscribed channels
correlator.inspect(filename='preTerminateInspect.txt', arguments=['-x'])

correlator.sendEventStrings('utils.KeepAliveUntilTerminated()')

# Output the engine_inspect result to a file to check for non-terminated listeners
correlator.inspect(filename='postTerminateInspect.txt', raw=True, arguments=['-x'])

def validate(self):
# check the main correlator log for Errors
self.assertGrep('correlator.log', expr=' (ERROR|FATAL) ', contains=False)

# Check that the test didn't fail
self.assertGrep('TestResult.evt', expr='TestFailed', contains=False)

# Check that there are no subscribed channels left
self.assertGrep('preTerminateInspect.txt', expr='main\\s*\\d+\\s*\\d+\\s*(\\w+)', contains=False)

# Check that there is nothing keeping the correlator alive
self.assertDiff('postTerminateInspect.txt', 'terminatedEngineInspectReference.txt', filedir2=PROJECT.UTILS_DIR)


1 change: 1 addition & 0 deletions test/tests/Operators/Async/Sync/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ monitor TestObservable {
boolean isAsync := false;

action onload() {
on utils.KeepAliveUntilTerminated() {}
any discard := TestObservables.createSync().take(5)
.async()
.subscribe(ExpectValues.create([<any>0.0,1.0,2.0,3.0,4.0], checkAsync, test.fail));
Expand Down
15 changes: 15 additions & 0 deletions test/tests/Operators/Async/Sync/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,25 @@ def execute(self):
# wait for test to complete
self.waitForSignal('TestResult.evt', expr="TestComplete", condition="==1", timeout=10)

# Output the engine_inspect result to a file to check for any remaining subscribed channels
correlator.inspect(filename='preTerminateInspect.txt', arguments=['-x'])

correlator.sendEventStrings('utils.KeepAliveUntilTerminated()')

# Output the engine_inspect result to a file to check for non-terminated listeners
correlator.inspect(filename='postTerminateInspect.txt', raw=True, arguments=['-x'])

def validate(self):
# check the main correlator log for Errors
self.assertGrep('correlator.log', expr=' (ERROR|FATAL) ', contains=False)

# Check that the test didn't fail
self.assertGrep('TestResult.evt', expr='TestFailed', contains=False)

# Check that there are no subscribed channels left
self.assertGrep('preTerminateInspect.txt', expr='main\\s*\\d+\\s*\\d+\\s*(\\w+)', contains=False)

# Check that there is nothing keeping the correlator alive
self.assertDiff('postTerminateInspect.txt', 'terminatedEngineInspectReference.txt', filedir2=PROJECT.UTILS_DIR)


1 change: 1 addition & 0 deletions test/tests/Operators/Average/Async/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ monitor TestObservable {
integer teardownCount := 0;

action onload() {
on utils.KeepAliveUntilTerminated() {}
any discard := Observable.fromValues([<any>1,2.0,3.0d]).async()
.average()
.subscribe(ExpectValues.create([<any>2.0], onTeardown, test.fail));
Expand Down
15 changes: 15 additions & 0 deletions test/tests/Operators/Average/Async/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,25 @@ def execute(self):
# wait for test to complete
self.waitForSignal('TestResult.evt', expr="TestComplete", condition="==1", timeout=10)

# Output the engine_inspect result to a file to check for any remaining subscribed channels
correlator.inspect(filename='preTerminateInspect.txt', arguments=['-x'])

correlator.sendEventStrings('utils.KeepAliveUntilTerminated()')

# Output the engine_inspect result to a file to check for non-terminated listeners
correlator.inspect(filename='postTerminateInspect.txt', raw=True, arguments=['-x'])

def validate(self):
# check the main correlator log for Errors
self.assertGrep('correlator.log', expr=' (ERROR|FATAL) ', contains=False)

# Check that the test didn't fail
self.assertGrep('TestResult.evt', expr='TestFailed', contains=False)

# Check that there are no subscribed channels left
self.assertGrep('preTerminateInspect.txt', expr='main\\s*\\d+\\s*\\d+\\s*(\\w+)', contains=False)

# Check that there is nothing keeping the correlator alive
self.assertDiff('postTerminateInspect.txt', 'terminatedEngineInspectReference.txt', filedir2=PROJECT.UTILS_DIR)


1 change: 1 addition & 0 deletions test/tests/Operators/Average/Sync/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ monitor TestObservable {
integer teardownCount := 0;

action onload() {
on utils.KeepAliveUntilTerminated() {}
any discard := Observable.fromValues([<any>1,2.0,3.0d])
.average()
.subscribe(ExpectValues.create([<any>2.0], onTeardown, test.fail));
Expand Down

0 comments on commit 3635bff

Please sign in to comment.