
A standard for continuation-local variables? #2885

Closed
eirslett opened this issue Apr 17, 2015 · 27 comments

@eirslett

https://groups.google.com/forum/#!topic/rxjava/3iW-U6VZ_8c

I'm cross-posting this here, since I'm not quite sure where it belongs. One could think of it as a feature request. (Just close the issue if it doesn't belong on GitHub.) The topic didn't get much discussion on the mailing list...

Hey!

I've been working a lot with Twitter's Zipkin project, which makes heavy use of the "Local" construct: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Local.scala
It's basically an improved ThreadLocal, but a key feature of a Local variable is that its state is passed through to other threads whenever you create and compose Futures. That makes it easy to do debug tracing in reactive systems (especially Zipkin tracing!).
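A minimal Java sketch of that idea, assuming a hypothetical ContextLocal class (the save/let names only mirror Twitter's Local; this is not its API or implementation). All locals live in one per-thread map, save() snapshots it on the submitting thread, and wrap() reinstalls the snapshot on whichever thread later runs the task. Without the wrapper, the pool thread sees nothing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical continuation-local variable (a sketch, not Twitter's or any library's API).
final class ContextLocal<T> {
    // One map per thread holds the current value of every ContextLocal.
    private static final ThreadLocal<Map<ContextLocal<?>, Object>> CURRENT =
            ThreadLocal.withInitial(HashMap::new);

    @SuppressWarnings("unchecked")
    T get() {
        return (T) CURRENT.get().get(this);
    }

    void set(T value) {
        CURRENT.get().put(this, value);
    }

    // Snapshot every local of the calling thread; copying keeps later writes out of it.
    static Map<ContextLocal<?>, Object> save() {
        return new HashMap<>(CURRENT.get());
    }

    // Run body with the snapshot installed, then restore the thread's previous state.
    static <R> R let(Map<ContextLocal<?>, Object> snapshot, Supplier<R> body) {
        Map<ContextLocal<?>, Object> previous = CURRENT.get();
        CURRENT.set(new HashMap<>(snapshot));
        try {
            return body.get();
        } finally {
            CURRENT.set(previous);
        }
    }

    // Capture the caller's context now and restore it wherever the supplier later runs.
    static <R> Supplier<R> wrap(Supplier<R> body) {
        Map<ContextLocal<?>, Object> snapshot = save();
        return () -> let(snapshot, body);
    }
}

final class ContextLocalDemo {
    static final ContextLocal<String> TRACE_ID = new ContextLocal<>();

    // Reads TRACE_ID on a pool thread, with or without wrapping the task.
    static String seenByWorker(boolean wrap) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TRACE_ID.set("trace-42");
        try {
            Supplier<String> read = TRACE_ID::get;
            return CompletableFuture.supplyAsync(wrap ? ContextLocal.wrap(read) : read, pool).join();
        } finally {
            TRACE_ID.set(null);
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker(false)); // null: plain ThreadLocal state stays behind
        System.out.println(seenByWorker(true));  // trace-42
    }
}
```

The snapshot is captured when the task is created, not when it runs, which is what makes the value follow the Future rather than the thread.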

Scala has DynamicVariable, and I think it works in a similar way with Scala Futures, but I'm not 100% sure.

One of the problems with RxJava is that ThreadLocal state is not passed on between Observables, at least not intentionally. So you have to add a custom, Zipkin-specific execution hook that transfers the state between Observables.
Would it be a good idea to factor out a common construct for this kind of situation, maybe a ContinuationLocal (or just Twitter's "Local", which is already implemented, minus the dependencies on Scala and Twitter's libraries), so that one could reliably expect its state to be passed across threads as long as it stays within the same context?

Even better, it would be great to have a construct in the core Java library that is expected to be passed between ExecutorServices. It's a real pain when working with legacy code: trace debugging doesn't work because we lose the state inside e.g. a ForkJoinPool. One has to decorate every ExecutorService in the application with a custom wrapper, but that's very intrusive! (It also requires a lot of rewriting, and it's not always possible.)
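Roughly what such a wrapper looks like in plain Java; ContextPropagatingExecutor and its TRACE ThreadLocal are hypothetical names used only for illustration. The decorator reads the ThreadLocal on the submitting thread and sets it around the task on the worker, which is why it must be applied to every executor the application uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical decorator: carries the submitter's trace ID over to the worker thread.
final class ContextPropagatingExecutor implements Executor {
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    private final Executor delegate;

    ContextPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        String captured = TRACE.get(); // read on the submitting thread
        delegate.execute(() -> {
            String previous = TRACE.get(); // whatever the worker held before
            TRACE.set(captured);
            try {
                task.run();
            } finally {
                TRACE.set(previous); // don't leak context into the next pooled task
            }
        });
    }
}

final class ExecutorDemo {
    // Reads TRACE on a fresh pool, optionally decorated.
    static String seenOnPool(boolean decorate) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Executor executor = decorate ? new ContextPropagatingExecutor(pool) : pool;
        ContextPropagatingExecutor.TRACE.set("req-7");
        try {
            return CompletableFuture.supplyAsync(ContextPropagatingExecutor.TRACE::get, executor).join();
        } finally {
            ContextPropagatingExecutor.TRACE.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenOnPool(false)); // null
        System.out.println(seenOnPool(true));  // req-7
    }
}
```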

I really think the concept should be shared across different implementations of reactive patterns, so you get interop between libraries whenever they're running within the same JVM.
(E.g., if I use a bridge from Finagle's Thrift to Hystrix/RxJava, I would love to have the context transferred without having to write custom code.)
Should this even be defined as a "mini standard" à la Reactive Streams? What are your thoughts?

@jbripley
Contributor

Count me as an interested party in this. We also switch a lot between Finagle, at the service's inbound and outbound endpoints, and Observable business logic in the middle.

We've been meaning to start using Zipkin for request tracking and we suspected that this might become an issue.

@eirslett
Author

@paulp @odersky @retronym It would be nice to get some input from a Scala language point of view as well! (Seen in the light of Scala's DynamicVariable)

@retronym

Scala's DynamicVariable is basically a wrapper around InheritableThreadLocal. It also suffers from the threadpool problem:

```
scala> val dv = new scala.util.DynamicVariable[String](null)
dv: scala.util.DynamicVariable[String] = DynamicVariable(null)

scala> { dv.withValue("here") { new Thread { override def run = println(dv.value) } start() } }
here

scala> { dv.withValue("here") { import concurrent.ExecutionContext.Implicits.global; concurrent.Future( println(dv.value) ) } }
null
res14: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@3b74ac8
```
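The same effect can be reproduced in plain Java with InheritableThreadLocal, whose value is copied only when a Thread object is constructed. In this sketch (class and method names are illustrative), the single-thread pool is warmed up first so its worker already exists before the value is set, much like the pre-started threads of a global pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Java analogue of the REPL session above.
final class InheritableDemo {
    static final InheritableThreadLocal<String> DV = new InheritableThreadLocal<>();

    // A thread constructed while DV is set inherits the value.
    static String seenByNewThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        DV.set("here");
        try {
            Thread t = new Thread(() -> seen.set(DV.get()));
            t.start();
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            DV.remove();
        }
        return seen.get();
    }

    // A pooled thread created before DV was set has nothing to inherit.
    static String seenByPooledThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> { }).get(); // forces the worker thread to exist now
            DV.set("here");
            Callable<String> read = DV::get;
            return pool.submit(read).get(); // runs on the already-created worker
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            DV.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByNewThread());    // here
        System.out.println(seenByPooledThread()); // null
    }
}
```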

Perhaps @viktorklang / @richdougherty might be able to chime in with their experience from Akka / Play.

@eirslett
Author

With Akka, it could be hairy, since it would be natural to propagate such data between actors across services, and Akka has location transparency. Suppose you store a trace ID in a continuation-local (or whatever we end up calling it), and you also store a Trace service (as in the Zipkin case), which implements the transport (how the trace ID is later stored to e.g. Cassandra). You probably wouldn't want to propagate the service implementation to actors running on different machines, although it would be natural to propagate it within the same application, which is how Zipkin does it.

@beku8

beku8 commented Apr 23, 2015

+1

@akarnokd
Member

> it's a real pain when working with legacy code, and Trace debugging doesn't work because we lose the state inside e.g. a ForkJoinPool. One has to decorate every ExecutorService in the application with a custom wrapper, but that's very intrusive! (and requires a lot of rewriting, also it's not always possible.)

Yes, this is what happens with much of the concurrency logic we hide behind schedulers and operators, but it is generally painful. In addition, I think such a construct poses considerable overhead for every task scheduled, and since it is opt-in, there is no way to optimize against it. The RxJava way is to use composite objects and pass along any relevant state in the stream. Besides, I'd say that local-passing works in Akka because it is a framework: the framework controls when you enter or leave an actor and, as such, can manage the lifecycle and passing-around of the local state for you. RxJava, however, is a library, and a library can't impose such constraints.

@eirslett
Author

> I think such a construct poses considerable overhead for every task scheduled and since it is opt-in, there is no way to optimize against it

Well, as you said, it would be opt-in. Every time a new continuation-local variable is created, we record the fact that at least one continuation-local exists. If there is at least one, we transfer the state of all continuation-local variables to the new thread. If there are none, the transfer is a no-op, so there is no overhead.
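A minimal sketch of that opt-in fast path, assuming a hypothetical LocalRegistry (not an existing RxJava or JDK API): creating the first local flips a global flag, and until then wrap() hands the task back untouched, so scheduling allocates nothing extra.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry for opt-in continuation-locals.
final class LocalRegistry {
    // Flips to true the first time any continuation-local is created, and stays true.
    private static volatile boolean anyLocals = false;
    private static final ThreadLocal<Map<Object, Object>> VALUES =
            ThreadLocal.withInitial(HashMap::new);

    // Each local is just a unique key into the per-thread map.
    static Object newLocal() {
        anyLocals = true;
        return new Object();
    }

    static void set(Object local, Object value) {
        VALUES.get().put(local, value);
    }

    static Object get(Object local) {
        return VALUES.get().get(local);
    }

    // Prepares a task for another thread; returns it untouched when no locals exist.
    static Runnable wrap(Runnable task) {
        if (!anyLocals) {
            return task; // fast path: no snapshot, no extra allocation
        }
        Map<Object, Object> snapshot = new HashMap<>(VALUES.get());
        return () -> {
            Map<Object, Object> previous = VALUES.get();
            VALUES.set(new HashMap<>(snapshot));
            try {
                task.run();
            } finally {
                VALUES.set(previous);
            }
        };
    }

    public static void main(String[] args) {
        Runnable noop = () -> { };
        System.out.println(wrap(noop) == noop); // true: nothing registered yet
        newLocal();
        System.out.println(wrap(noop) == noop); // false: state is now captured
    }
}
```

One trade-off of this design: the flag is JVM-wide, so once any library creates a local, every scheduled task pays for the snapshot.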

> The RxJava way is to use composite objects and pass along any relevant state in the stream.

You're missing a key point here: the reference to the continuation-local must be implicit, outside the traditional scope where you explicitly pass references around. If you were to pass trace data around explicitly, you would have to rewrite your entire application's internal (and external?) API to support tracing. An important property of tracing systems is that they should be transparent and non-intrusive in userland code. (That's one of the things ThreadLocals were made for in the first place; what we're after is a kind of improved ThreadLocal.)

@beku8

beku8 commented Apr 25, 2015

Couldn't there be a way to attach an EventListener that gets triggered every time a thread is "assigned"? Then we would configure it only once for each chain, and ThreadLocals would be bound and removed only when appropriate. I'm pretty new to the library, so I'm aware I might be oversimplifying.

@eirslett
Author

You can use RxJavaSchedulersHook to achieve this already, but you'll have to provide your own continuation local (in this example it's Twitter's), and you'll have to run your application with a system property for it to work:

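```scala
package com.example

import com.twitter.util.Local
import rx.functions.Action0
import rx.plugins.RxJavaSchedulersHook

// Captures Twitter's Locals when an action is scheduled and restores them
// on whichever thread eventually runs it.
final class Hook extends RxJavaSchedulersHook {
  override def onSchedule(action: Action0): Action0 = {
    val ctx = Local.save() // snapshot taken on the scheduling thread
    new Action0 {
      override def call(): Unit =
        Local.let(ctx) {
          action.call()
        }
    }
  }
}
```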

This is a Scala snippet that transfers Twitter's Locals between threads in RxJava, but you'll have to run your app with the property

```shell
java -Drxjava.plugin.RxJavaSchedulersHook.implementation=com.example.Hook -jar myapp.jar
```

Alternatively, you can configure the plugin from Java code, but then you have to add code that runs somewhere in your application at startup. Ideally, it would just work out of the box.

A compromise might be to let RxJava read classpath:rxjava-plugins.properties (if it exists) and load the plugins specified there. It could also be implemented with a ServiceLoader. That way, we could add an optional jar to the classpath that adds continuation-local support. (Though I would still say the best option is to provide continuation-locals as a core feature, independent of scheduler hooks.)
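The ServiceLoader variant could look roughly like this. SchedulingHook is a hypothetical SPI interface, not part of RxJava; an optional jar would register its implementation in a META-INF/services file named after the interface's fully qualified name, and with no such jar present the loader falls back to a no-op hook.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical SPI that an optional continuation-local jar could implement.
interface SchedulingHook {
    Runnable onSchedule(Runnable action);
}

final class HookLoader {
    // Identity hook used when no provider is on the classpath.
    static final SchedulingHook NO_OP = action -> action;

    // Uses the first provider found, or falls back to the no-op hook.
    static SchedulingHook load() {
        Iterator<SchedulingHook> providers = ServiceLoader.load(SchedulingHook.class).iterator();
        return providers.hasNext() ? providers.next() : NO_OP;
    }

    public static void main(String[] args) {
        Runnable task = () -> System.out.println("task ran");
        load().onSchedule(task).run();
    }
}
```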

@beku8

beku8 commented Apr 29, 2015

@eirslett Thanks for the direction! I found what I wanted.

@stealthcode

Joining the party late, but I have something that might interest you. As I understand it, the continuation-local variable would be accessible to a sequence of observable onNexts. For instance, if an observable emitted [1, 2, 3], you would want a context to follow each value through the observable chain, one that you could add objects to and fetch them from. I believe that would be equivalent to your continuation-local variables.

I have a proof-of-concept project that introduces a BiObservable (TriObservable, etc.) that passes more than one data element per emission. This lets you map over either data element. For instance, you could do something like this:

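```java
Observable<Integer> nums = Observable.range(0, 10);
BiObservable<Integer, String> vs = BiObservable.generate(nums, Object::toString);
// map1 transforms only the first element; the String rides along unchanged.
BiObservable<String[], String> arr = vs.map1(n -> new String[n])
    .doOnNext((a, s) -> {
        for (int i = 0; i < a.length; i++)
            a[i] = s;
    });
// subscribe() is called separately so that arr holds the BiObservable itself.
arr.subscribe(System.out::println);
```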

The operators for BiObservable generally accept either a Func1<T1, R> or a Func2<T1, T2, R> (or the corresponding actions). Your user functions (the funcs you pass into map, doOnNext, etc.) would remain simple funcs, for instance from String to String, instead of taking a complex aggregate object such as a Context that holds all the data you may need later. However, funcs that need to append new state would have to do so explicitly, as BiObservable.generate does above.
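The map1 idea can be illustrated without RxJava. This is not the proof of concept's code: Bi is a hypothetical immutable pair standing in for one two-element emission, and java.util.stream stands in for the observable chain. The user function passed to map1 only ever sees the Integer, while the context string is carried along untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;

// Hypothetical two-element emission, standing in for one BiObservable onNext.
final class Bi<T1, T2> {
    final T1 first;
    final T2 second;

    Bi(T1 first, T2 second) {
        this.first = first;
        this.second = second;
    }

    // Like map1: transform the first element, carry the second along unchanged.
    <R> Bi<R, T2> map1(Function<? super T1, ? extends R> f) {
        return new Bi<>(f.apply(first), second);
    }

    // Like doOnNext with a two-argument callback: sees both elements, changes nothing.
    Bi<T1, T2> doOnNext(BiConsumer<? super T1, ? super T2> action) {
        action.accept(first, second);
        return this;
    }
}

final class BiDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        IntStream.range(0, 3)
                .mapToObj(n -> new Bi<Integer, String>(n, "ctx-" + n)) // attach a context to each value
                .map(b -> b.map1(n -> n * 10))                          // user func only sees the Integer
                .forEach(b -> b.doOnNext((value, ctx) -> seen.add(value + "@" + ctx)));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [0@ctx-0, 10@ctx-1, 20@ctx-2]
    }
}
```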

Here is a link to the proof of concept project. I would greatly appreciate any feedback or ideas.

@akarnokd
Member

I did a schedulers blog post with an example that did automatic context copying on the scheduling boundary.

As for the BiObserver, I'm quite interested how you implemented observeOn and the other queueing operators.

@stealthcode

I have not implemented observeOn yet. I think I could build that out once I merge it into RxJava. But my initial impression is that the existing OperatorObserveOn would generalize very well once NotificationLite is generalized into something like a BiNotificationLite. I think we could actually use the same class for all variations.

Thanks, that's great feedback @akarnokd.

@abersnaze
Contributor

I don't think there is a way to make the BiNotification very light. To avoid allocating a tuple on each onNext, I think you would need two NotificationLites and a BiRingBuffer.
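To illustrate the allocation point, here is a hypothetical, single-threaded BiRingBuffer sketch: the two halves of each emission go into parallel slot arrays, so offer() allocates no pair object. It deliberately leaves out the thread safety and the NotificationLite encoding of terminal events that a real observeOn queue would need.

```java
// Hypothetical, single-threaded sketch; a real observeOn queue must be thread-safe.
final class BiRingBuffer<T1, T2> {
    private final Object[] firsts;
    private final Object[] seconds;
    private long head; // next index to read
    private long tail; // next index to write

    BiRingBuffer(int capacity) {
        firsts = new Object[capacity];
        seconds = new Object[capacity];
    }

    // Stores both halves of one emission without allocating a tuple.
    boolean offer(T1 a, T2 b) {
        if (tail - head == firsts.length) {
            return false; // full
        }
        int slot = (int) (tail % firsts.length);
        firsts[slot] = a;
        seconds[slot] = b;
        tail++;
        return true;
    }

    boolean isEmpty() {
        return head == tail;
    }

    @SuppressWarnings("unchecked")
    T1 peekFirst() {
        return (T1) firsts[(int) (head % firsts.length)];
    }

    @SuppressWarnings("unchecked")
    T2 peekSecond() {
        return (T2) seconds[(int) (head % seconds.length)];
    }

    // Drops the current emission, clearing both slots so they can be garbage collected.
    void remove() {
        int slot = (int) (head % firsts.length);
        firsts[slot] = null;
        seconds[slot] = null;
        head++;
    }

    public static void main(String[] args) {
        BiRingBuffer<Integer, String> q = new BiRingBuffer<>(2);
        q.offer(1, "trace-a");
        q.offer(2, "trace-b");
        while (!q.isEmpty()) {
            System.out.println(q.peekFirst() + " " + q.peekSecond());
            q.remove();
        }
    }
}
```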

@louiscryan

Cross posting here for interested folks

grpc/grpc-java#562
https://blog.golang.org/context

@benjchristensen
Member

> You can use RxJavaSchedulersHook to achieve this already

As per this earlier comment, we use ThreadLocals at Netflix (but hidden inside company-wide RequestContext, RequestVariable and ThreadVariable types) and use the RxJavaSchedulersHook and HystrixConcurrencyStrategy to copy across threads when scheduling occurs in RxJava and Hystrix.

I'm not convinced this is something that is easily turned into a standard. I certainly would not support an intrusive and broad standard such as what Google enforces internally as per this (https://blog.golang.org/context - thanks @louiscryan for the links):

> At Google, we require that Go programmers pass a Context parameter as the first argument to every function on the call path between incoming and outgoing requests

That definitely works, but it is better left as a choice for each library, team, or company, not as a broad standard.

Is there anything further for us to do in RxJava on this topic? Does someone want to tackle providing better examples or documentation? Or does someone have a proposal for what an industry-wide Java standard would look like?

Are there any design choices in RxJava 2.0 that we could do differently that would better support this functionality?

@eirslett
Author

@benjchristensen There is a discussion here: DistributedTracing/continuation-local-storage-jvm#1 RxJava is also mentioned there. We would have to build a non-intrusive construct so it would be transparent to the RxJava API and other userland APIs. There's some discussion about whether one should transfer state explicitly or implicitly. Feel free to have a look, and comment!

@robstarling

Would the Akka concerns about location transparency be solvable by requiring the context to be serializable?

@viktorklang

@robstarling In general, Java Serialization is more of a liability than a desirable solution.

@robstarling

Touché, @viktorklang . Agreed that little has ever been solved by it. :) N/m.

@viktorklang

@robstarling Another option is to specify a binary coding, but I am not sure as to the value/benefit ratio. Keep in mind that the format would need to be possibly versioned etc.

@robstarling

@viktorklang Even with no versioning, a simple string would be really nice for associating a log entry or a failure with a triggering action (or a UUID, or maybe even a long number; it wouldn't have to be "strongly unique", just unique enough within a reasonable time span for most debugging purposes).

It really depends on whether we're just trying to target the Zipkin-like case or something much more general. If the general solutions are all undesirable, that doesn't mean we shouldn't consider focusing on the simpler case.

Maybe have an initially-empty stack of UUIDs that propagates through future creations, which code can read ("what's my context-chain?") and prepend to ("let this work, and the work of futures I spawn, have context X::...").
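A rough Java sketch of that suggestion, with hypothetical names throughout and plain strings standing in for UUIDs: the chain is an immutable list held in a ThreadLocal, withContext() prepends an ID for the duration of a block, and async() captures the caller's chain so that futures spawned inside inherit it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical context chain: most recent ID first, empty by default.
final class ContextChain {
    private static final ThreadLocal<List<String>> CHAIN =
            ThreadLocal.withInitial(Collections::emptyList);

    // "What's my context-chain?"
    static List<String> current() {
        return CHAIN.get();
    }

    // "Let this work, and the work of futures I spawn, have context id::..."
    static <R> R withContext(String id, Supplier<R> body) {
        List<String> next = new ArrayList<>();
        next.add(id);
        next.addAll(CHAIN.get());
        return runWith(Collections.unmodifiableList(next), body);
    }

    // Starts async work that sees the caller's chain on the executor's thread.
    static <R> CompletableFuture<R> async(Supplier<R> body, Executor executor) {
        List<String> captured = CHAIN.get();
        return CompletableFuture.supplyAsync(() -> runWith(captured, body), executor);
    }

    private static <R> R runWith(List<String> chain, Supplier<R> body) {
        List<String> previous = CHAIN.get();
        CHAIN.set(chain);
        try {
            return body.get();
        } finally {
            CHAIN.set(previous);
        }
    }
}

final class ContextChainDemo {
    static List<String> nested() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return ContextChain.withContext("request-1", () ->
                    ContextChain.async(() -> ContextChain.withContext("db-call", ContextChain::current), pool)
                            .join());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nested());               // [db-call, request-1]
        System.out.println(ContextChain.current()); // []
    }
}
```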

@viktorklang

@robstarling Perhaps I misunderstood, you'd have to store arbitrary Java Objects in this "continuation-local"?

@eirslett
Author

eirslett commented Jan 2, 2016

A simple primitive could be to only let it store either (1) long/64-bit/128-bit values or (2) object references; I guess they're both more or less interchangeable, in the sense that an object reference could be a pointer to a Long, or a Long value could be a key in a store (e.g. HashMap) which contains the actual value.
People could then build higher-level abstractions on top, that would take care of more complex use cases like binary formats and serialization.

@viktorklang

@eirslett Agreed, and in my mind it is not only about serialization—it is about sending it to another system, and having that system able and willing to deserialize it. What possible serialization strategies exist, are they interoperable, are they versioned, how does framing work, etc.

@akarnokd
Member

akarnokd commented Apr 2, 2016

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
