# Armada Spark Example

This notebook demonstrates how to run Spark jobs on Armada using PySpark in client mode.

In [None]:
import glob
import os
import random
import subprocess

from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql import SparkSession

## Setup

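Stop any Spark context that is still active in this kernel, so that the session created later in the notebook is built from the configuration below rather than reusing an old one.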

In [None]:
# Best-effort cleanup: if a SparkContext is still active in this kernel, stop
# it before a new session is configured.
try:
    from pyspark import SparkContext

    # NOTE(review): `_active_spark_context` is a private PySpark attribute;
    # confirm it is still available when upgrading PySpark.
    active_context = SparkContext._active_spark_context
    if active_context:
        active_context.stop()
except Exception as exc:
    # Cleanup stays non-fatal, but the failure is reported instead of being
    # silently discarded. Unlike a bare `except:`, this lets KeyboardInterrupt
    # and SystemExit propagate.
    print(f"Warning: could not stop existing Spark context: {exc!r}")

## Configuration

Set up connection parameters and locate the Armada Spark JAR file.

In [None]:
# Connection settings. Each value is taken from the named environment variable
# and falls back to the literal default when the variable is unset. All values
# are kept as strings.
# NOTE(review): the default driver host is a fixed private IP, presumably
# specific to one environment - set SPARK_DRIVER_HOST elsewhere.
driver_host = os.getenv('SPARK_DRIVER_HOST', '10.0.0.80')
driver_port = os.getenv('SPARK_DRIVER_PORT', '7078')
block_manager_port = os.getenv('SPARK_BLOCK_MANAGER_PORT', '10061')
armada_master = os.getenv('ARMADA_MASTER', 'local://armada://host.docker.internal:30002')
armada_queue = os.getenv('ARMADA_QUEUE', 'test')
image_name = os.getenv('IMAGE_NAME', 'spark:armada')

# Find JAR: locate the Armada cluster-manager JAR in the Spark jars directory.
jar_pattern = '/opt/spark/jars/armada-cluster-manager_2.13-*-all.jar'
# glob returns matches in a filesystem-dependent order, so sort them: when
# several versions are installed, the same JAR is then picked on every run.
jar_paths = sorted(glob.glob(jar_pattern))
if not jar_paths:
    # Fail early; the path is later passed to Spark via "spark.jars".
    raise FileNotFoundError("Armada Spark JAR not found!")
armada_jar = jar_paths[0]

# Generate app ID, required for client mode.
# The suffix is 3 random bytes rendered as 6 lowercase hex digits, the same
# shape `openssl rand -hex 3` produced. It is read from the OS random source
# directly, so no external `openssl` binary or child process is needed.
app_id = f"armada-spark-{os.urandom(3).hex()}"

## Spark Configuration

Configure Spark to use Armada as the cluster manager in client mode.

In [None]:
# Spark Configuration
# All settings are listed as ordered (key, value) pairs and applied in that
# order with a single setAll() call.
spark_settings = [
    # Cluster manager, deploy mode and application identity
    ("spark.master", armada_master),
    ("spark.submit.deployMode", "client"),
    ("spark.app.id", app_id),
    ("spark.app.name", "jupyter-spark-pi"),
    # Driver networking: bind on all interfaces, advertise driver_host
    ("spark.driver.bindAddress", "0.0.0.0"),
    ("spark.driver.host", driver_host),
    ("spark.driver.port", driver_port),
    ("spark.driver.blockManager.port", block_manager_port),
    ("spark.home", "/opt/spark"),
    # Armada-specific settings
    ("spark.armada.container.image", image_name),
    ("spark.armada.scheduling.nodeUniformity", "armada-spark"),
    ("spark.armada.queue", armada_queue),
    ("spark.kubernetes.file.upload.path", "/tmp"),
    ("spark.kubernetes.executor.disableConfigMap", "true"),
    ("spark.local.dir", "/tmp"),
    ("spark.jars", armada_jar),
    # Network timeouts
    ("spark.network.timeout", "800s"),
    ("spark.executor.heartbeatInterval", "60s"),
    # Static mode: fixed executor count and memory
    ("spark.executor.instances", "2"),
    ("spark.armada.executor.limit.memory", "1Gi"),
    ("spark.armada.executor.request.memory", "1Gi"),
]
conf = SparkConf().setAll(spark_settings)

In [None]:
# Create SparkSession
# getOrCreate() hands back an existing session when one is active; otherwise
# it builds a new one from `conf`.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()
print("SparkSession created")

## Examples

Run Spark computations on the Armada cluster.

In [None]:
# Spark Pi calculation: announce the first example before it runs.
print("Running Spark Pi calculation...")

def inside(p):
    """Draw one random point in the unit square and test it against the unit circle.

    Args:
        p: Ignored. Present only so the function can be used as a one-argument
            predicate (e.g. with ``filter``).

    Returns:
        True if the drawn point (x, y) satisfies x*x + y*y < 1, else False.
    """
    sample_x = random.random()
    sample_y = random.random()
    squared_distance = sample_x * sample_x + sample_y * sample_y
    return squared_distance < 1

# Run `inside` once per element of range(n) and count how many draws landed
# inside the circle; that fraction approximates pi / 4.
n = 10000
sample_ids = spark.sparkContext.parallelize(range(n))
count = sample_ids.filter(inside).count()
pi = (4.0 * count) / n
print(f"  Pi is approximately: {pi}")

In [None]:
# Word count example
print("Running word count test...")

# Small in-memory dataset; each string is treated as one line of text.
data = [
    "Hello Spark",
    "Hello Armada",
    "Spark on Armada",
    "Distributed computing",
    "Spark and Armada"
]

# Create RDD and perform word count:
#   flatMap     - split every line into words; split() with no argument
#                 splits on any whitespace run, so repeated spaces never
#                 produce empty "words"
#   map         - lower-case each word and pair it with a count of 1
#   reduceByKey - add up the counts for each distinct word
#   collect     - bring the (word, total) pairs back to the driver
words_rdd = spark.sparkContext.parallelize(data)
word_counts = (
    words_rdd
    .flatMap(lambda line: line.split())
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

print("Word counts:")
# The loop variable is deliberately not called `count`, so it does not
# overwrite the `count` global produced by the Pi example.
for word, occurrences in sorted(word_counts):
    print(f"  {word}: {occurrences}")

## Cleanup

Stop the Spark context to release resources. This will stop the executors in Armada.

In [None]:
# Stop Spark context
# The notebook's cleanup notes state that stopping the session also stops the
# executors in Armada. The final message is only printed if stop() returns
# without raising.
print("Stopping Spark context...")
spark.stop()
print("Spark context stopped successfully")