New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-278] Fix sending lineage event for KafkaSource #2131
Conversation
@yukuai518 Please review |
@@ -538,17 +540,18 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets | |||
} | |||
|
|||
WorkUnit workUnit = WorkUnit.create(extract); | |||
if (topicSpecificState.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this is removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is duplicate logic to line 239
* | ||
* @param keyPrefix key prefix | ||
*/ | ||
public void removeProps(String keyPrefix) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename this to removePropsWithPrefix
? The current name is too easy to mess up as a typo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
} | ||
WorkUnit workUnit = WorkUnit.create(extract, interval); | ||
// Squeeze all partitions from the multiWorkUnit into of one the work units, which can be any one | ||
WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this to preserve properties from the workunit by using an existing workunit instead of creating a new one? Will removing the properties below affect other places that access this workunit?
Would it be safer and sufficient to create a new workunit and copy the properties over from the first work unit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this to preserve properties from the workunit by using an existing workunit instead of creating a new one?
Yes.
Will removing the properties below affect other places that access this workunit?
No. Because these properties are either ignored or replaced by the configurations immediately set below.
Would it be safer and sufficient to create a new workunit and copy the properties over from the first work unit?
No. We do need to remove those properties. Otherwise, the logic won't be the same as before. And all work units of in the same multi work unit have the same properties but those that are removed and extractid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant copy then remove from the copy. But if it is safe then you can remove directly.
@htran1 @yukuai518 Comments addressed, please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1.
Closes apache#2131 from zxcware/lineage
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
gobblin.lineage
from the stateKafkaWorkUnitPacker
disregards existing configurations of work unitsTests
State#testRemovePropsByPrefix
Commits