IncrementalCallback support #9

Closed
codefromthecrypt opened this issue Jul 6, 2013 · 13 comments
Labels
enhancement For recommending new capabilities
Milestone

Comments

@codefromthecrypt
Contributor

Incremental callbacks facilitate asynchronous invocations, where the http connection and decoding of elements inside the response are decoupled from the calling thread.

In a typical synchronous response, the calling thread blocks on the http connect, reading from the socket, and also decoding of the response data into a java type. As such, a java type is returned.

interface GitHub {
  @RequestLine("GET /repos/{owner}/{repo}/contributors")
  Iterator<Contributor> contributors(@Named("owner") String owner, @Named("repo") String repo);
}

In an incremental callback pattern, the calling thread blocks only until the request is queued; processing of the response (connect, read, decode) is free to happen on different threads. To unblock the caller, void is returned.

interface GitHub {
  @RequestLine("GET /repos/{owner}/{repo}/contributors")
  void contributors(@Named("owner") String owner, @Named("repo") String repo, IncrementalCallback<Contributor> contributors);
}

Motivation

Denominator works primarily on high latency collections, where responses can take up to minutes to return. Even though denominator returns Iterators, which allow iterative response processing, the update granularity is usually one http request, because the response parser builds and returns a complete list.

Ex. SAX parsing into a list, which is returned only after parsing completes.

    if (qName.equals("HostedZone")) {
            zones.add(Zone.create(this.name, id));
...
    @Override
    public ZoneList getResult() {
        return zones;

Ex. Incremental callback pattern allows for immediate visibility of new elements

    if (qName.equals("HostedZone")) {
            observer.onNext(Zone.create(this.name, id));
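To make the contrast concrete, here is a minimal, self-contained sketch of a SAX handler that emits each element as soon as it is parsed. The class names (IncrementalSaxSketch, ZoneNameHandler), the XML shape, and the choice to emit plain zone names instead of Zone objects are assumptions made for illustration; they are not denominator or Feign code.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class IncrementalSaxSketch {

  // Same shape as the IncrementalCallback proposed in this issue.
  interface IncrementalCallback<T> {
    void onNext(T element);
    void onSuccess();
    void onFailure(Throwable cause);
  }

  // Hypothetical handler: emits each zone name when its closing tag is seen,
  // rather than adding it to a list that is returned at the end.
  static class ZoneNameHandler extends DefaultHandler {
    private final IncrementalCallback<String> callback;
    private final StringBuilder text = new StringBuilder();

    ZoneNameHandler(IncrementalCallback<String> callback) {
      this.callback = callback;
    }

    @Override public void startElement(String uri, String localName, String qName, Attributes attrs) {
      text.setLength(0); // start collecting text for the new element
    }

    @Override public void characters(char[] ch, int start, int length) {
      text.append(ch, start, length);
    }

    @Override public void endElement(String uri, String localName, String qName) {
      if (qName.equals("Name")) {
        callback.onNext(text.toString().trim());
      }
    }
  }

  // Parses the xml and records every signal the callback receives, in order.
  static List<String> parse(String xml) {
    final List<String> events = new ArrayList<String>();
    IncrementalCallback<String> callback = new IncrementalCallback<String>() {
      @Override public void onNext(String element) { events.add("next:" + element); }
      @Override public void onSuccess() { events.add("success"); }
      @Override public void onFailure(Throwable cause) { events.add("failure"); }
    };
    try {
      SAXParserFactory.newInstance().newSAXParser().parse(
          new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)),
          new ZoneNameHandler(callback));
      callback.onSuccess();
    } catch (Throwable cause) {
      callback.onFailure(cause);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(parse(
        "<Zones><HostedZone><Name>a.com.</Name></HostedZone>"
            + "<HostedZone><Name>b.com.</Name></HostedZone></Zones>"));
  }
}
```

In this sketch the callback sees a.com. before the parser has read b.com., which is the visibility benefit described above.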

Also, Netflix RxJava is increasingly employed, so optimizing for Observers makes sense.

IncrementalCallback Design

The following design takes from RxJava Observer and Retrofit Callback.

public interface IncrementalCallback<T> {
  void onNext(T element);
  void onSuccess();
  void onFailure(Throwable cause);
}

Why Throwable onFailure instead of Exception

The design we are discussing is how to address async callbacks in a Java idiomatic way. We issue Throwable on error instead of Exception, as Feign doesn't have a catch-all exception wrapper and Errors could be the reason why the failure occurred. This allows visibility of Errors such as AssertionErrors or thread starvation to propagate without confusing exception wrappers. It also allows us to simply propagate exception.getCause() without a wrapper.

The above rationale supports the case for Throwable vs Exception where Feign exists in isolation. Other considerations appear where Feign interacts with other libraries. For example, every current Java async web callback issues Throwable, not Exception, for error signals. If we chose to retain Exception as opposed to Throwable, converting to or from these interfaces would be lossy.
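As a rough illustration of the point about Errors, the sketch below passes whatever was thrown straight to onFailure, so an AssertionError arrives unwrapped. The class ThrowableSignalSketch and its failureOf helper are hypothetical names used only for this example.

```java
class ThrowableSignalSketch {

  // Same shape as the IncrementalCallback proposed in this issue.
  interface IncrementalCallback<T> {
    void onNext(T element);
    void onSuccess();
    void onFailure(Throwable cause);
  }

  // Runs the work and returns whatever the callback received via onFailure,
  // or null if the work completed normally.
  static Throwable failureOf(Runnable work) {
    final Throwable[] seen = new Throwable[1];
    IncrementalCallback<String> callback = new IncrementalCallback<String>() {
      @Override public void onNext(String element) { }
      @Override public void onSuccess() { }
      @Override public void onFailure(Throwable cause) { seen[0] = cause; }
    };
    try {
      work.run();
      callback.onSuccess();
    } catch (Throwable cause) {
      // No wrapper exception: Errors and Exceptions are passed through as-is.
      callback.onFailure(cause);
    }
    return seen[0];
  }

  public static void main(String[] args) {
    Throwable cause = failureOf(new Runnable() {
      @Override public void run() { throw new AssertionError("unexpected state"); }
    });
    System.out.println(cause.getClass().getName() + ": " + cause.getMessage());
  }
}
```

Had onFailure accepted only Exception, the AssertionError above would need to be wrapped (or dropped) before it could be signaled, which is the lossy conversion the rationale warns about.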

how do we make this callback asynchronous?

should we decode the response?

IncrementalCallback vs ...

In order to support asynchronous processing of http responses, we need to either return a Future or (RxJava) Observable, or allow users to pass in a Callback or an Observer.

As Observable is the centerpiece of RxJava's reactive pattern, it is a relatively wide interface. As such, the source for Observable.java is larger than the entire codebase of feign including tests. Making a move to couple to Observable seems overkill just to afford asynchronous invocation.

Future semantics are hard to guarantee, as operations such as cancel() rarely perform as expected with blocking i/o. For example, many implementations of http futures simply fire a callable which has no means to stop the connection or stop reading from it. Moreover, timeouts are hard to reason about with a Future. For example, connect vs read timeouts are hard to tell apart. Typically a response is returned after connect, but during connection you have no means to stop it. After future.get returns, another timeout is possible while reading from the stream. In summary, while Future is often used, it is a tricky abstraction.

Callback and Observer approaches are simplest and incur no core dependencies. Both can be type-safe interfaces, such as the ones in Retrofit or android-http. IncrementalCallback slightly wins as it allows for incremental parsing, and doesn't imply that Feign is Observable.

Why not reuse RxJava Observer?

RxJava Observer is not a wide interface, so much easier to grok than Observable. However, the Observer type in rx is a concrete class and implies a dependency on rxjava, a dependency which is an order of magnitude larger than feign itself. Decoupling in core, and recoupling as an extension is the leanest way to interop.

@codefromthecrypt
Contributor Author

cc @NiteshKant @benjchristensen

codefromthecrypt pushed a commit that referenced this issue Jul 7, 2013
codefromthecrypt pushed a commit that referenced this issue Jul 7, 2013
@codefromthecrypt
Contributor Author

Current thinking about how to invoke the asynchronous calls.

  • An executor will be used to decouple the http invocation + decode retry loop from the caller.
  • Calls to Observer, such as onNext(T), will initially use the same thread as above (not a second executor).

Rationale.

It is true that calls such as onNext(T) could imply additional I/O, clogging up the http invocation executor. However, this doesn't mean we should solve it in Feign. For example, the caller can make a decorating observer, which itself queues up pending onNext(T) calls. Delegating this responsibility to the caller keeps Feign focused, with less configuration, and is more flexible than presuming an executor or other tool is always best for this issue.
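A decorating observer of the kind mentioned above could look roughly like the following. This is only a sketch: QueueingCallback is a hypothetical name, the interface is redeclared locally so the example stands alone, and a single-thread executor is assumed so that signals keep their order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class QueueingCallbackSketch {

  interface IncrementalCallback<T> {
    void onNext(T element);
    void onSuccess();
    void onFailure(Throwable cause);
  }

  // Decorator supplied by the caller: every signal is queued on the caller's
  // own executor, so the thread reading the http response never runs the
  // delegate's (possibly slow) code.
  static final class QueueingCallback<T> implements IncrementalCallback<T> {
    private final IncrementalCallback<T> delegate;
    private final Executor executor;

    QueueingCallback(IncrementalCallback<T> delegate, Executor executor) {
      this.delegate = delegate;
      this.executor = executor;
    }

    @Override public void onNext(final T element) {
      executor.execute(new Runnable() {
        @Override public void run() { delegate.onNext(element); }
      });
    }

    @Override public void onSuccess() {
      executor.execute(new Runnable() {
        @Override public void run() { delegate.onSuccess(); }
      });
    }

    @Override public void onFailure(final Throwable cause) {
      executor.execute(new Runnable() {
        @Override public void run() { delegate.onFailure(cause); }
      });
    }
  }

  // Sends three signals through the decorator and returns what the delegate saw.
  static List<String> demo() {
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());
    ExecutorService callerExecutor = Executors.newSingleThreadExecutor();
    IncrementalCallback<String> queued = new QueueingCallback<String>(
        new IncrementalCallback<String>() {
          @Override public void onNext(String element) { events.add("next:" + element); }
          @Override public void onSuccess() { events.add("success"); }
          @Override public void onFailure(Throwable cause) { events.add("failure"); }
        }, callerExecutor);

    queued.onNext("a");
    queued.onNext("b");
    queued.onSuccess();

    callerExecutor.shutdown();
    try {
      callerExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new ArrayList<String>(events);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the queueing lives in the caller's decorator, Feign itself needs no extra executor configuration for this case.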

@codefromthecrypt
Contributor Author

Thinking about this a bit further. I could see how providing for Observer methods on alternate threads could lead to safer systems. As a runtime, I'd probably prefer the ability to control this behavior, as I could isolate the threads used per-app easier. A nice compromise would be the same approach retrofit uses, which is to default to a synchronous executor, which just calls run.
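For reference, a synchronous executor of the kind described here is tiny. The sketch below is an assumption about what such a default could look like, not Feign's or Retrofit's actual code; the SYNCHRONOUS constant and the ranOnCallingThread helper are hypothetical names.

```java
import java.util.concurrent.Executor;

class SynchronousExecutorSketch {

  // Runs each task immediately on whichever thread calls execute().
  static final Executor SYNCHRONOUS = new Executor() {
    @Override public void execute(Runnable command) {
      command.run();
    }
  };

  // Reports whether the task ran on the calling thread. Only meaningful for
  // executors that finish the task before execute() returns.
  static boolean ranOnCallingThread(Executor executor) {
    final Thread caller = Thread.currentThread();
    final boolean[] sameThread = new boolean[1];
    executor.execute(new Runnable() {
      @Override public void run() {
        sameThread[0] = Thread.currentThread() == caller;
      }
    });
    return sameThread[0];
  }

  public static void main(String[] args) {
    System.out.println(ranOnCallingThread(SYNCHRONOUS));
  }
}
```

A runtime that wants isolation can swap in its own Executor, while the default adds no threads.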

@codefromthecrypt
Contributor Author

I need to clean up my impl, but the following works "on my laptop"™

    Observer<Contributor> printlnObserver = new Observer<Contributor>() {

      public int count;

      @Override public void onNext(Contributor element) {
        count++;
      }

      @Override public void onSuccess() {
        System.out.println("found " + count + " contributors");
      }

      @Override public void onFailure(Throwable cause) {
        cause.printStackTrace();
      }
    };
    github.contributors("netflix", "feign", printlnObserver);

with the decoder as follows

    final ObserverDecoder jsonObserverDecoder = new ObserverDecoder() {
      Gson gson = new Gson();

      @Override public void decode(Reader reader, Observer observer, Type type) throws Throwable {
        JsonReader jsonReader = new JsonReader(reader);
        jsonReader.beginArray();
        while (jsonReader.hasNext()) {
          observer.onNext(gson.fromJson(jsonReader, type));
        }
        jsonReader.endArray();
      }
    };

stitched in similar to below:

      httpExecutor.get().execute(new Runnable() {
        @Override public void run() {
          Observer<?> observer = Observer.class.cast(argv[argv.length - 1]);
          try {
            // invoke request then call decoder.decode
            observer.onSuccess();
          } catch (Throwable cause) {
            observer.onFailure(cause);
          }
        }
      });

I'll have a similar decoder for SAX (which is meh, I know, but I'm fine with it :) )

@codefromthecrypt
Contributor Author

Note: research into integration with netty or other chunked-transfer clients is not complete yet. This may impact the design of ObserverDecoder (and Client, nature of executors etc), but probably not Observer.

@benjchristensen

Decoupling in core, and recoupling as an extension is the leanest way to interop.

Makes sense for the reasons you gave.

Throwable vs Exception

This is also being discussed at: ReactiveX/RxJava#296 I'd be interested in your feedback there.

@codefromthecrypt
Contributor Author

thx for your feedback.. and returned some on the rx thread.

@codefromthecrypt
Contributor Author

latest commits work as expected: https://github.com/Netflix/feign/blob/observer/feign-core/src/test/java/feign/examples/GitHubExample.java

something that may surprise developers is that when writing a decoder, you have to think ObserverDecoder<Foo> decodes Observer<? super Foo>. Normal generics "fun".
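The wildcard relationship can be sketched as follows. The interfaces here are simplified stand-ins (the decoder takes an Iterable of strings rather than a Reader and Type, and declares no checked exceptions), so treat the signatures as assumptions rather than the ones on the observer branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SuperWildcardSketch {

  interface Observer<T> {
    void onNext(T element);
    void onSuccess();
    void onFailure(Throwable cause);
  }

  // Simplified stand-in for ObserverDecoder: a decoder of T can feed any
  // observer that accepts T or one of T's supertypes.
  interface ObserverDecoder<T> {
    void decode(Iterable<String> input, Observer<? super T> observer);
  }

  static final ObserverDecoder<Integer> INTEGERS = new ObserverDecoder<Integer>() {
    @Override public void decode(Iterable<String> input, Observer<? super Integer> observer) {
      for (String token : input) {
        observer.onNext(Integer.valueOf(token));
      }
    }
  };

  static List<Number> demo() {
    final List<Number> seen = new ArrayList<Number>();
    Observer<Number> numbers = new Observer<Number>() {
      @Override public void onNext(Number element) { seen.add(element); }
      @Override public void onSuccess() { }
      @Override public void onFailure(Throwable cause) { }
    };
    // Compiles because Observer<Number> matches Observer<? super Integer>.
    INTEGERS.decode(Arrays.asList("1", "2"), numbers);
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Declaring the parameter as Observer<T> instead would reject the Observer<Number> above, even though it can safely receive every Integer.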

@codefromthecrypt
Contributor Author

To summarize the POV wrt feign and why Observer.onError(Throwable) and what the impact is.

Feign does not want to catch throwable :) It doesn't in the case of synchronous commands. However, in the case of async, feign is acting as a framework, managing the lifecycle of an observer as defined by onNext, onSuccess, and onError. An Observer should have the ability to at least log or increment a counter on error. It should also have a fighting chance to know that it may never complete successfully. As such, even though Feign propagates errors it encounters during the processing of an Observer, it will attempt to signal them via onError before propagating them further.
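The "signal, then propagate" behavior described above might be sketched like this. The process method and the class name are hypothetical; the sketch relies on Java 7 precise rethrow so the caught Throwable can be rethrown without a wrapper or a throws clause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SignalThenPropagateSketch {

  interface Observer<T> {
    void onNext(T element);
    void onSuccess();
    void onError(Throwable cause);
  }

  // Feeds elements to the observer. On any failure the observer is told via
  // onError first, then the original throwable continues up the stack.
  static <T> void process(Iterable<T> elements, Observer<T> observer) {
    try {
      for (T element : elements) {
        observer.onNext(element);
      }
    } catch (Throwable cause) {
      observer.onError(cause); // give the observer a chance to log or count
      throw cause;             // precise rethrow: only unchecked types can reach here
    }
    observer.onSuccess();
  }

  static List<String> demo() {
    final List<String> events = new ArrayList<String>();
    Observer<String> observer = new Observer<String>() {
      @Override public void onNext(String element) {
        if (element.equals("b")) {
          throw new IllegalStateException("bad element");
        }
        events.add("next:" + element);
      }
      @Override public void onSuccess() { events.add("success"); }
      @Override public void onError(Throwable cause) {
        events.add("error:" + cause.getClass().getSimpleName());
      }
    };
    try {
      process(Arrays.asList("a", "b", "c"), observer);
    } catch (IllegalStateException propagated) {
      events.add("propagated:" + propagated.getMessage());
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The observer learns it will never see onSuccess, and the error still reaches whatever is managing the thread.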

@codefromthecrypt
Contributor Author

Having doubts with the name Observer here. In almost all cases where we are doing the same thing as we are here, the name is handler or callback. I think our Observer should really be named Handler or Callback.

cc @benjchristensen

JAX-RS 2.0 InvocationCallback

On 2xx status, completed is called and the preferred type is decoded via a MessageBodyReader; otherwise a Response is passed.

  • void completed(T response)
  • void failed(Throwable throwable)

Retrofit Callback

On 200 status, success is called and the preferred type is decoded via a Converter. Note that RetrofitError can be the result of catch (Throwable).

  • void success(T t, Response response)
  • void failure(RetrofitError error)

Websocket Endpoint, MessageHandler + Decoder.TextStream

Decoupled interactions between Messages and Decoders, where messages deal with transport types such as byte buffers, and java types are addressed with decoders. Developers are allowed to address transport types directly by implementing MessageHandler. If they wish to use custom types, they'd need to annotate an interface with things like @OnMessage.

  • void MessageHandler.Whole<Reader>.onMessage(Reader message)
  • T Decoder.TextStream.decode(Reader reader) throws DecodeException, IOException
  • void Endpoint.onError (Session session, Throwable throwable)
  • void Endpoint.onClose(Session session, CloseReason closeReason)

Android AsyncHttpResponseHandler

No java type mapping, you receive the entire http content onSuccess

  • void onSuccess(String content) {}
  • void onFailure(Throwable error) {}
  • void onFinish() {}

Async HTTP Client AsyncHandler

Similar except that the handler expects to iteratively construct the type T (via ByteBuffer), which is returned on completion.

  • STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception;
  • void onThrowable(Throwable t);
  • T onCompleted() throws Exception;

Apache HC FutureCallback

You receive the http content on completed, and decode it yourself.

  • void completed(Content result)
  • void failed(Exception ex)

Apache HC ResponseHandler

You receive the entire http response on handleResponse, decide whether it is a failure or not, and decode it yourself.

  • T handleResponse(HttpResponse response) throws ClientProtocolException, IOException

@codefromthecrypt
Contributor Author

updated above description as there are a couple cases where what we are doing is called Callback.

@codefromthecrypt
Contributor Author

Summarizing again based on input since last Friday. cc @NiteshKant @benjchristensen

IncrementalCallback vs Observer

The concept of Observer is wider than what we are providing in this design. Other specs choose the word Handler or Callback. Moreover, it is best to have outside feedback where possible, so I asked Retrofit (an inspiration). It seems IncrementalCallback is a better term for what's going on. That said, if there are other suggestions, please note them!

onError(Throwable)

The design we are discussing is how to address async callbacks in a Java idiomatic way. There is value in signaling observers/callbacks on non-exception Throwables, but it is bigger than this point alone. Every current Java async web callback issues Throwable, not Exception, for error signals. If we chose to retain Exception as opposed to Throwable, converting to or from these interfaces would be lossy. Even though Feign is a subset and simplification of these specs, being lossy wrt error signals is something we should take very seriously. Unless there's an extremely good reason to change course, we'll issue Throwable on error.



2 participants