# **HW1: Regression**
In *assignment 1*, you need to finish:

1.  Basic Part: Implement two regression models to predict the Systolic blood pressure (SBP) of a patient. You will need to implement **both Matrix Inversion and Gradient Descent**.


> *   Step 1: Split Data
> *   Step 2: Preprocess Data
> *   Step 3: Implement Regression
> *   Step 4: Make Prediction
> *   Step 5: Train Model and Generate Result

2.  Advanced Part: Implement one regression model to predict the SBP of multiple patients in a different way than the basic part. You can choose **either** of the two methods for this part.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **1. Basic Part (55%)**
In the first part, you need to implement the regression to predict SBP from the given DBP


## 1.1 Matrix Inversion Method (25%)


*   Save the prediction result in a csv file **hw1_basic_mi.csv**
*   Print your coefficient


### *Import Packages*

> Note: You **cannot** import any other package

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import csv
import math
import random

### *Global attributes*
Define the global attributes

In [None]:
training_dataroot = 'hw1_basic_training.csv' # Training data file file named as 'hw1_basic_training.csv'
testing_dataroot = 'hw1_basic_testing.csv'   # Testing data file named as 'hw1_basic_training.csv'
output_dataroot = 'hw1_basic_mi.csv' # Output file will be named as 'hw1_basic.csv'

training_datalist =  [] # Training datalist, saved as numpy array
testing_datalist =  [] # Testing datalist, saved as numpy array

output_datalist =  [] # Your prediction, should be 20 * 3 matrix and saved as numpy array
                      # The format of each row should be ['subject_id', 'charttime', 'sbp']

You can add your own global attributes here


In [None]:
training_set =  []
validation_set =  []
testing_set =  []

def transform_X(X):
  X_transformed = X
  #X_transformed = np.hstack((X_transformed, X ** 2))
  #X_transformed = np.hstack((X_transformed, X ** 3))
  X_transformed = np.column_stack((np.ones(X_transformed.shape[0]), X_transformed))
  return X_transformed

### *Load the Input File*
First, load the basic input file **hw1_basic_training.csv** and **hw1_basic_testing.csv**

Input data would be stored in *training_datalist* and *testing_datalist*

In [None]:
# Read input csv to datalist
with open(training_dataroot, newline='') as csvfile:
  training_datalist = np.array(list(csv.reader(csvfile)))

with open(testing_dataroot, newline='') as csvfile:
  testing_datalist = np.array(list(csv.reader(csvfile)))

### *Implement the Regression Model*

> Note: It is recommended to use the functions we defined, you can also define your own functions


#### Step 1: Split Data
Split data in *training_datalist* into training dataset and validation dataset
* Validation dataset is used to validate your own model without the testing data



In [None]:
def SplitData():
  global training_datalist, testing_datalist
  global training_set, validation_set, testing_set

  spliting_index = int(len(training_datalist[1:])*0.9)
  training_set = training_datalist[1:spliting_index].astype(float)
  validation_set = training_datalist[spliting_index:].astype(float)
  testing_set = testing_datalist[1:].astype(float)

#### Step 2: Preprocess Data
Handle the unreasonable data
> Hint: Outlier and missing data can be handled by removing the data or adding the values with the help of statistics  

In [None]:
def PreprocessData(threshold_z):
  global training_set, validation_set
  global training_datalist
  traning_all_set = training_datalist[1:].astype(float)

  # Use the Z Score method
  mean_dbp = np.mean(traning_all_set[:, 0])
  std_dbp = np.std(traning_all_set[:, 0])
  mean_sbp = np.mean(traning_all_set[:, 1])
  std_sbp = np.std(traning_all_set[:, 1])

  # Preprocess the training set
  z_scores_dbp = (training_set[:, 0] - mean_dbp) / std_dbp
  outliers_dbp = np.where(np.abs(z_scores_dbp) > threshold_z)
  training_set = np.delete(training_set, outliers_dbp, axis=0)

  z_scores_sbp = (training_set[:, 1] - mean_sbp) / std_sbp
  outliers_sbp = np.where(np.abs(z_scores_sbp) > threshold_z)
  training_set = np.delete(training_set, outliers_sbp, axis=0)

  ''' I should not preprocess the validation set
  # Preprocess the validating set
  z_scores_dbp_val = (validation_set[:, 0] - mean_dbp) / std_dbp
  outliers_dbp_val = np.where(np.abs(z_scores_dbp_val) > threshold_z)
  validation_set = np.delete(validation_set, outliers_dbp_val, axis=0)

  z_scores_sbp_val = (validation_set[:, 1] - mean_sbp) / std_sbp
  outliers_sbp_val = np.where(np.abs(z_scores_sbp_val) > threshold_z)
  validation_set = np.delete(validation_set, outliers_sbp_val, axis=0)
  '''

#### Step 3: Implement Regression
> use Matrix Inversion to finish this part




In [None]:
def MatrixInversion(X, Y):
  X = transform_X(X)
  W = np.linalg.inv(X.T @ X) @ X.T @ Y

  # Validate W with validation set
  def mape(Y_true, Y_pred):
    return np.mean(np.abs((Y_true - Y_pred) / Y_true))

  '''
  X_validation = np.array([row[0] for row in validation_set]).reshape(-1, 1)
  X_validation = transform_X(X_validation)
  predicted_sbp = X_validation @ W
  actual_sbp_values = np.array([row[1] for row in validation_set])
  print("MAPE from validation_datalist:", mape(actual_sbp_values, predicted_sbp)*100, " %")
  '''

  return W

#### Step 4: Make Prediction
Make prediction of testing dataset and store the value in *output_datalist*
The final *output_datalist* should look something like this
> [ [100], [80], ... , [90] ] where each row contains the predicted SBP

In [None]:
def MakePrediction(new_dbp_set, W):
  global output_datalist
  X_test = transform_X(new_dbp_set)
  output_datalist = np.round(X_test @ W, 3)

#### Step 5: Train Model and Generate Result

> Notice: **Remember to output the coefficients of the model here**, otherwise 5 points would be deducted
* If your regression model is *3x^2 + 2x^1 + 1*, your output would be:
```
3 2 1
```





In [None]:
SplitData()
PreprocessData(2)

X = training_set[:, 0].reshape(-1, 1)
Y = training_set[:, 1].reshape(-1, 1)
W = MatrixInversion(X, Y)

print(' '.join(map(str, W[::-1].flatten())))

# Start to prediction on the testing data
dbp_testing_values = np.array([row[0] for row in testing_set]).reshape(-1, 1)
MakePrediction(dbp_testing_values, W)

### *Write the Output File*
Write the prediction to output csv
> Format: 'sbp'




In [None]:
with open(output_dataroot, 'w', newline='', encoding="utf-8") as csvfile:
  writer = csv.writer(csvfile)
  for row in output_datalist:
    writer.writerow(row)

## 1.2 Gradient Descent Method (30%)


*   Save the prediction result in a csv file **hw1_basic_gd.csv**
*   Output your coefficient update in a csv file **hw1_basic_coefficient.csv**
*   Print your coefficient





### *Global attributes*

In [None]:
output_dataroot = 'hw1_basic_gd.csv' # Output file will be named as 'hw1_basic.csv'
coefficient_output_dataroot = 'hw1_basic_coefficient.csv'

training_datalist =  [] # Training datalist, saved as numpy array
testing_datalist =  [] # Testing datalist, saved as numpy array

output_datalist =  [] # Your prediction, should be 20 * 1 matrix and saved as numpy array
                      # The format of each row should be ['sbp']

coefficient_output = [] # Your coefficient update during gradient descent
                   # Should be a (number of iterations * number_of coefficient) matrix
                   # The format of each row should be ['w0', 'w1', ...., 'wn']

Your own global attributes

In [None]:
training_set =  []
validation_set =  []
testing_set =  []
highest_degree = 1

def transform_X_and_W(X, highest_degree):
  X_transformed = X

  for i in range(highest_degree-1):
    X_transformed = np.hstack((X_transformed, X ** (highest_degree+2)))
  X_transformed = np.column_stack((np.ones(X_transformed.shape[0]), X_transformed))
  W = np.zeros((highest_degree+1, 1)).astype(float)
  return X_transformed, W

# Open files
training_dataroot = 'hw1_basic_training.csv' # Training data file file named as 'hw1_basic_training.csv'
testing_dataroot = 'hw1_basic_testing.csv'   # Testing data file named as 'hw1_basic_training.csv'
# Read input csv to datalist
with open(training_dataroot, newline='') as csvfile:
  training_datalist = np.array(list(csv.reader(csvfile)))

with open(testing_dataroot, newline='') as csvfile:
  testing_datalist = np.array(list(csv.reader(csvfile)))

### *Implement the Regression Model*


#### Step 1: Split Data

In [None]:
def SplitData():
  global training_datalist, testing_datalist
  global training_set, validation_set, testing_set

  spliting_index = int(len(training_datalist[1:])*0.9)
  training_set = training_datalist[1:spliting_index].astype(float)
  validation_set = training_datalist[spliting_index:].astype(float)
  testing_set = testing_datalist[1:].astype(float)

#### Step 2: Preprocess Data

In [None]:
def PreprocessData(threshold_z):
  global training_set, validation_set
  global training_datalist
  traning_all_set = training_datalist[1:].astype(float)

  # Use the Z Score method
  mean_dbp = np.mean(traning_all_set[:, 0])
  std_dbp = np.std(traning_all_set[:, 0])
  mean_sbp = np.mean(traning_all_set[:, 1])
  std_sbp = np.std(traning_all_set[:, 1])

  # Preprocess the training set
  z_scores_dbp = (training_set[:, 0] - mean_dbp) / std_dbp
  outliers_dbp = np.where(np.abs(z_scores_dbp) > threshold_z)
  training_set = np.delete(training_set, outliers_dbp, axis=0)

  z_scores_sbp = (training_set[:, 1] - mean_sbp) / std_sbp
  outliers_sbp = np.where(np.abs(z_scores_sbp) > threshold_z)
  training_set = np.delete(training_set, outliers_sbp, axis=0)

  ''' I should not preprocess the validation set
  # Preprocess the validating set
  z_scores_dbp_val = (validation_set[:, 0] - mean_dbp) / std_dbp
  outliers_dbp_val = np.where(np.abs(z_scores_dbp_val) > threshold_z)
  validation_set = np.delete(validation_set, outliers_dbp_val, axis=0)

  z_scores_sbp_val = (validation_set[:, 1] - mean_sbp) / std_sbp
  outliers_sbp_val = np.where(np.abs(z_scores_sbp_val) > threshold_z)
  validation_set = np.delete(validation_set, outliers_sbp_val, axis=0)
  '''

#### Step 3: Implement Regression
> use Gradient Descent to finish this part

In [None]:
def GradientDescent(X, Y, learning_rate=0.0001, target_mape=0.05, max_iterations=100000):
  def mape(Y_true, Y_pred):
    return np.mean(np.abs((Y_true - Y_pred) / Y_true))

  '''
  def loss_function(X, Y, W):
    predictions = X @ W
    squared_error = (predictions - Y) ** 2
    return np.sum(squared_error) / (2 * len(Y)) # /(2*m) is for normalization
  '''

  global coefficient_output, validation_set, highest_degree
  coefficient_output = []
  X, W = transform_X_and_W(X, highest_degree)

  iteration = 0
  m = len(Y)

  while iteration < max_iterations:
    gradient = (X.T @ (X @ W - Y)) / m # m is for normalization as well
    W = W - learning_rate * gradient

    # Validate W with validation set
    X_validation = np.array([row[0] for row in validation_set]).reshape(-1, 1)
    X_validation, W_not_used = transform_X_and_W(X_validation, highest_degree)
    predicted_sbp = X_validation @ W
    actual_sbp_values = np.array([row[1] for row in validation_set])
    current_mape = mape(actual_sbp_values, predicted_sbp)

    coefficient_output.append(np.concatenate(W))

    #print(f"Iteration {iteration + 1}: MAPE from validation_set = {current_mape*100}%")

    if current_mape < target_mape:
      break

    iteration += 1

  #print(f"Final MAPE from validation_set after Iteration {iteration} = {current_mape*100}%")
  coefficient_output = np.array(coefficient_output)
  return W

#### Step 4: Make Prediction

Make prediction of testing dataset and store the values in *output_datalist*
The final *output_datalist* should look something like this
> [ [100], [80], ... , [90] ] where each row contains the predicted SBP

Remember to also store your coefficient update in *coefficient_output*
The final *coefficient_output* should look something like this
> [ [1, 0, 3, 5], ... , [0.1, 0.3, 0.2, 0.5] ] where each row contains the [w0, w1, ..., wn] of your coefficient





In [None]:
def MakePrediction(new_dbp_set, W):
  global output_datalist, highest_degree
  X_test, W_not_used = transform_X_and_W(new_dbp_set, highest_degree)
  output_datalist = np.round(X_test @ W, 3)

#### Step 5: Train Model and Generate Result

> Notice: **Remember to output the coefficients of the model here**, otherwise 5 points would be deducted
* If your regression model is *3x^2 + 2x^1 + 1*, your output would be:
```
3 2 1
```



In [None]:
SplitData()
PreprocessData(2)

X = training_set[:, 0].reshape(-1, 1)
Y = training_set[:, 1].reshape(-1, 1)
W = GradientDescent(X, Y, 0.00025, 0.1, 100000)

print(' '.join(map(str, W[::-1].flatten())))

# Start to prediction on the testing data
dbp_testing_values = np.array([row[0] for row in testing_set]).reshape(-1, 1)
MakePrediction(dbp_testing_values, W)

### *Write the Output File*

Write the prediction to output csv
> Format: 'sbp'

**Write the coefficient update to csv**
> Format: 'w0', 'w1', ..., 'wn'
>*   The number of columns is based on your number of coefficient
>*   The number of row is based on your number of iterations

In [None]:
with open(output_dataroot, 'w', newline='', encoding="utf-8") as csvfile:
  writer = csv.writer(csvfile)
  for row in output_datalist:
    writer.writerow(row)

with open(coefficient_output_dataroot, 'w', newline='', encoding="utf-8") as csvfile:
  writer = csv.writer(csvfile)
  for row in coefficient_output:
      writer.writerow(row)

# **2. Advanced Part (40%)**
In the second part, you need to implement the regression in a different way than the basic part to help your predictions of multiple patients SBP.

You can choose **either** Matrix Inversion or Gradient Descent method.

The training data will be in **hw1_advanced_training.csv** and the testing data will be in **hw1_advanced_testing.csv**.

Output your prediction in **hw1_advanced.csv**

Notice:
> You cannot import any other package other than those given



### Input the training and testing dataset

In [9]:
training_dataroot = 'hw1_advanced_training.csv' # Training data file file named as 'hw1_basic_training.csv'
testing_dataroot = 'hw1_advanced_testing.csv'   # Testing data file named as 'hw1_basic_training.csv'
output_dataroot = 'hw1_advanced.csv' # Output file will be named as 'hw1_basic.csv'

training_datalist =  [] # Training datalist, saved as numpy array
testing_datalist =  [] # Testing datalist, saved as numpy array

output_datalist =  [] # Your prediction, should be 220 * 1 matrix and saved as numpy array
                      # The format of each row should be ['sbp']

### Your Implementation

In [83]:
training_set =  {}
validation_set =  {}

# Read input csv to datalist
with open(training_dataroot, newline='') as csvfile:
  training_datalist = np.array(list(csv.reader(csvfile)))

with open(testing_dataroot, newline='') as csvfile:
  testing_datalist = np.array(list(csv.reader(csvfile)))

def CountAllDataFrame_FromDictionary(data_dict):
  count = sum(len(data_df) for data_df in data_dict.values())
  print(f"Total DataFrames: {count}")

def PrintAllDataFrame_FromDictionary(data_dict):
  for subject_id, data_df in training_set.items():
    print(subject_id, ":\n", data_df)

def ProcessTimeString_ToSeconds(time_str):
  #'D days hh:mm:ss'
  if len(time_str) == 0 or time_str == '0' or time_str == 0:
    return ''

  time_parts = time_str.split()
  days = int(time_parts[0])
  hh_mm_ss = time_parts[2]

  hh_mm_ss_parts = hh_mm_ss.split(':')
  hours = int(hh_mm_ss_parts[0])
  minutes = int(hh_mm_ss_parts[1])
  seconds = int(hh_mm_ss_parts[2])

  total_seconds = (days * 24 * 60 * 60) + (hours * 60 * 60) + (minutes * 60) + seconds
  return total_seconds

def SplitData():
  global training_datalist, testing_datalist
  global training_set, validation_set, testing_set

  columns = ['subject_id', 'charttime', 'temperature', 'heartrate', 'resprate', 'o2sat', 'sbp']

  # For training and validation
  for i in range(len(training_datalist[1:])):
    training_datalist[i+1, 1] = ProcessTimeString_ToSeconds(training_datalist[i+1, 1])

  df = pd.DataFrame(training_datalist, columns=columns)
  subject_data = {}
  for subject_id, subject_df in df.groupby('subject_id'):
    subject_data[subject_id] = subject_df.drop('subject_id', axis=1)

  for subject_id, data_df in subject_data.items():
    data_array = data_df.values
    num_samples = len(data_array)
    num_train_samples = int(0.9 * num_samples)  # 90% 用于训练，10% 用于验证
    train_data = data_array[:num_train_samples, :]
    valid_data = data_array[num_train_samples:, :]
    training_set[subject_id] = pd.DataFrame(train_data, columns=columns[1:])
    validation_set[subject_id] = pd.DataFrame(valid_data, columns=columns[1:])

  if 'subject_id' in training_set:
    del training_set['subject_id']
  if 'subject_id' in validation_set:
    del validation_set['subject_id']

def PreprocessData(threshold = 2):
  global training_set, validation_set, testing_set

  # All data type to float and clean data with empty block
  columns_to_process = ['charttime', 'temperature', 'heartrate', 'resprate', 'o2sat', 'sbp']
  for subject_id, train_data_df in training_set.items():
    for column in columns_to_process:
        train_data_df[column] = pd.to_numeric(train_data_df[column], errors='coerce')
    train_data_df.dropna(subset=columns_to_process, inplace=True)
  for subject_id, valid_data_df in validation_set.items():
    for column in columns_to_process:
        valid_data_df[column] = pd.to_numeric(valid_data_df[column], errors='coerce')
    valid_data_df.dropna(subset=columns_to_process, inplace=True)

  # testing data
  for i in range(len(testing_datalist[1:])):
    testing_datalist[i+1, 1] = ProcessTimeString_ToSeconds(testing_datalist[i+1, 1])

  # Cleaning outliers
  for key, df in training_set.items():
    for column in df.columns:
        if column != 'charttime':
            z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
            outlier_indices = z_scores > threshold
            df = df[~outlier_indices]
    training_set[key] = df

  for key, df in validation_set.items():
    for column in df.columns:
        if column != 'charttime':
            z_scores = np.abs((df[column] - df[column].mean()) / df[column].std())
            outlier_indices = z_scores > threshold
            df = df[~outlier_indices]
    validation_set[key] = df

def transform_dataframe_into_nparray(data_dict, selected_columns):
  data_dict_transformed = {}
  for dataset_name, df in data_dict.items():
    data_dict_transformed[dataset_name] = df[selected_columns].values
  return data_dict_transformed

def MatrixInversion(X, Y):
  W = np.linalg.inv(X.T @ X) @ X.T @ Y
  return W

def validate_and_print_MAPE(W_dict):
  global validation_set

  MAPE_array = []
  for dataset_name, df in validation_set.items():
    for index, row in df.iterrows():
      row = np.array(row.values).astype(float)
      X_validation = row[:5]
      Y_pred = X_validation @ W_dict[dataset_name]
      Y_true = row[5:]
      MAPE_num = (Y_true[0] - Y_pred[0]) / Y_true[0]
      MAPE_array.append(MAPE_num)
  print("MAPE from validation_set:", np.mean(np.abs(MAPE_array))*100, " %")

def MakePrediction(X_test, W):
  return np.round(X_test @ W, 3)

SplitData()
PreprocessData(2)

# Matrix Inversion
X_dict = transform_dataframe_into_nparray(training_set, ['charttime', 'temperature', 'heartrate', 'resprate', 'o2sat'])
Y_dict = transform_dataframe_into_nparray(training_set, ['sbp'])
W_dict = {}
for dataset_name in X_dict.keys():
  X = X_dict[dataset_name]
  Y = Y_dict[dataset_name]
  W = MatrixInversion(X, Y)
  #print(' '.join(map(str, W[::-1].flatten())))
  W_dict[dataset_name] = W

validate_and_print_MAPE(W_dict)

# Start to prediction on the testing data
for patient in testing_datalist[1:]:
  subject_id = patient[0]
  X_test = patient[1:6].astype(float)
  W = W_dict[subject_id]
  Y_predicted = MakePrediction(X_test, W)
  output_datalist.append(Y_predicted)

MAPE from validation_set: 10.22441265219449  %


### Output your Prediction

> your filename should be **hw1_advanced.csv**

In [85]:
with open(output_dataroot, 'w', newline='', encoding="utf-8") as csvfile:
  writer = csv.writer(csvfile)
  for row in output_datalist:
    writer.writerow(row)

# Report *(5%)*

Report should be submitted as a pdf file **hw1_report.pdf**

*   Briefly describe the difficulty you encountered
*   Summarize your work and your reflections
*   No more than one page






# Save the Code File
Please save your code and submit it as an ipynb file! (**hw1.ipynb**)