From 9ab200ca616b131c31d8a7955d3a380b2e1feed7 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Tue, 20 Apr 2021 17:12:11 -0700 Subject: [PATCH] [BEAM-12174] Samza Portable Runner Support (#14554) --- runners/samza/build.gradle | 2 +- .../runners/samza/SamzaExecutionContext.java | 117 +------------ .../beam/runners/samza/SamzaJobInvoker.java | 82 +++++++++ .../runners/samza/SamzaJobServerDriver.java | 162 ++++++------------ .../runners/samza/SamzaPipelineRunner.java | 2 +- .../beam/runners/samza/SamzaRunner.java | 5 +- .../beam/runners/samza/runtime/DoFnOp.java | 9 +- .../SamzaExecutableStageContextFactory.java | 61 +++++++ .../samza/translation/ImpulseTranslator.java | 6 +- .../ParDoBoundMultiTranslator.java | 2 + .../PortableTranslationContext.java | 9 +- .../util/SamzaPipelineTranslatorUtils.java | 2 +- 12 files changed, 226 insertions(+), 233 deletions(-) create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java create mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaExecutableStageContextFactory.java diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle index c4b7cf7492e59..7d33eade4e4df 100644 --- a/runners/samza/build.gradle +++ b/runners/samza/build.gradle @@ -52,6 +52,7 @@ dependencies { compile library.java.jackson_annotations compile library.java.slf4j_api compile library.java.joda_time + compile library.java.args4j compile library.java.commons_io runtimeOnly "org.rocksdb:rocksdbjni:6.15.2" runtimeOnly "org.scala-lang:scala-library:2.11.8" @@ -59,7 +60,6 @@ dependencies { compile "org.apache.samza:samza-core_2.11:$samza_version" runtimeOnly "org.apache.samza:samza-kafka_2.11:$samza_version" runtimeOnly "org.apache.samza:samza-kv_2.11:$samza_version" - compile project(":sdks:java:expansion-service") compile "org.apache.samza:samza-kv-rocksdb_2.11:$samza_version" compile "org.apache.samza:samza-kv-inmemory_2.11:$samza_version" compile "org.apache.samza:samza-yarn_2.11:$samza_version" diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java index 3d49ae191c207..b15efaa1c78c8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java @@ -17,53 +17,22 @@ */ package org.apache.beam.runners.samza; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - -import java.time.Duration; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.beam.runners.fnexecution.GrpcFnServer; -import org.apache.beam.runners.fnexecution.ServerFactory; -import org.apache.beam.runners.fnexecution.control.ControlClientPool; -import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; -import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; -import org.apache.beam.runners.fnexecution.control.JobBundleFactory; -import org.apache.beam.runners.fnexecution.control.MapControlClientPool; -import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory; -import org.apache.beam.runners.fnexecution.data.GrpcDataService; -import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; -import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; -import org.apache.beam.runners.fnexecution.state.GrpcStateService; import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer; -import org.apache.beam.sdk.fn.IdGenerator; -import org.apache.beam.sdk.fn.IdGenerators; -import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; import org.apache.samza.context.ApplicationContainerContext; import org.apache.samza.context.ApplicationContainerContextFactory; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.ExternalContext; import org.apache.samza.context.JobContext; import org.apache.samza.metrics.MetricsRegistryMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Runtime context for the Samza runner. */ @SuppressWarnings({ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class SamzaExecutionContext implements ApplicationContainerContext { - private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutionContext.class); - private static final String SAMZA_WORKER_ID = "samza_py_worker_id"; private final SamzaPipelineOptions options; private SamzaMetricsContainer metricsContainer; - private JobBundleFactory jobBundleFactory; - private GrpcFnServer fnControlServer; - private GrpcFnServer fnDataServer; - private GrpcFnServer fnStateServer; - private ControlClientPool controlClientPool; - private ExecutorService dataExecutor; - private IdGenerator idGenerator = IdGenerators.incrementingLongs(); public SamzaExecutionContext(SamzaPipelineOptions options) { this.options = options; @@ -81,93 +50,11 @@ void setMetricsContainer(SamzaMetricsContainer metricsContainer) { this.metricsContainer = metricsContainer; } - public JobBundleFactory getJobBundleFactory() { - return this.jobBundleFactory; - } - - void setJobBundleFactory(JobBundleFactory jobBundleFactory) { - this.jobBundleFactory = jobBundleFactory; - } - @Override - public void start() { - checkState(getJobBundleFactory() == null, "jobBundleFactory has been created!"); - - if (SamzaRunnerOverrideConfigs.isPortableMode(options)) { - try { - controlClientPool = MapControlClientPool.create(); - dataExecutor = Executors.newCachedThreadPool(); - - fnControlServer = - GrpcFnServer.allocatePortAndCreateFor( - FnApiControlClientPoolService.offeringClientsToPool( - controlClientPool.getSink(), () -> SAMZA_WORKER_ID), - ServerFactory.createWithPortSupplier( - () -> SamzaRunnerOverrideConfigs.getFnControlPort(options))); - LOG.info("Started control server on port {}", fnControlServer.getServer().getPort()); - - fnDataServer = - GrpcFnServer.allocatePortAndCreateFor( - GrpcDataService.create( - options, dataExecutor, OutboundObserverFactory.serverDirect()), - ServerFactory.createDefault()); - LOG.info("Started data server on port {}", fnDataServer.getServer().getPort()); - - fnStateServer = - GrpcFnServer.allocatePortAndCreateFor( - GrpcStateService.create(), ServerFactory.createDefault()); - LOG.info("Started state server on port {}", fnStateServer.getServer().getPort()); - - final long waitTimeoutMs = - SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options); - LOG.info("Control client wait timeout config: " + waitTimeoutMs); - - final InstructionRequestHandler instructionHandler = - controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs)); - final EnvironmentFactory environmentFactory = - (environment, workerId) -> - RemoteEnvironment.forHandler(environment, instructionHandler); - // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping - jobBundleFactory = - SingleEnvironmentInstanceJobBundleFactory.create( - environmentFactory, fnDataServer, fnStateServer, idGenerator); - LOG.info("Started job bundle factory"); - } catch (Exception e) { - throw new RuntimeException( - "Running samza in Beam portable mode but failed to create job bundle factory", e); - } - - setJobBundleFactory(jobBundleFactory); - } - } + public void start() {} @Override - public void stop() { - closeAutoClosable(fnControlServer, "controlServer"); - fnControlServer = null; - closeAutoClosable(fnDataServer, "dataServer"); - fnDataServer = null; - closeAutoClosable(fnStateServer, "stateServer"); - fnStateServer = null; - if (dataExecutor != null) { - dataExecutor.shutdown(); - dataExecutor = null; - } - controlClientPool = null; - closeAutoClosable(jobBundleFactory, "jobBundle"); - jobBundleFactory = null; - } - - private static void closeAutoClosable(AutoCloseable closeable, String name) { - try (AutoCloseable closer = closeable) { - LOG.info("Closed {}", name); - } catch (Exception e) { - LOG.error( - "Failed to close {}. Ignore since this is shutdown process...", - closeable.getClass().getSimpleName(), - e); - } - } + public void stop() {} /** The factory to return this {@link SamzaExecutionContext}. */ public class Factory implements ApplicationContainerContextFactory { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java new file mode 100644 index 0000000000000..dc96bdd4d3563 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza; + +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.jobsubmission.JobInvocation; +import org.apache.beam.runners.jobsubmission.JobInvoker; +import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator; +import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +public class SamzaJobInvoker extends JobInvoker { + + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvoker.class); + private final SamzaJobServerDriver.SamzaServerConfiguration configuration; + + public static SamzaJobInvoker create( + SamzaJobServerDriver.SamzaServerConfiguration configuration) { + return new SamzaJobInvoker(configuration); + } + + private SamzaJobInvoker(SamzaJobServerDriver.SamzaServerConfiguration configuration) { + super("samza-runner-job-invoker-%d"); + this.configuration = configuration; + } + + @Override + protected JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) { + LOG.trace("Parsing pipeline options"); + final SamzaPortablePipelineOptions samzaOptions = + PipelineOptionsTranslation.fromProto(options).as(SamzaPortablePipelineOptions.class); + + final PortablePipelineRunner pipelineRunner; + if (Strings.isNullOrEmpty( + samzaOptions.as(PortablePipelineOptions.class).getOutputExecutablePath())) { + pipelineRunner = new SamzaPipelineRunner(samzaOptions); + } else { + /* + * To support --output_executable_path where bundles the input pipeline along with all + * artifacts, etc. required to run the pipeline into a jar that can be executed later. + */ + pipelineRunner = new PortablePipelineJarCreator(SamzaPipelineRunner.class); + } + + final String invocationId = + String.format("%s_%s", samzaOptions.getJobName(), UUID.randomUUID().toString()); + final JobInfo jobInfo = + JobInfo.create(invocationId, samzaOptions.getJobName(), retrievalToken, options); + return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java index 222d829c32efb..cc5d343021e62 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java @@ -17,134 +17,78 @@ */ package org.apache.beam.runners.samza; -import java.io.IOException; -import java.net.InetAddress; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; -import org.apache.beam.runners.fnexecution.GrpcFnServer; import org.apache.beam.runners.fnexecution.ServerFactory; -import org.apache.beam.runners.fnexecution.provisioning.JobInfo; -import org.apache.beam.runners.jobsubmission.InMemoryJobService; -import org.apache.beam.runners.jobsubmission.JobInvocation; -import org.apache.beam.runners.jobsubmission.JobInvoker; -import org.apache.beam.sdk.expansion.service.ExpansionServer; -import org.apache.beam.sdk.expansion.service.ExpansionService; +import org.apache.beam.runners.jobsubmission.JobServerDriver; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Driver program that starts a job server. */ -// TODO(BEAM-8510): extend JobServerDriver -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -public class SamzaJobServerDriver { +/** Driver program that starts a job server for the Samza runner. */ +public class SamzaJobServerDriver extends JobServerDriver { + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class); - private final SamzaPortablePipelineOptions pipelineOptions; + /** Samza runner-specific Configuration for the jobServer. */ + public static class SamzaServerConfiguration extends ServerConfiguration {} - protected SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) { - this.pipelineOptions = pipelineOptions; + public static void main(String[] args) { + // TODO: Expose the fileSystem related options. + PortablePipelineOptions options = + PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class); + // Register standard file systems. + FileSystems.setDefaultPipelineOptions(options); + fromParams(args).run(); } - public static void main(String[] args) throws Exception { - SamzaPortablePipelineOptions pipelineOptions = - PipelineOptionsFactory.fromArgs(args).as(SamzaPortablePipelineOptions.class); - fromOptions(pipelineOptions).run(); + private static SamzaJobServerDriver fromParams(String[] args) { + return fromConfig(parseArgs(args)); } - public static SamzaJobServerDriver fromOptions(SamzaPortablePipelineOptions pipelineOptions) { - Map overrideConfig = - pipelineOptions.getConfigOverride() != null - ? pipelineOptions.getConfigOverride() - : new HashMap<>(); - overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true)); - overrideConfig.put( - SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, - String.valueOf(pipelineOptions.getControlPort())); - overrideConfig.put(SamzaRunnerOverrideConfigs.FS_TOKEN_PATH, pipelineOptions.getFsTokenPath()); - - pipelineOptions.setConfigOverride(overrideConfig); - return new SamzaJobServerDriver(pipelineOptions); + private static void printUsage(CmdLineParser parser) { + System.err.printf("Usage: java %s arguments...%n", SamzaJobServerDriver.class.getSimpleName()); + parser.printUsage(System.err); + System.err.println(); } - private InMemoryJobService createJobService() throws IOException { - JobInvoker jobInvoker = - new JobInvoker("samza-job-invoker") { - @Override - protected JobInvocation invokeWithExecutor( - RunnerApi.Pipeline pipeline, - Struct options, - @Nullable String retrievalToken, - ListeningExecutorService executorService) - throws IOException { - SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions); - String invocationId = - String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString()); - JobInfo jobInfo = - JobInfo.create( - invocationId, - pipelineOptions.getJobName(), - retrievalToken, - PipelineOptionsTranslation.toProto(pipelineOptions)); - return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); - } - }; - return InMemoryJobService.create( - null, - session -> session, - stagingSessionToken -> {}, - jobInvoker, - InMemoryJobService.DEFAULT_MAX_INVOCATION_HISTORY); + private static SamzaJobServerDriver fromConfig(SamzaServerConfiguration configuration) { + return create( + configuration, + createJobServerFactory(configuration), + createArtifactServerFactory(configuration)); } - private ExpansionServer createExpansionService(String host, int expansionPort) - throws IOException { - if (host == null) { - host = InetAddress.getLoopbackAddress().getHostName(); + public static SamzaServerConfiguration parseArgs(String[] args) { + SamzaServerConfiguration configuration = new SamzaServerConfiguration(); + CmdLineParser parser = new CmdLineParser(configuration); + try { + parser.parseArgument(args); + } catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments.", e); + printUsage(parser); + throw new IllegalArgumentException("Unable to parse command line arguments.", e); } - ExpansionServer expansionServer = - ExpansionServer.create(new ExpansionService(), host, expansionPort); - LOG.info( - "Java ExpansionService started on {}:{}", - expansionServer.getHost(), - expansionServer.getPort()); - return expansionServer; + return configuration; } - public void run() throws Exception { - // Create services - final InMemoryJobService service = createJobService(); - final GrpcFnServer jobServiceGrpcFnServer = - GrpcFnServer.allocatePortAndCreateFor( - service, ServerFactory.createWithPortSupplier(pipelineOptions::getJobPort)); - final String jobServerUrl = jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl(); - LOG.info("JobServer started on {}", jobServerUrl); - final URI uri = new URI(jobServerUrl); - final ExpansionServer expansionServer = - createExpansionService(uri.getHost(), pipelineOptions.getExpansionPort()); + private static SamzaJobServerDriver create( + SamzaServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory) { + return new SamzaJobServerDriver(configuration, jobServerFactory, artifactServerFactory); + } - try { - jobServiceGrpcFnServer.getServer().awaitTermination(); - } finally { - LOG.info("JobServer closing"); - jobServiceGrpcFnServer.close(); - if (expansionServer != null) { - try { - expansionServer.close(); - LOG.info( - "Expansion stopped on {}:{}", expansionServer.getHost(), expansionServer.getPort()); - } catch (Exception e) { - LOG.error("Error while closing the Expansion Service.", e); - } - } - } + private SamzaJobServerDriver( + SamzaServerConfiguration configuration, + ServerFactory jobServerFactory, + ServerFactory artifactServerFactory) { + super( + configuration, + jobServerFactory, + artifactServerFactory, + () -> SamzaJobInvoker.create(configuration)); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java index 375b0555441ee..308ca79a2eb01 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java @@ -57,7 +57,7 @@ public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) { options.setRunner(SamzaRunner.class); try { final SamzaRunner runner = SamzaRunner.fromOptions(options); - return runner.runPortablePipeline(fusedPipeline); + return runner.runPortablePipeline(fusedPipeline, jobInfo); } catch (Exception e) { throw new RuntimeException("Failed to invoke samza job", e); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 573a2a1cd2275..b7fc1f5ef094b 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -25,6 +25,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.samza.translation.ConfigBuilder; import org.apache.beam.runners.samza.translation.PViewToIdMapper; @@ -78,7 +79,7 @@ private SamzaRunner(SamzaPipelineOptions options) { listenerReg.hasNext() ? Iterators.getOnlyElement(listenerReg).getLifeCycleListener() : null; } - public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) { + public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { final String dotGraph = PipelineDotRenderer.toDotString(pipeline); LOG.info("Portable pipeline to run:\n{}", dotGraph); @@ -101,7 +102,7 @@ public PortablePipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) { .withApplicationContainerContextFactory(executionContext.new Factory()) .withMetricsReporterFactories(reporterFactories); SamzaPortablePipelineTranslator.translate( - pipeline, new PortableTranslationContext(appDescriptor, options)); + pipeline, new PortableTranslationContext(appDescriptor, options, jobInfo)); }; ApplicationRunner runner = runSamzaApp(app, config); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index ac8212111e7e4..dd839761502b9 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -41,7 +41,9 @@ import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.samza.SamzaExecutionContext; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.util.FutureUtils; @@ -97,6 +99,7 @@ public class DoFnOp implements Op { // portable api related private final boolean isPortable; private final RunnerApi.ExecutableStagePayload stagePayload; + private final JobInfo jobInfo; private final HashMap> idToTupleTagMap; private transient SamzaTimerInternalsFactory timerInternalsFactory; @@ -140,6 +143,7 @@ public DoFnOp( PCollection.IsBounded isBounded, boolean isPortable, RunnerApi.ExecutableStagePayload stagePayload, + JobInfo jobInfo, Map> idToTupleTagMap, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { @@ -159,6 +163,7 @@ public DoFnOp( this.isBounded = isBounded; this.isPortable = isPortable; this.stagePayload = stagePayload; + this.jobInfo = jobInfo; this.idToTupleTagMap = new HashMap<>(idToTupleTagMap); this.bundleCheckTimerId = "_samza_bundle_check_" + transformId; this.bundleStateId = "_samza_bundle_" + transformId; @@ -217,7 +222,9 @@ public void open( .stateInternalsForKey(null) .state(StateNamespaces.global(), StateTags.bag(bundleStateId, windowedValueCoder)); final ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload); - stageBundleFactory = samzaExecutionContext.getJobBundleFactory().forStage(executableStage); + final ExecutableStageContext stageContext = + SamzaExecutableStageContextFactory.getInstance().get(jobInfo); + stageBundleFactory = stageContext.getStageBundleFactory(executableStage); this.fnRunner = SamzaDoFnRunners.createPortable( samzaPipelineOptions, diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaExecutableStageContextFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaExecutableStageContextFactory.java new file mode 100644 index 0000000000000..f034d031f2c29 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaExecutableStageContextFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** + * Singleton class that contains one {@link ExecutableStageContext.Factory} per job. Assumes it is + * safe to release the backing environment asynchronously. + */ +public class SamzaExecutableStageContextFactory implements ExecutableStageContext.Factory { + + private static final SamzaExecutableStageContextFactory instance = + new SamzaExecutableStageContextFactory(); + // This map should only ever have a single element, as each job will have its own + // classloader and therefore its own instance of SamzaExecutableStageContextFactory. This + // code supports multiple JobInfos in order to provide a sensible implementation of + // Factory.get(JobInfo), which in theory could be called with different JobInfos. + private static final ConcurrentMap jobFactories = + new ConcurrentHashMap<>(); + + private SamzaExecutableStageContextFactory() {} + + public static SamzaExecutableStageContextFactory getInstance() { + return instance; + } + + @Override + public ExecutableStageContext get(JobInfo jobInfo) { + ExecutableStageContext.Factory jobFactory = + jobFactories.computeIfAbsent( + jobInfo.jobId(), + k -> + ReferenceCountingExecutableStageContextFactory.create( + DefaultExecutableStageContext::create, + // Always release environment asynchronously. + (caller) -> false)); + + return jobFactory.get(jobInfo); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java index ba909225703ff..25b39abc6d688 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java @@ -20,6 +20,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; import org.apache.beam.runners.samza.runtime.OpMessage; +import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; @@ -65,13 +66,14 @@ public void translatePortable( PortableTranslationContext ctx) { final String outputId = ctx.getOutputId(transform); + final String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId); final GenericSystemDescriptor systemDescriptor = - new GenericSystemDescriptor(outputId, SamzaImpulseSystemFactory.class.getName()); + new GenericSystemDescriptor(escapedOutputId, SamzaImpulseSystemFactory.class.getName()); // The KvCoder is needed here for Samza not to crop the key. final Serde>> kvSerde = KVSerde.of(new NoOpSerde(), new NoOpSerde<>()); final GenericInputDescriptor>> inputDescriptor = - systemDescriptor.getInputDescriptor(outputId, kvSerde); + systemDescriptor.getInputDescriptor(escapedOutputId, kvSerde); ctx.registerInputMessageStream(outputId, inputDescriptor); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index 15f6f2fb59029..01be930855e6e 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -162,6 +162,7 @@ private static void doTranslate( input.isBounded(), false, null, + null, Collections.emptyMap(), doFnSchemaInformation, sideInputMapping); @@ -283,6 +284,7 @@ private static void doTranslatePortable( isBounded, true, stagePayload, + ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, sideInputMapping); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java index cf0a874a71041..d0cccd5afead5 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.wire.WireCoders; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.runtime.OpMessage; @@ -59,6 +60,7 @@ public class PortableTranslationContext { private final Map> messsageStreams = new HashMap<>(); private final StreamApplicationDescriptor appDescriptor; + private final JobInfo jobInfo; private final SamzaPipelineOptions options; private final Set registeredInputStreams = new HashSet<>(); private final Map registeredTables = new HashMap<>(); @@ -67,7 +69,8 @@ public class PortableTranslationContext { private PipelineNode.PTransformNode currentTransform; public PortableTranslationContext( - StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options) { + StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options, JobInfo jobInfo) { + this.jobInfo = jobInfo; this.appDescriptor = appDescriptor; this.options = options; } @@ -101,6 +104,10 @@ public String getOutputId(PipelineNode.PTransformNode transform) { return Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()); } + public JobInfo getJobInfo() { + return jobInfo; + } + public void registerMessageStream(String id, MessageStream> stream) { if (messsageStreams.containsKey(id)) { throw new IllegalArgumentException("Stream already registered for id: " + id); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java index 08117fc084e42..d139f0894aaea 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java @@ -86,7 +86,7 @@ public static WindowingStrategy getPortableWindowStrategy( * non-alphabetic characters. */ public static String escape(String name) { - return name.replaceAll("[\\.(/]", "-").replaceAll("[^A-Za-z0-9-_]", ""); + return name.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1").replaceAll("[^A-Za-z0-9_-]", "_"); } public static PCollection.IsBounded isBounded(RunnerApi.PCollection pCollection) {