In [1]:
import findspark

# findspark has to run BEFORE the first pyspark import: it points SPARK_HOME at
# this installation and makes its bundled Python bindings importable. The path is
# a raw string because a plain literal would treat the backslashes in `\H...` and
# `\s...` as (invalid) escape sequences.
findspark.init(r'D:\Hadoop_Ecosystem\spark-3.5.0-bin-hadoop3')

from pyspark import SparkConf
from pyspark.sql import SparkSession


# Standalone Spark master this notebook submits to.
master = "spark://localhost:7077"

conf = (
    SparkConf()
    .setAppName("Transform")
    .setMaster(master)
    # Hive metastore endpoint and the HDFS location of the Hive warehouse.
    .set("spark.hadoop.hive.metastore.uris", "thrift://localhost:9880")
    .set("spark.sql.warehouse.dir", "hdfs://localhost:9000/hive/warehouse")
    # Memory settings: off-heap storage enabled in addition to the executor heap.
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "10g")
    .set("spark.executor.memory", "4g")
    # Allow fully dynamic partition inserts into Hive tables.
    .set("hive.exec.dynamic.partition", "true")
    .set("hive.exec.dynamic.partition.mode", "nonstrict")
    .set("spark.sql.session.timeZone", "UTC+7")
    # NOTE(review): the values below carry no unit suffix. Per the Spark
    # configuration docs, spark.network.timeout is read in seconds and
    # spark.executor.heartbeatInterval in milliseconds - confirm these are the
    # intended magnitudes.
    .set("spark.network.timeout", "50000")
    .set("spark.executor.heartbeatInterval", "5000")
    # NOTE(review): spark.worker.timeout is documented as a standalone-cluster
    # daemon setting; verify it has any effect when set from the application.
    .set("spark.worker.timeout", "5000")
)

# Reuse an already-running session if there is one, otherwise start a new one
# with Hive support so the metastore configured above is used.
ss = (
    SparkSession.builder.config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
ss

In [2]:
# Name of the local CSV this cell produces.
file_name = 'beer_reviews_transformed.csv'

# Spark side: read the raw reviews (header row, inferred column types), keep the
# beer id, the five rating columns and the reviewer name, rename the two
# identifier columns, and discard every row that has a missing value.
df_beer = (
    ss.read.csv('beer_reviews.csv', inferSchema=True, header=True)
    .select(
        'beer_beerid',
        'review_taste',
        'review_appearance',
        'review_palate',
        'review_aroma',
        'review_overall',
        'review_profilename',
    )
    .withColumnRenamed('beer_beerid', 'beer_id')
    .withColumnRenamed('review_profilename', 'user_id')
    .dropna()
)

# pandas side: collect the result to the driver and replace each username with
# its integer category code, so every distinct user maps to one number.
df_beer = df_beer.toPandas().assign(
    user_id=lambda frame: frame['user_id'].astype('category').cat.codes
)
df_beer.to_csv(file_name, index=False)

# Follow-up (manual): copy the written file to /mnt/namenode in the HadoopDocker
# cluster (mapped to /data inside the container), then copy it into HDFS.

In [None]:
from hdfs import InsecureClient

# WebHDFS endpoint of the namenode.
namenode_url = 'http://localhost:9870'
# Despite its name this is the path of the local CSV *file* (file_name is set in
# the transform cell above), not a folder.
local_folder = './' + file_name
print(local_folder)
hdfs_client = InsecureClient(namenode_url, user='root')
test_folder = '/test'

# Create the target directory if it is missing. With strict=False, content()
# returns None for a non-existent path instead of raising.
if not hdfs_client.content(test_folder, strict=False):
    hdfs_client.makedirs(test_folder)
    print('Created directory {}'.format(test_folder))

# Show the directory listing before and after the upload. The "before" listing
# is printed unconditionally so the comparison is available on every run, not
# only on the run that created the directory.
print("Before upload")
print(hdfs_client.list(test_folder, status=False))

# Upload the local file into the HDFS directory, replacing any previous copy.
hdfs_client.upload(test_folder, local_folder, overwrite=True)
print("After upload")
print(hdfs_client.list(test_folder, status=False))

# Clean up: this removes the whole test directory again, including the file
# that was just uploaded, and prints the value delete() returned.
delete_result = hdfs_client.delete(test_folder, recursive=True)
print(delete_result)