In [57]:
!pip install pyspark sqlalchemy pandas psycopg2 pyarrow azure-storage-blob python.dotenv




In [56]:
import io
import os

import pandas as pd
import psycopg2
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, to_date, year, month, trim, regexp_replace, monotonically_increasing_id
from sqlalchemy import create_engine


In [4]:
# Create (or reuse) the Spark session for this notebook; the PostgreSQL JDBC driver
# is requested through the spark.jars.packages Maven coordinate.
spark = (
    SparkSession.builder
    .appName("ZikoLogistics")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

In [5]:
# Load the raw logistics extract: the header row supplies column names and Spark infers types.
# The path is built with os.path.join so it resolves on Windows and POSIX alike.
df = spark.read.csv(os.path.join('Raw_Dataset', 'ziko_logistics_data.csv'), header=True, inferSchema=True)

In [6]:
# Preview the first rows of the raw data.
df.show(n=5)

+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+------+-------+-------------+---------------+--------------------+--------------------+------------------+
|Transaction_ID|                Date|Customer_ID|Product_ID|Quantity|        Unit_Price|       Total_Cost|Discount_Rate|Sales_Channel|Order_Priority|Warehouse_Code|Ship_Mode|Delivery_Status|Customer_Satisfaction|Item_Returned|Return_Reason| Payment_Type|Taxable|Region|Country|Customer_Name| Customer_Phone|      Customer_Email|    Customer_Address|Product_List_Title|
+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+

In [7]:
# Column names taken from the CSV header.
list(df.columns)

['Transaction_ID',
 'Date',
 'Customer_ID',
 'Product_ID',
 'Quantity',
 'Unit_Price',
 'Total_Cost',
 'Discount_Rate',
 'Sales_Channel',
 'Order_Priority',
 'Warehouse_Code',
 'Ship_Mode',
 'Delivery_Status',
 'Customer_Satisfaction',
 'Item_Returned',
 'Return_Reason',
 'Payment_Type',
 'Taxable',
 'Region',
 'Country',
 'Customer_Name',
 'Customer_Phone',
 'Customer_Email',
 'Customer_Address',
 'Product_List_Title']

In [8]:
# Number of rows loaded from the raw CSV.
raw_row_count = df.count()
raw_row_count


1005

In [9]:
# Number of columns in the raw data.
n_raw_columns = len(df.columns)
n_raw_columns

25

In [10]:
# Show the column types Spark inferred from the CSV (inferSchema=True).
df.printSchema()

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Product_ID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Discount_Rate: double (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Warehouse_Code: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Delivery_Status: string (nullable = true)
 |-- Customer_Satisfaction: string (nullable = true)
 |-- Item_Returned: boolean (nullable = true)
 |-- Return_Reason: string (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Taxable: boolean (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Phone: string (nullable = true)
 |-- Customer_Email: string (nu

In [11]:
# Count nulls in every column with a single aggregation (one Spark job in total rather
# than a separate filter+count job per column), then print one line per column.
# count(when(cond, c)) counts only the rows where cond holds, because when() yields null otherwise.
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).first()
for column in df.columns:
    print(f"{column}:{null_counts[column]}, Null Values")

Transaction_ID:0, Null Values
Date:0, Null Values
Customer_ID:0, Null Values
Product_ID:0, Null Values
Quantity:0, Null Values
Unit_Price:101, Null Values
Total_Cost:100, Null Values
Discount_Rate:291, Null Values
Sales_Channel:0, Null Values
Order_Priority:0, Null Values
Warehouse_Code:0, Null Values
Ship_Mode:0, Null Values
Delivery_Status:0, Null Values
Customer_Satisfaction:0, Null Values
Item_Returned:0, Null Values
Return_Reason:101, Null Values
Payment_Type:0, Null Values
Taxable:0, Null Values
Region:0, Null Values
Country:0, Null Values
Customer_Name:0, Null Values
Customer_Phone:0, Null Values
Customer_Email:0, Null Values
Customer_Address:0, Null Values
Product_List_Title:0, Null Values


In [21]:
# Impute missing values: column means for the two price columns, a sentinel label for
# Return_Reason, and 0.0 for Discount_Rate.
unit_price_mean = df.agg({'Unit_Price': 'mean'}).first()[0]
total_cost_mean = df.agg({'Total_Cost': 'mean'}).first()[0]
fill_values = {
    'Unit_Price': unit_price_mean,
    'Total_Cost': total_cost_mean,
    'Return_Reason': 'No Return',
    'Discount_Rate': 0.0,
}
df = df.fillna(fill_values)

In [22]:
# Re-check nulls after imputation with a single aggregation (one Spark job in total rather
# than one per column); every column is expected to report 0.
# count(when(cond, c)) counts only the rows where cond holds, because when() yields null otherwise.
remaining_nulls = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).first()
for column in df.columns:
    print(f"{column}:{remaining_nulls[column]}, Null Values")

Transaction_ID:0, Null Values
Date:0, Null Values
Customer_ID:0, Null Values
Product_ID:0, Null Values
Quantity:0, Null Values
Unit_Price:0, Null Values
Total_Cost:0, Null Values
Discount_Rate:0, Null Values
Sales_Channel:0, Null Values
Order_Priority:0, Null Values
Warehouse_Code:0, Null Values
Ship_Mode:0, Null Values
Delivery_Status:0, Null Values
Customer_Satisfaction:0, Null Values
Item_Returned:0, Null Values
Return_Reason:0, Null Values
Payment_Type:0, Null Values
Taxable:0, Null Values
Region:0, Null Values
Country:0, Null Values
Customer_Name:0, Null Values
Customer_Phone:0, Null Values
Customer_Email:0, Null Values
Customer_Address:0, Null Values
Product_List_Title:0, Null Values


In [23]:
# Preview the first rows after imputation.
df.show(n=5)

+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+------+-------+-------------+---------------+--------------------+--------------------+------------------+
|Transaction_ID|                Date|Customer_ID|Product_ID|Quantity|        Unit_Price|       Total_Cost|Discount_Rate|Sales_Channel|Order_Priority|Warehouse_Code|Ship_Mode|Delivery_Status|Customer_Satisfaction|Item_Returned|Return_Reason| Payment_Type|Taxable|Region|Country|Customer_Name| Customer_Phone|      Customer_Email|    Customer_Address|Product_List_Title|
+--------------+--------------------+-----------+----------+--------+------------------+-----------------+-------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+

In [29]:
# Customer dimension: distinct combinations of the five customer attributes.
# NOTE(review): distinct() runs over all five columns, so a Customer_ID whose name/phone/
# email/address differ between rows would appear more than once — confirm the key is unique.
customer_dim = (
    df.select('Customer_ID', 'Customer_Name', 'Customer_Phone', 'Customer_Email', 'Customer_Address')
    .distinct()
)

In [30]:
# Preview the customer dimension.
customer_dim.show(n=5)

+-----------+-------------+---------------+--------------------+--------------------+
|Customer_ID|Customer_Name| Customer_Phone|      Customer_Email|    Customer_Address|
+-----------+-------------+---------------+--------------------+--------------------+
|       1056| Customer 159|+1-548-230-8512|customer.159.18@d...|50 Third St, New ...|
|       1073| Customer 735|+1-405-971-6710|customer.735.66@s...|457 Second St, Ch...|
|       1032| Customer 575|+1-503-301-6495|customer.575.76@s...|474 Main St, Hous...|
|       1001|  Customer 20|+1-968-669-5071|customer.20.44@ex...|536 Fourth St, Ho...|
|       1029| Customer 383|+1-603-726-7015|customer.383.37@s...|177 Park Ave, Pho...|
+-----------+-------------+---------------+--------------------+--------------------+
only showing top 5 rows


In [None]:
# Number of distinct customer rows.
customer_row_count = customer_dim.count()
customer_row_count


1000

In [32]:
# Number of columns in the customer dimension.
n_customer_columns = len(customer_dim.columns)
n_customer_columns

5

In [40]:
# Product dimension: distinct combinations of product id/title with the quantity and pricing columns.
# NOTE(review): Quantity, Unit_Price and Discount_Rate look like per-transaction values, so
# Product_ID is presumably not unique in this table — confirm this is the intended grain.
product_dim = (
    df.select('Product_ID', 'Product_List_Title', 'Quantity', 'Unit_Price', 'Discount_Rate')
    .distinct()
)

In [41]:
# Number of distinct product rows.
product_row_count = product_dim.count()
product_row_count

1000

In [42]:
# Number of columns in the product dimension.
n_product_columns = len(product_dim.columns)
n_product_columns

5

In [47]:
# Source for the fact table.
# customer_dim and product_dim are distinct() projections of df itself. A left join of df
# back onto either of them, using every projected column as the key, matches each row of df
# to exactly one dimension row. The result would therefore carry the same rows and columns
# as df (only the column order would differ). Referencing df directly avoids two needless
# shuffle joins. transaction_fact selects its columns explicitly, so column order is irrelevant.
df_trans = df
                    

In [49]:
# Fact table: transaction id, keys into the customer/product dimensions, and the order,
# shipping, payment and location attributes of each row.
# NOTE(review): Quantity, Unit_Price and Discount_Rate are not selected here and only
# survive in product_dim — confirm that is intended.
transaction_fact = df_trans.select(
    'Transaction_ID',
    'Customer_ID',
    'Product_ID',
    'Date',
    'Total_Cost',
    'Sales_Channel',
    'Order_Priority',
    'Warehouse_Code',
    'Ship_Mode',
    'Delivery_Status',
    'Customer_Satisfaction',
    'Item_Returned',
    'Return_Reason',
    'Payment_Type',
    'Taxable',
    'Region',
    'Country',
)

In [50]:
# Preview the fact table.
transaction_fact.show(n=5)

+--------------+-----------+----------+--------------------+-----------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+------+-------+
|Transaction_ID|Customer_ID|Product_ID|                Date|       Total_Cost|Sales_Channel|Order_Priority|Warehouse_Code|Ship_Mode|Delivery_Status|Customer_Satisfaction|Item_Returned|Return_Reason| Payment_Type|Taxable|Region|Country|
+--------------+-----------+----------+--------------------+-----------------+-------------+--------------+--------------+---------+---------------+---------------------+-------------+-------------+-------------+-------+------+-------+
|           200|       1086|       536|2020-01-01 20:32:...| 8265.37454940804|       Online|          High|          WH-3|    2-Day|      Cancelled|              Neutral|        false|   Wrong Item|Wire Transfer|  false|  West| Canada|
|           321|       1078|       523|2020-01-02 06:55:

In [53]:
# Collect each Spark table to the driver as a pandas DataFrame (the tables here are small)
# so they can be written out with pandas.
customer_df, product_df, transaction_fact_df = (
    spark_table.toPandas() for spark_table in (customer_dim, product_dim, transaction_fact)
)

In [54]:
# Write the modelled tables to local CSV files under trans_dataset/.
# The directory is created if missing (to_csv does not create parent directories), and
# paths are built with os.path.join so they resolve on Windows and POSIX alike.
output_dir = 'trans_dataset'
os.makedirs(output_dir, exist_ok=True)
for frame, file_name in (
    (customer_df, 'customer_dim.csv'),
    (product_df, 'product_dim.csv'),
    (transaction_fact_df, 'transaction_fact.csv'),
):
    frame.to_csv(os.path.join(output_dir, file_name), index=False)

In [None]:
# Connect to Azure Blob Storage using settings read from the environment / a .env file.
load_dotenv()
connection_str = os.getenv('CONNECTION_STR')
container_name = os.getenv('CONTAINER_NAME')

# os.getenv returns None for unset variables; fail here with a clear message instead of
# letting the SDK raise an opaque error later.
missing_settings = [
    name
    for name, value in (('CONNECTION_STR', connection_str), ('CONTAINER_NAME', container_name))
    if not value
]
if missing_settings:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(missing_settings)} "
        "(set them in the environment or in a .env file)"
    )

blob_service_client = BlobServiceClient.from_connection_string(connection_str)
container_client = blob_service_client.get_container_client(container_name)


In [None]:
def upload_to_blob_as_parquet(df, container_client, blob_name):
    """Serialize a pandas DataFrame to Parquet in memory and upload it as a blob.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to upload; written without its index.
    container_client : azure.storage.blob.ContainerClient
        Container to upload into (anything exposing ``get_blob_client``).
    blob_name : str
        Blob path inside the container, e.g. ``'trans_data/customer.parquet'``.

    Any existing blob with the same name is overwritten. A confirmation line naming
    the blob is printed on success.
    """
    # The context manager releases the in-memory buffer once the upload has finished.
    with io.BytesIO() as buffer:
        df.to_parquet(buffer, index=False)
        buffer.seek(0)  # rewind so the upload reads the buffer from the beginning
        blob_client = container_client.get_blob_client(blob_name)
        blob_client.upload_blob(buffer, overwrite=True)
    print(f"{blob_name} upload successful")

In [None]:
# Push each modelled table to Blob Storage as Parquet under the trans_data/ prefix.
for frame, blob_path in (
    (customer_df, 'trans_data/customer.parquet'),
    (product_df, 'trans_data/product.parquet'),
    (transaction_fact_df, 'trans_data/transaction_fact.parquet'),
):
    upload_to_blob_as_parquet(frame, container_client, blob_path)