Skip to content

Commit

Permalink
Fixing bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
rpeach-sag committed Nov 20, 2018
1 parent e81556a commit 5e2001b
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 63 deletions.
1 change: 0 additions & 1 deletion src/rx/interfaces/ISubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ event ISubject {
action<any> next;
action<any> error;
action<> complete;
action<> dispose;

action<> returns IObservable asIObservable;

Expand Down
5 changes: 2 additions & 3 deletions src/rx/objects/BehaviorSubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,18 @@ event BehaviorSubject {
action<any> next;
action<any> error;
action<> complete;
action<> dispose;

static action create(any initialValue) returns ISubject {
BehaviorSubjectHandler h := BehaviorSubjectHandler.create(initialValue);
// Can't pull from a subject so a behavior subject is hot
BehaviorSubject s := BehaviorSubject(Observable._create(), h.next, h.error, h.complete, h.dispose);
BehaviorSubject s := BehaviorSubject(Observable._create(), h.next, h.error, h.complete);
s.observable.onConnection := h.onConnection;
return s.asISubject();
}

action asISubject() returns ISubject {
return ISubject(self,
next, error, complete, dispose, asIObservable,
next, error, complete, asIObservable,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatestFromInstance, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeatObservable, observable.flatMap, observable.mergeFromInstance, observable.zipFromInstance, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concatFromInstance, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqualFromInstance, observable.ambFromInstance, observable.raceFromInstance, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequenceFromInstance, observable.withLatestFromToSequence, observable.zipToSequenceFromInstance, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty);
}

Expand Down
13 changes: 2 additions & 11 deletions src/rx/objects/internals/BaseRepeatToFirstSubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ event RepeatToFirstSubjectHandler {
isComplete := true;
subscribers.clear();
}

action dispose() {
IObserver s;
for s in subscribers.values() {
s.unsubscribe();
s.disconnect();
}
}
}

/** @private */
Expand All @@ -108,19 +100,18 @@ event BaseRepeatToFirstSubject {
action<any> next;
action<any> error;
action<> complete;
action<> dispose;

static action create(IObservable observable, action<action<IObserver> returns ISubscription> setOnConnection) returns ISubject {
RepeatToFirstSubjectHandler h := RepeatToFirstSubjectHandler.create();
// Can't pull from a subject so the observable is hot
BaseRepeatToFirstSubject s := BaseRepeatToFirstSubject(observable, h.next, h.error, h.complete, h.dispose);
BaseRepeatToFirstSubject s := BaseRepeatToFirstSubject(observable, h.next, h.error, h.complete);
setOnConnection(h.onConnection);
return s.asISubject();
}

action asISubject() returns ISubject {
return ISubject(self,
next, error, complete, dispose, asIObservable,
next, error, complete, asIObservable,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty);
}

Expand Down
13 changes: 2 additions & 11 deletions src/rx/objects/internals/BaseReplaySubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,6 @@ event ReplaySubjectHandler {
s.disconnect();
}
}

action dispose() {
IObserver s;
for s in subscribers.values() {
s.unsubscribe();
s.disconnect();
}
}
}

/** @private */
Expand All @@ -104,19 +96,18 @@ event BaseReplaySubject {
action<any> next;
action<any> error;
action<> complete;
action<> dispose;

static action create(IObservable observable, action<action<IObserver> returns ISubscription> setOnConnection, integer replayCount) returns ISubject {
ReplaySubjectHandler h := ReplaySubjectHandler.create(replayCount);
// Can't pull from a subject so the observable is hot
BaseReplaySubject s := BaseReplaySubject(observable, h.next, h.error, h.complete, h.dispose);
BaseReplaySubject s := BaseReplaySubject(observable, h.next, h.error, h.complete);
setOnConnection(h.onConnection);
return s.asISubject();
}

action asISubject() returns ISubject {
return ISubject(self,
next, error, complete, dispose, asIObservable,
next, error, complete, asIObservable,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty);
}

Expand Down
13 changes: 2 additions & 11 deletions src/rx/objects/internals/BaseSubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,6 @@ event SubjectHandler {
isComplete := true;
subscribers.clear();
}

action dispose() {
IObserver s;
for s in subscribers.values() {
s.unsubscribe();
s.disconnect();
}
}
}

/** @private */
Expand All @@ -96,19 +88,18 @@ event BaseSubject {
action<any> next;
action<any> error;
action<> complete;
action<> dispose;

static action create(IObservable observable, action<action<IObserver> returns ISubscription> setOnConnection) returns ISubject {
SubjectHandler h := SubjectHandler.create();
// Can't pull from a subject so the observable is hot
BaseSubject s := BaseSubject(observable, h.next, h.error, h.complete, h.dispose);
BaseSubject s := BaseSubject(observable, h.next, h.error, h.complete);
setOnConnection(h.onConnection);
return s.asISubject();
}

action asISubject() returns ISubject {
return ISubject(self,
next, error, complete, dispose, asIObservable,
next, error, complete, asIObservable,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty);
}

Expand Down
8 changes: 0 additions & 8 deletions src/rx/objects/internals/BehaviorSubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,4 @@ event BehaviorSubjectHandler {
isComplete := true;
subscribers.clear();
}

action dispose() {
IObserver s;
for s in subscribers.values() {
s.unsubscribe();
s.disconnect();
}
}
}
3 changes: 1 addition & 2 deletions src/rx/operators/internals/Publish.mon
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ event PublishHandler {
ifpresent upstreamSubscriber {
upstreamSubscriber.unsubscribe();
}
publishSubject.dispose();
publishSubject := publishSubject; // Note: If we want to support: action<> returns ISubject, then we'd do it here
publishSubject := publishSubject; // Note: If we want to support: action<> returns ISubject, then we'd do it here (although we might have to add some way to copy subscribers from the old to the new subject)
upstreamSubscriber := new optional<IObserver>;
upstreamSubscription := new ISubscription;
}
Expand Down
2 changes: 1 addition & 1 deletion src/rx/operators/internals/RefCount.mon
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ event RefCountOnConnection {
refCount := refCount + 1;
subscriber.onUnsubscribe(decrementRefCount);
ISubscription subscription := parentOnConnection(subscriber);
if connection.disposed() {
if connection.disposed() and refCount > 0 { // Note: the refCount may drop to zero after subscribing so we need to check before connecting
connection := connect();
}
return subscription;
Expand Down
35 changes: 20 additions & 15 deletions test/tests/Operators/ConnectAndDispose/Async/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,36 @@ monitor TestObservable {
Test test := Test("TestResult");

boolean isConnected := false;
boolean isDisposed := false;
IDisposable connection;

action onload() {
on utils.KeepAliveUntilTerminated() {}
IObservable connectableObservable := Observable.interval(0.1)
.take(6)
.map(makeSureNotCalledUntilConnected)
.publish();

any subscription := connectableObservable.subscribe(ExpectValues.createNoComplete([<any>0,1,2,3,4],onTeardown, test.fail));
any subscription := connectableObservable.subscribe(ExpectValues.create([<any>0,1,2,0,1,0,1,2,3,4,5], test.complete, test.fail));

on wait(0.1) {
isConnected := true;
IDisposable connection := connectableObservable.connect();
on wait(0.5) {
isDisposed := true;
connection.dispose();
}
connection := connectableObservable.connect();
}
on wait(0.4) {
isConnected := false;
connection.dispose();
}
on wait(0.5) {
isConnected := true;
connection := connectableObservable.connect();
}
on wait(0.7) {
isConnected := false;
connection.dispose();
}
on wait(0.8) {
isConnected := true;
connection := connectableObservable.connect();
}
}

Expand All @@ -37,12 +50,4 @@ monitor TestObservable {
}
return value;
}

action onTeardown() {
if isDisposed {
test.complete();
} else {
test.fail("Torn down without being disposed");
}
}
}

0 comments on commit 5e2001b

Please sign in to comment.