In [None]:
import os

In [None]:
import pandas as pd

from Extract.extract_s3 import extract_s3
from dotenv import load_dotenv

In [None]:
load_dotenv()
BUCKET = os.getenv("BUCKET_NAME")

## Extracting Data from S3

In [None]:
# customers
customers_df = extract_s3(bucket=BUCKET, key="Data/customers.parquet")
if customers_df is None:
    raise ValueError("No data found")

In [None]:
# order items
order_items_df = extract_s3(BUCKET, key="Data/order_items.parquet")
if order_items_df is None:
    raise ValueError("No data found")

In [None]:
# order payments
order_payments_df = extract_s3(BUCKET, key="Data/order_payments.parquet")

In [None]:
# order reviews
order_reviews_df = extract_s3(BUCKET, key="Data/order_reviews.parquet")
if order_reviews_df is None:
    raise ValueError("No data found")

In [None]:
# orders
orders_df = extract_s3(BUCKET, key="Data/orders.parquet")
if orders_df is None:
    raise ValueError("No data found")

In [None]:
# product categories
product_category_df = extract_s3(BUCKET, key="Data/product_categories.parquet")
if product_category_df is None:
    raise ValueError("No data found")

In [None]:
# products
products_df = extract_s3(BUCKET, key="Data/products.parquet")
if products_df is None:
    raise ValueError("No data found")

In [None]:
# sellers
sellers_df = extract_s3(BUCKET, key="Data/sellers.parquet")
if sellers_df is None:
    raise ValueError("No data found")

## Cleaning Data

### Customers

In [None]:
customers_df.head()

In [None]:
customers_df.shape

In [None]:
customers_df.info()

In [None]:
customers_df.duplicated().sum()

In [None]:
from uuid import UUID

In [None]:
# converting object to uuid
customers_df["customer_id"] = customers_df["customer_id"].apply(UUID)
customers_df["customer_unique_id"] = customers_df["customer_unique_id"].apply(UUID)

In [None]:
customers_df.isnull().sum()

In [None]:
# id to name mapping in customers
id_to_name = {
    cust_id: f"Customer_{idx + 1}"
    for idx, cust_id in enumerate(customers_df["customer_unique_id"].unique())
}
customers_df["customer_name"] = customers_df["customer_unique_id"].map(id_to_name)

### Order Items

In [None]:
order_items_df.head()

In [None]:
order_items_df.shape

In [None]:
order_items_df.info()

In [None]:
order_items_df.sample(10)

In [None]:
order_items_df.drop(["order_items_id"], axis=1, inplace=True)

Dropping `order_items_id` because it doesn't provide any information about dataset. It simply acts as the index value.

In [None]:
order_items_df.rename({"quantity": "order_item_id"}, axis=1, inplace=True)

**Note:** Previously `order_item_id` was misjudged as `quantity`, it was used in total price calculation. So we have to fix the total price by recalculating it.
`order_item_id` shows the id of item in particular order.

In [None]:
order_items_df["total_price"] = (
    order_items_df["freight_value"] + order_items_df["price"]
)

`total_price` was calculated with quantity which was `order_item_id`, so we have to recalculate the total price, where `price` shows the price of the product and `freight_value` shows the shipping cost of the particular order.

In [None]:
import numpy as np

In [None]:
order_items_df["total_price"] = (
    order_items_df["total_price"].apply(np.round).astype(int)
)

In [None]:
order_items_df.duplicated().sum()

In [None]:
order_items_df["order_id"] = order_items_df["order_id"].apply(UUID)
order_items_df["product_id"] = order_items_df["product_id"].apply(UUID)
order_items_df["seller_id"] = order_items_df["seller_id"].apply(UUID)

### Order Payments

In [None]:
order_payments_df.shape

In [None]:
order_payments_df.dtypes

`order_payments` is in correct datatypes.

In [None]:
order_payments_df.isnull().sum()

In [None]:
order_payments_df.duplicated().sum()

In [None]:
order_payments_df["order_id"] = order_payments_df["order_id"].apply(UUID)

### Order Reviews

In [None]:
order_reviews_df.shape

In [None]:
order_reviews_df.dtypes

In [None]:
order_reviews_df.isnull().sum()

**NaN** values in `review_comment_title` and `review_comment_message` contains more than half of the data. So dropping them is needed.

In [None]:
order_reviews_df.head()

In [None]:
order_reviews_df.drop(
    ["review_comment_title", "review_comment_message"], axis=1, inplace=True
)

In [None]:
order_reviews_df["review_answer_timestamp"] = pd.to_datetime(
    order_reviews_df["review_answer_timestamp"]
)

In [None]:
order_reviews_df.dtypes

In [None]:
order_reviews_df.duplicated().sum()

In [None]:
order_reviews_df["order_id"] = order_reviews_df["order_id"].apply(UUID)
order_reviews_df["review_id"] = order_reviews_df["review_id"].apply(UUID)

In [None]:
order_reviews_df.groupby("order_id")["review_id"].count().sort_values(ascending=False)

In [None]:
order_reviews_df[
    order_reviews_df["order_id"] == UUID("03c939fd-7fd3-b38f-8485-a0f95798f1f6")
]

single `order_id` contains 3 reviews, which is misleading. a single order can only be brought by single customer, so we have to remove the duplicates and keep the latest review.

In [None]:
order_reviews_df = order_reviews_df.sort_values("review_creation_date").drop_duplicates(
    "order_id", keep="last"
)

#### Orders

In [None]:
orders_df.shape

In [None]:
orders_df.dtypes

In [None]:
orders_df.isnull().sum()

`order_approved_at`, `order_delievered_carriar_date` and `order_delivered_customer_date` contains NaN values but those are linked with `order_status`, if order is cancelled those columns will not be populated.

In [None]:
orders_df.head()

In [None]:
orders_df.duplicated().sum()

In [None]:
orders_df["order_id"] = orders_df["order_id"].apply(UUID)
orders_df["customer_id"] = orders_df["customer_id"].apply(UUID)

### Product Categories

In [None]:
product_category_df.shape

In [None]:
product_category_df.dtypes

In [None]:
product_category_df.isnull().sum()

In [None]:
product_category_df.head()

### Products

In [None]:
products_df.shape

In [None]:
products_df.dtypes

In [None]:
products_df.isnull().sum()

In [None]:
products_df.head()

In [None]:
products_df.rename(
    {
        "product_description_lenght": "product_description_length",
        "product_name_lenght": "product_name_length",
    },
    axis=1,
    inplace=True,
)

In [None]:
products_df.duplicated().sum()

In [None]:
products_df.fillna({"product_category_name": "unknown"}, inplace=True)

`product_category_name` contains nan values, but they show important information about products, so we replace it with "unknown"

In [None]:
products_df["product_id"] = products_df["product_id"].apply(UUID)

In [None]:
# id to product name mapping
id_to_product_name = {
    prod_id: f"Product_{idx + 1}"
    for idx, prod_id in enumerate(products_df["product_id"].unique())
}
products_df["product_name"] = products_df["product_id"].map(id_to_product_name)



### Sellers

In [None]:
sellers_df.shape

In [None]:
sellers_df.dtypes

In [None]:
sellers_df.isnull().sum()

In [None]:
sellers_df.head()

In [None]:
sellers_df.duplicated().sum()

In [None]:
sellers_df["seller_id"] = sellers_df["seller_id"].apply(UUID)

In [None]:
# id to seller name
id_to_seller_name = {
    sel_id: f"Seller_{idx + 1}"
    for idx, sel_id in enumerate(sellers_df["seller_id"].unique())
}
sellers_df["seller_name"] = sellers_df["seller_id"].map(id_to_seller_name)

## Dimension Modeling

In [None]:
customer_map = customers_df.set_index("customer_id")["customer_unique_id"]

In [None]:
orders_df["customer_id"] = orders_df["customer_id"].map(customer_map)

In [None]:
customers_df.drop("customer_id", axis=1, inplace=True)

In [None]:
customers_df.drop_duplicates(inplace=True)

In [None]:
customers_df.rename({"customer_unique_id": "customer_id"}, axis=1, inplace=True)

### Fact Tables

In [None]:
fact_orders = order_items_df.merge(orders_df, on="order_id", how="left").merge(
    order_reviews_df, on="order_id", how="left"
)

In [None]:
fact_orders.drop(
    ["review_id", "review_answer_timestamp", "review_creation_date"],
    axis=1,
    inplace=True,
)

In [None]:
fact_orders.rename({"review_score": "order_rating"}, axis=1, inplace=True)

In [None]:
order_payments_df.groupby("order_id")["payment_type"].count().sort_values(
    ascending=False
)

`order_payments_df` contains the multiple payments of single order, so merging it with `fact_orders` will lead to duplicates so keeping it as separate fact table.

In [None]:
fact_payments = order_payments_df

### Dim Tables

In [None]:
customers_df.duplicated().sum()

In [None]:
dim_customers = customers_df

In [None]:
sellers_df.duplicated().sum()

In [None]:
dim_sellers = sellers_df

In [None]:
product_category_df.head()

In [None]:
product_category_map = product_category_df.set_index("product_category_name")[
    "product_category_name_english"
].to_dict()

In [None]:
product_category_map["unknown"] = "unknown"
products_df["product_category_name"] = products_df["product_category_name"].map(
    product_category_map
)

In [None]:
products_df["product_category_name"].value_counts()

In [None]:
dim_products = products_df

In [None]:
dim_dates = pd.DataFrame(
    pd.date_range(start="2016-01-01", end="2018-12-31"), columns=["date"]
)

In [None]:
dim_dates["quarter"] = dim_dates.date.dt.quarter
dim_dates["month"] = dim_dates.date.dt.month
dim_dates["year"] = dim_dates.date.dt.year
dim_dates["week_by_year"] = dim_dates.date.dt.strftime("%W").astype(int)
dim_dates["day"] = dim_dates.date.dt.day
dim_dates["weekday"] = dim_dates.date.dt.weekday
dim_dates["weekday_name"] = dim_dates.date.dt.day_name()

In [None]:
dim_dates.head()

## Load Data to Snowflake

In [None]:
from Load.snowflake_loader import load_df_to_snowflake, create_tables_in_snowflake

### Create Tables

In [None]:
# fact orders
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS FACT_ORDERS(
        "order_id" VARCHAR(50),
        "order_item_id" INT,
        "product_id" VARCHAR(50),
        "seller_id" VARCHAR(50),
        "shipping_limit_date" TIMESTAMP_TZ(0),
        "price" FLOAT,
        "freight_value" FLOAT,
        "total_price" INT,
        "customer_id" VARCHAR(50),
        "order_status" VARCHAR(50),
        "order_purchase_timestamp" TIMESTAMP_TZ(0),
        "order_approved_at" TIMESTAMP_TZ(0),
        "order_delivered_carrier_date" TIMESTAMP_TZ(0),
        "order_delivered_customer_date" TIMESTAMP_TZ(0),
        "order_estimated_delivery_date" TIMESTAMP_TZ(0),
        "order_rating" FLOAT
    )
    """
):
    raise Exception("failed to create table")

In [None]:
# fact payments
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS FACT_PAYMENTS(
        "order_id" VARCHAR(50),
        "payment_sequential" INT,
        "payment_type" VARCHAR(50),
        "payment_installments" INT,
        "payment_value" FLOAT
    );
    """
):
    raise Exception("failed to create table")

In [None]:
# dim customer
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS DIM_CUSTOMERS(
        "customer_id" VARCHAR(50),
        "customer_zip_code_prefix" INT,
        "customer_city" VARCHAR(50),
        "customer_state" VARCHAR(50),
        "customer_name" VARCHAR(20)
    );
    """
):
    raise Exception("failed to create table")

In [None]:
# dim sellers
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS DIM_SELLERS(
        "seller_id" VARCHAR(50),
        "seller_zip_code_prefix" INT,
        "seller_city" VARCHAR(50),
        "seller_state" VARCHAR(50),
        "seller_name" VARCHAR(20)
    );
    """
):
    raise Exception("failed to create table")

In [None]:
# dim products
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS DIM_PRODUCTS(
        "product_id" VARCHAR(50),
        "product_category_name" VARCHAR(50),
        "product_name_length" INT,
        "product_description_length" INT,
        "product_photos_qty" INT,
        "product_weight_g" INT,
        "product_length_cm" INT,
        "product_height_cm" INT,
        "product_width_cm" INT,
        "product_name" VARCHAR(20)
    );
    """
):
    raise Exception("failed to create table")

In [None]:
# dim dates
if not create_tables_in_snowflake(
    """
    CREATE TABLE IF NOT EXISTS DIM_DATES(
        "date" TIMESTAMP_TZ(0),
        "quarter" INT,
        "month" INT,
        "year" INT,
        "week_by_year" INT,
        "day" INT,
        "weekday" INT,
        "weekday_name" VARCHAR(50)
    );
    """
):
    raise Exception("failed to create table")

### Load data

In [None]:
load_df_to_snowflake("FACT_ORDERS", fact_orders)

In [None]:
load_df_to_snowflake("FACT_PAYMENTS", fact_payments)

In [None]:
load_df_to_snowflake("DIM_CUSTOMERS", dim_customers)

In [None]:
load_df_to_snowflake("DIM_SELLERS", dim_sellers)

In [None]:
load_df_to_snowflake("DIM_PRODUCTS", dim_products)

In [None]:
load_df_to_snowflake("DIM_DATES", dim_dates)