## Importing Libraries

In [1]:
#Installing dependencies
%pip install pyarrow azure-storage-blob python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import os
import io
from azure.storage.blob import BlobServiceClient, BlobClient
from dotenv import load_dotenv

In [3]:
#Extracting data
ziko_df = pd.read_csv("ziko_logistics_data.csv")

In [4]:
ziko_df

Unnamed: 0,Transaction_ID,Date,Customer_ID,Product_ID,Quantity,Unit_Price,Total_Cost,Discount_Rate,Sales_Channel,Order_Priority,...,Return_Reason,Payment_Type,Taxable,Region,Country,Customer_Name,Customer_Phone,Customer_Email,Customer_Address,Product_List_Title
0,200,2020-01-01 20:32:25.945945945,1086,536,3,120.436821,8265.374549,0.20,Online,High,...,Wrong Item,Wire Transfer,False,West,Canada,Customer 200,+1-652-572-9306,customer.200.78@example.com,"275 Second St, Phoenix, USA",Product 53
1,321,2020-01-02 06:55:08.108108108,1078,523,6,475.724994,4047.850479,,Reseller,Critical,...,Damaged,PayPal,True,South,Mexico,Customer 321,+1-311-186-5760,customer.321.13@sample.com,"478 Third St, New York, USA",Product 33
2,989,2020-01-06 08:12:58.378378378,1077,535,3,146.400556,,0.05,Direct,Critical,...,Damaged,PayPal,True,West,Canada,Customer 989,+1-922-606-9032,customer.989.99@example.com,"843 Second St, Phoenix, USA",Product 6
3,682,2020-01-07 22:03:14.594594594,1027,546,6,19.373194,8194.281993,,Reseller,Medium,...,Wrong Item,Cash,True,South,Canada,Customer 682,+1-237-853-5808,customer.682.66@demo.com,"153 Main St, Phoenix, USA",Product 68
4,484,2020-01-07 07:08:06.486486486,1052,556,8,193.221313,8331.329249,0.20,Direct,Low,...,Late,Cash,False,South,Mexico,Customer 484,+1-986-360-9109,customer.484.3@sample.com,"264 Second St, New York, USA",Product 89
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1000,250,2020-01-05 02:22:42.162162162,1095,533,5,490.760429,1138.724711,,Reseller,Medium,...,Damaged,Credit Card,True,South,Canada,Customer 250,+1-875-504-2878,customer.250.26@sample.com,"664 Main St, Chicago, USA",Product 16
1001,412,2020-01-07 11:27:34.054054054,1008,542,9,236.240910,1786.865600,0.10,Online,Medium,...,Not Specified,Cash,True,South,USA,Customer 412,+1-645-470-5077,customer.412.80@sample.com,"699 Third St, Houston, USA",Product 38
1002,816,2020-01-06 02:48:38.918918918,1060,511,6,,6443.682500,0.20,Partner,Medium,...,Not Specified,Cash,False,West,Mexico,Customer 816,+1-162-396-1771,customer.816.55@sample.com,"453 Fourth St, Los Angeles, USA",Product 48
1003,702,2020-01-09 08:38:55.135135135,1054,504,6,,4814.744796,,Online,Medium,...,Damaged,Cash,True,West,Canada,Customer 702,+1-130-713-1098,customer.702.30@sample.com,"438 Fourth St, Houston, USA",Product 38


In [5]:
ziko_df.columns

Index(['Transaction_ID', 'Date', 'Customer_ID', 'Product_ID', 'Quantity',
       'Unit_Price', 'Total_Cost', 'Discount_Rate', 'Sales_Channel',
       'Order_Priority', 'Warehouse_Code', 'Ship_Mode', 'Delivery_Status',
       'Customer_Satisfaction', 'Item_Returned', 'Return_Reason',
       'Payment_Type', 'Taxable', 'Region', 'Country', 'Customer_Name',
       'Customer_Phone', 'Customer_Email', 'Customer_Address',
       'Product_List_Title'],
      dtype='object')

## Cleaning and Transformation of data

In [6]:
ziko_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1005 entries, 0 to 1004
Data columns (total 25 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Transaction_ID         1005 non-null   int64  
 1   Date                   1005 non-null   object 
 2   Customer_ID            1005 non-null   int64  
 3   Product_ID             1005 non-null   int64  
 4   Quantity               1005 non-null   int64  
 5   Unit_Price             904 non-null    float64
 6   Total_Cost             905 non-null    float64
 7   Discount_Rate          714 non-null    float64
 8   Sales_Channel          1005 non-null   object 
 9   Order_Priority         1005 non-null   object 
 10  Warehouse_Code         1005 non-null   object 
 11  Ship_Mode              1005 non-null   object 
 12  Delivery_Status        1005 non-null   object 
 13  Customer_Satisfaction  1005 non-null   object 
 14  Item_Returned          1005 non-null   bool   
 15  Retu

In [7]:
#Filling the missing parameters of the data
ziko_df.fillna({
    'Unit_Price':ziko_df['Unit_Price'].mean(),
    'Total_Cost':ziko_df['Total_Cost'].mean(),
    'Discount_Rate':0.0,
    'Return_Reason':'Unknown'

}, inplace= True)

In [8]:
ziko_df['Date'] =pd.to_datetime(ziko_df['Date'])

In [9]:
ziko_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1005 entries, 0 to 1004
Data columns (total 25 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   Transaction_ID         1005 non-null   int64         
 1   Date                   1005 non-null   datetime64[ns]
 2   Customer_ID            1005 non-null   int64         
 3   Product_ID             1005 non-null   int64         
 4   Quantity               1005 non-null   int64         
 5   Unit_Price             1005 non-null   float64       
 6   Total_Cost             1005 non-null   float64       
 7   Discount_Rate          1005 non-null   float64       
 8   Sales_Channel          1005 non-null   object        
 9   Order_Priority         1005 non-null   object        
 10  Warehouse_Code         1005 non-null   object        
 11  Ship_Mode              1005 non-null   object        
 12  Delivery_Status        1005 non-null   object        
 13  Cus

In [10]:
ziko_df.head()

Unnamed: 0,Transaction_ID,Date,Customer_ID,Product_ID,Quantity,Unit_Price,Total_Cost,Discount_Rate,Sales_Channel,Order_Priority,...,Return_Reason,Payment_Type,Taxable,Region,Country,Customer_Name,Customer_Phone,Customer_Email,Customer_Address,Product_List_Title
0,200,2020-01-01 20:32:25.945945945,1086,536,3,120.436821,8265.374549,0.2,Online,High,...,Wrong Item,Wire Transfer,False,West,Canada,Customer 200,+1-652-572-9306,customer.200.78@example.com,"275 Second St, Phoenix, USA",Product 53
1,321,2020-01-02 06:55:08.108108108,1078,523,6,475.724994,4047.850479,0.0,Reseller,Critical,...,Damaged,PayPal,True,South,Mexico,Customer 321,+1-311-186-5760,customer.321.13@sample.com,"478 Third St, New York, USA",Product 33
2,989,2020-01-06 08:12:58.378378378,1077,535,3,146.400556,5096.553818,0.05,Direct,Critical,...,Damaged,PayPal,True,West,Canada,Customer 989,+1-922-606-9032,customer.989.99@example.com,"843 Second St, Phoenix, USA",Product 6
3,682,2020-01-07 22:03:14.594594594,1027,546,6,19.373194,8194.281993,0.0,Reseller,Medium,...,Wrong Item,Cash,True,South,Canada,Customer 682,+1-237-853-5808,customer.682.66@demo.com,"153 Main St, Phoenix, USA",Product 68
4,484,2020-01-07 07:08:06.486486486,1052,556,8,193.221313,8331.329249,0.2,Direct,Low,...,Late,Cash,False,South,Mexico,Customer 484,+1-986-360-9109,customer.484.3@sample.com,"264 Second St, New York, USA",Product 89


In [11]:
ziko_df.columns

Index(['Transaction_ID', 'Date', 'Customer_ID', 'Product_ID', 'Quantity',
       'Unit_Price', 'Total_Cost', 'Discount_Rate', 'Sales_Channel',
       'Order_Priority', 'Warehouse_Code', 'Ship_Mode', 'Delivery_Status',
       'Customer_Satisfaction', 'Item_Returned', 'Return_Reason',
       'Payment_Type', 'Taxable', 'Region', 'Country', 'Customer_Name',
       'Customer_Phone', 'Customer_Email', 'Customer_Address',
       'Product_List_Title'],
      dtype='object')

In [12]:
#Creating the Schemas/ Customer
customer = ziko_df[['Customer_ID','Customer_Name','Customer_Phone', 'Customer_Email', 'Customer_Address']].copy().drop_duplicates().reset_index(drop=True)
customer.head()

Unnamed: 0,Customer_ID,Customer_Name,Customer_Phone,Customer_Email,Customer_Address
0,1086,Customer 200,+1-652-572-9306,customer.200.78@example.com,"275 Second St, Phoenix, USA"
1,1078,Customer 321,+1-311-186-5760,customer.321.13@sample.com,"478 Third St, New York, USA"
2,1077,Customer 989,+1-922-606-9032,customer.989.99@example.com,"843 Second St, Phoenix, USA"
3,1027,Customer 682,+1-237-853-5808,customer.682.66@demo.com,"153 Main St, Phoenix, USA"
4,1052,Customer 484,+1-986-360-9109,customer.484.3@sample.com,"264 Second St, New York, USA"


In [13]:
#Product schema
product = ziko_df[['Product_ID','Product_List_Title','Quantity','Unit_Price', 'Discount_Rate',]].copy().drop_duplicates().reset_index(drop=True)

In [14]:
product.head()

Unnamed: 0,Product_ID,Product_List_Title,Quantity,Unit_Price,Discount_Rate
0,536,Product 53,3,120.436821,0.2
1,523,Product 33,6,475.724994,0.0
2,535,Product 6,3,146.400556,0.05
3,546,Product 68,6,19.373194,0.0
4,556,Product 89,8,193.221313,0.2


In [15]:
#Transaction Schema
transaction =ziko_df.merge(customer, on=['Customer_ID','Customer_Name','Customer_Phone', 'Customer_Email', 'Customer_Address'], how='left') \
                    .merge(product, on=['Product_ID','Product_List_Title','Quantity','Unit_Price', 'Discount_Rate'], how ='left')\
                    [['Transaction_ID', 'Date','Total_Cost','Sales_Channel','Order_Priority',
                            'Warehouse_Code', 'Ship_Mode', 'Delivery_Status','Customer_Satisfaction', 'Item_Returned',
                            'Return_Reason','Payment_Type', 'Taxable', 'Region', 'Country']]

In [16]:
transaction.head()

Unnamed: 0,Transaction_ID,Date,Total_Cost,Sales_Channel,Order_Priority,Warehouse_Code,Ship_Mode,Delivery_Status,Customer_Satisfaction,Item_Returned,Return_Reason,Payment_Type,Taxable,Region,Country
0,200,2020-01-01 20:32:25.945945945,8265.374549,Online,High,WH-3,2-Day,Cancelled,Neutral,False,Wrong Item,Wire Transfer,False,West,Canada
1,321,2020-01-02 06:55:08.108108108,4047.850479,Reseller,Critical,WH-1,Overnight,Backorder,Satisfied,True,Damaged,PayPal,True,South,Mexico
2,989,2020-01-06 08:12:58.378378378,5096.553818,Direct,Critical,WH-1,Overnight,Pending,Unsatisfied,True,Damaged,PayPal,True,West,Canada
3,682,2020-01-07 22:03:14.594594594,8194.281993,Reseller,Medium,WH-1,Express,Pending,Unsatisfied,False,Wrong Item,Cash,True,South,Canada
4,484,2020-01-07 07:08:06.486486486,8331.329249,Direct,Low,WH-2,2-Day,Delivered,Satisfied,True,Late,Cash,False,South,Mexico


In [17]:
#Temporal storage of the data
customer.to_csv(r'dataset\customer.csv', index= False)
product.to_csv(r'dataset\product.csv', index= False)
transaction.to_csv(r'dataset\transaction.csv', index= False)

In [18]:
#Loading Data
#Azure blob connection
load_dotenv()

connect_str = os.getenv("CONNECT_STR")
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

container_name = os.getenv('CONTAINER_NAME')
container_client = blob_service_client.get_container_client(container_name)


In [35]:
#Create a function that loads data in Azure storage as parquet file
def upload_df_to_blob_as_parquet(df, container_client, blob_name):
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    buffer.seek(0)
    blob_client = container_client.get_blob_client(blob_name)
    blob_client.upload_blob(buffer, blob_type ="BlockBlob", overwrite = True)
    print(f'{blob_name} uploaded successfully')

In [38]:
upload_df_to_blob_as_parquet(customer, container_client, 'rawdata/customer.parquet')
upload_df_to_blob_as_parquet(product, container_client, 'rawdata/product.parquet')
upload_df_to_blob_as_parquet(transaction, container_client, 'rawdata/transaction.parquet')

rawdata/customer.parquet uploaded successfully
rawdata/product.parquet uploaded successfully
rawdata/transaction.parquet uploaded successfully
