Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-79] add Gearpump runner #323

Closed
wants to merge 1 commit into from

Conversation

manuzhang
Copy link
Contributor

@manuzhang manuzhang commented May 11, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

This PR adds Gearpump runner to Beam meeting the goals of phase 1 in the design document.

The Gearpump runner supports the following functionalities,

  • Transformations: ParDo, GroupByKey, Flatten
  • Windows: using Beam's window logic and code
  • side outputs
  • serialization/deserialization: using Gearpump's Kryo serializer
  • sources: Beam's UnboundedSource
  • message delivery guarantee: at-most-once
  • tests: integration test for various translators

Here's a snapshot of running the following Beam example on Gearpump cluster

    PCollection<KV<String, Long>> wordCounts =
        p.apply(Read.from(new UnboundedTextSource()).named("WordStream"))
            .apply(ParDo.of(new ExtractWordsFn()))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
            .apply(Count.<String>perElement());

    wordCounts.apply(ParDo.of(new FormatAsStringFn()));

snip20160511_4

Note that the Gearpump runner is still in early stage and lacking capabilities like trigger, side inputs, aggregator. However, I'd like to have the community to get a feel of what Gearpump is like, whether Beam and Gearpump go well, and gather ideas for improvements.

@manuzhang manuzhang force-pushed the gearpump_runner branch 2 times, most recently from 9994ddf to a514092 Compare May 11, 2016 12:49
@jbonofre
Copy link
Member

Good job. Let me take a look and try it. Thanks @manuzhang !

@davorbonaci
Copy link
Member

Wow! We didn't expect it this fast!

We'll take a look shortly, but it might take us a few days to parse through.

R: @davorbonaci

@manuzhang manuzhang force-pushed the gearpump_runner branch 3 times, most recently from ed4a200 to b776af4 Compare May 12, 2016 05:25
@manuzhang
Copy link
Contributor Author

Gearpump requires Java 8 so the test matrix for Java 7 will fail.

@davorbonaci
Copy link
Member

Also, R: @kennknowles.

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>java-sdk-all</artifactId>
<version>${beam.version}</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can omit this version and let the parent <dependencyManagement> sections set it up.

@kennknowles
Copy link
Member

This is really cool!

One thing that you can do to get a lot of automatic feedback on your runner is set up the RunnableOnService tests as in #319 (Direct), #291 (Flink), #294 (Spark). This will also help to drive which test categories we need to split into, so you can explicitly exclude tests of capabilities that are still under development.

@manuzhang
Copy link
Contributor Author

@kennknowles Thanks, I'll go through all the stuff with your comments.

@kennknowles
Copy link
Member

kennknowles commented May 24, 2016

Sorry for the slow replies. Please feel free to send back thoughts or questions any time.

@manuzhang
Copy link
Contributor Author

@kennknowles updated most parts against your comments. It seems RunnableOnService tests use PAssert which requires support of sideInput. Unfortunately, sideInput is not yet supported in Gearpump runner so I have to disable the RunnableOnService test for now.

@manuzhang
Copy link
Contributor Author

@kennknowles any more comments ?

@kennknowles
Copy link
Member

Just a heads up: you are going to need to add a dependency on runners/core-java as we are in the process of moving utilities there.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-core-java</artifactId>
</dependency>

@manuzhang
Copy link
Contributor Author

@kennknowles Yes, I saw that. I'm trying out the new PAssert now.

@kennknowles
Copy link
Member

Let me know if there is anything else I can make easier here. Happy to chat, etc.

@manuzhang manuzhang force-pushed the gearpump_runner branch 2 times, most recently from f73393e to bbc2c9a Compare July 10, 2016 01:32
@manuzhang
Copy link
Contributor Author

manuzhang commented Jul 10, 2016

@kennknowles It seems some tests are still implemented with PCollectionViewAssert which requires side input (I excluded them). Another problem is I get tests timeout on travis while I can get them passed locally.

@kennknowles
Copy link
Member

Tests using PCollectionViewAssert: Filtering them out is the right thing to do. The way you have it is fine to get unblocked. It would be even better to tag specific test methods with @Category(StaticSideInputs.class) (from the capability matrix doc shared by @tgroh) and then use <excludedGroups> in your pom. This way you aren't eliminating the whole test class, just the method.

Test timeouts: It may be infeasible to run all these RunnableOnService tests in the presubmit and in Travis. For Dataflow, we run just a couple in the presubmit and then the rest as a post-submit Jenkins job. It probably makes sense to do this for many runners, so that is also no problem. Do you know whether it is just overheads or whether there is a real performance problem?

For right now, there's a RAT plugin failure in the runners-parent module. If we fix that, then then Jenkins should be willing to run your tests to completion even if it takes a long time.

From there, since traffic on the branch will be low enough, I think we can probably check it in to a feature branch and then set up the postsubmit job and try to see if we can speed up the tests.

@kennknowles
Copy link
Member

It looks like the RAT plugin was seeing files that needed to be cleaned up from another branch. We've just adjusted the Jenkins configuration so those files should be cleaned. Go ahead and force a rebuild via rebase and force push, or dummy commit, and we should see Jenkins succeed or timeout.

@manuzhang manuzhang force-pushed the gearpump_runner branch 2 times, most recently from 4341ee0 to c7e6f92 Compare July 12, 2016 04:38
@manuzhang
Copy link
Contributor Author

The travis build has errors as below which cannot be reproduced on my laptop.

java.lang.RuntimeException: Failed to construct instance from factory method TestGearpumpRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:236)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:165)
    at org.apache.beam.sdk.runners.PipelineRunner.fromOptions(PipelineRunner.java:59)
    at org.apache.beam.sdk.testing.TestPipeline.fromOptions(TestPipeline.java:103)
    at org.apache.beam.sdk.testing.TestPipeline.create(TestPipeline.java:98)
    at org.apache.beam.sdk.transforms.CreateTest.testCreateTimestampedEmpty(CreateTest.java:261)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:168)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runners.Suite.runChild(Suite.java:127)
    at org.junit.runners.Suite.runChild(Suite.java:26)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
    at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:225)
    ... 41 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at akka.remote.Remoting.start(Remoting.scala:181)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:190)
    at akka.cluster.ClusterActorRefProvider.init(ClusterActorRefProvider.scala:31)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:666)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:663)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:663)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:679)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    at org.apache.gearpump.cluster.embedded.EmbeddedCluster.start(EmbeddedCluster.scala:51)
    at org.apache.beam.runners.gearpump.TestGearpumpRunner.<init>(TestGearpumpRunner.java:40)
    at org.apache.beam.runners.gearpump.TestGearpumpRunner.fromOptions(TestGearpumpRunner.java:48)
    ... 45 more

@kennknowles
Copy link
Member

Curious. I can't say much about the details there. What was at the other end of the future that timed out when the embedded Gearpump cluster was starting up?

I'll pull and try it to see if my environment reproduces the issue.

@manuzhang
Copy link
Contributor Author

@kennknowles finally, travis is green.
I tried mvn clean apache-rat:check locally and the output is

1 Unknown Licenses

*******************************

Unapproved licenses:

  .github/PULL_REQUEST_TEMPLATE.md

*******************************

@kennknowles
Copy link
Member

I can confirm that failure. It should certainly succeed. I'm looking into it.

@kennknowles
Copy link
Member

From a quick search it looks like the failure of apache-rat:check is working as intended (for some definition of "intended") according to this thread which I found through this issue.

The fix suggested is to invoke the plugin only through mvn verify. But if that succeeded, we would expect Jenkins to succeed, since it is currently green outside this PR.

@kennknowles
Copy link
Member

I have identified two issues that may prevent you from finding the real problem:

  1. We have only configured the RAT plugin in the release profile, which Jenkins uses. We probably meant to only execute it in the release profile, but configuration should be global. So you can get the proper excludes on the command line with -Prelease. I will fix this separately.
  2. RAT includes defaults for IDEs to exclude things like Eclipse's .settings and .classpath but the exclude patterns are not correct in the presence of subprojects. I will also fix this.

When I fix these two issues locally, the remaining RAT failure is runners/gearpump/README.md. Obviously we want it. The issue is that your README.md needs to have the Apache license header in a comment. See the source code for any of the other README.md files.

I would fix this for you and merge to a feature branch, but since it is a licensing issue I think it best that you make the edits.

@manuzhang
Copy link
Contributor Author

@kennknowles Thanks for the thorough explanation. I think we are good to go this time.

@dhalperi
Copy link
Contributor

Boom! :shipit:

@kennknowles
Copy link
Member

Awesome!

asfgit pushed a commit that referenced this pull request Jul 20, 2016
@kennknowles
Copy link
Member

Merged to branch gearpump-runner. Since the PR is against master that didn't close it, but feel free to close any time.

@manuzhang
Copy link
Contributor Author

😄

@manuzhang manuzhang closed this Jul 20, 2016
dhalperi pushed a commit to dhalperi/beam that referenced this pull request Aug 23, 2016
iemejia pushed a commit to iemejia/beam that referenced this pull request Jan 12, 2018
pl04351820 pushed a commit to pl04351820/beam that referenced this pull request Dec 20, 2023
* changes without context

        autosynth cannot find the source of changes triggered by earlier changes in this
        repository, or by version upgrades to tools such as linters.

* docs: update python contributing guide

Adds details about blacken, updates version for system tests,
and shows how to pass through pytest arguments.

Source-Author: Chris Cotter <cjcotter@google.com>
Source-Date: Mon Feb 8 17:13:36 2021 -0500
Source-Repo: googleapis/synthtool
Source-Sha: 4679e7e415221f03ff2a71e3ffad75b9ec41d87e
Source-Link: googleapis/synthtool@4679e7e

* build(python): enable flakybot on library unit and system tests

Source-Author: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com>
Source-Date: Wed Feb 17 14:10:46 2021 -0700
Source-Repo: googleapis/synthtool
Source-Sha: d17674372e27fb8f23013935e794aa37502071aa
Source-Link: googleapis/synthtool@d176743

* test: install pyopenssl for mtls testing

Source-Author: arithmetic1728 <58957152+arithmetic1728@users.noreply.github.com>
Source-Date: Tue Mar 2 12:27:56 2021 -0800
Source-Repo: googleapis/synthtool
Source-Sha: 0780323da96d5a53925fe0547757181fe76e8f1e
Source-Link: googleapis/synthtool@0780323

Co-authored-by: Craig Labenz <craig.labenz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants