Example multi-language pipelines

This project provides examples of Apache Beam multi-language pipelines:

Using Java transforms from Python

  • python/addprefix - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
  • python/javacount - A Python pipeline that counts words using the Java Count.perElement() transform.
  • python/javadatagenerator - A Python pipeline that produces a set of strings generated from Java. This example demonstrates the JavaExternalTransform API.

Instructions for running the pipelines

1) Start the expansion service

  1. Download the latest 'beam-examples-multi-language' JAR. Starting with Apache Beam 2.36.0, you can find it in the Maven Central Repository.
  2. Run the following command, replacing <version> and <port> with valid values: java -jar beam-examples-multi-language-<version>.jar <port> --javaClassLookupAllowlistFile='*'

2) Set up a Python virtual environment for Beam

  1. See the Python quickstart for more information.

3) Execute the Python pipeline

  1. In a new shell, run a pipeline in the python directory using a Beam runner that supports multi-language pipelines.

    The Python files contain details about the actual commands to run.
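Before executing a pipeline in step 3, it can help to confirm that the expansion service started in step 1 is accepting connections. The following is a minimal sketch that uses only the Python standard library. The function name `expansion_service_reachable` is hypothetical and not part of Beam, and a successful TCP connection only shows that something is listening on the port, not that it is a healthy expansion service.

```python
import socket


def expansion_service_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, calling `expansion_service_reachable("localhost", port)` with the same `<port>` value that was passed to the `java -jar` command should return `True` once the service is up.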

Using Python transforms from Java

Sklearn Mnist Classification

Performs image classification on handwritten digits from the MNIST database.

Please see here for context and information regarding the corresponding Python pipeline.

Please note that the Java pipeline is available in the Beam Java examples module.

Setup

  • Obtain or generate a CSV input file that contains the labels and pixels to feed into the model, and store it in GCS. An example input is available here.

  • Create a model file that contains a pickled scikit-learn model trained on MNIST data, and store it in GCS. An example model file is available here. This model was generated by running the program given here on the example input dataset.

  • Perform Beam runner-specific setup according to the instructions here.
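The first setup step describes a CSV input that contains labels and pixels. As an illustration of what one row might look like once parsed, here is a small sketch. It assumes that the label is the first field and that the remaining fields are integer pixel values; this layout is an assumption and should be checked against the example input file. The function name `parse_input_row` is hypothetical.

```python
from typing import List, Tuple


def parse_input_row(row: str) -> Tuple[int, List[int]]:
    """Parse one 'label,pixel,pixel,...' row into (label, pixels).

    Assumes the label comes first; verify this against the real input file.
    """
    fields = row.strip().split(",")
    label = int(fields[0])
    pixels = [int(value) for value in fields[1:]]
    return label, pixels
```

Running a function like this over a few rows of the input file before uploading it to GCS is a cheap way to catch malformed rows early.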

The following instructions are for running the pipeline with the Dataflow runner. For other portable runners, please modify the instructions according to the guidelines here.

Instructions for running the Java pipeline on released Beam (Beam 2.43.0 and later).

  • Check out the Beam examples Maven archetype for the relevant Beam version.
export BEAM_VERSION=<Beam version>

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DarchetypeVersion=$BEAM_VERSION \
    -DgroupId=org.example \
    -DartifactId=multi-language-beam \
    -Dversion="0.1" \
    -Dpackage=org.apache.beam.examples \
    -DinteractiveMode=false
  • Run the pipeline.
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \
    -Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
                 --region=$GCP_REGION \
                 --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
                 --output=gs://$GCP_BUCKET/multi-language-beam/output" \
    -Pdataflow-runner
  • Inspect the output. Each line contains two comma-separated values: the actual label of the digit, followed by the predicted label.
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
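Because each output line pairs the actual label with the predicted label, the output can be summarized as a single accuracy figure. The sketch below shows one way to do that. It assumes both labels are integers and that every non-empty line has exactly two fields; the function names `parse_prediction` and `accuracy` are hypothetical and not part of the example pipeline.

```python
from typing import Iterable, Tuple


def parse_prediction(line: str) -> Tuple[int, int]:
    """Split one 'actual,predicted' output line into two integers."""
    actual, predicted = line.strip().split(",")
    return int(actual), int(predicted)


def accuracy(lines: Iterable[str]) -> float:
    """Return the fraction of non-empty lines where predicted equals actual."""
    pairs = [parse_prediction(line) for line in lines if line.strip()]
    if not pairs:
        raise ValueError("no prediction lines found")
    correct = sum(1 for actual, predicted in pairs if actual == predicted)
    return correct / len(pairs)
```

One possible usage is to pipe the `gsutil cat` output into a script that calls `accuracy(sys.stdin)` and prints the result.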

Instructions for running the Java pipeline at HEAD (Beam 2.41.0 and 2.42.0).

  • Activate a new virtual environment following these instructions.

    1. Install the Apache Beam package with GCP support and the scikit-learn package.
pip install "apache-beam[gcp]"
pip install scikit-learn
  • Start up the expansion service.
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
  • Make sure that Docker is installed and available on your system.

  • In a different shell, build and push Python and Java Docker containers.

export DOCKER_ROOT=<Docker root>

./gradlew :sdks:python:container:py38:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest

docker push $DOCKER_ROOT/beam_python3.8_sdk:latest

./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME

docker push $DOCKER_ROOT/beam_java11_sdk:latest
  • Run the pipeline using the following Gradle command (this guide assumes the Dataflow runner). Note that the command overrides both the Java and Python SDK harness containers.
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
export EXPANSION_SERVICE_PORT=<PORT>

# This removes any existing output.
gsutil rm gs://$GCP_BUCKET/multi-language-beam/output*

./gradlew :examples:multi-language:sklearnMinstClassification --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output \
--sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \
--sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.8_sdk:latest \
--expansionService=localhost:$EXPANSION_SERVICE_PORT \
--region=${GCP_REGION}"
  • Inspect the output. Each line contains two comma-separated values: the actual label of the digit, followed by the predicted label.
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*

Python Dataframe Wordcount

This example is covered in the Java multi-language pipelines quickstart. The pipeline source code is available at PythonDataframeWordCount.java.