In [1]:
# Import libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from random import random, randint

In [2]:
# Declare global variables
str_filename_generator = "data_generator.xlsx"

# Seed applied to both random number generators used in this notebook (NumPy's global
# generator and the standard-library `random` module), so that "Restart & Run All"
# regenerates the same dataset. Set to None to get a different dataset on every run.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
# Imported under an alias: the bare names `random` and `randint` are already bound to
# the functions imported in the first cell and must not be shadowed.
from random import seed as _seed_stdlib_rng
_seed_stdlib_rng(RANDOM_SEED)

# Serial Number prefix letter per product range. A product belongs to a range when its
# ProductName starts with the key; the insertion order is the order in which the ranges
# are processed during Serial Number generation.
dict_sn_prefix = {
    "10G_": "L",
    "25G_": "M",
    "100G_": "H",
    "400G_": "U"
}

# Average waiting time (in seconds) before the next step of a module's test flow:
# before its 1st retest, before its 2nd retest, and before moving on to the next test.
dict_avg_interval = {
    "1st Retest": 600,
    "2nd Retest": 3600,
    "Between Tests": 10800
}

In [3]:
# Import data from the generator Excel file.
# All four sheets are requested in one call so the workbook is opened and parsed once
# instead of four times; given a list for `sheet_name`, `read_excel` returns a dict of
# DataFrames keyed by sheet name.
dict_gen_sheets = pd.read_excel(
    str_filename_generator,
    sheet_name = ["Prod_FreshQty", "TestList", "MachineList", "dbo.Calendar"],
)
df_gen_prodqty = dict_gen_sheets["Prod_FreshQty"]   # Columns used later: ProductName, MeanQty
df_gen_testlist = dict_gen_sheets["TestList"]       # Columns used later: ProductName, Sequence, TestName, PassingRate, Mean_TestDuration_s
df_gen_machlist = dict_gen_sheets["MachineList"]    # Columns used later: TestName, MachineID, UsageProbability
df_gen_calendar = dict_gen_sheets["dbo.Calendar"]   # Columns used later: StartDate, EndDate (one row per week)

# Quick sanity check of what was loaded: (rows, columns) of each sheet.
print(df_gen_prodqty.shape)
print(df_gen_testlist.shape)
print(df_gen_machlist.shape)
print(df_gen_calendar.shape)

(12, 2)
(82, 5)
(42, 3)
(13, 3)


In [4]:
# Weekly fresh quantity per product: the cross join yields one row for every
# (product, calendar week) combination.
# FreshQty is drawn from a normal distribution centred on the product's MeanQty with a
# standard deviation of 10% of that mean, then cast to an integer number of modules.
df_prod_weekly_qty = (
    pd.merge(df_gen_prodqty, df_gen_calendar, how = "cross")
    .assign(
        FreshQty = lambda df_weekly: pd.Series(
            np.random.normal(df_weekly["MeanQty"], df_weekly["MeanQty"]*0.1),
            index = df_weekly.index,
        ).astype("int")
    )
)

display(df_prod_weekly_qty.head(5))

Unnamed: 0,ProductName,MeanQty,Week,StartDate,EndDate,FreshQty
0,100G_A1,1000,W01,2024-01-01 07:00:00,2024-01-08 06:59:59,1021
1,100G_A1,1000,W02,2024-01-08 07:00:00,2024-01-15 06:59:59,990
2,100G_A1,1000,W03,2024-01-15 07:00:00,2024-01-22 06:59:59,755
3,100G_A1,1000,W04,2024-01-22 07:00:00,2024-01-29 06:59:59,1028
4,100G_A1,1000,W05,2024-01-29 07:00:00,2024-02-05 06:59:59,786


In [5]:
# Expand the weekly table into the full module list: each (product, week) row is
# repeated FreshQty times, i.e. one row per module, and the index is renumbered from 0.
df_module_list = (
    df_prod_weekly_qty
    .loc[df_prod_weekly_qty.index.repeat(df_prod_weekly_qty["FreshQty"].to_numpy())]
    .reset_index(drop=True)
)

print(df_module_list.shape)

(79435, 6)


In [6]:
# Generate a random timestamp for each row, uniformly distributed (at millisecond
# resolution) between the row's StartDate and EndDate, both ends included.
#
# All offsets are drawn in one vectorised call instead of a per-row loop. Besides being
# much faster, this avoids passing float bounds to `random.randint` (a TypeError on
# Python 3.12+) and the round trip through local-time Unix timestamps.
ts_window_start = pd.to_datetime(df_module_list["StartDate"])
ts_window_end = pd.to_datetime(df_module_list["EndDate"])

# Length of each row's window in whole milliseconds.
arr_window_ms = ((ts_window_end - ts_window_start) // pd.Timedelta(milliseconds = 1)).to_numpy(dtype = "int64")

# `np.random.randint` excludes its upper bound, hence the +1 so that EndDate stays reachable.
arr_offset_ms = np.random.randint(0, arr_window_ms + 1, dtype = np.int64)

df_module_list["TestTimeStamp"] = ts_window_start + pd.to_timedelta(
    pd.Series(arr_offset_ms, index = df_module_list.index), unit = "ms"
)

In [7]:
# From here on only ProductName and TestTimeStamp are needed, so keep just those two
# columns. The rows are then ordered by timestamp and re-indexed from 0, which mixes the
# products together so that running serial numbers can later follow the timestamp order.
df_module_list = (
    df_module_list
    .loc[:, ["ProductName", "TestTimeStamp"]]
    .sort_values(by = "TestTimeStamp", ascending = True)
    .reset_index(drop = True)
)

In [8]:
# Preview the module list after sorting (ProductName and TestTimeStamp per module).
df_module_list

Unnamed: 0,ProductName,TestTimeStamp
0,10G_A1,2024-01-01 07:01:04.939
1,10G_A2,2024-01-01 07:02:17.268
2,100G_B2,2024-01-01 07:02:26.458
3,10G_A1,2024-01-01 07:03:00.429
4,10G_A1,2024-01-01 07:03:39.104
...,...,...
79430,10G_A2,2024-04-01 06:51:01.982
79431,10G_B1,2024-04-01 06:51:43.968
79432,400G_A2,2024-04-01 06:52:39.651
79433,25G_B1,2024-04-01 06:53:19.864


In [9]:
# Assign Serial Numbers range by range: each product range gets its own prefix letter and
# its own run of consecutive numbers, handed out in the row order of df_module_list.
list_module_with_sn = []  # One DataFrame per product range; concatenated after the loop.
for range_prefix, sn_letter in dict_sn_prefix.items():
    is_in_range = df_module_list["ProductName"].str.startswith(range_prefix)
    df_range = df_module_list[is_in_range].reset_index(drop = True)

    # Random starting point for this range's rolling number.
    first_sn_number = randint(1,1000)

    # Final SN is the prefix letter followed by the rolling number, zero-padded to 6 digits.
    # An empty range is left untouched, so no SerialNum column is created for it.
    if len(df_range) > 0:
        df_range["SerialNum"] = [
            sn_letter + str(sn_number).zfill(6)
            for sn_number in range(first_sn_number, first_sn_number + len(df_range))
        ]

    list_module_with_sn.append(df_range)

df_module_sn = pd.concat(list_module_with_sn).reset_index(drop = True)

In [10]:
# Preview the module list with its generated SerialNum column (rows are grouped by product range).
df_module_sn

Unnamed: 0,ProductName,TestTimeStamp,SerialNum
0,10G_A1,2024-01-01 07:01:04.939,L000192
1,10G_A2,2024-01-01 07:02:17.268,L000193
2,10G_A1,2024-01-01 07:03:00.429,L000194
3,10G_A1,2024-01-01 07:03:39.104,L000195
4,10G_A1,2024-01-01 07:05:07.944,L000196
...,...,...,...
79430,400G_A1,2024-04-01 05:47:28.924,U004826
79431,400G_A2,2024-04-01 06:24:52.287,U004827
79432,400G_A2,2024-04-01 06:25:36.961,U004828
79433,400G_A2,2024-04-01 06:34:03.607,U004829


In [11]:
# Simulate the test history of every module (SerialNum) and collect it in df_master.
#
# Each module runs through its product's tests in Sequence order. A failed test is retried
# up to 2 times (3 attempts in total); a module that fails all attempts of a test stops
# there and never reaches the following tests. Every attempt produces one output row.
#
# Rows are collected as plain dicts and turned into a DataFrame once at the end, and the
# test plan of each product is looked up only once, instead of growing a DataFrame cell
# by cell and re-filtering the generator sheets for every module.

MAX_TEST_ATTEMPTS = 3            # 1 first test + a maximum of 2 retests.
RETEST_PASSRATE_PENALTY = 0.12   # A module that failed is more likely to fail its retests as well.
LIST_MASTER_COLUMNS = [
    "SerialNum", "ProductName", "TestName", "MachineID",
    "TestTimeStamp", "TestResult", "TestDuration_s",
]


def _build_test_plan(product_name):
    """Return the ordered test plan of one product.

    The plan is a list with one tuple per test, sorted by the test's Sequence:
    (TestName, PassingRate, Mean_TestDuration_s, machine ids, usage probabilities),
    where the last two items are parallel lists taken from the machine list rows of
    that test. A product without any test yields an empty list.
    """
    df_tests = (
        df_gen_testlist[df_gen_testlist["ProductName"] == product_name]
        .sort_values(by = ["Sequence"])
        .reset_index(drop = True)
    )
    test_plan = []
    for test_name, pass_rate, mean_duration in zip(
        df_tests["TestName"], df_tests["PassingRate"], df_tests["Mean_TestDuration_s"]
    ):
        df_machines = df_gen_machlist[df_gen_machlist["TestName"] == test_name]
        test_plan.append((
            test_name,
            pass_rate,
            mean_duration,
            list(df_machines["MachineID"]),
            list(df_machines["UsageProbability"]),
        ))
    return test_plan


def _simulate_module(serial_num, product_name, first_test_ts, test_plan):
    """Return the test records (one dict per test attempt) of a single module.

    serial_num / product_name are copied into every record; first_test_ts is the
    timestamp of the module's very first attempt and is moved forward after every
    attempt by the attempt's duration plus a random waiting time.
    """
    list_records = []
    current_ts = first_test_ts

    for test_name, base_pass_rate, mean_duration, list_machine_id, list_usage_prob in test_plan:
        pass_rate = base_pass_rate  # Lowered after each failure, for this module and test only.
        count_fail = 0
        test_result = "FAIL"

        while test_result != "PASS" and count_fail < MAX_TEST_ATTEMPTS:
            # Random duration, result and machine for this attempt.
            test_duration = np.random.normal(mean_duration, mean_duration*0.1)
            test_result = ("PASS" if random() < pass_rate else "FAIL")
            machine_id = np.random.choice(list_machine_id, p = list_usage_prob)

            # A failing attempt is cut short: it keeps only 10%-90% of the drawn duration.
            if test_result != "PASS":
                test_duration = test_duration * randint(1000, 9000)/10000
                count_fail += 1
                pass_rate = pass_rate - RETEST_PASSRATE_PENALTY

            list_records.append({
                "SerialNum": serial_num,
                "ProductName": product_name,
                "TestName": test_name,
                "MachineID": machine_id,
                "TestTimeStamp": current_ts,
                "TestResult": test_result,
                "TestDuration_s": test_duration,
            })

            # Waiting time after this attempt depends on what comes next:
            # the next test, the 1st retest or the 2nd retest.
            if test_result == "PASS":
                interval_key = "Between Tests"
            elif count_fail == 1:
                interval_key = "1st Retest"
            else:  # Max 2 retests
                interval_key = "2nd Retest"
            mean_interval = dict_avg_interval[interval_key]
            random_interval = np.random.normal(mean_interval, mean_interval*0.1)
            current_ts = current_ts + timedelta(seconds = (test_duration + random_interval))

        # Leaving the loop without a PASS means every attempt failed:
        # stop the process for this module without moving to the next test.
        if test_result != "PASS":
            break

    return list_records


dict_test_plans = {}     # ProductName -> test plan, built on first use.
list_master_records = [] # Records of all modules, in module order.

for temp_sn, temp_prodname, temp_ts in zip(
    df_module_sn["SerialNum"], df_module_sn["ProductName"], df_module_sn["TestTimeStamp"]
):
    if temp_prodname not in dict_test_plans:
        dict_test_plans[temp_prodname] = _build_test_plan(temp_prodname)
    list_master_records.extend(
        _simulate_module(temp_sn, temp_prodname, temp_ts, dict_test_plans[temp_prodname])
    )

# Build the master table in one go; the explicit column list fixes the column order
# and keeps the frame well-formed even when no record was generated.
df_master = pd.DataFrame(list_master_records, columns = LIST_MASTER_COLUMNS)
print(df_master.shape)

(630649, 7)


In [13]:
# Export the generated test records to CSV; the DataFrame index is not written.
df_master.to_csv(path_or_buf = "generated_raw_data.csv", index = False)