Skip to content

Commit

Permalink
Merge 8d28b1c into b425920
Browse files Browse the repository at this point in the history
  • Loading branch information
rpeach-sag committed Dec 11, 2018
2 parents b425920 + 8d28b1c commit 2a7e37f
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 13 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
sudo: false

cache:
directories:
- softwareag
Expand Down
21 changes: 21 additions & 0 deletions docs/interfaces/IObservable.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Observable.fromValues([1,2,3,4)
* [Conditional](#conditional)
* [Contains](#contains)/[Every](#every)
* [SequenceEqual](#sequenceequal)
* [IsEmpty](#isempty)
* [Amb](#amb)/[Race](#race)
* [DefaultIfEmpty](#defaultifempty)/[SwitchIfEmpty](#switchifempty)
* [Math and Aggregation](#math-and-aggregation)
Expand Down Expand Up @@ -1632,6 +1633,26 @@ Observable.interval(1.0).take(4)
// Output: true
```

<a name="isempty" href="#isempty">#</a> .**isEmpty**() returns [IObservable](#iobservable)\<boolean> [<>](/src/rx/operators/IsEmpty.mon "Source")

Check if the observable completes without emitting any values.

Note: It will return false immediately if any values are received.

```javascript
Observable.fromValues([1,2,3])
.isEmpty()
...

// Output: false

Observable.empty()
.isEmpty()
...

// Output: true
```

<a name="amb" href="#amb">#</a> .**amb**(*`others:` sequence<[IObservable](#iobservable)<[T](/docs/README.md#wildcard-class-notation)>>*) returns [IObservable](#iobservable)\<[T](/docs/README.md#wildcard-class-notation)> [<>](/src/rx/operators/Amb.mon "Source")<br/>
<a name="race" href="#race">#</a> .**race**(*`others:` sequence<[IObservable](#iobservable)<[T](/docs/README.md#wildcard-class-notation)>>*) returns [IObservable](#iobservable)\<[T](/docs/README.md#wildcard-class-notation)> [<>](/src/rx/operators/Amb.mon "Source")

Expand Down
1 change: 1 addition & 0 deletions src/rx/interfaces/IObservable.mon
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,5 @@ event IObservable {
action<> returns IObservable sortDesc;
action<IObservable> returns IObservable switchIfEmpty;
action<> returns IObservable concatAll;
action<> returns IObservable isEmpty;
}
1 change: 1 addition & 0 deletions src/rx/interfaces/ISubject.mon
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,5 @@ event ISubject {
action<> returns IObservable sortDesc;
action<IObservable> returns IObservable switchIfEmpty;
action<> returns IObservable concatAll;
action<> returns IObservable isEmpty;
}
9 changes: 8 additions & 1 deletion src/rx/objects/Observable.mon
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ using com.industry.rx_epl.operators.GroupBy;
using com.industry.rx_epl.operators.GroupByField;
using com.industry.rx_epl.operators.GroupByWindow;
using com.industry.rx_epl.operators.IgnoreElements;
using com.industry.rx_epl.operators.IsEmpty;
using com.industry.rx_epl.operators.Last;
using com.industry.rx_epl.operators.Let;
using com.industry.rx_epl.operators.Map;
Expand Down Expand Up @@ -1125,6 +1126,12 @@ event Observable {
return o.asIObservable();
}

action isEmpty() returns IObservable {
Observable o := _create();
o.onConnection := IsEmpty.create()(onConnection);
return o.asIObservable();
}

static action invalidConnect() returns IDisposable {
throw Exception("Can only connect a published Observable", "UnsupportedOperation");
}
Expand All @@ -1139,6 +1146,6 @@ event Observable {

action asIObservable() returns IObservable {
return IObservable(self,
connectObserver, subscribe, map, publish, connect, refCount, share, take, takeLast, first, last, skip, skipLast, filter, scan, scanWithInitial, distinct, distinctBy, combineLatestFromInstance, withLatestFrom, delay, async, do, publishReplay, shareReplay, takeUntil, repeatObservable, flatMap, mergeFromInstance, zipFromInstance, switchOnNext, skipUntil, elementAt, reduce, reduceWithInitial, toStream, catchError, retry, observeOn, groupBy, observeToChannel, observeOnNew, every, contains, takeWhile, skipWhile, concatFromInstance, startWith, defaultIfEmpty, ignoreElements, count, sum, sumInteger, sumFloat, sumDecimal, concatString, max, maxInteger, maxFloat, maxDecimal, min, minInteger, minFloat, minDecimal, average, averageDecimal, sequenceEqualFromInstance, ambFromInstance, raceFromInstance, timestamp, updateTimestamp, timeout, getSync, getSyncOr, timeInterval, distinctUntilChanged, distinctByUntilChanged, switchMap, debounce, windowTime, throttleFirst, throttleLast, sampleTime, buffer, bufferTime, bufferCount, bufferTimeOrCount, combineLatestToSequenceFromInstance, withLatestFromToSequence, zipToSequenceFromInstance, groupByWindow, windowCount, windowTimeOrCount, sample, sampleCount, sampleTimeOrCount, bufferCountSkip, pipe, pluck, mergeAll, pipeOn, pipeOnNew, subscribeOn, subscribeOnNew, pairwise, let, groupByField, complexPipe, complexPipeOn, complexPipeOnNew, decouple, distinctByField, distinctByFieldUntilChanged, pausable, pausableBuffered, toChannel, toSortedList, toSortedListAsc, toSortedListDesc, sort, sortAsc, sortDesc, switchIfEmpty, concatAll);
connectObserver, subscribe, map, publish, connect, refCount, share, take, takeLast, first, last, skip, skipLast, filter, scan, scanWithInitial, distinct, distinctBy, combineLatestFromInstance, withLatestFrom, delay, async, do, publishReplay, shareReplay, takeUntil, repeatObservable, flatMap, mergeFromInstance, zipFromInstance, switchOnNext, skipUntil, elementAt, reduce, reduceWithInitial, toStream, catchError, retry, observeOn, groupBy, observeToChannel, observeOnNew, every, contains, takeWhile, skipWhile, concatFromInstance, startWith, defaultIfEmpty, ignoreElements, count, sum, sumInteger, sumFloat, sumDecimal, concatString, max, maxInteger, maxFloat, maxDecimal, min, minInteger, minFloat, minDecimal, average, averageDecimal, sequenceEqualFromInstance, ambFromInstance, raceFromInstance, timestamp, updateTimestamp, timeout, getSync, getSyncOr, timeInterval, distinctUntilChanged, distinctByUntilChanged, switchMap, debounce, windowTime, throttleFirst, throttleLast, sampleTime, buffer, bufferTime, bufferCount, bufferTimeOrCount, combineLatestToSequenceFromInstance, withLatestFromToSequence, zipToSequenceFromInstance, groupByWindow, windowCount, windowTimeOrCount, sample, sampleCount, sampleTimeOrCount, bufferCountSkip, pipe, pluck, mergeAll, pipeOn, pipeOnNew, subscribeOn, subscribeOnNew, pairwise, let, groupByField, complexPipe, complexPipeOn, complexPipeOnNew, decouple, distinctByField, distinctByFieldUntilChanged, pausable, pausableBuffered, toChannel, toSortedList, toSortedListAsc, toSortedListDesc, sort, sortAsc, sortDesc, switchIfEmpty, concatAll, isEmpty);
}
}
4 changes: 2 additions & 2 deletions src/rx/objects/internals/SubjectUtils.mon
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ event SubjectUtils {
static action isubjectFromSubject(any subject, action<any> next, action<any> error, action<> complete, IObservable observable, action<> returns IObservable asIObservable) returns ISubject {
return ISubject(subject,
next, error, complete, asIObservable,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty, observable.concatAll);
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty, observable.concatAll, observable.isEmpty);
}

static action iobservableFromSubject(any subject, IObservable observable) returns IObservable {
return IObservable(subject,
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty, observable.concatAll);
observable.connectObserver, observable.subscribe, observable.map, observable.publish, observable.connect, observable.refCount, observable.share, observable.take, observable.takeLast, observable.first, observable.last, observable.skip, observable.skipLast, observable.filter, observable.scan, observable.scanWithInitial, observable.distinct, observable.distinctBy, observable.combineLatest, observable.withLatestFrom, observable.delay, observable.async, observable.do, observable.publishReplay, observable.shareReplay, observable.takeUntil, observable.repeat, observable.flatMap, observable.merge, observable.zip, observable.switchOnNext, observable.skipUntil, observable.elementAt, observable.reduce, observable.reduceWithInitial, observable.toStream, observable.catchError, observable.retry, observable.observeOn, observable.groupBy, observable.observeToChannel, observable.observeOnNew, observable.every, observable.contains, observable.takeWhile, observable.skipWhile, observable.concat, observable.startWith, observable.defaultIfEmpty, observable.ignoreElements, observable.count, observable.sum, observable.sumInteger, observable.sumFloat, observable.sumDecimal, observable.concatString, observable.max, observable.maxInteger, observable.maxFloat, observable.maxDecimal, observable.min, observable.minInteger, observable.minFloat, observable.minDecimal, observable.average, observable.averageDecimal, observable.sequenceEqual, observable.amb, observable.race, observable.timestamp, observable.updateTimestamp, observable.timeout, observable.getSync, observable.getSyncOr, observable.timeInterval, observable.distinctUntilChanged, observable.distinctByUntilChanged, observable.switchMap, observable.debounce, observable.windowTime, observable.throttleFirst, observable.throttleLast, observable.sampleTime, observable.buffer, observable.bufferTime, observable.bufferCount, observable.bufferTimeOrCount, observable.combineLatestToSequence, observable.withLatestFromToSequence, observable.zipToSequence, observable.groupByWindow, observable.windowCount, observable.windowTimeOrCount, observable.sample, observable.sampleCount, observable.sampleTimeOrCount, observable.bufferCountSkip, observable.pipe, observable.pluck, observable.mergeAll, observable.pipeOn, observable.pipeOnNew, observable.subscribeOn, observable.subscribeOnNew, observable.pairwise, observable.let, observable.groupByField, observable.complexPipe, observable.complexPipeOn, observable.complexPipeOnNew, observable.decouple, observable.distinctByField, observable.distinctByFieldUntilChanged, observable.pausable, observable.pausableBuffered, observable.toChannel, observable.toSortedList, observable.toSortedListAsc, observable.toSortedListDesc, observable.sort, observable.sortAsc, observable.sortDesc, observable.switchIfEmpty, observable.concatAll, observable.isEmpty);
}
}
30 changes: 30 additions & 0 deletions src/rx/operators/IsEmpty.mon
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2018 Software AG, Darmstadt, Germany and/or its licensors
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.industry.rx_epl.operators;

using com.industry.rx_epl.IObservable;
using com.industry.rx_epl.IObserver;
using com.industry.rx_epl.ISubscription;
using com.industry.rx_epl.Subscriber;

event IsEmpty {
static action create() returns action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription {
return SequenceEqual.create([(<action<> returns IObservable> any.newInstance("com.industry.rx_epl.Observable").getAction("empty"))()]);
}
}
14 changes: 7 additions & 7 deletions src/rx/operators/internals/SequenceEqual.mon
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ event SequenceEqualSubscriberFactory {
completeSubscribersById.add(id, true);
upstreamSubscribersById[id].unsubscribe();

// If all of the subscribers are complete then the sequences were equal
if completeSubscribersById.size() = subscriberCount {
downstreamSubscriber.next(true);
downstreamSubscriber.complete();
return;
}

// Check that none of the observables contain more observables than the one that just completed
integer maxValuesCount := valuesById.getOrDefault(id).size();
sequence<any> values;
Expand All @@ -115,6 +108,13 @@ event SequenceEqualSubscriberFactory {
return;
}
}

// If all of the subscribers are complete then the sequences were equal
if completeSubscribersById.size() = subscriberCount {
downstreamSubscriber.next(true);
downstreamSubscriber.complete();
return;
}
}
}

Expand Down

0 comments on commit 2a7e37f

Please sign in to comment.