In [None]:
import os
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Create the Spark configuration for the Kubernetes cluster manager.
# The notebook kernel acts as the driver (client deploy mode); executors run
# in pods started from the container image configured below.
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark001")

# --- Executors: image, namespace and sizing ---------------------------------
sparkConf.set("spark.kubernetes.container.image", "angelmaroco/spark:3.2.0-hadoop-3.2-aws-sdk-1.12.132-python-3.8")
sparkConf.set("spark.kubernetes.namespace", "default")
# NOTE(review): per Spark's dynamic-allocation docs, spark.executor.instances is
# used as the initial executor count when it is larger than
# spark.dynamicAllocation.initialExecutors. Here it is 3, which exceeds
# maxExecutors=2 set below. Confirm which limit is intended.
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "1800m")

# --- Driver (this kernel): deploy mode, resources and networking ------------
sparkConf.set("spark.submit.deployMode", "client")
sparkConf.set("spark.driver.cores", "1")
# NOTE(review): spark.driver.instances does not appear to be a documented Spark
# property, so it likely has no effect. Confirm before relying on it.
sparkConf.set("spark.driver.instances", "1")
# Fixed ports, so executors can reach the driver on known ports.
sparkConf.set("spark.driver.blockManager.port", "7777")
sparkConf.set("spark.driver.port", "2222")
# Advertise this host's own IP (resolved from its hostname) as the driver address.
sparkConf.set("spark.driver.host", socket.gethostbyname(socket.gethostname()))
# Listen on all interfaces.
sparkConf.set("spark.driver.bindAddress", "0.0.0.0")

# --- Pod placement, Python version and scratch space ------------------------
# Only schedule onto nodes labelled workload=workload-low-cpu.
sparkConf.set("spark.kubernetes.node.selector.workload", "workload-low-cpu")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
# Back Spark's local scratch dirs with RAM (tmpfs) emptyDir volumes.
sparkConf.set("spark.kubernetes.local.dirs.tmpfs", "true")

# --- Dynamic allocation -----------------------------------------------------
# The key must be plain ASCII. An invisible character (e.g. U+200D) inside it
# creates a different, unrecognised property, and dynamic allocation stays off.
sparkConf.set("spark.dynamicAllocation.enabled", "true")
# Shuffle tracking lets dynamic allocation work without an external shuffle service.
sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
# NOTE(review): a bare number is parsed in this property's default time unit,
# which I believe is milliseconds for shuffleTracking.timeout (i.e. 120 ms).
# If 120 seconds was intended, write "120s". Confirm against the Spark docs.
sparkConf.set("spark.dynamicAllocation.shuffleTracking.timeout", "120")
sparkConf.set("spark.dynamicAllocation.minExecutors", "1")
sparkConf.set("spark.dynamicAllocation.maxExecutors", "2")
sparkConf.set("spark.kubernetes.allocation.batch.size", "15")
sparkConf.set("spark.dynamicAllocation.executorAllocationRatio", "1")
sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1")

# Fail fast on non-ASCII characters in any key. SparkConf.set does not reject
# unknown keys, so a key with a hidden character would otherwise be silently
# ignored.
assert all(ord(ch) < 128 for key, _value in sparkConf.getAll() for ch in key), \
    "Spark config key contains non-ASCII characters"

In [None]:
from random import random
from operator import add

# Start (or reuse) the SparkSession with the configuration built above.
# In client mode this kernel is the driver. The executor pods requested by the
# configuration are launched when the SparkContext starts.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

# Monte Carlo estimate of pi: the share of uniform random points in the square
# [-1, 1) x [-1, 1) that land inside the unit circle approaches pi / 4.
partitions = 30
n = partitions * 1000  # total number of samples (1000 per partition)


def f(_):
    """Draw one random point in [-1, 1) x [-1, 1) and test it against the unit circle.

    The argument (the RDD element) is ignored; each element only triggers one
    sample. Returns 1 when x**2 + y**2 <= 1, otherwise 0.
    """
    x = 2 * random() - 1
    y = 2 * random() - 1
    return int(x ** 2 + y ** 2 <= 1)


count = (
    sc.parallelize(range(1, n + 1), partitions)  # n elements over `partitions` slices
    .map(f)
    .reduce(add)
)
print("Pi is roughly %f" % (4.0 * count / n))

In [None]:
# Shut Spark down through the session. SparkSession.stop() stops the underlying
# SparkContext (the same one as `sc`). PySpark also resets its cached
# active/default session, so re-running the notebook builds a fresh session.
spark.stop()