# Computation Physics & Data Science: Exercise 3 - Large Dataset Analysis

## Libary Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import sklearn
from sklearn import linear_model
from sklearn.linear_model import LinearRegression, QuantileRegressor, HuberRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_validate, cross_val_predict

## Part 1: Reading and Verifying Data

In [None]:
#==============================
#IMPORT RAW DATA & CLEAN HEADER
#==============================
rows_skipped = [0, 2, 3] #Decorative/ unit lines in data file to skip

df = pd.read_table("data.csv", skiprows=rows_skipped, delimiter=",",on_bad_lines="skip") #Creates panda data frame with the csv file


#=============
#DATA CLEANING
#=============
df.columns = df.columns.str.replace('#', '').str.strip() #Remove special characters from header

for col in df.columns[1:]: #Loop through columns skipping first materials column 
    df[col] = pd.to_numeric(df[col], errors="coerce") #Convert to int & turn errors into NA values   

df = df.drop(df[df['material'] == 'm'].index) #Removes invalid 'm' material row
df = df.dropna() #Drop all other NA cells


#==============
#MIN & MAX DATA
#==============
#Store array of min and maximum of each feature
feature_max = df.max()
feature_min = df.min()

#print out each features min and max values
feature_names = ['density', 'radius', 'mass', 'temperature', 'pressure', 'height', 'time']
for name in feature_names:
    print(f"{name}: Maximum = {feature_max[name]:.4f}, Minimum = {feature_min[name]:.4f}")


#===========
#OUTPUT DATA
#===========
df #Print first and last few rows of data (just to visulise)

In [None]:
#=====
#PLOTS
#=====
#Height-Time per Material
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='height', y='time', hue='material') #Utalises seaborn libary w/ panda tables
plt.title('Fall Height vs Time by Material')
plt.xlabel("Fall Height (m)")
plt.ylabel("Time (s)")
plt.grid(True)
plt.show()

#Height-Time per Sphere Radius
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='height', y='time', hue='radius')
plt.title('Fall Height vs Time by Sphere Radius')
plt.xlabel("Fall Height (m)")
plt.ylabel("Time (s)")
plt.grid(True)
plt.show()

## Part 2: Correlation Matrix

In [None]:
#==================
#HEATMAP W/ SEABORN
#==================
plt.figure(figsize=(10, 6))
heatmap = sns.heatmap(df.corr(), cmap='coolwarm', vmin=-1, vmax=1, annot=True) #Inbuilt heatmap and correlation functions
heatmap.set_title('Correlation Heatmap')

## Part 3: Multiple Linear Regression

### 3a: MSE Loss Function

In [None]:
X = df[['density', 'radius', 'mass', 'temperature', 'pressure', 'height']].values #Define matrix of variable data
y = df['time'].values #Defines our actual observed results


#=================
#MSE LOSS FUNCTION
#=================
model_mse = LinearRegression() #Model is set to linear regression (used for MSE)
model_mse.fit(X, y) #Trains the model with our X matrix and its target y values

betas_mse = model_mse.coef_ #Creates database of model coefficients
intercept_mse = model_mse.intercept_ #Saves intercept of model

yp_mse = model_mse.predict(X) #Creates predicted y (time) values

mse = mean_squared_error(y, yp_mse) #Computes mse equation w/ true and predicted y (time) values to determine loss

print(f'Mean Squared Error (MSE) Loss: {mse:.6f}') #Output mse loss value

#Output best fit line coefficients and intercept
print(f'Intercept: {intercept_mse:.6f}')
for name, beta in zip(feature_names, betas_mse):
    print(f"{name}: {beta:.6f}")
    

#==========================
#VISULISING DATA (2D PLOTS)
#==========================
#HEIGHT MODEL (using same methods as before but trained only on height data)
model_height = LinearRegression()
model_height.fit(df[['height']], y) 

betas_height = model_height.coef_
intercept_height = model_height.intercept_

yp_height = model_height.predict(df[['height']])  

mse_height = mean_squared_error(y, yp_height)

print(f'\nMean Squared Error (MSE) Height Loss: {mse_height:.6f}')  

#Output line of best fit beta weights
print(f'Intercept: {intercept_height:.6f}')  
print(f"Height Beta: {betas_height[0]:.6f}") 

#Plot height data in scatter
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='height', y='time', color='blue')

#Plot height-time regression line
plt.plot(df['height'], yp_height, color='red', linewidth=2, label='Regression Line (MSE)')
plt.xlabel('Height (m)')
plt.ylabel('Time (s)')
plt.title('Linear Regression (MSE): Height vs Time')
plt.grid(True)
plt.show()


#RADIUS MODEL
model_radius = LinearRegression()
model_radius.fit(df[['radius']], y) 

betas_radius = model_radius.coef_
intercept_radius = model_radius.intercept_

yp_radius = model_radius.predict(df[['radius']])  

mse_radius = mean_squared_error(y, yp_radius)

print(f'Mean Squared Error (MSE) Radius Loss: {mse_radius:.6f}')  

#Output line of best fit beta weights
print(f'Intercept: {intercept_radius:.6f}')  
print(f"Radius Beta: {betas_radius[0]:.6f}") 

#Plot radius data in scatter plot
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='radius', y='time', color='blue')

#Plot radius-time regression line
plt.plot(df['radius'], yp_radius, color='red', linewidth=2, label='Regression Line (MSE)')
plt.xlabel('Radius (m)')
plt.ylabel('Time (s)')
plt.title('Linear Regression (MSE): Radius vs Time')
plt.grid(True)
plt.show()


#DENSITY MODEL
model_density = LinearRegression()
model_density.fit(df[['density']], y) 

betas_density = model_density.coef_
intercept_density = model_density.intercept_

yp_density = model_density.predict(df[['density']])  

mse_density = mean_squared_error(y, yp_density)

print(f'Mean Squared Error (MSE) Density Loss: {mse_density:.6f}')  

#Output line of best fit beta weights
print(f'Intercept: {intercept_density:.6f}')  
print(f"Density Beta: {betas_density[0]:.6f}") 

#Density scatter plot
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='density', y='time', color='blue')

#Plot density-time regression line
plt.plot(df['density'], yp_density, color='red', linewidth=2, label='Regression Line (MSE)')
plt.xlabel('Density (kg/m^3)')
plt.ylabel('Time (s)')
plt.title('Linear Regression (MSE): Density vs Time')
plt.grid(True)
plt.show()

### 3b: Additional Loss Functions

In [None]:
#===================
#MEAN ABSOLUTE ERROR
#===================
model_mae = QuantileRegressor(quantile=0.5, solver='highs') #Fits through median target values where minimum MAE loss lies
model_mae.fit(X, y) #Trains our model to pur dataset
betas_mae = model_mae.coef_ #Defines beta values
intercept_mae = model_mae.intercept_ 
yp_mae = model_mae.predict(X) #Predicts y values for our model
mae = mean_absolute_error(y, yp_mae) #Computes loss from our actual and expected data

#Output MAE model results
print(f'Mean Absolute Error (MAE) Loss: {mae:.6f}')
print(f'Intercept: {intercept_mae:.6f}')
for name, beta in zip(feature_names, betas_mae):
    print(f"{name}: {beta:.6f}")
    
#=====
#HUBER
#=====
model_huber = HuberRegressor() #Initalise huber regression
model_huber.fit(X, y) #Train 
betas_huber = model_huber.coef_ #Gather betas
intercept_huber = model_huber.intercept_ 
yp_huber = model_huber.predict(X) #Predict target values

#Huber loss function using mathematical function as shown in report
def huber_loss(delta, residuals):
    #If else array array to return the loss array
    return np.where(np.abs(residuals) <= delta, 0.5 * residuals**2, delta * (np.abs(residuals) - 0.5 * delta))

#Calculate Huber model loss
delta = 1.0 #Condition for MSE and MAE transition
huber = np.mean(huber_loss(delta, y - yp_huber))

#Output Huber results
print(f"\nHuber Loss: {huber:.6f}")
print(f"Intercept: {intercept_huber:.6f}")
for name, beta in zip(feature_names, betas_huber):
    print(f"{name}: {beta:.6f}")

### 3c: Gradient Decent

In [None]:
eta = 1e-3 #Global learning-rate 
n_iterations = 1000 #Number of iterations for each descent

scaler = StandardScaler() #Creates scalar object to store mean and std of each variable
X_normalised = scaler.fit_transform(X) #Normalises each variable to avoid overflow errors w/ learning rates 

N = X_normalised.shape[0] #Number of datapoints
X_normalised = np.c_[np.ones((N, 1)), X_normalised] #Joins together objects w/ extra column for intercept term descent

print(f"Learning Rate: {eta}")
print(f"No. Iterations: {n_iterations}")

#Function to allow for epeated use of standardising our beta weights
def unnormalise(betas):
    for i in range(1, len(betas)):
        betas[i] /= scaler.scale_[i-1] #Divide by std 
        betas[0] -= betas[i] * scaler.mean_[i-1] #Correct intercept
    
    return betas


#======================
#BATCH GRADIENT DESCENT
#======================
#Uses entire datafile to calculate gradient in each iteration

betas_bgd = np.zeros((7, 1)) #Creates 'empty' list of zeros for beta values
betas_bgd_path = [] #Creates empty array to store each iteration of the beta values

for iteration in range(n_iterations):
    gradients = (2/N) * X_normalised.T.dot(X_normalised.dot(betas_bgd) - yp_mse.reshape(-1,1)) #Calculates gradients using whole datafile
    
    betas_bgd = betas_bgd - eta * gradients #Creates new value for beta in step size of learning rate in direction of minima (direction of gradient)
    betas_bgd_path.append(betas_bgd.copy()) #Saves copy of new beta values

betas_bgd_standard = unnormalise(betas_bgd) #Unnormalise the beta weights

#Calculate MSE loss
X_original = np.c_[np.ones((N, 1)), X] #Creates object of w/ X variables as well as intercept term
mse_bgd = mean_squared_error(X_original.dot(betas_bgd_standard), yp_mse) #Used to calculate MSE loss

#Print results
print(f'\nBatch Gradient Descent Results: {mse_bgd:.6f}')
for name, beta in zip(["Intercept"] + feature_names, betas_bgd_standard):
    print(f"{name}: {beta[0]:.6f}")

    
#===========================
#STOCHASTIC GRADIENT DESCENT
#===========================
#Uses a single random element each iteration to calculate gradient

betas_sgd = np.zeros((7,1))
betas_sgd_path = []

for iteration in range(n_iterations):
    random_index = np.random.randint(N) #Calculates random number (index) wihtin datafile size N
    x1 = X_normalised[random_index:random_index+1] #Finds random row
    y1 = yp_mse[random_index:random_index+1] #Finds random element in that row
    
    gradients = 2 * x1.T.dot(x1.dot(betas_sgd) - y1) #Gradient calculated for that element
    
    betas_sgd = betas_sgd - eta * gradients 
    betas_sgd_path.append(beta)

betas_sgd_standard = unnormalise(betas_sgd.copy()) #Unnormalise weights

#Calculate MSE loss
X_original = np.c_[np.ones((N, 1)), X]
mse_sgd = mean_squared_error(X_original.dot(betas_sgd_standard), yp_mse)

#Output final beta weights
print(f'\nStochastic Gradient Descent Results: {mse_sgd:.6f}')
for name, beta in zip(["Intercept"] + feature_names, betas_sgd_standard):
    print(f"{name}: {beta[0]:.6f}")

    
#===========================    
#MINI-BATCH GRADIENT DESCENT
#===========================
#Uses small random subset of data to calculate gradient at each iteration

betas_mgd = np.zeros((7, 1))
betas_mgd_path = []

minibatch_size = 10 #Size of each mini-batch sample

for iteration in range(n_iterations):
    #Shuffle data
    shuffled_indices = np.random.permutation(N) #Shuffles matrix index
    X_shuffled = X_normalised[shuffled_indices] #Shuffles columns according to shuffled index
    y_shuffled = yp_mse.reshape(-1,1)[shuffled_indices] #Shuffles values according to shuffled columns
    
    #Select subset of shuffled data
    xi = X_shuffled[:minibatch_size] #Selects first initial mini-batch of features
    yi = y_shuffled[:minibatch_size] #Selects corresponding values
    
    gradients = 2/minibatch_size * xi.T.dot(xi.dot(betas_mgd) - yi) #Gradient for mini-batch group
    
    #Update beta values accordingly
    betas_mgd = betas_mgd - eta * gradients
    betas_mgd_path.append(betas_mgd.copy())

betas_mgd_standard = unnormalise(betas_mgd) #Unnormalise beta weights

#Calculate MSE loss
X_original = np.c_[np.ones((N, 1)), X]
mse_mgd = mean_squared_error(X_original.dot(betas_mgd_standard), yp_mse)

#Output weights & intercept
print(f'\nMini-batch Gradient Descent Results: {mse_mgd:.6f}')
for name, beta in zip(["Intercept"] + feature_names, betas_mgd_standard):
    print(f"{name}: {beta[0]:.6f}")

## Part 4: Model Verification

### 4a: Train & Test Data

In [None]:
#============
#TRAIN & TEST
#============
#Creating train & test data subsets to understand models reaction to unseen data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)
model_mse.fit(X_train, y_train) #Plot train dataset to mse model (90% of datafile)

betas_test = np.zeros((7, 1)) #Create empty array for test betas
betas_test = model_mse.coef_ #Fit to linear model as trained above with train dataset
intercept_test = model_mse.intercept_

#Output weights & intercept for test dataset
print(f'\nTest Betas Results:')
print(f'Intercept: {intercept_test}')
for name, beta in zip(feature_names, betas_test):
    print(f'{name}: {beta:.6f}')

residuals = X_test.dot(betas_test) - y_test + intercept_test #Calculate residuals of train and test datasets w/ intercept to normalise

#Residual mean and standard deviation
residual_mean = np.mean(residuals)
residual_std = np.std(residuals)
print(f"\nResiduals (Mean & Std): {residual_mean:.6f} ± {residual_std:.6f}")

#Validate the model with train and test scores
model_validate = linear_model.LinearRegression()
valid_scores = cross_validate(model_validate, X, y, return_train_score = True) #Cross validadtion returns model performance to train and test datasets
#valid_scores['train_score']

#Scores how well model fits training data
print(f"\nTrain Score (Mean & Std): {np.mean(valid_scores['train_score']):.4f} ± {np.std(valid_scores['train_score']):.6f}")
#Scores how well models to unseen data
print(f"Test Score (Mean & Std): {np.mean(valid_scores['test_score']):.4f} ± {np.std(valid_scores['test_score']):.6f}")


#=====
#PLOTS
#=====
plt.figure(figsize=(10, 6))
plt.scatter(range(len(residuals)), residuals, color = 'b', alpha=0.75)
plt.axhline(y = residual_mean + residual_std, color='r', linewidth=2, label = 'Standard Deviation')
plt.axhline(y = residual_mean - residual_std, color='r', linewidth=2)
plt.axhline(y = residual_mean, color='g',linewidth=2, label = 'Residual Mean')
plt.xlabel('Dataset Index')
plt.ylabel('Residual')
plt.title('Train vs Test Data Residuals')
plt.legend()
plt.grid(True)
plt.show

plt.figure(figsize=(10, 6))
plt.hist(residuals, bins=30, color='b', alpha=0.75, edgecolor='black', label='Residuals')
plt.axvline(x=residual_mean, color='g', linewidth=2, label='Residual Mean')
plt.axvline(x=residual_mean + residual_std, color='r', linewidth=2, label='Standard Deviation')
plt.axvline(x=residual_mean - residual_std, color='r', linewidth=2)
plt.xlabel('Residual Value')
plt.ylabel('No. Instances')
plt.title('Train vs Test Data Residuals Histogram')
plt.legend()
plt.grid(True)
plt.show()


### 4b: Physical Data Comparison

In [None]:
#===================
#DETERMINE CONSTANTS
#===================
M = 0.0289652 #kg/mol, molar mass of dry air
R = 8.314462 #J/(mol*K), molar gas constant
C_d = 0.47 #Assumed drag coefficient of a sphere
g = 9.81 #m/s^2, acceleration due to gravity 


#===============
#QUADRATIC MODEL
#===============
def fall_time(h, m, r, p, T):
    #Calculate air density
    rho = (p * M) / (R * T)
    
    #Calculate cross-sectional areas of sphere
    A = np.pi * r**2
    
    #Calculate drag coefficient
    k = (C_d * rho * A) / 2
    
    #Calculate fall time
    time = np.sqrt(m / (k * g)) * np.arccosh(np.exp((h * k) / m))

    return time

#Create height arrays
h_low = np.linspace(0,500,1000)
h_high = np.linspace(1000,1500,1000)

#Find sample data to be used in above and below models 
#In future model w/ more time two values for below and above height range would be found to better simulate air resistance at those heights
sample = df.iloc[0]
m = sample['mass']
r = sample['radius']
p = sample['pressure']
T = sample['temperature']

#Calculate fall time above and below our data w/ quadratic model for each height interval
t_low = [fall_time(h, m, r, p, T) for h in h_low]
t_high = [fall_time(h, m, r, p, T) for h in h_high]


#============
#LINEAR MODEL
#============
lin_model = LinearRegression()
lin_model.fit(X, y)

#Initalise two new df which are copies of original dataframe
X_low = pd.DataFrame(columns=feature_names)
X_high = pd.DataFrame(columns=feature_names)

#Copies over sample data to low and high dataframes for same length as h_low and h_high
for feature in feature_names:
    if feature != 'height':
        X_low[feature] = [sample[feature]] * len(h_low)
        X_high[feature] = [sample[feature]] * len(h_high)

#Copy over our height values above and below dataset
X_low['height'] = h_low
X_high['height'] = h_high

#Linear predictions of fall times based on above/ below heights
linear_t_low = lin_model.predict(X_low)
linear_t_high = lin_model.predict(X_high)


#================
#MODEL COMPARISON
#================
plt.figure(figsize=(10, 6))
plt.plot(h_low, t_low, label='Quadratic Drag Model', color = 'b')
plt.plot(h_low, linear_t_low, label='Linear Model', color = 'r')
plt.title('Comparison of Quadratic Drag and Linear Models: Heights < 500m')
plt.xlabel('Height (m)')
plt.ylabel('Fall Time (s)')
plt.grid(True)
plt.legend()
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(h_high, t_high, label='Quadratic Drag Model', color = 'b')
plt.plot(h_high, linear_t_high, label='Linear Model', color = 'r')
plt.title('Comparison of Quadratic Drag and Linear Models: Heights > 1000m')
plt.xlabel('Height (m)')
plt.ylabel('Fall Time (s)')
plt.grid(True)
plt.legend()
plt.show()

#Calculate MSE loss for each height subset
mse_low = mean_squared_error(t_low, linear_t_low)
mse_high = mean_squared_error(t_low, linear_t_high)
print(f'MSE loss (h < 500m): {mse_low:.6f}')
print(f'MSE loss (h > 1000m): {mse_high:.6f}')

#NOTE: Last night the code was working fine (as why I have achieved plots in my report) but I have woken up this morning and the code no longer works - with more time this would've been fixed