CDAP-13280 Add Spark 2 streaming Kafka source #25
Conversation
yaojiefeng left a comment:
Just some small comments.
Also wondering whether we should keep the Kafka 0.9 module, since the Kafka 0.9 client will not be able to read from Kafka 0.10.
    ## License and Trademarks

    Copyright © 2017 Cask Data, Inc.
This should be 2018
Done
      producer.send(record);
    } catch (Exception e) {
      // catch the exception and continue processing rest of the alerts
      LOG.error("Exception while emitting alert {}", alert, e);
Should we limit the logs we emit? Since we send the records to Kafka one by one, if something breaks this will flood the log.
Filed a JIRA to do this later, since the log sampling classes are not directly usable now: https://issues.cask.co/browse/CDAP-13479
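A minimal sketch of the kind of log limiting being discussed, assuming a per-batch send loop; the threshold, the Alert type usage, and the toRecord helper are illustrative, not from the PR:

    // Hypothetical sketch: log only the first few failures per batch and
    // summarize the rest, so a broken broker cannot flood the log.
    private static final int MAX_LOGGED_ERRORS = 10;

    int errors = 0;
    for (Alert alert : alerts) {
      ProducerRecord<String, String> record = toRecord(alert);  // illustrative helper
      try {
        producer.send(record);
      } catch (Exception e) {
        // catch the exception and continue processing the rest of the alerts
        errors++;
        if (errors <= MAX_LOGGED_ERRORS) {
          LOG.error("Exception while emitting alert {}", alert, e);
        }
      }
    }
    if (errors > MAX_LOGGED_ERRORS) {
      LOG.error("Suppressed {} further alert emission errors in this batch", errors - MAX_LOGGED_ERRORS);
    }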
    import co.cask.hydrator.common.KeyValueListParser;
    import co.cask.hydrator.common.ReferenceBatchSink;
    import co.cask.hydrator.common.ReferencePluginConfig;
    import co.cask.hydrator.plugin.common.KafkaHelpers;
It is not related to this PR, but I think we should rename this class to KafkaBatchSink to be consistent with the KafkaBatchSource.
Done
    public KafkaConfig(String referenceName, String brokers, String topic, String partitions,
                       String initialPartitionOffsets, Long defaultInitialOffset, String schema, String format,
                       String timeField, String keyField, String partitionField, String offsetField) {
I know this constructor is not used, but should we have the principal and keytab location in the parameters, since they are now fields in the config?
The constructor already has too many parameters, and it is not used anywhere, so I just removed the constructors.
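For context on why the constructor is dispensable: CDAP populates PluginConfig fields by injection from the plugin properties, so the config needs no constructor at all. A rough sketch under that assumption, with the field set abbreviated (annotation names follow the co.cask-era CDAP API):

    import co.cask.cdap.api.annotation.Description;
    import co.cask.cdap.api.annotation.Macro;
    import co.cask.cdap.api.annotation.Name;
    import co.cask.cdap.api.plugin.PluginConfig;
    import javax.annotation.Nullable;

    // Sketch only: CDAP deserializes pipeline plugin properties directly into
    // these fields, so no telescoping constructor is required.
    public class KafkaConfig extends PluginConfig {
      @Name("brokers")
      @Description("Comma-separated list of Kafka brokers")
      @Macro
      private String brokers;

      @Name("principal")
      @Description("Kerberos principal to authenticate as")
      @Macro
      @Nullable
      private String principal;

      @Name("keytabLocation")
      @Description("Location of the keytab file for the principal")
      @Macro
      @Nullable
      private String keytabLocation;
    }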
      producer.send(record);
    } catch (Exception e) {
      // catch the exception and continue processing rest of the alerts
      LOG.error("Exception while emitting alert {}", alert, e);
Should we limit the logs here? If something bad happens while sending the alerts, the messages may flood the log.
Duplicate comment
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
Though it is not related to this PR, I think we should name this class KafkaBatchSink to be consistent with the batch source; otherwise it will confuse people why this is called Kafka.
Done
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
    KafkaHelpers.setupKerberosLogin(kafkaParams, conf.getPrincipal(), conf.getKeytabLocation());
    // Create a unique string for the group.id using the pipeline name and the topic
    kafkaParams.put("group.id", Joiner.on("-").join(context.getPipelineName().length(), conf.getTopic().length(),
Can you add a comment about this property?
Done
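Sketched in full, the quoted line builds a group.id that is unique per pipeline and topic; the continuation of the truncated line is an assumption, not quoted from the PR. Length-prefixing the parts makes the join unambiguous: pipeline "a-b" with topic "c" yields "3-1-a-b-c", while pipeline "a" with topic "b-c" yields "1-3-a-b-c".

    import com.google.common.base.Joiner;

    // Assumed completion of the truncated line: a length-prefixed join of the
    // pipeline name and topic, e.g. "4-7-pipe-mytopic" for ("pipe", "mytopic").
    String groupId = Joiner.on("-").join(context.getPipelineName().length(),
                                         conf.getTopic().length(),
                                         context.getPipelineName(),
                                         conf.getTopic());
    kafkaParams.put("group.id", groupId);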
    for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
      TopicPartition topicAndPartition = entry.getKey();
      Long offset = entry.getValue();
      if (offset == OffsetRequest.EarliestTime()) {
I don't understand this logic. The offsets can have -1 and -2 as their values; if that is the case, this will never match, right?
OffsetRequest.EarliestTime() always returns -2 and OffsetRequest.LatestTime() always returns -1, so they will match.
👍
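For readers following this thread: kafka.api.OffsetRequest.EarliestTime() is the sentinel -2 and LatestTime() is -1. A hedged sketch, not the PR's code, of how such requested offsets can be resolved to concrete ones with the consumer API (beginningOffsets/endOffsets exist from Kafka 0.10.1 onward):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: map the special values -2 (earliest) and -1 (latest)
    // to real offsets via the consumer's offset lookup methods.
    static long resolveOffset(KafkaConsumer<?, ?> consumer, TopicPartition tp, long requested) {
      if (requested == -2L) {   // OffsetRequest.EarliestTime()
        return consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
      }
      if (requested == -1L) {   // OffsetRequest.LatestTime()
        return consumer.endOffsets(Collections.singleton(tp)).get(tp);
      }
      return requested;         // already a concrete offset
    }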
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();


extra new line
Done
@yaojiefeng I have addressed the comments, please take another look. Since not many people are using Kafka 0.9, I think it is expensive to maintain support for it. The Kafka 0.8 client can still be used with non-Kerberos Kafka 0.9 servers; only Kerberos support is unavailable. We can revisit this decision later if needed.
yaojiefeng left a comment:
Just a few comments about imports; the rest LGTM.
    import java.util.Set;
    import javax.annotation.Nullable;
    import java.io.IOException;
    import java.util.*;
We should use specific imports instead of using *
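For example (the replacement classes are chosen for illustration), a wildcard such as

    import java.util.*;

should be narrowed to only the classes the file actually uses:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;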
    import java.util.Map;
    import java.util.Set;
    import javax.annotation.Nullable;
    import java.util.*;
We should use specific imports instead of using *
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.*;
We should use specific imports instead of using *
    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.*;
We should use specific imports instead of using *
yaojiefeng left a comment:
lgtm
JIRA - https://issues.cask.co/browse/CDAP-13280
Build - Ran mvn clean package locally.
Note: The majority of the change is due to renaming the kafka-plugins-0.9 module to kafka-plugins-0.10.