# PySpark and Hadoop/HDFS Demo

## Package Installation and Imports

In [None]:
# Install the notebook's dependencies into the environment (venv) used by this kernel.
# NOTE(review): no versions are pinned here — consider pinning them for reproducibility.
%pip install pandas
%pip install hdfs
%pip install pyspark pyspark[sql]

In [1]:
# Import dependencies
import os
import sys
from datetime import date, datetime

import pandas as pd
from hdfs import InsecureClient
from pyspark.sql import SparkSession, Row

## HDFS Basic Operations

In [2]:
# Connection settings and test paths used by the HDFS cells below
HDFS_SERVER = 'x.x.x.x'  # IP address of the machine where the HDFS and Spark containers are run
HDFS_IPC_PORT = '9000'
HDFS_HTTP_PORT = '9870'
HDFS_USER = 'root'
HDFS_IPC_URL = f'hdfs://{HDFS_SERVER}:{HDFS_IPC_PORT}'  # pyspark connector port
HDFS_HTTP_URL = f'http://{HDFS_SERVER}:{HDFS_HTTP_PORT}'  # direct HTTP port: bypasses the reverse proxy and does not work if a reverse proxy is used
HDFS_HTTP_URL2 = 'http://hadoop-subdomain.domain.com'  # accesses the web server through the reverse proxy
hdfs_user_dir = f'/user/{HDFS_USER}'  # user directory on HDFS
hdfs_test_file = 'df_small.csv'
# NOTE: os.getenv("USER") returns None when USER is unset, which would produce '/home/None/...'
local_test_dir = f'/home/{os.getenv("USER")}/data-environment/test'

In [3]:
# Open/create the HDFS client against the direct HTTP URL, acting as HDFS_USER
hdfs_client = InsecureClient(HDFS_HTTP_URL, user=HDFS_USER)

In [31]:
# Create the 'demo' directory under the HDFS user directory
hdfs_path = os.path.join(hdfs_user_dir, 'demo')
hdfs_client.makedirs(hdfs_path)

In [None]:
# List the files and directories found directly under the HDFS user directory
contents = hdfs_client.list(hdfs_user_dir)
print(contents)

In [None]:
# Check if HDFS file or directory exists
def is_hdfs_path(path, hdfs_client):
    """Return True if ``path`` names an existing HDFS file or directory.

    Existence is decided by listing the parent directory and looking for the
    final path component among the returned names.

    Args:
        path: HDFS path to test. Trailing slashes are ignored, so ``/a/b/``
            is treated like ``/a/b``. An empty string is never a valid path.
        hdfs_client: Client object exposing ``list(hdfs_path)``, which returns
            the names inside a directory and raises when it cannot be listed.

    Returns:
        bool: True when the path exists; False when the entry is missing or
        its parent directory cannot be listed.
    """
    if not path:
        return False

    # Drop trailing slashes so the last component is never empty; a path made
    # only of slashes collapses to the root directory.
    normalized = path.rstrip('/') or '/'
    head, tail = os.path.split(normalized)

    try:
        contents = hdfs_client.list(head)
    except Exception:
        # The parent cannot be listed, so the path cannot exist. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate instead of being reported as "path not found".
        return False

    if not tail:
        # Root directory: there is no entry name to look up, and the
        # successful listing above already proves it exists.
        return True
    return tail in contents

# Look for the test CSV directly under the HDFS user directory and report the result
test_path = os.path.join(hdfs_user_dir, hdfs_test_file)
path_found = is_hdfs_path(test_path, hdfs_client)
print(path_found)

**NOTE**  
To download from or upload to a remote HDFS, the Hadoop datanode's hostname must be resolvable from your local machine, so it needs an entry in your local hosts file.  
To add it, run the following on the command line (writing to `/etc/hosts` requires root privileges):  
```echo "x.x.x.x hadoop-datanode" >> /etc/hosts```  
where `x.x.x.x` is your server's IP address.

In [None]:
# Download a file from HDFS to the local test directory, overwriting any existing local copy.
# NOT WORKING: the local machine and the namenode both need to resolve the
# datanode hostname, which may not be possible when using docker.
hdfs_path = os.path.join(hdfs_user_dir, hdfs_test_file)
local_path = os.path.join(local_test_dir, hdfs_test_file)
hdfs_client.download(hdfs_path, local_path, n_threads=1, overwrite=True)

In [None]:
# Upload the local test file to the HDFS user directory.
# NOT WORKING: the local machine and the namenode both need to resolve the
# datanode hostname, which may not be possible when using docker.
local_path = os.path.join(local_test_dir, hdfs_test_file)
hdfs_path = os.path.join(hdfs_user_dir, hdfs_test_file)
hdfs_client.upload(hdfs_path, local_path)

In [None]:
# Delete an HDFS directory (recursive=True is passed so a non-empty directory can be removed)
# NOTE(review): this targets 'test', while the directory created earlier in this
# notebook is 'demo' — confirm which one is intended.
hdfs_path = os.path.join(hdfs_user_dir, 'test')
hdfs_client.delete(hdfs_path, recursive=True)

## PySpark Basic Operations

In [None]:
# Set configuration params and input/output files and directories
JAVA_HOME = '/usr/lib/jvm/java-21-openjdk'  # this final directory may change after system updates
SPARK_SERVER = 'x.x.x.x'  # IP address of machine running spark container (set to "local[n]" to run on local spark instance with n cores)
SPARK_MASTER_PORT = '7077'
SPARK_CONNECT_PORT = '15002'
SPARK_MASTER_URL = f'spark://{SPARK_SERVER}:{SPARK_MASTER_PORT}'
SPARK_CONNECT_URL = f'sc://{SPARK_SERVER}:{SPARK_CONNECT_PORT}'
hdfs_user_dir = f'/user/{HDFS_USER}'
hdfs_test_dir = os.path.join(hdfs_user_dir, 'demo')
local_test_dir = f'/home/{os.getenv("USER")}/data-environment/test'
test_filename = 'df_small'
test_csv_file = f'{test_filename}.csv'
test_parquet_file = f'{test_filename}.parquet'

# Set the JAVA_HOME env variable for this kernel process.
# os.system('export ...') would only set it inside a short-lived subshell, so
# the variable has to be written to os.environ to be visible to this process
# and to any child process started from it.
os.environ['JAVA_HOME'] = JAVA_HOME

In [None]:
# Connect to a spark-connect session (from machines where local is accessing a remote spark instance)
spark = (
    SparkSession.builder
    .appName("connect-test-pipe")
    .remote(SPARK_CONNECT_URL)
    .getOrCreate()
)

In [None]:
# Connect to a direct spark session (from machines where local is connected and running a spark instance)
spark = (
    SparkSession.builder
    .appName("master-test-pipe")
    .master(SPARK_MASTER_URL)
    .getOrCreate()
)

In [11]:
# End the spark session
# NOTE(review): this cell sits before the DataFrame cells below, which still use
# `spark` — it probably should be run only after those cells have finished.
spark.stop()

In [12]:
# Create a Spark DataFrame with an explicit schema.
# `date` and `datetime` are the standard-library classes from the `datetime`
# module; they must be imported in the imports cell or this cell raises NameError.
sample_rows = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(
    sample_rows,
    schema='a long, b double, c string, d date, e timestamp',
)


In [17]:
# Write the DataFrame to a CSV file (with a header row) in the HDFS test directory.
# mode("overwrite") replaces any existing output, so the cell can be re-run
# without failing because the path already exists; this matches the Parquet
# write further down.
remote_path = os.path.join(hdfs_test_dir, test_csv_file)
df.write.mode("overwrite").csv(remote_path, header=True)

In [19]:
# Read the DataFrame back from the CSV file.
# Without a schema the CSV reader returns every column as a string;
# inferSchema=True makes Spark detect the column types from the data
# (pass an explicit `schema=` instead if exact types are required).
remote_path = os.path.join(hdfs_test_dir, test_csv_file)
df_csv = spark.read.csv(remote_path, header=True, inferSchema=True)

In [20]:
# Save the CSV-derived DataFrame as Parquet, replacing any previous output
remote_path = os.path.join(hdfs_test_dir, test_parquet_file)
df_csv.write.parquet(remote_path, mode="overwrite")

In [None]:
# Write the DataFrame to a CSV file on the local file system via pandas.
# The target directory is created first so the write does not fail when it is
# missing, and index=False keeps the pandas row index from being written as
# an extra unnamed column that is not part of the Spark DataFrame.
os.makedirs(local_test_dir, exist_ok=True)
local_path = os.path.join(local_test_dir, test_csv_file)
df_pandas = df.toPandas()
df_pandas.to_csv(local_path, index=False)