Introduce batched sink interface #3514

Closed
srkukarni wants to merge 3 commits into apache:master from srkukarni:batchedsink

@srkukarni
Contributor

Motivation

Sinks often want to write a collection of messages instead of one message at a time. An example is writing to a Snowflake database, where it is far more efficient to write a bunch of records at once than one at a time. For these types of sinks, the current one-record-per-invocation Sink interface is too low-level.
This PR introduces the BatchedSink interface. This interface exposes a write method that supplies a collection of records (as opposed to one record in the Sink interface). Users can configure the parameters of this collection using the WindowConfig parameter inside the SinkConfig.

Modifications

We introduce the BatchedSink interface in the pulsar-io/core package.
We also expose a WindowConfig parameter inside the SinkConfig so users can configure the batch sizes/intervals.
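
For illustration only, here is a minimal sketch of what a batched interface of this kind could look like. It is not the code from this PR: `BatchedSinkSketch`, the reduced `Record` stand-in, and `InMemoryBatchedSink` are hypothetical names, and `open` omits the `SinkContext` argument so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names and shapes are assumptions, not the PR's actual code.
final class BatchedSinkSketch {

    // Minimal stand-in for a record type (hypothetical, reduced to what the sketch needs).
    interface Record<T> {
        T getValue();
    }

    // Assumed shape of the batched interface: write() receives a collection of records.
    interface BatchedSink<T> extends AutoCloseable {
        void open(Map<String, Object> config) throws Exception;

        void write(Collection<Record<T>> records) throws Exception;
    }

    // Example sink that stands in for a bulk insert by remembering each batch it receives.
    static final class InMemoryBatchedSink implements BatchedSink<String> {
        final List<Integer> batchSizes = new ArrayList<>();
        final List<String> stored = new ArrayList<>();

        @Override
        public void open(Map<String, Object> config) {
            // Nothing to set up for the in-memory example.
        }

        @Override
        public void write(Collection<Record<String>> records) {
            // One call handles the whole batch, e.g. a single bulk insert.
            batchSizes.add(records.size());
            for (Record<String> record : records) {
                stored.add(record.getValue());
            }
        }

        @Override
        public void close() {
            // Nothing to release for the in-memory example.
        }
    }
}
```

A database-backed sink would replace the loop in `write` with a single bulk statement, which is where the efficiency gain described in the motivation would come from.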

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@srkukarni srkukarni added this to the 2.3.0 milestone Feb 3, 2019
@srkukarni srkukarni self-assigned this Feb 3, 2019
@srkukarni srkukarni requested a review from jerrypeng February 3, 2019 20:39
* @param sinkContext
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
Contributor

Should this be a WindowedSinkContext?

Contributor Author

Not sure if we need a WindowedSinkContext wrapper here. I can't think of any new methods to put in it beyond what's already present in SinkContext.

@jerrypeng
Contributor

@srkukarni can we have an integration test for this?

protected Long ram;
@Parameter(names = "--disk", description = "The disk (in bytes) that need to be allocated per sink instance (applicable only to Docker runtime)")
protected Long disk;
@Parameter(names = "--window-length-count", description = "The number of messages per window")
Contributor

We have too many CLI args. We need to clean them up at some point: expose only the basic CLI args and, for the advanced ones, let users specify them in a function config YAML file.

}
windowConfig.setWindowLengthDurationMs(windowLengthDurationMs);
}
if (null != slidingIntervalCount) {
Contributor

I don't think we should have sliding windows for batched sink. What would be the use case for that?

Contributor Author

Trying to make one up: think about an InfluxDB-type sink where you want to write the average of some value over the last few seconds on a sliding basis.

Contributor

We are introducing a batched sink, not a windowed sink, so I don't think the batched sink should have the same semantics and configs as a windowed function. That would be very confusing to users. To start with, we should just have batchSize or batchTime configs that are distinct from the windowing configs.
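
As a rough illustration of the simpler configuration proposed here, the sketch below models a size-or-time flush rule. The class name `BatchConfigSketch`, the field `batchTimeMs`, and the method `shouldFlush` are assumptions made for this example; only the `batchSize` and `batchTime` names come from the comment above.

```java
// Hypothetical config holder; not part of Pulsar or of this PR.
final class BatchConfigSketch {
    final int batchSize;    // maximum number of records per batch
    final long batchTimeMs; // maximum time a record may wait in the buffer

    BatchConfigSketch(int batchSize, long batchTimeMs) {
        if (batchSize <= 0 || batchTimeMs <= 0) {
            throw new IllegalArgumentException("batchSize and batchTimeMs must be positive");
        }
        this.batchSize = batchSize;
        this.batchTimeMs = batchTimeMs;
    }

    // Flush when the buffer is full or the oldest buffered record has waited long enough.
    // The caller passes the clock in, which keeps the rule easy to test.
    boolean shouldFlush(int bufferedCount, long oldestRecordAtMs, long nowMs) {
        if (bufferedCount == 0) {
            return false; // nothing to write
        }
        return bufferedCount >= batchSize || nowMs - oldestRecordAtMs >= batchTimeMs;
    }
}
```

Unlike a sliding window, each record belongs to exactly one batch under this rule, which is the distinction the comment draws between batching and windowing.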

Member

@sijie sijie left a comment

I have two high level comments regarding this feature.

  1. We are introducing a brand new interface for writing a batch of records. It will confuse people about the interface.
  2. We are implementing a "batching" logic in the runtime instead of outside of the runtime.

This approach seems too heavy to me. Instead, can't we implement the logic outside the runtime, with an abstract BatchSink implementation that handles the windowing and batching logic? I have seen many sink implementations implement similar batching logic; we can abstract that logic into one implementation and reuse it across different sinks.


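public abstract class BatchSink<T> implements Sink<T> {

     @Override
     public void write(Record<T> record) throws Exception {
          // implement the batching logic: buffer the record and hand the
          // buffer to write(Collection) once a batch is complete
     }

     // implemented by concrete sinks to write a whole batch at once
     public abstract void write(Collection<Record<T>> records) throws Exception;

}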
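
The suggestion above could be fleshed out roughly as follows. This is a sketch under stated assumptions rather than an agreed design: `Record` and `Sink` are reduced stand-ins for the real interfaces, batching is by count only, and `flush` and `CountingSink` are hypothetical names added for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Illustrative sketch only; the nested types are simplified stand-ins, not Pulsar's own.
final class AbstractBatchSinkSketch {

    interface Record<T> {
        T getValue();
    }

    // Stand-in for the existing one-record-per-call sink interface.
    interface Sink<T> extends AutoCloseable {
        void write(Record<T> record) throws Exception;
    }

    // Batching lives in a reusable base class, so the runtime only ever sees a plain Sink.
    abstract static class BatchSink<T> implements Sink<T> {
        private final int batchSize;
        private final List<Record<T>> buffer = new ArrayList<>();

        BatchSink(int batchSize) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
            this.batchSize = batchSize;
        }

        @Override
        public synchronized void write(Record<T> record) throws Exception {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        // Hands the buffered records to the batch write. A real sink would also have to
        // decide how to ack or fail the records if the batch write throws.
        public synchronized void flush() throws Exception {
            if (buffer.isEmpty()) {
                return;
            }
            List<Record<T>> batch = new ArrayList<>(buffer);
            buffer.clear();
            write(batch);
        }

        @Override
        public void close() throws Exception {
            flush(); // do not drop a partially filled batch
        }

        // Implemented by concrete sinks to write a whole batch at once.
        public abstract void write(Collection<Record<T>> records) throws Exception;
    }

    // Example subclass that only records the size of each batch it is given.
    static final class CountingSink extends BatchSink<String> {
        final List<Integer> batchSizes = new ArrayList<>();

        CountingSink(int batchSize) {
            super(batchSize);
        }

        @Override
        public void write(Collection<Record<String>> records) {
            batchSizes.add(records.size());
        }
    }
}
```

With a batch size of 2, three single-record writes trigger one batch of two, and `close()` flushes the remaining record as a final batch of one. A time-based trigger would need a timer or scheduler on top of this, which the sketch leaves out.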

@jerrypeng
Contributor

I agree with @sijie's comment. @srkukarni can we implement the BatchedSink as an external library instead of having to support yet another interface in the function backend code?

@srkukarni
Contributor Author

@sijie @jerrypeng that makes sense. I will try to reformulate this PR. Meanwhile, I have removed the 2.3 tag from it so that the release can proceed without it.

@srkukarni srkukarni modified the milestones: 2.3.0, 2.4.0 Feb 12, 2019
@sijie sijie removed this from the 2.4.0 milestone Jun 9, 2019
@aahmed-se
Contributor

@srkukarni should we abandon this and start a new repo for functions extensions?

@github-actions

github-actions bot commented Mar 4, 2022

@srkukarni: Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@tisonkun
Member

Closed as stale. The codebase has evolved quite a lot since then.

@tisonkun tisonkun closed this Nov 14, 2022