## Example which uses on-demand Spark to perform computation and read/write to remote secure HDFS


## First get the HDFS Delegation token

SPARK communicates with HDFS securely using an HDFS Delegation token. This token is requesting from the HDFS service via identifying the client using the Kerberos ticket (Run `klist` to see the ticket details from your cache).

We need to have this ticket accessible by both the driver and the executors. Hence we place it in the DATASET for the project. Ideally we need to have a DATASET per user and only accessible to the user to be contain this ticket for security reasons. 

**In more ideally this should be supported inside SPARK via a side-car container in both the Worskspace and Spark Nodes (Via Domsed or natively) in transient location such as `/tmp/` folder**.

Set the value for this token file in the project environment variable `HADOOP_TOKEN_FILE_LOCATION`


In [None]:
import os
#Generate an HDFS Delegation token
#%env HADOOP_TOKEN_FILE_LOCATION=/mnt/data/$ON-DEMAND-SPARK/hdfsdt.token
#Run the following commands
!/mnt/code/scripts/my_hdfs.sh fetchdt --renewer null /mnt/data/$DOMINO_PROJECT_NAME/hdfsdt.token
#%env HADOOP_TOKEN_FILE_LOCATION=/mnt/data/$DOMINO_PROJECT_NAME/hdfsdt.token

%env HADOOP_TOKEN_FILE_LOCATION=/mnt/data/ON-DEMAND-SPARK/hdfsdt.token

hdfs_dt_token_path=os.environ['HADOOP_TOKEN_FILE_LOCATION']



Next configure the environment variables `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON`. This is not necessary if both the workspace and executors use the same path (`/usr/bin/python3`). But if your workspace uses `/opt/conda/bin/python3` your job will fail in the executors. You can set both to  `/opt/conda/bin/python3`/ and see how it fails.

Also fetch the HDFS_ENDPOINT from the Project Environment variable `HDFS_ENDPOINT`. In my example the HDFS Namenode location is running on `hdfs://10.0.123.114:8020`

In [None]:
%env PYSPARK_PYTHON /usr/bin/python3
%env PYSPARK_DRIVER_PYTHON /usr/bin/python3
hdfs_endpoint=os.environ['HDFS_ENDPOINT']

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, DoubleType, IntegerType
import random

Now create the Spark Session. Note the value of the config `spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION` .

In [None]:
#On DEMAND SPARK WITH DT


sparkSession = SparkSession.builder.appName("Calculate Pi using OnDemand Spark") \
.config("spark.dynamicAllocation.enabled", "false") \
.config("fs.default.name", hdfs_endpoint) \
.config("spark.driver.extraClassPath", "/opt/hadoop/etc/hadoop:/usr/lib/hadoop-lzo/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/hdfs/*") \
.config("spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION",hdfs_dt_token_path) \
.getOrCreate()
sc=sparkSession.sparkContext
#.config("spark.executor.extraClassPath", "/opt/hadoop/etc/hadoop:/usr/lib/hadoop-lzo/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/hdfs/*") \

In [None]:
def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1


In [None]:


columns = StructType([ StructField("name", StringType(), True),
                      StructField("value", DoubleType(), True)
                    ])

count = sc.parallelize(range(0, 1000),1) \
             .filter(inside).count()
data = [("Pi",4.0 * count/1000)]

df = sparkSession.createDataFrame(data=data, schema=columns)

df.show()

In [None]:
Now we write to secure (Kerberized) HDFS. This uses the hdfs token file we generated in step 1.

In [None]:
#Let us write to a dataset
ds_path = '/user/dominospark/my-pi'
!/mnt/code/scripts/my_hdfs.sh dfs -rmr '/user/dominospark/my-pi*'
df.write.csv(ds_path)
#Read it back
sparkSession.read.csv(ds_path).show()
!/mnt/code/scripts/my_hdfs.sh dfs -ls /user/dominospark/my-pi

And now we just do more the same. Generate a dataset in HDFS and then retrive it and filter the records based on a simple criteria.

In [None]:
!/mnt/code/scripts/my_hdfs.sh dfs -rmr '/user/dominospark/small-data-100/'
hdfs_src_path = '/user/dominospark/largedata/'
hdfs_dest_path =  '/user/dominospark/small-data-100/'
local_dest_path = 'file:///mnt/data/ON-DEMAND-SPARK/small-data-100'
!rm -rf /mnt/data/ON-DEMAND-SPARK/small-data-100
filter_criteria = 100
sparkSession = SparkSession.builder.appName("Generate Data") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("fs.default.name", hdfs_endpoint) \
    .getOrCreate()

    
columns = StructType([ StructField("id", IntegerType(), True), \
                       StructField("v1", IntegerType(), True),\
                       StructField("v2", IntegerType(), True),\
                       StructField("v3", IntegerType(), True) ])

df_load = sparkSession.read.csv(hdfs_src_path,columns)
df_load_filtered = df_load.where(df_load.id < filter_criteria)
df_load_filtered.write.csv(hdfs_dest_path)
df_load_filtered.write.csv(local_dest_path)


In [None]:
df_load_read = sparkSession.read.csv(hdfs_dest_path,columns)
df_load_read.show()

In [None]:
df_load_read = sparkSession.read.csv(local_dest_path,columns)
df_load_read.show()

In [None]:
sparkSession.stop()

In [None]:
!/mnt/code/scripts/my_hdfs.sh dfs -ls