## All necessary imports

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import date

In [2]:
# Create (or reuse) a local Spark session with 3 worker threads for this ETL run.
spark = (
    SparkSession.builder
    .appName('ETL_Pyspark')
    .master("local[3]")
    .getOrCreate()
)

## Reading the source CSV using PySpark

In [3]:
# Load the raw orders file; the first line is the header row.
# No schema is supplied or inferred, so every column is read as a string.
dframe = spark.read.option('header', True).csv('source_file/raw/orders.csv')

In [4]:
# Inspect the column names and types of the raw frame before transforming it.
dframe.printSchema()

root
 |-- Order Id: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Product Id: string (nullable = true)
 |-- cost price: string (nullable = true)
 |-- List Price: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount Percent: string (nullable = true)



## Replace two placeholder Ship Mode values ('unknown', 'Not Available') with null

In [5]:
# Null out the two placeholder strings; every other value (including rows that
# are already null) is carried through unchanged.
# NOTE(review): the match is exact, so 'N/A' is not replaced and still appears
# in the distinct values shown below -- confirm whether that is intended.
ship_mode = dframe['Ship Mode']
dframe = dframe.withColumn(
    'Ship Mode',
    when(ship_mode.isin('unknown', 'Not Available'), None).otherwise(ship_mode),
)

In [6]:
# List the distinct shipping modes left after the clean-up step.
dframe.select('Ship Mode').distinct().show()

+--------------+
|     Ship Mode|
+--------------+
|   First Class|
|      Same Day|
|           N/A|
|  Second Class|
|Standard Class|
|          NULL|
+--------------+



## Convert column names to lowercase snake_case

In [7]:
# Normalise every column name: lowercase, with spaces replaced by underscores
# (e.g. 'Order Id' -> 'order_id').
normalized_names = [name.lower().replace(' ', '_') for name in dframe.columns]

# Apply all renames positionally in a single projection. The previous loop
# chained one withColumnRenamed call per column, which matches columns by name
# and adds a projection to the query plan for each column.
dframe = dframe.toDF(*normalized_names)

In [8]:
# Preview the first five rows to confirm the renamed columns.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|cost_price|list_price|quantity|discount_percent|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|
|       2|2023-08-15|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|      Chairs|FUR-CH-10000454|       600|       730|       3|               3|


## Adding Extra columns for analysis

### Discount given

In [9]:
# discount_given = list_price * discount_percent * 0.01, rounded to 2 decimals.
# `round` is pyspark.sql.functions.round (brought in by the star import), not
# Python's builtin. Both inputs were read from the CSV as strings, so the
# arithmetic relies on Spark converting them to numbers.
discount_amount = col('list_price') * col('discount_percent') * 0.01
dframe = dframe.withColumn('discount_given', round(discount_amount, 2))

In [10]:
# Preview the first five rows with the new discount_given column.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|cost_price|list_price|quantity|discount_percent|discount_given|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|           5.2|
|       2|2023-08-15|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|      Chairs|FUR-C

### Sale price

In [11]:
# sale_price is the list price minus the discount computed above.
net_price = col('list_price') - col('discount_given')
dframe = dframe.withColumn('sale_price', net_price)

In [12]:
# Preview the first five rows with the new sale_price column.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|cost_price|list_price|quantity|discount_percent|discount_given|sale_price|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|           5.2|     254.8|
|       2|2023-08-15|  Second Class| Consumer|United States|      Henderson|  Kentucky|      424

### Profit

In [13]:
# profit = sale_price - cost_price, rounded to 2 decimals
# (`round` is the Spark column function from the star import).
margin = col('sale_price') - col('cost_price')
dframe = dframe.withColumn('profit', round(margin, 2))

In [14]:
# Preview the first five rows with the new profit column.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|cost_price|list_price|quantity|discount_percent|discount_given|sale_price|profit|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|           5.2|     254.8|  14.8|
|       2|2023-08-15|  Second Class| Consumer|United States|      He

### Adding File date

In [15]:
# Stamp every row with the date on which this notebook is run (evaluated once
# on the driver via date.today() and attached as a literal column).
load_date = date.today()
dframe = dframe.withColumn('file_date', lit(load_date))

In [16]:
# Preview the first five rows with the new file_date column.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+------+----------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|cost_price|list_price|quantity|discount_percent|discount_given|sale_price|profit| file_date|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+--------------+----------+------+----------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|           5.2|     254.8|  14.8|2024-06-06|
|       2|2023-08-15|  S

### Convert order_date to date type

In [17]:
# Parse the order_date strings (formatted as yyyy-MM-dd) into a proper date column.
parsed_order_date = to_date(col('order_date'), 'yyyy-MM-dd')
dframe = dframe.withColumn('order_date', parsed_order_date)

### Dropping columns that are not required

In [18]:
# These inputs were only needed to derive discount_given, sale_price and profit.
columns_to_drop = ['cost_price', 'list_price', 'discount_percent']
dframe = dframe.drop(*columns_to_drop)

In [19]:
# Preview the first five rows of the final frame after dropping the input columns.
dframe.show(n=5)

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+--------+--------------+----------+------+----------+
|order_id|order_date|     ship_mode|  segment|      country|           city|     state|postal_code|region|       category|sub_category|     product_id|quantity|discount_given|sale_price|profit| file_date|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+--------+--------------+----------+------+----------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       2|           5.2|     254.8|  14.8|2024-06-06|
|       2|2023-08-15|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|      Chairs|FUR-CH-10000454|       3|          21.9|     

## Connecting to SQL Server

In [23]:
# Load the database connection details (header row expected) from a local CSV.
# NOTE(review): this is an absolute path inside one user's home directory, so
# the notebook will not run unchanged on another machine.
dbinfo = spark.read.option('header', True).csv('C:/Users/Apromit/dbdetails.csv')

In [30]:
# Fetch the first row of the details file once; the original called collect()
# twice, which ran the Spark read twice just to pick two cells.
db_row = dbinfo.first()
if db_row is None:
    # An empty file would otherwise surface as an opaque IndexError.
    raise ValueError('dbdetails.csv contains no rows; expected server name and database name')

# Column 0 holds the server name, column 1 the database name.
servername = db_row[0]
dbname = db_row[1]

In [34]:
import sqlalchemy as sl

# Build the SQL Server URL from the values read out of the details file.
# No username or password appears in the URL.
connection_url = f'mssql://{servername}/{dbname}?driver=ODBC+DRIVER+17+FOR+SQL+SERVER'
db_engine = sl.create_engine(connection_url)

# Open a connection that the load cell further down reuses; it is not closed
# anywhere in the visible notebook.
conection = db_engine.connect()

### Converted the PySpark DataFrame to a pandas DataFrame because of connectivity issues

In [35]:
# Materialise the transformed Spark DataFrame as a pandas DataFrame so it can
# be written through the SQLAlchemy connection. toPandas() brings all rows
# into driver memory.
pd_frame = dframe.toPandas()

In [36]:
# Append the rows to table 'asd' (created if missing) without writing the
# pandas index as a column. The call is the cell's last expression, so its
# return value is displayed as the cell output.
pd_frame.to_sql(
    name='asd',
    con=conection,
    if_exists='append',
    index=False,
)

31