diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle index 3de798a0aca2c..2b22c2c9c1829 100644 --- a/runners/core-construction-java/build.gradle +++ b/runners/core-construction-java/build.gradle @@ -53,9 +53,3 @@ dependencies { testCompile project(path: ":sdks:java:core", configuration: "testRuntime") testRuntimeOnly library.java.slf4j_jdk14 } - -task runExpansionService (type: JavaExec) { - main = "org.apache.beam.runners.core.construction.expansion.ExpansionService" - classpath = sourceSets.main.runtimeClasspath - args = [project.findProperty("constructionService.port") ?: "8097"] -} diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index 9e7913f54ed14..644f9941de4d7 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -27,6 +27,7 @@ dependencies { compile project(path: ":model:pipeline", configuration: "shadow") compile project(path: ":model:fn-execution", configuration: "shadow") compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") compile project(":sdks:java:fn-execution") compile project(":runners:core-construction-java") compile project(path: ":vendor:sdks-java-extensions-protobuf", configuration: "shadow") diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java index d61162f7dc5ab..f93bd96bc111e 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java @@ -20,11 +20,11 @@ import java.io.IOException; import java.nio.file.Paths; import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.runners.core.construction.expansion.ExpansionServer; -import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.runners.fnexecution.GrpcFnServer; import org.apache.beam.runners.fnexecution.ServerFactory; import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.sdk.expansion.service.ExpansionServer; +import org.apache.beam.sdk.expansion.service.ExpansionService; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.kohsuke.args4j.Option; import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler; diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle new file mode 100644 index 0000000000000..def9e80914177 --- /dev/null +++ b/sdks/java/expansion-service/build.gradle @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.expansion.service') + +description = "Apache Beam :: SDKs :: Java :: Expansion Service" +ext.summary = """Contains code that can be used to run an expansion service.""" + + +// Exclude tests that need a runner +test { + systemProperty "beamUseDummyRunner", "true" + useJUnit { + excludeCategories "org.apache.beam.sdk.testing.NeedsRunner" + } +} + +dependencies { + compile project(path: ":model:pipeline", configuration: "shadow") + compile project(path: ":model:fn-execution", configuration: "shadow") + compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(path: ":runners:core-construction-java") + compile library.java.vendored_grpc_1_26_0 + compile library.java.vendored_guava_26_0_jre + compile library.java.slf4j_api + runtimeOnly library.java.slf4j_jdk14 + testCompile library.java.junit +} + +task runExpansionService (type: JavaExec) { + main = "org.apache.beam.sdk.expansion.service.ExpansionService" + classpath = sourceSets.main.runtimeClasspath + args = [project.findProperty("constructionService.port") ?: "8097"] +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServer.java similarity index 97% rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java rename to sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServer.java index 5859d69e78d8a..895a011a1a706 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionServer.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction.expansion; +package org.apache.beam.sdk.expansion.service; import java.io.IOException; import java.net.InetSocketAddress; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java similarity index 99% rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java rename to sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 84ee4f293fdc3..4c45bca52942a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction.expansion; +package org.apache.beam.sdk.expansion.service; import com.google.auto.service.AutoService; import java.io.IOException; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/package-info.java similarity index 93% rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java rename to sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/package-info.java index 60060928b9853..a32edae893b66 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/package-info.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/package-info.java @@ -17,4 +17,4 @@ */ /** Classes used to expand cross-language transforms. */ -package org.apache.beam.runners.core.construction.expansion; +package org.apache.beam.sdk.expansion.service; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServerTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java similarity index 96% rename from runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServerTest.java rename to sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java index 29370d9a38f1e..51ed82682e7db 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServerTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction.expansion; +package org.apache.beam.sdk.expansion.service; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java similarity index 99% rename from runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java rename to sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java index b78f1da001bfc..3caff2c836971 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction.expansion; +package org.apache.beam.sdk.expansion.service; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java similarity index 98% rename from runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java rename to sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java index 2cde8fcd6cfda..b15f74437bd57 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExternalTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction; +package org.apache.beam.sdk.expansion.service; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -26,7 +26,7 @@ import java.net.ServerSocket; import java.nio.charset.StandardCharsets; import java.util.Map; -import org.apache.beam.runners.core.construction.expansion.ExpansionService; +import org.apache.beam.runners.core.construction.External; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index ebf453def8b39..66db0bea448c0 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -29,6 +29,7 @@ ext.summary = "IO library to read and write Google Cloud Platform systems from B dependencies { compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") compile project(":sdks:java:extensions:google-cloud-platform-core") compile project(":sdks:java:extensions:protobuf") compile library.java.avro diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index abecf899aa07b..54908c7ee9d76 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -28,10 +28,10 @@ import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.ReadTranslation; -import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BooleanCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.expansion.service.ExpansionService; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 9581f34798846..b234f4a637513 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -32,6 +32,7 @@ ext.summary = "Library to read Kafka topics." dependencies { compile library.java.vendored_guava_26_0_jre compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") // Get back to "provided" since 2.14 provided library.java.kafka_clients compile library.java.slf4j_api @@ -43,7 +44,7 @@ dependencies { // It depends on "spotbugs-annotations:3.1.9" which clashes with current // "spotbugs-annotations:3.1.12" used in Beam. Not required. exclude group: "org.apache.zookeeper", module: "zookeeper" - // "kafka-clients" has to be provided since user can use its own version. + // "kafka-clients" has to be provided since user can use its own version. exclude group: "org.apache.kafka", module: "kafka-clients" } provided library.java.jackson_dataformat_csv diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index 632f2e4bf1fe2..d157c16798a6c 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -30,11 +30,11 @@ import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.ReadTranslation; -import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.expansion.service.ExpansionService; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.WithKeys; diff --git a/sdks/java/testing/expansion-service/build.gradle b/sdks/java/testing/expansion-service/build.gradle index 6efe7754aaee4..635280f234890 100644 --- a/sdks/java/testing/expansion-service/build.gradle +++ b/sdks/java/testing/expansion-service/build.gradle @@ -28,11 +28,12 @@ dependencies { compile project(path: ":runners:core-construction-java") compile project(path: ":sdks:java:io:parquet") compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") testRuntime library.java.hadoop_client } task runTestExpansionService (type: JavaExec) { - main = "org.apache.beam.sdk.testing.expansion.TestExpansionService" + main = "org.apache.beam.sdk.expansion.service.ExpansionService" classpath = sourceSets.test.runtimeClasspath args = [project.findProperty("constructionService.port") ?: "8097"] } @@ -44,9 +45,10 @@ task buildTestExpansionServiceJar(type: ShadowJar) { mergeServiceFiles() manifest { attributes( - 'Main-Class': 'org.apache.beam.sdk.testing.expansion.TestExpansionService' + 'Main-Class': 'org.apache.beam.sdk.expansion.service.ExpansionService' ) } + exclude 'META-INF/*.RSA', 'META-INF/*.SF','META-INF/*.DSA' from { project.configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }} from sourceSets.main.output from sourceSets.test.output diff --git a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java index c930b451061e0..2f9b26d0c132a 100644 --- a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java +++ b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java @@ -25,10 +25,10 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.expansion.service.ExpansionService; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.parquet.ParquetIO; import org.apache.beam.sdk.transforms.Count; @@ -55,8 +55,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server; -import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ServerBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; @@ -344,12 +342,4 @@ public PCollection expand(PBegin input) { } } } - - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - System.out.println("Starting expansion service at localhost:" + port); - Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build(); - server.start(); - server.awaitTermination(); - } } diff --git a/settings.gradle b/settings.gradle index e8da75e6b214f..580d3159ce47d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -65,6 +65,7 @@ include ":sdks:java:bom" include ":sdks:java:build-tools" include ":sdks:java:container" include ":sdks:java:core" +include ":sdks:java:expansion-service" include ":sdks:java:extensions:euphoria" include ":sdks:java:extensions:kryo" include ":sdks:java:extensions:google-cloud-platform-core"