In [None]:
# Kerberos init
# Restrict the mounted keytab to the notebook user (jovyan), read-only.
# NOTE(review): chown normally requires root; if this kernel runs as jovyan
# without elevated rights the chown may fail — confirm how the keytab is mounted.
!chown jovyan /home/jovyan/testuser.keytab
!chmod 400 /home/jovyan/testuser.keytab

# Show the keytab entries, then obtain a ticket for the principal from the keytab
# (no password prompt). The same keytab path and principal are also configured
# for Spark in SparkEnvironment._conf (spark.kerberos.keytab / spark.kerberos.principal).
!klist -kte /home/jovyan/testuser.keytab
!kinit -kt /home/jovyan/testuser.keytab testuser/scm@EXAMPLE.COM

In [None]:
# Install Kubernetes client
!pip install kubernetes 

In [None]:
# Spark conf setup: point the driver-side Hadoop/Spark client at the local
# configuration directory via HADOOP_CONF_DIR (executors get their own value
# through the pod template built in SparkEnvironment._conf).
import os

os.environ.update(HADOOP_CONF_DIR='/usr/local/spark/conf')

In [None]:
from kubernetes.client.rest import ApiException
import json

def get_spark_driver_ip_address():
    """Return the IPv4 address that this host's own hostname resolves to.

    SparkEnvironment uses it as the single address of the manually created
    Endpoints object through which executors reach the driver.
    """
    import socket
    return socket.gethostbyname(socket.gethostname())

def get_spark_driver_url():
    """Return the master URL that SparkEnvironment hands to the builder in cluster mode.

    NOTE(review): this is a standalone-style ``spark://`` URL. In cluster mode,
    SparkEnvironment._conf also sets a ``k8s://`` master, which is applied to the
    builder after ``.master(...)`` and so appears to take precedence — confirm
    before relying on this value.
    """
    host, port = "jupyter", 7077
    return f"spark://{host}:{port}"

def _endpoints_route_to(endpoints, pod_ip_address: str, port) -> bool:
    """Return True if one subset of ``endpoints`` lists both ``pod_ip_address`` and ``port``."""
    for subset in getattr(endpoints, 'subsets', None) or []:
        ips = {getattr(addr, 'ip', None) for addr in getattr(subset, 'addresses', None) or []}
        # Compare as strings so an int port and a "7077"-style port agree.
        ports = {str(getattr(p, 'port', None)) for p in getattr(subset, 'ports', None) or []}
        if pod_ip_address in ips and str(port) in ports:
            return True
    return False


def create_service_endpoint(k8s_api_client,
                   service_endpoint_name: str,
                   pod_ip_address: str,
                   labels: dict,
                   port: int=7070,
                   namespace: str='default'
                  ):
    """Ensure a manually managed ``Endpoints`` object routes to ``pod_ip_address:port``.

    - If no Endpoints with this name exists, it is created.
    - If one exists and already lists this IP and port, it is returned untouched.
    - Otherwise (e.g. the notebook pod was rescheduled and got a new IP) it is
      replaced, so the Service of the same name does not keep routing to a
      stale address.

    Args:
        k8s_api_client: a Kubernetes ``CoreV1Api`` client.
        service_endpoint_name: Endpoints name (SparkEnvironment uses the same
            name for the selector-less Service it creates).
        pod_ip_address: IP the endpoint should point at.
        labels: metadata labels for the object.
        port: TCP port exposed on ``pod_ip_address``.
        namespace: Kubernetes namespace.

    Returns:
        The existing, newly created, or replaced Endpoints object from the client.

    Raises:
        ApiException: for any API error other than 404 on the initial read.
    """
    core_v1 = k8s_api_client

    existing = None
    try:
        existing = core_v1.read_namespaced_endpoints(name=service_endpoint_name,
                                                     namespace=namespace)
    except ApiException as e:
        # 404 only means "not created yet". Anything else is a real failure:
        # re-raise it instead of print + exit(), so the caller / notebook cell
        # sees the actual error.
        if e.status != 404:
            raise

    if existing is not None and _endpoints_route_to(existing, pod_ip_address, port):
        return existing

    service_endpoint_manifest = {
            'apiVersion': 'v1',
            'kind': 'Endpoints',
            'metadata': {
                'labels': labels,
                'name': service_endpoint_name,
                'namespace': namespace
            },
            'subsets': [{
                'addresses': [{
                    'ip': pod_ip_address
                }],
                'ports': [{
                    'protocol': 'TCP',
                    'port': port
                }]
            }]
        }

    if existing is None:
        return core_v1.create_namespaced_endpoints(body=service_endpoint_manifest, namespace=namespace)

    # The object exists but points somewhere else: overwrite it with the current address.
    return core_v1.replace_namespaced_endpoints(name=service_endpoint_name,
                                                namespace=namespace,
                                                body=service_endpoint_manifest)

def create_service(k8s_api_client, service_name: str,
                   labels: dict,
                   selector: dict,
                   port: int=7070,
                   target_port: int=7070,
                   namespace: str='default'
                  ):
    """Create a headless (``clusterIP: None``) Service unless one with this name exists.

    With an empty ``selector`` the Service's backends are expected to come from
    a manually created Endpoints object of the same name, which is how
    SparkEnvironment exposes the notebook driver.

    Args:
        k8s_api_client: a Kubernetes ``CoreV1Api`` client.
        service_name: Service name.
        labels: metadata labels for the Service.
        selector: pod selector; ``{}`` for a selector-less Service.
        port: Service port.
        target_port: port on the backend.
        namespace: Kubernetes namespace.

    Returns:
        The existing Service, or the newly created one, as returned by the client.

    Raises:
        ApiException: for any API error other than 404 on the initial read.
    """
    core_v1 = k8s_api_client

    api_response = None
    try:
        api_response = core_v1.read_namespaced_service(name=service_name,
                                                       namespace=namespace)
    except ApiException as e:
        # 404 only means "not created yet"; re-raise anything else instead of
        # print + exit(), so the caller / notebook cell sees the actual error.
        if e.status != 404:
            raise

    if api_response is None:
        service_manifest = {
                'apiVersion': 'v1',
                'kind': 'Service',
                'metadata': {
                    'labels': labels,
                    'name': service_name,
                    'namespace': namespace
                },
                'spec': {
                    'clusterIP': 'None',
                    'selector': selector,
                    'ports': [{
                        'protocol': 'TCP',
                        'port': port,
                        'targetPort': target_port
                    }]
                }
            }
        api_response = core_v1.create_namespaced_service(body=service_manifest, namespace=namespace)
    return api_response

def get_kubernetes_core_api(config_file: str = '/home/jovyan/.kube/config',
                            context: str = 'spark'):
    """Build a Kubernetes ``CoreV1Api`` client from a kubeconfig file.

    Args:
        config_file: path of the kubeconfig to load (defaults to the notebook
            user's kubeconfig, as before).
        context: kubeconfig context to activate. The default ``spark`` matches
            ``spark.kubernetes.context`` in SparkEnvironment._conf.

    Returns:
        A ``CoreV1Api`` instance created after the kubeconfig is loaded.
    """
    from kubernetes import config
    from kubernetes.client.api import core_v1_api

    config.load_kube_config(config_file=config_file, context=context)
    return core_v1_api.CoreV1Api()

def create_spark_executor_template(cmd: list,
                   labels: dict,
                   volumes: list,
                   volumeMounts: list,
                   env: list,
                   resources: dict,
                   restartPolicy: str='Always',
                   path: str='spark_executor_template.yaml') -> dict:
    '''
    Write a Spark executor pod template to ``path`` and return the manifest.

    Args:
        cmd: accepted for interface compatibility but not written to the
            template (container ``args`` were never emitted).
            NOTE(review): Spark appears to configure the executor container's
            command itself — confirm before adding args here.
        labels: pod metadata labels.
        volumes: pod volumes; added to the pod spec only when non-empty.
        volumeMounts: container volume mounts; added only when non-empty.
        env: container environment variables (list of ``{'name', 'value'}``).
        resources: container resources block.
        restartPolicy: pod restart policy.
        path: output file; Spark reads it via
            ``spark.kubernetes.executor.podTemplateFile``.

    Returns:
        The manifest dict that was written.
    '''
    container = {
        'env': env,
        'resources': resources,
    }
    if volumeMounts:
        container['volumeMounts'] = volumeMounts

    spec = {
        'containers': [container],
        'restartPolicy': restartPolicy,
        'terminationGracePeriodSeconds': 0,
    }
    if volumes:
        spec['volumes'] = volumes

    pod_manifest = {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'metadata': {
            'labels': labels,
        },
        'spec': spec,
    }
    # Written as JSON despite the .yaml suffix; JSON is accepted by YAML parsers.
    with open(path, 'w') as outfile:
        json.dump(pod_manifest, outfile)
    return pod_manifest

class SparkEnvironment:
    """Start and stop a single SparkSession, locally or on Kubernetes in client mode.

    ``clusterSize == 0`` runs ``local[*]``. ``clusterSize >= 1`` keeps the driver
    in this notebook process and publishes it to the cluster as a headless
    Service plus a manually created Endpoints object. Executors reach the driver
    at ``spark.driver.host``, and Kubernetes is asked for ``clusterSize``
    executor pods. All state lives on the class, so at most one session is tracked.
    """

    # Requested executor count; 0 selects local mode.
    _clustersize = 0
    # Last SparkConf produced by _conf.
    _config = None
    # NOTE(review): not read or written anywhere in this class.
    _proxybase = None
    # Session returned by runSparkSession, or None when nothing is running.
    _spark = None

    # Driver Service name, namespace and port. _startMaster creates the
    # Kubernetes objects from these and _conf points spark.driver.* at them,
    # so both sides must use the same values.
    _DRIVER_SERVICE = 'jupyter'
    _NAMESPACE = 'default'
    _DRIVER_PORT = 7077

    @staticmethod
    def _startMaster():
        """Expose this pod to executors under the driver Service name and port.

        Ensures that an Endpoints object pointing at this pod's IP exists, and
        that a selector-less headless Service with the same name exists.
        """
        driver_service = SparkEnvironment._DRIVER_SERVICE
        driver_port = SparkEnvironment._DRIVER_PORT
        namespace = SparkEnvironment._NAMESPACE
        driver_ip = get_spark_driver_ip_address()

        v1 = get_kubernetes_core_api()

        create_service_endpoint(v1, service_endpoint_name=driver_service,
                                pod_ip_address=driver_ip,
                                labels={'name': driver_service},
                                port=driver_port,
                                namespace=namespace)

        # Empty selector: the Service is backed by the Endpoints created above
        # rather than by pods matched through labels.
        create_service(v1, service_name=driver_service,
                       labels={'name': driver_service},
                       selector={},
                       port=driver_port,
                       target_port=driver_port,
                       namespace=namespace)

    @staticmethod
    def _conf(conf=None, debug=False):
        """Build the SparkConf for the current ``_clustersize``.

        Args:
            conf: SparkConf to extend; a fresh one with loadDefaults=False is
                created when None.
            debug: in cluster mode, also set verbose JVM/Kerberos options, and
                write and attach an executor pod template.

        Returns:
            The SparkConf, which is also stored in ``SparkEnvironment._config``.
            In local mode the conf is returned unchanged.
        """
        from pyspark import SparkConf

        if conf is None:
            conf = SparkConf(False)

        if SparkEnvironment._clustersize >= 1:
            namespace = SparkEnvironment._NAMESPACE

            # Kubernetes cluster manager. The app name is deliberately not set
            # here, so runSparkSession's `app` argument (builder.appName) is not
            # overridden when this conf is applied to the builder.
            conf.setMaster("k8s://https://kubernetes.default:443")
            conf.set("spark.kubernetes.namespace", namespace)
            conf.set("spark.executor.instances", SparkEnvironment._clustersize)
            conf.set("spark.kubernetes.executor.request.cores", "3")
            conf.set("spark.executor.memory", "2g")

            conf.set("spark.kubernetes.pyspark.pythonVersion", "3")
            conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
            conf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
            conf.set("spark.kubernetes.context", "spark")
            conf.set("spark.kubernetes.trust.certificates", "true")
            conf.set("spark.kubernetes.container.image", "buslovaev/pyspark-ozone:v3.2.1")
            conf.set("spark.kubernetes.kerberos.krb5.path", "/opt/spark/conf/krb5.conf")

            # Same keytab and principal as the kinit in the Kerberos init cell.
            conf.set("spark.kerberos.keytab", "/home/jovyan/testuser.keytab")
            conf.set("spark.kerberos.principal", "testuser/scm@EXAMPLE.COM")
            conf.set("spark.driver.memory", "2g")

            # Executors connect back to the driver through the Service from _startMaster.
            conf.set("spark.driver.host", f"{SparkEnvironment._DRIVER_SERVICE}.{namespace}")
            conf.set("spark.driver.port", SparkEnvironment._DRIVER_PORT)

            if debug:
                conf.set("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true -Dlog4jspark.root.logger=DEBUG,console -Djava.security.krb5.conf=/etc/krb5.conf")
                # Set this key once: a second set() replaces the value rather
                # than appending, so all executor JVM flags go in one string.
                conf.set("spark.executor.extraJavaOptions", " ".join([
                    "-Dio.netty.tryReflectionSetAccessible=true",
                    "-Dlog4jspark.root.logger=DEBUG,console",
                    "-Dlog4j.configuration=file:/opt/spark/conf/log4j.properties",
                    "-Djava.security.krb5.conf=/opt/spark/conf/krb5.conf",
                ]))

                # NOTE(review): the executor pod template (HADOOP_CONF_DIR,
                # HADOOP_USER_NAME, Kerberos debug env) is only written and
                # attached in debug mode — confirm that non-debug executors do
                # not need these variables.
                create_spark_executor_template(cmd=[],
                   labels={"type": "spark-worker"},
                   volumes={},
                   volumeMounts={},
                   env=[{'name': 'HADOOP_CONF_DIR', 'value': '/opt/spark/conf'},
                        {'name': 'HADOOP_USER_NAME', 'value': 'testuser'},
                        {'name': 'HADOOP_JAAS_DEBUG', 'value': 'true'},
                        {'name': 'HADOOP_OPTS', 'value': '-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug'}],
                   resources={},
                   restartPolicy='Always')

                conf.set("spark.kubernetes.executor.podTemplateFile", "spark_executor_template.yaml")

        SparkEnvironment._config = conf
        return conf

    @staticmethod
    def _master(clusterSize=0, debug=False):
        """Record the cluster size and return the master URL for the builder.

        ``clusterSize == 0`` gives ``local[*]``. ``clusterSize >= 1`` first
        publishes the driver in Kubernetes (``_startMaster``). ``debug`` is
        currently unused.

        Raises:
            ValueError: for any other size. The builder cannot use a None master.
        """
        if not (clusterSize == 0 or clusterSize >= 1):
            raise ValueError(f"clusterSize must be 0 (local) or >= 1, got {clusterSize!r}")
        SparkEnvironment._clustersize = clusterSize
        if clusterSize == 0:
            return "local[*]"
        SparkEnvironment._startMaster()
        return get_spark_driver_url()

    @staticmethod
    def runSparkSession(clusterSize=0, app="my app", conf=None, debug=False):
        """Create (or reuse) a SparkSession and remember it on the class.

        Args:
            clusterSize: 0 for local mode, otherwise the number of executor pods.
            app: application name passed to ``builder.appName``.
            conf: optional SparkConf to extend (see ``_conf``).
            debug: enable verbose logging and the debug pod template.

        Returns:
            The SparkSession.

        Note: ``.config(conf=...)`` is applied after ``.master(...)``, so in
        cluster mode the ``k8s://`` master from ``_conf`` is what the builder ends
        up with. ``getOrCreate`` may return an already-running session; call
        ``stopSparkSession`` first to apply a new configuration.
        """
        from pyspark.sql import SparkSession

        master = SparkEnvironment._master(clusterSize, debug)
        config = SparkEnvironment._conf(conf, debug)
        SparkEnvironment._spark = (
            SparkSession.builder
            .appName(app)
            .master(master)
            .config(conf=config)
            .getOrCreate()
        )
        if debug:
            SparkEnvironment._spark.sparkContext.setLogLevel('DEBUG')
        return SparkEnvironment._spark

    @staticmethod
    def stopSparkSession():
        """Stop the tracked session, if any.

        Safe to call when nothing is running. Teardown is best-effort: a failure
        while stopping is reported but not raised, and the tracked session is
        cleared either way.
        """
        session = SparkEnvironment._spark
        if session is None:
            return
        try:
            session.stop()
        except Exception as exc:
            print(f"Warning: failed to stop Spark session: {exc}")
        finally:
            SparkEnvironment._spark = None

In [None]:
# Run Spark on Kubernetes in client mode: the driver stays in this kernel and one
# executor pod is requested. Any session left from an earlier run is stopped
# first, so the session builder does not hand back the old one.
SparkEnvironment.stopSparkSession()
spark = SparkEnvironment.runSparkSession(clusterSize=1, debug=True)

In [None]:
# Write a one-row sample DataFrame to the Kerberos-secured Ozone store, then read it back.
# NOTE(review): assumes the default filesystem from HADOOP_CONF_DIR resolves this
# path to Ozone — confirm against core-site.xml.
ozone_data_path = '/volume1/bucket1/test2'
spark.createDataFrame(
        [{"age": 100, "name": "Hyukjin Kwon"}]
).write.parquet(ozone_data_path, mode="overwrite")

# Read the path once and reuse the result for both views, instead of scanning it twice.
sample_sdf = spark.read.parquet(ozone_data_path)
sample_sdf.show()
# pandas-on-Spark view. NOTE(review): to_pandas_on_spark() matches the Spark 3.2
# image used here; newer Spark versions name it pandas_api().
sample_psdf = sample_sdf.to_pandas_on_spark()

In [None]:
# Stop Spark cluster
# Stops the session started by the cluster cell. Safe to re-run:
# stopSparkSession does not raise if there is no session to stop.
SparkEnvironment.stopSparkSession()