New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-236] Add a ControlMessage injector as a RecordStreamProcessor #2090
Conversation
@ibuenros, please review. |
6957bd7
to
369bfe2
Compare
369bfe2
to
6206f8a
Compare
Hi @htran1, could you add more description in the PR or the jira ticket about the motivation and general idea of control message injector? This can help reviewers obtain a direction of what to look for in terms of design and implementation. |
public class GlobalMetadata<S> implements Copyable<GlobalMetadata<S>> { | ||
@Getter | ||
private S schema; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to provide more functional methods:
/** Generate a copy of this object with a new schema. */
public GlobalMetadata<S> withSchema(S newSchema);
That way, when we add new attributes to GlobalMetadata
, converters will require no change and still copy the correct metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, added a builder.
.getSchema(), workUnitState)); | ||
out = new MetadataUpdateControlMessage<SO, DO>(this.outputGlobalMetadata); | ||
} | ||
|
||
getMessageHandler().handleMessage((ControlMessage) in); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this receive in
or out
? If the decision is for in
, then I would prefer this line be before the MetadataUpdateControlMessage
block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, moved it before. I think in
should be the one to be handled so that the handler gets a chance to handle the control message without modifications. The handler can choose to do schema conversion if it needs it.
* @param workUnitState a {@link WorkUnitState} object carrying configuration properties | ||
* @return an initialized {@link ControlMessageInjector} instance | ||
*/ | ||
public ControlMessageInjector<SI, DI> init(WorkUnitState workUnitState) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be protected
instead? Same with all methods except processStream
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
this.converter = closer.register(new MultiConverter(converters)); | ||
|
||
// can't have both record stream processors and converter lists configured | ||
if (!this.recordStreamProcessors.isEmpty() && !converters.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make it a bit clearer, can you do something like Preconditions.checkState(!this.recordStreamProcessor.isEmpty() ^ !this.converters.isEmpty(), "Mesasge...");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to call the closer here and both empty is valid so ^ won't work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added precondition check wrapped by a try catch block.
if (in instanceof MetadataUpdateControlMessage) { | ||
setInputGlobalMetadata(((MetadataUpdateControlMessage) in).getGlobalMetadata(), | ||
workUnitState); | ||
out = new MetadataUpdateControlMessage<SI, DI>(this.inputGlobalMetadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need a new output message? It seems to be identical to the input message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setInputGlobalMetadata() can set this.inputGlobalMetadata to something other than the input metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed inputGlobalMetadata from this class so that there is no confusion over the propagation of metadata. A ControlMessageInjector
can store a modified copy of the metadata, but it won't be able to pass that change along to the next construct. Only converters should change schemas.
|
||
// update the output schema with the new input schema from the MetadataUpdateControlMessage | ||
if (in instanceof MetadataUpdateControlMessage) { | ||
this.outputGlobalMetadata = GlobalMetadata.<SI, SO>builderWithInput(inputStream.getGlobalMetadata(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be GlobalMetadata.builderWithInput(in.getMetadata, Optional.of...)
? (instead of inputStream
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently there are no other fields, so it either works. The original idea was to start with the inputStream GlobalMetadata and then overlay from in
. But I think the suggestion of using in.getGlobalMetadata()
is good since the copy will be handled in builderWithInput
.
+1 |
Closes apache#2090 from htran1/control_message_injection
Closes apache#2090 from htran1/control_message_injection
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Add a ControlMessage injector as a RecordStreamProcessor.
A
ControlMessageInjector
inspects an incoming record and can injectControlMessages
based on the content of the incoming record.One use case for this is the injection of
MetadataUpdateControlMessages
when the latest schema has been updated to trigger the update of the schema used by other constructs downstream from theControlMessageInjector
.Long running jobs are more likely to encounter issues due to the schema being updated after the start of the job.
Tests
Commits