From e7254e2c28f2d433a0d2ccc16cf79db611e108f6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 9 Jul 2020 14:19:22 -0700 Subject: [PATCH] Add validatesRunner suite for local job service --- runners/portability/java/build.gradle | 160 ++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle index b595325fde59a..b9de3f3b73a31 100644 --- a/runners/portability/java/build.gradle +++ b/runners/portability/java/build.gradle @@ -1,3 +1,5 @@ +import groovy.json.JsonOutput + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,11 +20,16 @@ plugins { id 'org.apache.beam.module' } applyJavaNature(automaticModuleName: 'org.apache.beam.runners.portability', enableChecker:false) +applyPythonNature() description = "Apache Beam :: Runners :: Portability :: Java" ext.summary = """A Java implementation of the Beam Model which utilizes the portability framework to execute user-definied functions.""" +configurations { + validatesRunner +} + dependencies { compile library.java.vendored_guava_26_0_jre compile library.java.hamcrest_library @@ -31,9 +38,162 @@ dependencies { compile project(path: ":sdks:java:harness", configuration: "shadow") compile library.java.vendored_grpc_1_26_0 compile library.java.slf4j_api + testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime") testCompile library.java.hamcrest_core testCompile library.java.junit testCompile library.java.mockito_core testCompile library.java.slf4j_jdk14 + + validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") + validatesRunner project(path: ":runners:core-java", configuration: "testRuntime") + validatesRunner project(path: project.path, configuration: "testRuntime") +} + + +project.evaluationDependsOn(":sdks:java:core") +project.evaluationDependsOn(":runners:core-java") + +ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid" +ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port" +ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout" + +void execInVirtualenv(String... args) { + String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ") + exec { + workingDir pythonSdkDir + commandLine "sh", "-c", shellCommand + } +} + +// Does not background the process, but allows the process to daemonize itself +void execBackgroundInVirtualenv(String... args) { + String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ") + println "execBackgroundInVirtualEnv: ${shellCommand}" + ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand]) + Process proc = pb.start(); + + // redirectIO does not work for connecting to groovy/gradle stdout + BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); + String line + while ((line = reader.readLine()) != null) { + println line + } + proc.waitFor(); +} + +task startLocalJobService { + dependsOn setupVirtualenv + + doLast { + execBackgroundInVirtualenv "python", + "-m", "apache_beam.runners.portability.local_job_service_main", + "--background", + "--stdout_file=${localJobServiceStdoutFile}", + "--pid_file=${localJobServicePidFile}", + "--port_file=${localJobServicePortFile}" +// +// File pidFile = new File(localJobServicePidFile) +// int totalSleep = 0 +// while (!pidFile.exists()) { +// sleep(500) +// totalSleep += 500 +// if (totalSleep > 5000) { +// throw new RuntimeException("Local job service pid file never showed up"); +// } +// } + } } + +task stopLocalJobService { + doLast { + execInVirtualenv "python", + "-m", "apache_beam.runners.portability.local_job_service_main", + "--stop", + "--pid_file=${localJobServicePidFile}" + } +} + +startLocalJobService.finalizedBy stopLocalJobService + +/** + * Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main + * with subprocess SDK harness environments. + */ +task ulrValidatesRunnerTests(type: Test) { + dependsOn ":sdks:java:container:docker" + + if (!project.hasProperty("localJobServicePortFile")) { + dependsOn startLocalJobService + } + + group = "Verification" + description "PortableRunner Java subprocess ValidatesRunner suite" + classpath = configurations.validatesRunner + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=TestUniversalRunner", + "--experiments=beam_fn_api", + "--localJobServicePortFile=${localJobServicePortFile}" + ]) + testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) + useJUnit { + includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' + excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' + excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' + excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages' + excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo' + } + filter { + // There is not currently a category for excluding these _only_ in committed mode + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics' + excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics' + + // This test seems erroneously labeled ValidatesRunner + excludeTestsMatching 'org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy' + + // Teardown not called in exceptions + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful' + + // Only known window fns supported, not general window merging + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows' + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection' + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing' + excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing' + + // Flatten with empty PCollections hangs + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty' + excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo' + + // Empty side inputs hang + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputFixedToFixedWithDefault' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyIterableSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyListSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInputWithNonDeterministicKeyCoder' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInput' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInputWithNonDeterministicKeyCoder' + + // Misc failures + excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers' + excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombine' + excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombineWithContext' + excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest.testWindowedIsEqualTo' + excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo' + excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming' + excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode' + } +} + +stopLocalJobService.mustRunAfter ulrValidatesRunnerTests +