### installing necessary libraries

In [None]:
!pip install pyarrow
!pip install azure-storage-blob
!pip install python-dotenv

### IMPORT NECESSARY LIBRARIES

In [18]:
### IMPORT NECESSARY LIBRARIES
import datetime
import pandas as pd
import os
import io
from azure.storage.blob import BlobServiceClient, BlobClient
from dotenv import load_dotenv

### EXTRACTION LAYER

In [3]:
### EXTRACTION LAYER
# Read the raw logistics CSV (path is relative to the notebook's working
# directory) into the frame that every later cell works from.
ziko_df = pd.read_csv(filepath_or_buffer='ziko_logistics_data.csv')

In [4]:
# List the column labels of the freshly loaded frame.
ziko_df.columns

Index(['Transaction_ID', 'Date', 'Customer_ID', 'Product_ID', 'Quantity',
       'Unit_Price', 'Total_Cost', 'Discount_Rate', 'Sales_Channel',
       'Order_Priority', 'Warehouse_Code', 'Ship_Mode', 'Delivery_Status',
       'Customer_Satisfaction', 'Item_Returned', 'Return_Reason',
       'Payment_Type', 'Taxable', 'Region', 'Country', 'Customer_Name',
       'Customer_Phone', 'Customer_Email', 'Customer_Address',
       'Product_List_Title'],
      dtype='object')

### Data cleaning and transformation

In [5]:
# check for missing or null values
# info() prints each column's non-null count and dtype; a non-null count below
# the number of entries means that column has missing values.
ziko_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1005 entries, 0 to 1004
Data columns (total 25 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Transaction_ID         1005 non-null   int64  
 1   Date                   1005 non-null   object 
 2   Customer_ID            1005 non-null   int64  
 3   Product_ID             1005 non-null   int64  
 4   Quantity               1005 non-null   int64  
 5   Unit_Price             904 non-null    float64
 6   Total_Cost             905 non-null    float64
 7   Discount_Rate          714 non-null    float64
 8   Sales_Channel          1005 non-null   object 
 9   Order_Priority         1005 non-null   object 
 10  Warehouse_Code         1005 non-null   object 
 11  Ship_Mode              1005 non-null   object 
 12  Delivery_Status        1005 non-null   object 
 13  Customer_Satisfaction  1005 non-null   object 
 14  Item_Returned          1005 non-null   bool   
 15  Retu

In [6]:
### Data cleaning and transformation
# Replacement value per column with missing data:
#   - Unit_Price / Total_Cost: the column mean (computed before any filling)
#   - Discount_Rate: 0.0
#   - Return_Reason: the label 'Unknown'
missing_value_defaults = {
    'Unit_Price': ziko_df['Unit_Price'].mean(),
    'Total_Cost': ziko_df['Total_Cost'].mean(),
    'Discount_Rate': 0.0,
    'Return_Reason': 'Unknown',
}
ziko_df = ziko_df.fillna(value=missing_value_defaults)

# Re-print the non-null counts to confirm the gaps were filled.
ziko_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1005 entries, 0 to 1004
Data columns (total 25 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Transaction_ID         1005 non-null   int64  
 1   Date                   1005 non-null   object 
 2   Customer_ID            1005 non-null   int64  
 3   Product_ID             1005 non-null   int64  
 4   Quantity               1005 non-null   int64  
 5   Unit_Price             1005 non-null   float64
 6   Total_Cost             1005 non-null   float64
 7   Discount_Rate          1005 non-null   float64
 8   Sales_Channel          1005 non-null   object 
 9   Order_Priority         1005 non-null   object 
 10  Warehouse_Code         1005 non-null   object 
 11  Ship_Mode              1005 non-null   object 
 12  Delivery_Status        1005 non-null   object 
 13  Customer_Satisfaction  1005 non-null   object 
 14  Item_Returned          1005 non-null   bool   
 15  Retu

In [7]:
# correct and convert Date datatype
# Parse the Date column with pandas' default date inference, then swap the
# parsed values into the frame under the same column name.
parsed_dates = pd.to_datetime(ziko_df['Date'])
ziko_df = ziko_df.assign(Date=parsed_dates)

# Show the dtypes again so the new Date dtype is visible.
ziko_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1005 entries, 0 to 1004
Data columns (total 25 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   Transaction_ID         1005 non-null   int64         
 1   Date                   1005 non-null   datetime64[ns]
 2   Customer_ID            1005 non-null   int64         
 3   Product_ID             1005 non-null   int64         
 4   Quantity               1005 non-null   int64         
 5   Unit_Price             1005 non-null   float64       
 6   Total_Cost             1005 non-null   float64       
 7   Discount_Rate          1005 non-null   float64       
 8   Sales_Channel          1005 non-null   object        
 9   Order_Priority         1005 non-null   object        
 10  Warehouse_Code         1005 non-null   object        
 11  Ship_Mode              1005 non-null   object        
 12  Delivery_Status        1005 non-null   object        
 13  Cus

In [8]:
# List the column labels again, ahead of splitting the frame into tables.
ziko_df.columns

Index(['Transaction_ID', 'Date', 'Customer_ID', 'Product_ID', 'Quantity',
       'Unit_Price', 'Total_Cost', 'Discount_Rate', 'Sales_Channel',
       'Order_Priority', 'Warehouse_Code', 'Ship_Mode', 'Delivery_Status',
       'Customer_Satisfaction', 'Item_Returned', 'Return_Reason',
       'Payment_Type', 'Taxable', 'Region', 'Country', 'Customer_Name',
       'Customer_Phone', 'Customer_Email', 'Customer_Address',
       'Product_List_Title'],
      dtype='object')

### CREATE TABLE

In [9]:
### CREATE TABLE
# Customer table: the distinct combinations of the customer attribute columns,
# re-indexed from 0.
# NOTE(review): duplicates are dropped on the full attribute tuple, not on
# Customer_ID alone, so Customer_ID is not guaranteed to be unique here —
# confirm that this is intended.
customer_columns = [
    'Customer_ID',
    'Customer_Name',
    'Customer_Phone',
    'Customer_Email',
    'Customer_Address',
]
Customer_table = (
    ziko_df[customer_columns]
    .drop_duplicates()
    .reset_index(drop=True)
)

Customer_table.head()

Unnamed: 0,Customer_ID,Customer_Name,Customer_Phone,Customer_Email,Customer_Address
0,1086,Customer 200,+1-652-572-9306,customer.200.78@example.com,"275 Second St, Phoenix, USA"
1,1078,Customer 321,+1-311-186-5760,customer.321.13@sample.com,"478 Third St, New York, USA"
2,1077,Customer 989,+1-922-606-9032,customer.989.99@example.com,"843 Second St, Phoenix, USA"
3,1027,Customer 682,+1-237-853-5808,customer.682.66@demo.com,"153 Main St, Phoenix, USA"
4,1052,Customer 484,+1-986-360-9109,customer.484.3@sample.com,"264 Second St, New York, USA"


In [10]:
### CREATE TABLE
# Product table: the distinct combinations of the product columns, re-indexed
# from 0.
# NOTE(review): Unit_Price and Quantity are part of the de-duplication key, so
# the same Product_ID can appear on several rows — confirm that this is intended.
product_columns = [
    'Product_ID',
    'Product_List_Title',
    'Unit_Price',
    'Quantity',
]
Product_table = (
    ziko_df[product_columns]
    .drop_duplicates()
    .reset_index(drop=True)
)

Product_table.head()

Unnamed: 0,Product_ID,Product_List_Title,Unit_Price,Quantity
0,536,Product 53,120.436821,3
1,523,Product 33,475.724994,6
2,535,Product 6,146.400556,3
3,546,Product 68,19.373194,6
4,556,Product 89,193.221313,8


In [11]:
### CREATE TABLE
# Fact table: one row per row of ziko_df, keeping the transaction id, the
# product and customer ids, and the transaction-level attributes.
#
# Customer_table and Product_table are de-duplicated projections of ziko_df over
# exactly the columns they were previously merged on, so left-merging them back
# matched every row exactly once and contributed no new columns. Selecting the
# columns directly yields the same frame without the two redundant joins.
#
# NOTE(review): 'Date' and 'Quantity' are not carried into this table — confirm
# whether that omission is intended before relying on it downstream.
fact_columns = [
    'Transaction_ID', 'Product_ID', 'Customer_ID',
    'Total_Cost', 'Discount_Rate',
    'Sales_Channel', 'Order_Priority', 'Warehouse_Code', 'Ship_Mode',
    'Delivery_Status', 'Customer_Satisfaction',
    'Item_Returned', 'Return_Reason',
    'Payment_Type', 'Taxable', 'Region', 'Country',
]
# .copy() keeps the fact table independent of ziko_df.
Transaction_Fact_Table = ziko_df[fact_columns].copy()

Transaction_Fact_Table.head()
                    

Unnamed: 0,Transaction_ID,Product_ID,Customer_ID,Total_Cost,Discount_Rate,Sales_Channel,Order_Priority,Warehouse_Code,Ship_Mode,Delivery_Status,Customer_Satisfaction,Item_Returned,Return_Reason,Payment_Type,Taxable,Region,Country
0,200,536,1086,8265.374549,0.2,Online,High,WH-3,2-Day,Cancelled,Neutral,False,Wrong Item,Wire Transfer,False,West,Canada
1,321,523,1078,4047.850479,0.0,Reseller,Critical,WH-1,Overnight,Backorder,Satisfied,True,Damaged,PayPal,True,South,Mexico
2,989,535,1077,5096.553818,0.05,Direct,Critical,WH-1,Overnight,Pending,Unsatisfied,True,Damaged,PayPal,True,West,Canada
3,682,546,1027,8194.281993,0.0,Reseller,Medium,WH-1,Express,Pending,Unsatisfied,False,Wrong Item,Cash,True,South,Canada
4,484,556,1052,8331.329249,0.2,Direct,Low,WH-2,2-Day,Delivered,Satisfied,True,Late,Cash,False,South,Mexico


### SAVE AS CSV

In [12]:
### LOAD TEMPORARILY AS CSV
# Write each table to a local CSV file inside the 'Dataset' folder.
# os.path.join uses the host OS's path separator; a hard-coded backslash only
# works on Windows and elsewhere produces a file literally named
# 'Dataset\<name>.csv' in the working directory.
output_dir = 'Dataset'
# to_csv does not create missing folders, so make sure the target exists.
os.makedirs(output_dir, exist_ok=True)

Customer_table.to_csv(os.path.join(output_dir, 'Customer_table.csv'), index=False)
Product_table.to_csv(os.path.join(output_dir, 'Product_table.csv'), index=False)
Transaction_Fact_Table.to_csv(os.path.join(output_dir, 'Transaction_Fact_Table.csv'), index=False)

print('file have been loaded temporarily into local machine')

file have been loaded temporarily into local machine


### DATA LOADING

In [13]:
### DATA LOADING
# set up azure blob connection
# load_dotenv() reads a local .env file (if present) into the process
# environment, from which the two settings below are then read.
load_dotenv()
connect_str = os.getenv('CONNECT_STR')
container_name = os.getenv('CONTAINER_NAME')

# os.getenv returns None for an unset variable; fail here with a clear message
# instead of passing None into the Azure SDK.
missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ('CONNECT_STR', connect_str),
        ('CONTAINER_NAME', container_name),
    )
    if not setting_value
]
if missing_settings:
    raise ValueError(
        'Missing required environment variable(s): '
        + ', '.join(missing_settings)
        + '. Define them in the environment or in a .env file.'
    )

blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(container_name)

### Data Access protection standard

In [14]:
## To keep your dotenv (.env) credentials from being pushed to GitHub,
# create a .gitignore file that lists the .env file

### Create a function that will load the data into azure blob storage as parquet file

In [15]:
### Create a function that will load the data into azure blob storage as parquet file
def upload_df_to_blob_as_parquet(df, container_client, blob_name):
    """Serialize ``df`` to Parquet in memory and upload it as a block blob.

    Args:
        df: Object exposing ``to_parquet(buffer, index=False)``; here a
            pandas DataFrame.
        container_client: Object exposing ``get_blob_client(blob_name)``; here
            an Azure container client.
        blob_name: Name (including any prefix) of the destination blob.

    Returns:
        None. A confirmation line is printed after the upload call returns.

    An existing blob with the same name is overwritten (``overwrite=True``).
    """
    parquet_stream = io.BytesIO()
    df.to_parquet(parquet_stream, index=False)
    # Rewind: after writing, the stream position is at the end, and the upload
    # reads from the current position.
    parquet_stream.seek(0)
    container_client.get_blob_client(blob_name).upload_blob(
        parquet_stream,
        blob_type="BlockBlob",
        overwrite=True,
    )
    print(f'{blob_name} uploaded to blob storage successfully')

In [16]:
## data upload
# Upload each table to its blob under the 'rawdata/' prefix, in the order
# customer, product, fact.
uploads = (
    (Customer_table, 'rawdata/Customer_table.parquet'),
    (Product_table, 'rawdata/Product_table.parquet'),
    (Transaction_Fact_Table, 'rawdata/Transaction_Fact_Table.parquet'),
)
for table_df, blob_name in uploads:
    upload_df_to_blob_as_parquet(table_df, container_client, blob_name)

rawdata/Customer_table.parquet uploaded to blob storage successfully
rawdata/Product_table.parquet uploaded to blob storage successfully
rawdata/Transaction_Fact_Table.parquet uploaded to blob storage successfully


### Automate the entire process into a pipeline

In [17]:
## Design an ETL pipeline and run it from a task scheduler to confirm that it executes on schedule
