# DBT training database

Assumptions:
  - No extraction will be performed in this scenario.
  - For simplicity, only 5 tables will be created: customer, address, product, category, and purchase
  - Fake data will be created using faker and loaded into the raw data
  - Product will be a static dimension, while customer will be an SCD2 (slowly changing dimension, type 2)
  - Neither Git best practices nor data modeling were applied; this is a simple exercise for dbt training purposes

## Creating catalog and schemas

In [0]:
%sql
-- Reset step: drop raw tables left by a previous run so they can be rebuilt.
-- NOTE(review): the cells visible in this notebook write `customer` and `product`;
-- `products` is presumably an older table name - confirm.
-- NOTE(review): `address`, `category` and `purchase` are written with append mode
-- but are not dropped here, so a rerun adds duplicate rows to them - confirm intended.
DROP TABLE IF EXISTS dbt_training.raw.customer;
DROP TABLE IF EXISTS dbt_training.raw.products;
DROP TABLE IF EXISTS dbt_training.raw.product;

In [0]:
%sql
-- Create the training catalog and one schema per layer (raw, bronze, silver, gold).
-- The catalog name is written as a plain identifier (not a quoted string literal),
-- matching every other reference to it in this notebook.
CREATE CATALOG IF NOT EXISTS dbt_training;
CREATE SCHEMA IF NOT EXISTS dbt_training.raw;
CREATE SCHEMA IF NOT EXISTS dbt_training.bronze;
CREATE SCHEMA IF NOT EXISTS dbt_training.silver;
CREATE SCHEMA IF NOT EXISTS dbt_training.gold;
-- Remove the `default` schema; nothing in this notebook writes to it.
DROP SCHEMA IF EXISTS dbt_training.default;

## Installing and importing libraries

In [0]:
# %pip install Faker

In [0]:
from faker import Faker
import random
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, rand

In [0]:
# Seed value that the table-generation cells pass to fake.seed_instance() and random.seed()
seed = 14

# Faker generator using the US-English locale
fake = Faker(locale="en_US")

## Table creation

### Creating customer table
- This will be an SCD2 dimension for the purpose of the training

In [0]:
# Re-seed both generators so this cell starts from the same pseudo-random state on every run
fake.seed_instance(seed)
random.seed(seed)

# Number of customer ids; one female-named and one male-named row is generated per id
num_customers = 5_000

# Value pools shared by both row variants. The spellings are reproduced exactly as written
# (e.g. "Suspensed", "stable UNION") - presumably deliberate dirty data for the training.
_CIVIL_STATUSES = ("Married", "Single", "Divorced", "Widowed", "Prefer not to say","stable UNION")
_ACCOUNT_STATUSES = ["Active","Inactive", "Activated", "Suspensed", "Suspended"]


def _build_customer_row(customer_id, first_name_of, last_name_of, gender_labels):
    """Build one raw customer Row.

    Args:
        customer_id: Integer id stored in the row.
        first_name_of: Zero-argument callable returning a first name.
        last_name_of: Zero-argument callable returning a last name.
        gender_labels: Tuple of gender spellings to draw from.

    Returns:
        A pyspark Row with the 13 customer fields, in the order listed below.
    """
    # Keyword arguments are evaluated top to bottom, so the order of the generator
    # calls (and therefore the seeded output) follows the field order.
    return Row(
        customer_id=customer_id,
        first_name=first_name_of(),
        last_name=last_name_of(),
        civil_status=fake.random_element(elements=_CIVIL_STATUSES),
        children=fake.random_int(min=0, max=3),
        gender=fake.random_element(elements=gender_labels),
        email=fake.email(),
        phone=fake.phone_number(),
        date_of_birth=fake.date_of_birth(minimum_age=0, maximum_age=110),
        account_status=random.choice(_ACCOUNT_STATUSES),
        account_created_at=fake.date_time_between(start_date='-3y', end_date='now'),
        last_purchase_at=fake.date_time_between(start_date='-3y', end_date='now'),
        last_login_at=fake.date_time_between(start_date='-3y', end_date='now'),
    )


customer_female_data = []
customer_male_data = []

for customer_number in range(1, num_customers + 1):
    # The female row is built before the male row for each id, so the generators
    # are consumed in an interleaved female/male order.
    customer_female_data.append(
        _build_customer_row(
            customer_number,
            fake.first_name_female,
            fake.last_name_female,
            ("Female","Prefer not to say", "F","f"),
        )
    )
    customer_male_data.append(
        _build_customer_row(
            customer_number,
            fake.first_name_male,
            fake.last_name_male,
            ("Male", "Prefer not to say", "M", "m"),
        )
    )

# Both lists use ids 1..num_customers, so every customer_id appears twice in the result.
# NOTE(review): presumably intended to give the SCD2 exercise multiple rows per id - confirm.
customer_data = customer_female_data + customer_male_data



In [0]:
# Turn the generated rows into a Spark DataFrame (the load timestamp is added in the next cell)
customer_df = spark.createDataFrame(data=customer_data)
display(customer_df)

In [0]:
# Stamp every customer row with the load (ingestion) timestamp
# NOTE(review): spelled "Load_date" here but "Load_Date" in the address, category and
# product cells - confirm which casing is intended.
customer_df = customer_df.withColumn(colName="Load_date", col=current_timestamp())
display(customer_df)

In [0]:
# Append the customer rows to the table in the raw layer
customer_df.write.saveAsTable('dbt_training.raw.customer', mode="append")

### Address table

In [0]:
# Re-seed both generators so this cell starts from the same pseudo-random state on every run
fake.seed_instance(seed)
random.seed(seed)

# Building the address rows: one row per customer_id in 1..num_addresses
num_addresses = 5_000

address_data = [
    Row(
        address_id=fake.uuid4(),
        customer_id=customer_number,
        address=fake.address(),
        city=fake.city(),
        state=fake.state(),
        zip_code=fake.zipcode(),
        country=fake.country(),
        created_at=fake.date_time_between(start_date='-3y', end_date='now'),
    )
    for customer_number in range(1, num_addresses + 1)
]
    




In [0]:
# Turn the generated rows into a Spark DataFrame (the load timestamp is added in the next cell)
address_df = spark.createDataFrame(data=address_data)
display(address_df)

In [0]:
# Stamp every address row with the load (ingestion) timestamp
address_df = address_df.withColumn(colName="Load_Date", col=current_timestamp())
display(address_df)

In [0]:
# Append the address rows to the table in the raw layer
address_df.write.saveAsTable('dbt_training.raw.address', mode="append")

### Creating category table
- This will be an SCD1 dimension for the purpose of the training

In [0]:
# Category -> subcategories lookup used as the base for the category table.
# Key order and list order matter: the category cell numbers the pairs in this order.
CATEGORY_STRUCTURE = {
    "electronics": [
        "smartphone",
        "tablet",
        "laptop",
        "camera",
        "headphones",
        "smartwatch",
        "gaming console",
        "smart home device",
    ],
    "clothing": [
        "shirts",
        "pants",
        "dresses",
        "shoes",
        "jackets",
        "sweaters",
        "blouses",
    ],
    "jewelry": [
        "necklaces",
        "earrings",
        "rings",
        "bracelets",
        "watches",
        "jewelry sets",
    ],
    "books": [
        "fiction",
        "non-fiction",
        "biographies",
        "science fiction",
        "romance",
        "mystery",
        "horror",
    ],
    "home": [
        "kitchenware",
        "bedding",
        "lighting",
        "furniture",
        "decor",
    ],
}

In [0]:
# Re-seed both generators so this cell starts from the same pseudo-random state on every run
fake.seed_instance(seed)
random.seed(seed)

# Rows for the category table, plus a (category, subcategory) -> category_id lookup
# that the product cell uses to resolve its foreign key
category_list = []
category_map = {}

# Flatten the nested structure into (category, subcategory) pairs, keeping its order
flat_pairs = (
    (name, sub_name)
    for name, sub_names in CATEGORY_STRUCTURE.items()
    for sub_name in sub_names
)

# Ids are assigned sequentially, starting at 1
for category_id, (category, subcategory) in enumerate(flat_pairs, start=1):
    category_map[(category, subcategory)] = category_id
    category_list.append(
        Row(
            category_id=category_id,
            category_name=category,
            subcategory_name=subcategory,
            created_at=fake.date_time_between(start_date='-5y', end_date='-1y'),
        )
    )

In [0]:
# Turn the generated category rows into a Spark DataFrame
category_df = spark.createDataFrame(data=category_list)

In [0]:
# Stamp every category row with the load (ingestion) timestamp
category_df = category_df.withColumn(colName="Load_Date", col=current_timestamp())
display(category_df)

In [0]:
# Append the category rows to the table in the raw layer
category_df.write.saveAsTable('dbt_training.raw.category', mode="append")

### Product Table

In [0]:
# Re-seed both generators so this cell starts from the same pseudo-random state on every run
fake.seed_instance(seed)
random.seed(seed)

product_list = []
product_items = 500

# The category names do not change inside the loop, so the list is built once
category_names = list(CATEGORY_STRUCTURE)

for product in range(1, product_items + 1):

    # Draw a category, then one of that category's own subcategories,
    # so the (category, subcategory) pair always exists in category_map
    category = random.choice(category_names)
    subcategory = random.choice(CATEGORY_STRUCTURE[category])

    # Foreign key to the category table
    category_id_fk = category_map[(category, subcategory)]

    product_list.append(
        Row(
            # Zero-padded sequential SKU, e.g. SKU-000001
            product_id=f"SKU-{product:06d}",
            # Two random capitalized words
            product_name=f"{fake.word().capitalize()} {fake.word().capitalize()}",
            category_id=category_id_fk,
            brand=fake.company(),
            cost=fake.pydecimal(left_digits=3, right_digits=2, positive=True),
            weight=fake.pydecimal(left_digits=2, right_digits=2, positive=True),
            created_at=fake.date_time_between(start_date='-3y', end_date='-1y'),
            modified_at=fake.date_time_between(start_date='-1y', end_date='now'),
        )
    )

In [0]:
# Turn the generated product rows into a Spark DataFrame
products_df = spark.createDataFrame(data=product_list)

In [0]:
# Stamp every product row with the load (ingestion) timestamp
products_df = products_df.withColumn(colName="Load_Date", col=current_timestamp())
display(products_df)

In [0]:
# Append the product rows to the table in the raw layer
products_df.write.saveAsTable('dbt_training.raw.product', mode="append")

### Creating purchase table

In [0]:
# Re-seed both generators so this cell starts from the same pseudo-random state on every run
fake.seed_instance(seed)
random.seed(seed)
num_transactions = 2_000

# Collect the candidate ids to the driver once, instead of running two Spark jobs
# (a full random sort of each DataFrame) for every generated transaction.
# Sorting makes the pick independent of the order in which Spark returns the rows.
# Repeated ids are kept, so each id keeps the weight it has in the DataFrame.
_customer_ids = sorted(
    row["customer_id"] for row in customer_df.select("customer_id").collect()
)
_product_ids = sorted(
    row["product_id"] for row in products_df.select("product_id").collect()
)

# Dedicated seeded generator for the id picks: the ids become reproducible, and the
# global `random` stream used for price and quantity below is left untouched.
_id_picker = random.Random(seed)


def get_random_customer_and_product():
    """Return a random (customer_id, product_id) pair.

    Each id is drawn uniformly from the rows of customer_df and products_df
    as they were when this cell ran.

    Returns:
        Tuple of (customer_id, product_id).

    Raises:
        IndexError: If either DataFrame had no rows.
    """
    return _id_picker.choice(_customer_ids), _id_picker.choice(_product_ids)


# Transaction list
purchases = []

# Generating transactions
for _ in range(num_transactions):
    customer_id, product_id = get_random_customer_and_product()

    purchases.append(
        Row(
            purchase_id=fake.uuid4(),
            purchase_date=fake.date_between(start_date="-3y", end_date="today"),
            customer_id=customer_id,
            product_id=product_id,
            # Price is drawn independently of the product's cost
            price=round(random.uniform(5.0, 500.0), 2),
            quantity=random.randint(1, 5),
        )
    )

In [0]:
# Build the Spark DataFrame from the generated purchase rows and stamp the load timestamp
# NOTE(review): spelled "Load_date" here but "Load_Date" in the address, category and
# product cells - confirm which casing is intended.
purchase_df = spark.createDataFrame(data=purchases).withColumn(
    colName="Load_date", col=current_timestamp()
)
display(purchase_df)

In [0]:
# Append the purchase rows to the table in the raw layer
purchase_df.write.saveAsTable('dbt_training.raw.purchase', mode="append")