In [None]:
# Welcome to your new notebook
# Type here in the cell editor to add code!

%pip install dbldatagen --quiet 
%pip install jmespath --quiet

In [None]:
import dbldatagen as dg
import dbldatagen.distributions as dist
import random
from pyspark.sql.types import FloatType, StructType, StructField,  StringType, IntegerType
import pandas as pd
from pyspark.sql.functions import col, last_day, dayofweek, year, month, date_format


country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG', 'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']

colors = ['White','Black','Grey','Silver','Orange','Yellow','Blue','Brown','Gold','Silver Grey','Pink','Red','Green','Transparent','Purple','Azure']
categories = ['Audio','TV and Video','Computers','Cell phones','Music, Movies and Audio Books','Home Appliances','Cameras and camcorders','Games and Toys']
categories_weights = [1000,900,500,400,100,10,300,90]
subCategories = ['MP4&MP3','Home Theater System','Projectors & Screens','Computers Accessories','Home & Office Phones','Movie DVD','Washers & Dryers','Microwaves','Water Heaters','Coffee Machines','Air Conditioners','Digital SLR Cameras','Touch Screen Phones','Cell phones Accessories','Camcorders','VCD & DVD','Car Video','Boxed Games','Bluetooth Headphones','Digital Cameras','Smart phones & PDAs','Televisions','Laptops','Desktops','Monitors','Printers, Scanners & Fax','Refrigerators','Lamps','Fans','Cameras & Camcorders Accessories','Recording Pen','Download Games']

start_date = '2020-01-01 00:00:00'
end_date = '2024-12-31 00:00:00'

customer_rows = 1000
store_rows = 100
product_rows = 1000
sales_rows = 10000

num_fact_tables = 3
num_random_tables = 10
num_columns_random_tables = 15
random_table_prefix = "zRandomTable"

listOfTables = []

# Customer table

dataSpec = (
    dg.DataGenerator(spark, name="customerDataset", rows=customer_rows)
    .withColumn("customerId", IntegerType(),expr="id + 1")
    .withColumn("customer", template=r"\w \w")
    .withColumn("email", template=r"\w.\w@\w.com")    
    .withColumn("country", StringType(), values=country_codes, random=True, distribution=dist.Gamma(1.0, 2.0))
    .withColumn("birthday", "date", data_range=dg.DateRange("1942-01-01 00:00:00", "2010-10-06 11:55:00", "days=3"), random=True)
    .withColumn("gender", StringType(), values=["male", "female"], random=True)
    )

df = dataSpec.build()

listOfTables.append({'name': 'customer', 'data': df})

# store table

dataSpec = (
    dg.DataGenerator(spark, name="storeDataset", rows=store_rows)
    .withColumn("storeId", IntegerType(),expr="id + 1")
    .withColumn("store", template=r"\w \w \w")
    .withColumn("openDate", "date", data_range=dg.DateRange("1942-01-01 00:00:00", "2010-10-06 11:55:00", "days=3"), random=True)        
    .withColumn("status", StringType(), values=["open", "closed"], weights=[100,10], random=True)
    )

df = dataSpec.build()

listOfTables.append({'name': 'store', 'data': df})

# product table

dataSpec = (
    dg.DataGenerator(spark, name="productDataset", rows=product_rows)    
    .withColumn("productId", IntegerType(),expr="id + 1")
    .withColumn("product", template=r"\w \w \w")    
    .withColumn("color", StringType(), values=colors, random=True, distribution=dist.Gamma(1.0, 2.0))
    .withColumn("category", StringType(), values=categories, weights=categories_weights, random=True)
    .withColumn("unitPrice", 'decimal(10,2)', minValue=1, maxValue=100, random=True)
    .withColumn("weight", 'decimal(10,2)', minValue=1, maxValue=50, step=0.1, random=True)    
    )

df = dataSpec.build()

listOfTables.append({'name': 'product', 'data': df})

# sales table

for i in range(1, num_fact_tables+1):

    dataSpec = (
        dg.DataGenerator(spark, name="salesDataset", rows=sales_rows, randomSeed  = i)
        .withColumn("salesId", IntegerType(),expr="id + 1")
        .withColumn("customerId", IntegerType(),  minValue=1, maxValue=customer_rows, random=True)
        .withColumn("productId", IntegerType(),  minValue=1, maxValue=product_rows, random=True)
        .withColumn("storeId", IntegerType(),  minValue=1, maxValue=store_rows, random=True)    
        .withColumn("orderDate", "date", data_range=dg.DateRange(start_date, end_date, "days=10"), random=True)
        .withColumn("shippingDate","date", expr="date_add(orderDate, cast(floor(rand() * 20 + 1) as int))", baseColumn=["orderDate"])
        .withColumn("quantity", IntegerType(),  minValue=0, maxValue=500, random=True, distribution=dist.Gamma(1.0, 2.0))    
        .withColumn("price", 'decimal(10,2)', minValue=1, maxValue=10, random=True, distribution=dist.Gamma(1.0, 2.0))
        .withColumn("amount","decimal(10,2)", expr="quantity*price", baseColumn=["quantity", "price"])
        )    

    df = dataSpec.build()
        
    if (i == 1):
        listOfTables.append({'name': f"sales", 'data': df})
    else:
        listOfTables.append({'name': f"sales_{i}", 'data': df})

# date dimension

date_df = pd.date_range(start=start_date, end=end_date).to_frame(index=False, name='Date')
date_df['Date'] = date_df['Date'].astype(str)

dfCalendarData = spark.createDataFrame(date_df)

dfCalendarData = dfCalendarData.withColumn('date', col('Date').cast('date'))
dfCalendarData = dfCalendarData.withColumn('dateID', date_format(col('Date'),"yyyyMMdd").cast('integer'))
dfCalendarData = dfCalendarData.withColumn('monthly', date_format(col('Date'),"yyyy-MM-01").cast('date'))
dfCalendarData = dfCalendarData.withColumn('month', date_format(col('Date'),"MMM"))
dfCalendarData = dfCalendarData.withColumn('monthYear', date_format(col('Date'),"MMM yyyy"))
dfCalendarData = dfCalendarData.withColumn('monthOfYear', month(col('Date')))
dfCalendarData = dfCalendarData.withColumn('year', year(col('Date')))
dfCalendarData = dfCalendarData.withColumn('dayOfWeekNum', dayofweek(col('Date')))
dfCalendarData = dfCalendarData.withColumn('dayOfWeek', date_format(col('Date'),"EE"))

listOfTables.append({'name': 'calendar', 'data': dfCalendarData})

# random tables

for i in range(1, num_random_tables+1):

    randomTableRowCount = random.randint(100, 1000)     

    dataSpec = (
        dg.DataGenerator(spark, name="test_data_set1", rows=randomTableRowCount, randomSeed = i)
        .withIdOutput()
        .withColumn(
            "r",
            FloatType(),
            expr="floor(rand() * 350) * (86400 + 3600)",
            numColumns=num_columns_random_tables,
        )
        .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
        .withColumn("code2", "integer", minValue=0, maxValue=10, random=True)
        .withColumn("code3", StringType(), values=["online", "offline", "unknown"])
        .withColumn("code4", template=r"\w")    
        .withColumn(
            "code5", StringType(), values=["a", "b", "c", "d", "e"], random=True, percentNulls=0.05
        )
        .withColumn(
            "code6", "string", values=["a", "b", "c"], random=True, weights=[9, 1, 1]
        )
    )

    df = dataSpec.build()

    listOfTables.append({'name': f"{random_table_prefix}_{i}", 'data': df})

# Save to Lakehouse

for table in listOfTables:

    tableName = table['name']

    print(f"Saving table '{tableName}'")

    df = table['data']

    df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(tableName)   

In [None]:

for table in listOfTables:
    tableName = table['name']

    spark.sql(f"SELECT count(*) as {tableName} FROM {tableName}").show()