Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncRx: The observer has already terminated when using TakeUntil(...) #1524

Open
TFTomSun opened this issue Apr 22, 2021 · 0 comments
Open

Comments

@TFTomSun
Copy link

TFTomSun commented Apr 22, 2021

I know that AsyncRx is not yet officially supported, but I tried it because I'm currently working in a pure async environment. It works well for the few things I did so far, but I discovered one issue where I don't know how to solve it.

I created a drag / pan observable:

    public static IAsyncObservable<(DragPhase Phase, double DeltaX, double DeltaY)> Drag(
        this IAsyncObservable<IPointProvider> start, 
        Func<IAsyncObservable<IPointProvider>> createChange, Func<IAsyncObservable<object>> createStop)
    {
        return start.SelectMany(
            x =>
            {
                var stop = createStop();
                return createChange().Select(c => (DragPhase.Drag, c.Point.X, c.Point.Y))
                    .StartWith((DragPhase.Start, x.Point.X, x.Point.Y))
                    .CombineWithPrevious((prev, cur) => (cur.Item1, cur.X - prev.X, cur.Y - prev.Y))
                    .TakeUntil(stop.Take(1)).Append((DragPhase.Stop, default, default));
                
            });

    }

 public static IAsyncObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IAsyncObservable<TSource> source,
        Func<TSource, TSource, TResult> resultSelector)
    {
        return source.Scan(
                Tuple.Create(default(TSource), default(TSource)),
                (previous, current) => Tuple.Create(previous.Item2, current))
            .Select(t => resultSelector(t.Item1, t.Item2));
    }

On mouse up (the stop observable) I get sometimes the following exception:

System.InvalidOperationException
  HResult=0x80131509
  Message=The observer has already terminated.
  Source=System.Reactive.Async.Core
  StackTrace:
   at System.Reactive.AsyncObserverBase`1.TryEnter()
   at System.Reactive.AsyncObserverBase`1.OnNextAsync(T value)
   at System.Reactive.Linq.AsyncObserver.<>c__DisplayClass421_0`2.<<TakeUntil>b__0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.AsyncObservableBase`1.AutoDetachAsyncObserver.<OnNextAsyncCore>d__10.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.Linq.AsyncObserver.<>c__DisplayClass343_0`2.<<Select>b__0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.AsyncObservableBase`1.AutoDetachAsyncObserver.<OnNextAsyncCore>d__10.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.Linq.AsyncObserver.<>c__DisplayClass341_0`2.<<Scan>b__0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.AsyncObservableBase`1.AutoDetachAsyncObserver.<OnNextAsyncCore>d__10.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.AsyncObservableBase`1.AutoDetachAsyncObserver.<OnNextAsyncCore>d__10.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.Linq.AsyncObserver.<>c__DisplayClass343_0`2.<<Select>b__0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Reactive.AsyncObservableBase`1.AutoDetachAsyncObserver.<OnNextAsyncCore>d__10.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at FabricClassLibrary.FabricEvent`1.<>c__DisplayClass11_0.<<CreateEventStream>b__1>d.MoveNext() in 

What happens is quite clear. There is a new mouse move incoming while the mouse up already signaled the completion of the inner observable. I wonder whether there's a way to solve it. Since the exception comes from a Try.. method 'TryEnter', I adapted the AsyncRx code temporarily, so that TryEnter returns true/false for success / non success.

 private bool TryEnter(bool throwException)
        {
            var old = Interlocked.CompareExchange(ref _status, Busy, Idle);

            switch (old)
            {
                case Busy:
                {
                    if (throwException)
                    {
                        throw new InvalidOperationException("The observer is currently processing a notification.");
                    }

                    return false;
                }
                case Done:
                {
                    if (throwException)
                    {
                        throw new InvalidOperationException("The observer has already terminated.");
                    }

                    return false;
                }
            }

            return true;
        }

With that little change everything works fine without any exception. I just wonder whether I could avoid the exception somehow without modifying the AsyncRx library code itself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant