In [None]:
import keras
import numpy as np
from keras.layers import Dense, Lambda
from keras.models import Sequential, load_model
from sklearn.preprocessing import StandardScaler, RobustScaler, Normalizer, MinMaxScaler
import matplotlib.pyplot as plt
#%matplotlib widget   # does not seem to work
from sklearn.model_selection import train_test_split
from pickle import dump
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error, root_mean_squared_error, mean_absolute_error
from datetime import datetime
from keras.callbacks import LambdaCallback
from keras.utils import plot_model
import copy

# Turn on the pandas copy-on-write option for the whole notebook session.
pd.options.mode.copy_on_write = True

In [None]:
def scaleDataset(x, y, modelName, save=True):
    """Fit one MinMaxScaler on the inputs and one on the targets, and scale both.

    When ``save`` is true the two fitted scalers are pickled next to the model:
    ``<modelName>_IS.pkl`` holds the input scaler and ``<modelName>_OS.pkl``
    the output scaler.

    :param x: input dataset
    :param y: target dataset
    :param modelName: a string with the model name (used as file-name prefix)
    :param save: save the scalers (True/False), default: True
    :return: 1) x_scaled: the scaled input data
             2) y_scaled: the scaled output data
             3) input_scaler: the fitted input scaler
             4) output_scaler: the fitted output scaler
    """
    input_scaler = MinMaxScaler()
    output_scaler = MinMaxScaler()
    x_scaled = input_scaler.fit_transform(x)
    y_scaled = output_scaler.fit_transform(y)
    if save:
        # `with` closes (and therefore flushes) each file even if pickling fails,
        # instead of leaving the handle open until garbage collection.
        with open(modelName + '_IS.pkl', 'wb') as scaler_file:
            dump(input_scaler, scaler_file)   # save the input scaler
        with open(modelName + '_OS.pkl', 'wb') as scaler_file:
            dump(output_scaler, scaler_file)  # save the output scaler
    return x_scaled, y_scaled, input_scaler, output_scaler

In [None]:
def LoadScalerAndScaleDataset(filepath, x, y):
    """Load previously saved scalers from pickle files and apply them to (x, y).

    The scalers are only applied (``transform``), never re-fitted. For each
    scaler the file ``<filepath>_InputScaler.pkl`` / ``<filepath>_OutputScaler.pkl``
    is tried first; if it does not exist, the loader falls back to
    ``<filepath>_IS.pkl`` / ``<filepath>_OS.pkl``, which are the names written
    by ``scaleDataset``.

    :param filepath: path prefix of the pickled scalers (without the suffix)
    :param x: input dataset
    :param y: target dataset

    :return: 1) x_scaled: the scaled input data
             2) y_scaled: the scaled output data
             3) input_scaler: the input scaler
             4) output_scaler: the output scaler
    :raises FileNotFoundError: if neither file name exists for a scaler
    """
    def _load_scaler(long_suffix, short_suffix):
        # SECURITY NOTE: unpickling can execute arbitrary code; only load
        # scaler files that come from a trusted source.
        try:
            scaler_file = open(filepath + long_suffix, 'rb')
        except FileNotFoundError:
            scaler_file = open(filepath + short_suffix, 'rb')
        with scaler_file:  # make sure the handle is closed after loading
            return pickle.load(scaler_file)

    input_scaler = _load_scaler("_InputScaler.pkl", "_IS.pkl")
    output_scaler = _load_scaler("_OutputScaler.pkl", "_OS.pkl")

    x_scaled = input_scaler.transform(x)
    y_scaled = output_scaler.transform(y)
    return x_scaled, y_scaled, input_scaler, output_scaler

In [None]:
def scaleInputData(x, saveScaler=False, path='LFModel_IS.pkl'):
    """Fit a MinMaxScaler on the input data only and return the scaled data.

    The fitted scaler is pickled to ``path`` when ``saveScaler`` is True.

    :param x: input dataset
    :param saveScaler: flag for saving the scaler
    :param path: file the scaler is pickled to when saveScaler is True

    :return: 1) x_scaled: the scaled input data
             2) input_scaler: the fitted input scaler
    """
    input_scaler = MinMaxScaler()
    x_scaled = input_scaler.fit_transform(x)

    if saveScaler:
        # `with` guarantees the file is closed once the scaler has been written.
        with open(path, 'wb') as scaler_file:
            dump(input_scaler, scaler_file)   # save the input scaler

    return x_scaled, input_scaler

In [None]:
def train_seq_nn(x_tr, y_tr, neurons, activation, model_name, verbosityLevel, lr=0.0009, patience=10, ModelCheckpoint=False, plot=True, saveModel=True, validationSplit=0.3, epochs=5000, batchSize=32):
    """Build and train a fully connected keras Sequential regression model.

    The network has one Dense layer per entry of ``neurons`` followed by a
    linear output layer with as many units as ``y_tr`` has columns. It is
    trained with Adam on the MSE loss; early stopping monitors ``val_loss``
    and restores the best weights.

    :param x_tr: scaled input dataset, 2-D (samples x inputs)
    :param y_tr: scaled target dataset, 2-D (samples x outputs)
    :param neurons: non-empty list of ints, the units of each hidden layer
    :param activation: activation function of the hidden layers
    :param model_name: file-name prefix used for the checkpoint and saved model
    :param verbosityLevel: verbosity passed to ``fit`` and to the checkpoint callback
    :param lr: learning rate of the Adam optimizer
    :param patience: early-stopping patience, in epochs
    :param ModelCheckpoint: if True, checkpoint the best model to ``<model_name>.h5``
    :param plot: if True, print the minimum validation loss, the model summary
                 and plot the training/validation loss curves
    :param saveModel: if True, save the final model to ``<model_name>.keras``
    :param validationSplit: fraction of the training data held out for validation
    :param epochs: maximum number of training epochs
    :param batchSize: mini-batch size
    :return: 1) trained_model: the trained keras model
             2) history: the history of the trained model
             3) MinValidationLoss: the minimum validation loss reached
    :raises ValueError: if ``neurons`` is empty or ``validationSplit`` is not in (0, 1)
    """
    if len(neurons) == 0:
        raise ValueError("neurons must contain at least one hidden-layer size")
    # Early stopping and the returned minimum both rely on 'val_loss', which
    # only exists when part of the data is held out for validation. Failing
    # here avoids a full-length training run that ends in a KeyError.
    if not 0 < validationSplit < 1:
        raise ValueError("validationSplit must be strictly between 0 and 1")

    inputs_dimension = x_tr.shape[1]
    print(f'The number of inputs is {inputs_dimension}')
    output_dimension = y_tr.shape[1]
    print(f'The number of outputs is {output_dimension}')

    # Build the keras model: explicit Input, hidden Dense stack, linear output
    trained_model = Sequential()
    trained_model.add(keras.Input(shape=(inputs_dimension,)))
    for layer_units in neurons:
        trained_model.add(Dense(layer_units, activation=activation))
    trained_model.add(Dense(output_dimension, activation='linear'))  # tanh

    optimizer = keras.optimizers.Adam(learning_rate=lr)
    trained_model.compile(loss="mse", optimizer=optimizer, metrics=["mse"])

    es_call = keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)
    trainingCallbacks = [es_call]
    if ModelCheckpoint:
        mc_call = keras.callbacks.ModelCheckpoint(model_name + '.h5', monitor='val_loss', mode='min', verbose=verbosityLevel, save_best_only=True)
        trainingCallbacks.append(mc_call)

    # Train the model
    history = trained_model.fit(x_tr, y_tr,
                                epochs=epochs, batch_size=batchSize,
                                validation_split=validationSplit, callbacks=trainingCallbacks,
                                verbose=verbosityLevel)

    MinValidationLoss = min(history.history['val_loss'])

    if saveModel:
        trained_model.save(model_name + '.keras')

    if plot:
        print(f'\nThe minimum validation loss is: {MinValidationLoss}\n')
        trained_model.summary()
        # summarize history for loss
        fig, ax = plt.subplots()
        ax.plot(history.history['loss'])
        ax.plot(history.history['val_loss'])
        ax.set_title('model loss')
        ax.set_ylabel('loss')
        ax.set_xlabel('epoch')
        # The second curve is the validation split of the training data,
        # not a separate test set.
        ax.legend(['train', 'validation'], loc='upper left')
        plt.show()

    return trained_model, history, MinValidationLoss

In [None]:
def use_model(trained_model, inputs):
    """Run a trained model on the given inputs.

    :param trained_model: keras model (trained); only its ``predict`` method is used
    :param inputs: simulation inputs, passed to ``predict`` unchanged
    :return: whatever ``trained_model.predict(inputs)`` returns
    """
    # TODO: check dimensions of `inputs` before predicting
    outputs = trained_model.predict(inputs)
    return outputs

In [None]:
def plotAndLinRegression(x, y, targetVar, savePlot, path, fit_intercept_flag=False, xlim=None, ylim=None):
    """Scatter ``y`` against ``x`` and overlay a fitted linear-regression line.

    The x axis is labelled as the target and the y axis as the prediction.
    The annotation shows the fitted line equation together with the R2 score
    computed by ``r2_score(x, y)``, i.e. with ``x`` as reference values and
    ``y`` as predictions (it is not the R2 of the regression line).

    :param x: 1-D numpy array of reference (target) values
    :param y: array of predicted values, same length as x
    :param targetVar: variable name used in labels, title and file name
    :param savePlot: if True, save the figure as a PNG (dpi=300)
    :param path: prefix prepended to ``targetVar`` to build the file name;
                 a timestamp and '.png' are appended
    :param fit_intercept_flag: whether the regression line fits an intercept
    :param xlim: optional (min, max) limits of the x axis
    :param ylim: optional (min, max) limits of the y axis
    :return: the R2 score between x and y
    """
    model = LinearRegression(fit_intercept=fit_intercept_flag)
    x_r = x.reshape(-1, 1)
    model.fit(x_r, y)
    y_pred = model.predict(x_r)
    # coef_ is an array (and intercept_ may be one too, depending on the shape
    # of y). Calling float() directly on an array with ndim > 0 is deprecated
    # in NumPy, so take the single element explicitly.
    slope = float(np.ravel(model.coef_)[0])
    intercept = float(np.ravel(model.intercept_)[0])
    r2score = r2_score(x, y)
    equation = f'y = {slope:.2f} x + {intercept:.2f}. R2: {r2score:.3f}'

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.scatter(x, y, label=f'Output {targetVar}', alpha=0.3)
    ax.plot(x_r, y_pred, color='red', label='Linear regression')
    ax.set_xlabel('Target ' + targetVar)
    ax.set_ylabel('Prediction ' + targetVar)
    ax.set_title('Target vs Prediction: ' + targetVar)
    ax.grid(True, linestyle='--', alpha=0.7)
    ax.text(0.05, 0.95, equation, transform=ax.transAxes, fontsize=12, color='red',
            bbox=dict(facecolor='white', alpha=0.5, edgecolor='none'))
    ax.legend(loc='lower right')
    # `is not None` keeps working when the limits are given as numpy arrays,
    # whose truth value is ambiguous.
    if xlim is not None:
        ax.set_xlim(xlim[0], xlim[1])
    if ylim is not None:
        ax.set_ylim(ylim[0], ylim[1])
    if savePlot:
        fig.savefig(path + targetVar + "_" + datetime.now().strftime("%Y%m%d_%H%M%S") + '.png', dpi=300)
    plt.show()
    return r2score

In [None]:
def metrics(target, simulation, var=None, printScores=True):
    """Compute R2, MSE, RMSE and MAE for every output column.

    A 2-D ``target`` is evaluated column by column against the matching
    column of ``simulation``; a 1-D ``target`` is treated as a single output.

    :param target: numpy array of reference values, 1-D or 2-D (samples x outputs)
    :param simulation: numpy array of simulated values, same shape as target
    :param var: names used when printing: a list with one name per output,
                a single string for one output, or None to use generic
                names ``output_0``, ``output_1``, ...
    :param printScores: if True, print the four scores of each named output
    :return: 1) r2_scores: list of R2 scores, one per output
             2) mse: list of mean squared errors
             3) rmse: list of root mean squared errors
             4) mae: list of mean absolute errors
             5) mean_mse: mean of the per-output MSE values
    """
    if target.ndim > 1:
        column_pairs = [(target[:, i_out], simulation[:, i_out])
                        for i_out in range(target.shape[1])]
    else:
        column_pairs = [(target[:], simulation[:])]

    r2_scores = [r2_score(t_col, s_col) for t_col, s_col in column_pairs]
    mse = [mean_squared_error(t_col, s_col) for t_col, s_col in column_pairs]
    rmse = [root_mean_squared_error(t_col, s_col) for t_col, s_col in column_pairs]
    mae = [mean_absolute_error(t_col, s_col) for t_col, s_col in column_pairs]
    mean_mse = np.mean(mse)

    if printScores:
        if var is None:
            # The default var=None used to crash in enumerate(None).
            names = [f"output_{k}" for k in range(len(column_pairs))]
        elif isinstance(var, str):
            # A bare string is one variable name, not a sequence of characters.
            names = [var]
        else:
            names = list(var)
        # zip stops at the shorter sequence, so surplus names are ignored
        # instead of raising IndexError.
        for j_var, name in zip(range(len(column_pairs)), names):
            print(f"Variable: {name}, mse: {mse[j_var]}")
            print(f"Variable: {name}, rmse: {rmse[j_var]}")
            print(f"Variable: {name}, mae: {mae[j_var]}")
            print(f"Variable: {name}, r2: {r2_scores[j_var]}")
    return r2_scores, mse, rmse, mae, mean_mse

# Pipeline for training the model

The inputs are the following:
- ScrapWeight
- StartTemperature
- Argon
- Energy
- Starting chemical composition: LF_CA_ELEMENT_S
- Chemical additions (in terms of the main elements added):
    - Ca, F, Si, C, S, O, N, B, Al, Fe, P, Mg, Zn, Cu, V, Mn, Cr, Ti

The possible targets are:
- Final chemical composition: LF_CA_ELEMENT_F
- Final temperature

In [None]:
# Element symbols of the analysed chemical composition, in column order.
# The same symbols name the starting (LF_CA_<el>_S) and final (LF_CA_<el>_F) columns.
analysed_elements = ['C', 'MN', 'SI', 'S', 'P', 'CA', 'AL', 'CU', 'V', 'PB', 'N', 'B',
                     'NB', 'SN', 'NI', 'CR', 'MO', 'TI', 'FE']
process_vars = ['ScrapWeight', 'StartTemperature', 'Argon', 'Energy']
# Chemical additions, expressed as the main elements added
added_elements = ['Ca', 'F', 'Si', 'C', 'S', 'O', 'N', 'B', 'Al', 'Fe',
                  'P', 'Mg', 'Zn', 'Cu', 'V', 'Mn', 'Cr', 'Ti']

# All candidate inputs: process variables, starting composition, additions
InputVars_FL = (process_vars
                + [f'LF_CA_{element}_S' for element in analysed_elements]
                + added_elements)

# All candidate targets: final composition plus the final temperature
TargetVars_FL = [f'LF_CA_{element}_F' for element in analysed_elements] + ['FinalTemperature']

print(f'The number of Input variables is: {len(InputVars_FL)}')
print(f'The number of Target variables is: {len(TargetVars_FL)}')

# Variables left out of the chemical model
excludedInputVars_FLmodel_chem = []
excludedtargetVars_FLmodel_chem = ['FinalTemperature']

InputsVarsSelection_FLmodel_chem = list(
    filter(lambda name: name not in excludedInputVars_FLmodel_chem, InputVars_FL))
TargetVarsSelection_FLmodel_chem = list(
    filter(lambda name: name not in excludedtargetVars_FLmodel_chem, TargetVars_FL))

print(f'The number of Input variables is: {len(InputsVarsSelection_FLmodel_chem)}')
print(f'The number of Target variables is: {len(TargetVarsSelection_FLmodel_chem)}')

In [None]:
# load the data
# Path of the Excel file holding the dataset; it must be filled in before
# this cell is run.
dataset_path = ""

# Fail early with an actionable message instead of an opaque pandas error
# when the placeholder path has not been replaced.
if not dataset_path:
    raise ValueError("dataset_path is empty: set it to the Excel file that contains the dataset")

dataset = pd.read_excel(dataset_path)

## Chemical model

In [None]:
# train the chemical model
x = dataset[InputsVarsSelection_FLmodel_chem].values
y = dataset[TargetVarsSelection_FLmodel_chem].values

LFModelName = 'LF_chemicalModel_' + datetime.now().strftime("%Y%m%d_%H%M%S")

# Split BEFORE scaling so that the scalers never see the test samples
# (fitting them on the full dataset leaks test-set ranges into training).
# The fixed seed makes the split reproducible across kernel restarts.
xTrainRaw, xTestRaw, yTrainRaw, yTestRaw = train_test_split(x, y, test_size=0.2, random_state=42)

# Fit (and save) the scalers on the training portion only, then apply them
# unchanged to the test portion.
xTrain, yTrain, inScaler, outScaler = scaleDataset(xTrainRaw, yTrainRaw, modelName=LFModelName)
xTest = inScaler.transform(xTestRaw)
yTest = outScaler.transform(yTestRaw)

# Whole dataset scaled with the training-fitted scalers, kept under the
# original names for any cell that still expects them.
xScaled = inScaler.transform(x)
yScaled = outScaler.transform(y)

Neurons = [50, 40]
model, modelHistory, minValLoss = train_seq_nn(xTrain, yTrain, Neurons, 'relu', LFModelName, verbosityLevel=0, lr=0.0007)

In [None]:
# Simulate the test dataset and bring both targets and predictions back to
# the original (unscaled) units with the output scaler.
yTestDescaled = outScaler.inverse_transform(yTest)
model_sim_test = use_model(model, xTest)
out_test = outScaler.inverse_transform(model_sim_test)

In [None]:
# Parity plots of test dataset
TargetVarsForPlot = ['Carbon (C)', 'Manganese (MN)', 'Silicon (SI)', 'Sulfur (S)', 'Phosphorus (P)', 'Calcium (CA)', 'Aluminum (AL)',
                     'Copper (CU)', 'Vanadium (V)', 'Lead (PB)', 'Nitrogen (N)', 'Boron (B)', 'Niobium (NB)', 'Tin (SN)', 'Nickel (NI)',
                     'Chromium (CR)', 'Molybdenum (MO)', 'Titanium (TI)', 'Iron (FE)']
path_figures = r""

# Every output column needs exactly one label; catch a mismatch up front
# instead of failing with an IndexError halfway through the plots.
assert yTestDescaled.shape[1] == len(TargetVarsForPlot), "one plot label is required per target column"

# One parity plot per target. plotAndLinRegression creates and shows its own
# figure, so no manual pause or figure clearing is needed between plots
# (a blocking input() would stop "Run All").
r2_test_chem = {}
for i, targetLabel in enumerate(TargetVarsForPlot):
    r2_test_chem[targetLabel] = plotAndLinRegression(yTestDescaled[:, i], out_test[:, i], targetLabel, savePlot=True, path=path_figures, fit_intercept_flag=True)

## Temperature model

In [None]:
# Inputs selected for the temperature model
optVars_temp = ['ScrapWeight', 'StartTemperature', 'Argon', 'Energy', 'LF_CA_MN_S', 'LF_CA_S_S', 'LF_CA_P_S',
                'LF_CA_CA_S', 'LF_CA_V_S', 'LF_CA_B_S', 'LF_CA_NB_S', 'LF_CA_SN_S', 'LF_CA_NI_S', 'LF_CA_CR_S',
                'LF_CA_FE_S', 'S', 'C', 'N', 'P', 'Fe', 'V', 'Ca', 'Mg', 'F']

# Read from `dataset`, the frame loaded in the data-loading cell; it is the
# only DataFrame this notebook defines.
x_temp = dataset[optVars_temp].values
y_temp = dataset['FinalTemperature'].values.reshape(-1, 1)  # column vector, as the scaler expects 2-D data

LFModelName_temp = 'LF_optim_temp_' + datetime.now().strftime("%Y%m%d_%H%M%S")

# Split BEFORE scaling so that the scalers never see the test samples; the
# fixed seed makes the split reproducible across kernel restarts.
xTrainRaw_temp, xTestRaw_temp, yTrainRaw_temp, yTestRaw_temp = train_test_split(x_temp, y_temp, test_size=0.2, random_state=42)

# Fit (and save) the scalers on the training portion only, then apply them
# unchanged to the test portion.
xTrain_temp, yTrain_temp, inScaler_temp, outScaler_temp = scaleDataset(xTrainRaw_temp, yTrainRaw_temp, modelName=LFModelName_temp)
xTest_temp = inScaler_temp.transform(xTestRaw_temp)
yTest_temp = outScaler_temp.transform(yTestRaw_temp)

# Whole dataset scaled with the training-fitted scalers, kept under the
# original names for any cell that still expects them.
xScaled_temp = inScaler_temp.transform(x_temp)
yScaled_temp = outScaler_temp.transform(y_temp)

Neurons_temp = [20, 5]
tempModel_optimized, modelHistory_temp, minValLoss_temp = train_seq_nn(xTrain_temp, yTrain_temp, Neurons_temp, 'relu', LFModelName_temp, verbosityLevel=0, lr=0.0001, validationSplit=0.2)

In [None]:
# Simulate the test split and convert predictions and targets back to the
# original temperature units.
tempModel_sim_test = use_model(tempModel_optimized, xTest_temp)
tempModel_out_test = outScaler_temp.inverse_transform(tempModel_sim_test)
yTestDescaled_temp = outScaler_temp.inverse_transform(yTest_temp)

# A dedicated name is used for the single label so that the list
# `TargetVarsForPlot` of the chemical model is not overwritten with a string.
TargetVarForPlot_temp = 'Steel temperature (LF)'
pathFiguresLFModel_2 = r""

plotAndLinRegression(yTestDescaled_temp[:, 0], tempModel_out_test[:, 0], TargetVarForPlot_temp, savePlot=True, path=pathFiguresLFModel_2, fit_intercept_flag=True)

mse_TempModel = mean_squared_error(yTestDescaled_temp[:, 0], tempModel_out_test[:, 0])
rmse_TempModel = np.sqrt(mse_TempModel)
# The degree sign was mis-encoded ("Â°C") in the original message.
print(f"RMSE of temperature model (test dataset): {rmse_TempModel} °C")