In [283]:
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.metrics import root_mean_squared_error, r2_score
from sklearn.model_selection import validation_curve
import plotly.io as pio
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.model_selection import KFold

pio.templates.default = "plotly_white"
pcolors = px.colors.qualitative.T10
pcolors25 = px.colors.qualitative.Alphabet

In [284]:
def generate_bwu(owu):
    owu = owu.drop(["timesteps"], axis=1)
    # Input: multiindex OWU
    # Output: singleindex BWU
    for run_ix, run in owu.groupby("run"):
        if run_ix == owu.index.get_level_values('run')[0]:
            bwuindex = run.unstack(level=1)
        else:
            bwuindex = pd.concat([bwuindex, run.unstack(level=1)])
    bwu_columns = [str(bwuindex.columns.get_level_values(0)[i])+str(":")+str(bwuindex.columns.get_level_values(1)[i]) 
                   for i in range(len(bwuindex.columns.get_level_values(0)))]
    bwu = pd.DataFrame(bwuindex.to_numpy(), columns=bwu_columns)
    
    return bwu


def generate_y(bwu, return_aggr=False):
    # Input: singleindex BWU
    # Output: singleindex BWU having only target
    titer_column = [c for c in bwu.columns if c.startswith("X:Titer")]
    targets = pd.DataFrame(columns=["Y:Titer", "Y:Aggr"], index=bwu.index)

    # iterate through experiments
    for j in list(bwu.index):
        x_titer = bwu.loc[j, titer_column]
        x_prod = [0]
        x_aggr = [0]
        k_aggr = 10**-7
        for i in range(len(x_titer)):
            if i == 0:
                continue
            xt_titer = x_titer.iloc[i]
            dt_titer = x_titer.iloc[i] - x_titer.iloc[i - 1]
            x_prod.append(xt_titer)
            x_aggr.append(k_aggr * (xt_titer**2))

            dt_aggr = x_aggr[i] - x_aggr[i - 1]
            dt_prod = dt_titer - 2 * dt_aggr
            dt_aggr = k_aggr * (x_prod[i - 1] + dt_prod) ** 2

            x_aggr[i] = x_aggr[i - 1] + dt_aggr
            x_prod[i] = x_prod[i - 1] + dt_prod
        y_prod = x_prod[-1]
        y_aggr = x_aggr[-1]

        targets.loc[j, "Y:Titer"] = y_prod
        targets.loc[j, "Y:Aggr"] = y_aggr
    if return_aggr:
        target = targets["Y:Aggr"]
    else:
        target = targets["Y:Titer"]

    return pd.DataFrame(target)

In [285]:
def vip(X, model):
    # Score matrix T (latent variables), corresponding to T in the formula
    t = model.x_scores_

    # Weight matrix W, corresponding to W in the formula
    w = model.x_weights_

    # Loadings matrix Q, corresponding to c in the formula (sometimes Q is used for loadings in PLS models)
    q = model.y_loadings_

    # Number of samples (m) and number of variables (p), corresponding to the shape of X
    m, p = X.shape

    # Number of latent variables (h), corresponding to the shape of T
    _, h = t.shape

    # Initialize VIP scores array
    vips = np.zeros((p,))

    # Calculate SS(c_i t_i), the s in the formula, representing the sum of squares for the i-th latent variable
    # Here, t.T @ t is T^t * T, q.T @ q is c^t * c
    s = np.diag(t.T @ t @ q.T @ q).reshape(h, -1)

    # Calculate the total sum of SS(c_i t_i)
    total_s = np.sum(s)

    # Calculate the VIP score for each variable
    for i in range(p):
        # Calculate (w_ij / ||w_i||)^2 for each latent variable j
        # w[:,j] is the j-th column of weights, representing the weights for the j-th latent variable
        weight = np.array([(w[i, j] / np.linalg.norm(w[:, j])) ** 2 for j in range(h)])

        # Calculate the VIP score using the formula:
        # VIP_j = (k * ∑(SS(c_i t_i) * (w_ij / ||w_i||)^2) / ∑(SS(c_i t_i)))^(1/2)
        # Where k = p (number of variables)
        vips[i] = np.sqrt(p * np.sum(s.T @ weight) / total_s)

    return vips

In [286]:
def r2(y, y_pred):
    return round(r2_score(y, y_pred), 3)


def absolute_rmse(y, y_pred):
    return round(root_mean_squared_error(y, y_pred), 3)


def relative_rmse(y, y_pred):
    return round(root_mean_squared_error(y, y_pred) / np.std(np.array(y)), 3)

# Dataset

In [287]:
def read_owu_v4(file, root_path = 'dataset/datahow_2022/extrapolation/'):
    data = pd.read_csv(f'{root_path}/{file}.csv')
    owu_df = data.copy()
    num_runs = len(pd.read_csv(f'{root_path}/{file}_doe.csv'))
    owu_df.index = pd.MultiIndex.from_product(
        [list(range(num_runs)), list(range(15))], names=["run", "time"]
    )
    return owu_df

def read_doe(file, root_path= 'dataset/datahow_2022/extrapolation/'):
    data = pd.read_csv(f'{root_path}/{file}.csv', usecols=["feed_start", "feed_end", "Glc_feed_rate", "Glc_0", "VCD_0"])
    doe_df = data.copy()
    return doe_df

In [288]:
owu = read_owu_v4('owu')
doe = read_doe('owu_doe')
bwu = generate_bwu(owu)
tar = generate_y(bwu, return_aggr=False)

owu_test = read_owu_v4('owu_test')
doe_test = read_doe('owu_test_doe')
bwu_test = generate_bwu(owu_test)
tar_test = generate_y(bwu_test, return_aggr=False)

# Data-Driven Models for CQAs

- Aim of predicting the final titer

<details>
<summary>
<font size="3" color="black">
<b>PLS Introduction ⏏︎Click to open</b>
</font>
</summary>

<img src="assets/pls_explain.png" alt="Variables Type" width="1000">



PLS 模型允许我们使用更多的变量/列而不会导致模型过度拟合。

在只有一个响应变量 $ y $ 和 $ k $ 个预测变量的情况下，具有 $ h $ 个潜变量的 PLS 回归模型表达如下：

$$ X = T W^t + E$$

$$ y = U c^t + f $$

### Model Explain

- **X**：原始预测变量矩阵。
- **T**：得分矩阵（潜变量矩阵）。
- **W**：权重矩阵。
- **E**：误差矩阵。
- **y**：响应变量。
- **U**：响应变量的得分矩阵。
- **c**：回归系数向量。
- **f**：响应变量的误差向量。

PLS 模型通过找到一组新的潜变量（得分矩阵 $ T $ 和 $ U $）来解释原始变量和响应变量之间的关系，从而减少数据的维度并避免多重共线性的问题。

### VIP Scores

VIP 分数是用于衡量变量在模型中重要性的指标。对于第 $ j $ 个变量，VIP 分数计算公式如下：

$$ VIP_j = \left( k \sum_{i=1}^h \left(SS(c_i t_i) \left(\frac{w_{ij}}{||w_i||}\right)^2\right) / \sum_{k=1}^h (c_i t_i) \right)^{1/2} $$

- **VIP_j**：第 $ j $ 个变量的 VIP 分数。
- **k**：总的预测变量数。
- **h**：潜变量的数量。
- **SS(c_i t_i)**：第 $ i $ 个潜变量的平方和。
- **w_{ij}**：第 $ j $ 个变量在第 $ i $ 个潜变量中的权重。
- **||w_i||**：第 $ i $ 个潜变量权重的范数。

### VIP Usuage

- VIP 分数的平方平均值等于 1，因此“一大于一规则”通常用作变量选择的标准。即，VIP 分数大于 1 的变量被认为对模型重要，可以优先保留。

### Summary
- PLS 模型通过引入潜变量减少维度，并避免多重共线性的问题，使得我们可以使用更多的变量而不会导致模型过度拟合。
- VIP 分数则帮助我们评估每个变量在模型中的重要性，提供了一个有效的变量选择标准。

</details>

## BB-PLS1
- Black Box - Partial Least Square Model (PLS1)
- Training: $[Z, X(t = 0)] \rightarrow PLS1 \rightarrow Y_{Final}$

* Input matrix: "doe". This corresponds to the values of the manipulated process parameters for each experiment.
* Output target: "tar". This corresponds to the final value of titer at the end of each experiment. (or aggregates)
* Select the number of latent variables for the model (the maximum number of latent variables is 5, equal to the number of variables in the input matrix).

In [289]:
def fit_pls_model(
    X, y, latent_variables=5, polynomial_degree=1, interactions_only=False
):
    # Define pipeline
    include_bias = False
    if polynomial_degree == 0:
        print("Constant model for pls is not allowed!")
        latent_variables = 1
        include_bias = True
    if polynomial_degree == 1:
        latent_variables = min(latent_variables, 5)
    poly_features = PolynomialFeatures(
        degree=polynomial_degree,
        interaction_only=interactions_only,
        include_bias=include_bias,
    )

    # Normlization data
    standard_scaler = StandardScaler(with_mean=True, with_std=True)

    pls_model = PLSRegression(n_components=latent_variables, scale=True)
    pipe = Pipeline(
        [("features", poly_features), ("scaler", standard_scaler), ("model", pls_model)]
    )

    # Fit PLS model
    pipe.fit(X, y)
    return pipe


def plot_pls_model_coef(X, pls_pipeline):
    X_columns = pls_pipeline["features"].get_feature_names_out()
    X_preproc = pd.DataFrame(
        pls_pipeline["scaler"].transform(pls_pipeline["features"].transform(X)),
        columns=X_columns,
    )

    pls_vip = vip(X=X_preproc, model=pls_pipeline["model"])

    # Plot Model Inference
    fig = px.bar(
        x=list(X_columns),
        y=pls_vip.reshape(-1),
        title="VIP scores of PLS model",
        labels={"x": "Variables", "y": "Estimated VIP", "color": "p-value"},
    )
    fig.add_hline(y=1)
    fig.add_hline(y=0.8, line=dict(color="gray"))
    fig.update_layout(width=1600)
    fig.show()
    

def plot_pls_model_eval(
    y,
    y_pred,
    y_test,
    y_test_pred,
):
    # Metrics for training set
    train_r2 = r2(y, y_pred)
    train_abs_rmse = absolute_rmse(y, y_pred)
    train_rel_rmse = relative_rmse(y, y_pred)

    # Metrics for testing set
    test_r2 = r2(y_test, y_test_pred)
    test_abs_rmse = absolute_rmse(y_test, y_test_pred)
    test_rel_rmse = relative_rmse(y_test, y_test_pred)

    # Plot observed vs predicted
    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=(
            f"Train Set <br> R^2 = {train_r2} <br> Abs RMSE = {train_abs_rmse} <br> Rel RMSE = {train_rel_rmse}",
            f"Test Set <br> R^2 = {test_r2} <br> Abs RMSE = {test_abs_rmse} <br> Rel RMSE = {test_rel_rmse}",
        ),
    )
    # Train set plot
    fig.add_trace(
        go.Scatter(x=y.values.reshape(-1), y=y_pred.reshape(-1), mode="markers"),
        row=1,
        col=1,
    )
    fig.add_shape(
        type="line",
        x0=min(y_pred)[0],
        y0=min(y_pred)[0],
        x1=max(y_pred)[0],
        y1=max(y_pred)[0],
        layer="below",
        line=dict(dash="dash"),
    )
    # Test set plot
    fig.add_trace(
        go.Scatter(
            x=y_test.values.reshape(-1), y=y_test_pred.reshape(-1), mode="markers"
        ),
        row=1,
        col=2,
    )
    fig.add_shape(
        type="line",
        x0=min(y_test_pred)[0],
        y0=min(y_test_pred)[0],
        x1=max(y_test_pred)[0],
        y1=max(y_test_pred)[0],
        layer="below",
        line=dict(dash="dash"),
        row=1,
        col=2,
    )

    fig.update_layout(width=1600)
    fig.update_xaxes(title="Observed values", row=1, col=1)
    fig.update_xaxes(title="Observed values", row=1, col=2)
    fig.update_yaxes(title="Predicted values", row=1, col=1)
    fig.update_yaxes(title="Predicted values", row=1, col=2)
    fig.show()


def plot_pls_scores(pls_pipe, pc_x_axis=1, pc_y_axis=2):
    X_columns = pls_pipe["features"].get_feature_names_out()
    model = pls_pipe["model"]
    fig = make_subplots(
        rows=2,
        cols=2,
        specs=[[{"colspan": 2}, None], [{}, {}]],
        subplot_titles=(
            "Scores Plot ",
            "Loadings of Principal Component - " + str(pc_x_axis),
            "Loadings of Principal Component - " + str(pc_y_axis),
        ),
    )
    fig.add_trace(
        go.Scatter(
            x=model.x_scores_[:, pc_x_axis],
            y=model.x_scores_[:, pc_y_axis],
            mode="markers",
            name="Scores",
        ),
        row=1,
        col=1,
    )
    fig.add_bar(
        x=X_columns,
        y=model.x_loadings_[:, pc_x_axis - 1],
        name="Loadings PC - " + str(pc_x_axis),
        row=2,
        col=[1, 2],
    )
    fig.add_bar(
        x=X_columns,
        y=model.x_loadings_[:, pc_y_axis - 1],
        name="Loadings PC - " + str(pc_y_axis),
        row=2,
        col=2,
    )
    fig.update_layout(height=1000)
    fig.show()


def pls_cross_validation(X, y, pls_pipeline):
    X_columns = pls_pipeline["features"].get_feature_names_out()
    X_preproc = pd.DataFrame(
        pls_pipeline["scaler"].transform(pls_pipeline["features"].transform(X)),
        columns=X_columns,
    )
    range_LV = range(1, X_preproc.shape[1] + 1)
    train_eval, valid_eval = validation_curve(
        pls_pipeline,
        X,
        y,
        param_name="model__n_components",
        param_range=list(range_LV),
        scoring="neg_root_mean_squared_error",
    )
    return train_eval, valid_eval, range_LV


def plot_pls_cross_validation(train_eval, valid_eval, range_LV=None):
    train_score = -np.mean(train_eval, axis=1)
    valid_score = -np.mean(valid_eval, axis=1)
    train_std = np.std(train_eval, axis=1)
    valid_std = np.std(valid_eval, axis=1)
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=list(range_LV),
            y=train_score,
            error_y=dict(type="data", array=train_std, visible=True),
            name="Training",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=list(range_LV),
            y=valid_score,
            error_y=dict(type="data", array=valid_std, visible=True),
            name="Validation",
        )
    )
    fig.update_layout(
        title="Hyperparameter Optimization in PLS",
        xaxis_title="Number of Latent Variables",
        yaxis_title="Abs RMSE",
        legend_title="Evaluation type",
    )
    fig.show();

### Setting

In [290]:
""" Number of latent Variables """
LATENT_VARIABLES = 5
""" Ploynomial degree of features """
POLYNOMIAL_DEGREE_PLS = 1
""" Add only interaction between features"""
INTERACTIONS_ONLY = True

### Data

In [291]:
dataset = doe.copy() # inputs
dataset["Target"] = tar # outputs
dataset.head()

Unnamed: 0,feed_start,feed_end,Glc_feed_rate,Glc_0,VCD_0,Target
0,1.510204,10.72449,16.632653,34.489796,0.678571,598.214512
1,2.0,10.908163,19.081633,32.857143,0.515306,677.286486
2,2.489796,10.357143,19.693878,40.204082,0.617347,962.435039
3,1.142857,8.091837,15.408163,66.326531,0.790816,376.471284
4,2.326531,8.520408,19.897959,54.081633,0.872449,749.036075


### Train

In [292]:
# Training
pipe = fit_pls_model(
    X=doe,
    y=tar,
    latent_variables=LATENT_VARIABLES,
    polynomial_degree=POLYNOMIAL_DEGREE_PLS,
    interactions_only=INTERACTIONS_ONLY,
)

# Model features and variable importance
plot_pls_model_coef(X=doe, pls_pipeline=pipe)

### Test

In [293]:
# Make predictions for training
X = doe
y = tar
y_pred = pipe.predict(X)

# Make predictions for testing
X_test = doe_test
y_test = tar_test
y_test_pred = pipe.predict(X_test)

# Plot
plot_pls_model_eval(
    y, y_pred, y_test, y_test_pred,
)

### PCA

In [294]:
plot_pls_scores(pipe, pc_x_axis=1, pc_y_axis=2)

### K-Fold
- Use a typical cross-validation to define the optimal number of latent variables.
- This is repeated for different numbers of latent variables. 
	- Nfold PLS models are trained using (Nfolds-1) folds. 
	- Each model is then evaluated on the remaining unused fold. 
- The number of latent variables returning the least value of the RMSE is chosen as optimal.

In [295]:
train_eval, valid_eval, range_LV = pls_cross_validation(X, y, pls_pipeline=pipe)

In [296]:
plot_pls_cross_validation(train_eval, valid_eval, range_LV=range_LV)

## BWU-PLS1
- Batch Wise Unfolded - Partial Least Square Model (PLS1), also called Historical-PLS
- BWU can be used to compute final properties of the experiment, like CQAs-Titer, which are typically the effect of the cumulated effect of the experiment profile.
- Clearly, titer information are removed from the BWU matrix.

* Training: $[Z, X(t < t_{final})] \rightarrow PLS1 \rightarrow y(t=t_{final})$
* Testing: $[Z, X(t < t_{final})] \rightarrow PLS1 \rightarrow y(t=t_{final})$

> Why not to use MLR:
> Unlinke in the initial conditions model, there are several problems in using an linear regression / response surface model for modelling historical data, which makes it dificult to fit linear model.
> - $X$ (the BWU data), rows (samples) < columns (variables). 
> - if $X^T X$ has full rank due only to noise, the inverse is unstable and small changes in noise realization can produce dramatically different results.

In [297]:
def preprocess(Z, X, process_history=14,
    input_variables=[
        "feed_start",
        "feed_end",
        "feed_rate",
        "glc_0",
        "vcd_0",
        "X:VCD",
        "X:Glc",
    ]):
    # Remove Variables
    remove_columns = []
    remove_columns.extend([c for c in X.columns if c.startswith("X:Titer")])
    if "X:Lac" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:Lac")])
    if "W:Feed" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("W:Feed")])
    if "X:Glc" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:Glc")])
    if "X:VCD" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:VCD")])

    # Remove History
    for d in range(process_history, 15):
        remove_columns.extend([c for c in X.columns if c.endswith(":0")])
        remove_columns.extend([c for c in X.columns if c.endswith(":" + str(d))])

    # Remove Invariant
    remove_columns.extend(list(X.columns[~(X != X.iloc[0]).any().values]))

    # Add and remove columns
    X_preproc = pd.concat([Z, X.drop(set(remove_columns), axis=1)], axis=1)
    return X_preproc

In [298]:
def fit_hist_pls_model(
    X,
    y,
    latent_variables=5,
):
    # Define Pipeline
    pscaler = StandardScaler(with_mean=True, with_std=True)
    pls_bwu = PLSRegression(n_components=latent_variables)
    pipe = Pipeline([("scaler", pscaler), ("model", pls_bwu)])

    # Train PLS model
    pipe.fit(X, y)
    return pipe


def plot_hist_pls_model_coef(X_preproc, pls_pipeline):
    X_columns = X_preproc.columns
    pls_bwu_vip = vip(X_preproc, model=pls_pipeline["model"])
    fig = px.bar(
        x=list(X_columns),
        y=pls_bwu_vip.reshape(-1),
        title="VIP scores of Historical PLS model",
        labels={"x": "Variables", "y": "Estimated VIP", "color": "p-value"},
    )
    fig.add_hline(y=1)
    fig.add_hline(y=0.8, line=dict(color="gray"))
    fig.update_layout(width=1600)
    fig.show()


def plot_hist_pls_model_eval(
    y,
    y_pred,
    y_test,
    y_test_pred,
):

    # Calculate error metrics
    train_r2 = r2(y, y_pred)
    train_abs_rmse = absolute_rmse(y, y_pred)
    train_rel_rmse = relative_rmse(y, y_pred)
    test_r2 = r2(y_test, y_test_pred)
    test_abs_rmse = absolute_rmse(y_test, y_test_pred)
    test_rel_rmse = relative_rmse(y_test, y_test_pred)

    # Plot observed vs predicted
    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=(
            f"Train Set <br> R^2 = {train_r2} <br> Abs RMSE = {train_abs_rmse} <br> Rel RMSE = {train_rel_rmse}",
            f"Test Set <br> R^2 = {test_r2} <br> Abs RMSE = {test_abs_rmse} <br> Rel RMSE = {test_rel_rmse}",
        ),
    )
    # Train set plot
    fig.add_trace(
        go.Scatter(x=y.values.reshape(-1), y=y_pred.reshape(-1), mode="markers"),
        row=1,
        col=1,
    )
    fig.add_shape(
        type="line",
        x0=min(y_pred)[0],
        y0=min(y_pred)[0],
        x1=max(y_pred)[0],
        y1=max(y_pred)[0],
        layer="below",
        line=dict(dash="dash"),
    )
    # Test set plot
    fig.add_trace(
        go.Scatter(
            x=y_test.values.reshape(-1), y=y_test_pred.reshape(-1), mode="markers"
        ),
        row=1,
        col=2,
    )
    fig.add_shape(
        type="line",
        x0=min(y_test_pred)[0],
        y0=min(y_test_pred)[0],
        x1=max(y_test_pred)[0],
        y1=max(y_test_pred)[0],
        layer="below",
        line=dict(dash="dash"),
        row=1,
        col=2,
    )
    fig.update_layout(title_text="Observed vs Predicted", showlegend=False)
    fig.update_layout(width=1600)
    fig.show()


def plot_hist_pls_scores(pls_pipe, pc_x_axis=1, pc_y_axis=2, X_columns=None):
    model = pls_pipe["model"]
    fig = make_subplots(
        rows=2,
        cols=2,
        specs=[[{"colspan": 2}, None], [{}, {}]],
        subplot_titles=(
            "Scores Plot ",
            "Loadings of Principal Component - " + str(pc_x_axis),
            "Loadings of Principal Component - " + str(pc_y_axis),
        ),
    )
    fig.add_trace(
        go.Scatter(
            x=model.x_scores_[:, pc_x_axis],
            y=model.x_scores_[:, pc_y_axis],
            mode="markers",
            name="Scores",
        ),
        row=1,
        col=1,
    )
    fig.add_bar(
        x=X_columns,
        y=model.x_loadings_[:, pc_x_axis - 1],
        name="Loadings PC - " + str(pc_x_axis),
        row=2,
        col=[1, 2],
    )
    fig.add_bar(
        x=X_columns,
        y=model.x_loadings_[:, pc_y_axis - 1],
        name="Loadings PC - " + str(pc_y_axis),
        row=2,
        col=2,
    )
    fig.update_layout(height=1000)
    fig.show()


def hist_pls_cross_validation(X, y, pls_pipeline):
    range_LV = range(1, X.shape[1] + 1)
    train_eval, valid_eval = validation_curve(
        pls_pipeline,
        X,
        y,
        param_name="model__n_components",
        param_range=list(range_LV),
        scoring="neg_root_mean_squared_error",
    )
    return train_eval, valid_eval, range_LV


def plot_hist_pls_cross_validation(train_eval, valid_eval, range_LV=None):
    train_score = -np.mean(train_eval, axis=1)
    valid_score = -np.mean(valid_eval, axis=1)
    train_std = np.std(train_eval, axis=1)
    valid_std = np.std(valid_eval, axis=1)
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=list(range_LV),
            y=train_score,
            error_y=dict(type="data", array=train_std, visible=True),
            name="Training",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=list(range_LV),
            y=valid_score,
            error_y=dict(type="data", array=valid_std, visible=True),
            name="Validation",
        )
    )
    fig.update_layout(
        title="Hyperparameter Optimization in Historical PLS",
        xaxis_title="Number of Latent Variables",
        yaxis_title="Abs RMSE",
        legend_title="Evaluation type",
    )
    fig.show();

### Setting

In [299]:
""" Number of latent variables """
LATENT_VARIABLES_HIST = 15

""" Number of days of process history """
PROCESS_HISTORY = 14

""" Input variables """
INPUT_VARIABLES = ['feed_start','feed_end','feed_rate','glc_0','vcd_0','X:VCD', 'X:Glc']

### Data

In [300]:
ZX_preproc = preprocess(Z=doe, X=bwu, process_history=PROCESS_HISTORY, 
					   input_variables=INPUT_VARIABLES)

In [301]:
ZX_preproc.columns

Index(['feed_start', 'feed_end', 'Glc_feed_rate', 'Glc_0', 'VCD_0', 'X:VCD:1',
       'X:VCD:2', 'X:VCD:3', 'X:VCD:4', 'X:VCD:5', 'X:VCD:6', 'X:VCD:7',
       'X:VCD:8', 'X:VCD:9', 'X:VCD:10', 'X:VCD:11', 'X:VCD:12', 'X:VCD:13',
       'X:Glc:1', 'X:Glc:2', 'X:Glc:3', 'X:Glc:4', 'X:Glc:5', 'X:Glc:6',
       'X:Glc:7', 'X:Glc:8', 'X:Glc:9', 'X:Glc:10', 'X:Glc:11', 'X:Glc:12',
       'X:Glc:13'],
      dtype='object')

### Train

* Remove titer (and also lactate)
* Remove exceeding days
* Eliminate invariant columns
* Remove linearly dependent columns
* Add process parameters at the beginning
* Create a PLS model from the initial design to the final titer


In [302]:
pipe = fit_hist_pls_model(
    X=ZX_preproc,
    y=tar,
    latent_variables=LATENT_VARIABLES_HIST,
)

plot_hist_pls_model_coef(ZX_preproc, pls_pipeline=pipe)

### Test

In [303]:
# Make predictions for training
y = tar
y_pred = pipe.predict(ZX_preproc)

# Make predictions for testing
ZX_test_preproc = preprocess(Z=doe_test, X=bwu_test, process_history=PROCESS_HISTORY, 
					   input_variables=INPUT_VARIABLES)
y_test = tar_test
y_test_pred = pipe.predict(ZX_test_preproc)


# Plot
plot_hist_pls_model_eval(
    y, y_pred, y_test, y_test_pred,
)

### PCA

In [304]:
plot_hist_pls_scores(pipe, pc_x_axis=1, pc_y_axis=2)

### K-Fold

- Use a typical cross-validation to define the optimal number of latent variables.
- This is repeated for different numbers of latent variables. 
	- Nfold PLS models are trained using (Nfolds-1) folds. For each model, the sum of squared residuals (SSR) is calculated and summed up.
	- Each model is then evaluated on the remaining unused fold. 
- The number of latent variables returning the least value of the RMSE is chosen as optimal.
- A second criterion is selected, namely the adjusted R-squared (adj. R^2), which is weighting the effect of the number of latent variables
	- if two values of the number of latent variables are returning a similar value of the SSR, then the one using less variables is chosen to be more likely to produce robust predictions:



In [305]:
train_eval, valid_eval, range_LV = hist_pls_cross_validation(X=ZX_preproc, y=tar, pls_pipeline=pipe)


y residual is constant at iteration 28


y residual is constant at iteration 28


y residual is constant at iteration 28


y residual is constant at iteration 27


y residual is constant at iteration 27


y residual is constant at iteration 27


y residual is constant at iteration 27


y residual is constant at iteration 30



In [306]:
plot_hist_pls_cross_validation(train_eval, valid_eval, range_LV)

# Data-Driven Models for Simulation

## BB-PLS1
- Black Box - Partial Least Square Model (PLS1)
- One model per timepoint per process variable is developed, denoted as $PLS1_{i, t}$
- Training: $[Z, X(t = 0)] \rightarrow PLS1_{i, t} \rightarrow X_i(t = t_{model})$

In [307]:
def transform_owu(owu, t_steps=15, batch_first=False):
    X_columns = [col for col in owu.columns if "X:" in col]
    X_owu = owu[X_columns].copy()
    X_owu = X_owu.sort_index(level=["run", "time"])

    C = len(X_columns)
    B = X_owu.index.get_level_values("run").nunique()
    T = t_steps

    if batch_first:
        X = np.zeros((B, T, C))
    else:
        X = np.zeros((T, C, B))

    for i, (run, group) in enumerate(X_owu.groupby(level="run")):
        if len(group) != T:
            raise ValueError(f"Run {run} does not have {T} time steps.")

        if batch_first:
            X[i, :, :] = group.values
        else:
            X[:, :, i] = group.values

    return X, X_columns


def fit_multi_step_pls_model(
    Z, X, latent_variables=2, polynomial_degree=1, interactions_only=False
):
    # Function to train using multiple BB-PLS1 models
    T, C, B = X.shape
    models = {}
    for t in range(T):
        models[t] = {}
        for i in range(C):
            model = fit_pls_model(
                X=Z,
                y=X[t, i, :],
                latent_variables=latent_variables,
                polynomial_degree=polynomial_degree,
                interactions_only=interactions_only,
            )
            models[t][i] = model
    return models


def predict_multi_step_pls_model(Z, multi_step_models, t_steps=15):
    # Prediction in train set
    B, _ = Z.shape
    T = len(multi_step_models)
    C = len(multi_step_models[0])

    if t_steps != T:
        raise ValueError(f"Models does not have {t_steps} time steps.")

    X_pred = np.zeros((T, C, B))
    for t in range(T):
        for i in range(C):
            X_pred[t, i, :] = multi_step_models[t][i].predict(Z)

    return X_pred


def plot_multi_step_pls_model_eval(
    X,
    X_pred,
    X_test,
    X_test_pred,
    X_columns=None,
):
    for i, col in enumerate(X_columns):
        y = X[:, i, :]
        y_pred = X_pred[:, i, :]
        y_test = X_test[:, i, :]
        y_test_pred = X_test_pred[:, i, :]

        # Metrics for training set
        train_r2 = r2(y, y_pred)
        train_abs_rmse = absolute_rmse(y, y_pred)
        train_rel_rmse = relative_rmse(y, y_pred)

        # Metrics for testing set
        test_r2 = r2(y_test, y_test_pred)
        test_abs_rmse = absolute_rmse(y_test, y_test_pred)
        test_rel_rmse = relative_rmse(y_test, y_test_pred)

        # Plot observed vs predicted
        fig = make_subplots(
            rows=1,
            cols=2,
            subplot_titles=(
                f"Train Set - {col} <br> R^2 = {train_r2} <br> Abs RMSE = {train_abs_rmse} <br> Rel RMSE = {train_rel_rmse}",
                f"Test Set - {col} <br> R^2 = {test_r2} <br> Abs RMSE = {test_abs_rmse} <br> Rel RMSE = {test_rel_rmse}",
            ),
        )

        # Train set plot
        _, _, NUM_TRAIN = X.shape
        for i in range(NUM_TRAIN):
            fig.add_trace(
                go.Scatter(
                    x=y[:, i].reshape(-1),
                    y=y_pred[:, i].reshape(-1),
                    mode="markers",
                    name=f"Run id in Train {i}",
                    legendgroup=f"train_{i}",
                ),
                row=1,
                col=1,
            )
        fig.add_shape(
            type="line",
            x0=y_pred.min(),
            y0=y_pred.min(),
            x1=y_pred.max(),
            y1=y_pred.max(),
            layer="above",
            line=dict(dash="dash"),
        )

        # Test set plot
        _, _, NUM_TEST = X_test.shape
        for j in range(NUM_TEST):
            fig.add_trace(
                go.Scatter(
                    x=y_test[:, j].reshape(-1),
                    y=y_test_pred[:, j].reshape(-1),
                    mode="markers",
                    name=f"Run id in Test {j}",
                    legendgroup=f"test_{j}",
                ),
                row=1,
                col=2,
            )
        fig.add_shape(
            type="line",
            x0=y_test_pred.min(),
            y0=y_test_pred.min(),
            x1=y_test_pred.max(),
            y1=y_test_pred.max(),
            layer="above",
            line=dict(dash="dash"),
            row=1,
            col=2,
        )

        fig.update_layout(width=1600)
        fig.update_xaxes(title="Observed values", row=1, col=1)
        fig.update_xaxes(title="Observed values", row=1, col=2)
        fig.update_yaxes(title="Predicted values", row=1, col=1)
        fig.update_yaxes(title="Predicted values", row=1, col=2)
        fig.show()


def plot_relative_rmse_by_variables(
    X,
    X_pred,
    X_test,
    X_test_pred,
    X_columns=None,
):
    relative_rmse_train = []
    relative_rmse_test = []
    for i, col in enumerate(X_columns):
        y = X[:, i, :]
        y_pred = X_pred[:, i, :]
        y_test = X_test[:, i, :]
        y_test_pred = X_test_pred[:, i, :]

        # Metrics for training set
        train_rel_rmse = relative_rmse(y, y_pred)
        relative_rmse_train.append(train_rel_rmse)

        # Metrics for testing set
        test_rel_rmse = relative_rmse(y_test, y_test_pred)
        relative_rmse_test.append(test_rel_rmse)

    fig_rmse = go.Figure()
    fig_rmse.add_trace(
        go.Bar(
            x=X_columns,
            y=relative_rmse_train,
            name="Train Set",
            marker_color=pcolors[0],
            text=[f"{v:.2f}" for v in relative_rmse_train],
            textposition="outside",
        )
    )

    fig_rmse.add_trace(
        go.Bar(
            x=X_columns,
            y=relative_rmse_test,
            name="Test Set",
            marker_color=pcolors[1],
            text=[f"{v:.2f}" for v in relative_rmse_test],
            textposition="outside",
        )
    )

    fig_rmse.update_layout(
        barmode="group",
        title="Relative RMSE for Each Variables",
        xaxis_title="Feature",
        yaxis_title="Relative RMSE",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="center", x=0.5),
    )

    fig_rmse.show()

### Setting

In [308]:
""" Number of latent Variables """
LATENT_VARIABLES = 2
""" Ploynomial degree of features """
POLYNOMIAL_DEGREE_PLS = 1
""" Add only interaction between features"""
INTERACTIONS_ONLY = True

""" Number of days of process history """
PROCESS_HISTORY = 15

### Data

In [310]:
X, X_columns = transform_owu(owu, t_steps=15, batch_first=False)
X_test, X_columns = transform_owu(owu_test, t_steps=15, batch_first=False)

### Train

In [311]:
models = fit_multi_step_pls_model(
    Z=doe,
    X=X,
    latent_variables=LATENT_VARIABLES,
    polynomial_degree=POLYNOMIAL_DEGREE_PLS,
    interactions_only=INTERACTIONS_ONLY,
)


y residual is constant at iteration 0


y residual is constant at iteration 0



### Test

In [312]:
X_pred = predict_multi_step_pls_model(Z=doe, multi_step_models=models, t_steps=15)
X_test_pred = predict_multi_step_pls_model(Z=doe_test, multi_step_models=models, t_steps=15)

plot_multi_step_pls_model_eval(
    X,
    X_pred,
    X_test,
    X_test_pred,
    X_columns = X_columns,
)

plot_relative_rmse_by_variables(
    X,
    X_pred,
    X_test,
    X_test_pred,
    X_columns = X_columns,
)

In [313]:
def plot_predicted_profile(X, X_pred, X_columns, select_runs=[0], height=1000):
    max_cols_per_row = 5
    num_columns = len(X_columns)
    num_rows = (num_columns + max_cols_per_row) // max_cols_per_row

    fig = make_subplots(
        rows=num_rows, cols=min(num_columns, max_cols_per_row), 
        subplot_titles=X_columns
    )

    color_palette = px.colors.qualitative.Plotly

    for idx, j in enumerate(select_runs):
        color = color_palette[idx % len(color_palette)]
        for i, c in enumerate(X_columns):
            row = i // max_cols_per_row + 1
            col = i % max_cols_per_row + 1
            show_legend = (i == 0)
            fig.add_trace(
                go.Scatter(
                    x=list(range(15)),
                    y=X[:, i, j],
                    name=f"Run {j} Observed",
                    marker=dict(color=color),
                    showlegend=show_legend,
                    legendgroup=f"group_{j}"
                ),
                row=row,
                col=col,
            )
            fig.add_trace(
                go.Scatter(
                    x=list(range(15)),
                    y=X_pred[:, i, j],
                    name=f"Run {j} Predicted",
                    line=dict(dash="dash"),
                    marker=dict(color=color),
                    showlegend=show_legend,
                    legendgroup=f"group_{j}"
                ),
                row=row,
                col=col,
            )

    fig.update_layout(
        showlegend=True,
        title_text="Process variable evolution for selected runs",
        height=height,
    )
    fig.show()


In [314]:
plot_predicted_profile(X_test, X_test_pred, X_columns, select_runs=[0,10], height=500)

### K-Fold

In [315]:
def multi_step_pls_cross_validation(Z, X, polynomial_degree=1, interactions_only=False, cv_folds=5):
    T, C, B = X.shape
    Z = Z.values

    kf = KFold(n_splits=cv_folds)
    train_eval = []
    valid_eval = []

    range_LV = range(1, Z.shape[1] + 1)

    for latent_variables in range_LV:
        fold_train_errors = []
        fold_val_errors = []
        for train_index, val_index in kf.split(Z):
            Z_train, Z_val = Z[train_index, :], Z[val_index, :]
            X_train, X_val = X[:, :, train_index], X[:, :, val_index]

            models = fit_multi_step_pls_model(
                Z=Z_train,
                X=X_train,
                latent_variables=latent_variables,
                polynomial_degree=polynomial_degree,
                interactions_only=interactions_only,
            )

            X_train_pred = predict_multi_step_pls_model(Z=Z_train, multi_step_models=models, t_steps=T)
            X_val_pred = predict_multi_step_pls_model(Z=Z_val, multi_step_models=models, t_steps=T)

            train_error = -absolute_rmse(X_train.flatten(), X_train_pred.flatten())
            val_error = -absolute_rmse(X_val.flatten(), X_val_pred.flatten())

            fold_train_errors.append(train_error)
            fold_val_errors.append(val_error)

        train_eval.append(fold_train_errors)
        valid_eval.append(fold_val_errors)

    return np.array(train_eval), np.array(valid_eval), range_LV

In [316]:
train_eval, valid_eval, range_LV = multi_step_pls_cross_validation(
    Z=doe,
    X=X,
    polynomial_degree=POLYNOMIAL_DEGREE_PLS,
    interactions_only=INTERACTIONS_ONLY,
    cv_folds=5
)


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0


y residual is constant at iteration 0



In [317]:
plot_pls_cross_validation(train_eval, valid_eval, range_LV=range_LV)

## BWU-PLS1
- Batch Wise Unfolded - Partial Least Square Model (PLS1), also called Historical-PLS
- One model per timepoint per process variable is developed, denoted as $PLS1_{i, t}$
	- But, using the process condition and the historical information available until a given time
- Training: $[Z, X(t < t_{model})] \rightarrow PLS1_{i, t} \rightarrow X_i(t=t_{model})$
- Testing: $[Z, X(t = 0), X^{predicted}(t < t_{model})] \rightarrow PLS1_{i, t} \rightarrow X_i(t=t_{model})$

### Setting

In [324]:
""" Number of latent variables """
LATENT_VARIABLES_HIST = 2

""" Input variables """
INPUT_VARIABLES = ['feed_start','feed_end','feed_rate','glc_0','vcd_0','X:VCD', 'X:Glc']

In [325]:
def preprocess(Z, X, process_history=1,
    input_variables=[
        "feed_start",
        "feed_end",
        "feed_rate",
        "glc_0",
        "vcd_0",
        "X:VCD",
        "X:Glc",
    ]):
    # Remove Variables
    remove_columns = []
    remove_columns.extend([c for c in X.columns if c.startswith("X:Titer")])
    if "X:Lac" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:Lac")])
    if "W:Feed" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("W:Feed")])
    if "X:Glc" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:Glc")])
    if "X:VCD" in input_variables:
        pass
    else:
        remove_columns.extend([c for c in X.columns if c.startswith("X:VCD")])

    # Remove History
    for d in range(process_history+1, 15):
        remove_columns.extend([c for c in X.columns if c.endswith(":0")])
        remove_columns.extend([c for c in X.columns if c.endswith(":" + str(d))])

    # Remove Invariant
    remove_columns.extend(list(X.columns[~(X != X.iloc[0]).any().values]))

    # Add and remove columns
    X_preproc = pd.concat([Z, X.drop(set(remove_columns), axis=1)], axis=1)
    return X_preproc

In [327]:
X, X_columns = transform_owu(owu, t_steps=15, batch_first=False)
X_test, X_columns = transform_owu(owu_test, t_steps=15, batch_first=False)

In [331]:
def fit_multi_step_hist_pls_model(
    doe,
    owu,
    latent_variables=2,
    input_variables=[],
):
    # Function to train using multiple BWU-PLS1 models
    bwu = generate_bwu(owu)
    X, _ = transform_owu(owu, t_steps=15, batch_first=False)
    T, C, B = X.shape
    
    models = {}
    for t in range(T):
        ZX_preproc = preprocess(
            Z=doe, X=bwu, process_history=t, input_variables=input_variables
        )
        models[t] = {}
        for i in range(C):
            model = fit_hist_pls_model(
                X=ZX_preproc,
                y=X[t, i, :],
                latent_variables=latent_variables,
            )
            models[t][i] = model
    return models

In [332]:
models = fit_multi_step_hist_pls_model(
    doe,
    owu,
    latent_variables=LATENT_VARIABLES,
    input_variables=INPUT_VARIABLES,
)


y residual is constant at iteration 0


y residual is constant at iteration 0



In [344]:
def matrix_to_bwu(predicted_data, X_columns):
    # predicted_data shape: (T, C, B)
    T, C, B = predicted_data.shape
    
    # Convert 3D array to DataFrame with appropriate multi-index
    data = []
    columns = []
    for t in range(T):
        for i, col in enumerate(X_columns):
            col_name = f'{col}:{t}'
            columns.append(col_name)
            data.append(predicted_data[t, i, :])
    
    data = np.array(data).T  # Shape: (B, T*C)
    bwu = pd.DataFrame(data, columns=columns)
    
    return bwu


In [345]:
def predict_multi_step_hist_pls_model(
    doe, X_init, multi_step_models, X_columns=None, t_steps=15, input_variables=None
):
    # Initialize the predicted matrix
    B, _ = doe.shape
    T = len(multi_step_models)
    C = len(multi_step_models[0])

    if t_steps != T:
        raise ValueError(f"Models does not have {t_steps} time steps.")

    X_pred = np.zeros((T, C, B))

    # Get initial state X(t=0)
    X_initial = X_init  # assuming the shape of owu is (T, C, B)
    X_pred[0, :, :] = X_initial

    for t in range(1, T):
        # Generate BWU matrix using the predicted data until time t
        partial_bwu = matrix_to_bwu(X_pred[:t, :, :], X_columns)
        print(partial_bwu)

        # Preprocess the data with historical data until t
        ZX_preproc = preprocess(
            Z=doe, X=partial_bwu, process_history=t, input_variables=input_variables
        )

        # Predict for each variable at time t
        for i in range(C):
            X_pred[t, i, :] = multi_step_models[t][i].predict(ZX_preproc)

    return X_pred

In [346]:
X_pred = predict_multi_step_hist_pls_model(
    doe,
    X[0, :, :],
    X_columns=X_columns,
    multi_step_models=models,
    t_steps=15,
    input_variables=INPUT_VARIABLES,
)

     X:VCD:0    X:Glc:0  X:Lac:0  X:Titer:0
0   0.678571  34.489796      0.0        0.0
1   0.515306  32.857143      0.0        0.0
2   0.617347  40.204082      0.0        0.0
3   0.790816  66.326531      0.0        0.0
4   0.872449  54.081633      0.0        0.0
5   0.974490  55.714286      0.0        0.0
6   0.780612  67.142857      0.0        0.0
7   0.729592  56.530612      0.0        0.0
8   0.698980  57.346939      0.0        0.0
9   0.841837  54.897959      0.0        0.0
10  0.923469  37.755102      0.0        0.0
11  0.596939  50.000000      0.0        0.0
12  0.688776  53.265306      0.0        0.0
13  0.739796  42.653061      0.0        0.0
14  0.913265  65.510204      0.0        0.0
15  0.954082  47.551020      0.0        0.0
16  0.709184  63.877551      0.0        0.0
17  0.505102  45.102041      0.0        0.0
18  0.811224  41.020408      0.0        0.0
19  0.556122  36.938776      0.0        0.0
20  0.658163  35.306122      0.0        0.0
21  0.607143  59.795918      0.0

ValueError: The feature names should match those that were passed during fit.
Feature names seen at fit time, yet now missing:
- X:Glc:1
- X:VCD:1


### Data

## Instant-ANN
- ANN models per variable per time point
- Training: $[Z, X(t = t_{model} - 1)] \rightarrow ANN_{i, t} \rightarrow X_i(t = t_{model})$
- Testing: $[Z, \hat{X}(t = t_{model} - 1)] \rightarrow ANN_{i, t} \rightarrow X_i(t = t_{model})$

## OWU-ANN
- a single model is used for all time points
- Training: $[Z, X(t = t_{model} - 1)] \rightarrow ANN_i \rightarrow X_i(t = t_{model})$​
- Testing: $[Z, \hat{X}(t = t_{model} - 1)] \rightarrow ANN_i \rightarrow X_i(t = t_{model})$