Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10435] Add ValidatesRunner task for local_job_service and Java SDK harness #11792

Merged
merged 6 commits into from Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1888,7 +1888,9 @@ class BeamModulePlugin implements Plugin<Project> {
// For some reason base doesn't define a test task so we define it below and make
// check depend on it. This makes the Python project similar to the task layout like
// Java projects, see https://docs.gradle.org/4.2.1/userguide/img/javaPluginTasks.png
project.task('test') {}
if (project.tasks.findByName('test') == null) {
project.task('test') {}
}
project.check.dependsOn project.test

project.evaluationDependsOn(":runners:google-cloud-dataflow-java:worker")
Expand Down
Expand Up @@ -39,7 +39,7 @@
public class ArtifactRetrievalService
extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements FnService {

public static final int DEFAULT_BUFFER_SIZE = 4 << 20; // 4 MB
public static final int DEFAULT_BUFFER_SIZE = 2 << 20; // 2 MB
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertwb @lukecwik first thing I hit putting this together was exceeding message size limit


public static final String FILE_ARTIFACT_URN = "beam:artifact:type:file:v1";
public static final String URL_ARTIFACT_URN = "beam:artifact:type:url:v1";
Expand Down
162 changes: 162 additions & 0 deletions runners/portability/java/build.gradle
@@ -1,3 +1,5 @@
import groovy.json.JsonOutput

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand All @@ -18,11 +20,16 @@

plugins { id 'org.apache.beam.module' }
applyJavaNature(automaticModuleName: 'org.apache.beam.runners.portability', enableChecker:false)
applyPythonNature()

description = "Apache Beam :: Runners :: Portability :: Java"
ext.summary = """A Java implementation of the Beam Model which utilizes the portability
framework to execute user-definied functions."""

configurations {
validatesRunner
}

dependencies {
compile library.java.vendored_guava_26_0_jre
compile library.java.hamcrest_library
Expand All @@ -31,9 +38,164 @@ dependencies {
compile project(path: ":sdks:java:harness", configuration: "shadow")
compile library.java.vendored_grpc_1_26_0
compile library.java.slf4j_api

testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.slf4j_jdk14

validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(path: project.path, configuration: "testRuntime")
}


project.evaluationDependsOn(":sdks:java:core")
project.evaluationDependsOn(":runners:core-java")

ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"

void execInVirtualenv(String... args) {
String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
exec {
workingDir pythonSdkDir
commandLine "sh", "-c", shellCommand
}
}

// Does not background the process, but allows the process to daemonize itself
void execBackgroundInVirtualenv(String... args) {
kennknowles marked this conversation as resolved.
Show resolved Hide resolved
String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
println "execBackgroundInVirtualEnv: ${shellCommand}"
ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
Process proc = pb.start();

// redirectIO does not work for connecting to groovy/gradle stdout
BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
kennknowles marked this conversation as resolved.
Show resolved Hide resolved
String line
while ((line = reader.readLine()) != null) {
println line
}
proc.waitFor();
}

task startLocalJobService {
dependsOn setupVirtualenv

doLast {
execBackgroundInVirtualenv "python",
"-m", "apache_beam.runners.portability.local_job_service_main",
"--background",
"--stdout_file=${localJobServiceStdoutFile}",
"--pid_file=${localJobServicePidFile}",
"--port_file=${localJobServicePortFile}"
kennknowles marked this conversation as resolved.
Show resolved Hide resolved
}
}

task stopLocalJobService {
doLast {
execInVirtualenv "python",
"-m", "apache_beam.runners.portability.local_job_service_main",
"--stop",
"--pid_file=${localJobServicePidFile}"
}
}

startLocalJobService.finalizedBy stopLocalJobService

/**
* Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
* with subprocess SDK harness environments.
*/
task ulrValidatesRunnerTests(type: Test) {
dependsOn ":sdks:java:container:docker"

if (!project.hasProperty("localJobServicePortFile")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very convenient.

dependsOn startLocalJobService
}

group = "Verification"
description "PortableRunner Java subprocess ValidatesRunner suite"
classpath = configurations.validatesRunner
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestUniversalRunner",
"--experiments=beam_fn_api",
"--localJobServicePortFile=${localJobServicePortFile}"
])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider passing --defaultEnvironmentType=LOOPBACK. You can then remove the docker dependency as well. (Maybe we could run one test with docker, but all of them seems overkill and expensive.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rather specifically want the docker dependency, to have a test of the true Java SDK harness container without the complexity of a production runner. But that can be postcommit and if LOOPBACK is faster and easier to debug that's good for precommit. I'd like to leave as-is to avoid churning this PR, but will follow up and create a LOOPBACK version prior to creating any Jenkins job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I don't think it makes sense to run every test to verify the container/harness setup works (this is reminiscent of other threads) but definitely agree these choices can be postponed while we get this PR in.

testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
}
filter {
// There is not currently a category for excluding these _only_ in committed mode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create JIRAs for these (at whatever granularity seems appropriate) and add them here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Also added a new Jira component runner-universal since I did not find one, in case there's a need to search for these.

// https://issues.apache.org/jira/browse/BEAM-10445
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics'
// https://issues.apache.org/jira/browse/BEAM-10446
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics'

// This test seems erroneously labeled ValidatesRunner
excludeTestsMatching 'org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy'

// Teardown not called in exceptions
// https://issues.apache.org/jira/browse/BEAM-10447
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'

// Only known window fns supported, not general window merging
// https://issues.apache.org/jira/browse/BEAM-10448
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing'

// Flatten with empty PCollections hangs
// https://issues.apache.org/jira/browse/BEAM-10450
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty'
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo'

// Empty side inputs hang
// https://issues.apache.org/jira/browse/BEAM-10449
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputFixedToFixedWithDefault'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyIterableSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyListSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInputWithNonDeterministicKeyCoder'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInputWithNonDeterministicKeyCoder'

// Misc failures
// https://issues.apache.org/jira/browse/BEAM-10451
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers'
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombine'
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombineWithContext'

// https://issues.apache.org/jira/browse/BEAM-10454
excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo'

// https://issues.apache.org/jira/browse/BEAM-10453
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming'

// https://issues.apache.org/jira/browse/BEAM-10452
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
}
}

stopLocalJobService.mustRunAfter ulrValidatesRunnerTests

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.portability.testing;

import static org.hamcrest.MatcherAssert.assertThat;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. */
public class TestUniversalRunner extends PipelineRunner<PipelineResult> {

private static final Logger LOG = LoggerFactory.getLogger(TestUniversalRunner.class);

private final PipelineOptions options;

private TestUniversalRunner(PipelineOptions options) {
this.options = options;
}

/**
* Constructs a runner from the provided options.
*
* @param options Properties which configure the runner.
* @return The newly created runner.
*/
public static TestUniversalRunner fromOptions(PipelineOptions options) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to add this, because TestPortableRunner couples "check that the job succeeds" logic with a bunch of other things having to do with launching an existing Java runner as a portable runner, not relevant to actual portable runner services.

return new TestUniversalRunner(options);
}

@Override
public PipelineResult run(Pipeline pipeline) {
Options testOptions = options.as(Options.class);
if (testOptions.getLocalJobServicePortFile() != null) {
String localServicePortFilePath = testOptions.getLocalJobServicePortFile();
try {
testOptions.setJobEndpoint(
"localhost:"
+ new String(
Files.readAllBytes(Paths.get(localServicePortFilePath)), Charsets.UTF_8)
.trim());
} catch (IOException e) {
throw new RuntimeException(
String.format("Error reading local job service port file %s", localServicePortFilePath),
e);
}
}

PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
portableOptions.setRunner(PortableRunner.class);
PortableRunner runner = PortableRunner.fromOptions(portableOptions);
PipelineResult result = runner.run(pipeline);
assertThat(
"Pipeline did not succeed.",
result.waitUntilFinish(),
Matchers.is(PipelineResult.State.DONE));
return result;
}

public interface Options extends TestPipelineOptions, PortablePipelineOptions {
/**
* A file containing the job service port, since Gradle needs to know this filename statically
* to provide it in Beam testing options.
*
* <p>This option conflicts with {@link #getJobEndpoint()}. This option will override the job
* endpoint to be localhost at the port specified in the file.
*/
@Description("File containing local job service port.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically, it would make sense to let this be optional (e.g. one could instead provide jobEndpoint directly). A point could be made that the testing infrastructure should be the one reading the file and setting jobEndpoint, rather than passing the file path as an option (but I don't know how much messier that'd make things).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this sort of thing is what took 90% of the time for this PR actually. Scraping around Gradle's docs and the internet for ways to insert that little bit of logic, because realizing it was sort of against the grain. Pipeline options are passed as a Java system property, and those are set up in the Gradle graph construction phase. More generally, there's not a Gradle graph execution-time slot for free-form code that also re-uses the Test task type. Perhaps they expect you to use inheritance and make a new Task type. Which I would rather not do ;_;

It would be fine to have two pipeline options, so that simple use could be simple.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the plain jobEndpoint option, just make (re)setting it conditional on LocalJobServicePortFile being set.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

String getLocalJobServicePortFile();

void setLocalJobServicePortFile(String endpoint);
}

/** Register {@link Options}. */
@AutoService(PipelineOptionsRegistrar.class)
public static class OptionsRegistrar implements PipelineOptionsRegistrar {

@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.of(Options.class);
}
}

/** Registrar for the portable runner. */
@AutoService(PipelineRunnerRegistrar.class)
public static class RunnerRegistrar implements PipelineRunnerRegistrar {

@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
return ImmutableList.of(TestUniversalRunner.class);
}
}
}
Expand Up @@ -4126,7 +4126,7 @@ public void onTimer(OutputReceiver<String> r, PipelineOptions options) {
}

@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void duplicateTimerSetting() {
TestStream<KV<String, String>> stream =
TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
Expand Down
Expand Up @@ -65,6 +65,11 @@ def run(argv):
action='store_true',
help='Start the server up as a background process.'
' Will fail if pid_file already exists, unless --stop is also specified.')
parser.add_argument(
'--stderr_file',
help='Where to write stderr (if not specified, merged with stdout).')
parser.add_argument(
'--stdout_file', help='Where to write stdout for background job service.')
parser.add_argument(
'--stop',
action='store_true',
Expand Down Expand Up @@ -99,11 +104,23 @@ def run(argv):
options.port_file = os.path.splitext(options.pid_file)[0] + '.port'
argv.append('--port_file')
argv.append(options.port_file)

if not options.stdout_file:
raise RuntimeError('--stdout_file must be specified with --background')
stdout_dest = open(options.stdout_file, mode='w')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mentioned this before, but is it an issue that these file descriptors might get closed on completion of this process?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think my rebasing clobbered that thread. It is not an issue. Parent file descriptors are not closed. You can find some links I think on the PR front page it will still have the prior conversation.

(I won't rebase from here on out, until review is done)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem. My comment was from something like a mont ago. Thanks for the references.


if options.stderr_file:
stderr_dest = open(options.stderr_file, mode='w')
else:
stderr_dest = subprocess.STDOUT

subprocess.Popen([
sys.executable,
'-m',
'apache_beam.runners.portability.local_job_service_main'
] + argv)
] + argv,
stderr=stderr_dest,
stdout=stdout_dest)
print('Waiting for server to start up...')
while not os.path.exists(options.port_file):
time.sleep(.1)
Expand Down