implement topic routing on a per record basis #2605

Merged
merged 9 commits into from Sep 26, 2018

3 participants
@jerrypeng
Contributor

jerrypeng commented Sep 18, 2018

Motivation

In some use cases, the destination topic for a message cannot be determined at source submission time. Sources therefore need the ability to set which topic a record should be written to.

Modifications

  1. Added an interface, Record, that allows users to set the destination topic for a record
  2. Refactored the Pulsar sink code to support this
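As a rough illustration of the first modification, a record type could expose an optional destination topic. This is a minimal sketch, not the code from this PR: `SourceRecord` and `RoutedRecord` are hypothetical names, and only the `getDestinationTopic()` accessor returning an `Optional` is taken from the diff fragments in the review below.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the record interface discussed in this PR.
interface SourceRecord<T> {
    T getValue();

    // Empty by default, so records that do not care about routing
    // fall through to whatever topic the sink is configured with.
    default Optional<String> getDestinationTopic() {
        return Optional.empty();
    }
}

// Hypothetical record that carries its own destination topic.
final class RoutedRecord<T> implements SourceRecord<T> {
    private final T value;
    private final String destinationTopic;

    RoutedRecord(T value, String destinationTopic) {
        this.value = value;
        this.destinationTopic = destinationTopic;
    }

    @Override
    public T getValue() {
        return value;
    }

    @Override
    public Optional<String> getDestinationTopic() {
        // A null topic is treated as "no preference".
        return Optional.ofNullable(destinationTopic);
    }
}
```

Making the accessor a default method means existing record implementations would keep compiling without changes.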

@jerrypeng jerrypeng requested review from merlimat, sijie, srkukarni and rdhabalia Sep 18, 2018

@jerrypeng jerrypeng self-assigned this Sep 18, 2018

@jerrypeng jerrypeng added this to the 2.3.0-incubating milestone Sep 18, 2018

if (!record.getPartitionId().isPresent()) {
    throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
}
if (record.getDestinationTopic().isPresent()) {

@rdhabalia

rdhabalia Sep 18, 2018

Contributor

Maybe we can reduce some code:

String destinationTopic = record.getDestinationTopic().isPresent() ? record.getDestinationTopic().get(): pulsarSinkConfig.getTopic();
return getProducer(destinationTopic,..);

@jerrypeng

jerrypeng Sep 18, 2018

Author Contributor

Oh I think we can just do this:

String destinationTopic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());

@rdhabalia

rdhabalia Sep 18, 2018

Contributor

Oh, yes. I didn't realize it is an Optional.
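
The simplification agreed on in this thread can be sketched in isolation. The helper below is hypothetical (`TopicResolver` is not a class in this PR) and assumes only that the record's topic is an `Optional<String>` and that the sink has a configured default topic.

```java
import java.util.Optional;

// Hypothetical helper showing the orElse-based fallback discussed above.
final class TopicResolver {
    private TopicResolver() {}

    // Returns the record's own topic when present, otherwise the sink's default topic.
    static String resolve(Optional<String> destinationTopic, String defaultTopic) {
        return destinationTopic.orElse(defaultTopic);
    }
}
```

Note that `orElse` evaluates its argument eagerly, which is fine for a cheap getter such as a configured topic name.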

}

@Override
public TypedMessageBuilder<T> newMessage(Record<T> record) {
return producer.newMessage();
if (record.getDestinationTopic().isPresent()) {

@rdhabalia

rdhabalia Sep 18, 2018

Contributor

Same here: we can reduce the code by pulling the topic name out.

}));
}
try {
FutureUtils.result(FutureUtils.collect(closeFutures));

@rdhabalia

rdhabalia Sep 18, 2018

Contributor

org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);

for (Map.Entry<String, Producer<T>> entry: publishProducers.entrySet()) {
    String topicId = entry.getKey();
    Producer<T> producer = entry.getValue();
    closeFutures.add(producer.closeAsync().exceptionally(throwable -> {

@rdhabalia

rdhabalia Sep 18, 2018

Contributor

Should we just do closeFutures.add(producer.closeAsync()); since the logging will not help much?
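
The pattern under review here, collecting one close future per producer and waiting on all of them, can be sketched with the JDK alone. This is a sketch under assumptions: it uses `CompletableFuture.allOf` rather than the Pulsar `FutureUtil` helper suggested above, and `CloseAll` with its `Supplier`-based closers is a hypothetical stand-in for calling `closeAsync()` on each producer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: each Supplier stands in for producer::closeAsync.
final class CloseAll {
    private CloseAll() {}

    // Starts every asynchronous close, then returns a single future that
    // completes once all of the individual close futures have completed.
    static CompletableFuture<Void> closeAll(Collection<Supplier<CompletableFuture<Void>>> closers) {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
        for (Supplier<CompletableFuture<Void>> closer : closers) {
            closeFutures.add(closer.get());
        }
        return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
    }
}
```

The combined future completes exceptionally if any individual close fails, so the caller can handle or log failures in one place instead of attaching a handler to each future.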

@jerrypeng

Contributor Author

jerrypeng commented Sep 18, 2018

@rdhabalia thanks for the review. I have addressed your comments

@rdhabalia
Contributor

rdhabalia left a comment

LGTM. Actually, I didn't understand the exact use case where we need different types of sink/output topics for a source topic.
Can we also add a test to simulate this scenario?

schema,
fqfn);

Producer<T> existingProducer = publishProducers.putIfAbsent(producerId, newProducer);

@rdhabalia

rdhabalia Sep 19, 2018

Contributor

I don't understand this logic. Here it creates a producer only if one is not already present in publishProducers. So, is it really needed?

@jerrypeng

jerrypeng Sep 19, 2018

Author Contributor

Yes, because a message can have any destination topic. At the time of processing, we may or may not have already created a producer for that topic. We cache producers so that we don't create a producer for each message and can reuse existing ones. This is basically the same logic as in MultiConsumerProducer, used for effectively-once, where we have to have separate producers for each partition.

@rdhabalia

rdhabalia Sep 20, 2018

Contributor

Sure. I mean, can we do something like: return publishProducers.computeIfAbsent(producerId, id -> createProducer(..));

@jerrypeng

jerrypeng Sep 20, 2018

Author Contributor

Yup, we can just do that. This logic is from before.
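
The producer cache agreed on in this thread could look roughly like the following. This is a minimal sketch, not the PR's code: `ProducerCache` and its injected factory are hypothetical, and the type parameter `P` stands in for a producer type so that the example has no external dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache keyed by topic name; P stands in for a producer type.
final class ProducerCache<P> {
    private final Map<String, P> publishProducers = new ConcurrentHashMap<>();
    private final Function<String, P> createProducer;

    ProducerCache(Function<String, P> createProducer) {
        this.createProducer = createProducer;
    }

    // Returns the cached producer for the topic, creating it on first use.
    P getProducer(String topic) {
        return publishProducers.computeIfAbsent(topic, createProducer);
    }

    int size() {
        return publishProducers.size();
    }
}
```

Unlike building a producer first and then calling `putIfAbsent`, `computeIfAbsent` only invokes the factory when the key is missing, so no throwaway producer is created for a topic that is already cached.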

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

@rdhabalia One use case we have is a source for ingesting data from databases. Users will often want to specify just a schema or database and have the data from all the tables underneath it sent automatically to corresponding topics in Pulsar, so at the end of the day we will have a table -> topic mapping. This feature is for use cases like that, where it is too cumbersome to explicitly specify some sort of mapping to topics, and also for use cases where you don't know which topic to send to until runtime or upon message inspection.
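
To make the table -> topic idea concrete, here is a small sketch of how a source might derive a destination topic from a table name. Everything in it is an assumption for illustration: the `TableTopicMapper` class is hypothetical, and the naming scheme (one topic per table under a fixed tenant and namespace) is not specified anywhere in this PR.

```java
// Hypothetical mapper: one topic per table under a fixed tenant and namespace.
final class TableTopicMapper {
    private final String tenant;
    private final String namespace;

    TableTopicMapper(String tenant, String namespace) {
        this.tenant = tenant;
        this.namespace = namespace;
    }

    // Builds a topic name of the form persistent://tenant/namespace/table.
    String topicFor(String tableName) {
        return "persistent://" + tenant + "/" + namespace + "/" + tableName;
    }
}
```

A source could call `topicFor` for each row it reads and return the result as the record's destination topic, so no per-table mapping has to be configured up front.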

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

rerun cpp tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

rerun java8 tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

rerun integration tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

@rdhabalia I will write some tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 19, 2018

rerun integration tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 20, 2018

rerun java8 tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 20, 2018

rerun integration tests

@sijie

sijie approved these changes Sep 20, 2018

@jerrypeng

Contributor Author

jerrypeng commented Sep 20, 2018

rerun integration tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 20, 2018

rerun java8 tests

@sijie

Contributor

sijie commented Sep 21, 2018

run java8 tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 21, 2018

rerun java8 tests

@jerrypeng

Contributor Author

jerrypeng commented Sep 21, 2018

rerun integration tests

@sijie

Contributor

sijie commented Sep 24, 2018

rerun java8 tests

sijie added some commits Sep 25, 2018

@jerrypeng

Contributor Author

jerrypeng commented Sep 25, 2018

rerun integration tests

@sijie sijie modified the milestones: 2.3.0, 2.2.0 Sep 26, 2018

@sijie sijie merged commit e15a606 into apache:master Sep 26, 2018

2 of 3 checks passed

Jenkins: Integration Tests FAILURE
Jenkins: C++ / Python Tests SUCCESS
Jenkins: Java 8 - Unit Tests SUCCESS