Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More operators in Rx.Net #634

Closed
47 of 53 tasks
zsxwing opened this issue Dec 18, 2013 · 28 comments
Closed
47 of 53 tasks

More operators in Rx.Net #634

zsxwing opened this issue Dec 18, 2013 · 28 comments

Comments

@zsxwing
Copy link
Member

zsxwing commented Dec 18, 2013

I extracted the method names from RxJava and Rx.Net and compared them. I find some operators in Rx.Net(version 2.0.20823.0) do not appear in the MSDN. Do we need to implement them before 1.0? Here is the list:

strong means someone is working on it.
strikethrough means it won't be implememted.

@akarnokd
Copy link
Member

Nice list.

Catch -> onErrorResumeNext (?)
FromAsyncPattern: won't implement (#46)
Generate -> PR available (#519) still debating on return type
LastOrDefault -> BO.lastOrDefault (?)
MostRecent -> BO.mostRecent
Repeat -> PR available (#518)
StartAsync -> start (?)
Switch -> switchOnNext (?)
Throttle -> debounce

I don't recognize the rest.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 18, 2013

Updated.

StartAsync is not the start operator:

    //
    // Summary:
    //     Invokes the asynchronous function, surfacing the result through an observable
    //     sequence.  The CancellationToken is shared by all subscriptions on the resulting
    //     observable sequence. See the remarks section for more information.
    //
    // Parameters:
    //   functionAsync:
    //     Asynchronous function to run.
    //
    // Type parameters:
    //   TResult:
    //     The type of the result returned by the asynchronous function.
    //
    // Returns:
    //     An observable sequence exposing the function's result value, or an exception.
    //
    // Exceptions:
    //   System.ArgumentNullException:
    //     functionAsync is null.
    //
    // Remarks:
    //      The function is started immediately, not during the subscription of the
    //     resulting sequence.  Multiple subscriptions to the resulting sequence can
    //     observe the function's result.  If any subscription to the resulting sequence
    //     is disposed, the CancellationToken is set. The observer associated to the
    //     disposed subscription won't see the TaskCanceledException, but other observers
    //     will. You can protect against this using the Catch operator.  Be careful
    //     when handing out the resulting sequence because of this behavior. The most
    //     common use is to have a single subscription to the resulting sequence, which
    //     controls the CancellationToken state. Alternatively, you can control subscription
    //     behavior using multicast operators.
    public static IObservable<TResult> StartAsync<TResult>(Func<CancellationToken, Task<TResult>> functionAsync);

@akarnokd
Copy link
Member

Here is what's missing and what they generally do:

  • AsObservable: "Hides the identity of an observable sequence."
  • Case: "Uses a selector to determine which source to return", i.e., in onSubscribe, a Func0 selector is called and an Observable looked up in a map, then subscribed to it.
  • Chunkify: "Produces an enumerable sequence of consecutive (possibly empty) chunks of the source sequence.", i.e., implemented as source.collect(() -> new ArrayList, (lst, x) -> {lst.add(x); return lst;}, u -> new ArrayList).
  • Collect: "Produces an enumerable sequence that returns elements collected/aggregated from the source sequence between consecutive iterations.". For example, you start out with an empty list, and each onNext adds to the list. Once the iterator.next() takes the current list, it is replaced by a fresh list. Depending on the frequency of next() you might get lists with various sizes.
  • DeferAsync: "Returns an observable sequence that starts the specified asynchronous factory function whenever a new observer subscribes." Implemented as Defer(() -> StartAsync(factory).Merge()) where factory is a Func0<Task<IObservable<TValue>>>.
  • DelaySubscription: "Time shifts the observable sequence by delaying the subscription with the specified relative time duration.". Does not timeshift the unsubscribe.
  • DoWhile: "Repeats the given source as long as the specified condition holds, where the condition is evaluated after each repeated source completed.". I.e., repeat only if condition holds: do { subscribe & run } while (condition)
  • For: "Concatenates the observable sequences obtained by running the resultSelector for each element in the given enumerable source.". The difference from a plain concat is that the source iterable is consumed on demand rather than all at once.
  • ForEachAsync: "Invokes an action for each element in the observable sequence, and returns a Task object that will get signaled when the sequence terminates.", like making a whole sequence of observations into a big Future. Dual of fromFuture & toAsync in some sense.
  • FromAsync: "Converts to asynchronous function into an observable sequence. Each subscription to the resulting sequence causes the function to be started. The CancellationToken passed to the asynchronous function is tied to the observable sequence's subscription that triggered the function's invocation and can be used for best-effort cancellation.". Implemented as Defer(() -> StartAsync(functionAsync)) where functionAsync is Func<Task<TResult>> or Func<CancellationToken, Task<TResult>>.
  • If: "If the specified condition evaluates true, select the thenSource sequence. Otherwise, select the elseSource sequence.". Condition is evaluated on subscription.
  • GroupBy with capacity overloads (v2.2)
  • GroupByUntil with capacity overloads (v2.2)
  • SingleAsync: "Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence." Returns an observable with a single element or exception.
  • SingleOrDefaultAsync: "Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method reports an exception if there is more than one element in the observable sequence."
  • StartAsync: see comment above.
  • TakeLast overload with time window (i.e., return the last 2 seconds worth of source values once it finishes).
  • TakeLastBuffer: "Returns a list with the specified number of contiguous elements from the end of an observable sequence." Has time based overloads. Almost like takeLast but emits a single list of elements.
  • Wait: "Waits for the observable sequence to complete and returns the last element of the sequence. If the sequence terminates with an OnError notification, the exception is throw." Returns T. Looks like a source.takeLast(1).toBlockingObservable().single().
  • While: "Repeats the given source as long as the specified condition holds, where the condition is evaluated before each repeated source is subscribed to." I.e., while(condition) { subscribe }.

@akarnokd
Copy link
Member

@zsxwing or anyone else; do you want to do some of these? I'll do them in batches do I don't have to rebase and restart from master all the time. I'll start with DoWhile, While, If, Case.

@akarnokd
Copy link
Member

Continuing with Chunkify, Collect, Wait and For.

Update: Wait is practically BO.last(). Since we don't usually do aliases, I'll skip Wait.

@akarnokd
Copy link
Member

Next: DeferSubscription, TakeLast, TakeLastBuffer.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 19, 2013

I can add SingleAsync and SingleOrDefaultAsync in PR #520 directly.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 19, 2013

Do we really need DeferAsync, ForEachAsync, FromAsync, StartAsync? We have no CancellationToken in Java.

@akarnokd
Copy link
Member

It appears to me that CancellationToken is used for cancelling tasks, basically a similar way you would use FutureTask and check its isCancelled(), but since these tokens come from the outside, you can signal cancellations to multiple tasks at once. Since we have the Subscription infrastructure for that, I'd vote for not implementing those variants.

@akarnokd
Copy link
Member

Doing AsObservable, GroupBy and GroupByUntil variants.

@headinthebox
Copy link
Contributor

Anyone want to implement all the TestSchedulers?

(CancellationToken is really only useful when you have async await).

@akarnokd
Copy link
Member

A can take those as well, unless someone else beats me to it while I do the ops mentioned above.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 19, 2013

Doing lastasync and lastordefaultasync

@akarnokd
Copy link
Member

@headinthebox what features do you miss from the current TestScheduler? I'm looking at Rx.NET sources of VirtualScheduler, TestScheduler and HistoricalScheduler, but it is not apparent to me what the functional requirements are.

Never mind, I'll start doing it. Btw, I'm starting to require the enhancements of my PRs to avoid feature duplication...

@benjchristensen
Copy link
Member

These are done:

Switch -> switchOnNext (?)
Throttle -> debounce

The "async" ones don't use the async suffix in RxJava since the non-async ones are on BlockingObservable instead. Thus some of those in the list are already implemented just without the async suffix.

@benjchristensen
Copy link
Member

@zsxwing How is the lastasync operator you say you're doing different than last (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L5740) that is already implemented? It is non-blocking.

We have last and takeLast.

last() 
          Returns an Observable that emits the last item emitted by the source or an IllegalArgumentException if the source Observable is empty.

takeLast(int count) 
          Returns an Observable that emits only the last count items emitted by the source Observable.

Difference between takeLast(1) and last() just being the IllegalArgumentException on take. You were involved in both of these I though so I'm confused why we're pursuing lastasync.

@benjchristensen
Copy link
Member

What is a use case for asObservable to "hide identity"?

@akarnokd
Copy link
Member

Rx.NET uses it all around the place to hide subjects. @headinthebox might explain it better.

@benjchristensen
Copy link
Member

Interesting, though what value is there in "hiding" it? If it is returned as an Observable, not a Subject people will use it as an Observable. Is it to prevent people reflecting or inspecting a type and then interacting with the Subject?

@headinthebox
Copy link
Contributor

Yup, as @benjchristensen says, it is to prevent downcasting etc. In .NET people are pretty paranoid about this. Say you have a method that returns an Iterable, but really it is an Array. Now that opens a potential (security) hole since the caller can use the Iterable as a an Array.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 20, 2013

@zsxwing How is the lastasync operator you say you're doing different than last (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L5740) that is already implemented? It is non-blocking.

Sorry. Forget it. We just need to add LastOrDefaultAsync. Hope this comment (#520 (comment)) can make these operators clear. If no problem, I can add this table to the wiki.

@akarnokd
Copy link
Member

Looking into DeferAsync, StartAsync and SingleAsync.

@zsxwing
Copy link
Member Author

zsxwing commented Dec 20, 2013

Looking into DeferAsync, StartAsync and SingleAsync.

SingleAsync has already done in #520

@akarnokd
Copy link
Member

Doing ForEachAsync and FromAsync and probably will amend PR #645.

@benjchristensen
Copy link
Member

The fromAsync behavior seems to be covered in from overloads that take a Scheduler, such as this: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L753

@akarnokd
Copy link
Member

No, this FromAsync is a completely different operation: it uses a factory to get a future of a value which is then observed; per subscriber.

@DavidMGross
Copy link
Collaborator

Already added the table to the wiki:
https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#appendix-similar-blocking-and-non-blocking-operators

On Thu, Dec 19, 2013 at 5:57 PM, Shixiong Zhu notifications@github.comwrote:

@zsxwing https://github.com/zsxwing How is the lastasync operator you
say you're doing different than last (
https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L5740)
that is already implemented? It is non-blocking.

Sorry. Forgot it. We just need to add LastOrDefaultAsync. Hope this
comment (#520 (comment)#520 (comment))
can make these operators clear. If no problem, I can add this table to the
wiki.


Reply to this email directly or view it on GitHubhttps://github.com//issues/634#issuecomment-30983749
.

David M. Gross
PLP Consulting

@benjchristensen
Copy link
Member

@headinthebox and I reviewed the list. All remaining items have been struck out as they should not be implemented.

Great work getting all the others done!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants