# INITIAL SETUP

In [25]:
!pip freeze > requirements.txt

In [1]:
#We need to install faker using pip
!pip install faker

Collecting faker
  Downloading Faker-19.8.0-py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: faker
Successfully installed faker-19.8.0


In [2]:
import pandas as pd
import numpy as np
from datetime import *
import pandas_gbq
from sqlalchemy import *
from faker import Faker

from google.oauth2 import service_account
from google.cloud import bigquery

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

Run this command in your gcloud shell to generate ADC credentials.


```
gcloud auth application-default login
```



In [4]:
#Add the ADC key to the environment
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/fractal1a-8cd62469cbdc.json'

In [6]:
#Let's test if our ADC key is working fine by running a gbq command
df = pd.DataFrame(
    {
        "my_string": ["a", "b", "c"],
        "my_int64": [1, 2, 3],
        "my_float64": [4.0, 5.0, 6.0],
        "my_bool1": [True, False, True],
        "my_bool2": [False, True, False],
        "my_dates": pd.date_range("now", periods=3),
    }
)

pandas_gbq.to_gbq(df, 'fractal1a.starschema.test', project_id = "fractal1a", if_exists='append')

100%|██████████| 1/1 [00:00<00:00, 1446.81it/s]


# EXTRACTION

In [7]:
#Set the frequently used credentials
USER = "postgres"
PASSWORD = "123"
HOST = "34.93.118.74"
PORT = "5432"

In [8]:
# Create the sqlalchemy engine
engine = create_engine(f"postgresql://{USER}:{PASSWORD}@{HOST}:{PORT}/{'oltp'}")
conn = engine.connect()

#Run the pandas read_sql function to retreive the SQL data.
customer_master = pd.read_sql(text("select * from customer_master"), conn)
product_master = pd.read_sql(text("select * from product_master"), conn)
order_details = pd.read_sql(text("select * from order_details"), conn)
order_items = pd.read_sql(text("select * from order_items"), conn)

#Run these for the new generated data.
#order_details = pd.read_sql(text("select * from order_details where order_status_update_timestamp >= '20231010 00:00:00.000"), conn)
#order_items = pd.read_sql(text("select * from order_items where order_status_update_timestamp >= '20231010 00:00:00.000"), conn)

#Check if the data is getting imported correctly.
product_master.head()

Unnamed: 0,productid,productcode,productname,sku,rate,isactive
0,1,c66,Amul Aata,5KG,573,True
1,2,Y76,Balaji Oil,5KG,538,True
2,3,n59,Reliance Shampoo,3KG,338,False
3,4,N21,Reliance Cream,5KG,981,False
4,5,j89,MDH Salt,5KG,151,False


In [9]:
#Check the datatypes of these new dataframes
customer_master.dtypes

customerid                   int64
name                        object
address                     object
city                        object
state                       object
pincode                      int64
update_timestamp    datetime64[ns]
dtype: object

#TRANSFORMATION

In [10]:
conn = psycopg2.connect(
    dbname = 'oltp',
    user = USER,
    password = PASSWORD,
    host = HOST
)

In [11]:
#Build the dim_order

#dim_order = order_details[["orderid", "order_status_update_timestamp", "order_status"]]

dim_order = pd.read_sql_query('''
SELECT orderid, order_status_update_timestamp, order_status FROM order_details;
''', conn)
dim_order.head()

  dim_order = pd.read_sql_query('''


Unnamed: 0,orderid,order_status_update_timestamp,order_status
0,1,2023-06-28 22:21:44,Received
1,2,2023-05-25 04:58:14,Received
2,3,2023-05-29 10:39:56,Received
3,4,2023-07-25 18:09:55,Received
4,5,2023-01-14 01:20:33,Received


In [12]:
#Build the fact_daily_orders

fact_daily_orders = pd.read_sql_query('''
SELECT
    od.customerid,
    od.orderid,
    MIN(CASE WHEN od.order_status = 'Received' THEN od.order_status_update_timestamp END) AS order_received_timestamp,
    MAX(CASE WHEN od.order_status = 'Delivered' THEN od.order_status_update_timestamp END) AS order_delivery_timestamp,
    cm.pincode,
    SUM(oi.quantity * pm.rate) AS order_amount,
    SUM(oi.quantity) AS item_count,
    EXTRACT(EPOCH FROM MAX(CASE WHEN od.order_status = 'Delivered' THEN od.order_status_update_timestamp END) - MIN(CASE WHEN od.order_status = 'Received' THEN od.order_status_update_timestamp END))::INTEGER AS order_delivery_time_seconds
FROM
    order_details od
JOIN
    customer_master cm ON od.customerid = cm.customerid
JOIN
    order_items oi ON od.orderid = oi.orderid
JOIN
    product_master pm ON oi.productid = pm.productid
GROUP BY
    od.customerid,
    od.orderid,
    cm.pincode
ORDER BY od.orderid;
''', conn)

fact_daily_orders.head()

  fact_daily_orders = pd.read_sql_query('''


Unnamed: 0,customerid,orderid,order_received_timestamp,order_delivery_timestamp,pincode,order_amount,item_count,order_delivery_time_seconds
0,992,2,2023-05-25 04:58:14,2023-05-25 20:14:58,25306,19425,33,55004
1,518,3,2023-05-29 10:39:56,2023-05-30 14:12:21,666381,3810,15,99145
2,923,4,2023-07-25 18:09:55,2023-07-27 11:29:04,266856,11184,12,148749
3,588,6,2023-02-05 04:53:24,2023-02-07 00:11:00,16313,9984,18,155856
4,695,7,2023-07-21 04:05:40,2023-07-22 06:44:02,653869,5442,6,95902


In [13]:
#Let's confirm the data types.
fact_daily_orders.dtypes

customerid                              int64
orderid                                 int64
order_received_timestamp       datetime64[ns]
order_delivery_timestamp       datetime64[ns]
pincode                                 int64
order_amount                            int64
item_count                              int64
order_delivery_time_seconds             int64
dtype: object

In [14]:
#Build the dim_customer

dim_customer = pd.read_sql_query('''SELECT customerid, name, customerid AS address_id, update_timestamp AS start_date, NULL AS end_date
FROM customer_master;''', conn)

dim_customer.head()

  dim_customer = pd.read_sql_query('''SELECT customerid, name, customerid AS address_id, update_timestamp AS start_date, NULL AS end_date


Unnamed: 0,customerid,name,address_id,start_date,end_date
0,1,Tara Walia,1,2023-02-06 19:53:12,
1,2,Hazel Sastry,2,2023-06-21 22:12:04,
2,3,Biju Swaminathan,3,2023-07-30 21:01:59,
3,4,Divij Baria,4,2023-08-20 13:47:46,
4,5,Samar Mani,5,2023-09-29 14:09:39,


In [15]:
#Build the dim_address

dim_address = pd.read_sql_query('''SELECT customerid AS address_id,address,city,state,pincode from customer_master;''', conn)

dim_address.head()

  dim_address = pd.read_sql_query('''SELECT customerid AS address_id,address,city,state,pincode from customer_master;''', conn)


Unnamed: 0,address_id,address,city,state,pincode
0,1,H.No. 487\nSrinivasan Marg,Mysore,Karnataka,938242
1,2,"H.No. 11, Keer Path",Bhubaneswar,Odisha,659387
2,3,"H.No. 609, Thaker Chowk",Belgaum,Karnataka,139332
3,4,71/48\nSastry Road,Bhagalpur,Bihar,989471
4,5,"H.No. 23, Babu Street",Rourkela,Odisha,112201


In [16]:
#Build the dim_product

dim_product = pd.read_sql_query('''SELECT productid, productcode, productname, SKU, rate, isactive, now() AS start_date, NULL AS end_date from product_master;''', conn)

dim_product.head()

  dim_product = pd.read_sql_query('''SELECT productid, productcode, productname, SKU, rate, isactive, now() AS start_date, NULL AS end_date from product_master;''', conn)


Unnamed: 0,productid,productcode,productname,sku,rate,isactive,start_date,end_date
0,1,c66,Amul Aata,5KG,573,True,2023-10-10 15:04:02.871417+00:00,
1,2,Y76,Balaji Oil,5KG,538,True,2023-10-10 15:04:02.871417+00:00,
2,3,n59,Reliance Shampoo,3KG,338,False,2023-10-10 15:04:02.871417+00:00,
3,4,N21,Reliance Cream,5KG,981,False,2023-10-10 15:04:02.871417+00:00,
4,5,j89,MDH Salt,5KG,151,False,2023-10-10 15:04:02.871417+00:00,


In [17]:
#Build the f_order_details

f_order_details = pd.read_sql_query('''
SELECT oi.orderid, od.order_status_update_timestamp AS order_delivery_timestamp, oi.productid , oi.quantity from order_items oi JOIN order_details od
ON od.orderid = oi.orderid AND od.order_status = 'Delivered';
''', conn)

f_order_details.head()

  f_order_details = pd.read_sql_query('''


Unnamed: 0,orderid,order_delivery_timestamp,productid,quantity
0,7608,2023-09-05 22:17:13,91,1
1,17513,2023-09-05 10:15:31,56,5
2,7214,2023-05-01 02:00:54,98,5
3,15080,2023-05-13 03:00:10,54,4
4,3870,2023-02-06 14:48:51,40,5


# LOAD

In [18]:
#Make a bigquery connection to the project
client = bigquery.Client(project='fractal1a')

In [19]:
#Upload the dim_order table
client.load_table_from_dataframe(dim_order, 'fractal1a.starschema.dim_order')

LoadJob<project=fractal1a, location=asia-south1, id=350a4833-ee7b-4970-93d1-0faf22d29ce1>

In [20]:
#Upload the fact_daily_orders table
client.load_table_from_dataframe(fact_daily_orders, 'fractal1a.starschema.fact_daily_orders')

LoadJob<project=fractal1a, location=asia-south1, id=5c01bb86-e01c-411f-9dcd-fff09aec0b4e>

In [21]:
#Upload the dim_customer table
client.load_table_from_dataframe(dim_customer, 'fractal1a.starschema.dim_customer')

LoadJob<project=fractal1a, location=asia-south1, id=32958c4b-f798-46b3-a5e2-2ed97f0839e5>

In [22]:
#Upload the dim_address table
client.load_table_from_dataframe(dim_address, 'fractal1a.starschema.dim_address')

LoadJob<project=fractal1a, location=asia-south1, id=0d74a184-52cf-4a1e-9271-ba22e91e4923>

In [23]:
#Upload the dim_product table
client.load_table_from_dataframe(dim_product, 'fractal1a.starschema.dim_product')

LoadJob<project=fractal1a, location=asia-south1, id=b87dcde5-08e1-4e90-90a5-902346e88adb>

In [24]:
#Upload the f_order_details table
client.load_table_from_dataframe(f_order_details, 'fractal1a.starschema.f_order_details')

LoadJob<project=fractal1a, location=asia-south1, id=2a00325a-86eb-4080-8b9d-398ff2b0b969>