[BEAM-3287] Add Go support for universal runners, incl Flink #4888

Merged: 3 commits into apache:master on Mar 21, 2018

@herohde herohde commented Mar 17, 2018

Extracted a runnerlib for runners that wish to use the portable model but add special logic or integration. Updated the portable URNs.

To run against a Flink runner, run the following from sdks/go:

$ go run examples/wordcount/wordcount.go --runner=flink --output=/tmp/foo --endpoint=localhost:3000

herohde commented Mar 17, 2018

R: @lostluck @aljoscha

@lostluck lostluck left a comment

This is exciting! My comments are largely documentation-related, but otherwise this looks good.

@@ -28,12 +28,12 @@ import (
const (
// Model constants

-URNImpulse = "urn:beam:transform:impulse:v1"
+URNImpulse = "beam:transform:impulse:v1"
URNParDo = "urn:beam:transform:pardo:v1"
Contributor

Should ParDo still be prefixed with "urn:" ?

Contributor Author

Yes, that one is still the old way.

// limitations under the License.

// Package runnerlib contains utilities for submitting Go pipelines
// to a Beam model runner.
Contributor

Idle question and statement: is "Beam model runner" what we're going with as the long-term name for portability runners? I like it.

Contributor Author

I don't think there is an agreed-upon long-term name. The code package is "universal", so I'm not even internally consistent in this PR :)

// Experiments are additional experiments.
Experiments []string

// TODO(herohde) 3/17/2018: add further parametrization as needed
Contributor

typo: parameterization

}

// Prepare prepares a job to the given endpoint. It returns an id and endpoint, if successful.
func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline, opt *JobOptions) (string, string, error) {
Contributor

Since this function returns two values of the same type, it may be useful to name the return values so it is unambiguous which is which, e.g. (id, endpoint string, err error).
This is not an argument for using naked returns.

Contributor Author

Done
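
For readers unfamiliar with the suggestion above, here is a minimal sketch of named results without naked returns. The `prepare` function, its arguments, and the values it returns are hypothetical stand-ins for illustration, not the code in this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// prepare is a hypothetical stand-in for the reviewed function. Naming the
// results documents which string is which, while every return statement
// still lists its values explicitly (no naked returns).
func prepare(jobName string) (id, endpoint string, err error) {
	if jobName == "" {
		return "", "", errors.New("job name must not be empty")
	}
	// Placeholder values; a real implementation would get these from the runner.
	return "prep-" + jobName, "localhost:3000", nil
}

func main() {
	id, endpoint, err := prepare("wordcount")
	if err != nil {
		fmt.Println("prepare failed:", err)
		return
	}
	fmt.Println(id, endpoint)
}
```

The names show up in the signature and in generated documentation, so callers can tell the two strings apart without reading the body.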

InternalJavaRunner string
}

// Prepare prepares a job to the given endpoint. It returns an id and endpoint, if successful.
Contributor

We may be able to improve this comment, as I'm having difficulty reconciling what's here with the implementation.

Knowing nothing about what the PrepareJob request on the service does, but reading this implementation, it looks like the idea is to prepare the runner to receive the job, largely by getting a place to stage artifacts (like the worker) so the job can run later.
So, it's preparing the service for the job. Is that right?
With that in mind, and looking at how the return values are used in this PR, perhaps:

// Prepare prepares a given Beam model runner to receive a job. If successful, Prepare returns an id, and a URL
// endpoint where the runner expects job artifacts to be staged.

Contributor Author

Yes. Rephrased the description.

return resp.GetPreparationId(), resp.GetArtifactStagingEndpoint().GetUrl(), nil
}

// Submit submits a job to the given endpoint. It returns a jobID, if successful.
Contributor

If we change the comment on Prepare, we should probably change "given endpoint" here to "given Beam model runner" for consistency.

Contributor Author

Made them consistent.

"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
)

// Stage stages the worker binary and any additional content to the given endpoint.
Contributor

Should "content" be changed to "artifacts" for consistency?

Contributor Author

Changed it to "files", which is what this helper function accepts.
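
The three helpers discussed in this review (Prepare, Stage, Submit) appear to be called in that order. The sketch below illustrates that ordering only. The `runner` interface, `fakeRunner`, `runJob`, and all IDs and tokens are hypothetical and are not the runnerlib or Beam job service API; in particular, passing the staging token to Submit is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// runner is a hypothetical interface standing in for the job and artifact
// services. It is not the Beam API.
type runner interface {
	Prepare(pipeline string) (id, stagingEndpoint string, err error)
	Stage(stagingEndpoint string, files []string) (token string, err error)
	Submit(id, token string) (jobID string, err error)
}

// fakeRunner records staged files in memory so the sketch runs without a server.
type fakeRunner struct {
	staged map[string][]string
}

func (f *fakeRunner) Prepare(pipeline string) (id, stagingEndpoint string, err error) {
	if pipeline == "" {
		return "", "", errors.New("empty pipeline")
	}
	return "prep-1", "staging://prep-1", nil
}

func (f *fakeRunner) Stage(stagingEndpoint string, files []string) (token string, err error) {
	if len(files) == 0 {
		return "", errors.New("nothing to stage")
	}
	f.staged[stagingEndpoint] = files
	return "token-" + stagingEndpoint, nil
}

func (f *fakeRunner) Submit(id, token string) (jobID string, err error) {
	if token == "" {
		return "", errors.New("missing staging token")
	}
	return id + "_job", nil
}

// runJob performs the three steps in order and stops at the first failure.
func runJob(r runner, pipeline string, files []string) (string, error) {
	id, endpoint, err := r.Prepare(pipeline)
	if err != nil {
		return "", fmt.Errorf("prepare: %w", err)
	}
	token, err := r.Stage(endpoint, files)
	if err != nil {
		return "", fmt.Errorf("stage: %w", err)
	}
	jobID, err := r.Submit(id, token)
	if err != nil {
		return "", fmt.Errorf("submit: %w", err)
	}
	return jobID, nil
}

func main() {
	r := &fakeRunner{staged: map[string][]string{}}
	jobID, err := runJob(r, "wordcount", []string{"worker"})
	fmt.Println(jobID, err)
}
```

Wrapping each error with the step name makes it clear which stage of the submission failed.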

@aljoscha

I don't think I'm much help here, since I don't really know Go, but the structure of this looks good!

Did you actually try this against the hacking job server we have? What did you get?

 * Extracted a runnerlib for runners that wish to use the portable
   model, but adds special logic or integration.
herohde commented Mar 20, 2018

@aljoscha Thanks! Yes! That is how I'm testing this stuff. I'm currently hitting an issue with the artifacts on the Flink side:

$ go run examples/wordcount/wordcount.go --runner=flink --output=/tmp/foo --endpoint=localhost:3000
[{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
[{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
2018/03/19 18:19:09 No container image specified. Using dev image: 'herohde-docker-apache.bintray.io/beam/go:latest'
2018/03/19 18:19:09 Prepared job with id: go-job-1521508749032988000_2100233503
2018/03/19 18:19:09 Cross-compiling /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go as /var/folders/s2/97strbs55_353t_t7r24yf94009w4s/T/beam-go-1521508749041446000
2018/03/19 18:19:25 Staged binary artifact with token: /private/tmp/beam-artifact-staging/go-job-1521508749032988000_2100233503
2018/03/19 18:19:25 Submitted job: go-job-1521508749032988000_2100233503_-2090543569
2018/03/19 18:19:25 Job state: RUNNING
2018/03/19 18:19:25 Job state: FAILED
2018/03/19 18:19:25 Failed to execute job: job go-job-1521508749032988000_2100233503_-2090543569 failed
exit status 1

Flink runner logs (info):

[...]
[grpc-default-executor-1] INFO org.apache.beam.runners.flink.FlinkRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 1 files. Enable logging at DEBUG level to see which files will be staged.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkRunner - Executing pipeline using FlinkRunner.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkRunner - Translating pipeline to Flink program.
[grpc-default-executor-2] WARN org.apache.beam.runners.flink.FlinkJobInvocation - addMessageObserver() not yet implemented.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment - Using portability layer
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment - Creating the required Batch Execution Environment.
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator -  enterCompositeTransform- 
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator - |    visitPrimitiveTransform- Impulse
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator - |    visitPrimitiveTransform- 
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator - |    visitPrimitiveTransform- CoGBK
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator - |    visitPrimitiveTransform- 
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator - |    visitPrimitiveTransform- CoGBK
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkBatchPipelineTranslator -  leaveCompositeTransform- 
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkRunner - Registering pipeline artifacts in Flink program.
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkRunner - Artifact registration failed
java.lang.IllegalMonitorStateException
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:350)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:121)
	at org.apache.beam.runners.flink.FlinkJobInvocation.lambda$start$0(FlinkJobInvocation.java:61)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
	at org.apache.beam.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation go-job-1521508749032988000_2100233503_-2090543569.
java.lang.RuntimeException: Artifact registration failed
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:124)
	at org.apache.beam.runners.flink.FlinkJobInvocation.lambda$start$0(FlinkJobInvocation.java:61)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
	at org.apache.beam.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalMonitorStateException
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:350)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:121)
	... 7 more

@aljoscha

@herohde We figured out what the problem is: this line

calls wait() while it should call get() on the future.

@axelmagn is fixing it on his hacking-job-server branch.

herohde commented Mar 20, 2018

@lostluck Thanks! PTAL

herohde commented Mar 20, 2018

Thanks @aljoscha. I figured it was something simple.

herohde commented Mar 20, 2018

Now I get:

[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkRunner - Artifact registration failed
java.lang.RuntimeException: Unexpected exception while writing artifact
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:358)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:121)
	at org.apache.beam.runners.flink.FlinkJobInvocation.lambda$start$0(FlinkJobInvocation.java:61)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
	at org.apache.beam.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusException: INVALID_ARGUMENT: No such artifact ARTIFACT_default_worker
	at org.apache.beam.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
	at org.apache.beam.util.concurrent.AbstractFuture.get(AbstractFuture.java:459)
	at org.apache.beam.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:352)
	... 8 more
Caused by: io.grpc.StatusException: INVALID_ARGUMENT: No such artifact ARTIFACT_default_worker
	at io.grpc.Status.asException(Status.java:548)
	at org.apache.beam.artifact.local.LocalArtifactSource.getArtifact(LocalArtifactSource.java:54)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:350)
	... 8 more
Caused by: java.io.FileNotFoundException: /tmp/beam-artifact-staging/go-job-1521511453974075000_1141769330/artifacts/ARTIFACT_default_worker (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.beam.artifact.local.LocalArtifactSource.getArtifact(LocalArtifactSource.java:42)
	... 9 more
[flink-runner-job-server] ERROR org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation go-job-1521511453974075000_1141769330_1342644097.
java.lang.RuntimeException: Artifact registration failed
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:124)
	at org.apache.beam.runners.flink.FlinkJobInvocation.lambda$start$0(FlinkJobInvocation.java:61)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
	at org.apache.beam.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
	at org.apache.beam.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Unexpected exception while writing artifact
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:358)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:121)
	... 7 more
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusException: INVALID_ARGUMENT: No such artifact ARTIFACT_default_worker
	at org.apache.beam.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
	at org.apache.beam.util.concurrent.AbstractFuture.get(AbstractFuture.java:459)
	at org.apache.beam.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:352)
	... 8 more
Caused by: io.grpc.StatusException: INVALID_ARGUMENT: No such artifact ARTIFACT_default_worker
	at io.grpc.Status.asException(Status.java:548)
	at org.apache.beam.artifact.local.LocalArtifactSource.getArtifact(LocalArtifactSource.java:54)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.loadStagedArtifacts(FlinkPipelineExecutionEnvironment.java:350)
	... 8 more
Caused by: java.io.FileNotFoundException: /tmp/beam-artifact-staging/go-job-1521511453974075000_1141769330/artifacts/ARTIFACT_default_worker (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at org.apache.beam.artifact.local.LocalArtifactSource.getArtifact(LocalArtifactSource.java:42)
	... 9 more

It seems we're mangling the name. The correct file, named "worker", is indeed in this directory:

$ ls /tmp/beam-artifact-staging/go-job-1521511453974075000_1141769330/artifacts
worker

@axelmagn

Yes, you're right. This was a case of misapplied mangling. It should be fixed now:

axelmagn@48509fc

herohde commented Mar 20, 2018

I tried removing the demangling and using the Go container, and wordcount seems to work:

$ go run examples/wordcount/wordcount.go --runner=flink --output=/tmp/foo --endpoint=localhost:3000 --input=/etc/profile
[{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
[{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
2018/03/20 10:29:06 No container image specified. Using dev image: 'herohde-docker-apache.bintray.io/beam/go:latest'
2018/03/20 10:29:06 Prepared job with id: go-job-1521566946548724000_69028718
2018/03/20 10:29:06 Cross-compiling /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go as /var/folders/s2/97strbs55_353t_t7r24yf94009w4s/T/beam-go-1521566946813592000
2018/03/20 10:29:24 Staged binary artifact with token: /private/tmp/beam-artifact-staging/go-job-1521566946548724000_69028718
2018/03/20 10:29:25 Submitted job: go-job-1521566946548724000_69028718_-50661279
2018/03/20 10:29:25 Job state: RUNNING
2018/03/20 10:29:35 Job state: DONE

herohde commented Mar 20, 2018

@axelmagn Tested with your changes and it seems to work as well! Awesome!

beam.RegisterRunner("universal", Execute)
}

// Execute execute the pipeline on a universal beam runner.
Contributor

Execute executes...

Contributor Author

Done

herohde commented Mar 20, 2018

Thanks @lostluck! @aljoscha -- would you mind merging the code?

@lukecwik lukecwik merged commit c159a5f into apache:master Mar 21, 2018
5 participants