Reduce an empty observable #423

Closed
zsxwing opened this issue Oct 7, 2013 · 25 comments

@zsxwing (Member) commented Oct 7, 2013

Hi,

In RxJava, reduce(Func2<T, T, T> accumulator) may not be implemented correctly. Currently reduce is implemented using scan, so reducing an empty observable just invokes onCompleted and emits nothing. This is not what I expected: I think reducing an empty observable should throw an exception.

Actually, Scan and Aggregate (which I believe is the C# name for reduce) behave differently in C#. Scanning an empty observable does nothing, but Aggregate throws a System.InvalidOperationException.

Here is my test code in C#.
Scan:

Observable.Empty<int>().Scan((x, y) => x + y).Subscribe(
                x => 
                    Console.WriteLine("subscriber got " + x)
            );

Aggregate:

Observable.Empty<int>().Aggregate((x, y) => x + y).Subscribe(
                x => 
                    Console.WriteLine("subscriber got " + x)
            );

I also tried reduce in other languages.

In Scala, List[Int]().reduce(_ + _) throws
java.lang.UnsupportedOperationException: empty.reduceLeft.

In Python, reduce(lambda x, y: x + y, []) throws TypeError: reduce() of empty sequence with no initial value.

If reducing an empty observable throws an exception, we can implement min and max directly in terms of reduce.
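To make the proposal concrete, here is a minimal sketch in plain Java (not RxJava) that models an observable as an already-completed `List`. It assumes the semantics argued for here: a seedless reduce on an empty source fails, modeled with `NoSuchElementException` (an arbitrary stand-in), so `min` and `max` can sit directly on top of it with no special empty-case handling. `ReduceSketch` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;

// Hypothetical list-based model of "reduce without a seed" as proposed in this issue.
final class ReduceSketch {

    // Folds left to right, starting from the first element.
    // An empty source is an error, like Scala's reduceLeft or Python's reduce.
    static <T> T reduce(List<T> source, BinaryOperator<T> accumulator) {
        if (source.isEmpty()) {
            throw new NoSuchElementException("Sequence contains no elements");
        }
        T acc = source.get(0);
        for (int i = 1; i < source.size(); i++) {
            acc = accumulator.apply(acc, source.get(i));
        }
        return acc;
    }

    // min and max inherit the empty-source error from reduce.
    static <T extends Comparable<? super T>> T min(List<T> source) {
        return reduce(source, (a, b) -> a.compareTo(b) <= 0 ? a : b);
    }

    static <T extends Comparable<? super T>> T max(List<T> source) {
        return reduce(source, (a, b) -> a.compareTo(b) >= 0 ? a : b);
    }
}
```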

@headinthebox (Contributor)

@zsxwing, Thanks for diving into this.

(aside: When you write snippets like this, it is a good idea to also look at onCompleted and onError. Aggregate does not "throw" an exception; it sends out an onError with an exception. And Scan returns the empty sequence by immediately calling onCompleted, which is not the same as "do nothing". Subtle, but it is important to be super precise as we look at the edge cases.)

var xs = new[]{ 1, 2, 3 }.ToObservable();
var ys = new int[]{}.ToObservable();

xs.Aggregate(accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[1, 2, 3].Aggregate((x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [1, 2, 3].Aggregate((x,y)=>x+y) = 6
// Done

ys.Aggregate(accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[].Aggregate((x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// Boom = System.InvalidOperationException: Sequence contains no elements.

xs.Aggregate(seed: 0, accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[1, 2, 3].Aggregate(0, (x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [1, 2, 3].Aggregate(0, (x,y)=>x+y) = 6
// Done

ys.Aggregate(seed: 0, accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[].Aggregate(0, (x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [].Aggregate(0, (x,y)=>x+y) = 0
// Done

xs.Aggregate(seed: 0, accumulator: (x,y)=>x+y, resultSelector: x => string.Format("[{0}]", x)).Subscribe
( x => Console.WriteLine("[1, 2, 3].Aggregate(0, (x,y)=>x+y, fmt) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [1, 2, 3].Aggregate(0, (x,y)=>x+y, fmt) = [6]
// Done

ys.Aggregate(seed: 0, accumulator: (x,y)=>x+y, resultSelector: x => string.Format("[{0}]", x)).Subscribe
( x => Console.WriteLine("[].Aggregate(0, (x,y)=>x+y, fmt) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [].Aggregate(0, (x,y)=>x+y, fmt) = [0]
// Done

@headinthebox (Contributor)

No, looking at scan, I would say that scan in Rx.NET is broken.

In Haskell, the seed in a scan is sent out as well

Prelude> scanl (+) 0 []
[0]

Prelude> scanl (+) 0 [1,2,3]
[0,1,3,6]

With no seed, the results look like this

Prelude> scanl1 (+) [1,2,3]
[1,3,6]

Prelude> scanl1 (+) []
[]

But in C#, the seed is not sent out:

var xs = new[]{ 1, 2, 3 }.ToObservable();
var ys = new int[]{}.ToObservable();

xs.Scan(accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[1, 2, 3].Scan((x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [1, 2, 3].Scan((x,y)=>x+y) = 1
// [1, 2, 3].Scan((x,y)=>x+y) = 3
// [1, 2, 3].Scan((x,y)=>x+y) = 6
// Done

ys.Scan(accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[].Scan((x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// Done

xs.Scan(seed: 0, accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[1, 2, 3].Scan(0, (x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// [1, 2, 3].Scan(0, (x,y)=>x+y) = 1
// [1, 2, 3].Scan(0, (x,y)=>x+y) = 3
// [1, 2, 3].Scan(0, (x,y)=>x+y) = 6
// Done

ys.Scan(seed: 0, accumulator: (x,y)=>x+y).Subscribe
( x => Console.WriteLine("[].Scan(0, (x,y)=>x+y) = {0}", x)
, e => Console.WriteLine("Boom = {0}", e)
, () => Console.WriteLine("Done")
);

// Done
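The difference between the two scans can be modeled with two list functions in plain Java. `scanWithSeedEmitted` follows Haskell's `scanl`, and `scanWithoutSeed` mirrors the Rx.NET output shown above. Both names are hypothetical, and for simplicity the accumulator and element types are the same (Haskell's `scanl` allows them to differ).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical list-based model contrasting the two seeded-scan behaviors.
final class ScanContrastSketch {

    // Haskell-style scanl: the seed is emitted first, then each running accumulation.
    static <T> List<T> scanWithSeedEmitted(T seed, BinaryOperator<T> op, List<T> xs) {
        List<T> out = new ArrayList<>();
        T acc = seed;
        out.add(acc);
        for (T x : xs) {
            acc = op.apply(acc, x);
            out.add(acc);
        }
        return out;
    }

    // Rx.NET-style seeded Scan as observed above: same running values, no leading seed.
    static <T> List<T> scanWithoutSeed(T seed, BinaryOperator<T> op, List<T> xs) {
        List<T> all = scanWithSeedEmitted(seed, op, xs);
        return new ArrayList<>(all.subList(1, all.size()));
    }
}
```

On an empty input only the Haskell-style variant produces anything (the seed), so only that variant has a last element to report.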

@headinthebox (Contributor)

I'd say we define, once and for all (see http://www.haskell.org/hoogle/?hoogle=scanl):

xs.aggregate(op) = xs.scan(op).lastasync()
xs.aggregate(e, op) = xs.scan(e, op).lastasync()
xs.aggregate(e, op, f) = xs.scan(e, op).lastasync().map(f)
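Under these definitions the empty cases follow automatically: a seedless scan of an empty sequence is empty, so taking its last element fails, while a seeded scan always emits at least the seed. Below is a list-based sketch in plain Java, where the hypothetical `last` stands in for `lastAsync` and `scan`/`scan1` follow Haskell's `scanl`/`scanl1`. `NoSuchElementException` is an assumed stand-in for Rx.NET's `InvalidOperationException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical list-based model of "aggregate = last of scan".
final class AggregateSketch {

    // scanl: seed first, then each running accumulation.
    static <T> List<T> scan(T seed, BinaryOperator<T> op, List<T> xs) {
        List<T> out = new ArrayList<>();
        T acc = seed;
        out.add(acc);
        for (T x : xs) {
            acc = op.apply(acc, x);
            out.add(acc);
        }
        return out;
    }

    // scanl1: the first element acts as the seed; empty in, empty out.
    static <T> List<T> scan1(BinaryOperator<T> op, List<T> xs) {
        if (xs.isEmpty()) {
            return new ArrayList<>();
        }
        return scan(xs.get(0), op, xs.subList(1, xs.size()));
    }

    // Stand-in for lastAsync: the last element, or an error for an empty sequence.
    static <T> T last(List<T> xs) {
        if (xs.isEmpty()) {
            throw new NoSuchElementException("Sequence contains no elements");
        }
        return xs.get(xs.size() - 1);
    }

    // xs.aggregate(op) = xs.scan(op).lastasync()
    static <T> T aggregate(BinaryOperator<T> op, List<T> xs) {
        return last(scan1(op, xs));
    }

    // xs.aggregate(e, op) = xs.scan(e, op).lastasync()
    static <T> T aggregate(T seed, BinaryOperator<T> op, List<T> xs) {
        return last(scan(seed, op, xs));
    }

    // xs.aggregate(e, op, f) = xs.scan(e, op).lastasync().map(f)
    static <T, R> R aggregate(T seed, BinaryOperator<T> op, Function<T, R> f, List<T> xs) {
        return f.apply(last(scan(seed, op, xs)));
    }
}
```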

@headinthebox (Contributor)

RxJava lacks .LastAsync() and instead has takeLast(n), which behaves differently:

var xs = Observable.Empty<int>();
xs.TakeLast(1).Dump(); // onCompleted()
xs.LastAsync().Dump(); // onError(... InvalidOperationException ...)
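A list-based model of the two behaviors in plain Java, with hypothetical names: `takeLast` returns however many trailing elements exist, while `last` (standing in for `LastAsync`) fails on an empty sequence. `NoSuchElementException` is an assumed stand-in for Rx.NET's `InvalidOperationException`, and `n >= 0` is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical list-based model of takeLast(n) versus LastAsync().
final class LastSketch {

    // Like takeLast(n): up to n trailing elements; an empty input gives an empty output.
    static <T> List<T> takeLast(List<T> xs, int n) {
        int from = Math.max(0, xs.size() - n);
        return new ArrayList<>(xs.subList(from, xs.size()));
    }

    // Like LastAsync(): exactly one element, or an error on an empty sequence.
    static <T> T last(List<T> xs) {
        if (xs.isEmpty()) {
            throw new NoSuchElementException("Sequence contains no elements");
        }
        return xs.get(xs.size() - 1);
    }
}
```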

@zsxwing (Member, Author) commented Nov 4, 2013

I found that http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.aspx is out of date. Rx.NET now has blocking versions, Last and First, and non-blocking versions, LastAsync and FirstAsync.

Here is the documentation I copied from VS:

        // Summary:
        //     Returns the first element of an observable sequence.
        //
        // Parameters:
        //   source:
        //     Source observable sequence.
        //
        // Type parameters:
        //   TSource:
        //     The type of the elements in the source sequence.
        //
        // Returns:
        //     Sequence containing the first element in the observable sequence.
        //
        // Exceptions:
        //   System.ArgumentNullException:
        //     source is null.
        //
        //   System.InvalidOperationException:
        //     (Asynchronous) The source sequence is empty.
        public static IObservable<TSource> FirstAsync<TSource>(this IObservable<TSource> source);
        //
        // Summary:
        //     Returns the first element of an observable sequence that satisfies the condition
        //     in the predicate.
        //
        // Parameters:
        //   source:
        //     Source observable sequence.
        //
        //   predicate:
        //     A predicate function to evaluate for elements in the source sequence.
        //
        // Type parameters:
        //   TSource:
        //     The type of the elements in the source sequence.
        //
        // Returns:
        //     Sequence containing the first element in the observable sequence that satisfies
        //     the condition in the predicate.
        //
        // Exceptions:
        //   System.ArgumentNullException:
        //     source or predicate is null.
        //
        //   System.InvalidOperationException:
        //     (Asynchronous) No element satisfies the condition in the predicate. -or-
        //     The source sequence is empty.
        public static IObservable<TSource> FirstAsync<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate);

        //
        // Summary:
        //     Returns the last element of an observable sequence.
        //
        // Parameters:
        //   source:
        //     Source observable sequence.
        //
        // Type parameters:
        //   TSource:
        //     The type of the elements in the source sequence.
        //
        // Returns:
        //     Sequence containing the last element in the observable sequence.
        //
        // Exceptions:
        //   System.ArgumentNullException:
        //     source is null.
        //
        //   System.InvalidOperationException:
        //     (Asynchronous) The source sequence is empty.
        public static IObservable<TSource> LastAsync<TSource>(this IObservable<TSource> source);
        //
        // Summary:
        //     Returns the last element of an observable sequence that satisfies the condition
        //     in the predicate.
        //
        // Parameters:
        //   source:
        //     Source observable sequence.
        //
        //   predicate:
        //     A predicate function to evaluate for elements in the source sequence.
        //
        // Type parameters:
        //   TSource:
        //     The type of the elements in the source sequence.
        //
        // Returns:
        //     Sequence containing the last element in the observable sequence that satisfies
        //     the condition in the predicate.
        //
        // Exceptions:
        //   System.ArgumentNullException:
        //     source or predicate is null.
        //
        //   System.InvalidOperationException:
        //     (Asynchronous) No element satisfies the condition in the predicate. -or-
        //     The source sequence is empty.
        public static IObservable<TSource> LastAsync<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate);

@zsxwing (Member, Author) commented Nov 4, 2013

@benjchristensen do we need to add the new Rx.Net interfaces?

@headinthebox (Contributor)

I have an implementation for this ready. This will fix the issue for reduce/scan. Did you see the comments?


@zsxwing (Member, Author) commented Nov 4, 2013

@headinthebox sorry, I meant: do we need to add these Rx.NET interfaces (lastAsync, firstAsync) to RxJava?

@headinthebox (Contributor)

@zsxwing Yes, that's what I meant. I have an implementation for them, but then recursively found some bugs in other operators.

@benjchristensen (Member)

@zsxwing @headinthebox

We already have a first operator in Observable that is async: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L3919

We solved the problem of duplicate naming for blocking/non-blocking in RxJava by separating all blocking operators into the BlockingObservable class so it is explicit. Everything on Observable is non-blocking and returns an Observable except for subscribe which returns Subscription.

Alongside the takeLast operator, Observable could have a last method aliased to takeLast(1).

@benjchristensen (Member)

Quoting @headinthebox:

    RxJava misses .LastAsync() and instead uses takeLast(n), which behaves differently.
    var xs = Observable.Empty<int>();
    xs.TakeLast(1).Dump(); // onCompleted()
    xs.LastAsync().Dump(); // onError(... InvalidOperationException ...)

It is not obvious why two variants of last should behave differently regarding error propagation.

Why should lastAsync behave differently from takeLast(1) when last means "take the last value"? Should takeLast(1) emit an error, since you have asked for more than are available? The same goes for takeLast(5) if fewer than 5 are available. If we want no errors, then takeUpToLast(5) would take 0 to 5 items, while takeLast(5) would take exactly 5 or emit an error.
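The strict/lenient split proposed here could be modeled as follows. Both methods are hypothetical (`takeUpToLast` is the name suggested in the comment; `takeLastStrict` stands for the proposed erroring `takeLast`), they operate on plain Java lists, they assume `n >= 0`, and the exception type is an arbitrary choice. This only models the proposal as worded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of the proposed strict takeLast versus takeUpToLast.
final class TakeLastProposalSketch {

    // Proposed takeLast(n): exactly the last n elements, or an error if fewer exist.
    static <T> List<T> takeLastStrict(List<T> xs, int n) {
        if (xs.size() < n) {
            throw new NoSuchElementException("Expected at least " + n + " elements but got " + xs.size());
        }
        return new ArrayList<>(xs.subList(xs.size() - n, xs.size()));
    }

    // Proposed takeUpToLast(n): between 0 and n trailing elements, never an error.
    static <T> List<T> takeUpToLast(List<T> xs, int n) {
        return new ArrayList<>(xs.subList(Math.max(0, xs.size() - n), xs.size()));
    }
}
```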

@samuelgruetter (Contributor)

I just realized that the question "what should the operator do if the source observable does not have enough elements?" should be asked for some other operators as well:

  • elementAt
  • reduce(Func2<T, T, T>) and aggregate(Func2<T, T, T>)
  • average{Longs/Floats/Doubles}
  • min
  • max
  • skip
  • first (2 overloads) and takeFirst (2 overloads)
  • take
  • takeLast
  • skipLast

I'd prefer that onError be called instead of silently ignoring that there were not enough elements. That's also what Scala collections do.

And if takeLast on an empty observable calls onError, reduce can nicely be implemented using scan.

@benjchristensen (Member)

@headinthebox I am not aware of a rule that covers this, can you provide an answer for @samuelgruetter's question? We can then go build the unit tests and get code to match the rule.

@headinthebox (Contributor)

elementAt ==> onError("out of range")

reduce(Func2) and aggregate(Func2) ==> that's what started this discussion; see above.
average{Longs/Floats/Doubles} ==> same, defined as reduce.
min ==> same, defined as reduce.
max ==> same, defined as reduce.
skip ==> onCompleted()
first (2 overloads) ==> throws for BlockingObservable; otherwise onError("empty sequence")
takeFirst (2 overloads) ==> as many elements as it can get, ending with onCompleted.
take ==> I think this is an alias for takeFirst.

takeLast ==> as many elements as it can get, ending with onCompleted.
skipLast ==> onCompleted
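Apart from `elementAt`, the rules above amount to clamping counts to the available length instead of failing. A plain-Java list model, with hypothetical method names, `n >= 0` assumed, and `IndexOutOfBoundsException` chosen as a stand-in for the "out of range" error:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical list-based model of the short-source rules; assumes n >= 0.
final class ShortSourceRules {

    // elementAt: an index past the end is an error.
    static <T> T elementAt(List<T> xs, int index) {
        if (index < 0 || index >= xs.size()) {
            throw new IndexOutOfBoundsException("out of range");
        }
        return xs.get(index);
    }

    // take / takeFirst: as many leading elements as are available, up to n.
    static <T> List<T> take(List<T> xs, int n) {
        return new ArrayList<>(xs.subList(0, Math.min(n, xs.size())));
    }

    // skip: skipping more than are available just completes with nothing.
    static <T> List<T> skip(List<T> xs, int n) {
        return new ArrayList<>(xs.subList(Math.min(n, xs.size()), xs.size()));
    }

    // takeLast: as many trailing elements as are available, up to n.
    static <T> List<T> takeLast(List<T> xs, int n) {
        return new ArrayList<>(xs.subList(Math.max(0, xs.size() - n), xs.size()));
    }

    // skipLast: dropping more than are available just completes with nothing.
    static <T> List<T> skipLast(List<T> xs, int n) {
        return new ArrayList<>(xs.subList(0, Math.max(0, xs.size() - n)));
    }
}
```

For example, `take(List.of(1, 2, 3), 5)` yields `[1, 2, 3]`, matching Haskell's `take 5 [1..3]`.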

@headinthebox (Contributor)

The behavior in Rx.NET is the same as in Haskell, except for the bug in Scan discussed above.

Prelude> take 5 [1..3]
[1,2,3]

etc.

@zsxwing (Member, Author) commented Nov 7, 2013

I reviewed these operators in RxJava. Here is my conclusion.

Please correct me if I have misunderstood something.

@benjchristensen (Member)

I have completed the blocking and non-blocking last implementations in #470.

@benjchristensen (Member)

@zsxwing Are you working on any of these others already so we don't duplicate effort?

@benjchristensen (Member)

Actually, it seems reduce is the only one that needs fixing, since average will fix itself once reduce is done.

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 7, 2013
This fixes issue ReactiveX#423

The fix is based on this comment by @headinthebox: ReactiveX#423 (comment)
@benjchristensen (Member)

The reduce operator is now fixed for the empty Observable case.

When the Observable is empty but a seed is passed in, it just emits the seed:

    /**
     * A reduce on an empty Observable and a seed should just pass the seed through.
     * 
     * This is confirmed at https://github.com/Netflix/RxJava/issues/423#issuecomment-27642456
     */
    @Test
    public void testReduceWithEmptyObservableAndSeed() {
        Observable<Integer> observable = Observable.range(1, 0);
        int value = observable.reduce(1, new Func2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }

        }).toBlockingObservable().last();

        assertEquals(1, value);
    }

Please confirm that the current code and unit test match the expected behavior; then we can close this, or correct it again.

@zsxwing (Member, Author) commented Nov 8, 2013

There is still a difference between Average in RxJava and Rx.Net when the observable is empty.

This is a unit test for Average in RxJava.

    @Test
    public void testEmptyAverage() throws Throwable {
        Observable<Integer> src = Observable.empty();
        average(src).subscribe(w);

        verify(w, never()).onNext(anyInt());
        verify(w, times(1)).onError(any(ArithmeticException.class));
        verify(w, never()).onCompleted();
    }

If the observable is empty, average calls onError with an ArithmeticException.

Here is a sample for Average in Rx.Net.

            IObservable<double> o = Observable.Empty<int>().Average();
            o.Subscribe(
                x => Console.WriteLine("OnNext: " + x),
                e => Console.WriteLine("OnError: " + e),
                () => Console.WriteLine("Done")
                );

It outputs "OnError: System.InvalidOperationException: Sequence contains no elements.".
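One plausible way an empty average ends in `ArithmeticException` is integer division of the sum by a count of zero; that is an assumption about the implementation, not something this thread confirms. The sketch below (plain Java lists, hypothetical names) contrasts that with an average that checks for emptiness first and reports it the way Rx.NET does, using `NoSuchElementException` as an assumed stand-in for `InvalidOperationException`. Like the integer case, it truncates the result.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical list-based model of the two empty-average behaviors.
final class AverageSketch {

    // Naive integer average: an empty list divides 0 by 0, which Java
    // reports as ArithmeticException ("/ by zero").
    static int naiveAverage(List<Integer> xs) {
        int sum = 0;
        for (int x : xs) {
            sum += x;
        }
        return sum / xs.size();
    }

    // Checks for emptiness first, mirroring Rx.NET's "Sequence contains no elements".
    static int average(List<Integer> xs) {
        if (xs.isEmpty()) {
            throw new NoSuchElementException("Sequence contains no elements");
        }
        return naiveAverage(xs);
    }
}
```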

@zsxwing (Member, Author) commented Nov 8, 2013

@benjchristensen Now I'm only working on min and max operators.

@zsxwing (Member, Author) commented Nov 8, 2013

I sent the PR #478 for min and max.

@zsxwing (Member, Author) commented Nov 8, 2013

I fixed the Average issue for the empty Observable case in #480. Please help me confirm it.

@zsxwing (Member, Author) commented Dec 24, 2013

Closed as it's fixed.

zsxwing closed this as completed Dec 24, 2013
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
This fixes issue ReactiveX#423

The fix is based on this comment by @headinthebox: ReactiveX#423 (comment)
jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
* Updated Spring Boot from 1.5.19 to 1.5.20

4 participants