# Script

In [11]:
import numpy as np
import scipy.stats as stats


class lr:

    def __init__(self):
        self._d = None
        self._n = None
        self._con_lvl = 0.95
        self.a = 1-self._con_lvl    # alpha
        self.b = None

    @property
    def d(self):
        return self._d
    
    @property
    def n(self):
        return self._n

    @property
    def confidence_level(self):
        return self._con_lvl

    def fit(self, X, y):
        self.b = np.linalg.pinv(X.T @ X) @ X.T @ y
        self._d = len(self.b) - 1
        self._n = y.shape[0]

    def predict(self, X):
        return X @ self.b

    def variance(self, X, y):
        SSE = np.sum(np.square(y - X @ self.b))
        return SSE / (self._n - self._d - 1)

    def standard_deviation(self, X, y):
        var = self.variance(X, y)
        return np.sqrt(var)

    def significance(self, X, y):
        var = self.variance(X, y)
        std_dev = np.sqrt(var)
        SSE = np.sum(np.square(y - X @ self.b))
        SST = np.sum(np.square(y - np.mean(y)))
        SSR = SST - SSE
        f_stat = (SSR / self._d) / var
        f_pvalue = stats.f.sf(f_stat, self._d, self._n - self._d - 1)
        cov_matrix = np.linalg.pinv(X.T @ X) * var
        ti_stat = [self.b[i] / (std_dev * np.sqrt(cov_matrix[i, i])) for i in range(self._d)]
        ti_pvalues = [2 * min(stats.t.cdf(i, self._n - self._d - 1), stats.t.sf(i, self._n - self._d - 1)) for i in ti_stat]
        return {
            "f_pvalue": f_pvalue, 
            "ti_pvalues": ti_pvalues
        }

    def relevance(self, X, y):
        SSE = np.sum(np.square(y - X @ self.b))
        SST = np.sum(np.square(y - np.mean(y)))
        SSR = SST - SSE
        R_squared = SSR / SST
        return R_squared
    
    def test_relevance(self, X, y):
        SSE = np.sum(np.square(y - X @ self.b))
        RSE = np.sqrt(SSE / (self._n - 2))
        MSE = (1 / self._n) * SSE
        RMSE = np.sqrt(MSE)
        return {
            "RSE": RSE,
            "MSE": MSE, 
            "RMSE": RMSE
        }

    def pearson(self, X, y):
        kin_geo = stats.pearsonr(X[:,1], X[:,2])
        kin_ine = stats.pearsonr(X[:,1], X[:,3])
        geo_ine = stats.pearsonr(X[:,2], X[:,3])
        return {
            "kin_geo": kin_geo,
            "kin_ine": kin_ine,
            "geo_ine": geo_ine
        }
    
    def confidence_intervals(self, X, y):
        var = self.variance(X, y)
        std_dev = self.standard_deviation(X, y)
        cov_matrix = np.linalg.pinv(X.T @ X) * var
        t_crit = stats.t.ppf(1 - self.a / 2, self._n - self._d - 1)
        intervals = []
        for i in range(self._d + 1):
            std_err = std_dev * np.sqrt(cov_matrix[i,i])
            margin = t_crit * std_err
            lower = self.b[i] - margin
            upper = self.b[i] + margin
            intervals.append((lower, upper))
        return intervals
    
    def observer_bias(self, X, y):
        """
        Test significance on the observer values. 
        """
        pass

# Analysis

In [28]:
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.stats as stats
import matplotlib.pyplot as plt
# from linear_regression import LinearRegression as lr

# Importing and standardising data

data = pd.read_csv("../data/Small-diameter-flow.csv", index_col=0)
data_std = data.copy()
for column in ["Flow", "Kinematic", "Geometric", "Inertial"]:
    mean = np.mean(data[column])
    sample_std_dev = np.std(data[column], ddof=1)
    data_std[column] = (data[column] - mean) / sample_std_dev
data_std

# Creating sets

data_shuffled = data_std.sample(frac=1, random_state=42)  # setting the seed for reproducibility

train_indices = round(0.8 * len(data_shuffled))
val_indices = round(0.25 * train_indices)
test_indices = round(0.2 * len(data_shuffled))

test_df = pd.DataFrame(data_shuffled[:test_indices])
train_df = pd.DataFrame(data_shuffled[test_indices:])
val_df = pd.DataFrame(train_df[:val_indices])
train_df = pd.DataFrame(train_df[val_indices:])

X_train = np.column_stack([np.ones(len(train_df)), train_df["Kinematic"], train_df["Geometric"], train_df["Inertial"], train_df["Observer"]])
y_train = train_df["Flow"]

X_val = np.column_stack([np.ones(len(val_df)), val_df["Kinematic"], val_df["Geometric"], val_df["Inertial"], val_df["Observer"]])
y_val = val_df["Flow"]

X_test = np.column_stack([np.ones(len(test_df)), test_df["Kinematic"], test_df["Geometric"], test_df["Inertial"], test_df["Observer"]])
y_test = test_df["Flow"]

# Running the model

model = lr()
model.fit(X_train, y_train)
predictions = model.predict(X_val)

print(f"Number of features: {model.d}")
print(f"Number of rows: {model.n}") # from X_train
print()
print(f"Variance: {model.variance(X_train, y_train):.2e}")
print(f"Standard deviation: {model.standard_deviation(X_train, y_train):.2e}")
print()
sig = model.significance(X_train, y_train) # returns dictionary with f_pvalue and ti_pvalues
print(f"Significance of f-statistic: {sig["f_pvalue"]:.2e}")
print(f"Significance of t-statistic for ...\n... Kinematic: {sig["ti_pvalues"][0]:.2e}\n... Geometric: {sig["ti_pvalues"][1]:.2e}\n... Inertial:  {sig["ti_pvalues"][2]:.2e}")
print()
print(f"Relevance (R²): {model.relevance(X_train, y_train):.2e}")
print()
t_rel = model.test_relevance(X_train, y_train) # returns dictionary with RSE, MSE and RMSE
v_rel = model.test_relevance(X_val, y_val)
t_rel = {key: value.item() for key, value in t_rel.items()}
v_rel = {key: value.item() for key, value in v_rel.items()}
print(f"Training relevance ...\n... RSE:  {t_rel["RSE"]:.2e}\n... MSE:  {t_rel["MSE"]:.2e}\n... RMSE: {t_rel["RMSE"]:.2e}")
print(f"Validation relevance ...\n... RSE:  {v_rel["RSE"]:.2e}\n... MSE:  {v_rel["MSE"]:.2e}\n... RMSE: {v_rel["RMSE"]:.2e}")
print()
r = model.pearson(X_train, y_train) # returns Pearson dictionary
print(f"Kinematic - Geometric: {r["kin_geo"][0]:.2e} (correlation), {r["kin_geo"][1]:.2e} (p-value)")
print(f"Kinematic - Inertial: {r["kin_ine"][0]:.2e} (correlation), {r["kin_ine"][1]:.2e} (p-value)")
print(f"Geometric - Inertial: {r["geo_ine"][0]:.2e} (correlation), {r["geo_ine"][1]:.2e} (p-value)")
print()
ci = model.confidence_intervals(X_train, y_train) # returns the confidence intervals for all parameters
print("Confidence intervals for each coefficient ...")
for i, (lower, upper) in enumerate(ci):
    print(f"... β{i}: {model.b[i]:.4f} ± {upper - lower:.4f}")

Number of features: 4
Number of rows: 118

Variance: 2.96e-03
Standard deviation: 5.44e-02

Significance of f-statistic: 3.93e-143
Significance of t-statistic for ...
... Kinematic: 1.75e-75
... Geometric: 3.60e-157
... Inertial:  2.06e-245

Relevance (R²): 9.97e-01

Training relevance ...
... RSE:  5.37e-02
... MSE:  2.84e-03
... RMSE: 5.32e-02
Validation relevance ...
... RSE:  3.19e-02
... MSE:  9.99e-04
... RMSE: 3.16e-02

Kinematic - Geometric: 8.69e-01 (correlation), 3.34e-37 (p-value)
Kinematic - Inertial: 9.72e-01 (correlation), 7.72e-75 (p-value)
Geometric - Inertial: 9.20e-01 (correlation), 6.91e-49 (p-value)

Confidence intervals for each coefficient ...
... β0: -0.0175 ± 0.0015
... β1: 0.2968 ± 0.0047
... β2: 1.1006 ± 0.0029
... β3: -0.3989 ± 0.0059
... β4: 0.0191 ± 0.0022


# To do

In [96]:
# kolla signifikansen på båda observers för sista VG-frågan
# extras

# Test

In [97]:
var = model.variance(X_train, y_train)
std_dev = model.standard_deviation(X_train, y_train)
cov_matrix = np.linalg.pinv(X_train.T @ X_train) * var

# Extract the diagonal elements of the covariance matrix
C_ii = np.diag(cov_matrix)

# Calculate the critical value t_{alpha / 2}
alpha = 0.05
degrees_of_freedom = model.n - model.d - 1
t_value = stats.t.ppf(1 - alpha / 2, degrees_of_freedom)

# Compute the margin of error for each parameter
margin_of_error = t_value * std_dev * np.sqrt(C_ii)

# Calculate the confidence interval for each parameter
confidence_intervals = [(model.b[i] - margin_of_error[i], model.b[i] + margin_of_error[i]) for i in range(len(model.b))]

print("Confidence intervals for each parameter:")
for i, ci in enumerate(confidence_intervals):
    margin = (ci[1] - ci[0]) / 2
    print(f"β_{i}: {model.b[i]:.4f} ± {margin:.4f}")

Confidence intervals for each parameter:
β_0: -0.0175 ± 0.0007
β_1: 0.2968 ± 0.0023
β_2: 1.1006 ± 0.0014
β_3: -0.3989 ± 0.0029
β_4: 0.0191 ± 0.0011


In [103]:
ival = model.confidence_intervals(X_train, y_train)
for i, (lower, upper) in enumerate(ival):
    margin = (upper - lower) / 2
    print(f"β_{i}: {model.b[i]:.4f} ± {margin:.4f}")

β_0: -0.0175 ± 0.0007
β_1: 0.2968 ± 0.0023
β_2: 1.1006 ± 0.0014
β_3: -0.3989 ± 0.0029
β_4: 0.0191 ± 0.0011


In [6]:
cval = model.confidence_intervals(X_train, y_train)
cval

['-0.017527887405245055 ± 0.0014945456344860539',
 '0.2967695009444519 ± 0.004674357920674166',
 '1.1005692072612674 ± 0.0028684135816963163',
 '-0.3989324015907981 ± 0.005867777094761162',
 '0.019103782886399318 ± 0.0022367634106976106']

# Har frågat Raphael om detta

In [112]:
Syy = (n * np.sum(np.square(y_train)) - np.square(np.sum(y_train))) / n             # från Raphaels code-along
TSS = np.sum(np.square(y_train - np.mean(y_train)))                                 # från andra källor

Sxx1 = (n * np.sum(np.square(X_train)) - np.square(np.sum(X_train))) / n            # från Raphaels code-along
Sxx2 = (n * np.sum(np.square(X_train)) - np.square(np.sum(X_train))) / (n * (n-1))  # från handledning
SSX = np.sum(np.square(X_train - np.mean(X_train)))                                 # från andra källor

Sxx_ISLP = np.sum(np.square(X_train - np.mean(X_train))) / (n - 1)  # page 183 in ISLP

print(f"Syy: {Syy}\nTSS: {TSS}")
print()
print(f"Sxx1: {Sxx1}\nSxx2: {Sxx2}\nSSX: {SSX}")
print(f"ISLP: {Sxx_ISLP}")


Syy: 119.14909480499718
TSS: 119.14909480499716

Sxx1: 309.7393883916007
Sxx2: 2.6473451999282114
SSX: 495.83172641397806
ISLP: 4.237878003538274
