In [1]:
import pandas as pd
import numpy as np
import statistics as stat

#--- Functions ---#
#-- Cleaning up source data --#
def clean_sd(path,ec):
    cd = []
    with open(path,'r') as f:
        for line in f:
            columns = line.strip().split('\t')
            if len(columns)<ec:
                columns += [''] * (ec - len(columns))
            elif len(columns)<ec:
                columns = columns[:2] + [' '.join(columns[2:])]
            
            cd.append(columns)

    return cd

def complete_sd(path,cn):
    """
    Completes source data by adding arrival events to corresponding workstations
    """
    sd = []
    WS = ["W1","W2","W3"]
    with open(path,'r') as f:
        for line in f:
            columns = line.strip().split('\t')
            #Translate creation event into arrival at WS 1
            if columns[2] == "Created":
                columns[2] = WS[0]
                columns.append("A")
                sd.append(columns)
            
            elif columns[3] == "D":
                sd.append(columns)
                W_departing = WS.index(columns[2])

                if W_departing < 2:
                    new_entry = [None]*4
                    new_entry[:2] = columns[:2]
                    new_entry[2:] = [WS[W_departing+1],"A"]
                    sd.append(new_entry)
    
    sd_c = pd.DataFrame(sd,columns=cn)
    sd_c["time"] = pd.to_numeric(sd_c["time"])
    return sd_c

def split_sd(sd_c,ws):
    """
    Devide completed source data per workstation
    """
    sd_s = sd_c.loc[sd_c.workstation == ws].copy()
    sd_s = sd_s.drop("workstation",axis=1)
    return sd_s

def get_arrival(sd_c,hb):
    """
    Determines the mean, standard deviation and coefficient of covariance for lots and batches (if applicable)
    Returns the batch structure if applicable
    """
    # select arrival data and determine the difference between arrivals
    Lot_dT = sd_c[sd_c['event'] == 'A']['time'].diff()
    # Determine mean, standard deviation and coefficient of covariance
    Lot_mean = Lot_dT.mean()
    Lot_std = Lot_dT.std()
    Lot_CoV = Lot_std/Lot_mean
    
    # handle as batch
    if hb:
        # build batch data structure
        Batch = build_batch_view(sd_c)
        # select arrival data and determine the difference between arrivals
        Batch_dT = Batch[Batch['event'] == 'A']['time'].diff()
        Batch_mean = Batch_dT.mean()
        Batch_std = Batch_dT.std()
        Batch_CoV = Batch_std/Batch_mean
    else:
        Batch = None
        Batch_mean = None
        Batch_std = None
        Batch_CoV = None

    result = pd.DataFrame({
        "Workstation": ["Batch", "Lot"],
        "Mean": [Batch_mean, Lot_mean],
        "Std": [Batch_std, Lot_std],
        "CoV": [Batch_CoV, Lot_CoV]})

    # print(result)
    return result, Batch

def get_distribution(EPT_r):
    """
    Determines the EPT mean, standard deviation and coefficient of variance
    Additionally the amount of realizations is determined for reporting purpouses
    """
    # Group data by WIP levels and determine the count, mean and standard deviation for each WIP level
    EPT_distribution = EPT_r.groupby('sw')['EPT'].agg(['count','mean','std'])
    # Determine the covariance: standard deviation / mean
    EPT_distribution['CoV'] = EPT_distribution['std'] / EPT_distribution['mean']

    # Group data by Number of lots in the buffer and determine the count, mean and standard deviation for each WIP level
    OT_distribution = EPT_r.groupby('aw')['k'].agg(['count','mean','std'])
    # Determine the covariance: standard deviation / mean
    OT_distribution['CoV'] = OT_distribution['std'] / OT_distribution['mean']
    
    #print(EPT_distribution)
    #print(OT_distribution)
    return EPT_distribution, OT_distribution

#-- EPT calculations --#
def get_ept_data(df):
    def detOvert(xs, i):
        ys = []
        while xs:
            j, aw = xs[0]       # head(xs)
            xs = xs[1:]         # tail(xs)
            if j < i:
                ys.append((j, aw))
            elif j == i:
                return ys + xs, len(ys), aw 
        raise ValueError(f"Lot {i} not found in system")

    initial_wip = 0
    non_tracked_lots = list()  # lots not tracked by the system

    # count initial WIP 
    for lot in df.lot.unique():
        if not df.loc[df.lot == lot, "event"].str.contains("A").any():
            initial_wip += 1
            non_tracked_lots.append(lot)

    # prefil xs with initial WIP
    xs = [(lot, initial_wip) for lot in non_tracked_lots] #state of system,
    s, sw = None, None          #  EPT start time, WIP at start
    records = []                        # output rows

    for τ, i, ev, *_ in df.itertuples(index=False, name=None):

        if ev.upper() == "A":           
            if not xs:                  # empty system -> new EPT
                s, sw = τ, 1
            xs.append((i, len(xs)))     # len(xs) is WIP before this arrival

        elif ev.upper() == "D":         

            xs, k, aw = detOvert(xs, i)

            if i not in non_tracked_lots:
                # determine EPT
                ept = τ - s
                # save record
                records.append(dict(lot=i, EPT=ept, sw=sw, k=k, aw=aw))

            # start a new EPT if system still contains lots
            if xs:
                s, sw = τ, len(xs)

        else:
            raise ValueError(f"Unknown event type '{ev}'")
    results = pd.DataFrame(records)
    return results

def build_batch_view(df: pd.DataFrame):
    # Simultaneous departures are a batch
    dep = df[df.event.str.upper() == "D"].copy()
    dep["group"] = dep.groupby("time").ngroup()

    batches = dep.groupby("group").filter(lambda g: len(g) > 0).copy()

    # create a batch id for each group
    group2virtual = {g: f"B{n}" for n, g in enumerate(sorted(batches.group.unique()))}
    batches["virtual"] = batches.group.map(group2virtual)

    lot2batch  = dict(zip(batches.lot, batches.virtual))

    # arrival = latest arrival of any member in the batch
    arrivals = (
        df[df.event.str.upper() == "A"]
          .loc[df.lot.isin(lot2batch)]                 # only batched lots
          .assign(virtual=lambda d: d.lot.map(lot2batch))
          .groupby("virtual", as_index=False)
          .agg(time=("time", "max"),
               lot_ids=("lot", lambda x: list(x)))     # collect ids
          .assign(event="A", lot=lambda d: d.virtual)  # rename columns
          .drop(columns="virtual")
    )

    # departure = common departure time 
    departures = (
        batches.groupby("virtual", as_index=False)
               .agg(time=("time", "first"),
                    lot_ids=("lot", lambda x: list(x)))
               .assign(event="D", lot=lambda d: d.virtual)
               .drop(columns="virtual")
    )

    drop_lots = set(lot2batch)                       # originals replaced by batch rows

    df_batch = (
        pd.concat([df, arrivals, departures], ignore_index=True)
          .loc[lambda d: ~d["lot"].isin(drop_lots)]  
          .sort_values("time")
          .reset_index(drop=True)
    )

    return df_batch


#--- Take Home solver ---#
#-- variables --#
# source data
sd_path = 'group07.txt'
expected_nr_columns = 4
column_names = ['time', 'lot', "workstation", 'event']

#-- Load assignment data --#
sd_completed = complete_sd(sd_path,column_names)
sd_W1 = split_sd(sd_completed,"W1")
sd_W2 = split_sd(sd_completed,"W2")
sd_W3 = split_sd(sd_completed,"W3")

Arival_W1, _ = get_arrival(sd_W1,False)
Arival_W2, _ = get_arrival(sd_W2,False)
Arival_W3, _ = get_arrival(sd_W3,False)
Arival_W2, Batch_W2 = get_arrival(sd_W2,True)

EPT_W1r = get_ept_data(sd_W1)
EPT_W2r = get_ept_data(Batch_W2)
EPT_W3r = get_ept_data(sd_W3)

[EPTd_W1, OTd_W1] = get_distribution(EPT_W1r)
[EPTd_W2, OTd_W2] = get_distribution(EPT_W2r)
[EPTd_W3, OTd_W3] = get_distribution(EPT_W3r)

sd_completed = complete_sd(sd_path,column_names)
sd_W1 = split_sd(sd_completed,"W1")
Arival_W1, _ = get_arrival(sd_W1,False)
EPT_W1r = get_ept_data(sd_W1)
[EPTd_W1, OTd_W1] = get_distribution(EPT_W1r)



In [2]:
print(EPT_W1r)
print(EPT_W2r)
print(EPT_W3r)

           lot  EPT  sw  k  aw
0     lot00994   76   1  0   0
1     lot00995   30   2  0   1
2     lot00997   80   1  1   1
3     lot00996  179   1  0   2
4     lot00998   46   3  0   1
...        ...  ...  .. ..  ..
9991  lot10985  125   2  1   3
9992  lot10983   18   4  0   1
9993  lot10987   13   3  0   2
9994  lot10989   29   2  1   4
9995  lot10988   11   1  0   3

[9996 rows x 5 columns]
      lot  EPT  sw  k  aw
0      B1  631   1  0   0
1      B2  548   1  0   0
2      B3  510   1  0   0
3      B4  596   1  0   0
4      B5  527   1  0   0
..    ...  ...  .. ..  ..
993  B994  604   1  0   0
994  B995  605   1  0   1
995  B996  549   1  0   1
996  B997  547   1  0   0
997  B998  444   1  0   0

[998 rows x 5 columns]
           lot  EPT  sw  k  aw
0     lot00978   27   1  0   0
1     lot00983   31   9  0   1
2     lot00984   41   8  0   2
3     lot00982   28   7  0   3
4     lot00985   23   6  0   4
...        ...  ...  .. ..  ..
9985  lot10968   29   5  0   5
9986  lot10969   26

In [25]:
import math

def compute_ept_data(EPT_W1r, EPT_W2r, EPT_W3r, Batch_W2):
    return {
        'W1_wip3plus': EPT_W1r[EPT_W1r['aw'] >= 3]['EPT'].mean(),
        'W2_all': EPT_W2r['EPT'].mean(),
        'W3_all': EPT_W3r['EPT'].mean(),
        'c_e2_W1': EPT_W1r[EPT_W1r['aw'] >= 3]['EPT'].var() / (EPT_W1r[EPT_W1r['aw'] >= 3]['EPT'].mean() ** 2),
        'c_e2_W2': EPT_W2r['EPT'].var() / (EPT_W2r['EPT'].mean() ** 2),
        'c_e2_W3': EPT_W3r['EPT'].var() / (EPT_W3r['EPT'].mean() ** 2),
        'W2_batch_size': len(Batch_W2['lot_ids'].explode()) / len(Batch_W2)
    }

def build_state_space_model(ept_data, sampling_time=360):
    """State-space model with delays and constraints. Follows the way they did it in Exercises 2 and 3."""
    delays = {ws: math.ceil(ept_data[f'{ws}_all' if ws != 'W1' else 'W1_wip3plus'] / sampling_time) 
              for ws in ['W1', 'W2', 'W3']}
#------------
    # Fractional effects for delays. ChatGPT added this and I am unsure if it is correct.
    fractions = {ws: min((ept_data[f'{ws}_all' if ws != 'W1' else 'W1_wip3plus'] % sampling_time) / sampling_time, 1.0) 
                 if delays[ws] > 0 else 1.0 for ws in ['W1', 'W2', 'W3']}
#------------
    d1, d2 = delays['W1'], delays['W2']
    state_dim = 3 + d1 + d2
    A = np.eye(state_dim)
    B = np.zeros((state_dim, 3))
    C = np.zeros((1, state_dim))

#------------
# Not sure if correct in general

    # WIP dynamics
    B[0, 0], B[0, 1] = 1, -1  # x1: u1 - u2
    B[1, 2] = -1              # x2: -u3
    A[1, 3 + d1 - 1] = fractions['W1']  # x2: u2(k-d1)
    A[2, 3 + d1 + d2 - 1] = fractions['W2']  # x3: u3(k-d2)

    # Delay states
    for i in range(d1 - 1):
        A[3 + i + 1, 3 + i] = 1
    for i in range(d2 - 1):
        A[3 + d1 + i + 1, 3 + d1 + i] = 1
    B[3, 1] = 1
    B[3 + d1, 2] = 1
#------------
#------------
    # Output: y = u3(k-d3)
    if delays['W3'] > 0:
        C[0, 3 + d1 + delays['W3'] - 1] = fractions['W3']
#------------

    # Constraints (Kingman's formula) -> Check slides for formula
    c_a2 = {'W1': 1.007654, 'W2': 0.352636, 'W3': 3.145749} # Taken from Arival_W1 in Take_home_1
    constraints = {}
    for ws, ept_key in [('W1', 'W1_wip3plus'), ('W2', 'W2_all'), ('W3', 'W3_all')]:
        ept = ept_data[ept_key]
        u_max = sampling_time / ept
        c = (c_a2[ws] + ept_data[f'c_e2_{ws}']) / 2
        t_e = ept / 60
        scale = ept_data['W2_batch_size'] if ws == 'W2' else 1.0 # Only prints for workstation 2
        constraints[ws] = {'u_max': u_max * scale, 'c': c * scale, 't_e': t_e}
    
    return {
        'A': A, 'B': B, 'C': C,
        'states': ['x1', 'x2', 'x3'] + [f'u2(k-{i+1})' for i in range(d1)] + [f'u3(k-{i+1})' for i in range(d2)],
        'controls': ['u1', 'u2', 'u3'],
        'output': ['y'],
        'delays': delays,
        'fractions': fractions,
        'constraints': constraints
    }

def print_model(model, ept_data):
    """Print model summary for sanity check."""
    print("EPTs (min) and c_e^2:")
    for k, v in ept_data.items():
        print(f"  {k}: {v:.2f}{' lots/batch' if k == 'W2_batch_size' else ''}")
    
    print("Delays and Fractions:")
    for ws in model['delays']:
        print(f"  {ws}: {model['delays'][ws]} periods, fraction = {model['fractions'][ws]:.4f}")
    
    print("State-Space:")
    print("States:", model['states'])
    print("Controls:", model['controls'])
    print("Output:", model['output'])
    print("A:\n", np.round(model['A'], 4))
    print("B:\n", np.round(model['B'], 4))
    print("C:\n", np.round(model['C'], 4))
    
    print("Constraints:")
    for ws, c in model['constraints'].items():
        print(f"  {ws}: 0 <= u_{ws} <= min({c['u_max']:.4f}, {c['c']:.4f} * u_{ws} * {c['t_e']:.4f} / ({c['c']:.4f} + x_{ws} * {c['t_e']:.4f}))")
        if ws != 'W1':
            print(f"        u_{ws} <= x_{'W1' if ws == 'W2' else 'W2'}")

# Execute and print everything
ept_data = compute_ept_data(EPT_W1r, EPT_W2r, EPT_W3r, Batch_W2)
model = build_state_space_model(ept_data)
print_model(model, ept_data)

EPTs (min) and c_e^2:
  W1_wip3plus: 47.56
  W2_all: 554.64
  W3_all: 36.86
  c_e2_W1: 1.00
  c_e2_W2: 0.01
  c_e2_W3: 0.99
  W2_batch_size: 9.91 lots/batch
Delays and Fractions:
  W1: 1 periods, fraction = 0.1321
  W2: 2 periods, fraction = 0.5407
  W3: 1 periods, fraction = 0.1024
State-Space:
States: ['x1', 'x2', 'x3', 'u2(k-1)', 'u3(k-1)', 'u3(k-2)']
Controls: ['u1', 'u2', 'u3']
Output: ['y']
A:
 [[1.     0.     0.     0.     0.     0.    ]
 [0.     1.     0.     0.1321 0.     0.    ]
 [0.     0.     1.     0.     0.     0.5407]
 [0.     0.     0.     1.     0.     0.    ]
 [0.     0.     0.     0.     1.     0.    ]
 [0.     0.     0.     0.     1.     1.    ]]
B:
 [[ 1. -1.  0.]
 [ 0.  0. -1.]
 [ 0.  0.  0.]
 [ 0.  1.  0.]
 [ 0.  0.  1.]
 [ 0.  0.  0.]]
C:
 [[0.     0.     0.     0.     0.1024 0.    ]]
Constraints:
  W1: 0 <= u_W1 <= min(7.5700, 1.0028 * u_W1 * 0.7926 / (1.0028 + x_W1 * 0.7926))
  W2: 0 <= u_W2 <= min(6.4347, 1.8005 * u_W2 * 9.2439 / (1.8005 + x_W2 * 9.2439))
   

In [26]:
import matplotlib.pyplot as plt

def plot_constraints_and_tangents(ept_data, model):
    """Plot WIP-dependent constraints and their tangent approximations at specified utilizations for question 2."""
    constraints = model['constraints']
    utilizations = [0, 0.35, 0.7, 0.8, 0.9, 1.0] # These are provided in the assignment.
    
    for ws in ['W1', 'W2', 'W3']:
        c = constraints[ws]['c']
        t_e = constraints[ws]['t_e']
        u_max = constraints[ws]['u_max']
        
        # WIP range for plotting
        x = np.linspace(0, 20, 1000)  # Guess this is a reasonable WIP range
        u = np.minimum(x / (c * t_e + x * t_e), u_max)
        
        # Plot constraint curve
        plt.figure(figsize=(8, 6))
        plt.plot(x, u, 'b-', label=f'{ws} Constraint')
        
        # Compute tangents
        for rho in utilizations:
            u_i = rho / t_e
            if u_i >= u_max:
                u_i = u_max
                rho = u_i * t_e  # Adjust rho if capped
            x_i = (c * u_i * t_e) / (1 - u_i * t_e) if u_i * t_e < 1 else 1 # Goes to inf if not smaller than 1
            
            # Derivative du/dx
            du_dx = c * t_e / (c * t_e + x_i * t_e) ** 2
            
            # Tangent line
            u_tangent = u_i + du_dx * (x - x_i)
            u_tangent = np.minimum(u_tangent, u_max)  # Cap at u_max. Otherwise the bounds of the plot are messy. Limiting plot range didn't work for some reason, so temp fix
            
            # Plot tangent
            plt.plot(x, u_tangent, '--', label=f'ρ={rho}')

        plt.title(f'WIP-Dependent Constraint and Tangents for {ws}')
        plt.xlabel(f'WIP (x_{ws})')
        plt.ylabel(f'Input Rate (u_{ws})')
        plt.grid(True)
        plt.legend()
        plt.savefig(f'constraint_{ws}.png')
        plt.close()

plot_constraints_and_tangents(ept_data, model)