# BASIC ETL WITH SPARK

## Download Raw Data

In [1]:
!wget https://github.com/erkansirin78/datasets/raw/master/dirty_store_transactions.csv

--2023-02-23 10:56:52--  https://github.com/erkansirin78/datasets/raw/master/dirty_store_transactions.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/erkansirin78/datasets/master/dirty_store_transactions.csv [following]
--2023-02-23 10:56:53--  https://raw.githubusercontent.com/erkansirin78/datasets/master/dirty_store_transactions.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2609524 (2.5M) [text/plain]
Saving to: ‘dirty_store_transactions.csv’


2023-02-23 10:56:55 (2.16 MB/s) - ‘dirty_store_transactions.csv’ saved [2609524/2609524]



## Data Cleaning

In [1]:
from pyspark.sql import SparkSession, functions as F

In [2]:
# Start (or reuse) a Spark session running locally with master "local[2]"
# for the cleaning steps below.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("SparkExperiment")
    .getOrCreate()
)

2023-03-01 11:38:27,793 WARN util.Utils: Your hostname, emk-Aspire-A514-52 resolves to a loopback address: 127.0.1.1; using 192.168.1.24 instead (on interface wlp2s0)
2023-03-01 11:38:27,794 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2023-03-01 11:38:28,439 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
import os

# Resolve the CSV against the notebook's working directory -- the same place the
# wget cell saves it -- instead of hardcoding one user's home directory.
raw_csv_path = os.path.abspath("dirty_store_transactions.csv")

# Read the comma-separated file with a header row. Schema inference is requested,
# but the dirty values ("$31", "New York(") make every column come back as string.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("sep", ",")
    .option("inferSchema", True)
    # Explicit file:// scheme so the path is read from the local filesystem.
    .load("file://" + raw_csv_path)
)

In [4]:
# Preview the raw rows: STORE_LOCATION and PRODUCT_ID carry stray symbols
# and the price columns (MRP, CP, DISCOUNT, SP) are prefixed with "$".
df.show()

+--------+--------------+----------------+----------+---+------+--------+------+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|    CP|DISCOUNT|    SP|      Date|
+--------+--------------+----------------+----------+---+------+--------+------+----------+
|  YR7220|     New York(|     Electronics|  12254943|$31|$20.77|   $1.86|$29.14|2019-11-26|
|  YR7220|     New York+|       Furniture| 72619323C|$15| $9.75|    $1.5| $13.5|2019-11-26|
|  YR7220|     New York |     Electronics| 34161682B|$88|$62.48|    $4.4| $83.6|2019-11-26|
|  YR7220|     New York!|         Kitchen|  79411621|$91|$58.24|   $3.64|$87.36|2019-11-26|
|  YR7220|      New York|         Fashion| 39520263T|$85|   $51|   $2.55|$82.45|2019-11-26|
|  YR7220|      New York|         Kitchen|  93809204|$37|$24.05|   $0.74|$36.26|2019-11-26|
|  YR7220|      New York|       Cosmetics| 86610412D|$80| $48.8|    $6.4| $73.6|2019-11-26|
|  YR7220|     New York$|         Kitchen| 52503356^|$71| $42.6|   $5.68|$65.32|

In [5]:
# Inspect the inferred schema: every column, including the prices and Date,
# is reported as string.
df.printSchema()

root
 |-- STORE_ID: string (nullable = true)
 |-- STORE_LOCATION: string (nullable = true)
 |-- PRODUCT_CATEGORY: string (nullable = true)
 |-- PRODUCT_ID: string (nullable = true)
 |-- MRP: string (nullable = true)
 |-- CP: string (nullable = true)
 |-- DISCOUNT: string (nullable = true)
 |-- SP: string (nullable = true)
 |-- Date: string (nullable = true)



In [6]:
df2 = df
# df.dtypes yields (column_name, type_name) pairs; keep the names of the string columns.
str_cols = [name for name, dtype in df.dtypes if dtype == 'string']
print(str_cols)


['STORE_ID', 'STORE_LOCATION', 'PRODUCT_CATEGORY', 'PRODUCT_ID', 'MRP', 'CP', 'DISCOUNT', 'SP', 'Date']


In [8]:
# Strip the stray symbols ($ + ( ) ! " ^) from every string column in one projection.
# Columns outside str_cols pass through untouched and the column order is preserved.
df2 = df.select([
    F.regexp_replace(name, '[$+()!"^]', "").alias(name) if name in str_cols else F.col(name)
    for name in df.columns
])

df2.show()

+--------+--------------+----------------+----------+---+-----+--------+-----+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|   CP|DISCOUNT|   SP|      Date|
+--------+--------------+----------------+----------+---+-----+--------+-----+----------+
|  YR7220|      New York|     Electronics|  12254943| 31|20.77|    1.86|29.14|2019-11-26|
|  YR7220|      New York|       Furniture| 72619323C| 15| 9.75|     1.5| 13.5|2019-11-26|
|  YR7220|     New York |     Electronics| 34161682B| 88|62.48|     4.4| 83.6|2019-11-26|
|  YR7220|      New York|         Kitchen|  79411621| 91|58.24|    3.64|87.36|2019-11-26|
|  YR7220|      New York|         Fashion| 39520263T| 85|   51|    2.55|82.45|2019-11-26|
|  YR7220|      New York|         Kitchen|  93809204| 37|24.05|    0.74|36.26|2019-11-26|
|  YR7220|      New York|       Cosmetics| 86610412D| 80| 48.8|     6.4| 73.6|2019-11-26|
|  YR7220|      New York|         Kitchen|  52503356| 71| 42.6|    5.68|65.32|2019-11-26|
|  YR7220|

In [9]:
# Trim leading/trailing whitespace (e.g. "New York ") from every string column
# in one projection. Other columns pass through and the column order is preserved.
df3 = df2.select([
    F.trim(name).alias(name) if name in str_cols else F.col(name)
    for name in df2.columns
])

df3.show()

+--------+--------------+----------------+----------+---+-----+--------+-----+----------+
|STORE_ID|STORE_LOCATION|PRODUCT_CATEGORY|PRODUCT_ID|MRP|   CP|DISCOUNT|   SP|      Date|
+--------+--------------+----------------+----------+---+-----+--------+-----+----------+
|  YR7220|      New York|     Electronics|  12254943| 31|20.77|    1.86|29.14|2019-11-26|
|  YR7220|      New York|       Furniture| 72619323C| 15| 9.75|     1.5| 13.5|2019-11-26|
|  YR7220|      New York|     Electronics| 34161682B| 88|62.48|     4.4| 83.6|2019-11-26|
|  YR7220|      New York|         Kitchen|  79411621| 91|58.24|    3.64|87.36|2019-11-26|
|  YR7220|      New York|         Fashion| 39520263T| 85|   51|    2.55|82.45|2019-11-26|
|  YR7220|      New York|         Kitchen|  93809204| 37|24.05|    0.74|36.26|2019-11-26|
|  YR7220|      New York|       Cosmetics| 86610412D| 80| 48.8|     6.4| 73.6|2019-11-26|
|  YR7220|      New York|         Kitchen|  52503356| 71| 42.6|    5.68|65.32|2019-11-26|
|  YR7220|

## Write clean data to HDFS `/user/train/output_data/spark_odev_transaction` in Parquet format

In [None]:
## Prerequisite: run start-all.sh first to start the predefined Hive/Hadoop/Sqoop etc. services on my machine

In [10]:
import findspark

In [11]:
# Point findspark at the Spark installation under /opt/spark.
# NOTE(review): pyspark was already imported and a session created earlier in this
# notebook, so this call comes after the fact -- confirm whether it is still needed.
findspark.init("/opt/spark")

In [12]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import  *

In [13]:
# Reuse the session that was created at the top of the notebook.
#
# A SparkSession is already active at this point, so getOrCreate() returns it rather
# than building a new one. Asking for .master("yarn") or .enableHiveSupport() here
# would not change the running SparkContext, and df3 belongs to that session, so
# stopping it to recreate one on YARN would invalidate the cleaned DataFrame.
# The write below addresses HDFS through an explicit hdfs:// URI, so it works from
# the existing local session. To really run on YARN with Hive support, set those
# options on the first builder call instead.
spark = SparkSession.builder.getOrCreate()

In [14]:
import time

In [15]:
# Time the Parquet write of the cleaned frame to HDFS; "overwrite" replaces any
# output already present at the target path.
start_time = time.time()

(
    df3.write
    .mode("overwrite")
    .parquet("hdfs://localhost:9000/user/train/output_data/spark_odev_transaction")
)

elapsed_secs = time.time() - start_time
print("----------- %s secs ----------" % elapsed_secs)

[Stage 6:>                                                          (0 + 1) / 1]

----------- 3.054807424545288 secs ----------


                                                                                

In [16]:
# List the written output directory with human-readable sizes to confirm the
# _SUCCESS marker and the Parquet part file are present.
!hdfs dfs -ls -h /user/train/output_data/spark_odev_transaction

# Web UI for browsing the same files (presumably the HDFS NameNode UI -- confirm): http://localhost:9870/

Found 2 items
-rw-r--r--   1 emk supergroup          0 2023-03-01 11:40 /user/train/output_data/spark_odev_transaction/_SUCCESS
-rw-r--r--   1 emk supergroup    244.7 K 2023-03-01 11:40 /user/train/output_data/spark_odev_transaction/part-00000-59021861-df8f-4fce-9875-91c4dea85deb-c000.snappy.parquet
