In [0]:
%pip install dbldatagen

In [0]:
from pyspark.sql.types import FloatType, IntegerType, StringType,TimestampType, DecimalType
from pyspark.sql.functions import col, date_format
import dbldatagen as dg
import pyspark.sql.functions as F

import re

import random


In [0]:
# Spark session settings for this notebook.
# NOTE(review): 12 * 4 = 48 shuffle partitions; the factorisation presumably reflects
# cluster cores x a multiplier — confirm and document the intended cluster size.
shuffle_partitions_requested = 12 * 4

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
# Enable Arrow for PySpark <-> Python data conversion, with at most 20000 records per Arrow batch.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)

In [0]:
# Sizing parameters shared by every generated data set in this notebook.
row_count = 1000

unique_stores = 20
unique_employees = 100
unique_customers = 700

# Number of Spark partitions each generator is asked to produce.
partitions_requested = 4

# df1: `row_count` rows, each with a random StoreId in [0, unique_stores - 1].
# NOTE(review): randomSeedMethod="hash_fieldname" presumably derives each column's seed from
# its name so that rebuilding gives the same values — confirm against the dbldatagen docs.
df1_data_generator = (
    dg.DataGenerator(spark, name="test_data_set1", rows=row_count,
                     partitions=partitions_requested, randomSeedMethod="hash_fieldname", verbose=True)
    # Include the generator's seed (`id`) column in the output.
    .withIdOutput()
    .withColumn("StoreId", IntegerType(), minValue=0, maxValue=unique_stores-1, random=True)
)

df1 = df1_data_generator.build()

display(df1)

In [0]:
# df2: sales keyed by store. EmployeeId and CustomerId are added by later cells, using the
# per-store Id allocations.
df2_data_generator = (
    dg.DataGenerator(spark, name="test_data_set2", rows=row_count,
                     partitions=partitions_requested, randomSeedMethod="hash_fieldname", verbose=True)
    .withIdOutput()
    .withColumn("StoreId", IntegerType(), minValue=0, maxValue=unique_stores-1, random=True)
    # Random timestamps on an hourly grid between the two bounds.
    .withColumn("Timestamp", TimestampType(), begin="2013-01-01 01:00:00", end="2023-10-31 23:59:00", interval="1 hour", random=True)
    # rand() is in [0, 1), so Amount is in [35, 385), which fits DecimalType(5, 2) (max 999.99).
    # NOTE(review): Spark SQL rand() without a seed argument is not reproducible across runs.
    .withColumn("Amount", DecimalType(5, 2), expr="((rand() + 0.1) * 350)")
)

df2 = df2_data_generator.build()

display(df2)

In [0]:
def get_employees(
    unique_employees: int,
    unique_stores: int,
    rng: "random.Random | None" = None,
    verbose: bool = True,
) -> "tuple[list[int], list[list[int]]]":
    """
    Generate a random allocation of employee Ids to stores.

    Employee Ids ``0 .. unique_employees - 1`` are split into ``unique_stores`` contiguous,
    non-overlapping blocks; block ``i`` holds the Ids of store ``i``. Every employee is
    allocated to exactly one store, and every store receives at least two employees.

    Parameters:
    - unique_employees (int): The total number of unique employees available for allocation.
    - unique_stores (int): The number of unique stores where employees need to be allocated.
    - rng (random.Random, optional): Source of randomness. Pass a seeded ``random.Random``
      for a reproducible allocation; defaults to the global ``random`` module.
    - verbose (bool): When True (the default), print the allocation.

    Returns:
    - employees_per_store (list[int]): The number of employees assigned to each store.
    - employees_per_store_cumulative (list[list[int]]): For each store, the list of employee
      Ids assigned to it. (Despite the name, these lists are not cumulative.)

    Raises:
    - ValueError: if ``unique_stores`` is negative, or larger than the number of available
      split points (the even numbers in ``[2, unique_employees - 2]``).

    Example usage:
    employees_per_store, employees_per_store_cumulative = get_employees(10, 3)
    print(f"Employees per store: {employees_per_store}")
    print(f"Employee Ids per store: {employees_per_store_cumulative}")
    """
    # Candidate split points: even Ids in [2, unique_employees - 2]. Starting at 2 with a step
    # of 2 means no store can end up with a single employee.
    split_points = range(2, unique_employees - 1, 2)
    if unique_stores > len(split_points):
        raise ValueError(
            f"Cannot allocate {unique_employees} employees to {unique_stores} stores: "
            f"at most {len(split_points)} stores are supported"
        )

    sampler = random if rng is None else rng
    employees_per_store_id_bounds = sorted(sampler.sample(split_points, unique_stores))

    # The bounds are sorted and distinct, so the last one is the maximum. Replacing it with the
    # total makes the last store absorb the remaining Ids, so every employee is allocated.
    if employees_per_store_id_bounds:
        employees_per_store_id_bounds[-1] = unique_employees

    employees_per_store_cumulative = []
    start = 0
    for end in employees_per_store_id_bounds:
        employees_per_store_cumulative.append(list(range(start, end)))
        start = end

    employees_per_store = [len(ids) for ids in employees_per_store_cumulative]
    if verbose:
        print(f"employees:{employees_per_store_cumulative} \n")
        print(f"employees per store {employees_per_store}")
    return employees_per_store, employees_per_store_cumulative

# Allocate employee Ids to stores. The per-store Id lists are what the EmployeeId UDF samples from.
employees_per_store, employees_per_store_cumulative = get_employees(unique_employees,unique_stores)

In [0]:
# Define the UDF
def employees(store_id, employees_per_store_cumulative):
    """
    Pick a random employee Id belonging to the given store.

    Used as the body of ``employees_udf``.

    Parameters:
    - store_id (int | None): Index of the store in ``employees_per_store_cumulative``.
    - employees_per_store_cumulative (list[list[int]]): Employee Ids per store, as returned
      by ``get_employees``.

    Returns:
    - int | None: A randomly chosen employee Id of that store, or None when ``store_id`` is
      None. A SQL NULL store then yields a NULL employee instead of a TypeError that would
      fail the Spark task.
    """
    if store_id is None:
        return None
    return random.choice(employees_per_store_cumulative[store_id])

# The wrapped function calls random.choice, so it is not deterministic. Spark treats UDFs as
# deterministic by default and may then eliminate or duplicate invocations during optimisation.
employees_udf = F.udf(employees, IntegerType()).asNondeterministic()

In [0]:
# Give each sale an employee drawn at random from the staff of its StoreId. The nested Id
# lists are passed to the UDF as a literal array column.
df2 = df2.withColumn("EmployeeId", employees_udf(df2["StoreId"], F.lit(employees_per_store_cumulative)))
display(df2)

In [0]:
print(list(enumerate(employees_per_store)))  # (store_id, employee_count) pairs

In [0]:
# Total number of employees
num_employees = df2.select(F.countDistinct('EmployeeId')).take(1)[0][0]
print(f"Number employees recording a sale: {num_employees} vs total number of employees: {unique_employees}")

if num_employees <= unique_employees:
    print("Number of employees insert fits logical constraints")

In [0]:
def get_customers(
    unique_customers: int,
    unique_stores: int,
    rng: "random.Random | None" = None,
    verbose: bool = True,
) -> "tuple[list[int], list[list[int]]]":
    """
    Generate a random allocation of customer Ids to stores.

    Customer Ids ``0 .. unique_customers - 1`` are split into ``unique_stores`` contiguous,
    non-overlapping blocks; block ``i`` holds the Ids of store ``i``. Every customer is
    allocated to exactly one store, and every store receives at least two customers.

    Parameters:
    - unique_customers (int): The total number of unique customers available for allocation.
    - unique_stores (int): The number of unique stores where customers need to be allocated.
    - rng (random.Random, optional): Source of randomness. Pass a seeded ``random.Random``
      for a reproducible allocation; defaults to the global ``random`` module.
    - verbose (bool): When True (the default), print the allocation.

    Returns:
    - customers_per_store (list[int]): The number of customers assigned to each store.
    - customers_per_store_cumulative (list[list[int]]): For each store, the list of customer
      Ids assigned to it. (Despite the name, these lists are not cumulative.)

    Raises:
    - ValueError: if ``unique_stores`` is negative, or larger than the number of available
      split points (the even numbers in ``[2, unique_customers - 2]``).

    Example usage:
    customers_per_store, customers_per_store_cumulative = get_customers(10, 3)
    print(f"customers per store: {customers_per_store}")
    print(f"Customer Ids per store: {customers_per_store_cumulative}")
    """
    # Candidate split points: even Ids in [2, unique_customers - 2]. Starting at 2 with a step
    # of 2 means no store can end up with a single customer.
    split_points = range(2, unique_customers - 1, 2)
    if unique_stores > len(split_points):
        raise ValueError(
            f"Cannot allocate {unique_customers} customers to {unique_stores} stores: "
            f"at most {len(split_points)} stores are supported"
        )

    sampler = random if rng is None else rng
    customers_per_store_id_bounds = sorted(sampler.sample(split_points, unique_stores))

    # The bounds are sorted and distinct, so the last one is the maximum. Replacing it with the
    # total makes the last store absorb the remaining Ids, so every customer is allocated.
    if customers_per_store_id_bounds:
        customers_per_store_id_bounds[-1] = unique_customers

    customers_per_store_cumulative = []
    start = 0
    for end in customers_per_store_id_bounds:
        customers_per_store_cumulative.append(list(range(start, end)))
        start = end

    customers_per_store = [len(ids) for ids in customers_per_store_cumulative]
    if verbose:
        print(f"customers:{customers_per_store_cumulative} \n")
        print(f"customers per store {customers_per_store}")
    return customers_per_store, customers_per_store_cumulative

# Allocate customer Ids to stores. The per-store Id lists are what the CustomerId/StoreId UDFs use.
customers_per_store, customers_per_store_cumulative = get_customers(unique_customers,unique_stores)

In [0]:
def customers(store_id: "int | None", customers_per_store_cumulative: list) -> "int | None":
    """
    Pick a random customer Id belonging to the given store.

    Used as the body of ``customers_udf``.

    Parameters:
    - store_id (int | None): Index of the store in ``customers_per_store_cumulative``.
    - customers_per_store_cumulative (list[list[int]]): Customer Ids per store, as returned
      by ``get_customers``.

    Returns:
    - int | None: A randomly chosen customer Id of that store, or None when ``store_id`` is
      None. A SQL NULL store then yields a NULL customer instead of failing the Spark task.
    """
    if store_id is None:
        return None
    return random.choice(customers_per_store_cumulative[store_id])

# The wrapped function calls random.choice, so it is not deterministic. Spark treats UDFs as
# deterministic by default and may then eliminate or duplicate invocations during optimisation.
customers_udf = F.udf(customers, IntegerType()).asNondeterministic()

# Give each sale a customer drawn at random from the customers allocated to its StoreId.
df2 = df2.withColumn("CustomerId", customers_udf(df2["StoreId"],F.lit(customers_per_store_cumulative)))
display(df2)

In [0]:
print(list(enumerate(customers_per_store)))  # (store_id, customer_count) pairs

In [0]:
# df3: sales keyed by customer. StoreId and EmployeeId are derived from CustomerId in later cells.
df3_data_generator = (
    dg.DataGenerator(spark, name="test_data_set3", rows=row_count,
                     partitions=partitions_requested, randomSeedMethod="hash_fieldname", verbose=True)
    .withIdOutput()
    .withColumn("CustomerId", IntegerType(), minValue=0, maxValue=unique_customers-1, random=True)
    # Random timestamps on an hourly grid between the two bounds.
    .withColumn("Timestamp", TimestampType(), begin="2013-01-01 01:00:00", end="2023-10-31 23:59:00", interval="1 hour", random=True)
    # Amount is in [35, 385). NOTE(review): unseeded SQL rand() is not reproducible across runs.
    .withColumn("Amount", DecimalType(5, 2), expr="((rand() + 0.1) * 350)")
)

df3 = df3_data_generator.build()

display(df3)

In [0]:
def stores(customer_id: int, customers_per_store_cumulative: list, stores: list) -> "int | None":
    """
    Return the Id of the store whose customer list contains ``customer_id``.

    Used as the body of ``stores_udf``. The result is deterministic.

    Parameters:
    - customer_id (int): Customer to look up.
    - customers_per_store_cumulative (list[list[int]]): Customer Ids per store, as returned
      by ``get_customers``.
    - stores (list[int]): Store Ids, aligned by position with ``customers_per_store_cumulative``.

    Returns:
    - int | None: The matching store Id, or None if the customer belongs to no store.
    """
    for customer_ids, store_id in zip(customers_per_store_cumulative, stores):
        if customer_id in customer_ids:
            return store_id
    # Not allocated to any store: return None (SQL NULL) explicitly rather than falling off the end.
    return None
        
# Deterministic lookup, so the default (deterministic) UDF declaration is correct here.
stores_udf = F.udf(stores, IntegerType())

# Store Ids 0..unique_stores-1, aligned by position with customers_per_store_cumulative.
# Named store_ids rather than `stores`, so the `stores` function defined above is not shadowed.
store_ids = list(range(0, unique_stores))

df3 = df3.withColumn("StoreId", stores_udf(df3["CustomerId"], F.lit(customers_per_store_cumulative), F.lit(store_ids)))
# Pick a random employee of the store resolved above.
df3 = df3.withColumn("EmployeeId", employees_udf(df3["StoreId"], F.lit(employees_per_store_cumulative)))

display(df3)

In [0]:
# Weight (out of 100) of a sale's own store when drawing which store's customer to use.
weight = 90
# The remaining weight is split equally between the other stores. With a single store there
# are no other stores, so avoid dividing by zero.
remainder_weight = (100 - weight) / (unique_stores - 1) if unique_stores > 1 else 0.0
# NOTE: despite its name, customers_list holds store Ids, the candidates of the weighted draw.
customers_list = list(range(unique_stores))

def customers_weighted(store_id: int, weight: int, remainder_weight: float, customers_list: list, customers_per_store_cumulative: list) -> int:
    """
    Pick a random customer for a sale, usually from the sale's own store.

    A store is first drawn from ``customers_list``, giving ``weight`` to ``store_id`` and
    ``remainder_weight`` to every other candidate. A customer is then picked uniformly from the
    drawn store's customers. Used as the body of ``customers_randomised_udf``.

    Parameters:
    - store_id (int): The sale's own store.
    - weight (int): Relative weight of the own store.
    - remainder_weight (float): Relative weight of each other store.
    - customers_list (list[int]): Candidate store Ids. They are used as indices into
      ``customers_per_store_cumulative``, so they must be valid positions in it.
    - customers_per_store_cumulative (list[list[int]]): Customer Ids per store, as returned
      by ``get_customers``.

    Returns:
    - int: The chosen customer Id.
    """
    weights = [weight if candidate == store_id else remainder_weight for candidate in customers_list]
    store_id_randomised = random.choices(customers_list, weights=weights, k=1)[0]
    return random.choice(customers_per_store_cumulative[store_id_randomised])

# random.choices / random.choice make this UDF non-deterministic. Declare it so Spark does not
# eliminate or duplicate its invocations during optimisation.
customers_randomised_udf = F.udf(customers_weighted, IntegerType()).asNondeterministic()

# Replace CustomerId from the earlier cell with the weighted draw: mostly customers of the sale's
# own store, occasionally a customer of another store. Scalars and lists are passed as literal columns.
df2 = df2.withColumn("CustomerId", customers_randomised_udf(df2["StoreId"],F.lit(weight),F.lit(remainder_weight),F.lit(customers_list),F.lit(customers_per_store_cumulative)))
display(df2)

In [0]:
# Weight (out of 100) that a customer's sale is attributed to the customer's own store.
weight = 90
# The remaining weight is split equally between the other stores. With a single store there
# are no other stores, so avoid dividing by zero.
remainder_weight = (100 - weight) / (unique_stores - 1) if unique_stores > 1 else 0.0
stores = list(range(0,unique_stores))

def stores_weighted(customer_id: int, weight: int, remainder_weight: float, customers_per_store_cumulative: list, stores: list) -> "int | None":
    """
    Return a store Id for a customer's sale, biased towards the customer's own store.

    The customer's own store is the entry of ``stores`` whose Id list in
    ``customers_per_store_cumulative`` contains ``customer_id``. A store is then drawn from
    ``stores``, with ``weight`` for the own store and ``remainder_weight`` for every other store.
    Used as the body of ``stores_randomised_udf``.

    Parameters:
    - customer_id (int): Customer whose sale is being attributed.
    - weight (int): Relative weight of the customer's own store.
    - remainder_weight (float): Relative weight of each other store.
    - customers_per_store_cumulative (list[list[int]]): Customer Ids per store, as returned
      by ``get_customers``.
    - stores (list[int]): Store Ids, aligned by position with ``customers_per_store_cumulative``.

    Returns:
    - int | None: The drawn store Id, or None if the customer belongs to no store.
    """
    for customer_ids, store_id in zip(customers_per_store_cumulative, stores):
        if customer_id in customer_ids:
            # Draw from the `stores` parameter rather than the global `customers_list` defined in
            # another cell, so the UDF depends only on its arguments.
            weights = [weight if candidate == store_id else remainder_weight for candidate in stores]
            return random.choices(stores, weights=weights, k=1)[0]
    return None
        
# random.choices makes this UDF non-deterministic. Declaring it stops Spark from eliminating or
# duplicating its invocations, so a column computed from it keeps one value per row even when
# later columns are derived from it.
stores_randomised_udf = F.udf(stores_weighted, IntegerType()).asNondeterministic()

# `stores` (store Ids 0..unique_stores-1) was already defined at the top of this cell.
# Use the weighted UDF defined above; the previous code called the deterministic stores_udf
# here, so stores_weighted was never applied.
df3 = df3.withColumn(
    "StoreId",
    stores_randomised_udf(
        df3["CustomerId"],
        F.lit(weight),
        F.lit(remainder_weight),
        F.lit(customers_per_store_cumulative),
        F.lit(stores),
    ),
)
# Pick a random employee of the (weighted) store chosen above.
df3 = df3.withColumn("EmployeeId", employees_udf(df3["StoreId"], F.lit(employees_per_store_cumulative)))

display(df3)