## WRITING DATA TO HIVE AND PostgreSQL WITH APACHE SPARK (ORC AND PARQUET FORMATS)

In [None]:
import findspark

# Initialise findspark with an explicit Spark home (/opt/manual/spark).
# NOTE(review): this cell is unexecuted (In [None]) and the next cell calls
# findspark.init() again without a path — confirm which variant is intended.
findspark.init("/opt/manual/spark")

In [1]:
!pip install findspark

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

# Start (or reuse) a SparkContext with the "local" master and wrap it in a
# SparkSession. getOrCreate keeps this cell re-runnable: constructing
# SparkContext('local') a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
# NOTE(review): a later cell calls SparkSession.builder.master("yarn")...getOrCreate();
# with a context already active here that call presumably reuses it — confirm
# which master is actually intended.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local"))
spark = SparkSession(sc)
sc




In [2]:
# Build (or fetch) the Hive-enabled session named "Dirty Data Clean" that the
# rest of the notebook uses through the `spark` variable.
session_builder = SparkSession.builder.appName("Dirty Data Clean").master("yarn")
spark = session_builder.enableHiveSupport().getOrCreate()

In [4]:
# Load the raw transactions CSV: the first row is the header, fields are
# comma-separated, and Spark infers the column types.
raw_csv_path = "D:/Big Data Processing with Apache Spark/Case Study I/dirty_store_transactions.csv"
df = spark.read.csv(raw_csv_path, header=True, sep=",", inferSchema=True)

In [5]:
# Peek at the first five raw rows as a pandas frame.
raw_preview = df.limit(5)
raw_preview.toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York(,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York+,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York!,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


### TASK1: DATA CLEANING AND STRUCTURAL ADJUSTMENTS

In [6]:
# Remove every character that is not an ASCII letter or digit from the two
# text columns that carry stray punctuation. Spaces are removed as well, so
# "New York(" ends up as "NewYork".
non_alnum = "[^A-Za-z0-9]"
df1 = df
for text_col in ("STORE_LOCATION", "PRODUCT_ID"):
    df1 = df1.withColumn(text_col, F.regexp_replace(F.col(text_col), non_alnum, ""))

df1.limit(3).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,NewYork,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26


In [7]:
# Turn the "$"-prefixed price columns into floats and the Date column into a
# real date (stored as Date_Casted; the original Date column is dropped).
# The pattern is a raw string: "\$" in a normal string literal is an invalid
# escape sequence, which Python reports as a warning and will eventually reject.
dollar_sign = r"\$"

df2 = df1
for price_col in ("MRP", "CP", "SP", "DISCOUNT"):
    # Replacing a column under its own name keeps its position in the schema.
    df2 = df2.withColumn(
        price_col,
        F.regexp_replace(F.col(price_col), dollar_sign, "").cast(FloatType()),
    )
df2 = df2.withColumn("Date_Casted", F.col("Date").cast(DateType())).drop("Date")

df2.printSchema()

root
 |-- STORE_ID: string (nullable = true)
 |-- STORE_LOCATION: string (nullable = true)
 |-- PRODUCT_CATEGORY: string (nullable = true)
 |-- PRODUCT_ID: string (nullable = true)
 |-- MRP: float (nullable = true)
 |-- CP: float (nullable = true)
 |-- DISCOUNT: float (nullable = true)
 |-- SP: float (nullable = true)
 |-- Date_Casted: date (nullable = true)



In [8]:
# Show the first three cleaned rows as a pandas frame.
clean_preview = df2.limit(3)
clean_preview.toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26


### TASK2: Save the DataFrame to a Hive table named test1.clean_transactions in ORC format

In [10]:
# List the databases visible to this session.
databases = spark.sql("show databases;")
databases.show()

+---------+
|namespace|
+---------+
|bookstore|
|  default|
|    test1|
+---------+



In [11]:
# List the tables of the session's current database.
tables = spark.sql("show tables;")
tables.show()

+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
| default|    advertising|      false|
| default|order_items_tbl|      false|
| default|     orders_tbl|      false|
+--------+---------------+-----------+



In [12]:
# Make sure the target database test1 exists before writing to it
# ("if not exists" makes this a no-op when it is already there).
spark.sql("create database if not exists test1")

DataFrame[]

In [13]:
# Switch the session's current database to test1.
spark.sql("use test1;")

DataFrame[]

In [14]:
# Persist the cleaned frame as the Hive table test1.clean_transactions in ORC
# format (replacing any existing table) and print the wall-clock time taken.
import time

start_time = time.time()

orc_writer = df2.write.format("orc").mode("overwrite")
orc_writer.saveAsTable("test1.clean_transactions")

elapsed = time.time() - start_time
print("%s secs = "% elapsed)

8.628795385360718 secs = 


In [15]:
# Read the Hive table back and show its first ten rows to verify the write.
saved_rows = spark.sql(" select * from test1.clean_transactions")
saved_rows.limit(10).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,91.0,58.240002,3.64,87.360001,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,85.0,51.0,2.55,82.449997,2019-11-26
5,YR7220,NewYork,Kitchen,93809204,37.0,24.049999,0.74,36.259998,2019-11-26
6,YR7220,NewYork,Cosmetics,86610412D,80.0,48.799999,6.4,73.599998,2019-11-26
7,YR7220,NewYork,Kitchen,52503356,71.0,42.599998,5.68,65.32,2019-11-26
8,YR7220,NewYork,Kitchen,77516479,92.0,56.119999,3.68,88.32,2019-11-26
9,YR7220,NewYork,Cosmetics,47334289,16.0,10.72,0.96,15.04,2019-11-26


### TASK3 :  Write data into a PostgreSQL database table: traindb.public.clean_transactions

In [16]:
# Build the PostgreSQL JDBC URL without hard-coding credentials in the notebook.
# User and password come from the TRAINDB_USER / TRAINDB_PASSWORD environment
# variables; the fallbacks reproduce the previous literal values ("****" is
# only a redaction placeholder, not a working password).
import os
from urllib.parse import quote

_pg_user = os.environ.get("TRAINDB_USER", "train")
_pg_password = os.environ.get("TRAINDB_PASSWORD", "****")

# Percent-encode both values so characters such as "&" or "=" inside them
# cannot break the URL's query string.
jdbcUrl = (
    "jdbc:postgresql://localhost/traindb"
    f"?user={quote(_pg_user, safe='')}&password={quote(_pg_password, safe='')}"
)

In [17]:
# Overwrite the PostgreSQL table clean_transactions with the cleaned frame,
# going through the PostgreSQL JDBC driver.
df2.write.jdbc(
    url=jdbcUrl,
    table="clean_transactions",
    mode="overwrite",
    properties={"driver": "org.postgresql.Driver"},
)

In [18]:
# Query the table back over JDBC and show its first five rows to verify the write.
pg_reader = spark.read.format("jdbc").options(
    url=jdbcUrl,
    query="SELECT * FROM clean_transactions",
    driver="org.postgresql.Driver",
)
pg_reader.load().limit(5).toPandas()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date_Casted
0,YR7220,NewYork,Electronics,12254943,31.0,20.77,1.86,29.139999,2019-11-26
1,YR7220,NewYork,Furniture,72619323C,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,NewYork,Electronics,34161682B,88.0,62.48,4.4,83.599998,2019-11-26
3,YR7220,NewYork,Kitchen,79411621,91.0,58.240002,3.64,87.360001,2019-11-26
4,YR7220,NewYork,Fashion,39520263T,85.0,51.0,2.55,82.449997,2019-11-26


### TASK4: Write data into the HDFS directory /user/train/spark_odev_transaction in Parquet format with Snappy compression


Create the target directory first: `! hdfs dfs -mkdir /user/train/spark_odev_transaction`

In [19]:
# Write the cleaned frame to HDFS as Snappy-compressed Parquet (replacing any
# previous contents of the directory) and print the wall-clock time taken.
# Relies on `time` having been imported by an earlier cell.
start_time = time.time()

parquet_writer = (
    df2.write.format("parquet")
    .mode("overwrite")
    .option("compression", "snappy")
)
parquet_writer.save("hdfs://localhost:9000/user/train/spark_odev_transaction")

elapsed = time.time() - start_time
print("%s secs = "% elapsed)

4.015817642211914 secs = 


In [20]:
# List the target HDFS directory to check what the Parquet write produced.
! hdfs dfs -ls /user/train/spark_odev_transaction

Found 2 items
-rw-r--r--   1 train supergroup          0 2022-09-09 22:36 /user/train/spark_odev_transaction/_SUCCESS
-rw-r--r--   1 train supergroup     244350 2022-09-09 22:36 /user/train/spark_odev_transaction/part-00000-c2c2a7de-fe06-438d-ba8f-11fe382de42a-c000.snappy.parquet
