@chadrik
Contributor

@chadrik chadrik commented Jul 1, 2019

This adds a GetPipeline method to the gRPC JobService and implements it for InMemoryJobService. The method is quite similar to the existing GetState method in design/behavior.

As described in the Jira issue, I see this feature as a useful first step towards making the Job API capable of serving as a backend to a web app for submitting, viewing, and monitoring Beam jobs. The next step after this PR would be to add a GetJobs method that sends a summary of all of the active jobs and their ids.

Remaining issues:

  • go: I have not implemented anything for the Go SDK yet. I'm still trying to figure out which version of protoc I need to use with go generate: 3.6 and 3.7 both produced significant diffs, with a lot of style changes, so I assume the committed code was generated with something older. Any help here would be greatly appreciated.
  • tests: I've looked at the related tests for Java and Python and they don't seem to test the RPC service endpoints. The Python tests in portable_runner_test seem to be largely starting the service and shutting it down in different ways, without actually running any test methods. In short, I don't see a clear framework for testing RPC methods in any language, but feel free to point me in the right direction.

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@chadrik
Contributor Author

chadrik commented Jul 1, 2019

R: @herohde
R: @lukecwik

@chadrik
Contributor Author

chadrik commented Jul 6, 2019

R: @angoenka

  responseObserver.onCompleted();
} catch (Exception e) {
  String errMessage =
      String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
Member

Suggested change
String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
String.format("Encountered unexpected exception for invocation %s", invocationId);

Contributor Author

@chadrik chadrik Jul 8, 2019

Note that this exact phrase (including capitalization) is present in 6 other messages in this file.

} catch (Exception e) {
  String errMessage =
      String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
  LOG.error(errMessage, e);
Member

nit: SLF4J allows you to do

LOG.error("Encountered unexpected exception for invocation {}", invocationId, e);

instead of using String.format

Contributor Author

Again, this pattern is repeated in 5 other places in this file. I was just trying to stay consistent.

If you don't like the style used in the file, I would recommend addressing them all at once in another PR.

Member

Sounds good to me.

String invocationId = request.getJobId();
try {
  JobInvocation invocation = getInvocation(invocationId);
  RunnerApi.Pipeline pipeline = invocation.getPipeline();
Member

We aren't handling the NOT_FOUND case as described in the API contract in the proto.

Contributor Author

@chadrik chadrik Jul 8, 2019

I copied the API contract from getState, and handled the error case in exactly the same way, so if it's not correct for getPipeline, that's because it's not correct for getState or the other 4 methods that use this same pattern.

In case of error it's responding with the exception:

catch (Exception e) {
  ...
  responseObserver.onError(Status.INTERNAL.withCause(e).asException());
}

I'm new to Java, so I just assumed this was responding with NOT_FOUND in the case that getInvocation(invocationId) failed.

Member

We are handling the exception, but incorrectly: getInvocation() throws a Status.NOT_FOUND exception, and because we catch it and rethrow it as an INTERNAL exception, the NOT_FOUND status is lost.

The logic needs to be updated to be:

try {
  ... do stuff ...
} catch (StatusException e) {
  responseObserver.onError(e);
} catch (Exception e) {
  ... original exception handling that converts the status to INTERNAL ...
}
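
A minimal Python sketch of the ordering described above, using stand-in classes rather than the real gRPC or Beam types. `StatusError`, `get_invocation`, `get_pipeline`, and the job table are hypothetical names chosen for illustration. The point is only that the status-carrying exception is caught first and passed through, while the generic handler stays as a fallback:

```python
class StatusError(Exception):
    """Hypothetical stand-in for a gRPC status exception."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


# Hypothetical in-memory job table standing in for the service's invocations.
_INVOCATIONS = {"job-1": "pipeline-proto-for-job-1"}


def get_invocation(job_id):
    """Look up a job, raising NOT_FOUND for unknown ids."""
    try:
        return _INVOCATIONS[job_id]
    except KeyError:
        raise StatusError("NOT_FOUND", "Unknown job id: %s" % job_id) from None


def get_pipeline(job_id, lookup=get_invocation):
    """Return (code, payload), preserving any status raised by the lookup."""
    try:
        return ("OK", lookup(job_id))
    except StatusError as e:
        # Pass the original status through instead of masking it.
        return (e.code, str(e))
    except Exception as e:
        # Only truly unexpected failures are reported as INTERNAL.
        return ("INTERNAL", str(e))
```

With the handlers in this order, an unknown job id surfaces as NOT_FOUND; if the two `except` clauses were merged into the generic one, the same call would report INTERNAL.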

Contributor Author

Ok, I'll make a separate issue in Jira for this, since it's technically a breaking change, and I'll follow up with another PR with your suggestion, and the style fixes (if you don't mind me lumping them together).

Member

I would consider it a bugfix rather than a breaking change, since contractually we said we were going to do X and did Y.

I'm fine with you lumping it in or not.

Member

@lukecwik lukecwik left a comment

You can add tests to InMemoryJobServiceTest using the mocks that exist there, or migrate to spinning up a test gRPC service/client using an in-process server/client as described in https://stackoverflow.com/questions/37552468/how-to-unit-test-grpc-java-server-implementation-functions

@chadrik
Contributor Author

chadrik commented Jul 9, 2019

Added tests.

@chadrik
Contributor Author

chadrik commented Jul 10, 2019

I think this PR is ready to go. I'm not sure if the Python PreCommit test failure is real or not. I clicked on details but Jenkins says there are no failures.

@lukecwik
Member

Run Python PreCommit


def GetPipeline(self, request, context=None):
  return beam_job_api_pb2.GetJobPipelineResponse(
      state=self._jobs[request.job_id]._pipeline_proto)
Member

Suggested change
state=self._jobs[request.job_id]._pipeline_proto)
pipeline=self._jobs[request.job_id]._pipeline_proto)
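
A minimal sketch of how the corrected method could be exercised directly, without starting a server, since `context` defaults to `None`. The dataclasses, `FakeJob`, and `JobServicer` below are hypothetical stand-ins for the generated `beam_job_api_pb2` messages and the real servicer; only the `pipeline` field name and the `GetPipeline` signature come from the snippet above:

```python
from dataclasses import dataclass


@dataclass
class GetJobPipelineRequest:
    """Hypothetical stand-in for the generated request message."""
    job_id: str


@dataclass
class GetJobPipelineResponse:
    """Hypothetical stand-in for the generated response message."""
    pipeline: object


class FakeJob:
    """Hypothetical job record holding only the pipeline proto."""

    def __init__(self, pipeline_proto):
        self._pipeline_proto = pipeline_proto


class JobServicer:
    """Hypothetical servicer with just the method under discussion."""

    def __init__(self, jobs):
        self._jobs = jobs

    def GetPipeline(self, request, context=None):
        # The response field is `pipeline`, not `state`.
        return GetJobPipelineResponse(
            pipeline=self._jobs[request.job_id]._pipeline_proto)


# Call the method directly, as a unit test could.
servicer = JobServicer({"job-1": FakeJob("proto")})
response = servicer.GetPipeline(GetJobPipelineRequest(job_id="job-1"))
```

In this stand-in, constructing the response with `state=` fails immediately because the dataclass has no such field, which is the kind of mistake a direct call like this would catch.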

asfgit pushed a commit that referenced this pull request Jul 10, 2019
@lukecwik
Member

I merged this with a fixup addressing my last comment: 05dba6a

@lukecwik lukecwik closed this Jul 10, 2019
@chadrik
Contributor Author

chadrik commented Jul 10, 2019

thanks!

@chadrik
Contributor Author

chadrik commented Jul 11, 2019

I created an issue for the proto contract: https://issues.apache.org/jira/browse/BEAM-7720

@chadrik
Contributor Author

chadrik commented Jul 13, 2019

Note, I just realized that I did not update the go stubs... Is there a reason that the generated code is committed to the repo for go, but not for python and java?

@lukecwik
Member

It may be because of how Go modules depend on other Go modules: since they are statically compiled, those modules may need to perform the proto generation on our behalf for the compilation to work, but I'm not exactly certain. @lostluck, any details you can provide here?

@lostluck
Contributor

Nit: Beam isn't yet using Go Modules. Go calls its unit of dependency a package.

Conventionally, the Go tooling doesn't do code generation at compile time. The main reason is that there are ample tools for doing that outside the Go tooling (e.g. Gradle, Bazel, make), so any end user can set that up should they desire it.
Further, generated code is for the package authors, not the end user, and should be committable.

The impedance mismatch is that Beam is a mid-level library. So the proto code needs to be generated with go generate, and committed, because of the above conventions. E.g. Beam devs may use Gradle, but end Go programmers probably will not, nor can we mandate it.

We haven't been rigorous about what version of protoc and the Go proto packages to use, hence the piles of differences. As long as the external proto API doesn't change in a backwards-incompatible way and the integration tests against Flink and Spark work, I'm not bothered about what the generated code looks like until we can ensure people can use consistent versions to generate the code, with Go Modules.
