Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weird behaviour of switch when the outer sequence completes but the last inner still has elements #737

Closed
dvtomas opened this issue Jan 10, 2014 · 14 comments

Comments

@dvtomas
Copy link

dvtomas commented Jan 10, 2014

Hi all,
I just ran into a behavior of switch (as of RxJava 0.16.0) that I don't think is correct. When the outer sequence completes, but the last inner sequence that the operator was subscribed to still has some elements, I'd expect the operator to either

  • also complete immediately, or
  • pass on the rest of the elements in the inner sequence and complete when the inner sequence completes.

However, the output of the code below makes me think that neither is the case with the current implementation. When I try to run

      import rx.lang.scala.Observable
      import rx.lang.scala.schedulers.NewThreadScheduler
      import scala.concurrent.duration
      import duration._

      val source: Observable[Observable[String]] = Observable.items(
        Observable.interval(200.milli, NewThreadScheduler()).map("a " + _.toString).take(10),
        Observable.interval(200.milli, NewThreadScheduler()).map("b " + _.toString).take(10)
      ).zip(Observable.interval(500.milli, NewThreadScheduler())).map(_._1)

      val items = source.switch.toBlockingObservable.toList
      println(items)

it prints (at least for me) List(a 0, a 1, b 0, b 1, b 2, b 3, b 4, b 5, b 6)

That seems correct for the a's - two of them fit into the 500 milli window, but why there were 6 b's emitted I don't understand, and it seems to me as quite an arbitrary number.

Thank you for any insights.
Best regards,
Tomáš Dvořák

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

Confirm this is a bug in switch. Thanks, @dvtomas

zsxwing added a commit to zsxwing/RxJava that referenced this issue Jan 13, 2014
@akarnokd
Copy link
Member

I see another issue. The op uses MultipleAssignmentSubscription to store the inner subscription. When the switch to the next occurs, it does not unsubscribe from the previous Observable which will keep running indefinitely. I think child should be SerialSubscription instead.

In addition, latest isn't volatile so checking the current index wouldn't be safe. My mistake, accessed under gate.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

latest does not need volatile as it's protected by synchronized (gate) and never used outside the lock.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

Sorry, @akarnokd , you're right. The initial step is not thread-safe.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

If latest is not volatile, latest can not be guaranteed to be 0.

@akarnokd
Copy link
Member

I was wrong, didn't "see" the synchronized. Btw, instance initialization esures that the latest field is seen to be 0, no need for volatile.

While you are at it, could you move any unsubscribe() outside the synchronization blocks?

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

Just refreshed my java concurrency knowledge. latest is ensured to be 0 by synchronized rather than instance initialization. Right?

@akarnokd
Copy link
Member

When you call new SourceObserver(), the constructor is guaranteed to make fields visible.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

From JSR-133 3.5 Final Fields, "a thread that can only see a reference to an object after that object has been completely initialized is guaranteed to see the correctly initialized values for that object’s final fields."

http://download.oracle.com/otn-pub/jcp/memory_model-1.0-prd-oth-G-F/memory_model-1_0-prd-spec.pdf?AuthParam=1389609770_22e201e49d67d0a8ae6bcaccaedd8518

the constructor is only guaranteed to make final fields visible.

@zsxwing
Copy link
Member

zsxwing commented Jan 13, 2014

However, I don't know if the default value 0 of a non-final field will be guaranteed to be visible to other threads after the constructor is finished. Is it possible that other threads see a random uninitialized value?

@duarten
Copy link

duarten commented Jan 13, 2014

If publication is not safe, then there is no happens-before relationship between the assignment of the instance's fields and its unsafe publication. If the instance contains final fields, then Java inserts a memory barrier at the end of the constructor and publication is ensured to be safe

benjchristensen added a commit that referenced this issue Jan 14, 2014
@zsxwing
Copy link
Member

zsxwing commented Jan 14, 2014

From JLS 17.4.4. Synchronization Order,

The write of the default value (zero, false, or null) to each variable synchronizes-with the first action in every thread.
Although it may seem a little strange to write a default value to a variable before the object containing the variable is allocated, conceptually every object is created at the start of the program with its default initialized values.

So if a field is the default value (zero, false, or null) and does not be assigned, the default value is always visible to every thread.

@akarnokd
Copy link
Member

akarnokd commented May 8, 2014

The buffer operator has been rewritten with extra effort for making sure values don't fall between two windows if timespan == timeshift. Does it work for you?

@benjchristensen
Copy link
Member

Closing as no further comments in a long time. This is likely fixed with the recent fixes to buffer.

@dvtomas Please re-open if there are still issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants