Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate TransformIOMap as Config for Beam Samza Runner #26276

Merged
merged 6 commits into from
Apr 21, 2023

Conversation

Sanil15
Copy link
Contributor

@Sanil15 Sanil15 commented Apr 14, 2023

Summary

  • Add support for populating serialized map as config for PTransform inputs and output PCollections
  • Transform Inputs & Outputs PCollections are identified by PValue
  • Topologically traverse the pipeline to serialize list of inputs & outputs as part of generated BEAM_JSON_GRAPH
  • This config can be used later for adding per transform metric (throughput & latency) support for Samza runner

Tests

  • Added Unit tests

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@Sanil15
Copy link
Contributor Author

Sanil15 commented Apr 14, 2023

R: @xinyuiscool @mynameborat please take a look

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@mynameborat
Copy link
Contributor

mynameborat commented Apr 19, 2023

General questions

  1. Why use JSON as the serialization format when protobuf is the recommended way of serializing internal components within beam?
  2. Can we leverage existing protobuf representation of the pipeline and use that instead of JSON? i.e., populating the same configuration w/ toProto(pipeline) and deserializing from the config?
  3. Are there any potential pitfalls with varying versions of jackson across beam and samza which can cause issues?

@xinyuiscool in case you have something to chime in on leveraging protobuf representations

@Sanil15
Copy link
Contributor Author

Sanil15 commented Apr 19, 2023

General questions

  1. Why use JSON as the serialization format when protobuf is the recommended way of serializing internal components within beam?
  2. Can we leverage existing protobuf representation of the pipeline and use that instead of JSON? i.e., populating the same configuration w/ toProto(pipeline) and deserializing from the config?
  3. Are there any potential pitfalls with varying versions of jackson across beam and samza which can cause issues?

@xinyuiscool in case you have something to chime in on leveraging protobuf representations

  • IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer
  • Protobuf is better since it has a schema defined but here we are just converting, I wanted to to ideally use Pair<String, String>, but there is no serializer defined for it
  • PipelineJsonRenderer already uses JSON
  • Configs needs to be readable for debugging, if we serialize this with protobuf, the serialised string will not be readable via configs while debugging - it would need a deserializer

@mynameborat
Copy link
Contributor

  • Configs needs to be readable for debugging, if we serialize this with protobuf, the serialised string will not be readable via configs while debugging - it would need a deserializer

This is an internal configuration not for external consumption for user. I'd rather keep it hidden than have it readable. If you want it for debug purpose, you can use the debug logs to infer as opposed to exposing this configuration externally.

PipelineJsonRenderer already uses JSON

It is external facing so that we can render the DAG for observability. It is not necessarily used internally.

IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer

Map<string, string> is not an evolvable data model and if you need additional metadata evolving the string to have record delimitters and so-on complicates evolution and compatibility handling.

All said, If we can't leverage the protobuf representation of the pipeline, can we use BEAM_JSON_GRAPH which also has information about the transforms. Can that be leveraged?

@Sanil15
Copy link
Contributor Author

Sanil15 commented Apr 19, 2023

  • Configs needs to be readable for debugging, if we serialize this with protobuf, the serialised string will not be readable via configs while debugging - it would need a deserializer

This is an internal configuration not for external consumption for user. I'd rather keep it hidden than have it readable. If you want it for debug purpose, you can use the debug logs to infer as opposed to exposing this configuration externally.

PipelineJsonRenderer already uses JSON

It is external facing so that we can render the DAG for observability. It is not necessarily used internally.

IMO using protobuf might over-complicate things for a simple MapEntry<String, String> serializer

Map<string, string> is not an evolvable data model and if you need additional metadata evolving the string to have record delimitters and so-on complicates evolution and compatibility handling.

All said, If we can't leverage the protobuf representation of the pipeline, can we use BEAM_JSON_GRAPH which also has information about the transforms. Can that be leveraged?

@mynameborat Found a way to use BEAM_JSON_GRAPH to attach transformIoInfo in addition to graphLinks and the graph there - check the ExpectedDag as the sample.

  • Removed Jackson with custom MapEntrySerializer we do not need it

Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for leveraging existing beam json graph. Looks good to me.

final ConfigBuilder configBuilder = new ConfigBuilder(options);

SamzaPipelineTranslator.createConfig(
pipeline, options, idMap, nonUniqueStateIds, configBuilder);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's refactor this method a bit to pass in the previously created ConfigContext instead of options, idMap, nonUni.. That way the code is much more readable and extendable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

* Builds a map from PTransform to its input and output PValues. The map is serialized and stored
* in the job config.
*/
public static Map<String, Map.Entry<String, String>> buildTransformIOMap(
Copy link
Contributor

@xinyuiscool xinyuiscool Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to the json renderer class. This method has nothing to do with translation. Please also think about whether this needs a separate scan of pipeline or we can consolidate with the scan inside Json renderer.

Copy link
Contributor Author

@Sanil15 Sanil15 Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to JSON renderer, it still needs to be a separate scan of the pipeline using SamzaPipelineVisitor instead of a generic Beam PipelineVisitor which is used by the JSONRenderer reason being SamzaPipelineVisitor traverses the pipeline differently (does not enter some composite transforms I if they can be directly translated). The PValues Maps we need for MetricOp stuff at runtime need to be populated using this traversal logic from SamzaPipelineVisitor

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for the clean up.

@xinyuiscool xinyuiscool merged commit 168577f into apache:master Apr 21, 2023
@Sanil15 Sanil15 deleted the transform-io-map branch April 24, 2023 18:15
Sanil15 added a commit to Sanil15/beam that referenced this pull request Apr 28, 2023
Sanil15 added a commit to linkedin/beam that referenced this pull request Apr 28, 2023
* Populate TransformIOMap as Config for Beam Samza Runner (apache#26276)

* Basic Opeartor Metric Support For Non Data Shuffle Operators for Samza Runner

* Add end to end test

* Minor nitpicks

* Update test version for internal release testing

* Minor Fixes
Sanil15 added a commit to Sanil15/beam that referenced this pull request May 22, 2023
Sanil15 added a commit to linkedin/beam that referenced this pull request May 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants