
Support pull-based bulk processing #15536

Closed
danielmitterdorfer opened this issue Dec 18, 2015 · 4 comments

@danielmitterdorfer (Member) commented Dec 18, 2015

Terminology

The term "client" refers to any code using the BulkProcessor, not the Elasticsearch Java client.

Rationale

This originates from a discussion in #15125. Currently, BulkProcessor provides a push-based API, i.e. clients actively feed it individual requests (e.g. index, delete, ...). BulkProcessor buffers them and synchronously executes a bulk request when a certain threshold is reached (blocking the client thread).

For some use cases such as reindexing (see #15125) we'd like to have a pull-based API, i.e. the bulk processor implementation requests items from the client instead of the client feeding the bulk processor. Advantages of this approach:

  • Bulk processing can be completely asynchronous; there is no longer any blocking in the client thread
  • Backpressure is still applied, so there is no need to buffer lots of data.
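
For contrast, here is a minimal, self-contained sketch of the push-based model described above (a toy, not the actual BulkProcessor API; String stands in for ActionRequest): the client thread adds requests one by one and is blocked whenever the buffer reaches the flush threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy illustration of the push-based model: the client calls add()
// and the flush happens synchronously on the client thread.
class PushBulkProcessor {
  private final int bulkSize;
  private final Consumer<List<String>> bulkExecutor; // stands in for a real bulk request
  private final List<String> buffer = new ArrayList<>();

  PushBulkProcessor(int bulkSize, Consumer<List<String>> bulkExecutor) {
    this.bulkSize = bulkSize;
    this.bulkExecutor = bulkExecutor;
  }

  // called by the client for each individual request
  void add(String request) {
    buffer.add(request);
    if (buffer.size() >= bulkSize) {
      flush(); // blocks the client thread for the duration of the bulk call
    }
  }

  // hand the buffered items over as one "bulk request"
  void flush() {
    if (!buffer.isEmpty()) {
      bulkExecutor.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

The pull-based design discussed below inverts exactly this: instead of the client pushing into add() and paying for the flush, the processor asks for the next batch when it is ready.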

API sketch

As pull-based bulk processing requires a completely different API than the push-based model, it makes no sense to force both into the same class. Therefore, there will be a new class, which we'll give the preliminary name AsyncBulkProcessor for the sake of this discussion.

If we take the Reactive Streams API as guidance, we can sketch the following (note that this is a bit simplified):

Let's start with the client, which wants to provide data to the AsyncBulkProcessor. In this terminology, clients are called publishers and the bulk processor is called a subscriber.

Clients implement the interface Publisher<ActionRequest>:

interface Publisher<T> {
  void subscribe(Subscriber<? super T> s);
}

In #subscribe() we just create a Subscription which does the actual work and pass it to the subscriber. The subscription would need to be implemented roughly as follows:

class SampleSubscription implements Subscription {
  // this is the AsyncBulkProcessor
  private final Subscriber<? super ActionRequest> subscriber;

  public SampleSubscription(Subscriber<? super ActionRequest> subscriber) {
    this.subscriber = subscriber;
  }

  @Override
  public void request(long numberOfItems) {
    // the processor has requested more items, now hand them over
    if (hasMoreItems(numberOfItems)) {
      for (ActionRequest item : requestItems(numberOfItems)) {
        subscriber.onNext(item);
      }
    } else {
      subscriber.onComplete();
    }
  }

  @Override
  public void cancel() {
    // stop producing items (omitted in this sketch)
  }
}

Finally, AsyncBulkProcessor implements the Subscriber interface:

class AsyncBulkProcessor implements Subscriber<ActionRequest> {
  private Subscription subscription;

  public void onSubscribe(Subscription subscription) {
    // a Subscription "connects" the processor with the client
    this.subscription = subscription;
  }

  // called by the client after we have requested more items
  public void onNext(ActionRequest request) {
    // Note: None of this happens on the caller thread but on a dedicated thread

    // (1) add to internal buffer and create a new bulk request if the threshold is reached.

    // (2) Request up to bulkSize more items from the client after we're
    // done with a bulk request
    subscription.request(bulkSize);
  }

  public void onError(Throwable t) {
    // something went wrong on the client side; abort outstanding work
  }

  public void onComplete() {
    // there is no more data, flush internal buffers and probably
    // issue one final bulk request
  }
}
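
To see how the three pieces fit together, here is a self-contained, simplified wiring of the sketch above. The interfaces are minimal stand-ins for the Reactive Streams ones, String stands in for ActionRequest, everything runs single-threaded, and all concrete class names are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Minimal stand-ins for the Reactive Streams interfaces used in the sketch.
interface Subscription {
  void request(long n);
  void cancel();
}

interface Subscriber<T> {
  void onSubscribe(Subscription s);
  void onNext(T item);
  void onError(Throwable t);
  void onComplete();
}

interface Publisher<T> {
  void subscribe(Subscriber<? super T> s);
}

// A publisher backed by a fixed list of "requests" (the client side).
class ListPublisher implements Publisher<String> {
  private final List<String> items;

  ListPublisher(List<String> items) { this.items = items; }

  @Override
  public void subscribe(Subscriber<? super String> subscriber) {
    Iterator<String> it = items.iterator();
    subscriber.onSubscribe(new Subscription() {
      private boolean completed = false;

      @Override
      public void request(long n) {
        // hand over up to n items, then signal completion once exhausted
        for (long i = 0; i < n && it.hasNext(); i++) {
          subscriber.onNext(it.next());
        }
        if (!it.hasNext() && !completed) {
          completed = true;
          subscriber.onComplete();
        }
      }

      @Override
      public void cancel() { /* omitted in this sketch */ }
    });
  }
}

// Single-threaded, simplified version of the AsyncBulkProcessor idea:
// it batches items into "bulk requests" and pulls more via the subscription.
class BatchingSubscriber implements Subscriber<String> {
  final List<List<String>> bulks = new ArrayList<>();
  private final int bulkSize;
  private final List<String> buffer = new ArrayList<>();
  private Subscription subscription;

  BatchingSubscriber(int bulkSize) { this.bulkSize = bulkSize; }

  @Override
  public void onSubscribe(Subscription s) {
    this.subscription = s;
    s.request(bulkSize); // kick off the first batch
  }

  @Override
  public void onNext(String item) {
    buffer.add(item);
    if (buffer.size() >= bulkSize) {
      bulks.add(new ArrayList<>(buffer)); // "execute" the bulk request
      buffer.clear();
      subscription.request(bulkSize);     // pull the next batch
    }
  }

  @Override
  public void onError(Throwable t) { /* omitted in this sketch */ }

  @Override
  public void onComplete() {
    if (!buffer.isEmpty()) {
      bulks.add(new ArrayList<>(buffer)); // final partial bulk
      buffer.clear();
    }
  }
}
```

The key property of the sketch is visible here: the subscriber decides when to call request(), so backpressure falls out naturally and the publisher never has to buffer more than one batch's worth of items.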

Note: This does not mean that we have to implement the Reactive Streams API. It just serves as an example of what a pull-based API might look like.

An issue we also need to address is error handling. As everything is asynchronous, we have to use some kind of backchannel. We can use the same mechanism as for regular items: the bulk processor would then be a publisher of errors, and some other component (a client or a dedicated error handler) would be the subscriber.
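
As a minimal sketch of such a backchannel, here the processor reports failures through a plain Consumer<Throwable> callback rather than a full publisher/subscriber pair; all names are hypothetical.

```java
import java.util.function.Consumer;

// Hypothetical sketch: the processor reports asynchronous bulk failures
// through a callback instead of throwing on the (absent) caller thread.
class ErrorReportingProcessor {
  private final Consumer<Throwable> errorSubscriber;

  ErrorReportingProcessor(Consumer<Throwable> errorSubscriber) {
    this.errorSubscriber = errorSubscriber;
  }

  void executeBulk(Runnable bulkRequest) {
    try {
      bulkRequest.run(); // stands in for an asynchronous bulk execution
    } catch (RuntimeException e) {
      // push the failure down the backchannel rather than losing it
      errorSubscriber.accept(e);
    }
  }
}
```

A full publisher/subscriber error channel would follow the same shape; the essential point is only that failures travel the same asynchronous route as the data.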

@s1monw (Contributor) commented Dec 18, 2015

When I look at this API I wonder why we can't just pass an Iterator to the bulk processor that it can pull from. It seems to me that this would be much simpler than a callback-based API, and we could hide the logic in a single place. But I might be missing something?

@danielmitterdorfer (Member, Author) commented Dec 18, 2015

You're right. These are related concepts. The reactive streams API just offers more flexibility (but admittedly at the cost of more complexity). One thing that we need to consider is whether the bulk processor is "reused", e.g. consider this API:

class AsyncBulkProcessor {
  void sendItems(Iterator<ActionRequest> items) {
    // ...
  }
}

As we perform this operation asynchronously, I think the client needs some kind of handle to this operation (similar to a Future) in order to e.g. cancel it and also to have some kind of "context" if it calls #sendItems() multiple times. This is - to some extent - the role of Subscription in the API sketch above (Subscription really has two roles, providing the items and also being a handle for the subscriber).

So I sense we might end up with a similar API anyway, that's why I have chosen the reactive streams API as an example for the sketch. But nothing is stopping us from starting small...

Btw, one benefit of the callback-based subscription model is that the provider can also be asynchronous (which obviously cannot work with an iterator).
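
A rough sketch of such a handle (all names hypothetical, String in place of ActionRequest): each #sendItems() call returns a Future-like object that plays the "context" role Subscription plays in the sketch above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical handle returned per #sendItems() call; it gives the client
// a way to cancel and a per-call context, analogous to Subscription.
class BulkHandle {
  private final AtomicBoolean cancelled = new AtomicBoolean(false);

  void cancel() { cancelled.set(true); }
  boolean isCancelled() { return cancelled.get(); }
}

class IteratorBulkProcessor {
  final List<String> processed = new ArrayList<>();

  // Drains the iterator (synchronously here for brevity; the point of the
  // handle is that a real implementation would do this on another thread,
  // checking isCancelled() between bulk requests).
  BulkHandle sendItems(Iterator<String> items) {
    BulkHandle handle = new BulkHandle();
    while (items.hasNext() && !handle.isCancelled()) {
      processed.add(items.next());
    }
    return handle;
  }
}
```

This keeps the simple iterator-based surface while still giving each invocation its own cancellable context.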

@danielmitterdorfer (Member, Author) commented Dec 28, 2015

To get a better idea for a suitable API sketch, I'll first review #15125 (reindex API) which could benefit from it.

@danielmitterdorfer (Member, Author) commented Oct 5, 2016

No time to work on this for the foreseeable future, and interest also seems limited. Therefore, I'm closing it for the time being. Feel free to reopen if necessary.
