# Acquiring HDFS Data Using PySpark

In [1]:
# check env and python version
import sys
from platform import python_version
# Emit both facts through one print call; sep="\n" keeps them on separate lines.
print(
    f"Current environment: {sys.executable}",
    f"Current Python Version: {python_version()}",
    sep="\n",
)

Current environment: /home/master/anaconda3/envs/mlops/bin/python
Current Python Version: 3.10.6


In [2]:
# Obtain the Spark session for this notebook via the builder's getOrCreate().
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .getOrCreate()
)

2022-11-02 18:06:57,989 WARN util.Utils: Your hostname, hazirah resolves to a loopback address: 127.0.1.1; using 172.22.6.247 instead (on interface eth0)
2022-11-02 18:06:57,991 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-11-02 18:07:00,044 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# read HDFS file
def load_data(path):
    """Read a CSV file into a Spark DataFrame, preview it, and return it.

    Uses the notebook-level ``spark`` session. The first line of the file is
    treated as the header row.

    Args:
        path: Location of the CSV file to read (the notebook passes
            ``hdfs://`` URIs).

    Returns:
        The DataFrame produced by the CSV reader.
    """
    header_reader = spark.read.option("header", "true")
    frame = header_reader.csv(path)
    # Print a preview of the loaded rows as a side effect of loading.
    frame.show(truncate=True)
    return frame

In [4]:
# First input file, addressed on the HDFS instance at localhost:9000.
path1 = "hdfs://localhost:9000/user/fraudTrain.csv"
df1 = load_data(path=path1)

+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|      first|     last|gender|              street|                city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|  0|  2019-01-01 00:00:18|   2703186189

2022-11-02 18:07:09,457 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
 Schema: _c0, trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/fraudTrain.csv


In [5]:
# Second input file, addressed on the HDFS instance at localhost:9000.
path2 = "hdfs://localhost:9000/user/fraudTest.csv"
df2 = load_data(path=path2)

+---+---------------------+-------------------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|      category|   amt|    first|    last|gender|              street|         city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|         merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+--------------+------+---------+--------+------+--------------------+-------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-------------------+--------+
|  0|  2020-06-21 12:14:25|   2291163933867244|fraud_Kirlin and 

2022-11-02 18:07:10,029 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
 Schema: _c0, trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/fraudTest.csv


In [6]:
# Append the rows of df2 to df1 (this is a union of rows, not a join).
# unionByName pairs columns by name; plain union() pairs them by position and
# would silently misalign values if the two files ever ordered their columns
# differently. A column-name mismatch now raises instead of corrupting data.
df_all = df1.unionByName(df2)
df_all.show(truncate=True)

+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|      first|     last|gender|              street|                city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|  0|  2019-01-01 00:00:18|   2703186189

2022-11-02 18:07:10,552 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
 Schema: _c0, trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/fraudTrain.csv


In [7]:
# save hdfs file to local
def save_file(df, path, mode=None):
    """Write a DataFrame to ``path`` as UTF-8 CSV with a header row.

    Args:
        df: DataFrame to write; only its ``write`` attribute is used.
        path: Destination passed to the CSV writer (the notebook passes a
            ``file://`` URI to write to the local filesystem).
        mode: Optional save mode forwarded to the writer's ``mode()`` call,
            e.g. ``"overwrite"`` or ``"append"``. When ``None`` (the default)
            no mode is set and the writer's own default applies, which keeps
            the original behaviour of this function. Pass ``"overwrite"`` to
            make the notebook re-runnable when the output already exists.

    Returns:
        None. The function is called for its side effect only, so do not
        bind its result expecting a DataFrame.
    """
    writer = df.write.option("header", "true").option("encoding", "UTF-8")
    if mode is not None:
        # Only touch the save mode when the caller asked for one explicitly.
        writer = writer.mode(mode)
    writer.csv(path)

In [8]:
# Destination for the combined DataFrame; the file:// scheme targets the local filesystem.
save_path = "file:///home/master/projects/mlflow/HDFSfile/fraud"
# NOTE(review): save_file has no return value, so df1_save is bound to None.
df1_save = save_file(df=df_all, path=save_path)

2022-11-02 18:07:16,076 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
 Schema: _c0, trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/fraudTrain.csv
2022-11-02 18:07:19,240 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_pop, job, dob, trans_num, unix_time, merch_lat, merch_long, is_fraud
 Schema: _c0, trans_date_trans_time, cc_num, merchant, category, amt, first, last, gender, street, city, state, zip, lat, long, city_p