In [None]:
!pip install faker

In [None]:
from faker import Faker
import pandas as pd
import random
import hashlib
import uuid
import datetime
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import rand
from pyspark.sql.types import DecimalType
from decimal import Decimal

# Disable Delta's retention-duration safety check. The VACUUM cells later in this
# notebook use RETAIN 0 HOURS, which is presumably why the check is switched off here.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Shared Faker instance used by the dimension generators for names, words, states and cities.
fake = Faker()

# Per-dimension seeds. Each seed initialises the random.Random that produces the
# dim_guid values and is also added to the row index to form the sha256 input,
# both in the dimension generators and again in generate_sales_facts.
product_seed = 100
customer_seed = 200
location_seed = 300

# Row counts per dimension. num_customers is also the number of fact rows
# generated per day (generate_sales_facts iterates range(num_customers)).
num_product = 10000
num_customers = 100000
num_locations = 100

# Calendar year for which daily sales facts are generated.
year = 2022

In [None]:
def generate_products_dimensions(num_rows, seed):
    """Build ``num_rows`` synthetic product dimension records.

    Every record carries three alternative keys for the same row:

    * ``dim_id``     - 1-based integer,
    * ``dim_guid``   - UUID string built from 128 bits of a ``random.Random`` seeded with ``seed``,
    * ``dim_sha256`` - hex SHA-256 of the decimal string ``row_index + seed``,

    plus the descriptive columns ``dim_type``, ``name``, ``category``, ``color`` and ``size``.

    Args:
        num_rows: Number of records to generate.
        seed: Integer seed. It drives the GUID stream, offsets the SHA-256 input,
            and seeds the attribute generators so repeated runs produce the same rows.

    Returns:
        list[dict]: One dict per product, in ``dim_id`` order.

    Side effects:
        Re-seeds the notebook-level ``fake`` Faker instance with ``seed``.
    """
    # Key stream. generate_sales_facts re-creates the GUIDs by replaying
    # getrandbits(128) from a generator seeded the same way, so nothing else may
    # be drawn from this generator or the fact keys would stop matching.
    _random = random.Random()
    _random.seed(seed)

    # Attribute stream. Previously the global, unseeded `random` module and an
    # unseeded Faker were used, so the `seed` argument did not make the
    # descriptive columns reproducible.
    attribute_random = random.Random()
    attribute_random.seed(f"{seed}:product-attributes")
    fake.seed_instance(seed)

    dimensions = []
    for i in range(num_rows):
        integer_bytes = str(i + seed).encode('utf-8')
        dimension = {
            'dim_id': i + 1,
            'dim_guid': str(uuid.UUID(int=_random.getrandbits(128), version=4)),
            'dim_sha256': hashlib.sha256(integer_bytes).hexdigest(),
            'dim_type': 'product',
            'name': fake.word().capitalize(),
            'category': fake.word().capitalize(),
            'color': fake.color_name(),
            'size': attribute_random.choice(['S', 'M', 'L', 'XL'])
        }
        dimensions.append(dimension)
    return dimensions

# Generate the product dimension once and load it into Spark.
product_dimensions_data = generate_products_dimensions(num_product, product_seed)
spark_df = spark.createDataFrame(product_dimensions_data)

# Descriptive columns that every key variant of the table shares.
product_attribute_columns = ['dim_type', 'name', 'category', 'color', 'size']

# Variant keyed by the integer dim_id.
id_df = spark_df.select(['dim_id'] + product_attribute_columns)
id_df.write.format("delta").mode("overwrite").save("Tables/dim_product_id")

# Variant keyed by the GUID string.
guid_df = spark_df.select(['dim_guid'] + product_attribute_columns)
guid_df.write.format("delta").mode("overwrite").save("Tables/dim_product_guid")

# Variant keyed by the SHA-256 hex digest.
sha256_df = spark_df.select(['dim_sha256'] + product_attribute_columns)
sha256_df.write.format("delta").mode("overwrite").save("Tables/dim_product_sha256")

In [None]:
def generate_customer_dimension(num_rows, seed):
    """Build ``num_rows`` synthetic customer dimension records.

    Every record carries three alternative keys for the same row:

    * ``dim_id``     - 1-based integer,
    * ``dim_guid``   - UUID string built from 128 bits of a ``random.Random`` seeded with ``seed``,
    * ``dim_sha256`` - hex SHA-256 of the decimal string ``row_index + seed``,

    plus the descriptive columns ``dim_type``, ``name``, ``age``, ``gender`` and ``city``.

    Args:
        num_rows: Number of records to generate.
        seed: Integer seed. It drives the GUID stream, offsets the SHA-256 input,
            and seeds the attribute generators so repeated runs produce the same rows.

    Returns:
        list[dict]: One dict per customer, in ``dim_id`` order.

    Side effects:
        Re-seeds the notebook-level ``fake`` Faker instance with ``seed``.
    """
    # Key stream. generate_sales_facts re-creates the GUIDs by replaying
    # getrandbits(128) from a generator seeded the same way, so nothing else may
    # be drawn from this generator or the fact keys would stop matching.
    _random = random.Random()
    _random.seed(seed)

    # Attribute stream. Previously the global, unseeded `random` module and an
    # unseeded Faker were used, so the `seed` argument did not make the
    # descriptive columns reproducible.
    attribute_random = random.Random()
    attribute_random.seed(f"{seed}:customer-attributes")
    fake.seed_instance(seed)

    customers = []
    for i in range(num_rows):
        integer_bytes = str(i + seed).encode('utf-8')
        customer = {
            'dim_id': i + 1,
            'dim_guid': str(uuid.UUID(int=_random.getrandbits(128), version=4)),
            'dim_sha256': hashlib.sha256(integer_bytes).hexdigest(),
            'dim_type': 'customer',
            'name': fake.name(),
            'age': attribute_random.randint(18, 80),
            'gender': attribute_random.choice(['Male', 'Female']),
            'city': fake.city()
        }
        customers.append(customer)
    return customers

# Generate the customer dimension once and load it into Spark.
customer_dimension_data = generate_customer_dimension(num_customers, customer_seed)
spark_df = spark.createDataFrame(customer_dimension_data)

# Descriptive columns that every key variant of the table shares.
customer_attribute_columns = ['dim_type', 'name', 'age', 'gender', 'city']

# Variant keyed by the integer dim_id.
id_df = spark_df.select(['dim_id'] + customer_attribute_columns)
id_df.write.format("delta").mode("overwrite").save("Tables/dim_customer_id")

# Variant keyed by the GUID string.
guid_df = spark_df.select(['dim_guid'] + customer_attribute_columns)
guid_df.write.format("delta").mode("overwrite").save("Tables/dim_customer_guid")

# Variant keyed by the SHA-256 hex digest.
sha256_df = spark_df.select(['dim_sha256'] + customer_attribute_columns)
sha256_df.write.format("delta").mode("overwrite").save("Tables/dim_customer_sha256")

In [None]:

def generate_location_dimension(num_rows, seed):
    """Build a pandas DataFrame of ``num_rows`` synthetic location dimension rows.

    Columns, in order: ``dim_id`` (1-based integer), ``dim_guid`` (UUID string built
    from 128 bits of a ``random.Random`` seeded with ``seed``), ``dim_sha256`` (hex
    SHA-256 of the decimal string ``row_index + seed``), ``dim_type``, ``country``,
    ``state`` and ``city``.

    Args:
        num_rows: Number of rows to generate.
        seed: Integer seed. It drives the GUID stream, offsets the SHA-256 input,
            and seeds Faker so repeated runs produce the same states and cities.

    Returns:
        pandas.DataFrame: One row per location; an empty frame with the same
        columns when ``num_rows`` is 0.

    Side effects:
        Re-seeds the notebook-level ``fake`` Faker instance with ``seed``.
    """
    columns = ['dim_id', 'dim_guid', 'dim_sha256', 'dim_type', 'country', 'state', 'city']

    # Key stream. generate_sales_facts re-creates the GUIDs by replaying
    # getrandbits(128) from a generator seeded the same way, so nothing else may
    # be drawn from this generator.
    _random = random.Random()
    _random.seed(seed)

    # Make the Faker-generated columns reproducible for a given seed.
    fake.seed_instance(seed)

    # Collect plain dicts and build the frame once. The previous implementation
    # called the private DataFrame._append inside the loop, which copies the
    # whole frame on every iteration.
    records = []
    for i in range(num_rows):
        integer_bytes = str(i + seed).encode('utf-8')
        records.append({
            'dim_id': i + 1,
            'dim_guid': str(uuid.UUID(int=_random.getrandbits(128), version=4)),
            'dim_sha256': hashlib.sha256(integer_bytes).hexdigest(),
            'dim_type': 'location',
            'country': 'US',
            'state': fake.state(),
            'city': fake.city()
        })
    return pd.DataFrame(records, columns=columns)

# Generate the location dimension (a pandas DataFrame) once and load it into Spark.
location_dimension_data = generate_location_dimension(num_locations, location_seed)
spark_df = spark.createDataFrame(location_dimension_data)

# Descriptive columns that every key variant of the table shares.
location_attribute_columns = ['dim_type', 'country', 'state', 'city']

# Variant keyed by the integer dim_id.
id_df = spark_df.select(['dim_id'] + location_attribute_columns)
id_df.write.format("delta").mode("overwrite").save("Tables/dim_location_id")

# Variant keyed by the GUID string.
guid_df = spark_df.select(['dim_guid'] + location_attribute_columns)
guid_df.write.format("delta").mode("overwrite").save("Tables/dim_location_guid")

# Variant keyed by the SHA-256 hex digest.
sha256_df = spark_df.select(['dim_sha256'] + location_attribute_columns)
sha256_df.write.format("delta").mode("overwrite").save("Tables/dim_location_sha256")

In [None]:
def get_month_start_end_dates(year):
    """Return the first and last day of every month of ``year``.

    Args:
        year: Calendar year as an integer.

    Returns:
        list[tuple[datetime.datetime, datetime.datetime]]: Twelve
        ``(first_day, last_day)`` pairs, January through December, each at midnight.
    """
    one_month = relativedelta(months=1)
    one_day = datetime.timedelta(days=1)

    month_dates = []
    for month in range(1, 13):
        first_day = datetime.datetime(year, month, 1)
        # The last day of the month is the day before the first day of the next month.
        last_day = first_day + one_month - one_day
        month_dates.append((first_day, last_day))
    return month_dates

def _dimension_keys(count, seed):
    """Return ``(dim_id, dim_guid, dim_sha256)`` triples for dimension rows ``0..count-1``.

    Mirrors the key derivation used by the dimension generators: the GUID is the
    next 128 bits of a ``random.Random`` seeded with ``seed`` and the SHA-256 input
    is the decimal string ``row_index + seed``.
    """
    key_random = random.Random()
    key_random.seed(seed)
    return [
        (
            index + 1,
            str(uuid.UUID(int=key_random.getrandbits(128), version=4)),
            hashlib.sha256(str(index + seed).encode('utf-8')).hexdigest(),
        )
        for index in range(count)
    ]


def generate_sales_facts(start_date: str, end_date: str, num_products, num_customers, num_locations, product_seed, customer_seed, location_seed):
    """Generate one fact row per customer per day between two dates (inclusive).

    Row ``i`` of each day references customer ``i``, product ``i % num_products`` and
    location ``i % num_locations``. Every reference is emitted in all three key
    styles (``*_dim_id``, ``*_dim_guid``, ``*_dim_sha256``), and the three styles
    always identify the same dimension row.

    Args:
        start_date: First day, in any form ``pandas.date_range`` accepts (e.g. ``'2022-01-01'``).
        end_date: Last day (inclusive), same format.
        num_products: Number of product rows to cycle through.
        num_customers: Number of customers, i.e. fact rows per day.
        num_locations: Number of location rows to cycle through.
        product_seed: Seed used when the product dimension was generated.
        customer_seed: Seed used when the customer dimension was generated.
        location_seed: Seed used when the location dimension was generated.

    Returns:
        list[dict]: Fact rows ordered by day, then by customer. Each dict holds the
        nine key columns plus ``date`` as an integer ``YYYYMMDD``.

    Raises:
        ZeroDivisionError: If ``num_products`` or ``num_locations`` is 0 while
            ``num_customers`` is positive.
    """
    # The keys do not depend on the date, so derive them once instead of once per day.
    customer_keys = _dimension_keys(num_customers, customer_seed)
    # Products and locations are cycled by row index, so no more than
    # num_customers distinct rows of either can ever be referenced.
    product_keys = _dimension_keys(min(num_products, num_customers), product_seed)
    location_keys = _dimension_keys(min(num_locations, num_customers), location_seed)

    facts = []
    for current_date in pd.date_range(start=start_date, end=end_date, freq='D'):
        date_key = int(current_date.strftime('%Y%m%d'))

        for i, (customer_id, customer_guid, customer_sha256) in enumerate(customer_keys):
            # Index with modulo so id, guid and sha256 are always taken from the
            # same dimension row. Previously the sha256 input was computed before
            # the wrap-around reset, so every wrap row carried the hash of a
            # non-existent dimension index.
            product_id, product_guid, product_sha256 = product_keys[i % num_products]
            location_id, location_guid, location_sha256 = location_keys[i % num_locations]

            facts.append({
                'customer_dim_id': customer_id,
                'customer_dim_guid': customer_guid,
                'customer_dim_sha256': customer_sha256,
                'product_dim_id': product_id,
                'product_dim_guid': product_guid,
                'product_dim_sha256': product_sha256,
                'location_dim_id': location_id,
                'location_dim_guid': location_guid,
                'location_dim_sha256': location_sha256,
                'date': date_key,
            })
    return facts

def _overwrite_month(df, table_path, month_predicate):
    """Write ``df`` to the Delta table at ``table_path``, replacing only rows matching ``month_predicate``."""
    (df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", month_predicate)
        .save(table_path))


month_dates = get_month_start_end_dates(year)

# Facts are generated and written one month at a time.
for start_date, end_date in month_dates:
    print(f"Start Date: {start_date.strftime('%Y-%m-%d')}, End Date: {end_date.strftime('%Y-%m-%d')}")
    start_date_str = start_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')

    sales_facts_data = generate_sales_facts(start_date_str, end_date_str, num_product, num_customers, num_locations, product_seed, customer_seed, location_seed)
    spark_df = spark.createDataFrame(sales_facts_data)
    # quantity is an integer in [1, 100]; price is a decimal(10, 2) drawn from [10, 100).
    spark_df = spark_df.withColumn("quantity", (rand() * 100 + 1).cast("int")).withColumn("price", (rand() * 90 + 10).cast(DecimalType(10, 2)))

    # A plain append duplicated every fact row whenever this cell was re-run.
    # Replacing just this month's date range keeps the write idempotent while
    # leaving rows for other months (or other years) in place.
    month_predicate = f"`date` >= {start_date.strftime('%Y%m%d')} AND `date` <= {end_date.strftime('%Y%m%d')}"

    id_df = spark_df.select(['product_dim_id', 'customer_dim_id', 'location_dim_id', 'date', 'quantity', 'price'])
    _overwrite_month(id_df, "Tables/fact_sales_id", month_predicate)

    guid_df = spark_df.select(['product_dim_guid', 'customer_dim_guid', 'location_dim_guid', 'date', 'quantity', 'price'])
    _overwrite_month(guid_df, "Tables/fact_sales_guid", month_predicate)

    sha256_df = spark_df.select(['product_dim_sha256', 'customer_dim_sha256', 'location_dim_sha256', 'date', 'quantity', 'price'])
    _overwrite_month(sha256_df, "Tables/fact_sales_sha256", month_predicate)


In [None]:
%%sql
-- Run OPTIMIZE on each of the three fact tables after the month-by-month writes above.
-- NOTE(review): these statements qualify the tables with here_lake_house, while the
-- query cells further down reference the same table names unqualified.
OPTIMIZE here_lake_house.fact_sales_id;
OPTIMIZE here_lake_house.fact_sales_guid;
OPTIMIZE here_lake_house.fact_sales_sha256;

In [None]:
%%sql
-- VACUUM each fact table with a retention of zero hours. The first code cell sets
-- spark.databricks.delta.retentionDurationCheck.enabled to false, presumably so
-- that this zero-hour retention is accepted.
-- NOTE(review): a zero-hour retention removes files that older table versions
-- depend on; confirm that losing time travel on these tables is acceptable.
VACUUM here_lake_house.fact_sales_id RETAIN 0 HOURS;
VACUUM here_lake_house.fact_sales_guid RETAIN 0 HOURS;
VACUUM here_lake_house.fact_sales_sha256 RETAIN 0 HOURS;

In [27]:
%%sql
-- Total quantity and price over the fact table joined to all three dimensions on
-- the integer dim_id keys. The same statement appears in three consecutive cells
-- (presumably repeated timing runs).
SELECT SUM(quantity), SUM(price) FROM fact_sales_id
INNER JOIN dim_customer_id ON customer_dim_id = dim_customer_id.dim_id
INNER JOIN dim_product_id ON product_dim_id = dim_product_id.dim_id
INNER JOIN dim_location_id ON location_dim_id = dim_location_id.dim_id


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 29, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [28]:
%%sql
-- Integer-key join again: identical statement to the neighbouring dim_id query
-- cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_id
INNER JOIN dim_customer_id ON customer_dim_id = dim_customer_id.dim_id
INNER JOIN dim_product_id ON product_dim_id = dim_product_id.dim_id
INNER JOIN dim_location_id ON location_dim_id = dim_location_id.dim_id


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 30, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [29]:
%%sql
-- Integer-key join again: identical statement to the neighbouring dim_id query
-- cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_id
INNER JOIN dim_customer_id ON customer_dim_id = dim_customer_id.dim_id
INNER JOIN dim_product_id ON product_dim_id = dim_product_id.dim_id
INNER JOIN dim_location_id ON location_dim_id = dim_location_id.dim_id


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 31, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [30]:
%%sql

-- Same aggregate as the dim_id queries, but every join is on the GUID string
-- keys. The same statement appears in three consecutive cells (presumably
-- repeated timing runs).
SELECT SUM(quantity), SUM(price) FROM fact_sales_guid
INNER JOIN dim_customer_guid ON customer_dim_guid = dim_customer_guid.dim_guid
INNER JOIN dim_product_guid ON product_dim_guid = dim_product_guid.dim_guid
INNER JOIN dim_location_guid ON location_dim_guid = dim_location_guid.dim_guid


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 32, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [31]:
%%sql

-- GUID-key join again: identical statement to the neighbouring dim_guid query
-- cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_guid
INNER JOIN dim_customer_guid ON customer_dim_guid = dim_customer_guid.dim_guid
INNER JOIN dim_product_guid ON product_dim_guid = dim_product_guid.dim_guid
INNER JOIN dim_location_guid ON location_dim_guid = dim_location_guid.dim_guid


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 33, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [32]:
%%sql

-- GUID-key join again: identical statement to the neighbouring dim_guid query
-- cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_guid
INNER JOIN dim_customer_guid ON customer_dim_guid = dim_customer_guid.dim_guid
INNER JOIN dim_product_guid ON product_dim_guid = dim_product_guid.dim_guid
INNER JOIN dim_location_guid ON location_dim_guid = dim_location_guid.dim_guid


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 34, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [33]:
%%sql

-- Same aggregate as the dim_id queries, but every join is on the SHA-256 hex
-- digest keys. The same statement appears in three consecutive cells (presumably
-- repeated timing runs).
SELECT SUM(quantity), SUM(price) FROM fact_sales_sha256
INNER JOIN dim_customer_sha256 ON customer_dim_sha256 = dim_customer_sha256.dim_sha256
INNER JOIN dim_product_sha256 ON product_dim_sha256 = dim_product_sha256.dim_sha256
INNER JOIN dim_location_sha256 ON location_dim_sha256 = dim_location_sha256.dim_sha256


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 35, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [34]:
%%sql

-- SHA-256-key join again: identical statement to the neighbouring dim_sha256
-- query cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_sha256
INNER JOIN dim_customer_sha256 ON customer_dim_sha256 = dim_customer_sha256.dim_sha256
INNER JOIN dim_product_sha256 ON product_dim_sha256 = dim_product_sha256.dim_sha256
INNER JOIN dim_location_sha256 ON location_dim_sha256 = dim_location_sha256.dim_sha256


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 36, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>

In [35]:
%%sql

-- SHA-256-key join again: identical statement to the neighbouring dim_sha256
-- query cells (presumably a repeated timing run).
SELECT SUM(quantity), SUM(price) FROM fact_sales_sha256
INNER JOIN dim_customer_sha256 ON customer_dim_sha256 = dim_customer_sha256.dim_sha256
INNER JOIN dim_product_sha256 ON product_dim_sha256 = dim_product_sha256.dim_sha256
INNER JOIN dim_location_sha256 ON location_dim_sha256 = dim_location_sha256.dim_sha256


StatementMeta(, edb76bf5-0551-4a94-a26c-914320f6f136, 37, Finished, Available)

<Spark SQL result set with 1 rows and 2 fields>