In [1]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import math
import itertools

from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import ShuffleSplit
from sklearn.model_selection import RepeatedKFold, cross_val_score

import xgboost as xgb
from sklearn.metrics import r2_score, mean_squared_error


import seaborn as sns
sns.set_theme('poster')
sns.set(style='ticks', font_scale=2)
mpl.rcParams['font.weight'] = 'bold'
mpl.rcParams.update({'text.usetex': False})

random_seed = 42
np.random.seed(random_seed)

with open('Data.csv', 'r') as f:
    exp_df = pd.read_csv(f)
    exp_df['MTTF'] = np.log(exp_df['MTTF'])

info_dict = {'135':(9.42, 19.03), '140':(9.78, 18.58), '150':(9.14, 17.05)}
n_dict = {'135':6.35, '140':6.3, '150':7.15}
T_list = [135, 140, 150]

In [6]:
def get_features(df_orig):
    df = df_orig.copy()
    df['V'] = df_orig['V']
    df['T'] = df_orig['T']
    df['V2'] = df['V'].apply(lambda x: x**2)
    df['V4'] = df['V'].apply(lambda x: x**4)
    df['V*T'] = df['V'] * df['T']
    df['V/T'] = df['V'] / df['T']
    df['LnV'] = df['V'].apply(lambda x: math.log(x))
    df['LnT'] = df['T'].apply(lambda x: math.log(x))
    df['TLnV'] = df['T'] * df['LnV']
    df['VLnT'] = df['V'] * df['LnT']
    return df

def get_TPM_MTTF(V, T, Beta, C):
    T += 273.15
    MTTF = np.exp(C-np.log(np.sinh(Beta*V/T)))/3600
    return MTTF

def generate_TPM_dataset(T, Beta, C):
    Vs, MTTFs = [], []
    for V in np.linspace(200, 400, 101):
        MTTF = get_TPM_MTTF(V, T, Beta, C)
        MTTFs.append(MTTF)
        Vs.append(V)
    df = pd.DataFrame()
    df['V'] = Vs
    df['T'] = [T] * len(Vs)
    df = get_features(df)
    df['MTTF'] = MTTFs
    return df

def rmspe(y_true, y_pred):
    if isinstance(y_true, pd.Series):
        y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series):
        y_pred = y_pred.to_numpy()

    if len(y_true) != len(y_pred):
        raise ValueError("Input arrays must have the same length.")

    percentage_errors = ((y_true - y_pred) / y_true) ** 2
    rmspe_value = np.sqrt(np.mean(percentage_errors))
    return rmspe_value

def rse(y_true, y_pred):
    if isinstance(y_true, pd.Series):
        y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.Series):
        y_pred = y_pred.to_numpy()

    if len(y_true) != len(y_pred):
        raise ValueError("Input arrays must have the same length.")

    numerator = np.sum((y_true - y_pred) ** 2)
    denominator = np.sum((y_true - np.mean(y_true)) ** 2)
    rse = numerator / denominator
    return rse

def get_Ea(df1, df2):
    Ea_list = []
    Kb = 8.617e-5
    for _, row1 in df1.iterrows():
        for _, row2 in df2.iterrows():
            T1 = row1.iloc[1] + 273.15
            T2 = row2.iloc[1] + 273.15
            V1 = row1.iloc[0]
            V2 = row2.iloc[0]
            t1 = np.exp(row1.iloc[-4])
            t2 = np.exp(row2.iloc[-4])
            if V1 == V2:
                Ea = np.log(t1/t2)*Kb/(1/T1 - 1/T2)
                Ea_list.append(Ea)
    Ea = np.average(Ea_list)
    return Ea



def train_base_model(data):
    params = {
        "learning_rate": [0.01, 0.1],
        "max_depth": [3, 4],
        "n_estimators": [100, 200]}
    
    model = xgb.XGBRegressor(objective='reg:squarederror')
    cv = ShuffleSplit(n_splits=10, test_size=0.2, random_state=0)

    X = data.iloc[:,:-1]
    y = data.iloc[:, -1]
    grid_search = GridSearchCV(model, params, cv=cv, scoring="neg_mean_squared_error")
    grid_search.fit(X, y)
    best_params = grid_search.best_params_

    model = xgb.XGBRegressor(objective='reg:squarederror')
    model.set_params(**best_params)
    model.fit(X, y)
    return model, best_params

def train_transferred_model(data, model, best_params):
    transferred_model = xgb.XGBRegressor(objective='reg:squarederror')
    transferred_model.set_params(**best_params)
    X = data.iloc[:,:-1]
    y = data.iloc[:, -1]
    transferred_model.fit(X, y, xgb_model = model)
    return transferred_model

In [None]:
df = get_features(exp_df)
df = df[['V', 'T', 'V2', 'V4', 'V*T', 'V/T', 'LnV', 'LnT', 'TLnV', 'VLnT', 'MTTF', 'n', 'Beta', 'C']]
results_dict = {}

for T in T_list:
    exp_results, ml_results, tpm_results, Vs, Ts = [], [], [], [], []
    df_base = df[df['T'] == T].reset_index(drop=True)
    for index, row in df_base.iterrows():
        Ts.append(T)
        V0 = row.iloc[0]
        Vs.append(V0)
        Beta = row.iloc[-2]
        C = row.iloc[-1]
        yhat_exp = row.iloc[-4]
        df_TPM = generate_TPM_dataset(T, Beta, C)
        df_TPM['MTTF'] = np.log(df_TPM['MTTF'])
        base_model, best_params = train_base_model(df_TPM)
        df_transfer = df_base.drop(index = index).reset_index(drop=True).iloc[:, :-3]
        transferred_model = train_transferred_model(df_transfer, base_model, best_params)
        yhat_ml = transferred_model.predict(pd.DataFrame(row.iloc[:-4]).T)
        yhat_tpm = get_TPM_MTTF(V0, T, Beta, C)
        exp_results.append(np.exp(yhat_exp))
        ml_results.append(np.exp(yhat_ml[0]))
        tpm_results.append(yhat_tpm)
    results_dict[str(T)] = pd.DataFrame({'V': Vs, 'T': Ts, 'MLM':ml_results, 'TPM':tpm_results, 'Experiment': exp_results})

for T in T_list:
    df_base = df[df['T'] == T].reset_index(drop=True)
    em_results = []
    for index, row in df_base.iterrows():
        n = row.iloc[-3]
        rem_df = df_base.drop(index = index).reset_index(drop=True)
        V0 = row.iloc[0]
        random_row = rem_df.sample(n=1)
        V1 = random_row.iloc[:,0]
        mttf1 = np.exp(random_row.iloc[:, -4])
        mttf0 = mttf1/((V0/V1) ** n)
        em_results.append(mttf0)
    results_dict[str(T)]['EM'] = [x.iloc[0] for x in em_results]


rmse_scores_ml, rmse_scores_tpm, rmse_scores_em, rmspe_scores_ml, rmspe_scores_tpm, rmspe_scores_em = [], [], [], [], [], []
Ts = []
for T in T_list:
    Ts.append(T)
    plot_df = results_dict[str(T)]
    Vs = plot_df['V']
    tpm_results = plot_df['TPM']
    ml_results = plot_df['MLM']
    em_results = plot_df['EM']
    true_results = plot_df['Experiment']
    rmspe_tpm = rmspe(true_results, tpm_results)
    rmspe_ml = rmspe(true_results, ml_results)
    rmspe_em = rmspe(true_results, em_results)
    rmse_tpm = mean_squared_error(true_results, tpm_results) ** 0.5
    rmse_ml = mean_squared_error(true_results, ml_results) ** 0.5
    rmse_em = mean_squared_error(true_results, em_results) ** 0.5
    rmse_scores_ml.append(rmse_ml)
    rmse_scores_tpm.append(rmse_tpm)
    rmse_scores_em.append(rmse_em)
    rmspe_scores_ml.append(rmspe_ml)
    rmspe_scores_tpm.append(rmspe_tpm)
    rmspe_scores_em.append(rmspe_em)
    fig, ax = plt.subplots(figsize=(10,10), dpi=300)
    ax.plot(Vs, true_results, label='Experiment', linestyle=':', marker='^', markersize=15, linewidth=3)
    ax.plot(Vs, em_results, label = 'EM', linestyle=':', marker='P', markersize=15, linewidth=3)
    ax.plot(Vs, tpm_results, label = 'TPM', linestyle=':', marker='o', markersize=15, linewidth=3)
    ax.plot(Vs, ml_results, label = 'MLM', linestyle=':', marker='s', markersize=15, linewidth=3)
    
    
    ax.set_xlabel('V (v)', fontsize=25, fontweight='bold')
    ax.set_ylabel('MTTF (hr)', fontsize=25, fontweight='bold')
    ax.set_title('Model performance for T = {}°C'.format(T), fontsize=25, fontweight='bold')
    ax.legend(frameon=False)
    # ax.text(0.6, 0.7, f'$RMSE_{{EM}}$: {round(rmse_em, 2)}\n$RMSE_{{TPM}}$: {round(rmse_tpm, 2)}\n$RMSE_{{MLM}}$: {round(rmse_ml, 2)}',
    #          ha='left', va='top',transform=plt.gca().transAxes)
    ax.spines['top'].set_visible(True)
    ax.spines['right'].set_visible(True)
    ax.spines['bottom'].set_visible(True)
    ax.spines['left'].set_visible(True)
    ax = plt.gca()
    yticks = ax.get_yticks()

    if 0 not in yticks:
        yticks = np.append(0, yticks)
    ax.minorticks_off()
    ax.tick_params(axis='both', which='major', direction='in', length=10, width=2)
    # fig.savefig(f"{T}.tiff", dpi=300)
    # fig.savefig(f"{T}.eps", dpi=300)

scores = {'T':Ts, 'RMSE_TPM':rmse_scores_tpm, 'RMSE_EM': rmse_scores_em, 'RMSE_MLM':rmse_scores_ml, 'RMSPE_TPM':rmspe_scores_tpm, 'RMSPE_EM': rmspe_scores_em, 'RMSPE_MLM':rmspe_scores_ml}
scores = pd.DataFrame(scores)

In [None]:
T_perms = list(itertools.permutations(T_list, 3))
Kb = 8.617e-5
T3_results_dict = {'135':pd.DataFrame(), '140':pd.DataFrame(), '150':pd.DataFrame()}

for T1, T2, T3 in T_perms:
    df1 = df[df['T']==T1]
    df2 = df[df['T']==T2]
    df3 = df[df['T']==T3]
    df12 = pd.concat((df1, df2)).reset_index(drop=True).iloc[:, :-3]
    Ea = get_Ea(df1, df2)
    Beta1, C1 = info_dict[str(T1)]
    Beta2, C2 = info_dict[str(T2)]
    df_TPM1 = generate_TPM_dataset(T1, Beta1, C1)
    df_TPM2 = generate_TPM_dataset(T2, Beta2, C2)
    df_TPM = pd.concat((df_TPM1, df_TPM2)).reset_index(drop=True)
    df_TPM['MTTF'] = np.log(df_TPM['MTTF'])
    base_model, best_params = train_base_model(df_TPM)
    transferred_model1 = train_transferred_model(df12, base_model, best_params)

    Vs, Ts, ts = [], [], []
    for _, row in df12.iterrows():
        T4 = row.iloc[1] + 273.15
        exp_coeff = np.exp(Ea/Kb*(1/T4-1/(T3+273.15)))
        V4 = row.iloc[0]
        V3 = V4
        t4 = np.exp(row.iloc[-1])
        t3 = t4/exp_coeff
        Vs.append(V3)
        Ts.append(T3)
        ts.append(t3)

    df_finetune = pd.DataFrame({'V':Vs, 'T':Ts, 'MTTF':ts})
    df_finetune.drop_duplicates(subset='V', keep='first', inplace=True)
    df_finetune = df_finetune.sort_values(by='V')
    df_finetune['MTTF'] = np.log(df_finetune['MTTF'])
    df_finetune = get_features(df_finetune)
    df_finetune = df_finetune[['V', 'T', 'V2', 'V4', 'V*T', 'V/T', 'LnV', 'LnT', 'TLnV', 'VLnT', 'MTTF']]
    transferred_model2 = train_transferred_model(df_finetune, transferred_model1, best_params)

    yhat_ml = np.exp(transferred_model2.predict(df3.iloc[:, :-4]))
    T3_results_dict[str(T3)]['MLM'] = yhat_ml
    yhat_exp = np.exp(df3.iloc[:, -4])
    T3_results_dict[str(T3)]['Experiment'] = list(yhat_exp)
    n = n_dict[str(T3)]
    
    shared_vs = set(sorted(Vs))
    yhat_em = []
    for v in df3.iloc[:, 0]:
        if v in shared_vs:
            indices = np.where(df12['V'] == v)
            first_idx = indices[0][0]
            row = df12.loc[first_idx]
            V0 = row.iloc[0]
            T0 = row.iloc[1]
            V1 = V0
            mttf0 = np.exp(row.iloc[-1])
            mttf3 = mttf0 * np.exp(Ea/Kb*(1/(T3+273.15) - 1/(T0+273.15)))
        else:
            random_row = df12.sample(n=1)
            V0 = random_row.iloc[:, 0]
            T0 = random_row.iloc[:, 1]
            V1 = V0
            mttf0 = np.exp(random_row.iloc[:, -4])
            mttf3 = mttf0 * (V0/V1) ** n * np.exp(Ea/Kb*(1/(T3+273.15) - 1/(T0+273.15)))
            mttf3 = mttf3.iloc[0]

        yhat_em.append(mttf3)
    
    T3_results_dict[str(T3)]['EM'] = yhat_em
    T3_results_dict[str(T3)]['V'] = list(df3.iloc[:, 0])

rmse_scores_ml, rmse_scores_em, rmspe_scores_ml, rmspe_scores_em = [], [], [], []
Ts = []
for T in T_list:
    Ts.append(T)
    plot_df = T3_results_dict[str(T)]
    Vs = plot_df['V']
    ml_results = plot_df['MLM']
    em_results = plot_df['EM']
    true_results = plot_df['Experiment']
    rmspe_ml = rmspe(true_results, ml_results)
    rmspe_em = rmspe(true_results, em_results)
    rmse_ml = mean_squared_error(true_results, ml_results) ** 0.5
    rmse_em = mean_squared_error(true_results, em_results) ** 0.5
    rmse_scores_ml.append(rmse_ml)
    rmse_scores_em.append(rmse_em)
    rmspe_scores_ml.append(rmspe_ml)
    rmspe_scores_em.append(rmspe_em)
    fig, ax = plt.subplots(figsize=(10,10), dpi=300)
    ax.plot(Vs, true_results, label='Experiment', linestyle=':', marker='^', markersize=15, linewidth=3)
    ax.plot(Vs, ml_results, label = 'MLM', linestyle=':', marker='s', markersize=15, linewidth=3)
    ax.plot(Vs, em_results, label = 'EM', linestyle=':', marker='P', markersize=15, linewidth=3)
    
    
    
    ax.set_xlabel('V (v)', fontsize=25, fontweight='bold')
    ax.set_ylabel('MTTF (hr)', fontsize=25, fontweight='bold')
    ax.set_title(r'Model performance for T = {}°C'.format(T), fontsize=25, fontweight='bold')
    ax.legend(frameon=False)
    # ax.text(0.6, 0.7, f'$RMSE_{{EM}}$: {round(rmse_em, 2)}\n$RMSE_{{TPM}}$: {round(rmse_tpm, 2)}\n$RMSE_{{MLM}}$: {round(rmse_ml, 2)}',
    #          ha='left', va='top',transform=plt.gca().transAxes)
    ax.spines['top'].set_visible(True)
    ax.spines['right'].set_visible(True)
    ax.spines['bottom'].set_visible(True)
    ax.spines['left'].set_visible(True)
    ax = plt.gca()
    yticks = ax.get_yticks()

    if 0 not in yticks:
        yticks = np.append(0, yticks)
    ax.minorticks_off()
    ax.tick_params(axis='both', which='major', direction='in', length=10, width=2)
    fig.savefig(f"{T}.tiff", dpi=300)
    fig.savefig(f"{T}.eps", dpi=300)

scores = {'T':Ts, 'RMSE_EM': rmse_scores_em, 'RMSE_MLM':rmse_scores_ml, 'RMSPE_EM': rmspe_scores_em, 'RMSPE_MLM':rmspe_scores_ml}
scores = pd.DataFrame(scores)