Skip to content

[BEAM-1928] Add ParDos#2597

Closed
tgroh wants to merge 3 commits intoapache:masterfrom
tgroh:par_do_serde_protos
Closed

[BEAM-1928] Add ParDos#2597
tgroh wants to merge 3 commits intoapache:masterfrom
tgroh:par_do_serde_protos

Conversation

@tgroh
Copy link
Member

@tgroh tgroh commented Apr 19, 2017

Add ParDoPayloadTranslator to PTransformTranslator

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@davorbonaci
Copy link
Member

More details please?

@tgroh tgroh force-pushed the par_do_serde_protos branch 2 times, most recently from 0061a4a to 7250253 Compare May 15, 2017 18:12
@tgroh
Copy link
Member Author

tgroh commented May 15, 2017

R: @kennknowles

Add ParDoPayloadTranslator to PTransformTranslator
@tgroh tgroh force-pushed the par_do_serde_protos branch from 7250253 to a1a66d1 Compare May 15, 2017 23:08
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.003%) to 70.219% when pulling a1a66d1 on tgroh:par_do_serde_protos into 13e1be2 on apache:master.

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really good start.

import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;

/** Created by tgroh on 5/15/17. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

string id = 1;
Type type = 2;

enum Type {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer top-level definitions, like StateType, so we don't couple names or generated classes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted for the time being.

private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS =
ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder().build();
ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove ParDoPayload from the TODO (or just remove the TODO)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RM'd.

/**
* The URN for a {@link ParDoPayload}.
*/
public static final String PAR_DO_PAYLOAD_URN = "urn:beam:runnerapi:pardo:v1";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need runnerapi in the URNs. This is the spec for a "Beam ParDo transform" (v1).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RM'd

*/
public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN =
"urn:beam:windowmappingfn:javasdk:0.1";

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) extraneous newline

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RM'd

return null;
}
};
Type parameterType = parameter.match(runnerApiCases);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) I prefer putting the new Cases inline so it reads like the enhanced switch that it is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return builder.build();
}

public static PCollectionView<?> fromProto(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point fromProto and toProto are going to get unreadable if they depend on a variety of params for override.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a great scene. This one ends up needing a bunch of context because things like the PCollection are inputs of the PTransform and are only referenced by (two-indirection name) within the SideInput.

Some other things are fooFromProto, which I'm not opposed to.

Also reordered the parameters to make the conversion target first, then context

components);
Coder<?> elemCoder =
Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
Coder<Iterable<WindowedValue<?>>> coder =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be keyed on the materialization. This is the appropriate coder for our current "iterable of full windowed values" materialization. It is OK to not implement any others, but there should be a checkArgument or a conditional with a failing else branch with a meaningful message in an Unsupported exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing an exception if an unknown type for the time being.

Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could probably add a couple more layers of indirection here...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we just, though

import org.apache.beam.sdk.values.WindowingStrategy;

/** Created by tgroh on 5/15/17. */
class RunnerPCollectionView<T> extends PValueBase
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this class is a harbinger of things to come in terms of RunnerParDo, etc. These should only include the primitive constructs. It might turn out quite nice, actually.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.5%) to 70.734% when pulling 67bb96c on tgroh:par_do_serde_protos into 13e1be2 on apache:master.

@tgroh tgroh force-pushed the par_do_serde_protos branch 2 times, most recently from ea46bb1 to cf52b9c Compare May 17, 2017 19:42
@tgroh tgroh force-pushed the par_do_serde_protos branch from cf52b9c to 4643106 Compare May 17, 2017 20:31
@coveralls
Copy link

Coverage Status

Coverage increased (+0.007%) to 70.759% when pulling 4643106 on tgroh:par_do_serde_protos into 5e3c5c6 on apache:master.

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) {
return parameter.match(
new Cases<Optional<RunnerApi.Parameter>>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a Cases.Default that lets you put all the unimplemented cases in one place a la switch..default:. Not important, just a tidying.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@tgroh tgroh force-pushed the par_do_serde_protos branch from e543656 to f11c673 Compare May 18, 2017 20:24
@tgroh tgroh force-pushed the par_do_serde_protos branch from f11c673 to 207085c Compare May 18, 2017 20:45
@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 70.789% when pulling 207085c on tgroh:par_do_serde_protos into 8c572ef on apache:master.

@asfgit asfgit closed this in 2e0cf00 May 18, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants