Skip to content

Commit

Permalink
Add validatesRunner suite for local job service
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Jul 9, 2020
1 parent 2774088 commit e7254e2
Showing 1 changed file with 160 additions and 0 deletions.
160 changes: 160 additions & 0 deletions runners/portability/java/build.gradle
@@ -1,3 +1,5 @@
import groovy.json.JsonOutput

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand All @@ -18,11 +20,16 @@

plugins { id 'org.apache.beam.module' }
applyJavaNature(automaticModuleName: 'org.apache.beam.runners.portability', enableChecker:false)
applyPythonNature()

description = "Apache Beam :: Runners :: Portability :: Java"
ext.summary = """A Java implementation of the Beam Model which utilizes the portability
framework to execute user-definied functions."""

configurations {
validatesRunner
}

dependencies {
compile library.java.vendored_guava_26_0_jre
compile library.java.hamcrest_library
Expand All @@ -31,9 +38,162 @@ dependencies {
compile project(path: ":sdks:java:harness", configuration: "shadow")
compile library.java.vendored_grpc_1_26_0
compile library.java.slf4j_api

testCompile project(path: ":runners:core-construction-java", configuration: "testRuntime")
testCompile library.java.hamcrest_core
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.slf4j_jdk14

validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
validatesRunner project(path: project.path, configuration: "testRuntime")
}


project.evaluationDependsOn(":sdks:java:core")
project.evaluationDependsOn(":runners:core-java")

ext.localJobServicePidFile = "${project.buildDir}/local_job_service_pid"
ext.localJobServicePortFile = project.hasProperty("localJobServicePortFile") ? project.property("localJobServicePortFile") : "${project.buildDir}/local_job_service_port"
ext.localJobServiceStdoutFile = "${project.buildDir}/local_job_service_stdout"

void execInVirtualenv(String... args) {
String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
exec {
workingDir pythonSdkDir
commandLine "sh", "-c", shellCommand
}
}

// Does not background the process, but allows the process to daemonize itself
void execBackgroundInVirtualenv(String... args) {
String shellCommand = ". ${project.ext.envdir}/bin/activate && " + args.collect { arg -> "'" + arg.replaceAll("'", "\\'") + "'" }.join(" ")
println "execBackgroundInVirtualEnv: ${shellCommand}"
ProcessBuilder pb = new ProcessBuilder().redirectErrorStream(true).directory(new File(pythonSdkDir)).command(["sh", "-c", shellCommand])
Process proc = pb.start();

// redirectIO does not work for connecting to groovy/gradle stdout
BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line
while ((line = reader.readLine()) != null) {
println line
}
proc.waitFor();
}

task startLocalJobService {
dependsOn setupVirtualenv

doLast {
execBackgroundInVirtualenv "python",
"-m", "apache_beam.runners.portability.local_job_service_main",
"--background",
"--stdout_file=${localJobServiceStdoutFile}",
"--pid_file=${localJobServicePidFile}",
"--port_file=${localJobServicePortFile}"
//
// File pidFile = new File(localJobServicePidFile)
// int totalSleep = 0
// while (!pidFile.exists()) {
// sleep(500)
// totalSleep += 500
// if (totalSleep > 5000) {
// throw new RuntimeException("Local job service pid file never showed up");
// }
// }
}
}

task stopLocalJobService {
doLast {
execInVirtualenv "python",
"-m", "apache_beam.runners.portability.local_job_service_main",
"--stop",
"--pid_file=${localJobServicePidFile}"
}
}

startLocalJobService.finalizedBy stopLocalJobService

/**
* Runs Java ValidatesRunner tests against the Universal Local Runner (ULR) aka local_job_service_main
* with subprocess SDK harness environments.
*/
task ulrValidatesRunnerTests(type: Test) {
dependsOn ":sdks:java:container:docker"

if (!project.hasProperty("localJobServicePortFile")) {
dependsOn startLocalJobService
}

group = "Verification"
description "PortableRunner Java subprocess ValidatesRunner suite"
classpath = configurations.validatesRunner
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestUniversalRunner",
"--experiments=beam_fn_api",
"--localJobServicePortFile=${localJobServicePortFile}"
])
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
}
filter {
// There is not currently a category for excluding these _only_ in committed mode
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics'
excludeTestsMatching 'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics'

// This test seems erroneously labeled ValidatesRunner
excludeTestsMatching 'org.apache.beam.sdk.schemas.AvroSchemaTest.testAvroPipelineGroupBy'

// Teardown not called in exceptions
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful'

// Only known window fns supported, not general window merging
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing'
excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing'

// Flatten with empty PCollections hangs
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty'
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo'

// Empty side inputs hang
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputFixedToFixedWithDefault'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyIterableSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyListSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInputWithNonDeterministicKeyCoder'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInputWithNonDeterministicKeyCoder'

// Misc failures
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers'
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombine'
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest$WindowingTests.testSessionsCombineWithContext'
excludeTestsMatching 'org.apache.beam.sdk.testing.CombineTest.testWindowedIsEqualTo'
excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedIsEqualTo'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming'
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
}
}

stopLocalJobService.mustRunAfter ulrValidatesRunnerTests

0 comments on commit e7254e2

Please sign in to comment.