## Spark Jobs with Dataproc Serverless with a Custom Container

[Dataproc Serverless](https://cloud.google.com/dataproc-serverless/docs/overview) runs Spark jobs as batch workloads in a managed infrastructure that [autoscales](https://cloud.google.com/dataproc-serverless/docs/concepts/autoscaling) resources as needed.  Simply put, all you need is a job!  

This allows you to run PySpark, Spark SQL, SparkR, and Spark Java/Scala workloads.  You can set most Spark properties, including the [resource allocation properties](https://cloud.google.com/dataproc-serverless/docs/concepts/properties) that determine compute, memory, and disk resources for the initial executors and the autoscaling maximums.  
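
As a rough sketch of how such properties could be passed to a job, the helper below formats a dictionary as the `--properties` flag accepted by `gcloud dataproc batches submit pyspark`. The helper name `format_properties_flag` and the example values are assumptions for illustration, not settings used elsewhere in this notebook.

```python
def format_properties_flag(properties):
    """Format a dict of Spark properties as a gcloud --properties flag."""
    # gcloud expects comma-separated KEY=VALUE pairs.
    pairs = [f"{key}={value}" for key, value in properties.items()]
    return "--properties=" + ",".join(pairs)


# Example values only (assumptions); tune them for the actual workload.
example_properties = {
    "spark.executor.instances": 2,               # initial number of executors
    "spark.executor.cores": 4,                   # cores per executor
    "spark.dynamicAllocation.maxExecutors": 10,  # autoscaling ceiling
}

properties_flag = format_properties_flag(example_properties)
print(properties_flag)
```

The resulting string can be appended to the job submission command as one more flag.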

You can also use [custom containers](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers).  The container image does not need to include Spark, because Dataproc Serverless mounts Spark into the container at runtime.  This notebook creates a custom container that adds Python libraries and then uses it in a new job.

**Overview**

The example below shows the process of setting up a GCP environment and submitting a Dataproc Serverless PySpark job.  First, it creates a custom container with Python libraries that are then used in the PySpark jobs as Spark UDFs.  This job uses the provided BigQuery connector to read from BigQuery, process the data with Spark, then write the result to a BigQuery table.


**Resources**

An [excellent blog](https://medium.com/google-cloud/processing-databricks-delta-lake-data-in-google-cloud-dataproc-serverless-for-spark-1cc1405a3ee4) with walkthroughs for processing Databricks Delta Lake data in Google Cloud Dataproc Serverless for Spark.

## Environment Setup

### Create Parameters

In [1]:
# Defined Parameters
PROJECT_ID = 'statmike-demo3'
NOTEBOOK = 'dataproc'
REGION = 'us-central1'

# Derived Parameters
GCS_BUCKET = PROJECT_ID
BQ_DATASET = NOTEBOOK
GCS_FOLDER = f'demos/{NOTEBOOK}'

### Import Libraries

In [2]:
from google.cloud import bigquery
from google.cloud import storage

import json

### Setup Clients

In [3]:
bq = bigquery.Client(project = PROJECT_ID)
gcs = storage.Client()

### Local Directory

In [4]:
DIR = NOTEBOOK
!rm -rf {DIR}
!mkdir -p {DIR}

### GCS Bucket

In [5]:
buckets = !gsutil list -p {PROJECT_ID}
if f"gs://{GCS_BUCKET}/" not in buckets:
    ! gsutil mb -l us -c standard gs://{GCS_BUCKET}
else:
    print(f"Bucket gs://{GCS_BUCKET} already exists")

Bucket gs://statmike-project-1 already exists
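
The same check can be sketched with the Cloud Storage Python client instead of `gsutil`. The helper name `ensure_bucket` is an assumption, and `client` is expected to behave like the `gcs` client created earlier (`lookup_bucket` returns `None` when the bucket does not exist). This is a minimal sketch, not the approach the notebook itself uses.

```python
def ensure_bucket(client, bucket_name, location="US"):
    """Return the bucket, creating it first when it does not exist.

    `client` is expected to behave like google.cloud.storage.Client.
    """
    bucket = client.lookup_bucket(bucket_name)  # None when the bucket is missing
    if bucket is None:
        bucket = client.create_bucket(bucket_name, location=location)
        print(f"Created bucket gs://{bucket_name}")
    else:
        print(f"Bucket gs://{bucket_name} already exists")
    return bucket
```

In this notebook the call would look like `ensure_bucket(gcs, GCS_BUCKET)`.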


### BigQuery Dataset

In [6]:
ds = bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET}")
ds.location = 'US'
ds = bq.create_dataset(dataset = ds, exists_ok = True)

### Enable Container Registry
Dataproc Serverless can currently use containers from [Google Container Registry](https://cloud.google.com/container-registry) only.
- [Reference](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers)

In [7]:
services = !gcloud services list --format="json" --available --filter=name:containerregistry.googleapis.com
services = json.loads("".join(services))

if (services[0]['config']['name'] == 'containerregistry.googleapis.com') and (services[0]['state'] == 'ENABLED'):
    print(f"Container Registry is Enabled for This Project: {PROJECT_ID}")
else:
    print(f"Enabling Container Registry for this Project: {PROJECT_ID}")
    !gcloud services enable containerregistry.googleapis.com

Container Registry is Enabled for This Project: statmike-project-1
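
The check above indexes `services[0]` directly, which raises an error if the command returns an empty list. A slightly more defensive parse of the same JSON could look like the sketch below. The function name `is_service_enabled` is an assumption; the `config.name` and `state` fields are the ones read in the cell above.

```python
import json


def is_service_enabled(gcloud_json, service_name):
    """Return True when `service_name` is listed with state ENABLED."""
    # An empty string (no output from gcloud) is treated as an empty list.
    services = json.loads(gcloud_json) if gcloud_json.strip() else []
    for service in services:
        name = service.get("config", {}).get("name")
        if name == service_name and service.get("state") == "ENABLED":
            return True
    return False
```

With the raw output captured as in the cell above (before it is parsed), a call would be `is_service_enabled("".join(raw_output), "containerregistry.googleapis.com")`, where `raw_output` is a hypothetical name for the list of lines returned by the `!gcloud services list` command.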


### Setup Dataproc Serverless
Using Google APIs from Spark code will require the subnet to have Private Google Access enabled.
- Network Configuration: https://cloud.google.com/dataproc-serverless/docs/concepts/network
    - Configure Private Google Access: https://cloud.google.com/vpc/docs/configure-private-google-access#config-pga

In [8]:
status = !gcloud compute networks subnets describe default --region={REGION} --format="get(privateIpGoogleAccess)"
if status[0] == 'False':
  !gcloud compute networks subnets update default --region={REGION} --enable-private-ip-google-access
  status = !gcloud compute networks subnets describe default --region={REGION} --format="get(privateIpGoogleAccess)"
print(f"Private Google Access is Enabled = {status[0]}")

Private Google Access is Enabled = True


### Setup Docker Config

In [20]:
!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet


{
  "credHelpers": {
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud",
    "us-central1-docker.pkg.dev": "gcloud"
  }
}
Adding credentials for: us-central1-docker.pkg.dev
gcloud credential helpers already registered correctly.


## Build Custom Container

- Documentation [Example](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers#example_custom_container_image_build)
- Add and Install [Miniconda3](https://docs.conda.io/en/latest/miniconda.html)
    - Use the Miniconda3 Linux 64-bit installer for Python 3.9
- Add Python Packages
    - Use pip to install [fuzzywuzzy](https://pypi.org/project/fuzzywuzzy/)
    - Use conda to install [python-levenshtein](https://anaconda.org/conda-forge/python-levenshtein)
- Add BigQuery Connector: https://github.com/GoogleCloudDataproc/spark-bigquery-connector

### Add BigQuery Connector
This adds the BigQuery connector directly to the custom container so that it does not need to be included with the job as `--jars=...`.

In [10]:
BQ_CONNECTOR = 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar'

In [11]:
!gsutil cp {BQ_CONNECTOR} {DIR}/.

Copying gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar...
/ [1 files][ 35.0 MiB/ 35.0 MiB]                                                
Operation completed over 1 objects/35.0 MiB.                                     


### Add Miniconda3
This installs Miniconda, which is then used to install the package list from the default container along with the additional packages.

In [15]:
MINI_CONDA = 'https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh'

In [18]:
!wget "{MINI_CONDA}" -P ./{DIR}/

--2022-05-28 11:09:06--  https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh
Resolving repo.anaconda.com (repo.anaconda.com)... 104.16.131.3, 104.16.130.3, 2606:4700::6810:8203, ...
Connecting to repo.anaconda.com (repo.anaconda.com)|104.16.131.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 66709754 (64M) [application/x-sh]
Saving to: ‘./dataproc/Miniconda3-py39_4.10.3-Linux-x86_64.sh’


2022-05-28 11:09:07 (103 MB/s) - ‘./dataproc/Miniconda3-py39_4.10.3-Linux-x86_64.sh’ saved [66709754/66709754]



### Create Dockerfile
Use the template from the example in the documentation and add the BigQuery connector (.jar), the Python package `python-levenshtein` using Miniconda, and the Python package `fuzzywuzzy` using pip.

In [35]:
%%writefile {DIR}/Dockerfile
FROM debian:11-slim
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && apt install -y procps tini

ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.24.2.jar "${SPARK_EXTRA_JARS_DIR}"

ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda=4.10 \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-bigtable \
      google-cloud-container \
      google-cloud-datacatalog \
      google-cloud-dataproc \
      google-cloud-datastore \
      google-cloud-language \
      google-cloud-logging \
      google-cloud-monitoring \
      google-cloud-pubsub \
      google-cloud-redis \
      google-cloud-secret-manager \
      google-cloud-spanner \
      google-cloud-speech \
      google-cloud-storage \
      google-cloud-texttospeech \
      google-cloud-translate \
      google-cloud-vision \
      koalas \
      matplotlib \
      nltk \
      numba \
      numpy \
      openblas \
      orc \
      pandas \
      pyarrow \
      pysal \
      pytables \
      python \
      regex \
      requests \
      rtree \
      scikit-image \
      scikit-learn \
      scipy \
      seaborn \
      sqlalchemy \
      sympy \
      virtualenv \
      python-levenshtein

RUN ${CONDA_HOME}/bin/pip install fuzzywuzzy

RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

Overwriting dataproc/Dockerfile
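
The Dockerfile copies two files by name, so both must sit in the build directory next to it before `docker build` runs. A small pre-build check could be sketched as follows. The helper name `missing_build_files` is an assumption; the file names in the usage note are the ones downloaded in the earlier cells.

```python
from pathlib import Path


def missing_build_files(build_dir, required_names):
    """Return the required file names that are absent from `build_dir`."""
    build_path = Path(build_dir)
    return [name for name in required_names if not (build_path / name).is_file()]
```

For this notebook the call would be `missing_build_files(DIR, ["Dockerfile", "spark-bigquery-with-dependencies_2.12-0.24.2.jar", "Miniconda3-py39_4.10.3-Linux-x86_64.sh"])`, and an empty list means the build context is complete.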


### Build The Docker Image (local to notebook)

In [44]:
# Name of the Docker image, both locally and in GCR
IMAGE_URI = f"gcr.io/{PROJECT_ID}/{NOTEBOOK}:latest"

In [37]:
!docker build {DIR}/. -t $IMAGE_URI

Sending build context to Docker daemon  103.4MB
Step 1/17 : FROM debian:11-slim
 ---> c9cb6c086ef7
Step 2/17 : ENV DEBIAN_FRONTEND=noninteractive
 ---> Using cache
 ---> a31d510cc1fa
Step 3/17 : RUN apt update && apt install -y procps tini
 ---> Using cache
 ---> ef0225e0f9a3
Step 4/17 : ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
 ---> Using cache
 ---> 7096bc924ae9
Step 5/17 : ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
 ---> Using cache
 ---> de560fdb104c
Step 6/17 : RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
 ---> Using cache
 ---> 6ac10117b564
Step 7/17 : COPY spark-bigquery-with-dependencies_2.12-0.24.2.jar "${SPARK_EXTRA_JARS_DIR}"
 ---> Using cache
 ---> 6b7175d8117f
Step 8/17 : ENV CONDA_HOME=/opt/miniconda3
 ---> Using cache
 ---> 814f4d8e9bc6
Step 9/17 : ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
 ---> Using cache
 ---> 74d8bab51dd9
Step 10/17 : ENV PATH=${CONDA_HOME}/bin:${PATH}
 ---> Using cache
 ---> 4c245f44ce44
Step 11/17 : COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
 --

### Push The Docker Container to GCR

In [38]:
!docker push $IMAGE_URI

The push refers to repository [gcr.io/statmike-project-1/dataproc]

5fb513d1: Preparing
f74e54de: Preparing
71d704a4: Preparing
7ac996d0: Preparing
6d48392e: Preparing
4799020a: Preparing
f2dac683: Preparing
55c63d18: Preparing
0f20e388: Preparing
7ac996d0: Pushed   5.074GB/4.913GB

In [39]:
IMAGE_URI

'gcr.io/statmike-project-1/dataproc:latest'

## Dataproc Serverless Spark Batch Job
- Dataproc Serverless: https://cloud.google.com/dataproc-serverless/docs/overview
- BigQuery Connector: https://github.com/GoogleCloudDataproc/spark-bigquery-connector
- gcloud dataproc batches submit pyspark [documentation](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark)

### Define PySpark Job

In [40]:
%%writefile {DIR}/myjob.py
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
import sys
from fuzzywuzzy import fuzz, process

print("Number of Arguments: {0} arguments.".format(len(sys.argv)))
print("Arguments List: {0}".format(str(sys.argv)))

# create a session
spark = SparkSession.builder.appName('spark-bigquery').getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
spark.conf.set('temporaryGcsBucket', sys.argv[1])

# Load data from BigQuery.
words = spark.read.format('bigquery').option('table', 'bigquery-public-data:samples.shakespeare').load()
# Create a View
words.createOrReplaceTempView('words')

# Perform word count.
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC')
word_count.show(n=5)
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format('bigquery').option('table', sys.argv[2]).mode('overwrite').save()

Overwriting dataproc/myjob.py
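
The job above imports `fuzzywuzzy` but does not yet call it. One way the library could be used as a Spark UDF is sketched below. The names `similarity` and `add_similarity_column` are assumptions, and the `difflib` fallback is only a stand-in so the scoring function can be exercised where `fuzzywuzzy` is not installed; inside the custom container the `fuzzywuzzy` branch is the one that runs.

```python
try:
    from fuzzywuzzy import fuzz  # installed in the custom container

    def similarity(left, right):
        """Similarity score from 0 to 100 computed by fuzzywuzzy."""
        if left is None or right is None:
            return 0
        return fuzz.ratio(left, right)
except ImportError:
    # Stand-in (assumption) for environments without fuzzywuzzy.
    from difflib import SequenceMatcher

    def similarity(left, right):
        """Approximate 0 to 100 similarity score based on difflib."""
        if left is None or right is None:
            return 0
        return int(round(100 * SequenceMatcher(None, left, right).ratio()))


def add_similarity_column(df, target, column="word"):
    """Add a `similarity` column scoring each value in `column` against `target`."""
    # Imported lazily so the scoring function can be tested without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    score = F.udf(lambda value: similarity(value, target), IntegerType())
    return df.withColumn("similarity", score(F.col(column)))
```

Inside `myjob.py` this could be applied as `add_similarity_column(word_count, 'king').show(n=5)`, where `'king'` is an arbitrary example target.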


### Run PySpark Job

In [41]:
bq.query(query = "SELECT COUNT(*) AS record_count FROM `bigquery-public-data.samples.shakespeare`").to_dataframe()['record_count'].iloc[0]

164656

In [42]:
!gcloud dataproc batches submit pyspark {DIR}/myjob.py \
--project={PROJECT_ID} \
--region={REGION} \
--deps-bucket={GCS_BUCKET} \
--container-image={IMAGE_URI} \
-- {GCS_BUCKET}/{GCS_FOLDER} \
    {PROJECT_ID}:{BQ_DATASET}.myjob_output

Batch [4b7a26f1a1124cb9879d2e7bc9a56957] submitted.
Pulling image gcr.io/statmike-project-1/dataproc:latest
About to run 'docker pull gcr.io/statmike-project-1/dataproc:latest' with retries...
latest: Pulling from statmike-project-1/dataproc
42c077c10790: Pulling fs layer
5f2212035f41: Pulling fs layer
262f86e44458: Pulling fs layer
52d6e05ecdd7: Pulling fs layer
abf79a304d3a: Pulling fs layer
b26f200a32c2: Pulling fs layer
73361f799343: Pulling fs layer
1dd2d75fa1f0: Pulling fs layer
c3ddafca7424: Pulling fs layer
c884d432d4f9: Pulling fs layer
52d6e05ecdd7: Waiting
abf79a304d3a: Waiting
b26f200a32c2: Waiting
73361f799343: Waiting
1dd2d75fa1f0: Waiting
c3ddafca7424: Waiting
c884d432d4f9: Waiting
262f86e44458: Verifying Checksum
262f86e44458: Download complete
5f2212035f41: Verifying Checksum
5f2212035f41: Download complete
42c077c10790: Verifying Checksum
42c077c10790: Download complete
52d6e05ecdd7: Verifying Checksum
52d6e05ecdd7: Download complete
abf79a304d3a: Verifying Checksum
a
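
The submission above can also be assembled in Python as an argument list, which makes the split between `gcloud` flags and job arguments explicit. This is a sketch using only the flags shown in the cell above; the function name `build_batch_command` and the placeholder values in the example call are assumptions.

```python
import shlex


def build_batch_command(script, project_id, region, deps_bucket, image_uri, job_args):
    """Build the gcloud argument list for a Dataproc Serverless PySpark batch."""
    return [
        "gcloud", "dataproc", "batches", "submit", "pyspark", script,
        f"--project={project_id}",
        f"--region={region}",
        f"--deps-bucket={deps_bucket}",
        f"--container-image={image_uri}",
        "--",  # everything after this marker reaches the job as sys.argv[1:]
        *job_args,
    ]


# Placeholder values (assumptions) standing in for the notebook parameters.
command = build_batch_command(
    script="dataproc/myjob.py",
    project_id="my-project",
    region="us-central1",
    deps_bucket="my-project",
    image_uri="gcr.io/my-project/dataproc:latest",
    job_args=["my-project/demos/dataproc", "my-project:dataproc.myjob_output"],
)
print(shlex.join(command))
```

The list can be passed to `subprocess.run(command, check=True)` to submit the batch from plain Python rather than a notebook shell magic.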

In [43]:
bq.query(query = f"SELECT * FROM {PROJECT_ID}.{BQ_DATASET}.myjob_output ORDER BY word_count DESC LIMIT 5").to_dataframe()

|   | word | word_count |
|---|------|------------|
| 0 | the  | 25568 |
| 1 | I    | 21028 |
| 2 | and  | 19649 |
| 3 | to   | 17361 |
| 4 | of   | 16438 |
