In [None]:
!pip install pyspark img2dataset

In [None]:
from pyspark.sql import SparkSession

In [None]:
# TODO: Update with your namespace.
# This value is used further down both as `spark.kubernetes.namespace` and as the
# namespace label inside the driver's DNS name (`spark.driver.host`).
NAMESPACE = "tenant-CHANGE-ME"

In [None]:
# First label of the DNS name given to Spark as `spark.driver.host` further down.
# NOTE(review): presumably the Kubernetes Service that fronts this Jupyter pod — confirm.
SERVICE_NAME = "spark-jupyter"
# Service account name passed to Spark's
# `spark.kubernetes.authenticate[.driver].serviceAccountName` settings further down.
SERVICE_ACCOUNT_NAME = "spark-sa"

In [None]:
import socket

# This machine's hostname as a plain string. It is handed to Spark further down as
# `spark.kubernetes.driver.pod.name`.
# Using the standard library avoids shelling out and avoids binding HOSTNAME first
# to a list of output lines and then to a string.
HOSTNAME = socket.gethostname()
HOSTNAME

In [None]:
import os

# Host of the Kubernetes API server, read from the environment.
# Raises KeyError if KUBERNETES_SERVICE_HOST is not set.
# NOTE(review): presumably injected by Kubernetes when this notebook runs in-cluster — confirm.
# Used further down to build the `k8s://` Spark master URL.
K8S_API = os.environ["KUBERNETES_SERVICE_HOST"]
K8S_API

In [None]:
import wandb

wandb.login()
# Read the API key from the environment so it can be forwarded to the Spark driver
# and executors through the session config further down.
# Raises KeyError if WANDB_API_KEY is not set in this process's environment.
WANDB_API_KEY = os.environ["WANDB_API_KEY"]

In [None]:
%%!
# Write the pod template used by the Spark session further down.
# The file is overwritten (">") rather than appended to (">>"), so re-running this
# cell does not concatenate a second copy of the YAML into the template.
echo 'apiVersion: v1
kind: Pod
metadata:
  name: cpu-job
spec:
  terminationGracePeriodSeconds: 10
  containers:
    - name: cpu-job
      volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: spark-pvc
          # Match the mountPath of jupyter lab
          mountPath: /mnt/pvc
          readOnly: false

  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: topology.kubernetes.io/region
                operator: In
                values:
                  - "LGA1"
              - key: node.coreweave.cloud/cpu
                operator: In
                values:
                  - amd-epyc-rome
                  - amd-epyc-milan
                  - intel-xeon-v3
                  - intel-xeon-v4
  volumes:
    - name: dshm
      emptyDir:
        medium: Memory
    - name: spark-pvc
      persistentVolumeClaim:
        claimName: spark-pvc
        readOnly: false
  restartPolicy: Always' > cpu-pod-template.yaml

In [None]:
# Build (or reuse) a client-mode Spark session whose master is the Kubernetes API
# server. The names NAMESPACE, SERVICE_NAME, SERVICE_ACCOUNT_NAME, HOSTNAME, K8S_API
# and WANDB_API_KEY all come from earlier cells.
spark = (
    SparkSession.builder
    .appName("interactive-test")
    # Spark documents the master URL as k8s://<api_server_host>:<port> and asks for the
    # port to be given explicitly, so take it from the environment next to the host.
    # Falls back to 443 (the HTTPS default) when KUBERNETES_SERVICE_PORT is unset.
    .config("spark.master", f"k8s://{K8S_API}:{os.environ.get('KUBERNETES_SERVICE_PORT', '443')}")
    .config("spark.submit.deployMode", "client")
    # Fixed driver ports and a stable DNS name for the driver.
    .config("spark.driver.port", "2222")
    .config("spark.driver.blockManager.port", "7777")
    .config("spark.driver.host", f"{SERVICE_NAME}.{NAMESPACE}.svc.tenant.chi.local")
    .config("spark.ui.port", "4040")
    .config("spark.driver.bindAddress", "0.0.0.0")

    # Driver config (Resources should match the deployment)
    .config("spark.driver.cores", "16")
    .config("spark.kubernetes.driver.limit.cores", "16")
    .config("spark.driver.memory", "64G")
    .config("spark.kubernetes.driver.pod.name", HOSTNAME)

    # Executor config
    .config("spark.executor.cores", "16")
    .config("spark.kubernetes.executor.limit.cores", "16")
    .config("spark.executor.memory", "64G")
    .config("spark.kubernetes.executor.podNamePrefix", "spark-interactive")

    # Dynamic scaling config: between 0 and 5 executors.
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "0")
    .config("spark.dynamicAllocation.maxExecutors", "5")
    # .config("spark.executor.instances", 5)

    # The image has spark v3.4.0 and img2dataset already installed
    .config("spark.kubernetes.driver.container.image", "navarrepratt/spark-download-imgdataset:1.0.0")
    .config("spark.kubernetes.executor.container.image", "navarrepratt/spark-download-imgdataset:1.0.0")

    # Use the pod template that was defined in a local file in the previous cell
    .config("spark.kubernetes.driver.podTemplateFile", "./cpu-pod-template.yaml")
    .config("spark.kubernetes.executor.podTemplateFile", "./cpu-pod-template.yaml")

    .config("spark.kubernetes.namespace", NAMESPACE)
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", SERVICE_ACCOUNT_NAME)
    .config("spark.kubernetes.authenticate.serviceAccountName", SERVICE_ACCOUNT_NAME)

    # Forward the Weights & Biases key to the driver and executor environments.
    .config("spark.kubernetes.driverEnv.WANDB_API_KEY", WANDB_API_KEY)
    .config("spark.executorEnv.WANDB_API_KEY", WANDB_API_KEY)

    .getOrCreate()
)

# Display the session object as the cell output.
spark

In [None]:
# Example workload to calculate Pi
# Meant to spin up all 5 of the dynamic executors defined above.

from random import random
from operator import add

# Number of RDD partitions for the Pi job, and the total sample count
# (ten million samples per partition).
partitions = 100
n = partitions * 10_000_000

def f(_):
    """Sample one point uniformly from the square [-1, 1) x [-1, 1).

    The argument is ignored; it exists so the function can be used with ``map``.

    Returns:
        1 if the sampled point satisfies x**2 + y**2 <= 1, otherwise 0.
    """
    x = 2 * random() - 1
    y = 2 * random() - 1
    inside_unit_circle = x ** 2 + y ** 2 <= 1
    return int(inside_unit_circle)

# Spread the n sample indices over `partitions` partitions, turn each one into a
# 0/1 hit with `f`, and sum the hits.
count = (
    spark.sparkContext
    .parallelize(range(1, n + 1), partitions)
    .map(f)
    .reduce(add)
)
# The fraction of hits approximates pi / 4.
print("Pi is roughly %f" % (4.0 * count / n))

In [None]:
# Fetch the CC12M URL/caption list into the current working directory.
# `-nc` (no-clobber) skips the download when cc12m.tsv already exists, so re-running
# this cell does not fetch the file again under the name cc12m.tsv.1.
# NOTE(review): the download cell further down reads /mnt/pvc/cc12m.tsv, which
# assumes the notebook's working directory is /mnt/pvc — confirm.
!wget -nc https://storage.googleapis.com/conceptual_12m/cc12m.tsv

In [None]:
# Prepend a `url<TAB>caption` header row to the TSV, because the download cell further
# down selects columns by those names.
# The header is added only when the first line is not already that header, so
# re-running this cell does not insert a second header row.
!head -n 1 cc12m.tsv | grep -q '^url[[:space:]]caption' || sed -i '1s/^/url\tcaption\n/' cc12m.tsv

In [None]:
# Download the CC12M dataset to the PVC.
# Should take ~1 hour.

from img2dataset import download

url_list = "/mnt/pvc/cc12m.tsv"
output = "/mnt/pvc/cc12m-jupyter"
thread_count = 2048

download(
    # Input: the TSV with the `url` / `caption` header row added in the previous cell.
    url_list=url_list,
    input_format="tsv",
    url_col="url",
    caption_col="caption",
    # Output: 256-pixel images written in webdataset format.
    output_folder=output,
    output_format="webdataset",
    image_size=256,
    # Execution: distribute the work over the Spark session created above.
    distributor="pyspark",
    processes_count=1,  # Process count will be num executors * num cores per executor when using pyspark
    thread_count=thread_count,
    subjob_size=10000,
    timeout=10,
    # Report progress to Weights & Biases.
    enable_wandb=True,
)