New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-4822] Ensure that the Kafka 0.8 connector is compatible with k… #5050
Conversation
…afka-consumer-groups.sh
Don't you need a unique ID for this to work properly? (The task name is not unique) |
the prefix is the same as kafka high level api, contain the uuid string, so it could be unique |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the work @taizilongxu.
I have left some comments after a first pass of the changes.
PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler, | ||
List<KafkaTopicPartitionState<TopicAndPartition>> partitionStates, | ||
ExceptionProxy errorHandler, | ||
long commitInterval) { | ||
long commitInterval, String taskId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please list this parameter on a separate new line for consistently code style.
try { | ||
hostName = InetAddress.getLocalHost().getHostName(); | ||
} catch (UnknownHostException e) { | ||
LOG.error("Can not get host name!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add the exception to the log also.
@@ -64,6 +71,20 @@ public ZookeeperOffsetHandler(Properties props) { | |||
int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); | |||
int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); | |||
|
|||
// set consumerId to register ownership in zookeeper, just like kafka high level API | |||
UUID uuid = UUID.randomUUID(); | |||
String hostName = "Unkonw"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type: Unknown
curatorClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, info.getBytes()); | ||
} | ||
} catch (KeeperException.NodeExistsException e) { | ||
LOG.warn("NodeExists for {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about simply: Node exists for {}
, instead of the camel case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, shouldn't the log arguments be be ("Node exists for {}", consumerId, e)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to review, I will edit them
Hi, is there any thing to edit or change? |
I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. Please re-open this if you feel otherwise and work should continue. It's Kafka 0.8... |
What is the purpose of the change
When we deploy the taskmanager in docker of our cluster, it's hard to locate which taskmanager cosnume the right partition of kafka except looking up the log in docker, so I just add the owner in zk path when PeriodOffsetCommitter the offset.
Brief change log
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
:noDocumentation