Add Flink - Pulsar Batch Sink Support #2979

Merged
merged 4 commits into from Nov 16, 2018

Member

erenavsarogullari commented Nov 12, 2018

Motivation

This PR adds Flink batch sink support for Pulsar. Users who work with the Flink DataSet API and want to write their DataSets to Pulsar can use this sink.

Ref: Flink Batch Sink API

Modifications

The change set is as follows:

  • Defines PulsarOutputFormat to write Flink batch DataSets into Pulsar.
  • Adds unit test coverage.
  • Adds FlinkPulsarBatchSinkExample to show users how to use the sink.
  • Adds README.md documentation.
  • Includes a minor javadoc fix.
@merlimat merlimat added this to the 2.3.0 milestone Nov 12, 2018
}

@Override
public void close() throws IOException {


sijie Nov 13, 2018

Contributor

close the producer when the output format is closed?


erenavsarogullari Nov 13, 2018

Author Member

@sijie Thanks for the review.

This is my concern as well. However, as far as I can see in local tests, close is called multiple times, which causes the Producer to be closed before all batches have been submitted.
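One possible way to reconcile the two concerns (close the producer, but not too early) is to count open and close calls and only close the shared producer when the last close arrives. The sketch below is not the code from this PR. It assumes that every close call is paired with an earlier open call, and it uses a hypothetical `StubProducer` class in place of the real Pulsar `Producer` so that it stays self-contained. `RefCountedProducer` is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Pulsar producer; not the real client API. */
class StubProducer {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    void send(String message) {
        if (closed) {
            throw new IllegalStateException("producer already closed");
        }
        sent.add(message);
    }

    void close() {
        closed = true;
    }
}

/** Shares one producer between several open()/close() pairs via a reference count. */
class RefCountedProducer {
    private static StubProducer producer;
    private static int openCount = 0;

    /** Each open() call acquires the shared producer, creating it on first use. */
    static synchronized StubProducer open() {
        if (openCount == 0) {
            producer = new StubProducer();
        }
        openCount++;
        return producer;
    }

    /** Only the close() call that matches the last outstanding open() closes the producer. */
    static synchronized void close() {
        if (openCount == 0) {
            return; // tolerate extra close() calls without failing
        }
        openCount--;
        if (openCount == 0) {
            producer.close();
            producer = null;
        }
    }
}
```

With two open calls outstanding, the first close leaves the producer usable and the second one closes it. Whether open and close are actually paired this way in a Flink batch job would need to be checked against the runtime before relying on this pattern.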


private transient Function<Throwable, MessageId> failureCallback;

private static volatile Producer<byte[]> producer;


sijie Nov 13, 2018

Contributor

any reason why you make it static?


erenavsarogullari Nov 13, 2018

Author Member

As with the close function, the open function is also called multiple times. To avoid creating multiple Producer instances, the producer needs to be a class-level singleton, so it is static.
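The class-level singleton described here can be sketched as a lazily initialized `static volatile` field guarded by double-checked locking. This is a minimal illustration, not the PR's implementation: `CountingProducer` and `ProducerHolder` are hypothetical names, and `CountingProducer` stands in for the real Pulsar `Producer<byte[]>` so the example runs without the Pulsar client.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Pulsar producer; counts how many instances get built. */
class CountingProducer {
    static final AtomicInteger created = new AtomicInteger();

    CountingProducer() {
        created.incrementAndGet();
    }
}

/** Holds at most one producer per class loader, created lazily on the first open() call. */
class ProducerHolder {
    // volatile so that a fully constructed instance is visible to all threads
    private static volatile CountingProducer producer;

    /** Safe to call many times; only the first call constructs the producer. */
    static CountingProducer open() {
        CountingProducer local = producer;
        if (local == null) {
            synchronized (ProducerHolder.class) {
                local = producer; // re-check under the lock
                if (local == null) {
                    local = new CountingProducer();
                    producer = local;
                }
            }
        }
        return local;
    }
}
```

Repeated calls to `ProducerHolder.open()` return the same instance. A static field is shared by every user of the class in the same JVM, which is why closing it from one caller affects all the others.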

@erenavsarogullari erenavsarogullari force-pushed the erenavsarogullari:PulsarPR_FlinkBatchSinkSupport branch from 2f8cafb to c3140d5 Nov 15, 2018
@sijie
sijie approved these changes Nov 16, 2018
@sijie sijie merged commit aefbaac into apache:master Nov 16, 2018
3 checks passed
Jenkins: C++ / Python Tests SUCCESS
Jenkins: Integration Tests SUCCESS
Jenkins: Java 8 - Unit Tests SUCCESS
@erenavsarogullari erenavsarogullari deleted the erenavsarogullari:PulsarPR_FlinkBatchSinkSupport branch Nov 18, 2018