In [76]:
import pandas as pd
import numpy as np
import csv
import os

## Simulation Process

To simulate incremental data ingestion into a logical data warehouse, the dataset will be split and processed in two phases:
- A full initial load
- Daily incremental loads that represent future data updates and inserts

**Phase 1: Splitting Old and New Data**

The cutoff date is December 31, 2019. Any record with a `ValidFrom` on or before 2019-12-31 is considered historical and will be included in the initial full load. Records with a `ValidFrom` after 2019-12-31 represent new data that will be incrementally ingested.

Although all source data exists prior to the current date, for simulation purposes, dates from January 1, 2020, onward will be treated as future incremental loads.

Steps:
- Identify dimension tables that contain records spanning both before and after the cutoff date
- Split these tables into initial "old" data and "new" data
- For customers and suppliers tables, merge their new data, if any, with the corresponding simulated update tables (described below)
- Partition all "new" data by `ValidFrom`, producing one partition (folder/file) per date. Each partition represents the data arriving on that day, to be ingested sequentially

**Phase 2: Simulating data updates for existing records**

Customers:
- Randomly select 200 existing customers (from the "old" dataset).
- Update the following fields:
    - `CreditLimit`
    - `StandardDiscountPercentage`
    - `ValidFrom`
- The new `ValidFrom` date is randomly assigned between 2020-01-01 and 2021-04-17 (the end of the available data)
- These updated records are then partitioned by their new `ValidFrom` date for daily ingestion

Suppliers:
- Randomly select a set of existing suppliers (sample size and set can vary for each field generation).
- Update the following fields:
    - `PaymentDays`
    - `BankAccountName`
    - `PhoneNumber`
    - `ValidFrom`
- The `ValidFrom` date is again randomly selected within 2020-01-01 to 2021-04-17.
- Updated supplier records are also partitioned daily by `ValidFrom` for ingestion.


In [None]:
# load in tables with validfrom values potentially after 12/31/2019, the last day of old data in simulation:
# raise the csv module's per-field size cap before reading (the reads below use engine="python")
csv.field_size_limit(10_000_000)


def read_pipe_csv(path, escapechar='\\'):
    """Read one pipe-delimited export into a DataFrame.

    Quoting is disabled (csv.QUOTE_NONE), so every '"' in the file is kept as
    a literal character.

    Parameters
    ----------
    path : str
        Location of the CSV file.
    escapechar : str or None, default '\\'
        Escape character handed to ``pd.read_csv``; ``None`` disables escaping.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.read_csv(
        path,
        delimiter="|",
        quoting=csv.QUOTE_NONE,
        engine="python",
        escapechar=escapechar,
    )


all_customers = read_pipe_csv("Downloaded_Initial/Sales_Customers/Sales_Customers.csv")
all_cities = read_pipe_csv("Downloaded_Initial/Application_Cities/Application_Cities.csv")
all_countries = read_pipe_csv("Downloaded_Initial/Application_Countries/Application_Countries.csv")
all_stateprov = read_pipe_csv("Downloaded_Initial/Application_StateProvinces/Application_StateProvinces.csv")

# load in suppliers
all_suppliers = read_pipe_csv("Downloaded_Initial/Purchasing_Suppliers/Purchasing_Suppliers.csv")

# load in stock items (read without an escape character; backslashes stay in the data)
stock_items = read_pipe_csv('Downloaded_Initial/Warehouse_StockItems/Warehouse_StockItems.csv',
                            escapechar=None)


In [100]:
# find tables that need new, old splits
# Compare parsed timestamps rather than raw strings: as text, a value such as
# '2019-12-31 08:00:00' sorts AFTER '2019-12-31' and would be counted as new.
first_new_day = pd.Timestamp('2020-01-01')  # first instant after the 2019-12-31 cutoff

df_tables = {'all_customers': all_customers, 'all_cities': all_cities,
             'all_countries': all_countries, 'all_stateprov': all_stateprov,
             'all_suppliers': all_suppliers}
for name, table in df_tables.items():
    is_new = pd.to_datetime(table['ValidFrom']) >= first_new_day
    print(f"{name}:{is_new.sum()}")

all_customers:40
all_cities:17
all_countries:5
all_stateprov:8
all_suppliers:0


In [104]:
def split_old_new(table, id_column, drop_columns=(), first_new_day='2020-01-01'):
    """Split a dimension table into (old, new) frames around the simulation cutoff.

    ``ValidFrom`` is parsed to timestamps before comparing, so a row stamped at
    any time of day on the last historical date stays in the old set (a plain
    string comparison against '2019-12-31' would push it into the new set).

    Parameters
    ----------
    table : pandas.DataFrame
        Source table; must contain ``ValidFrom`` and ``id_column``.
    id_column : str
        Column used to sort each half.
    drop_columns : sequence of str, optional
        Columns removed from both halves.
    first_new_day : str or pandas.Timestamp, default '2020-01-01'
        Rows with ``ValidFrom`` at or after this instant are "new".

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        ``(old, new)``, each sorted by ``id_column`` with a fresh 0..n-1 index.
        Rows whose ``ValidFrom`` is missing end up in neither frame.
    """
    boundary = pd.Timestamp(first_new_day)
    valid_from = pd.to_datetime(table['ValidFrom'])

    def _prepare(rows):
        rows = rows.sort_values(id_column).reset_index(drop=True)
        return rows.drop(columns=list(drop_columns)) if len(drop_columns) else rows

    return _prepare(table[valid_from < boundary]), _prepare(table[valid_from >= boundary])


# split new and old customers
old_customers, new_customers = split_old_new(all_customers, 'CustomerID')

# split new and old cities
old_cities, new_cities = split_old_new(all_cities, 'CityID')

# split new and old countries; dropping unneeded long geo border polygon column
old_countries, new_countries = split_old_new(all_countries, 'CountryID', drop_columns=['Border'])

# split new and old state provinces; Border dropped here as well
old_stateprov, new_stateprov = split_old_new(all_stateprov, 'StateProvinceID', drop_columns=['Border'])

# minor cleaning of stock items table: strip literal double quotes and backslashes.
# regex=False makes the literal match explicit; a lone backslash is not a valid
# regular expression and the default for `regex` differs between pandas versions.
for text_column in ('StockItemName', 'SearchDetails'):
    stock_items[text_column] = (stock_items[text_column]
                                .str.replace('"', '', regex=False)
                                .str.replace('\\', '', regex=False))

In [None]:
# generate random set of updates to occur during new period for existing customers:
# seeding numpy's global generator also fixes DataFrame.sample, which falls back to it
np.random.seed(42)

sample_size = 200
# draw with sample_size (not a second literal 200) so the number of sampled rows
# always matches the number of values generated for them further down
update_customers = old_customers.sample(n=sample_size).copy()

def random_choice_from_column(s, n=sample_size):
    """Draw ``n`` values at random from the distinct values of column ``s``.

    Every distinct value is equally likely no matter how often it occurs in
    ``s``, and values are drawn with replacement via numpy's global generator.

    Parameters
    ----------
    s : pandas.Series
        Column supplying the pool of candidate values.
    n : int, default sample_size (as bound when this function was defined)
        Number of values to draw.

    Returns
    -------
    numpy.ndarray
        Array of length ``n``.
    """
    candidates = s.unique()
    picks = np.random.choice(candidates, size=n)
    return picks

# one generated value per sampled customer
n_updates = len(update_customers)

# simulate updates to credit limit
update_customers['CreditLimit'] = random_choice_from_column(old_customers['CreditLimit'], n_updates)

# simulate updates standard discount %
update_customers['StandardDiscountPercentage'] = random_choice_from_column(
    old_customers['StandardDiscountPercentage'], n_updates)

# simulate updates to valid from; will serve as the date of the update
sim_start = pd.Timestamp('2020-01-01')
sim_end = pd.Timestamp('2021-04-17')

# Timestamp.value is nanoseconds since the epoch; work in whole seconds.
# randint's upper bound is exclusive, so push it to the midnight AFTER sim_end,
# otherwise no update could ever fall on 2021-04-17 itself. Floor division keeps
# both bounds integers.
first_second = sim_start.value // 10**9
end_second_exclusive = (sim_end + pd.Timedelta(days=1)).value // 10**9
update_customers['ValidFrom'] = pd.to_datetime(
    np.random.randint(first_second, end_second_exclusive, n_updates),
    unit='s')
update_customers = update_customers.reset_index(drop=True)

In [81]:
# generate random updates to occur during new period for existing suppliers:
np.random.seed(42)

# simulate updates to bank account name; about half of suppliers will have a change during the simulation
bank_mask = np.random.choice([True, False], size=len(all_suppliers))

update_bank_names = ['Alpha', 'Beta', 'Delta', 'Gamma', 'Bronze', 'Silver', 'Gold', 'Platinum']

# filter first, then copy: assigning a column on a filtered slice of another
# frame triggers SettingWithCopyWarning, an explicit copy does not
bank_updates = all_suppliers[bank_mask].copy()

# append a random suffix, then strip any embedded double quotes and wrap the
# whole name in a single pair of literal quotes
suffixed_names = bank_updates['BankAccountName'] + ' ' + np.random.choice(update_bank_names,
                                                                          size=len(bank_updates))
bank_updates['BankAccountName'] = '"' + suffixed_names.str.replace('"', '', regex=False) + '"'

# function to generate random phone numbers
def fake_phone_numbers():
    """Return one random phone number formatted as '(AAA) EEE-NNNN'.

    ``np.random.randint`` excludes its upper bound, so the bounds are set one
    past the largest wanted value: AAA is 200-999, EEE is 100-999 and NNNN is
    1000-9999. Draws come from numpy's global generator, so results follow
    ``np.random.seed``.

    Returns
    -------
    str
    """
    area_code = np.random.randint(200, 1000)
    exchange = np.random.randint(100, 1000)
    line_number = np.random.randint(1000, 10000)
    return f'({area_code}) {exchange}-{line_number}'

# simulate updates to phone number on another random set of suppliers
np.random.seed(41)
phone_mask = np.random.choice([True, False], size=len(all_suppliers))
# filter first, then copy, so the column assignment below works on an
# independent frame (no SettingWithCopyWarning)
phone_updates = all_suppliers[phone_mask].copy()

phone_updates['PhoneNumber'] = [fake_phone_numbers() for _ in range(len(phone_updates))]

# simulate updates to payment days on another random set of suppliers
np.random.seed(40)
pday_mask = np.random.choice([True, False], size=len(all_suppliers))
pday_updates = all_suppliers[pday_mask].copy()

pday_updates['PaymentDays'] = random_choice_from_column(all_suppliers['PaymentDays'], len(pday_updates))

# combine update tables + simulate updates to valid from
# NOTE(review): a supplier picked by more than one mask gets one row per update
# type, each with its own random ValidFrom and each carrying the ORIGINAL values
# of the other fields - confirm the ingestion side expects that.
# ignore_index gives the combined frame unique row labels.
update_suppliers = pd.concat([bank_updates, phone_updates, pday_updates], ignore_index=True)

# randint's upper bound is exclusive: use the midnight after sim_end so the
# last simulated day can be drawn, and floor-divide so both bounds are integers
first_second = sim_start.value // 10**9
end_second_exclusive = (sim_end + pd.Timedelta(days=1)).value // 10**9
update_suppliers['ValidFrom'] = pd.to_datetime(
    np.random.randint(first_second, end_second_exclusive, len(update_suppliers)),
    unit='s')

In [82]:
# stack the simulated customer updates on top of the genuinely new customers;
# suppliers have no new rows, so there is nothing to combine for them.
# Row labels of both inputs are kept as they are (both start at 0).
new_update_customers = pd.concat(objs=[update_customers, new_customers], axis=0)

In [98]:
def write_pipe_csv(table, directory, file_name):
    """Write ``table`` to ``directory/file_name`` as a pipe-delimited CSV.

    The directory is created if needed. The index is not written, quoting is
    disabled and no escape character is set, so fields are emitted verbatim.

    Parameters
    ----------
    table : pandas.DataFrame
    directory : str
    file_name : str

    Returns
    -------
    str
        Path of the file that was written.
    """
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, file_name)
    table.to_csv(file_path,
                 index=False,
                 sep='|',
                 quoting=csv.QUOTE_NONE,
                 escapechar=None)
    return file_path


# partition new data into different folders and files by validfrom and dimension they represent
change_tables = {'Purchasing_Suppliers': update_suppliers, 'Application_Cities': new_cities,
                 'Application_Countries': new_countries, 'Application_StateProvinces': new_stateprov,
                 'Sales_Customers': new_update_customers}

# parse each table's ValidFrom once, instead of once per (date, table) pair
change_days = {name: pd.to_datetime(table['ValidFrom']).dt.date
               for name, table in change_tables.items()}

# every calendar day on which at least one table has a change
change_dates = sorted(set().union(*change_days.values()))

for date in change_dates:
    day_label = date.strftime('%Y-%m-%d')
    date_path = f"Changed_Data/{day_label}"
    os.makedirs(date_path, exist_ok=True)

    for name, table in change_tables.items():
        daily_load = table[change_days[name] == date]

        # only tables that actually changed on this day get a folder and file
        if daily_load.empty:
            continue
        write_pipe_csv(daily_load, os.path.join(date_path, name), f"{name}_{day_label}.csv")


# place old data into relevant directory for initial loads
# NOTE(review): Purchasing_Suppliers is not written here - presumably the
# initial supplier load reads the unmodified download; confirm.
old_tables = {'Application_Cities': old_cities, 'Application_Countries': old_countries,
              'Application_StateProvinces': old_stateprov, 'Sales_Customers': old_customers,
              'Warehouse_StockItems': stock_items}

for name, table in old_tables.items():
    write_pipe_csv(table, f'Simulation_Initial_Dims/{name}', f"{name}.csv")
   