Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 0.16.0 Release Candidate #601

Closed
benjchristensen opened this issue Dec 11, 2013 · 8 comments
Closed

Version 0.16.0 Release Candidate #601

benjchristensen opened this issue Dec 11, 2013 · 8 comments

Comments

@benjchristensen
Copy link
Member

Following are the changes and release notes for 0.16.0. Since so much has changed for this release I want to give people a chance to review and try the code before releasing.

Changes:


Version 0.16.0 (Maven Central)

  • Pull 516 rxjava-string module with StringObservable
  • Pull 533 Operator: ToAsync
  • Pull 535 Fix compilation errors due to referencing the Android support library directly
  • Pull 545 Fixed Zip issue with infinite streams
  • Pull 539 Zipping a finite and an infinite Observable
  • Pull 541 Operator: SkipUntil
  • Pull 537 Add scala adapters for doOnEach operator
  • Pull 560 Add type variances for doOnEach actions
  • Pull 562 Scala Adaptor Improvements
  • Pull 563 Operator: GroupByUntil
  • Pull 561 Revised Approach to Creating Observables in Scala
  • Pull 565 Operator: GroupJoin v2
  • Pull 567 Operator: Timestamp with Scheduler
  • Pull 568 Use lock free strategy for several Subscription implementations
  • Pull 571 Operator: Sample with Observable v2
  • Pull 572 Multiple Subscriptions to ObserveOn
  • Pull 573 Removed Opening and Closing historical artifacts
  • Pull 575 Operator: SequenceEqual reimplementation
  • Pull 587 Operator: LongCount
  • Pull 586 Fix Concat to allow multiple observers
  • Pull 598 New Scala Bindings
  • Pull 596 Fix for buffer not stopping when unsubscribed
  • Pull 576 Operators: Timer and Delay
  • Pull 593 Lock-free subscriptions
  • Pull 599 Refactor rx.concurrency to rx.schedulers
  • Pull 600 BugFix: Replay Subject
  • Pull 594 Operator: Start
  • Pull 604 StringObservable.join
  • Pull 609 Operation: Timer
  • Pull 612 Operation: Replay (overloads)
  • Pull 628 BugFix: MergeDelayError Synchronization
  • Pull 602 BugFix: ObserveOn Subscription leak
  • Pull 631 Make NewThreadScheduler create Daemon threads
  • Pull 651 Subjects Refactor - Non-Blocking, Common Abstraction, Performance
  • Pull 661 Subscriptions Rewrite
  • Pull 520 BugFix: blocking/non-blocking first
  • Pull 621 Scala: SerialSubscription & From
  • Pull 626 BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
  • Pull 633 BugFix: null in toList operator
  • Pull 635 Conditional Operators
  • Pull 638 Operations: DelaySubscription, TakeLast w/ time, TakeLastBuffer
  • Pull 659 Missing fixes from the subject rewrite
  • Pull 688 Fix SafeObserver handling of onComplete errors
  • Pull 690 Fixed Scala bindings
  • Pull 693 Kotlin M6.2
  • Pull 689 Removed ObserverBase
  • Pull 664 Operation: AsObservable
  • Pull 697 Operations: Skip, SkipLast, Take with time
  • Pull 698 Operations: Average, Sum
  • Pull 699 Operation: Repeat
  • Pull 701 Operation: Collect
  • Pull 707 Module: rxjava-async-util
  • Pull 708 BugFix: combineLatest
  • Pull 712 Fix Scheduler Memory Leaks
  • Pull 714 Module: rxjava-computation-expressions
  • Pull 715 Add missing type hint to clojure example
  • Pull 717 Scala: Added ConnectableObservable
  • Pull 723 Deprecate multiple arity ‘from’
  • Pull 724 Revert use of CurrentThreadScheduler for Observable.from
  • Pull 725 Simpler computation/io naming for Schedulers
  • Pull 727 ImmediateScheduler optimization for toObservableIterable

  • This release includes breaking changes to Scala bindings. Details are below.
  • The rx.concurrency package has been renamed to rx.schedulers. Existing classes still remain in rx.concurrency but are deprecated. Use of rx.concurrency should be migrated to rx.schedulers as these deprecated classes will be removed in a future release.
  • Refactor of Subjects and Subscriptions to non-blocking implementations
  • Many bug fixes, new operators and behavior changes to match Rx.Net.
  • Deprecation of some operators due to renaming or eliminating duplicates
  • New modules: rxjava-string, rxjava-async-util and rxjava-computation-expressions for operators deemed not applicable to the core library.

Scala Release Notes


This release of the RxScala bindings builds on the previous 0.15 release to make the Rx bindings for Scala
include all Rx types. In particular this release focuses on fleshing out the bindings for the Subject and Scheduler
types, as well as aligning the constructor functions for Observable with those in the RxJava.

Expect to see ongoing additions to make the Scala binding match the equivalent underlying Java API,
as well as minor changes in the existing API as we keep fine-tuning the experience on our way to a V1.0 release.

Observer

In this release we have made the asJavaObserver property in Observable[T]as well the the factory method in the
companion object that takes an rx.Observer private to the Scala bindings package, thus properly hiding irrelevant
implementation details from the user-facing API. The Observer[T] trait now looks like a clean, native Scala type:

trait Observer[-T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onCompleted(): Unit
}

object Observer {...}

To create an instance of a specific Observer, say Observer[SensorEvent] in user code, you can create a new instance
of the Observer trait by implementing any of the methods that you care about:

   val printObserver = new Observer[SensorEvent] {
      override def onNext(value: SensorEvent): Unit = {...value.toString...}
   }

or you can use one of the overloads of the companion Observer object by passing in implementations of the onNext,
onError or onCompleted methods.

Note that typically you do not need to create an Observer since all of the methods that accept an Observer[T]
(for instance subscribe) usually come with overloads that accept the individual methods
onNext, onError, and onCompleted and will automatically create an Observer for you under the covers.

While technically it is a breaking change make the asJavaObserver property private, you should probably not have
touched asJavaObserver in the first place. If you really feel you need to access the underlying rx.Observer
call toJava.

Observable

Just like for Observer, the Observable trait now also hides its asJavaObservable property and makes the constructor
function in the companion object that takes an rx.Observable private (but leaves the companion object itself public).
Again, while technically this is a breaking change, this should not have any influence on user code.

trait Observable[+T] {
    def subscribe(observer: Observer[T]): Subscription = {...}
    def apply(observer: Observer[T]): Subscription = {...}
    ...
}
object Observable {
   def create[T](func: Observer[T] => Subscription): Observable[T] = {...}
   ...
}

The major changes in Observable are wrt to the factory methods where too libral use of overloading of the apply
method hindered type inference and made Scala code look unnecessarily different than that in other language bindings.
All factory methods now have their own name corresponding to the Java and .NET operators
(plus overloads that take a Scheduler).

  • def from[T](future: Future[T]): Observable[T],
  • def from[T](iterable: Iterable[T]): Observable[T],
  • def error[T](exception: Throwable): Observable[T],
  • def empty[T]: Observable[T],
  • `def items[T](items: T*): Observable[T],
  • Extension method on toObservable: Observable[T] on List[T].

In the pre-release of this version, we expose both apply and create for the mother of all creation functions.
We would like to solicit feedback which of these two names is preferred
(or both, but there is a high probability that only one will be chosen).

  • def apply[T](subscribe: Observer[T]=>Subscription): Observable[T]
  • def create[T](subscribe: Observer[T] => Subscription): Observable[T]

Subject

The Subject trait now also hides the underlying Java asJavaSubject: rx.subjects.Subject[_ >: T, _<: T]
and takes only a single invariant type parameter T. all existing implementations of Subject are parametrized
by a single type, and this reflects that reality.

trait Subject[T] extends Observable[T] with Observer[T] {}
object Subject {
   def apply(): Subject[T] = {...}
}

For each kind of subject, there is a class with a private constructor and a companion object that you should use
to create a new kind of subject. The subjects that are available are:

  • AsyncSubject[T](),
  • BehaviorSubject[T](value),
  • Subject[T](),
  • ReplaySubject[T]().

The latter is still missing various overloads http://msdn.microsoft.com/en-us/library/hh211810(v=vs.103).aspx which
you can expect to appear once they are added to the underlying RxJava implementation.

Compared with release 0.15.1, the breaking changes in Subject for this release are
making asJavaSubject private, and collapsing its type parameters, neither of these should cause trouble,
and renaming PublishSubject to Subject.

Schedulers

The biggest breaking change compared to the 0.15.1 release is giving Scheduler the same structure as the other types.
The trait itself remains unchanged, except that we made the underlying Java representation hidden as above.
as part of this reshuffling, the scheduler package has been renamed from rx.lang.scala.concurrency
to rx.lang.scala.schedulers. There is a high probability that this package renaming will also happen in RxJava.

trait Scheduler {...}

In the previous release, you created schedulers by selecting them from the Schedulers object,
as in Schedulers.immediate or Schedulers.newThread where each would return an instance of the Scheduler trait.
However, several of the scheduler implementations have additional methods, such as the TestScheduler,
which already deviated from the pattern.

In this release, we changed this to make scheduler more like Subject and provide a family of schedulers
that you create using their factory function:

  • CurrentThreadScheduler(),
  • ExecutorScheduler(executor),
  • ImmediateScheduler(),
  • NewThreadScheduler(),
  • ScheduledExecutorServiceScheduler(scheduledExecutorService),
  • TestScheduler(),
  • ThreadPoolForComputationScheduler(),
  • ThreadPoolForIOScheduler().

In the future we expect that this list will grow further with new schedulers as they are imported from .NET
(http://msdn.microsoft.com/en-us/library/system.reactive.concurrency(v=vs.103).aspx).

To make your code compile in the new release you will have to change all occurrences of Schedulers.xxx
into XxxScheduler(), and import rx.lang.scala.schedulers instead of rx.lang.scala.schedulers.

Subscriptions

The Subscription trait in Scala now has isUnsubscribed as a member, effectively collapsing the old Subscription
and BooleanSubscription, and the latter has been removed from the public surface. Pending a bug fix in RxJava,
SerialSubscription implements its own isUnsubscribed.

trait Subscription {
  def unsubscribe(): Unit = { ... }
  def isUnsubscribed: Boolean = ...
}

object Subscription {...}

To create a Subscription use one of the following factory methods:

  • Subscription{...}, Subscription(),

  • CompositeSubscription(subscriptions),

  • MultipleAssignmentSubscription(),

  • SerialSubscription().

    In case you do feel tempted to call new Subscription{...} directly make sure you wire up isUnsubscribed
    and unsubscribe() properly, but for all practical purposes you should just use one of the factory methods.

Notifications

All underlying wrapped Java types in the Notification trait are made private like all previous types. The companion
objects of Notification now have both constructor (apply) and extractor (unapply) functions:

object Notification {...}
trait Notification[+T] {
   override def equals(that: Any): Boolean = {...}
   override def hashCode(): Int = {...}
   def apply[R](onNext: T=>R, onError: Throwable=>R, onCompleted: ()=>R): R = {...}
}

The nested companion objects of Notification now have both constructor (apply) and extractor (unapply) functions:

object Notification {
   object OnNext { def apply(...){}; def unapply(...){...} }
   object OnError { def apply(...){}; def unapply(...){...} }
   object OnCompleted { def apply(...){}; def unapply(...){...} }
}

To construct a Notification, you import rx.lang.scala.Notification._ and use OnNext("hello"),
or OnError(new Exception("Oops!")), or OnCompleted().

To pattern match on a notification you create a partial function like so: case Notification.OnNext(v) => { ... v ... },
or you use the apply function to pass in functions for each possibility.

There are no breaking changes for notifications.

Java Interop Helpers

Since the Scala traits wrap the underlying Java types, yoo may occasionally will have to wrap an unwrap
between the two representations. The JavaConversion object provides helper functions of the form toJavaXXX and
toScalaXXX for this purpose, properly hiding how precisely the wrapped types are stored.
Note the (un)wrap conversions are defined as implicits in Scala, but in the unlikely event that you do need them
be kind to the reader of your code and call them explicitly.

object JavaConversions {
  import language.implicitConversions

  implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = {...}
  implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = {...}
  implicit def toJavaSubscription(s: Subscription): rx.Subscription = {...}
  implicit def toScalaSubscription(s: rx.Subscription): Subscription = {...}
  implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = {...}
  implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = {...}
  implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = {...}
  implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = {...}
  implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = {...}
  implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = {...}
}
@benjchristensen
Copy link
Member Author

Items of discussion or that I would like to resolve before releasing:

  • ReplaySubject has a different style from other Subject implementations. I'm not tied to either approach, nor am I sure that the idea of an AbstractSubject was the right way to go.
  • I'm not convinced that Subject implementations have correct thread-safety or concurrency behavior. For example, we synchronize on subscriptions but not event emission and I'm not sure whether that means we're vulnerable to losing events or not.
  • Yet more observeOn fixes/tweaks are needed.
  • I'd like to eliminate the rx.util dumping ground
    • Put exceptions into rx.exceptions or rx.util.exceptions?
    • Put functions into rx.functions or rx.util.functions?
    • Where does Range go? TimeInterval? Timestamped? perhaps in rx.observables?
    • Why is Timestamped not camelCase whereas TimeInterval is?
  • Should AbstractSubject be public? I think it should be package private, particularly as I don't necessarily agree with the approach (even though I wrote it).
  • For consistency should we have rx.operators with classes named OperatorXYZ or rx.operations and classes named OperationXYZ?
  • ObserverBase needs to be package private or moved into a non-advertised package like rx.operators

@zsxwing
Copy link
Member

zsxwing commented Dec 11, 2013

Why is Timestamped not camelCase whereas TimeInterval is?

Because timestamp is a single word, like java.sql.Timestamp?

@benjchristensen
Copy link
Member Author

ObserveOn fixes: #602

@benjchristensen
Copy link
Member Author

Because timestamp is a single word, like java.sql.Timestamp?

I guess you're right. Never mind :-)

@akarnokd
Copy link
Member

I've looked at the Subject implementations and they seem to be incorrect due to the following reasons:

  • the AbstractSubject allows the concurrent execution of onNext and onError/onCompleted calls, i.e, the emitNotification doesn't lock on SUBSCRIPTION_LOCK.
  • PublishSubject, AsyncSubject and BehaviorSubject seem to overwrite the last notification value even after the termination condition as the call state.currentValue.set(new Notification<T>(...)); is always executed in the subjects onXXX methods regardless of the state. New subscribers may observe various states when subscribing.

@benjchristensen
Copy link
Member Author

the AbstractSubject allows the concurrent execution of onNext and onError/onCompleted calls

All of Rx allows this as it is up to the provider to obey the contract and not call them concurrently. That is why we don't synchronize everything in Rx, for performance reasons.

@benjchristensen
Copy link
Member Author

Rx Design Guideline 6.8

6.8. Avoid serializing operators

As all Rx operators are bound to guideline 6.7, operators can safely assume that their inputs are serialized. Adding too much synchronization would clutter the code and can lead to performance degradation.

If an observable sequence is not following the Rx contract (see chapter 0), it is up to the developer writing the end-user application to fix the observable sequence by calling the Synchronize operator at the first place the developer gets a hold of the observable sequence. This way the scope of additional synchronization is limited to where it is needed.

Due to this, a Subject implementation should not need to synchronize onNext/onCompleted/onError calls. They do need to handle subscribe/unsubscribe concurrent access.

It is however tricky to achieve both of these since we want to ensure we don't have race-conditions in observers subscribing and seeing the correct view of events. That said, we don't want to add unnecessary synchronization.

For example, on PublishSubject it is completely fine during a concurrent race between onNext and subscribe that it may or may not get that onNext event. That's the nature of concurrently subscribing to a hot Observable.

We don't want to pay the overhead of synchronization on onNext just so that occasional subscribe events can occur.

@benjchristensen
Copy link
Member Author

/cc @akarnokd @headinthebox @zsxwing I am ready to release 0.16.0 after running it on a Netflix API production canary to assert performance and functionality of the core operators, schedulers, etc (at least of what we use, not new operators).

I'm blocked on a build issue that I can't solve until tomorrow at the office so will release sometime Tuesday hopefully.

jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants