[BEAM-4056] Identify side inputs by transform id and local name#5118
tgroh merged 2 commits into apache:master
R: @tgroh
// Side Input PCollection ids. Each must be present as a value in the inputs of
// any PTransform the ExecutableStagePayload is the payload of.
repeated string side_inputs = 3;
// The side inputs required for this executable stage. Each must be prsent as a side input of
Aren't you familiar with that common abbreviation??? It shaves off a whole character!
String outerLocalName = String.format("%s:%s",
    sideInput.transformId(), sideInput.localName());
pt.putInputs(outerLocalName, sideInput.getCollection().getId());
payload.addSideInputs(SideInputId.newBuilder()
Let me know if this is better.
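For readers skimming the thread, here is a minimal standalone sketch of the keying idea in the snippet above: each side input is named by the id of the transform that consumes it plus its local name, joined with the same `%s:%s` format. `SideInputKeySketch`, `outerLocalName`, and `stageInputs` are hypothetical names for illustration only, not Beam APIs, and the row layout is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in, not a Beam class. */
final class SideInputKeySketch {
  private SideInputKeySketch() {}

  /** Joins transform id and local name with the "%s:%s" format seen in the diff. */
  static String outerLocalName(String transformId, String localName) {
    return String.format("%s:%s", transformId, localName);
  }

  /**
   * Builds an input map for a stage. Each row is assumed to be
   * {transformId, localName, collectionId}.
   */
  static Map<String, String> stageInputs(String[][] sideInputs) {
    Map<String, String> inputs = new LinkedHashMap<>();
    for (String[] sideInput : sideInputs) {
      String key = outerLocalName(sideInput[0], sideInput[1]);
      // Reject a second entry for the same (transform, local name) pair.
      if (inputs.put(key, sideInput[2]) != null) {
        throw new IllegalArgumentException("Duplicate side input key: " + key);
      }
    }
    return inputs;
  }
}
```

In this sketch, two transforms that read the same collection under the same local name still get distinct keys, because the transform id is part of the key.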
public abstract class SideInputReference {

  /** Create a side input reference. */
  public static SideInputReference of(String transformId, String localName,
Maybe a PTransformNode? Would that be available everywhere we're constructing this?
Yes, it's available. We already require components everywhere due to PCollectionNode. Done.
bsidhom
left a comment
Addressed comments. PTAL.
// any PTransform the ExecutableStagePayload is the payload of.
repeated string side_inputs = 3;
// The side inputs required for this executable stage. Each must be present as a side input of
// exactly one PTransform within this ExecutableStagePayload.
I would modify this spec a little - "Each Side Input of each PTransform within this ExecutableStagePayload must be represented within this field." or thereabouts.
That way it represents the minimum contents instead of the maximum contents; if we make more side inputs available than required, I don't expect either harness to break, just to be very confused.
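A rough sketch of what a "minimum contents" check along these lines could look like: every side input declared by a transform in the stage must appear in the payload's list, while extra payload entries are tolerated. The class and method names, and the `transformId:localName` string encoding of ids, are assumptions made for illustration and are not part of Beam.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical checker, not a Beam class. */
final class SideInputCoverageSketch {
  private SideInputCoverageSketch() {}

  /**
   * Returns the "transformId:localName" keys that transforms declare but the
   * payload does not list. Extra payload entries are ignored, matching the
   * minimum-contents reading of the spec.
   */
  static List<String> missingSideInputs(
      Map<String, List<String>> declaredByTransform, Set<String> payloadSideInputs) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, List<String>> transform : declaredByTransform.entrySet()) {
      for (String localName : transform.getValue()) {
        String key = transform.getKey() + ":" + localName;
        if (!payloadSideInputs.contains(key)) {
          missing.add(key);
        }
      }
    }
    return missing;
  }
}
```

An empty result means the payload satisfies the minimum; surplus entries in the payload do not count as errors here.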
public abstract class SideInputReference {

  /** Create a side input reference. */
  public static SideInputReference of(PTransformNode transform, String localName,
Changed. Let me know if this is any better. I'm sad that there's no Beam auto-formatter. 😭
bsidhom
left a comment
Addressing review comments.
tgroh
left a comment
LGTM.
Please clean up history pre-merge.
Done.
This is necessary to identify side inputs during portable pipeline translation and execution.