New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4176] Initial implementation for running portable runner tests #5935
Conversation
Note that the validate runner tests are not yet passing for flink and will have to be debugged separately |
Wow, nice!!! (didn't look at code yet) Can we start with ULR then? CC: @youngoli |
Yes, though I have not plugged in the gradle task for ULR but it should just follow the Flink task. |
verifyPAssertsSucceeded requires support for metrics in pipelineResult which is causing almost all the tests to fail. |
Ah, yes. I had forgotten about that. That will cause problems for us unless we somehow rewrite PAssert to not depend on metrics if we're running portably. I guess a pipeline option could do it. |
4d2d949
to
36a6595
Compare
JIRA? |
The tests are not passing yet but that seems to be a separate issue. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does validatesRunner get run as part of any automated framework? I'm happy to merge this and then iterate on getting tests to pass, but I worry about putting any automated tests into a red state.
@Option(name = "--job-host", required = true, usage = "The job server host string") | ||
/** Configuration for the jobServer. */ | ||
public static class ServerConfiguration { | ||
@Option(name = "--job-host", usage = "The job server host string") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we no longer requiring this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ports can be chosen dynamically if not provided.
This help facilitate launch of multiple jobServices on the same machine by finding an unused port.
Finding and managing unused ports reliably is surprisingly difficult. The major challenge is that once you pick a port without using it, it can be picked and used by some other process without your knowledge.
Please let me know of any other ways/library to get dynamic unused ports in tests case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha. that makes sense.
I have no idea about other ways to manage unused ports (short of writing our own manager), so I think you're right that this is the way to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a local port? If so, it's quite easy to do in Java by allocating a new TCP server socket at port 0. You can then query it for the chosen port. This works as long as you are able to pass that socket directly to the service implementation. (Note that you can do this with gRPC and we have a Beam-specific tool that does this for you).
If you're trying to find an unused port for a non-Java server or for a server that will run separately (i.e., for which you do not directly control the socket), you can use the same server allocation trick and note the chosen port number. You can then kill the server socket and pass that port into your new server's configuration. Note that doing this approach introduces a race condition but will work in most cases (especially with retries).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems to be possible.
Comparing the approach
- Optional port and query the port back
- Passing a socket to the flink driver
I feel option 1 seems to be simpler while keeping initialization flexible.
36a6595
to
9e1ff24
Compare
Ping for the review! |
boolean streaming | ||
} | ||
|
||
def createPortableValidatesRunnerTask(Map m) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use a lambda and the implicit it
parameter as it allows you to call the method without any paramemters or with parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG.
I am not very sure with the semantics so please take a look again.
} | ||
} | ||
|
||
createPortableValidatesRunnerTask(name: "validatesPortableRunner", jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", jobServerConfig: "[]", streaming: false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you expect jobServerDriver
to change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for Flink.
But I want to move it to createPortableValidatesRunnerTask
reference runner project so that it can be reused. I am not sure how to do it though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll want to move it into the BeamModulePlugin.groovy and expose it like applyJavaNature and the other methods there. This can happen in a follow up PR.
@@ -45,3 +59,55 @@ runShadow { | |||
// Enable remote debugging. | |||
jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] | |||
} | |||
|
|||
class PortableValidatesRunnerConfig { | |||
String name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add comments for what each of these configuration properties do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
* the following projects are evaluated before we evaluate this project. This is because | ||
* we are attempting to reference the "sourceSets.test.output" directly. | ||
*/ | ||
evaluationDependsOn(":beam-runners-flink_2.11") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You reference :beam-sdks-java-core
and :beam-runners-core-java
sourceSets.test.output directly and not :beam-runners-flink_2.11
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
dependencies { | ||
compile project(path: ":beam-runners-flink_2.11", configuration: "shadow") | ||
validatesRunner project(path: ":beam-runners-flink_2.11", configuration: "shadowTest") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to add :beam-sdks-java-core / shadowTest
and :beam-runners-core-java / shadowTest
as a validatesRunner
dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
"ArtifactStagingServer stopped on {}", jobServer.getApiServiceDescriptor().getUrl()); | ||
artifactStagingServer = null; | ||
} catch (Exception e) { | ||
LOG.error("Error while closing the artifactStagingServer."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log the error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
@Required | ||
@Description( | ||
"Fully qualified class name of TestJobServiceDriver capable of managing the JobService.") | ||
String getJobServerDriver(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be of type Class, PipelineOptionsFactory will convert string -> Class for you automatically during parsing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
void setJobServerDriver(String jobServerDriver); | ||
|
||
@Description("String containing comma separated arguments for the JobServer.") | ||
String getJobServerConfig(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PipelineOptionsFactory supports parsing List or String[] automatically. There is no need to do this yourself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
* {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running | ||
* tests against the {@link TestPipeline}. | ||
* | ||
* <p>This runner requires a JobServerDriver with following methods. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the FlinkJobServerDriver implement a new interface TestPortableRunner.JobServerDriver
which exposes these methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will Take it up in subsequent diff as there is some class dependency issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the issue is that TestPortableRunner shouldn't be part of the reference runner subproject but in a place where every runner could refer to it. SGTM as a change in a subsequent PR.
try { | ||
jobServerDriver = | ||
jobServerDriverClass | ||
.getMethod("fromParams", String[].class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
I don't have comments on this change yet, but wanted to just mention this prototype of an ArtifactStagingService that:
It doesn't help the main pain point here (re-staging ~200MB of artifacts for each test case) because it doesn't work across artifact-server instances, but that's the next extension I'm planning to look into. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Luke.
Applied the review comments.
* the following projects are evaluated before we evaluate this project. This is because | ||
* we are attempting to reference the "sourceSets.test.output" directly. | ||
*/ | ||
evaluationDependsOn(":beam-runners-flink_2.11") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
dependencies { | ||
compile project(path: ":beam-runners-flink_2.11", configuration: "shadow") | ||
validatesRunner project(path: ":beam-runners-flink_2.11", configuration: "shadowTest") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
@@ -45,3 +59,55 @@ runShadow { | |||
// Enable remote debugging. | |||
jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] | |||
} | |||
|
|||
class PortableValidatesRunnerConfig { | |||
String name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
boolean streaming | ||
} | ||
|
||
def createPortableValidatesRunnerTask(Map m) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG.
I am not very sure with the semantics so please take a look again.
} | ||
} | ||
|
||
createPortableValidatesRunnerTask(name: "validatesPortableRunner", jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", jobServerConfig: "[]", streaming: false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for Flink.
But I want to move it to createPortableValidatesRunnerTask
reference runner project so that it can be reused. I am not sure how to do it though.
@Required | ||
@Description( | ||
"Fully qualified class name of TestJobServiceDriver capable of managing the JobService.") | ||
String getJobServerDriver(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
void setJobServerDriver(String jobServerDriver); | ||
|
||
@Description("String containing comma separated arguments for the JobServer.") | ||
String getJobServerConfig(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
* <ul> | ||
* <li>public static Object fromParams(String... params) | ||
* <li>public String start() // Start JobServer and returns the JobServer host and port. | ||
* <li>public void start() // Stop the JobServer and free all resources. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
try { | ||
jobServerDriver = | ||
jobServerDriverClass | ||
.getMethod("fromParams", String[].class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG
* {@link TestPortableRunner} is a pipeline runner that wraps a {@link PortableRunner} when running | ||
* tests against the {@link TestPipeline}. | ||
* | ||
* <p>This runner requires a JobServerDriver with following methods. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will Take it up in subsequent diff as there is some class dependency issue.
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good, made some minor changes directly. Will wait for tests to be green to confirm my edits were ok and then will merge.
Thanks Luke. |
Basic idea is to start a job server using a JobServerDriver which can be initialized with provided parameters.
TestPortableRunner will start a new instance of JobServer for every test and then run the test on it.
I have kept the maxParallelForks to 1 for now as i want to first make sure that all the tests pass with no concurrency.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)