# Using Spark on Kubernetes

This is a testing notebook and "cheat sheet" for checking that every component of my
Kubernetes Spark setup is running and can connect to the others.

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

# load spark session templates
from spark_utils import get_k8s_spark

## Objectstore Tests 

We are using MinIO as our object store, so first let's test it independently of Spark.
If the call below returns a list of buckets, the object store is reachable.

In [None]:
from minio import Minio
from minio.error import S3Error

In [None]:
# Build the MinIO client used by the object-store checks in this notebook.
# Credentials are taken from the environment when set; the literals are the
# notebook's original example values, kept as defaults so the cell still runs as-is.
minio_client = Minio(
    "minio.minio-tenant.svc.cluster.local",
    access_key=os.environ.get('MINIO_ACCESS_KEY', 'AKIAIOSFODNN7EXAMPLE'),
    secret_key=os.environ.get('MINIO_SECRET_KEY', 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'),
    secure=False,
)

In [None]:
# Fetch every bucket visible to this client.
buckets = minio_client.list_buckets()

# Print each bucket's name and creation date, one bucket per line.
for bucket in buckets:
    print(bucket.name, bucket.creation_date)

## Configs

These configs are set to work with the stack at: https://github.com/Data-drone/data_eng_kube.git

Note: unlike Spark 2.x, Spark 3.x does not properly load `spark.jars.packages` from Maven:
https://issues.apache.org/jira/browse/SPARK-35084

We need to have at least the hadoop-aws jar already on drivers and executors to make things work more smoothly

In [None]:
# Assemble the submit arguments piece by piece: the --packages flag, the
# hadoop-aws Maven coordinate, and the pyspark-shell entry.
BASIC_SUBMIT_ARGS = " ".join([
    "--packages",
    "org.apache.hadoop:hadoop-aws:3.2.0",
    "pyspark-shell",
])

# Export the arguments through the PYSPARK_SUBMIT_ARGS environment variable.
os.environ["PYSPARK_SUBMIT_ARGS"] = BASIC_SUBMIT_ARGS

In [None]:
# MinIO credentials: prefer the environment, fall back to the notebook's
# original example values so the cell behaves the same when nothing is exported.
access_key = os.environ.get('MINIO_ACCESS_KEY', 'AKIAIOSFODNN7EXAMPLE')
secret_key = os.environ.get('MINIO_SECRET_KEY', 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')

# Start from the project's Kubernetes builder template and layer on the
# worker image and the s3a (MinIO) connection settings.
spark = (get_k8s_spark()
            .config("spark.kubernetes.container.image", 
                    "k3d-test-registry:5000/datadrone/k8s-spark-worker:3.1.2-hadoop3.2-rapids-k8s")
            .config("spark.kubernetes.container.image.pullPolicy", "Always")
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", "minio.minio-tenant.svc.cluster.local")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.hadoop.fs.s3a.path.style.access", True)
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            # The property for Maven coordinates is "spark.jars.packages";
            # "spark.packages" is not a Spark setting. Same coordinate as PYSPARK_SUBMIT_ARGS.
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
            .appName("Spark K8s")
            .enableHiveSupport()
            .getOrCreate()
        )

In [None]:
# Check which jars the running context has registered, by asking the
# JVM-side SparkContext for its jar list and printing it.
print(spark.sparkContext._jsc.sc().listJars())

In [None]:
# Smoke-test the session without reading any external data:
# distribute the integers 0-9 as an RDD.
t = spark.sparkContext.parallelize(range(10))

# Calculate the approximate sum of the RDD's values.
# NOTE(review): the argument 3 is sumApprox's timeout, presumably in milliseconds - confirm.
r = t.sumApprox(3)
print('Approximate sum: %s' % r)

# Generate some test data and run through Spark

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Random test data: 100000 rows by 20 columns drawn with np.random.randn.
# No seed is set, so the values differ on every run.
df = pd.DataFrame(np.random.randn(100000,20))

In [None]:
# Preview the first rows of the pandas frame.
df.head()

In [None]:
# Convert the pandas frame into a Spark DataFrame.
sparkDF=spark.createDataFrame(df) 

In [None]:
# Show the schema of the converted Spark DataFrame.
sparkDF.printSchema()

# Load Data and write it to my object store

In [None]:
# Firstly create a new bucket

In [None]:
# Create the target bucket. An S3Error (e.g. when the bucket already exists)
# is printed instead of raised, so the cell can be re-run.
try:
    minio_client.make_bucket('testing-bucket')
except S3Error as err:
    print(err)

In [None]:
# need boto to pull from AWS
# !pip install boto3

In [None]:
import boto3
from botocore import UNSIGNED
from botocore.client import Config

In [None]:
# S3 client with request signing disabled (UNSIGNED), i.e. no AWS credentials are used.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

In [None]:
# Configs for the download / upload round trip
output_bucket = 'testing-bucket'             # MinIO bucket that receives the file
testing_file = 'green_tripdata_2015-07.csv'  # file name, also used as the local file name
load_path = 'trip data/' + testing_file      # source object key (the space in 'trip data/' is part of the key as written)
write_path = 'raw_data/' + testing_file      # destination object key inside output_bucket

In [None]:
# Download the source object from the 'nyc-tlc' bucket into a local file.
# The local name comes from `testing_file` rather than a repeated literal, so
# it always matches the file name the upload cell reads.
with open(testing_file, 'wb') as f:
    s3.download_fileobj('nyc-tlc', load_path, f)

In [None]:
# Upload the local file `testing_file` to `output_bucket` under the key `write_path`.
minio_client.fput_object(output_bucket, write_path, testing_file)

## Reading the loaded Data with Spark

In [None]:
# Set the number of SQL shuffle partitions to the context's default parallelism.
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

Test read from minio

In [None]:
# Read back the object this notebook uploaded (output_bucket / write_path)
# instead of a hard-coded key in a different bucket, treating the first line
# as the header. The single-argument os.path.join wrapper was a no-op.
raw_data = spark.read.option("header", True).csv('s3a://' + output_bucket + '/' + write_path)

In [None]:
# Show the schema Spark assigned to the CSV columns.
raw_data.printSchema()

In [None]:
# Pull the first 10 rows back to the driver.
raw_data.take(10)

In [None]:
# Count the rows in the frame.
raw_data.count()

# Close out Session

In [None]:
# Shut down the first Spark session before the next section builds a new one
# with different submit arguments.
spark.stop()

# Testing Different Class Paths and loading downloaded libs

For larger libraries we want the jars downloaded ahead of time, and ideally loadable from `s3a://` paths so that the driver and workers do not each need them as local files.

For PySpark it seems the jars need to be listed in the `jars` section to work properly. The hadoop-aws jars must also be available locally, because all jars are loaded together: Spark gets stuck if the hadoop-aws jars are not yet loaded when it tries to load a jar from an S3 path.
Adding extra jars via `extraClassPath` doesn't seem to work either, perhaps because those class paths are not searched during the initial Spark initialisation.

### Init Spark

In [2]:
# Build the --jars argument from an explicit list of jar URIs, then append
# the pyspark-shell entry. The jars are joined with commas and no spaces.
BASIC_SUBMIT_ARGS = "--jars {jars} pyspark-shell".format(
    jars=",".join([
        "local:///opt/spark-jars/hadoop-aws-3.2.0.jar",
        "local:///opt/spark-jars/delta-core_2.12-1.0.0.jar",
        "local:///opt/spark-jars/aws-java-sdk-bundle-1.11.375.jar",
        "local:///opt/spark-jars/postgresql-42.2.24.jar",
        "local:///opt/sparkRapidsPlugin/cudf-21.08.2-cuda11.jar",
        "local:///opt/sparkRapidsPlugin/rapids-4-spark_2.12-21.08.0.jar",
    ])
)

# Export the arguments through the PYSPARK_SUBMIT_ARGS environment variable.
os.environ["PYSPARK_SUBMIT_ARGS"] = BASIC_SUBMIT_ARGS

In [3]:
# Credentials are read from the environment when set. The literals are the
# notebook's original values, kept as defaults so the cell behaves the same
# when nothing is exported.
# NOTE(review): the metastore password default is a real-looking secret stored
# in the notebook - rotate it and drop the default once METASTORE_PASSWORD is provisioned.
access_key = os.environ.get('MINIO_ACCESS_KEY', 'AKIAIOSFODNN7EXAMPLE')
secret_key = os.environ.get('MINIO_SECRET_KEY', 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')
metastore_password = os.environ.get('METASTORE_PASSWORD', "?A;U|6:FS1i(>RetZR?l1Htn")

# Author's note: packages cannot be requested from the Python builder, because
# the JVM is already started by the time the builder runs.
# .config("spark.packages", "org.apache.hadoop:hadoop-aws:3.2.0")

# debug (currently enabled in the builder chain below)
# .config("spark.kubernetes.executor.deleteOnTermination", "false")

spark = (get_k8s_spark()
            # Worker image
            .config("spark.kubernetes.container.image", 
                    "k3d-test-registry:5000/datadrone/k8s-spark-worker:3.1.2-hadoop3.2-rapids-k8s")
            .config("spark.kubernetes.container.image.pullPolicy", "Always")
            # s3a access to MinIO
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", "minio.minio-tenant.svc.cluster.local")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.hadoop.fs.s3a.path.style.access", True)
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            # Delta Lake SQL extension and catalog
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            # GPU resources and the RAPIDS plugin
            # NOTE(review): with one GPU per executor and one GPU per task,
            # concurrentGpuTasks=2 presumably never takes effect - confirm.
            .config("spark.executor.resource.gpu.amount", "1")
            .config("spark.task.resource.gpu.amount", "1")
            .config("spark.executor.resource.gpu.discoveryScript", "/opt/sparkRapidsPlugin/getGpusResources.sh")
            .config("spark.executor.resource.gpu.vendor", "nvidia.com")
            .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
            .config("spark.rapids.sql.concurrentGpuTasks", "2")
            # Debug setting: keep executor pods after they terminate
            .config("spark.kubernetes.executor.deleteOnTermination", "false")
            # Hive metastore backed by PostgreSQL over JDBC
            .config("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:postgresql://metastore-ha.metastore.svc.cluster.local:5432/metastore")
            .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "org.postgresql.Driver")
            .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "metastore")
            .config("spark.hadoop.javax.jdo.option.ConnectionPassword", metastore_password)
            .config("spark.hadoop.metastore.catalog.default", "hive")
            .config("spark.sql.warehouse.dir", "s3a://data/warehouse")
            .appName("Spark K8s")
            .enableHiveSupport()
            .getOrCreate()
        )

#.config("spark.hadoop.metastore.catalog.default", "hive")
#.config("spark.sql.warehouse.dir", "s3a://data/warehouse")
#.config("spark.hive.metastore.uris", "thrift://metastore-service.metastore.svc.cluster.local:9083")
            

21/10/07 05:44:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/07 05:45:13 WARN SQLExecPlugin: RAPIDS Accelerator 21.08.0 using cudf 21.08.2. To disable GPU support set `spark.rapids.sql.enabled` to false
21/10/07 05:45:13 WARN Plugin: Installing rapids UDF compiler extensions to Spark. The compiler is disabled by default. To enable it, set `spark.rapids.sql.udfCompiler.enabled` to true


### Run Tests

In [None]:
# Load the CSV from its s3a URI, treating the first line as the header.
# The URI is passed directly; wrapping a single string in os.path.join returns it unchanged.
raw_data = spark.read.option("header", True).csv('s3a://data/raw_data/green_tripdata_2014-09.csv')

In [None]:
# Show the schema Spark assigned to the CSV columns.
raw_data.printSchema()

In [None]:
# Storage location of the delta table to load.
# NOTE(review): the variable is named clean_warehouse but the path is under raw/ - confirm which is intended.
clean_warehouse = "s3a://data/warehouse/raw/green_taxi_pre2015"

# Load the table by path using the delta format.
# NOTE(review): "header" is normally a CSV reader option; presumably it has no effect on a delta read - confirm.
delta_data = spark.read.option("header", True).format("delta").load(clean_warehouse)

In [None]:
# Show the schema of the loaded delta table.
delta_data.printSchema()

# Rebuilding the metastore tables

Reload the Delta tables that were copied over from "a ml platform nyc taxi tables".

First check connectivity

In [None]:
# Resolve the metastore service hostname to an IP address; a successful
# lookup shows the name is resolvable from this kernel.
import socket
socket.gethostbyname('metastore-service.metastore.svc.cluster.local')

In [None]:
!pip install thrift

In [None]:
# Testing Metastore
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

In [None]:
# Connectivity check against the metastore Thrift port: wrap a socket in a
# buffered transport, open it, report the result, and always close it again.
transport = TSocket.TSocket('metastore-service.metastore.svc.cluster.local', 9083)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
try:
    transport.open()
    print('metastore thrift transport open:', transport.isOpen())
finally:
    # Release the socket so re-running the cell does not leave connections open.
    transport.close()

## Raw

raw tables schema

In [13]:
# Create the `processed` database. IF NOT EXISTS makes the cell safe to
# re-run once the database is already registered in the metastore.
spark.sql("CREATE DATABASE IF NOT EXISTS processed")

21/10/07 05:51:38 WARN ObjectStore: Failed to get database processed, returning NoSuchObjectException


DataFrame[]

In [17]:
# List the databases known to the session catalog.
spark.catalog.listDatabases()

21/10/07 06:36:00 WARN HeartbeatReceiver: Removing executor 134 with no recent heartbeats: 173898 ms exceeds timeout 120000 ms
21/10/07 06:36:00 WARN HeartbeatReceiver: Removing executor 133 with no recent heartbeats: 124709 ms exceeds timeout 120000 ms
21/10/07 06:36:00 WARN HeartbeatReceiver: Removing executor 135 with no recent heartbeats: 120019 ms exceeds timeout 120000 ms
21/10/07 06:36:00 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
21/10/07 06:36:00 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
                                                                                

[Database(name='clean', description='', locationUri='s3a://data/warehouse/clean.db'),
 Database(name='default', description='Default Hive database', locationUri='s3a://data/warehouse'),
 Database(name='processed', description='', locationUri='s3a://data/warehouse/processed.db'),
 Database(name='raw', description='', locationUri='s3a://data/warehouse/raw.db')]

21/10/07 06:37:00 WARN HeartbeatReceiver: Removing executor 137 with no recent heartbeats: 173632 ms exceeds timeout 120000 ms
21/10/07 06:37:00 WARN HeartbeatReceiver: Removing executor 136 with no recent heartbeats: 173649 ms exceeds timeout 120000 ms
21/10/07 06:37:00 WARN HeartbeatReceiver: Removing executor 138 with no recent heartbeats: 160691 ms exceeds timeout 120000 ms

In [19]:
# Collect the list of tables in the `raw` database back to the driver.
spark.sql("SHOW TABLES IN raw").collect()

[]

21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 143 with no recent heartbeats: 157759 ms exceeds timeout 120000 ms
21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 140 with no recent heartbeats: 124295 ms exceeds timeout 120000 ms
21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 142 with no recent heartbeats: 165792 ms exceeds timeout 120000 ms
21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 139 with no recent heartbeats: 179841 ms exceeds timeout 120000 ms
21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 144 with no recent heartbeats: 123611 ms exceeds timeout 120000 ms
21/10/07 06:38:00 WARN HeartbeatReceiver: Removing executor 141 with no recent heartbeats: 173796 ms exceeds timeout 120000 ms
21/10/07 06:39:00 WARN HeartbeatReceiver: Removing executor 146 with no recent heartbeats: 173763 ms exceeds timeout 120000 ms
21/10/07 06:39:00 WARN HeartbeatReceiver: Removing executor 145 with no recent heartbeats: 122019 ms exceeds ti

In [6]:
# Storage location of the pre-2015 green taxi delta table.
raw_green_pre_2015_warehouse = "s3a://data/warehouse/raw/green_taxi_pre2015"

# Load the table by path using the delta format.
# NOTE(review): "header" is normally a CSV reader option; presumably it has no effect on a delta read - confirm.
delta_data = spark.read.option("header", True).format("delta").load(raw_green_pre_2015_warehouse)

21/10/07 05:45:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [16]:
# Write the frame in delta format with mode "overwrite" and register it
# as the table raw.green_taxi_pre2015.
delta_data.write.format("delta").mode("overwrite").saveAsTable('raw.green_taxi_pre2015')

21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_37 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_18 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_4 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_12 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_23 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_8 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_2 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_30 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_40 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_15_47 !
21/10/07 05:56:30 WARN BlockManagerMasterEndpoint: No more repl

KeyboardInterrupt: 

21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 151682 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 5 with no recent heartbeats: 133692 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 7 with no recent heartbeats: 122692 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 152241 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 4 with no recent heartbeats: 140726 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 6 with no recent heartbeats: 128703 ms exceeds timeout 120000 ms
21/10/07 05:59:00 WARN HeartbeatReceiver: Removing executor 3 with no recent heartbeats: 147883 ms exceeds timeout 120000 ms
21/10/07 06:00:00 WARN HeartbeatReceiver: Removing executor 8 with no recent heartbeats: 176745 ms exceeds timeout 120000 ms


In [None]:
# Shut down the Spark session now that the metastore work is done.
spark.stop()

In [None]:
# Validate sql db
#metastore-ha.metastore:5432

In [None]:
# Resolve the metastore service hostname to an IP address again
# (the same lookup as the earlier connectivity check).
import socket
socket.gethostbyname('metastore-service.metastore.svc.cluster.local')