<a href="https://colab.research.google.com/github/harvey-py/COMP3010/blob/main/Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Packages

In [1]:
import numpy as np
import pandas as pd

from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
import statsmodels.api as sm

import matplotlib.pyplot as plt
import seaborn as sns

import os

## Config

In [None]:
working_dir = "~/Documents/GitHub/COMP3010/Assignment/Data/"

train_dir = os.path.join(working_dir, "train.csv")
test_dir = os.path.join(working_dir, "test.csv")

## Importing Data

In [None]:
raw_data = pd.read_csv(train_dir)
raw_data.rename(columns = {"Target Pressure (bar)": "tgt_pressure"}, inplace = True)

In [None]:
raw_test_data = pd.read_csv(test_dir)
raw_test_data.rename(columns = {"Target Pressure (bar)": "tgt_pressure"}, inplace = True)

# Exploratory Analysis

## Variables

In [None]:
raw_data.columns

In [None]:
raw_data

## Plots

In [None]:
raw_data.hist(figsize = (8,8))
plt.tight_layout()
plt.show()

In [None]:
raw_data_num = raw_data.select_dtypes(include=[np.number])

n_cols = 4
n_rows = int(len(raw_data_num.columns) / n_cols) + (len(raw_data_num.columns) % n_cols > 0)
fig, axs = plt.subplots(n_rows, n_cols, figsize=(15, 20))

for i, col_name in enumerate(raw_data_num.columns):
    if col_name != "tgt_pressure":
        ax = axs[i//n_cols, i%n_cols]
        ax.scatter(raw_data_num[col_name], raw_data_num["tgt_pressure"])
        ax.set_title(f"{col_name} vs tgt_pressure")
        ax.set_xlabel(col_name)
        ax.set_ylabel("tgt_pressure")

plt.tight_layout()
plt.show()

In [None]:
raw_data_num.corr().style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

In [None]:
raw_data_num.corr()[["tgt_pressure"]].T.style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

In [None]:
raw_data_num["Volume"] = raw_data_num["Tank Length (m)"] * raw_data_num["Tank Height (m)"]

In [None]:
## TURN OFF NIGHT READER WHEN VIEWING CORRPLOTS
raw_data_num.corr()[["tgt_pressure"]].T.style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

# Data Preprocessing

## Missing Values

In [None]:
raw_nans = raw_data[raw_data.isna().any(axis = 1)]
print(f"{len(raw_nans)} points with NaNs out of {len(raw_data)} ({len(raw_nans)/len(raw_data)*100:.3f}%)")

In [None]:
# 1: Removing NaNs
raw_data1 = raw_data.dropna()

## Outliers

In [None]:
# 2: Removing Outliers
def remove_outliers_IQR(df, cols, quantile = 0.25, mult = 1.5):
    for col in cols:
        Q1 = df[col].quantile(quantile)
        Q3 = df[col].quantile(1-quantile)
        
        IQR = Q3 - Q1
        
        lower_bound = Q1 - mult * IQR
        upper_bound = Q3 + mult * IQR

        n_prior = len(df)
        df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
        n_after = len(df)
        
        print(f"##### {col} #####\nBounds: {lower_bound:.5f} - {upper_bound:.5f} (IQR: {IQR:.5f})\nn(removed): {n_prior - n_after}\n")
        
    return df

In [None]:
raw_data2 = remove_outliers_IQR(raw_data1, ["Tank Failure Pressure (bar)"])

## Data Encoding & Incorrect Entries

### Data Encoding

In [None]:
# 3.1: Encoding Status Column
raw_data2["Status"].unique()

In [None]:
raw_data2.loc[raw_data2['Status'].str.contains('sub|cool', case=False), 'Status'] = 'Subcooled'
raw_data2.loc[raw_data2['Status'].str.contains('super|heat', case=False), 'Status'] = 'Superheated'

In [None]:
raw_data2["Status"].unique()

In [None]:
temp = pd.concat([raw_data2, pd.get_dummies(raw_data2['Status'], drop_first=True).astype(int)], axis=1)
raw_data3 = temp.drop(columns = "Status").rename(columns = {"Superheated": "Superheated_status"})

temp = pd.concat([raw_test_data, pd.get_dummies(raw_test_data['Status'], drop_first=True).astype(int)], axis=1)
raw_test_data2 = temp.drop(columns = "Status").rename(columns = {"Superheated": "Superheated_status"})

In [None]:
raw_data2[["Liquid Critical Pressure (bar)", "Liquid Boiling Temperature (K)", "Liquid Critical Temperature (K)"]].drop_duplicates()

In [None]:
dummies = pd.get_dummies(raw_data3['Liquid Critical Pressure (bar)'], drop_first=False)
dummies.columns = ['37.9', '42.5']
dummy_col = dummies['42.5'].rename(f"Liquid Critical Pressure (bar)") * 1
raw_data3 = pd.concat([raw_data3.drop('Liquid Critical Pressure (bar)', axis=1), dummy_col], axis=1)

dummies = pd.get_dummies(raw_data3['Liquid Boiling Temperature (K)'], drop_first=False)
dummies.columns = ['-42', '-1']
dummy_col = dummies['-42'].rename(f"Liquid Boiling Temperature (K)") * 1
raw_data3 = pd.concat([raw_data3.drop('Liquid Boiling Temperature (K)', axis=1), dummy_col], axis=1)

dummies = pd.get_dummies(raw_data3['Liquid Critical Temperature (K)'], drop_first=False)
dummies.columns = ['152.0', '96.7']
dummy_col = dummies['152.0'].rename(f"Liquid Critical Temperature (K)") * 1
raw_data3 = pd.concat([raw_data3.drop('Liquid Critical Temperature (K)', axis=1), dummy_col], axis=1)

In [None]:
dummies = pd.get_dummies(raw_test_data2['Liquid Critical Pressure (bar)'], drop_first=False)
dummies.columns = ['37.9', '42.5']
dummy_col = dummies['42.5'].rename(f"Liquid Critical Pressure (bar)") * 1
raw_test_data2 = pd.concat([raw_test_data2.drop('Liquid Critical Pressure (bar)', axis=1), dummy_col], axis=1)

dummies = pd.get_dummies(raw_test_data2['Liquid Boiling Temperature (K)'], drop_first=False)
dummies.columns = ['-42', '-1']
dummy_col = dummies['-42'].rename(f"Liquid Boiling Temperature (K)") * 1
raw_test_data2 = pd.concat([raw_test_data2.drop('Liquid Boiling Temperature (K)', axis=1), dummy_col], axis=1)

dummies = pd.get_dummies(raw_test_data2['Liquid Critical Temperature (K)'], drop_first=False)
dummies.columns = ['152.0', '96.7']
dummy_col = dummies['152.0'].rename(f"Liquid Critical Temperature (K)") * 1
raw_test_data2 = pd.concat([raw_test_data2.drop('Liquid Critical Temperature (K)', axis=1), dummy_col], axis=1)

In [None]:
# Creating a lookup for the properties of each substance (there are only 2 distinct substance present; we will denote them 0 and 1). This table will allow us to view their properties later
substance_properties = raw_data2[["Liquid Critical Pressure (bar)", "Liquid Boiling Temperature (K)", "Liquid Critical Temperature (K)"]].drop_duplicates()
substance_properties = substance_properties.rename(index = {substance_properties.index[1]: 1})

### Negative and 0 Values

In [None]:
raw_data2

In [None]:
n_cols = 3
n_rows = 2
fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols*2.5, n_rows*2))

i = 0
for var in raw_data3.columns:
    temp = raw_data3[raw_data3[var] < 0]
    
    if not temp.empty:
        ax = axs[i//n_cols, i%n_cols]
        temp[var].hist(bins=15, ax = ax)
        ax.set_title(f'{var}')
        i += 1

plt.tight_layout()

In [None]:
raw_data3["Liquid Boiling Temperature (K)"].unique()

Note that the "BLEVE Height (m)" is "the distance of the tank to the ground (in meter)" [sic]. After confirming with the source of the data that this can not be negative, we must remove it from our data.

Additionally, the "Liquid Boiling Temperature" is in Kelvin, however, all values in our set are negative (which is impossible as Kelvin is an absolute scale). Whilst this appears to be invalid, we'll discuss in 2.6.6 how this is actually just a mistake in the recording of data

In [None]:
raw_data3 = raw_data3.query("`BLEVE Height (m)` > 0")

## Duplicates

In [None]:
# 4: Removing Duplicates
display(raw_data3[raw_data3.duplicated(keep=False)].sort_values(by=raw_data3.columns.tolist()))

In [None]:
raw_data4 = raw_data3.drop_duplicates()
print(f"Data dropped: {len(raw_data3) - len(raw_data4)}")

In [None]:
len(raw_data4)

## Splitting Data

We will split the data into train, validation and test sets before analysing the data (so we can test our hypotheses locally before submitting them for assessment). However, before we do this, we'll do perform any changes that apply to all datasets here (so we don't have to add them to each dataset separately). This mainly includes renaming the columns to something more friendly for analysis (i.e. snake_case), as well as adding features that we'll explore later.

### Variable Renaming

In [None]:
renamed_cols = [
    "ID",
    "failure_pressure",
    "liquid_pct",
    "tank_w",
    "tank_l",
    "tank_h",
    "BLEVE_h",
    "vapour_height",
    "vapour_temp",
    "liquid_temp",
    "obstacle_dist",
    "obstacle_w",
    "obstacle_h",
    "obstacle_thk",
    "obstacle_angle",
    "sensor_id",
    "sensor_side",
    "sensor_x",
    "sensor_y",
    "sensor_z",
    "tgt_pressure",
    "superheated_status",
    "lqd_crit_pressure",
    "lqd_boil_temp",
    "lqd_crit_temp",
    "event_num",
    "tank_volume",
    "net_sensor_dist"
]

In [None]:
dict_names = dict(zip(raw_data4.columns, renamed_cols))

raw_data4 = raw_data4.rename(columns = dict_names)
raw_test_data2.rename(columns = dict_names, inplace = True)

### Feature Addition

Immediately we can see that many of our variables relate to each other
* E.g. we have tank height, length and width, which suggest that we should create a variable for the tank volume.
* Also, we have the location of sensors. This is quite important, but we can group sensors by "front", "back" and "side" (rather than looking at them individually)
* Furthermore, we can try convert the sensor's position into a distance metric

In [None]:
raw_data5 = raw_data4.copy()

In [None]:
tank_axis = ["tank_w", "tank_h", "tank_l"]
raw_data5["tank_volume"] = raw_data5[tank_axis].prod(axis = 1)
raw_data5.drop(columns = tank_axis, inplace = True)

raw_data5["sensor_location"] = raw_data5["sensor_side"].map({1: 1, 2: 2, 3: 3, 4: 3, 5: 3}) # changes 4 and 5 (both sides) to 3 (1 = back, 2 = front, 3 = sides)

sensor_vars = ["sensor_x", "sensor_y", "net_z"]
raw_data5["net_z"] = raw_data5["sensor_z"] - raw_data5["BLEVE_h"]
raw_data5["net_sensor_dist"] = np.linalg.norm(raw_data5[sensor_vars], axis=1)
sensor_vars.append("sensor_z")
raw_data5.drop(columns = sensor_vars, inplace = True)

lqd_properties = ["lqd_crit_pressure", "lqd_boil_temp", "lqd_crit_temp"]
raw_data5["substance"] = raw_data5[lqd_properties].prod(axis = 1)
raw_data5.drop(columns = lqd_properties, inplace = True)

In [None]:
raw_test_data2["tank_volume"] = raw_test_data2[tank_axis].prod(axis = 1)
raw_test_data2["sensor_location"] = raw_test_data2["sensor_side"].map({1: 1, 2: 2, 3: 3, 4: 3, 5: 3})
raw_test_data2["net_z"] = raw_test_data2["sensor_z"] - raw_test_data2["BLEVE_h"]
raw_test_data2["net_sensor_dist"] = np.linalg.norm(raw_test_data2[sensor_vars], axis=1)
raw_test_data2["substance"] = raw_test_data2[lqd_properties].prod(axis = 1)

Additionally, we'll add an "event_num" so we can tell when data from different sensors has come from the same event.

In [None]:
event_num = 1
prev_id = raw_data5.loc[0, "sensor_id"]

event_nums = []

for index, row in raw_data5.iterrows():
    curr_id = row["sensor_id"]
    if curr_id < prev_id:
        event_num += 1 
    
    event_nums.append(event_num)
    prev_id = curr_id

raw_data5.loc[:,"event_num"] = event_nums

In [None]:
# Splitting Data
val_train_ratio = 0.7
val_val_ratio = 0.15

n_events = max(raw_data5["event_num"])
val_train_qty = int(val_train_ratio * n_events)
val_val_qty = int(val_val_ratio * n_events) + val_train_qty

train_split = raw_data5[raw_data5["event_num"]<val_train_qty]
validation_split = raw_data5[(raw_data5["event_num"]>=val_train_qty) & (raw_data5["event_num"]<val_val_qty)]
test_split = raw_data5[raw_data5["event_num"]>=val_val_qty]

## Feature Selection

In [None]:
ft_data = train_split.copy()

Immediately based off priors, we can see that many of our variables relate to each other. E.g. we have tank height, length and width, which suggest that we should create a variable for the tank volume.

### Exploration

In [None]:
# Plotting Variables vs tgt_pressure
def plot_tgt_var(df, vars = None, n_cols = 4):
    if vars == None:
        vars = df.columns
        
    numeric_cols = df[vars].select_dtypes(include = np.number).columns.to_list()
    n_cols = n_cols
    n_rows = int(len(numeric_cols) / n_cols) + (len(numeric_cols) % n_cols > 0)
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols * 2.5, n_rows * 2))
    
    i = 0
    for col_name in numeric_cols:
        if col_name != "tgt_pressure":
            ax = axs[i//n_cols, i%n_cols]
            ax.scatter(df[col_name], df["tgt_pressure"])
            ax.set_title(f"{col_name}")
            i += 1
    
    plt.tight_layout()
    plt.show()

In [None]:
plot_tgt_var(ft_data)

In [None]:
ft_data.corr().style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

In [None]:
ft_data.corr()[["tgt_pressure"]].T.style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

In [None]:
sample = ft_data.loc[10:36]

In [None]:
sample

In [None]:
plot_tgt_var(sample)

Looking into distinguishing by sensor_side (instead of individual sensors)

In [None]:
raw_data2[["Sensor Position Side", "Sensor ID"]].drop_duplicates().sort_values(by = "Sensor ID")

In [None]:
# We can group IDs by Position Side as such:
# 1: 1-9
# 2: 10-18
# 3: 19-21
# 4: 22-24
# 5: 25-27

# This was done above in 2.5.1 "sensor_location" (using 1 = back, 2 = front, 3 = sides)

In [None]:
grpd_sensor_loc = ft_data.groupby("sensor_location")
grpd_sensor_loc_vld = validation_split.groupby("sensor_location")

### Net Sensor Distance

In [None]:
# Plotting Variables vs tgt_pressure for Each Sensor
def plot_sensor_data(grpd_data, vars = None, n_cols = 4):
    if vars == None:
        vars = df.columns
        
    n_cols = min(len(vars), n_cols)
    n_rows = 27
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols * 2.5, n_rows * 2))
    
    i = 0
    for sensor_id, df in grpd_data:
        for col_name in vars:
            ax = axs[i//n_cols, i%n_cols]
            ax.scatter(df[col_name], df["tgt_pressure"])
            ax.set_title(f"{i//n_cols+1}: {col_name}")
            i += 1
        
    plt.tight_layout()
    plt.show()

In [None]:
grpd = ft_data.groupby("sensor_id")

# plot_sensor_data(grpd, ["sensor_x", "sensor_y", "sensor_z", "net_sensor_dist"])

In [None]:
n_cols = 3
n_rows = 9
fig, axs = plt.subplots(n_rows, n_cols, figsize=(n_cols * 2.5, n_rows * 2))

i = 0
for sensor_id, df in grpd:
    ax = axs[i//n_cols, i%n_cols]
    ax.scatter(df["obstacle_dist"], df["net_sensor_dist"])
    ax.set_title(f"{i+1}: sens_dist vs obs_dist")
    i += 1
    
plt.tight_layout()
plt.show()

In [None]:
y_intercepts = []

for sensor_id, df in grpd:
    X = df[["obstacle_dist"]]
    y = df["net_sensor_dist"]
    model = LinearRegression()
    model.fit(X, y)

    # Get y-intercept
    y_intercept = model.intercept_
    y_intercepts.append(y_intercept)

# Plot y-intercepts on a bar graph
plt.figure(figsize=(6, 3))
plt.bar(range(1, len(y_intercepts) + 1), y_intercepts)
plt.show()

In [None]:
ft_data.corr()[["tgt_pressure","net_sensor_dist","obstacle_dist"]].T.style.background_gradient(cmap = "coolwarm", vmin = -1, vmax = 1)

In [None]:
def reg_stats(df, vars):
    r_sq_result = {}
    
    for predictor in vars:
        X = df[[predictor]]
        y = df["tgt_pressure"]
        model = LinearRegression()
        model.fit(X, y)
        y_pred = model.predict(X)
        r_squared = r2_score(y, y_pred)
        coefficients = model.coef_  # Get the coefficients
        r_sq_result[predictor] = {"R-squared": r_squared, "Coefficients": coefficients}
    
    
    for key, val in r_sq_result.items():
        print(f"### {key} ###\nCoeff = {val['Coefficients'][0]:.4f}\nR^2 = {val['R-squared']:.6f}\n")

In [None]:
X = ft_data[["net_sensor_dist"]]
y = ft_data["tgt_pressure"]
model = LinearRegression()
model.fit(X, y)
y_pred = model.predict(X)
r_squared = r2_score(y, y_pred)
coefficients = model.coef_  # Get the coefficients
display(pd.DataFrame({"R-squared": r_squared, "Coefficients": coefficients}))

In [None]:
reg_stats(ft_data, ["net_sensor_dist", "obstacle_dist"])

Conclusion: it appears that the net_sensor_dist and obstacle_distance_to_BLEVE are very similar in nature, and without more information regarding the nature of the experiment, it is hard to pinpoint where exactly this difference comes from.net_sensor_dist has a stronger correlation with tgt_pressure than the obstacle_distance, we will use that instead.

### Automatic Variable Selection

In [None]:
def mape_calc(y_tgt, y_pred):
        return np.mean(np.abs(1 - y_pred / y_tgt))

In [None]:
def out_of_sample_test(model, data):
    y2 = data["tgt_pressure"]
    X2 = data.drop(columns=["tgt_pressure"])
    y2_pred = model.predict(X2)
    
    r2_2 = r2_score(y2, y2_pred)
    mape2 = mape_calc(y2, y2_pred)
    
    print(f"R^2: {r2_2:.4f}, MAPE: {mape2:.4f}")

In [None]:
def forward_selected_r2(data, response, max_features=5, select_features = 5, criterion='aic', k = False):
    remaining = set(data.columns)
    remaining.remove(response)
    selected = []
    current_score, best_new_score = float('inf'), float('inf')
    if k == False:
        k = '-1 +'
    else:
        k = ''
    while remaining and len(selected) < max_features:
        scores_with_candidates = []
        if not selected:
            curr_score = float("inf")
            curr_r2 = 0
        else:
            curr_formula = f"{response} ~ {k}{' + '.join(selected)}"
            curr_model = sm.OLS.from_formula(curr_formula, data).fit()
            curr_r2 = curr_model.rsquared
            if criterion == "aic":
                curr_score = curr_model.aic
            elif criterion == "bic":
                curr_score = curr_model.bic
        for candidate in remaining:
            formula = f"{response} ~ {k}{' + '.join(selected + [candidate])}"
            if criterion == 'aic':
                model = sm.OLS.from_formula(formula, data).fit()
                score = model.aic
                r_squared = model.rsquared
            elif criterion == 'bic':
                model = sm.OLS.from_formula(formula, data).fit()
                score = model.bic
                r_squared = model.rsquared
            else:
                raise ValueError("Invalid: use 'aic' or 'bic'")
            scores_with_candidates.append((score, r_squared, candidate))
        scores_with_candidates.sort()
        
        print(f"\nBest variables (current: {criterion.upper()} = {curr_score:.2f}, R^2 = {curr_r2:.4f}):")
        for i, (score, r_squared, candidate) in enumerate(scores_with_candidates[:select_features], 1): # enumerate from 1 instead of 0
            print(f"{i}. {candidate.ljust(20)} \tAIC={score:.2f}, \tR^2={r_squared:.4f}")
        
        user_input = input(f"Variable to add (1-{select_features}): ")
        try:
            choice = int(user_input)
            if 1 <= choice <= select_features:
                best_candidate = scores_with_candidates[choice - 1][2]
                print(f"\n### Adding {best_candidate} ###")
                remaining.remove(best_candidate)
                selected.append(best_candidate)
                current_score = scores_with_candidates[choice - 1][0]
            else:
                break
        except ValueError:
            print("Invalid. Enter a number")

    if not selected:
        raise ValueError("No variables added to model")
            
    formula = f"{response} ~ {k}{' + '.join(selected)}"
    model = sm.OLS.from_formula(formula, data).fit()

    y_tgt = data[response]
    y_pred = model.predict(data).clip(0)
    mape = mape_calc(y_tgt, y_pred)
    
    print(f"\n\n### METRICS ###\nMape:\t{mape:.4f} \nR^2:\t{model.rsquared:.4f}")
    print("\n",model.summary(),"\n")

    
    plt.figure(figsize=(6, 3))
    plt.scatter(y_pred, y_tgt)
    plt.plot([y_pred.min(), y_pred.max()], [y_pred.min(), y_pred.max()], color='red', linestyle='--', label='y = x')
    
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Accuracy of Predictions")
    plt.grid(True)
    plt.show()
    
    return model

In [None]:
def forward_selected_mape(data, response, max_features=5, select_features=5, criterion='aic', k=False):
    remaining = set(data.columns)
    remaining.remove(response)
    selected = []
    current_score, best_new_score = float('inf'), float('inf')
    if k == False:
        k = '-1 +'
    else:
        k = ''
    while remaining and len(selected) < max_features:
        scores_with_candidates = []
        if not selected:
            curr_score = float("inf")
            curr_mape = float("inf")
            curr_r2 = 0
        else:
            curr_formula = f"{response} ~ {k}{' + '.join(selected)}"
            curr_model = sm.OLS.from_formula(curr_formula, data).fit()
            curr_mape = mape_calc(data[response], curr_model.predict(data))
            curr_r2 = curr_model.rsquared
            if criterion == "aic":
                curr_score = curr_model.aic
            elif criterion == "bic":
                curr_score = curr_model.bic
        for candidate in remaining:
            formula = f"{response} ~ {k}{' + '.join(selected + [candidate])}"
            if criterion == 'aic':
                model = sm.OLS.from_formula(formula, data).fit()
                score = model.aic
            elif criterion == 'bic':
                model = sm.OLS.from_formula(formula, data).fit()
                score = model.bic
            else:
                raise ValueError("Invalid: use 'aic' or 'bic'")
            mape = mape_calc(data[response], model.predict(data))
            r2 = model.rsquared
            scores_with_candidates.append((score, mape, r2, candidate))
        scores_with_candidates.sort(key=lambda x: x[1])
        
        print(f"\nBest variables (current: {criterion.upper()} = {curr_score:.2f}, MAPE = {curr_mape:.4f}, R^2 = {curr_r2:.4f}):")
        for i, (score, mape, r2, candidate) in enumerate(scores_with_candidates[:select_features], 1):
            print(f"{i}. {candidate.ljust(20)} \tAIC={score:.2f}, \tMAPE={mape:.4f}, \tR^2={r2:.4f}")

        user_input = input(f"Variable to add (1-{select_features}): ")
        try:
            choice = int(user_input)
            if 1 <= choice <= select_features:
                best_candidate = scores_with_candidates[choice - 1][3]
                print(f"\n### Adding {best_candidate} ###")
                remaining.remove(best_candidate)
                selected.append(best_candidate)
                current_score = scores_with_candidates[choice - 1][0]
            else:
                break
        except ValueError:
            print("Invalid. Enter a number")

    if not selected:
        raise ValueError("No variables added to model")

    formula = f"{response} ~ {k}{' + '.join(selected)}"
    model = sm.OLS.from_formula(formula, data).fit()

    y_tgt = data[response]
    y_pred = model.predict(data).clip(0)
    mape = mape_calc(y_tgt, y_pred)

    print(f"\n\n### METRICS ###\nMAPE:\t{mape:.4f} \n{criterion.upper()}:\t{current_score:.2f}")
    print("\n",model.summary(),"\n")

    plt.figure(figsize=(6, 3))
    plt.scatter(y_pred, y_tgt)
    plt.plot([y_pred.min(), y_pred.max()], [y_pred.min(), y_pred.max()], color='red', linestyle='--', label='y = x')

    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Accuracy of Predictions")
    plt.grid(True)
    plt.show()

    return model


In [None]:
selected_model = forward_selected_r2(ft_data, "tgt_pressure", criterion='aic', k = False)

In [None]:
out_of_sample_test(selected_model, validation_split)

In [None]:
preds = selected_model.predict(raw_test_data2).clip(0)

In [None]:
output_df = pd.DataFrame({'ID': preds.index, 'Target Pressure (bar)': preds.values})
output_df.to_csv('predictions.csv', index=False)

### Testing by Sensor Side

In [None]:
grpd_sensor_side = ft_data.groupby("sensor_side")
grpd_sensor_side_vld = validation_split.groupby("sensor_side")
grpd_sensor_side_test = raw_test_data2.groupby("sensor_side")

In [None]:
models = {}
side_dict = {1: "back", 2: "front", 3: "side_L", 4: "side_T", 5: "side_R"}

for side, df in grpd_sensor_side:
    print(f"\n##### SIDE: {side_dict[side]} #####\n")
    model_side = f"model_{side_dict[side]}" 
    model = forward_selected_mape(df, "tgt_pressure", criterion='aic', k=False)
    models[model_side] = model
    print("\n")

In [None]:
for model, (side, df) in zip(models.values(), grpd_sensor_side_vld):
    out_of_sample_test(model, df)

In [None]:
for model, (side, df) in zip(models.values(), grpd_sensor_side_test):
    model.predict(df).clip(0)

In [None]:
preds = [model.predict(df).clip(0) for model, (_, df) in zip(models.values(), grpd_sensor_side_test)]
preds_concat = pd.concat(preds).sort_index()

In [None]:
output_df = pd.DataFrame({'ID': preds_concat.index, 'Target Pressure (bar)': preds_concat.values})
output_df.to_csv('predictions.csv', index=False)

### Testing by Sensor

In [None]:
grpd_sensor = ft_data.groupby("sensor_id")
grpd_sensor_vld = validation_split.groupby("sensor_id")
grpd_sensor_tst = test_split.groupby("sensor_id")
grpd_sensor_test = raw_test_data2.groupby("sensor_id")

In [None]:
models = {}

for side, df in grpd_sensor:
    if side % 4 == 0:
        print(f"\n##### SIDE: {side} #####\n")
        model_side = f"model_{side}" 
        model = forward_selected_mape(df, "tgt_pressure", criterion='aic', k=False)
        models[model_side] = model
        print("\n")

In [None]:
sensor10 = ft_data.query("sensor_id == 10")

In [None]:
temp = sensor10.sort_values("tgt_pressure", ascending = False).round(2).drop(columns = ["ID", "event_num", "BLEVE_h", "obstacle_angle", "sensor_side", "sensor_id", "sensor_location"]).reset_index(drop = True)

In [None]:
fig, axes = plt.subplots(nrows=5, ncols=3, figsize=(15, 10))
axes = axes.flatten()

for i, column in enumerate(temp.columns):
    ax = axes[i]
    ax.plot(temp.index, temp[column], alpha = 0.2)
    ewma = temp[column].ewm(span=10).mean()
    ax.plot(temp.index, ewma, label='EWMA', color='orange')
    ax.set_title(column)

plt.tight_layout()
plt.show()


In [None]:
models = {}

m1 = ["tank_volume", "vapour_height", "net_sensor_dist", "failure_pressure"]
m2 = ["tank_volume", "vapour_height"]
m3 = ["tank_volume", "vapour_height"]

for side, df in grpd_sensor:
    if side <= 9:
        model_side = f"model_{int(side)}" 
        temp_formula = f"tgt_pressure ~ {' + '.join(m1)} - 1"
        temp_model = sm.OLS.from_formula(temp_formula, df).fit()
        temp_mape = mape_calc(df["tgt_pressure"], temp_model.predict(df))
        temp_r2 = temp_model.rsquared
    
        models[model_side] = temp_model
        
    elif side <= 15:
        model_side = f"model_{int(side)}" 
        temp_formula = f"tgt_pressure ~ {' + '.join(m2)} - 1"
        temp_model = sm.OLS.from_formula(temp_formula, df).fit()
        temp_mape = mape_calc(df["tgt_pressure"], temp_model.predict(df))
        temp_r2 = temp_model.rsquared
    
        models[model_side] = temp_model

    else:
        model_side = f"model_{int(side)}" 
    
        temp_formula = f"tgt_pressure ~ {' + '.join(m3)} - 1"
        temp_model = sm.OLS.from_formula(temp_formula, df).fit()
        temp_mape = mape_calc(df["tgt_pressure"], temp_model.predict(df))
        temp_r2 = temp_model.rsquared
    
        models[model_side] = temp_model

In [None]:
for model, (side, df) in zip(models.values(), grpd_sensor_vld):
    if side in [10,16]:
        print("\n")
    out_of_sample_test(model, df)

In [None]:
for model, (side, df) in zip(models.values(), grpd_sensor_test):
    model.predict(df).clip(0)

In [None]:
preds = [model.predict(df).clip(0) for model, (_, df) in zip(models.values(), grpd_sensor_test)]
preds_concat = pd.concat(preds).sort_index()

In [None]:
output_df = pd.DataFrame({'ID': preds_concat.index, 'Target Pressure (bar)': preds_concat.values})
output_df.to_csv('predictions.csv', index=False)

In [None]:
coefficients_df = pd.DataFrame()

for i, model in enumerate(models.values()):
    coefficients = model.params
    coefficients_df[f'{i+1}'] = coefficients

coefficients_df = coefficients_df.T
coefficients_df.index = coefficients_df.index.astype(int)

In [None]:
first_range = coefficients_df[coefficients_df.index < 9]
second_range = coefficients_df[(coefficients_df.index >= 9) & (coefficients_df.index <= 15)]
third_range = coefficients_df[coefficients_df.index > 15]

plt.figure(figsize=(6, 3))
for column in first_range.columns:
    plt.plot(first_range.index, first_range[column], label=column)
plt.legend()
plt.show()

plt.figure(figsize=(6, 3))
for column in second_range.columns:
    plt.plot(second_range.index, second_range[column], label=column)
plt.legend()
plt.show()

plt.figure(figsize=(6, 3))
for column in third_range.columns:
    plt.plot(third_range.index, third_range[column], label=column)
plt.legend()
plt.show()


### Real-World Research

In [None]:
ft_data2 = ft_data.copy()

Here we have the properties of the 2 unique substances present in our data

In [None]:
substance_properties

As mentioned in 2.3.2 (and by using common sense), a sub-0 boiling temperature raises some red flags. Through some elementary research, we can discover that these are the distinct properties of 2 unique molecules; n-butane and propane respectively.

In [None]:
def calc_new_BP(p2, element):
    if element == "n-butane" or element == 0:
        h = 22.40 * 1000
        p1 = 1
        t1 = -1 + 273.15
    elif element == "propane" or element == 1:
        h = 16.25 * 1000
        p1 = 1
        t1 = -42 + 273.15
    else:
        raise ValueError("Element must be propane or n-butane")

    calc = 1/t1 - 8.3145 * np.log(p2/p1)/h
    return round(1/calc, 3)

In [None]:
map_dict = {0: -1.0, 1: -42.0}
ft_data2["BP_orig"] = ft_data2["substance"].map(map_dict)
ft_data2["BP_new"] = ft_data2.apply(lambda row: calc_new_BP(row["failure_pressure"], row["substance"]), axis = 1)
ft_data2["temp_excess"] =  ft_data2["liquid_temp"] - ft_data2["BP_new"]

In [None]:
ft_data2

In [None]:
sensor1 = list(ft_data2.groupby("sensor_id"))[14][1]

In [None]:
plot_tgt_var(sensor1.query("substance == 0"),["vapour_temp", "liquid_temp","failure_pressure", "temp_excess"], n_cols = 2)

In [None]:
plot_tgt_var(sensor1.query("substance == 1"),["vapour_temp", "liquid_temp","failure_pressure", "temp_excess"], n_cols = 2)