In [None]:
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
import shap
import joblib
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.model_selection import train_test_split, GridSearchCV, RandomizedSearchCV
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import xgboost as xgb
import os
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers, callbacks, optimizers, backend as K
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization, Add, Concatenate
from tensorflow.keras.models import Model, load_model

import keras_tuner as kt
DATA_DIR = 'data'
MODEL_DIR = 'models'
PRED_DIR = 'predictions'
# Display settings
# pd.set_option('display.max_columns', None)





def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    mask = y_true != 0
    return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask]))



def get_unique_path(base_path):
    if not os.path.exists(base_path):
        return base_path
    name, ext = os.path.splitext(base_path)
    version = 2
    new_path = f"{name}_V{version}{ext}"
    while os.path.exists(new_path):
        version += 1
        new_path = f"{name}_V{version}{ext}"
    return new_path

def saveModel(model, fileName):
    base_path = os.path.join(MODEL_DIR, fileName)
    path = get_unique_path(base_path)

    if 'nn' in fileName:
        model.save(path)
    else:
        joblib.dump(model, path)
        

def loadModel(modelName):
    path = os.path.join(MODEL_DIR, modelName)
    if 'nn' in modelName:
        return load_model(path)
    else:
        return joblib.load(path)
    



def read_file(csvFileName):
    return pd.read_csv(DATA_DIR+'/'+csvFileName)


In [None]:
# DATA CLEANING 

def cleanColumns(data):
       # Drop 'Unnamed: 0' column
    #remove brackets
    if "Unnamed: 0" in data.columns:
        data = data.drop(["Unnamed: 0"], axis="columns")
    columns = data.columns
    data.columns =  [re.sub(r'\s*\([^)]*\)', '', col).strip() for col in columns]
    return data

def applyConstraints(data):
    return data[(data['Tank Width'] > 0) & 
                (data['Tank Length'] > 0) & 
                (data['Tank Height'] > 0) & 
                (data['Vapour Height'] >= 0) &
                (data['Vapour Temperature'] > 0) &
                (data['Liquid Temperature'] > 0)]

def standardizeStatus(data):
    data["Status"] = data["Status"].str.lower().str.replace(' ', '')
    data['Status'] = data['Status'].apply(lambda x: 'superheated' if 'h' in x else ('subcooled' if 'c' in x else x))
    return data["Status"]

def remove_outliers_iqr(df, numeric_cols):
    cleaned_df = df.copy()
    total_outliers = 0

    for col in numeric_cols:
        Q1 = cleaned_df[col].quantile(0.25)
        Q3 = cleaned_df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers = cleaned_df[(cleaned_df[col] < lower_bound) | (cleaned_df[col] > upper_bound)]
        cleaned_df = cleaned_df[(cleaned_df[col] >= lower_bound) & (cleaned_df[col] <= upper_bound)]

        outlier_count = len(outliers)
        if outlier_count > 0:
            print(f"\nColumn: {col}")
            print(f"  IQR range: [{lower_bound:.2f}, {upper_bound:.2f}]")
            print(f"  Count of outliers: {outlier_count}")
            print(f"  Sample outlier values:\n{outliers[col].head(5).to_string(index=False)}")

        total_outliers += outlier_count

    print(f"\nTotal outliers removed: {total_outliers}")
    return cleaned_df

def clean_data(data):
    # Initial row count
    initial_rows = len(data)
    print(f"Initial rows: {initial_rows}")

    # Cleanup columns
    data = cleanColumns(data)

    # Drop rows with missing values
    before_na = len(data)
    data = data.dropna(axis=0)
    after_na = len(data)
    print(f"Rows removed due to NA: {before_na - after_na}")

    # Logical constraints
    before_constraints = len(data)
    data = applyConstraints(data)
    after_constraints = len(data)
    print(f"Rows removed due to logical constraints: {before_constraints - after_constraints}")

    #Duplicates
    before_duplicates = len(data)
    data = data.drop_duplicates()
    after_duplicates = len(data)
    print(f"Rows removed as duplicates: {before_duplicates - after_duplicates}")

    # Standardize 'Status' column
    data["Status"] = standardizeStatus(data)


    numeric_cols = data.select_dtypes(include=['float64', 'int64']).columns
    before_outliers = len(data)
    data = remove_outliers_iqr(data, numeric_cols)
    after_outliers = len(data)
    print(f"Rows removed as outliers: {before_outliers - after_outliers}")

    # Final row count
    final_rows = len(data)
    print(f"Final rows: {final_rows}")
    print(f"Total rows removed after cleaning up: {initial_rows - final_rows}")
    return data





In [None]:
def feature_engineer(data):
    
    data["Liquid Boiling Temperature"] = data["Liquid Boiling Temperature"] +273.15
    data["Liquid Critical Temperature"] = data["Liquid Critical Temperature"] +273.15
    data["Tank Volume"] = data["Tank Width"] * data["Tank Length"] * data["Tank Height"]  
    data["HeightRatio"]= data["Vapour Height"] / data["Tank Height"] 
    data["Superheat Margin"] = data["Liquid Temperature"] - data["Liquid Boiling Temperature"]
    # Total Energy (approximate thermal energy in the tank)
    data["Liquid Volume"] = data["Liquid Ratio"] * data["Tank Volume"]
    data["Total Energy"] = data["Liquid Volume"] * data["Superheat Margin"]

    # (BLEVE assumed at tank center top)
    data["Sensor Distance to BLEVE"] = (
        data["Sensor Position x"]**2 +
        data["Sensor Position y"]**2 +
        (data["Sensor Position z"] - data["BLEVE Height"])**2
    ) ** 0.5
    return data


def preprocess(data,train=True,feature_eng = True):

    if train:
        data = clean_data(data)
    data = pd.get_dummies(data, columns=["Sensor Position Side"], prefix="Side")
    data = pd.get_dummies(data, columns=["Status"], prefix="Status")
    print(data.head(1))
    if feature_eng:
        data = feature_engineer(data)
    y =  None
    if (train):
        y= data[ "Target Pressure"] 
        data = data.drop(["Target Pressure"], axis="columns")
    
    # StandardScaler for inputs
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(data)

    #split data
    
    X_train, X_val, y_train, y_val = train_test_split(
        X_scaled, y, test_size=0.2, random_state=42
    )
    print("Training Set Size:", X_train.shape)
    print("Validation Set Size:", X_val.shape)
    return  X_train, X_val, y_train, y_val


In [None]:
### LOAD DATA HERE
trainData = read_file('train.csv')
trainData = preprocess(trainData,train=True,feature_eng=True)
X_train, X_val, y_train, y_val= trainData


In [196]:




def plot_grid_search_table(grid_search,modelName, metric='mean_test_score', top_n=20):
    # Extract and sort results
    results = pd.DataFrame(grid_search.cv_results_)
    results = results.sort_values(by=metric, ascending=False).head(top_n)

    # Get only param columns and the score
    table_data = results.filter(like='param_').copy()
    table_data[metric] = results[metric].values

    # Reset index for clean display
    table_data.reset_index(drop=True, inplace=True)

    # Convert all values to string for uniform display
    table_data = table_data.astype(str)

    # Plot table as figure
    fig, ax = plt.subplots(figsize=(min(25, table_data.shape[1]*2), top_n * 0.5 + 1))
    ax.axis('off')

    table = ax.table(cellText=table_data.values,
                     colLabels=table_data.columns,
                     cellLoc='center',
                     loc='center')

    table.auto_set_font_size(False)
    table.set_fontsize(10)
    table.scale(1, 1.5)

    plt.title(f"Top {top_n} GridSearchCV {modelName} Results by {metric}", fontsize=14)
    plt.tight_layout()
    plt.show()


def summarize_results(model,name):
    y_pred = model.predict(X_val)
    # Compute metrics
    mse = mean_squared_error(y_val, y_pred)
    r2 = r2_score(y_val, y_pred)

    print(f"MSE: {mse:.4f}")
    print(f"R²: {r2:.4f}")
    summary = {
        'Model': name,
        'MSE': mse,
        'R2': r2,
        'Best Params': str(model.getparams())
    }
    return pd.DataFrame([summary])

def train_xgboost():
    # param_grid = {
    #     'n_estimators': [100, 300, 500],
    #     'max_depth': [3, 6, 9],
    #     'learning_rate': [0.01, 0.1, 0.3],
    #     'subsample': [0.8],
    #     'colsample_bytree': [0.8],
    #     'min_child_weight': [1],
    #     'reg_lambda': [1],
    #     'reg_alpha': [0]
    # }
    ## Fine TUnning
    param_grid = {
    'n_estimators': [500,700],
    'max_depth': [5, 6, 7],
    'learning_rate': [0.05, 0.1, 0.15],
    'subsample': [0.7, 0.8, 0.9],
    'colsample_bytree': [0.7, 0.8, 0.9],
    'colsample_bylevel': [0.7, 0.8, 0.9],
    'gamma': [0, 0.1, 0.2],
    'min_child_weight': [1, 3, 5],
    'reg_lambda': [0.1, 1, 10],
    'reg_alpha': [0, 0.1, 1]
}

    model = xgb.XGBRegressor(objective="reg:squarederror", random_state=42)
    grid_search = GridSearchCV(model, param_grid=param_grid, cv=3, scoring='neg_mean_squared_error', verbose=1, n_jobs=-1)
    grid_search.fit(X_train, y_train)

    best_model = grid_search.best_estimator_
    return best_model , grid_search


def train_svr():
    param_grid = {
        'svr__C': [0.01,0.1,1, 10, 100],
        'svr__epsilon': [0.01, 0.1,0.5],
        'svr__kernel': ['rbf','poly']
    }

    pipeline = make_pipeline(SVR())
    grid_search = GridSearchCV(pipeline, param_grid=param_grid, cv=3, scoring='neg_mean_squared_error', verbose=1, n_jobs=-1)
    grid_search.fit(X_train, y_train)
    
    best_model = grid_search.best_estimator_
    return best_model , grid_search







In [None]:
xgb_model,grid_search = train_xgboost()
plot_grid_search_table(grid_search,"XGBBoost")




Fitting 3 folds for each of 39366 candidates, totalling 118098 fits


In [194]:
saveModel(xgb_model,'xgb_best.pkl')

In [None]:
# Neural network
def mish(x):
    return x * tf.math.tanh(tf.math.softplus(x))


def build_nn():
    model = models.Sequential([
        layers.Dense(256, activation='mish', input_shape=(X_train.shape[1],),
                    kernel_regularizer=regularizers.l2(1e-5)),
        layers.Dropout(0.1),
        layers.Dense(256, activation='mish', kernel_regularizer=regularizers.l2(1e-5)),
        layers.Dropout(0.1),
        layers.Dense(256, activation='mish', kernel_regularizer=regularizers.l2(1e-5)),
        layers.Dropout(0.1),
        layers.Dense(1, activation='softplus')
    ])

    optimizer = optimizers.SGD(learning_rate=0.1, momentum=0.9)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

    # optimizer = optimizers.Adam(learning_rate=0.001)
    # model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

    # Callbacks
    
    return model




def plot_nn(history):
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training & Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Plot Training & Validation MAE
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.title('Training & Validation MAE')
    plt.xlabel('Epochs')
    plt.ylabel('Mean Absolute Error')
    plt.legend()

    # Show the plots
    plt.tight_layout()
    plt.show()


def train_nn(model):
    early_stop = callbacks.EarlyStopping(
        monitor='val_loss', patience=100, restore_best_weights=True
    )
    history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=500,
    batch_size=32,
    callbacks=[early_stop],
    verbose=2)
    plot_nn(history)
    return history









In [None]:
#TRAINING
svr_model = train_svr()
saveModel(svr_model,'svr.pkl')
summarize_results(svr_model)

In [None]:
history = train_nn(nn_model)

val_loss, val_mae = nn_model.evaluate(X_val, y_val, batch_size=512, verbose=1)

# Print the results
print(f"Validation Loss: {val_loss}")
print(f"Validation MAE: {val_mae}")

In [None]:
# Assuming models array contains [xgb_model, svr_model, nn_model]

# Initialize lists to store metrics
val_losses = []
val_maes = []
val_mapes = []
model_names = ["XGBoost", "SVR", "Neural Network"]
models=[xgb_model,svr_model,nn_model]

# Loop through models and calculate metrics
for model in models:
    y_pred = model.predict(X_val).flatten()  # Ensure predictions are 1D
    val_loss = mean_squared_error(y_val, y_pred)
    val_mae = mean_absolute_error(y_val, y_pred)
    val_mape = mean_absolute_percentage_error(y_val, y_pred)

    val_losses.append(val_loss)
    val_maes.append(val_mae)
    val_mapes.append(val_mape)


plt.figure(figsize=(15, 5))

# Histogram for val_loss
plt.subplot(1, 3, 1)
plt.bar(model_names, val_losses, color='skyblue')
plt.title("Validation Loss")
plt.ylabel("Loss")
plt.xlabel("Model")

# Histogram for val_mae
plt.subplot(1, 3, 2)
plt.bar(model_names, val_maes, color='orange')
plt.title("Validation MAE")
plt.ylabel("Mean Absolute Error")
plt.xlabel("Model")

# Histogram for val_mape
plt.subplot(1, 3, 3)
plt.bar(model_names, val_mapes, color='green')
plt.title("Validation MAPE")
plt.ylabel("Mean Absolute Percentage Error (%)")
plt.xlabel("Model")

plt.tight_layout()
plt.show()

In [None]:

def test_model(model,modelName):
    test_data = read_file('test.csv')
    y_test = pd.read_csv("sample_prediction.csv")
    test_data = preprocess(test_data,False,False)
    y_pred = model.predict(test_scaled).flatten()
    test_scaled,_ = preprocess(data=test_data,train=False,feature_eng=True)
    output_df = pd.DataFrame({
    'ID': y_test['ID'], 
    'Target Pressure (bar)': y_pred 
})
    fileName = get_unique_path(os.path.join(PRED_DIR,f'{modelName}_pred.csv'))
    output_df.to_csv(fileName, index=False)
    print(f'Predictions have been saved to ${fileName}.')







In [None]:

def build_model(hp):
    model = models.Sequential()
    
    # Tunable units
    for i in range(hp.Int("num_layers", 2, 4)):
        model.add(layers.Dense(
            units=hp.Int(f'units_{i}', min_value=128, max_value=512, step=64),
            activation='mish',
            kernel_regularizer=regularizers.l2(hp.Choice('l2_reg', [1e-5, 1e-4, 1e-3]))
        ))
        model.add(layers.Dropout(rate=hp.Float(f'dropout_{i}', min_value=0.1, max_value=0.5, step=0.1)))
    
    model.add(layers.Dense(1, activation='softplus'))
    
    # Compile
    optimizer = optimizers.SGD(
        learning_rate=hp.Choice('learning_rate', [0.01, 0.05, 0.1]),
        momentum=hp.Float('momentum', min_value=0.5, max_value=0.95, step=0.05)
    )
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
    return model





tuner = kt.RandomSearch(
    build_model,
    objective='val_loss',
    max_trials=20,
    executions_per_trial=1,
    directory='tuner_dir',
    project_name='mish_model'
)

# Register Mish
tf.keras.utils.get_custom_objects().update({'mish': mish})

# Run search
tuner.search(
    X_train, y_train,
    epochs=100,
    validation_data=(X_val, y_val),
    callbacks=[callbacks.EarlyStopping(monitor='val_loss', patience=10)],
    verbose=1
)


best_model = tuner.get_best_models(num_models=1)[0]
best_hps = tuner.get_best_hyperparameters(1)[0]

print("Best hyperparameters:")
for key in best_hps.values:
    print(f"{key}: {best_hps.get(key)}")

