# Spark Workloads Executed Using GKE Custom Compute Classes

## Objective
Run a Spark workload on a GKE cluster that uses custom compute classes. Jobs are submitted with `spark-submit` from a Vertex AI custom job. Users define a `ComputeClass` manifest (`computeclass.yaml`) containing an ordered list of resource preferences. GKE tries to provision nodes in that order (for example, L4 GPU, then T4 GPU, then CPU only), and when a preferred resource is unavailable it falls back to the next rule in the list. The custom compute class is set as the default for the namespace that runs the Spark workload.
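The fallback idea can be illustrated with a toy model: walk the priority list and take the first entry that can currently be provisioned. This is a minimal sketch, not GKE's autoscaler; the resource names in `PRIORITIES` and the `available` set are hypothetical stand-ins for the three rules of the `l4-t4-cpu` class defined later.

```python
from typing import Optional, Sequence, Set

# Hypothetical names standing in for the three rules of the l4-t4-cpu
# compute class, listed in priority order.
PRIORITIES = ("nvidia-l4", "nvidia-tesla-t4", "n1-cpu")


def pick_resource(priorities: Sequence[str], available: Set[str]) -> Optional[str]:
    """Return the first resource in priority order that is available.

    This models only the ordering; the real GKE autoscaler also accounts
    for quotas, capacity, and Pod resource requests.
    """
    for resource in priorities:
        if resource in available:
            return resource
    return None  # no rule in the list can be satisfied


# L4 GPUs are unavailable here, so the sketch falls back to T4.
print(pick_resource(PRIORITIES, {"nvidia-tesla-t4", "n1-cpu"}))  # prints "nvidia-tesla-t4"
```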


### Workflow
- A GKE cluster (Autopilot mode) is created, and a custom compute class is set as the default for a namespace.
- A Vertex AI custom job pulls a container image from Artifact Registry, as specified in its worker pool spec, and runs `spark-submit` from that container.
- The Spark workload runs in the Kubernetes cluster specified in the `spark-submit` configuration.


## Google Cloud services and resources

- `Vertex AI`
- `Artifact Registry`
- `Cloud Storage`
- `Kubernetes Engine`
- `Compute Engine`

In [None]:
# Check the versions of the packages installed

! kubectl version --client
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [None]:
# Project parameters
PROJECT_ID = "sandbox-401718"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

# Cluster parameters
NETWORK = "beusebio-network"  # @param {type:"string"}
cluster_name = "ccc-test-region-autopilot"  # @param {type:"string"}
# Autopilot clusters are regional, so this location is a region, not a zone
cluster_zone = "us-central1"  # @param {type:"string"}

# Cloud Storage location for intermediate artifacts such as YAML job files
BUCKET_URI = "gs://sandbox-401718-us-notebooks/gke-yaml"  # @param {type:"string"}

### Create a Cluster

Custom compute classes work with GKE Autopilot clusters and with GKE Standard clusters that use node auto-provisioning. This example creates an Autopilot cluster.

In [None]:
! gcloud container clusters create-auto {cluster_name} \
    --network={NETWORK} \
    --location={cluster_zone} \
    --release-channel=regular

In [None]:
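# Kubernetes control plane endpoint used by spark-submit; replace with your cluster's
# endpoint (the `endpoint` field in `gcloud container clusters describe` output)
K8S = "https://34.173.27.183"  # @param {type:"string"}

# Fetch credentials so that kubectl can connect to the cluster
! gcloud container clusters get-credentials {cluster_name} --location {cluster_zone} --project {PROJECT_ID}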
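Instead of pasting the endpoint IP by hand, it can be read from `gcloud`. The sketch below assumes the `gcloud` CLI is installed and authenticated; `get_cluster_endpoint` and `control_plane_url` are hypothetical helper names, and `--format=value(endpoint)` prints the same `endpoint` field that `gcloud container clusters describe` shows.

```python
import subprocess


def get_cluster_endpoint(cluster: str, location: str, project: str) -> str:
    """Return the control plane IP address of a GKE cluster via the gcloud CLI."""
    result = subprocess.run(
        [
            "gcloud", "container", "clusters", "describe", cluster,
            f"--location={location}",
            f"--project={project}",
            "--format=value(endpoint)",
        ],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if gcloud fails
    )
    return result.stdout.strip()


def control_plane_url(endpoint: str) -> str:
    """Prefix a bare endpoint with https:// so it matches the K8S parameter format."""
    return endpoint if endpoint.startswith("https://") else f"https://{endpoint}"
```

In the notebook this could be used as `K8S = control_plane_url(get_cluster_endpoint(cluster_name, cluster_zone, PROJECT_ID))`.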

In [None]:
! gcloud container clusters describe {cluster_name} --location {cluster_zone}

### Define a Custom Compute Class

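Custom compute classes control the properties of the nodes that Google Kubernetes Engine (GKE) provisions when autoscaling your cluster. The `l4-t4-cpu` class below prefers a node with one NVIDIA L4 GPU, falls back to a node with one NVIDIA T4 GPU, and finally to an N1 machine with at least 16 vCPUs. `activeMigration.optimizeRulePriority` lets GKE move workloads to higher-priority nodes when they become available, and `nodePoolAutoCreation` lets GKE create node pools for these rules automatically.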

In [None]:
%%writefile ./src/computeclass.yaml

apiVersion: cloud.google.com/v1
kind: ComputeClass
metadata:
  name: l4-t4-cpu
spec:
  priorities:
  - gpu:
      count: 1
      type: nvidia-l4
  - gpu:
      count: 1
      type: nvidia-tesla-t4
  - machineFamily: n1
    minCores: 16
  activeMigration:
    optimizeRulePriority: true
  nodePoolAutoCreation:
    enabled: true

In [None]:
# Apply compute class
! kubectl apply -f ./src/computeclass.yaml

In [None]:
! kubectl describe computeclass l4-t4-cpu

### Test Example Workload

In [None]:
%%writefile ./src/workload.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-workload
spec:
  replicas: 2
  selector:
    matchLabels:
      app: custom-workload
  template:
    metadata:
      labels:
        app: custom-workload
    spec:
      nodeSelector:
        cloud.google.com/compute-class: l4-t4-cpu
      containers:
      - name: test
        image: gcr.io/google_containers/pause
        resources:
          requests:
            cpu: 1.5
            memory: "4Gi"

In [None]:
# Deploy the test workload
! kubectl apply -f ./src/workload.yaml

In [None]:
# Show detailed information about the Deployment
! kubectl describe deployment custom-workload

In [None]:
# Check that all Pods are running
! kubectl get pods -l=app=custom-workload

In [None]:
# View nodes
! kubectl get nodes
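To see which priority rule the new nodes came from, the node labels can be tallied. This sketch assumes the standard GKE node labels `cloud.google.com/compute-class` and `cloud.google.com/gke-accelerator`, and that `kubectl` is configured for the cluster; `summarize_nodes` and `get_nodes_json` are hypothetical helper names.

```python
import json
import subprocess
from collections import Counter

CLASS_LABEL = "cloud.google.com/compute-class"
GPU_LABEL = "cloud.google.com/gke-accelerator"


def summarize_nodes(nodes_json: dict) -> Counter:
    """Count nodes per (compute class, accelerator) pair from `kubectl get nodes -o json`."""
    counts: Counter = Counter()
    for node in nodes_json.get("items", []):
        labels = node.get("metadata", {}).get("labels", {})
        compute_class = labels.get(CLASS_LABEL, "<no class>")
        accelerator = labels.get(GPU_LABEL, "<no gpu>")  # CPU-only nodes lack this label
        counts[(compute_class, accelerator)] += 1
    return counts


def get_nodes_json() -> dict:
    """Run kubectl and parse its JSON node list."""
    out = subprocess.run(
        ["kubectl", "get", "nodes", "-o", "json"],
        capture_output=True, text=True, check=True,
    ).stdout
    return json.loads(out)
```

Usage: `print(summarize_nodes(get_nodes_json()))`.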

## Spark on GPU-enabled Kubernetes

Run and submit Apache Spark applications on Kubernetes using container images built from files downloaded from NVIDIA and Apache Spark into a local `src/` folder. No Kubernetes operator for Spark is required in this example.

### Configure RBAC Role
Create the `spark-demo` namespace and a `spark` service account, bind the service account to the built-in `edit` ClusterRole to control access to cluster resources, and verify that it has the permissions needed to run Spark workloads on Kubernetes.

In [None]:
%%writefile ./src/spark-role.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: spark-demo
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-demo
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  # ClusterRoleBinding is cluster-scoped, so no namespace is set here
  name: spark-role
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: spark-demo
roleRef:
  kind: ClusterRole
  name: edit
  apiGroup: rbac.authorization.k8s.io

In [None]:
# Create the namespace, set the custom compute class as its default, apply the RBAC config,
# and verify that the service account has permission to run Spark workloads
! kubectl create namespace spark-demo
! kubectl label namespaces spark-demo \
    cloud.google.com/default-compute-class=l4-t4-cpu
! kubectl --namespace=spark-demo apply -f ./src/spark-role.yaml
! kubectl auth can-i create pod --namespace spark-demo --as=system:serviceaccount:spark-demo:spark
! kubectl auth can-i delete services --namespace spark-demo --as=system:serviceaccount:spark-demo:spark
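More permissions can be checked in one pass by looping over `kubectl auth can-i` with `--quiet`, which suppresses output and reports the answer through the exit code (0 for yes). The list in `SPARK_CHECKS` is an assumption; extend or trim it to match what your Spark jobs create. `can_i_command` and `check_permissions` are hypothetical helper names.

```python
import subprocess
from typing import Dict, List, Tuple

# Assumed (verb, resource) checks; adjust to the objects your Spark jobs manage.
SPARK_CHECKS: Tuple[Tuple[str, str], ...] = (
    ("create", "pods"),
    ("delete", "pods"),
    ("create", "services"),
    ("delete", "services"),
    ("create", "configmaps"),
)


def can_i_command(verb: str, resource: str, namespace: str,
                  sa_namespace: str, sa_name: str) -> List[str]:
    """Build a `kubectl auth can-i` command that impersonates a service account."""
    return [
        "kubectl", "auth", "can-i", verb, resource,
        f"--namespace={namespace}",
        f"--as=system:serviceaccount:{sa_namespace}:{sa_name}",
        "--quiet",
    ]


def check_permissions(namespace: str = "spark-demo", sa_namespace: str = "spark-demo",
                      sa_name: str = "spark",
                      checks=SPARK_CHECKS) -> Dict[Tuple[str, str], bool]:
    """Run each check and record whether kubectl answered yes (exit code 0)."""
    results = {}
    for verb, resource in checks:
        proc = subprocess.run(can_i_command(verb, resource, namespace, sa_namespace, sa_name))
        results[(verb, resource)] = proc.returncode == 0
    return results
```

Usage: `print(check_permissions())`.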

### Spark Workload

A Spark test job runs on the GKE cluster through a Vertex AI custom job. The custom job's worker pool spec points to a pre-built Docker image containing spark-rapids, which lets users submit Spark jobs without a Kubernetes operator for Spark. Because the custom compute class is the default for the `spark-demo` namespace, Pods created there without their own compute class selector are scheduled on nodes from that class.

Set up required parameters.

Container images (built in *00_build_spark_images.ipynb*):
- `VERSION`: version or tag of the Docker image. Default set as `latest`
- `REPO_NAME`: The name of the Artifact Registry repository that stores the container images
- `JOB_IMAGE_ID`: The name of the image that will be used to run spark jobs on Kubernetes. The full image name: `<REGION>-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/<JOB_IMAGE_ID>:<VERSION>`
- `BASE_IMAGE_ID`: The name of the image that will be used to submit jobs using Vertex AI. The full image name: `<REGION>-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/<BASE_IMAGE_ID>:<VERSION>`
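Both full image names follow the same pattern, so a small helper keeps them consistent. This is a minimal sketch; `artifact_registry_image` is a hypothetical name, and the project and repository values in the usage line are placeholders.

```python
def artifact_registry_image(region: str, project: str, repo: str,
                            image: str, version: str = "latest") -> str:
    """Return <REGION>-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/<IMAGE>:<VERSION>."""
    return f"{region}-docker.pkg.dev/{project}/{repo}/{image}:{version}"


# Placeholder values for illustration only.
print(artifact_registry_image("us-central1", "my-project", "my-repo", "spark-gke"))
```

In the notebook, the job and base images would be `artifact_registry_image(REGION, PROJECT_ID, REPO_NAME, JOB_IMAGE_ID, VERSION)` and the same call with `BASE_IMAGE_ID`.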

Custom Job:
- `SERVICE_ACCOUNT`: The service account that runs the Vertex AI custom job
- `PIPELINE_ROOT`: The Cloud Storage bucket used as the custom job's staging bucket

The final local `./src` folder includes the following: `Dockerfile.cuda`, `spark/` (folder), `getGpusResources.sh`, and `rapids-4-spark_2.12-23.02.0.jar`.
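Before submitting, it can help to confirm that those files are actually present. This sketch only checks for existence under `./src`; `EXPECTED_SRC_ITEMS` copies the list above and `missing_src_items` is a hypothetical helper name.

```python
from pathlib import Path
from typing import List, Sequence

# The expected contents of the local ./src folder, as listed above.
EXPECTED_SRC_ITEMS = (
    "Dockerfile.cuda",
    "spark",
    "getGpusResources.sh",
    "rapids-4-spark_2.12-23.02.0.jar",
)


def missing_src_items(src_dir: str = "./src",
                      expected: Sequence[str] = EXPECTED_SRC_ITEMS) -> List[str]:
    """Return the expected files or folders that do not exist under src_dir."""
    root = Path(src_dir)
    return [name for name in expected if not (root / name).exists()]


missing = missing_src_items()
print("Missing from ./src:", missing if missing else "nothing")
```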

In [None]:
# Image Parameters
VERSION="latest"
REPO_NAME="gke-mlops-pilot-docker" # @param {type:"string"}
JOB_IMAGE_ID="spark-gke" # @param {type:"string"}
BASE_IMAGE_ID = "component-base" # @param {type:"string"}

# Vertex Custom Job parameters
SERVICE_ACCOUNT="757654702990-compute@developer.gserviceaccount.com" # @param {type:"string"}
PIPELINE_ROOT="gs://sanbox-bucket-kfp-intro-demo" # @param {type:"string"}

In [None]:
# Import libraries

import os
from google.cloud import aiplatform

In [None]:
# Spark Pi test: fetch cluster credentials, then submit SparkPi in cluster mode

CMD = [
    r"""gcloud container clusters get-credentials {cluster_name_} --location {cluster_zone_} --project {project} && ./bin/spark-submit \
        --master k8s://{k8s} \
        --deploy-mode cluster \
        --name spark-pi \
        --class org.apache.spark.examples.SparkPi \
        --conf spark.kubernetes.driver.request.cores=400m \
        --conf spark.kubernetes.executor.request.cores=100m \
        --conf spark.kubernetes.container.image={image} \
        --conf spark.kubernetes.namespace=spark-demo \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar""".format(
        cluster_name_=cluster_name,
        cluster_zone_=cluster_zone,
        project=PROJECT_ID,
        k8s=K8S,
        image=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{JOB_IMAGE_ID}:{VERSION}",
    )
]
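The same command can be assembled from a list of arguments with `shlex.join` (Python 3.8+), which quotes values safely and makes it easier to add `--conf` entries. This is a sketch that reproduces the SparkPi settings above; `spark_submit_args` and `custom_job_command` are hypothetical helper names.

```python
import shlex
from typing import Dict, List


def spark_submit_args(master: str, image: str, namespace: str = "spark-demo",
                      service_account: str = "spark") -> List[str]:
    """Return spark-submit arguments equivalent to the SparkPi command above."""
    confs: Dict[str, str] = {
        "spark.kubernetes.driver.request.cores": "400m",
        "spark.kubernetes.executor.request.cores": "100m",
        "spark.kubernetes.container.image": image,
        "spark.kubernetes.namespace": namespace,
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
    }
    args = [
        "./bin/spark-submit",
        "--master", f"k8s://{master}",
        "--deploy-mode", "cluster",
        "--name", "spark-pi",
        "--class", "org.apache.spark.examples.SparkPi",
    ]
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    args.append("local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar")
    return args


def custom_job_command(cluster: str, location: str, project: str,
                       master: str, image: str) -> str:
    """Join get-credentials and spark-submit into one shell string for `sh -c`."""
    credentials = shlex.join([
        "gcloud", "container", "clusters", "get-credentials", cluster,
        "--location", location, "--project", project,
    ])
    return f"{credentials} && {shlex.join(spark_submit_args(master, image))}"
```

Usage: `CMD = [custom_job_command(cluster_name, cluster_zone, PROJECT_ID, K8S, job_image)]`, where `job_image` is the full Artifact Registry name of `JOB_IMAGE_ID`.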

In [None]:
WORKER_POOL_SPEC_ = [
    {
        "replica_count": 1,
        "machine_spec": {"machine_type": "n1-standard-4", "accelerator_count": 0},
        "container_spec": {
            "image_uri": f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{BASE_IMAGE_ID}:{VERSION}",
            "command": ["sh", "-c"],
            "args": CMD
        },
    }
]

In [None]:
custom_job = aiplatform.CustomJob(
    display_name="k8s-custom-job",
    worker_pool_specs=WORKER_POOL_SPEC_,
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=PIPELINE_ROOT
)

custom_job.run(sync=False, service_account=SERVICE_ACCOUNT)

### Check Kubernetes Task Completion and Output

View the Pods that the Spark job created in the Kubernetes cluster. Once the custom job completes, check the driver Pod logs for the output of the spark-pi job.

Example output: `Job 0 finished: reduce at SparkPi.scala:38, took 1.490524 s` followed by `Pi is roughly 3.1339956699783498`.
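To check the result programmatically, the estimate can be pulled out of the driver log text with a regular expression. This sketch assumes the log line has the `Pi is roughly <number>` form shown in the example above; `parse_pi` is a hypothetical helper name.

```python
import re
from typing import Optional

# Matches the SparkPi result line, e.g. "Pi is roughly 3.1339956699783498".
PI_LINE = re.compile(r"Pi is roughly (\d+\.\d+)")


def parse_pi(driver_log: str) -> Optional[float]:
    """Return the Pi estimate from SparkPi driver logs, or None if it is absent."""
    match = PI_LINE.search(driver_log)
    return float(match.group(1)) if match else None
```

The log text could come from `subprocess.run(["kubectl", "logs", pod, "--namespace=spark-demo"], capture_output=True, text=True).stdout`.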

In [None]:
! kubectl get pods --namespace=spark-demo

In [None]:
# Check the logs for any Pod (replace with a driver Pod name from the output above)

pod = "spark-pi-d68f3f94ddd6e3e4-driver"  # @param {type:"string"}
! kubectl logs {pod} --namespace=spark-demo

In [None]:
! kubectl describe pod {pod} --namespace=spark-demo

In [None]:
! kubectl get nodes -l cloud.google.com/compute-class=l4-t4-cpu

In [None]:
# Delete the cluster
! gcloud container clusters delete {cluster_name} --location {cluster_zone} --quiet

## Additional References
* [About Custom Compute Classes](https://cloud.google.com/kubernetes-engine/docs/concepts/about-custom-compute-classes)
* [Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html)
* [Getting Started with RAPIDS and Kubernetes](https://docs.nvidia.com/ai-enterprise/deployment-guide-spark-rapids-accelerator/0.1.0/kubernetes.html)