STORM-1694: Kafka Spout Trident Implementation Using New Kafka Consumer API #1687
Conversation
Since it does load the two modules - storm-kafka and storm-kafka-client - but only loads kafka-client with
}

public static void main(String[] args) throws Exception {
    final String[] zkBrokerUrl = parseUrl(args);
Do we still need the ZooKeeper config? This topology is using the new KafkaSpout, right?
Agreed. Refactored the examples in this PR.
for (int i = 0; i < keys.size(); i++) {
    ValueUpdater valueUpdater = updaters.get(i);
    Object arg = ((CombinerValueUpdater) valueUpdater).getArg();
    LOG.debug("updateCount = {}, keys = {} => updaterArgs = {}", updateCount, keys.get(i), arg);
Should this just print at INFO level? Since this is debug state, why make users take another hop of enabling DEBUG logging for this topology?
Done. Refactored the examples in this PR.
}

private KafkaTridentSpoutOpaque<String, String> createOpaqueKafkaSpoutNew() {
    return new KafkaTridentSpoutOpaque<String, String>(getKafkaTridentManager());
Can we merge this into a single method, so that it shows the series of steps in creating a KafkaTrident topology? It has a few redirections, with one method calling another, which can be confusing for users looking for an example.
Partially done in the refactored examples in this PR.
There were some redundant "factory methods" that I removed. However, the code creating the "dependency" objects that need to be passed in is not one or two lines. I believe that a method with a meaningful name creating and initializing these "dependency" objects makes the code much more cohesive and easier to read. Furthermore, this class is extended for wildcard topics, and some of these methods are overridden.
I will be happy to write a more "copy and paste" style example in the docs if you feel it's appropriate. Please let me know.
    return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, new String[]{"test-trident", "test-trident-1"}).build();
}

protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
Can we not provide a default implementation for KafkaSpoutTupleBuilder, and why are we not using the Deserializer interface for this? Similar to Scheme, we can provide a StringDeserializer, and users can implement their own version of Deserializer to parse and return the values.
I am not sure I completely follow this observation. This same object exists for the Kafka Spout. It is only used so that the user can provide a custom implementation of how they wish to create (build) a Tuple from the ConsumerRecord object. Users are free, when creating their KafkaSpoutTupleBuilder implementation, to specify which Deserializer they want to use.
As for a default implementation, I am not sure I am following. I don't think we are providing any default implementation. The class KafkaSpoutTupleBuilder is abstract; the implementation here is for the sake of the example.
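To make the role of the tuple builder concrete, here is a minimal self-contained sketch of the idea: a user-supplied builder maps a consumed record to the list of values that form the emitted tuple. The `Record` class and `TupleBuilder` interface below are illustrative stand-ins for Kafka's `ConsumerRecord` and Storm's `KafkaSpoutTupleBuilder`, not the actual APIs.

```java
import java.util.Arrays;
import java.util.List;

public class TupleBuilderSketch {
    // Stand-in for org.apache.kafka.clients.consumer.ConsumerRecord.
    static class Record<K, V> {
        final String topic; final int partition; final long offset; final K key; final V value;
        Record(String topic, int partition, long offset, K key, V value) {
            this.topic = topic; this.partition = partition; this.offset = offset;
            this.key = key; this.value = value;
        }
    }

    // Stand-in for the abstract KafkaSpoutTupleBuilder: the user decides
    // which record fields become the values of the emitted tuple.
    interface TupleBuilder<K, V> {
        List<Object> buildTuple(Record<K, V> record);
    }

    // Example implementation, analogous to TopicsTupleBuilder in the PR:
    // emit topic, partition, offset, key, and value as the tuple fields.
    static final TupleBuilder<String, String> TOPICS_TUPLE_BUILDER =
        r -> Arrays.asList(r.topic, r.partition, r.offset, r.key, r.value);

    public static void main(String[] args) {
        Record<String, String> r = new Record<>("test-trident", 0, 15L, "k", "v");
        System.out.println(TOPICS_TUPLE_BUILDER.buildTuple(r));
    }
}
```

Deserialization is orthogonal to this: the builder receives an already-deserialized record, so the choice of `Deserializer` stays in the consumer configuration.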
@Override
public void close() {
    LOG.debug("Closed");
We should call kafkaManager.kafkaConsumer.close() here.
Done
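A minimal sketch of the fix discussed above: the spout's close() must forward the shutdown to the consumer it owns, not just log. The class names here (`ClosingSpoutSketch`, `FakeConsumer`) are illustrative stand-ins, not the actual storm-kafka-client types.

```java
public class ClosingSpoutSketch implements AutoCloseable {
    /** Stand-in for the KafkaConsumer held by the spout's manager. */
    static class FakeConsumer implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    private final FakeConsumer kafkaConsumer;

    public ClosingSpoutSketch(FakeConsumer consumer) {
        this.kafkaConsumer = consumer;
    }

    @Override
    public void close() {
        // Forward the shutdown so the consumer releases its network
        // resources, instead of only logging "Closed".
        kafkaConsumer.close();
    }
}
```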
        lastOffset = lastBatch.lastOffset;
    }
}
LOG.debug("Created {}", this);
It would probably be useful to log the first and last offsets of the batch.
Logging "this" will call the overridden toString() method of this class, which prints the first and last offsets.
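To illustrate the point, here is a self-contained sketch of that pattern: passing `this` to the logger invokes the overridden toString(), which carries the offsets. The exact field names and format are assumptions modeled on the PR, not the verbatim implementation.

```java
public class BatchMetadataSketch {
    private final long firstOffset;
    private final long lastOffset;

    public BatchMetadataSketch(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    // LOG.debug("Created {}", this) will render this string, so the first
    // and last offsets show up in the log without a separate statement.
    @Override
    public String toString() {
        return "KafkaTridentSpoutBatchMetadata{firstOffset=" + firstOffset
                + ", lastOffset=" + lastOffset + '}';
    }
}
```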
    }
}

private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata<K, V> lastBatchMeta) {
It would be good to have some documentation on this method to explain the seek logic here.
Done
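For readers following along, the seek decision such a method typically documents can be sketched as pure logic: resume right after the last batch's offset if previous batch metadata exists, otherwise position according to the configured first-poll strategy. This is a hypothetical stand-alone sketch under those assumptions, not the actual storm-kafka-client code.

```java
public class SeekLogicSketch {
    // Assumed strategies for the very first poll of a partition.
    enum FirstPollOffsetStrategy { EARLIEST, LATEST }

    /**
     * Returns the offset the consumer should seek to before polling the
     * next batch for a partition.
     *
     * @param lastBatchLastOffset last offset of the previous batch, or null
     *                            if this partition has no prior batch metadata
     */
    static long nextFetchOffset(Long lastBatchLastOffset,
                                FirstPollOffsetStrategy strategy,
                                long beginningOffset, long endOffset) {
        if (lastBatchLastOffset != null) {
            // A previous batch exists: resume right after its last offset,
            // so the new batch does not re-emit or skip records.
            return lastBatchLastOffset + 1;
        }
        // First poll for this partition: position according to the strategy.
        switch (strategy) {
            case LATEST: return endOffset;
            default:     return beginningOffset;
        }
    }

    public static void main(String[] args) {
        // Resuming after a batch ending at offset 41 seeks to 42.
        System.out.println(nextFetchOffset(41L, FirstPollOffsetStrategy.EARLIEST, 0, 100));
        // No prior batch and LATEST seeks to the end offset.
        System.out.println(nextFetchOffset(null, FirstPollOffsetStrategy.LATEST, 0, 100));
    }
}
```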
Force-pushed from a2d678d to 76d6e7e: "…er API - Kafka New Client - Opaque Transactional Trident Spout Implementation - Implementation supporting multiple named topics and wildcard topics"
@HeartSaVioR I believe I addressed your valid concerns about the Kafka library versions in this PR.
+1.
@hmcl these PR titles are very confusing. Can you fix that in both PRs? Not sure what "Apache master storm 1694 top storm 2097" means. Just use the JIRA title as the PR title.
@hmcl also, this PR is the same as that one, with one more commit?
@harshach I have changed the titles of the PRs; however, what really matters are the git commit messages, and those were correct. Once the patch is merged, no one will ever look at the PR titles again, only at the git commit logs. I don't think we should close this PR, in order to keep the history of the review comments. Also, the title of this PR reflects this patch, and the title of the other PR reflects the other patch, which is on top of this one. There is no extra work in merging the two PRs, and the git commit history is the same.
@hmcl This PR here has two commits, right? https://github.com/apache/storm/pull/1757/commits. Why do we want to merge this PR again? Closing the PR doesn't mean you'll lose the comments.
@harshach done. The other PR should suffice. |
The Kafka Trident implementation is on top of the Trident logs improvement patch because they are related, and that makes it easier to merge the patch. There is already another PR for STORM-2097.