Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3079] add Samza runner #4340

Merged
merged 6 commits into from
Jan 26, 2018
Merged

Conversation

xinyuiscool
Copy link
Contributor

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

This PR adds Samza runner to BEAM. The overall design is here. The Samza runner supports most of the BEAM transformations, side input/output, and unbounded/bounded sources. The features not in the scope of this PR are:

  • Support for stateful DoFn
  • Support for splittable DoFn

Integration tests are verified by running mvn install -P local-validates-runner-tests.

@kennknowles : Please help us take a look. Thanks!

@kennknowles kennknowles self-requested a review January 3, 2018 18:57
@kennknowles kennknowles self-assigned this Jan 3, 2018
@kennknowles
Copy link
Member

Nice! I'll take a look. Have patience with my review - it is a pretty big PR :-)

@kennknowles
Copy link
Member

Reviewed 11 of 61 files at r1.
Review status: 11 of 61 files reviewed at latest revision, 4 unresolved discussions.


a discussion (no related file):
I made it through the basic first few files and everything looked fine. Continuing...


runners/pom.xml, line 48 at r1 (raw file):

    <module>apex</module>
    <module>gcp</module>
    <module>samza</module>

I want to note that we are currently dropping Java 7 support so that this can stay here. But it is worth seeing the java8 profile below where the gearpump runner module lives. Since this will go to a feature branch first and only later to master the switch to Java 8 will certainly be done by then.


runners/samza/.gitignore, line 1 at r1 (raw file):

/state/

Out of curiosity - where does this come from?


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

  void setSystemBufferSize(int consumerBufferSize);

  @Description("The maximum parallelism allowed for a given data source")

To make sure I understand this - it means that every Samza source will be set to this same max parallelism?


Comments from Reviewable

@xinyuiscool
Copy link
Contributor Author

Review status: 11 of 61 files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/pom.xml, line 48 at r1 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

I want to note that we are currently dropping Java 7 support so that this can stay here. But it is worth seeing the java8 profile below where the gearpump runner module lives. Since this will go to a feature branch first and only later to master the switch to Java 8 will certainly be done by then.

Thanks for letting me know.


runners/samza/.gitignore, line 1 at r1 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

Out of curiosity - where does this come from?

This is the folder for the RocksDb local state files of Samza processors. By default it will be created under the current folder, which is runners/samza/ when running the tests. In real deployment we set a Samza env var for the location.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

To make sure I understand this - it means that every Samza source will be set to this same max parallelism?

Right. This is similar to Flink's parallism, and it's used by all the sources in Samza. The subsequent Samza tasks will be created based on the actual splits returned from the source.

Do we allow user to set splits/parallism at individual source level? I am curious how other runners like dataflow split the sources.


Comments from Reviewable

@xinyuiscool
Copy link
Contributor Author

Review status: 11 of 61 files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, xinyuiscool wrote…

Right. This is similar to Flink's parallism, and it's used by all the sources in Samza. The subsequent Samza tasks will be created based on the actual splits returned from the source.

Do we allow user to set splits/parallism at individual source level? I am curious how other runners like dataflow split the sources.

One more question: for Samza, it'll be nice we know the partitions of a Unbounded/Bounded source if it is partitioned, e.g Kafka. With this I can split a source by its partition count by default and each partition will be a split. This is how Samza itself works today by default, so it's consistent with the current model. Seems right now I don't see such metadata being exposed in the general UnboundedSource/BoundedSource.


Comments from Reviewable

@kennknowles
Copy link
Member

Reviewed 49 of 61 files at r1, 1 of 1 files at r2.
Review status: all files reviewed at latest revision, 3 unresolved discussions, some commit checks failed.


a discussion (no related file):
@swegner before I merge this to a branch - plenty of time to do more - do you have any high level comments on the metrics bits?


runners/samza/.gitignore, line 1 at r1 (raw file):

Previously, xinyuiscool wrote…

This is the folder for the RocksDb local state files of Samza processors. By default it will be created under the current folder, which is runners/samza/ when running the tests. In real deployment we set a Samza env var for the location.

Perhaps SamzaPipelineOptions is a good place to manage this and set it up to be a tmpdir in local tests?


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, xinyuiscool wrote…

One more question: for Samza, it'll be nice we know the partitions of a Unbounded/Bounded source if it is partitioned, e.g Kafka. With this I can split a source by its partition count by default and each partition will be a split. This is how Samza itself works today by default, so it's consistent with the current model. Seems right now I don't see such metadata being exposed in the general UnboundedSource/BoundedSource.

@rangadi any comment on this? I think that the way it works it that the source itself will split into the default number. Dataflow used to do this during translation, which was not a great thing. Now splittable DoFn makes it necessary to do it dynamically.


Comments from Reviewable

<profiles>
<profile>
<id>local-validates-runner-tests</id>
<activation><activeByDefault>false</activeByDefault></activation>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it is on a feature branch, let's set this to true

@kennknowles kennknowles changed the base branch from master to samza-runner January 18, 2018 19:41
@kennknowles
Copy link
Member

I've updated the samza-runner to match today's master branch, which is fairly green with minor flakes, and pointed this PR at that branch. So now the new test results, once you enable the ValidatesRunner tests in your pom, should be good signal.

@swegner
Copy link
Contributor

swegner commented Jan 18, 2018

Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


a discussion (no related file):

Previously, kennknowles (Kenn Knowles) wrote…

@swegner before I merge this to a branch - plenty of time to do more - do you have any high level comments on the metrics bits?

Thanks for the heads up. Can I take a look early next week or do you need feedback sooner?


Comments from Reviewable

@rangadi
Copy link
Contributor

rangadi commented Jan 18, 2018

Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

@rangadi any comment on this? I think that the way it works it that the source itself will split into the default number. Dataflow used to do this during translation, which was not a great thing. Now splittable DoFn makes it necessary to do it dynamically.

The UnboundedSource interface is somewhat similar to hadoop. split() method passes in 'desiredNumSplits' which is a hint from the runner to the source. The source can try to obey or return any number splits that makes sense for the source. Runner decides how to map those splits into its internal processing parallelism. In that sense does 'maxSourceParallism' directly influence 'desiredNumSplits'?

How it works in : Streaming applications in Dataflow: Its 'desiredNumSplits' is based on max number of cores the jobs is configured (due to autoscaling, actually number of cores might be fewer). If the source returns more splits than suggested, I think Dataflow tries to run all of the splits in parallel (need to check if there is a limit).


Comments from Reviewable

@kennknowles
Copy link
Member

The master branch has now switched to allowing Java 8, so you can move this out of the java8 profile in the pom.xml

@xinyuiscool
Copy link
Contributor Author

Rebased with master and samza is with all the other runners in pom.xml.


Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/samza/.gitignore, line 1 at r1 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

Perhaps SamzaPipelineOptions is a good place to manage this and set it up to be a tmpdir in local tests?

I agree, let me put up some changes for it.


runners/samza/pom.xml, line 40 at r2 (raw file):

Previously, kennknowles (Kenn Knowles) wrote…

While it is on a feature branch, let's set this to true

Sure, just committed the change.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, rangadi (Raghu Angadi) wrote…

The UnboundedSource interface is somewhat similar to hadoop. split() method passes in 'desiredNumSplits' which is a hint from the runner to the source. The source can try to obey or return any number splits that makes sense for the source. Runner decides how to map those splits into its internal processing parallelism. In that sense does 'maxSourceParallism' directly influence 'desiredNumSplits'?

How it works in : Streaming applications in Dataflow: Its 'desiredNumSplits' is based on max number of cores the jobs is configured (due to autoscaling, actually number of cores might be fewer). If the source returns more splits than suggested, I think Dataflow tries to run all of the splits in parallel (need to check if there is a limit).

@rangadi you're right about maxSourceParallism, which is used to set the "desiredNumSplits" so the splits are bounded by this number. In samza, this will decide the number of tasks for a job.

What I was saying (I think Kenneth meant the same) is that it'll be super nice that BEAM provides an API for the source to split into a default number, like the number of partitions in Kafka. So as a user, he doesn't need to bother finding out the number of each input kafka topic parittions when he runs the job. For LinkedIn, this is very valuable since a Kafka topic might have different partitions in different fabrics, so a default will be very helpful to the users. That's also the behavior in Samza today if the user doesn't provide a customized grouper of partitions: each partition becomes a task (or a split in this case).


Comments from Reviewable

@rangadi
Copy link
Contributor

rangadi commented Jan 25, 2018

Review status: 0 of 77 files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, xinyuiscool wrote…

@rangadi you're right about maxSourceParallism, which is used to set the "desiredNumSplits" so the splits are bounded by this number. In samza, this will decide the number of tasks for a job.

What I was saying (I think Kenneth meant the same) is that it'll be super nice that BEAM provides an API for the source to split into a default number, like the number of partitions in Kafka. So as a user, he doesn't need to bother finding out the number of each input kafka topic parittions when he runs the job. For LinkedIn, this is very valuable since a Kafka topic might have different partitions in different fabrics, so a default will be very helpful to the users. That's also the behavior in Samza today if the user doesn't provide a customized grouper of partitions: each partition becomes a task (or a split in this case).

Thanks. I am not sure if I you would like me to address any specific question. Please let me know. If the purpose is to let the user set number of tasks for a job, may be the option could be named '(max)NumberOfTasks'. I am just thinking aloud with little overall context.


Comments from Reviewable

@kennknowles
Copy link
Member

:lgtm: Confirmed locally:

mvn install -pl runners/samza --also-make -DskipTests -Dcheckstyle.skip -Dfindbugs.skip -Drat.skip -Dmdeps.analyze.skip
mvn verify -P local-validates-runner-tests -pl runners/samza/

I think this is definitely ready to go in.


Reviewed 3 of 64 files at r3, 11 of 11 files at r4, 2 of 2 files at r5, 61 of 61 files at r6.
Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


runners/samza/pom.xml, line 40 at r2 (raw file):

Previously, xinyuiscool wrote…

Sure, just committed the change.

Ah, I realized that since the Jenkins job has narrowed its scope, it will not pick this up. I will run myself and then we will put it into the gradle build later.


runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java, line 46 at r1 (raw file):

Previously, rangadi (Raghu Angadi) wrote…

Thanks. I am not sure if I you would like me to address any specific question. Please let me know. If the purpose is to let the user set number of tasks for a job, may be the option could be named '(max)NumberOfTasks'. I am just thinking aloud with little overall context.

If I understand everything correctly, things work in the best way: you don't need this pipeline option because KafkaIO knows how to split itself. Is that not true? It should be true.

For user-defined ParDo steps, some runners that cannot dynamically scale may need a configuration to add specific parallelism for given steps. But for sources if you want it to split into an "appropriate" number then I think that should "just work".

By the way this is not a blocker for getting this onto a branch. It will be better to review specific pull requests on focused changes.


Comments from Reviewable

@kennknowles
Copy link
Member

Can you fix this:

2018-01-23T00:18:40.114 [WARNING] Files with unapproved licenses:
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestSourceHelpers.java
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
  runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java

@xinyuiscool
Copy link
Contributor Author

@kennknowles : Added the licenses. Thanks!

@kennknowles
Copy link
Member

OK, the maven failure is legitimate due to findbugs. We can fix it on the feature branch, actually. It will be nicer to do them one at a time, and we should first integrate to gradle.

@kennknowles kennknowles merged commit 1840a58 into apache:samza-runner Jan 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants