In [None]:
# Configure IPython so that every top-level expression in a cell is displayed,
# not only the last one.
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

In [None]:
import csv
import glob
import os

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from scipy.io import loadmat

from keras.layers import *
from keras.models import *
from keras.optimizers import *

from sklearn.compose import make_column_transformer
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder # Convert data to oneHotEncoder
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler, MaxAbsScaler
from sklearn.utils import shuffle

# Set up the GPU device [optional]

In [None]:
# TensorFlow needs a GPU-enabled environment for a device to be listed here;
# see the setup instructions for details.
# Query the GPUs visible to TensorFlow and display the resulting list.
physical_devices = tf.config.list_physical_devices('GPU')
physical_devices
# tf.config.experimental.set_memory_growth(physical_devices[0], True)

# Report whether TensorFlow resolves a GPU device name.
gpu_status_message = 'GPU found' if tf.test.gpu_device_name() else "No GPU found"
print(gpu_status_message)

# Read the train, test, and unseen (cohort) test dataset files

In [None]:
# Locations of the training, test, and unseen-cohort CSV files.
trainDataFilePath = "data/train_dataset.csv"
testDataFilePath = "data/test_dataset.csv"
coDataFilePath = "data/unseen_test_dataset.csv"

# Load the training data and shuffle its rows. The shuffle is seeded so the row
# order is the same on every run; the train/validation split performed later
# selects rows by position, so an unseeded shuffle would change which rows end
# up in the validation set each time the notebook is executed.
df_train = pd.read_csv(trainDataFilePath)
df_train = shuffle(df_train, random_state=1)
df_train.head()

# Load the test data.
df_test = pd.read_csv(testDataFilePath)
df_test.head()

# Load the unseen cohort test data.
df_co = pd.read_csv(coDataFilePath)
df_co.head()

# Checking class counts in respective datasets [for verification]

In [None]:
# Print the distinct 'Jettable' classes and how often each occurs, once per dataset.
for split_label, split_frame in (("Train: ", df_train), ("Test: ", df_test), ("Cohort: ", df_co)):
    class_counts = split_frame['Jettable'].value_counts()
    print(split_label, class_counts.index, "| ", class_counts.values)

# Drop the unnecessary feature columns

In [None]:
# Feature columns that are not used by the model; the same set is removed from every dataset.
unused_columns = ['MaterialName','Printer','Re','We','Ca','Z','Oh','Bo']

df_train = df_train.drop(columns=unused_columns)
df_test = df_test.drop(columns=unused_columns)
df_co = df_co.drop(columns=unused_columns)

# Check for NA values in each column

In [None]:
# Report, for each dataset in turn, which columns contain at least one missing value.
for position, checked_frame in enumerate((df_train, df_test, df_co)):
    if position > 0:
        print ('--------------------------------------------------')
    has_nan_per_column = checked_frame.isna().any()
    print(has_nan_per_column)

# Replace every missing value with zero in all three datasets.
df_train = df_train.fillna(0)
df_test = df_test.fillna(0)
df_co = df_co.fillna(0)

# Data normalization

In [None]:
# separate features and target variable
df_train_num = df_train.drop(['PulseDuration','Voltage','Trise','Tfall','Jettable','WaveformType'], axis=1) # target columns are also dropped
df_test_num = df_test.drop(['PulseDuration','Voltage','Trise','Tfall','Jettable','WaveformType'], axis=1)
df_co_num = df_co.drop(['PulseDuration','Voltage','Trise','Tfall','Jettable','WaveformType'], axis=1)

# save non-numeric columns
non_numeric_cols_train = df_train.select_dtypes(exclude='number').columns
non_numeric_cols_test = df_test.select_dtypes(exclude='number').columns
non_numeric_cols_co = df_co.select_dtypes(exclude='number').columns

# apply MinMaxScaler to all numeric columns except target column
scaler = MinMaxScaler()
df_train_num_scaled = scaler.fit_transform(df_train_num)
df_test_num_scaled = scaler.transform(df_test_num)
df_co_num_scaled = scaler.transform(df_co_num)

# replace the original numeric columns with scaled data
df_train[df_train_num.columns] = df_train_num_scaled
df_test[df_test_num.columns] = df_test_num_scaled
df_co[df_co_num.columns] = df_co_num_scaled

# restore non-numeric columns
df_train[non_numeric_cols_train] = df_train[non_numeric_cols_train]
df_test[non_numeric_cols_test] = df_test[non_numeric_cols_test]
df_co[non_numeric_cols_co] = df_co[non_numeric_cols_co]

# Applying the Label Encoder

In [None]:
def encode_column(df_train, df_test, df_co, column_name):
    """Label-encode one column of the three datasets in place.

    The encoder is fitted on the training column only and then reused to
    transform the test and cohort columns. The category -> label mapping is
    printed, one pair per line.

    Parameters:
    df_train, df_test, df_co: DataFrames that each contain `column_name`;
        the column is overwritten with its encoded values in all three.
    column_name: Name of the column to encode.

    Returns:
    tuple: (`categories`, `labels`) - the encoder's classes and the label
           the encoder assigns to each of them.
    """
    encoder = LabelEncoder()
    frames = (df_train, df_test, df_co)

    # Compute every encoding before writing any of them back, so that no frame
    # is modified if one of the transforms fails.
    encoded_columns = [encoder.fit_transform(df_train[column_name])]
    encoded_columns.extend(encoder.transform(frame[column_name]) for frame in frames[1:])

    for frame, codes in zip(frames, encoded_columns):
        frame[column_name] = codes

    # Recover the mapping from the fitted encoder and report it.
    categories = encoder.classes_
    labels = encoder.transform(categories)
    for pair in zip(categories, labels):
        print(f"{pair[0]} -> {pair[1]}")

    return categories, labels

In [None]:
# Label-encode the two categorical columns in place across all three datasets.
# Each call is kept as its own top-level expression: with
# InteractiveShell.ast_node_interactivity set to 'all' in the first cell, the
# returned (categories, labels) pair of every call is shown in the cell output.
encode_column(df_train, df_test, df_co, 'Jettable')
encode_column(df_train, df_test, df_co, 'WaveformType')

In [None]:
# Columns the model predicts; every other column is used as an input feature.
target_columns = ['PulseDuration','Voltage','Trise','Tfall']

x_train = df_train.drop(target_columns, axis=1)
y_train = df_train[target_columns]

x_test = df_test.drop(target_columns, axis=1)
y_test = df_test[target_columns]

x_co = df_co.drop(target_columns, axis=1)
y_co = df_co[target_columns]

# Fraction of the training rows held out for validation. It is assigned here
# because this cell is the one that uses it; the configuration cell that also
# assigns it sits further down the notebook, so on a freshly restarted kernel
# the name would otherwise be undefined at this point.
ratioTestTrain = 0.1

# Carve the validation set out of the training data with a fixed seed.
x_train, x_val, y_train, y_val = train_test_split(x_train,
                                                  y_train,
                                                  test_size=ratioTestTrain,
                                                  random_state=1)

# Setup models

In [None]:
def build_model(input_shape, n_outputs=None):
    """Build the fully connected regression network (not compiled).

    The network stacks Dense layers of 100, 300, 500, 300 and 100 units with
    ReLU activations, flattens the result, and ends in a linear Dense layer.

    Parameters:
    input_shape: Shape of one input sample, passed to `Input`.
    n_outputs: Number of units in the final linear layer. When omitted, the
               number of columns of the notebook-level `y_train` is used,
               which is what this function always did before the parameter
               was added.

    Returns:
    The Keras `Model` connecting the input to the linear output layer.
    """
    if n_outputs is None:
        # Backward-compatible default: one output per target column.
        n_outputs = y_train.shape[1]

    inputs = Input(input_shape)

    x = inputs
    for units in (100, 300, 500, 300, 100):
        x = Dense(units, activation='relu')(x)
    x = Flatten()(x)
    output = Dense(n_outputs, activation='linear')(x)

    return Model(inputs=inputs, outputs=output)

In [None]:
# Number of model input features: all columns of df_train minus the 4 target columns.
trainTotalColumns = df_train.shape[1] - 4
# Fraction of the training data held out as the validation set.
ratioTestTrain = 0.1
# Input shape parameter for the Keras model (one entry per input feature).
modelInputShapeParam = trainTotalColumns
# Mini-batch size and maximum number of training epochs.
batch_size = 3
epochs = 3000

In [None]:
# Instantiate the network; each sample is presented with shape (n_features, 1).
model = build_model((modelInputShapeParam, 1))

optimizer = keras.optimizers.Adam(learning_rate=0.01)

# Keep only the weights of the epoch with the best validation loss.
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    'model_trained_weights.h5',
    save_best_only=True,
    monitor='val_loss',
)
# Scale the learning rate by 0.3 when validation loss has not improved for 100 epochs.
reduce_lr_callback = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.3,
    patience=100,
)
# Stop training when validation loss has not improved for 300 epochs.
early_stopping_callback = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=300,
    verbose=1,
)
# Append the per-epoch metrics to a CSV log.
csv_logger_callback = tf.keras.callbacks.CSVLogger('trans_History.csv', separator=",", append=True)

callbacks = [
    checkpoint_callback,
    reduce_lr_callback,
    early_stopping_callback,
    csv_logger_callback,
]

# Mean squared error is the training loss; mean absolute error is tracked as a metric.
model.compile(
    loss='mean_squared_error',
    optimizer=optimizer,
    metrics=['mae'],
)

In [None]:
# Switch for this cell: set to True to train the model, leave False to skip training.
Train = False

if Train:
    # Run the fit on the first GPU device.
    with tf.device('/GPU:0'):
        history = model.fit(
            x_train,
            y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=2,
            validation_data=(x_val, y_val),
            shuffle=True,
            callbacks=callbacks,
        )

# Model's training and validation plot

In [None]:
#Ploting the training and validation of the model
plt.plot(history.history['mae'])
plt.plot(history.history['val_mae'])
plt.title('Model Training and Validation MAE')
plt.ylabel('mae')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc = 'upper left')
plt.savefig("trans_Train_Val_mae.png", dpi = 600)
plt.show()

# Custom function for computing and exporting performance metrics

In [None]:
def multi_output_metrics(y_true, y_pred):
    """
    Calculate performance metrics (RMSE, MAE, MAPE) for multiple outputs in a regression task.

    The metrics of every output are printed as they are computed, followed by
    the summary table.

    Parameters:
    y_true: DataFrame with the true target values, one column per output.
    y_pred: 2-D array-like with the predicted values; its i-th column is
            compared against the i-th column of `y_true`.

    Returns:
    tuple: Two pandas DataFrames. `metrics` has one column per output and one
           row per metric (RMSE, MAE, MAPE); `metrics_df` is its transpose,
           with one row per output.

    Notes:
    MAPE is computed only over the rows whose true value is non-zero, because
    the percentage error is undefined for a zero target (dividing by it would
    turn the whole average into `inf`). It is NaN when an output has no
    non-zero true values.
    """
    # Accept any 2-D array-like (ndarray, DataFrame, nested list) for the predictions.
    y_pred = np.asarray(y_pred)

    per_output = {}
    for i, col in enumerate(y_true.columns):
        actual = y_true[col]
        predicted = y_pred[:, i]

        rmse = np.sqrt(mean_squared_error(actual, predicted))
        mae = mean_absolute_error(actual, predicted)

        # Restrict the percentage error to rows where the true value is not zero.
        nonzero = (actual != 0).to_numpy()
        relative_error = (actual[nonzero] - predicted[nonzero]) / actual[nonzero]
        mape = np.mean(np.abs(relative_error)) * 100  # Convert to percentage

        per_output[col] = {'RMSE': rmse, 'MAE': mae, 'MAPE': mape}
        print(f"Output: {col}")
        print(f"RMSE: {rmse:.6f}")
        print(f"MAE: {mae:.6f}")
        print(f"MAPE: {mape:.6f}%")  # Display MAPE as percentage
        print()

    # Metrics as rows / outputs as columns, and the transposed per-output view.
    metrics = pd.DataFrame(per_output)
    metrics_df = metrics.T
    print(metrics_df)
    return metrics, metrics_df

# Model Performance Evaluation

In [None]:
# The sklearn.metrics functions used for evaluation (mean_squared_error,
# mean_absolute_error, r2_score) are imported with the other dependencies in
# the import cell at the top of the notebook.

# Load the checkpoint saved by the ModelCheckpoint callback for testing.
model = load_model("model_trained_weights.h5")

# Write the model summary to a text file, one summary line per file line.
# The encoding is set explicitly so the write does not depend on the
# platform's default text encoding.
with open('model_summary.txt', 'w', encoding='utf-8') as f:
    model.summary(print_fn=lambda x: f.write(x + '\n'))

In [None]:
# Names of the four predicted target columns, in model output order.
column_names = ['PulseDuration','Voltage','Trise','Tfall']

# Predict the targets for the test set and save the raw predictions.
y_pred = model.predict(x_test)
test_predictions = pd.DataFrame(y_pred, columns=column_names)
test_predictions.to_csv('y_pred.csv')

# Compute and display metrics
output_metrics, output_metrics_df = multi_output_metrics(y_test, y_pred)

# Append the metric-per-row table to the existing metrics file (mode='a') ...
output_metrics.to_csv('PerformanceMetrics.csv', mode='a')

# ... and write the output-per-row table, replacing any previous file.
output_metrics_df.to_csv('PerformanceScores.csv')

In [None]:
# Predict the targets for the unseen cohort set and save the raw predictions.
# `column_names` is the list of target names defined in the test-set evaluation cell.
y_pred = model.predict(x_co)
cohort_predictions = pd.DataFrame(y_pred, columns=column_names)
cohort_predictions.to_csv('y_coPred.csv')

# Compute and display metrics
output_metrics, output_metrics_df = multi_output_metrics(y_co, y_pred)

# Append the metric-per-row table to the existing cohort metrics file (mode='a') ...
output_metrics.to_csv('co_PerformanceMetrics.csv', mode='a')

# ... and write the output-per-row table, replacing any previous file.
output_metrics_df.to_csv('co_PerformanceScores.csv')