## Group 6 - Math for AI, AI23 @ HCMUS
- 23122013 - Đinh Đức Tài
- 23122002 - Nguyễn Đình Hà Dương
- 23122004 - Nguyễn Lê Hoàng Trung
- 23122014 - Hoàng Minh Trung

## [Lab1] Linear Regression

## Part 0: Import libs, define DataProcessor and functions

#### 0.1: Import libs: Numpy, Pandas, Matplotlib

In [None]:
# Importing the libraries: numpy, pandas, matplotlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import copy

#### 0.2: Create class DataProcessor

In [None]:
class DataProcessor:
    def __init__(self, file_path):
        self.file_path = file_path
        self.data = None
        self.original = True

    def load_data(self):
        """Load data from the CSV file."""
        self.data = pd.read_csv(self.file_path)
        print("Data loaded successfully!")
        return self.data
    
    def get_summary(self):
        """Print a summary of the data."""
        print("Number of rows:", len(self.data))
        print("Column names:", self.data.columns.tolist())
        return self.data.describe()
    
    def head(self, n = 5):
        """Return the first n rows of the data."""
        return self.data.head(n)
    
    def null_info(self):
        """Print information about missing values."""
        print("\nNumber of rows with NaN values:", self.data.isna().any(axis=1).sum())

    def get_column_initial_info(self):
        print("\nInformation about the columns:")
        column_info = pd.DataFrame({
            'Column Name': self.data.columns,
            'Description': [
                "Hãng xe", "Mẫu xe", "Giá xe (VNĐ)", "Năm sản xuất", "Số km đã đi", 
                "Loại nhiên liệu", "Hộp số", "Địa điểm bán", "Màu xe", "Số chủ sở hữu trước đó", 
                "Loại người bán", "Dung tích động cơ (cc)", "Công suất tối đa (bhp)", 
                "Mô-men xoắn tối đa (Nm)", "Hệ dẫn động", "Chiều dài xe (mm)", 
                "Chiều rộng xe (mm)", "Chiều cao xe (mm)", "Số chỗ ngồi", 
                "Dung tích bình nhiên liệu (lít)"
            ],
            'Data Type': self.data.dtypes.values,
            'Number of NaN': self.data.isna().sum().values,
            'Unique Values': self.data.nunique().values,
            'Most Frequent Value': self.data.mode().iloc[0].values,
        })

        return column_info
    
    def get_column_after_transform_info(self):
        print("\nInformation about the columns:")
        column_info = pd.DataFrame({
            'Column Name': self.data.columns,
            'Description': [
                "Hãng xe", "Mẫu xe", "Giá xe (VNĐ)", "Năm sản xuất", "Số km đã đi", 
                "Loại nhiên liệu", "Hộp số", "Địa điểm bán", "Màu xe", "Số chủ sở hữu trước đó", 
                "Loại người bán", "Dung tích động cơ (cc)", "Công suất tối đa (bhp)", 
                "Mô-men xoắn tối đa (Nm)", "Hệ dẫn động", "Chiều dài xe (mm)", 
                "Chiều rộng xe (mm)", "Chiều cao xe (mm)", "Số chỗ ngồi", 
                "Dung tích bình nhiên liệu (lít)", 'Vòng tua tại Công suất tối đa (rpm)',
                'Vòng tua tại Mô-men xoắn tối đa (rpm)',
            ],
            'Data Type': self.data.dtypes.values,
            'Number of NaN': self.data.isna().sum().values,
            'Unique Values': self.data.nunique().values,
            'Most Frequent Value': self.data.mode().iloc[0].values,
        })

        return column_info

#### 0.3: Get infomation about unique values functions

In [None]:
def get_some_unique_values(self):
    print("\nUnique values of some columns:")
    print("Fuel Type:", self.data['Fuel Type'].unique())
    print("Transmission:", self.data['Transmission'].unique())
    print("Seller Type:", self.data['Seller Type'].unique())
    print("Drivetrain:", self.data['Drivetrain'].unique())

    print("Owner:", self.data['Owner'].unique())
    print("Seating Capacity:", self.data['Seating Capacity'].unique())

def unique_values(self):
    object_columns = self.data.select_dtypes(include=['object']).columns
    numeric_columns = self.data.select_dtypes(include=['float64', 'int64']).columns

    # List of object columns and their unique values
    object_columns_list = [(col, self.data[col].nunique()) for col in object_columns]

    # List of numeric columns and their unique values
    numeric_columns_list = [(col, self.data[col].nunique()) for col in numeric_columns]

    print("Object Columns and number of unique values: {}".format(len(object_columns_list)))
    print(object_columns_list)

    print("\nNumeric Columns and number of unique values: {}".format(len(numeric_columns_list)))
    print(numeric_columns_list)
    self.numeric_columns = self.data.select_dtypes(include=['float64', 'int64']).columns
    self.object_columns = self.data.select_dtypes(include=['object']).columns
    return numeric_columns, object_columns

DataProcessor.get_some_unique_values = get_some_unique_values
DataProcessor.unique_values = unique_values

#### 0.4: Clean and transform data functions

In [None]:
def clean_data(self):
    """Clean the data by handling missing values and duplicates."""
    # Handle missing values
    # Fill numeric columns with their mean
    numeric_columns = self.data.select_dtypes(include=['float64', 'int64']).columns
    self.data[numeric_columns] = self.data[numeric_columns].fillna(self.data[numeric_columns].mean().astype(int))

    # Fill categorical columns with the most frequent value
    categorical_columns = self.data.select_dtypes(include=['object']).columns
    self.data[categorical_columns] = self.data[categorical_columns].fillna(self.data[categorical_columns].mode().iloc[0])

    # Remove duplicates
    self.data = self.data.drop_duplicates()

    # Reset index after cleaning
    self.data.reset_index(drop=True, inplace=True)

    # Print summary after cleaning
    print("Data cleaned successfully!")
    print("Number of rows after cleaning:", len(self.data))
    print("Number of missing values after cleaning:", self.data.isna().sum().sum())

def transform_data(self):
    """Transform data by standardizing specific columns."""
    # 'Engine' ('cc') -> float
    self.data['Engine'] = self.data['Engine'].astype(str).str.replace(' cc', '').astype(float)

    # Extract RPM values from the original string values before conversion
    if (self.original is True):
        self.data['rpm at Max Power'] = (
            self.data['Max Power']
            .astype(str)
            .str.extract(r'@\s*(\d+)\s*rpm', expand=False)
        )
        self.data['rpm at Max Torque'] = (
            self.data['Max Torque']
            .astype(str)
            .str.extract(r'@\s*(\d+)\s*rpm', expand=False)
        )
    self.original = False

    # Fill missing values with the most frequent value
    self.data['rpm at Max Power'] = self.data['rpm at Max Power'].fillna(self.data['rpm at Max Power'].mode().iloc[0])
    self.data['rpm at Max Torque'] = self.data['rpm at Max Torque'].fillna(self.data['rpm at Max Torque'].mode().iloc[0])

    # 'rpm at Max Power' -> int
    self.data['rpm at Max Power'] = self.data['rpm at Max Power'].astype(int)

    # 'rpm at Max Torque' -> int
    self.data['rpm at Max Torque'] = self.data['rpm at Max Torque'].astype(int)
    
    # 'Max Power' ('bhp') -> int
    self.data['Max Power'] = self.data['Max Power'].astype(str).str.extract(r'(\d+)', expand=False).astype(int)

    # 'Max Torque' ('Nm') -> int
    self.data['Max Torque'] = self.data['Max Torque'].astype(str).str.extract(r'(\d+)', expand=False).astype(int)

    # 'Seating Capacity' -> int
    self.data['Seating Capacity'] = self.data['Seating Capacity'].astype(int)

    # 'Fuel Tank Capacity' -> int
    self.data['Fuel Tank Capacity'] = self.data['Fuel Tank Capacity'].astype(int)

    # 'Owner' -> int
    self.data['Owner'] = self.data['Owner'].map({
        'UnRegistered Car': 0,
        'First': 1,
        'Second': 2,
        'Third': 3,
        'Fourth': 4,
        '4 or More': 5,
        0: 0,
        1: 1,
        2: 2,
        3: 3,
        4: 4,
        5: 5,
    })

    self.numeric_columns = self.data.select_dtypes(include=['float64', 'int64']).columns
    self.object_columns = self.data.select_dtypes(include=['object']).columns

    print("Data transformed successfully!")
    print("\nData Transformation Details:")
    print("- 'Engine' (cc) converted to float.")
    print("- 'Max Power' (bhp) converted to integer.")
    print("- 'Max Torque' (Nm) converted to integer.")
    print("- Add 'rpm at Max Power' and converted to integer.")
    print("- Add 'rpm at Max Torque' and converted to integer.")
    print("- 'Seating Capacity' converted to integer.")
    print("- 'Fuel Tank Capacity' converted to integer.")
    print("- 'Owner' converted to numerical categories.")

DataProcessor.clean_data = clean_data
DataProcessor.transform_data = transform_data

#### 0.5: Data visualization functions

In [None]:
def plot_corr_matrix(self, width=12, height=8):
    # Compute the correlation matrix using only numeric features
    corr_matrix = self.data[self.numeric_columns].corr()

    # Plot the correlation matrix using matplotlib
    plt.figure(figsize=(width, height))
    plt.imshow(corr_matrix, cmap='coolwarm', interpolation='none')
    plt.colorbar()
    plt.xticks(range(len(corr_matrix)), corr_matrix.columns, rotation=90)
    plt.yticks(range(len(corr_matrix)), corr_matrix.columns)
    plt.title("Correlation Matrix of Numeric Features")

    # Annotate the matrix with correlation coefficients
    for i in range(len(corr_matrix)):
        for j in range(len(corr_matrix)):
            plt.text(j, i, f"{corr_matrix.iloc[i, j]:.2f}", ha='center', va='center', color='black')

    plt.tight_layout()
    plt.show()

def plot_distribution_of_numeric_columns(self):
    # Plot histograms for all numeric columns
    num_cols = self.numeric_columns

    num_cols_count = len(num_cols)
    n_cols = 3  # Number of columns in the figure
    n_rows = (num_cols_count + n_cols - 1) // n_cols  # Calculate the number of rows needed

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 3.5 * n_rows))
    axes = axes.flatten()  # Flatten the axes array for easy iteration

    for i, col in enumerate(num_cols):
        axes[i].hist(self.data[col], bins=30, color='skyblue', edgecolor='black', alpha=0.7)
        axes[i].set_title(f'Histogram of {col}')
        axes[i].set_xlabel(col)
        axes[i].set_ylabel('Frequency')
        axes[i].grid(axis='y', linestyle='--', alpha=0.7)

    # Hide any unused subplots
    for j in range(i + 1, len(axes)):
        fig.delaxes(axes[j])

    plt.tight_layout()
    plt.show()

def box_plot_for_object_columns(self):
    for col in self.object_columns:
        if (col == 'Model'): 
            continue
        plt.figure(figsize=(12, 3))
        categories = self.data[col].unique()
        groups = [self.data.loc[self.data[col] == category, 'Price'] for category in categories]
        plt.boxplot(groups, patch_artist=True, tick_labels=categories)
        plt.title(f'Box Plot: Price by {col}')
        plt.xlabel(col)
        plt.ylabel('Price')
        if (col == 'Location'):
            plt.xticks(rotation=90)
        elif (col == 'Fuel Type' or col == 'Transmission' or col == 'Seller Type' or col == 'Drivetrain'):
            plt.xticks(rotation=0)
        else:
            plt.xticks(rotation=60)
        plt.show()

def scatter_plot_for_numeric_columns(self):
    cols = [col for col in self.numeric_columns if col != 'Price']
    n_plots = len(cols)
    n_cols = 3
    n_rows = int(n_plots / n_cols) + 1

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows))
    axes = axes.flatten()

    for i, col in enumerate(cols):
        axes[i].scatter(self.data[col], self.data['Price'], alpha=0.5, color='blue', edgecolors='k')
        axes[i].set_title(f'Relationship between Price and {col}')
        axes[i].set_xlabel(col)
        axes[i].set_ylabel('Price')
        axes[i].grid(True, linestyle='--', alpha=0.7)

    # Remove any unused subplots
    for j in range(i + 1, len(axes)):
        fig.delaxes(axes[j])

    plt.tight_layout()
    plt.show()

DataProcessor.plot_corr_matrix = plot_corr_matrix
DataProcessor.plot_distribution_of_numeric_columns = plot_distribution_of_numeric_columns
DataProcessor.box_plot_for_object_columns = box_plot_for_object_columns
DataProcessor.scatter_plot_for_numeric_columns = scatter_plot_for_numeric_columns

#### 0.6: Split dataset to train set and validation set

In [None]:
def train_valid_split(self, valid_size=0.2, random_state=None):
    """Split the data into training and validation sets, ensuring all unique 'Model' values are present in the training set."""
    train_set = copy.deepcopy(self)
    valid_set = copy.deepcopy(self)
    
    # Set random seed if provided
    if random_state is not None:
        np.random.seed(random_state)

    # Identify unique models
    unique_models = self.data['Model'].unique()
    
    # Create training indices: start with an empty list
    train_indices = []
    
    # Iterate through each unique model
    for model in unique_models:
        # Get indices where the 'Model' column equals the current unique model
        model_indices = self.data[self.data['Model'] == model].index.tolist()
        
        # Randomly choose one index for each model to be included in the training set
        # This ensures that at least one sample of each model is in the training set
        chosen_index = np.random.choice(model_indices)
        train_indices.append(chosen_index)

    # Create remaining indices for the training set
    remaining_count = int((1 - valid_size) * len(self.data)) - len(train_indices)
    remaining_indices = list(set(np.arange(len(self.data))) - set(train_indices))
    
    # Randomly sample from the remaining indices to complete the training set
    sampled_indices = np.random.choice(remaining_indices, size=remaining_count, replace=False).tolist()
    train_indices.extend(sampled_indices)
    
    # Create validation indices: the indices not in the training set
    valid_indices = list(set(np.arange(len(self.data))) - set(train_indices))
    
    # Shuffle the training indices
    np.random.shuffle(train_indices)

    # Assign data to train_set and valid_set
    train_set.data = self.data.iloc[train_indices].reset_index(drop=True)
    valid_set.data = self.data.iloc[valid_indices].reset_index(drop=True)

    return train_set, valid_set

DataProcessor.train_valid_split = train_valid_split

#### 0.7: Data Encoding

In [None]:
def encode_data(self):
    """
    Mã hóa các biến theo yêu cầu:
    - Hãng xe (Make): thay thế bằng giá trung vị của các xe thuộc hãng đó
    - Mẫu xe (Model): thay thế bằng giá trung vị của các xe cùng mẫu
    - Loại nhiên liệu (Fuel Type): Hybrid -> 1; Ngược lại -> 0
    - Hộp số (Transmission): Auto -> 1; Manual -> 0
    - Địa điểm (Location): nếu giá trung vị của Location ≥ giá trung vị toàn cục -> 1; ngược lại -> 0
    - Màu sắc (Color): Black, Blue, Yellow -> 1; ngược lại -> 0
    - Loại người bán (Seller Type): Corporate -> 1; Individual và Commercial Registration -> 0
    - Drivetrain: FWD -> 1, RWD -> 2, AWD -> 3
    """
    # Hãng xe: map giá trung vị theo Make
    make_median = self.data.groupby('Make')['Price'].median()
    self.data['Make'] = self.data['Make'].map(make_median)
    
    # Mẫu xe: map giá trung vị theo Model
    model_median = self.data.groupby('Model')['Price'].median()
    self.data['Model'] = self.data['Model'].map(model_median)
    
    # Loại nhiên liệu: chỉ giữ Hybrid = 1, còn lại = 0
    self.data['Fuel Type'] = self.data['Fuel Type'].apply(lambda x: 1 if x.strip().lower() == 'hybrid' else 0)
    
    # Hộp số: Auto = 1; Manual = 0
    self.data['Transmission'] = self.data['Transmission'].apply(lambda x: 1 if 'auto' in x.strip().lower() else 0)
    
    # Địa điểm: chia theo giá trung vị so với toàn bộ data
    global_median_price = self.data['Price'].median()
    location_medians = self.data.groupby('Location')['Price'].median()
    def encode_location(loc):
        return 1 if location_medians.loc[loc] >= global_median_price else 0
    self.data['Location'] = self.data['Location'].apply(encode_location)
    
    # Màu sắc: Black, Blue, Yellow -> 1; khác -> 0
    self.data['Color'] = self.data['Color'].apply(lambda x: 1 if x.strip().lower() in ['black', 'blue', 'yellow'] else 0)
    
    # Loại người bán: Corporate = 1; Individual và Commercial Registration = 0
    self.data['Seller Type'] = self.data['Seller Type'].apply(lambda x: 1 if x.strip().lower() == 'corporate' else 0)
    
    # Drivetrain: FWD = 1, RWD = 2, AWD = 3
    drivetrain_mapping = {'FWD': 1, 'RWD': 2, 'AWD': 3}
    self.data['Drivetrain'] = self.data['Drivetrain'].map(drivetrain_mapping)
    
    print("Data encoding completed!")
    encoding_information = {}
    encoding_information['make_median'] = make_median
    encoding_information['model_median'] = model_median
    encoding_information['location_medians'] = location_medians
    encoding_information['global_median_price'] = global_median_price
    encoding_information['drivetrain_mapping'] = drivetrain_mapping
    return encoding_information
    
# Gắn hàm encode_data vào class DataProcessor
DataProcessor.encode_data = encode_data

def encode_data_test_set(self, encoding_information):
    """
    Mã hóa dữ liệu kiểm tra theo thông tin đã học từ tập huấn luyện:
    - Hãng xe (Make): thay thế bằng giá trung vị của các xe thuộc hãng đó
    - Mẫu xe (Model): thay thế bằng giá trung vị của các xe cùng mẫu
    - Loại nhiên liệu (Fuel Type): Hybrid -> 1; Ngược lại -> 0
    - Hộp số (Transmission): Auto -> 1; Manual -> 0
    - Địa điểm (Location): nếu giá trung vị của Location ≥ giá trung vị toàn cục -> 1; ngược lại -> 0
    - Màu sắc (Color): Black, Blue, Yellow -> 1; ngược lại -> 0
    - Loại người bán (Seller Type): Corporate -> 1; Individual và Commercial Registration -> 0
    - Drivetrain: FWD -> 1, RWD -> 2, AWD -> 3
    """
    # Hãng xe: map giá trung vị theo Make, use global_median_price for missing keys
    self.data['Make'] = self.data['Make'].apply(
        lambda x: encoding_information['make_median'].get(x, encoding_information['global_median_price'])
    )
    
    # Mẫu xe: map giá trung vị theo Model, use global_median_price for missing keys
    self.data['Model'] = self.data['Model'].apply(
        lambda x: encoding_information['model_median'].get(x, encoding_information['global_median_price'])
    )
    # Loại nhiên liệu: chỉ giữ Hybrid = 1, còn lại = 0
    self.data['Fuel Type'] = self.data['Fuel Type'].apply(lambda x: 1 if x.strip().lower() == 'hybrid' else 0)
    
    # Hộp số: Auto = 1; Manual = 0
    self.data['Transmission'] = self.data['Transmission'].apply(lambda x: 1 if 'auto' in x.strip().lower() else 0)
    
    # Địa điểm: chia theo giá trung vị so với toàn bộ data
    global_median_price = encoding_information['global_median_price']
    location_medians = encoding_information['location_medians']
    def encode_location(loc):
        try:
            return 1 if location_medians.loc[loc] >= global_median_price else 0
        except KeyError:
            return 0
    self.data['Location'] = self.data['Location'].apply(encode_location)

    # Màu sắc: Black, Blue, Yellow -> 1; khác -> 0
    self.data['Color'] = self.data['Color'].apply(lambda x: 1 if x.strip().lower() in ['black', 'blue', 'yellow'] else 0)

    # Loại người bán: Corporate = 1; Individual và Commercial Registration = 0
    self.data['Seller Type'] = self.data['Seller Type'].apply(lambda x: 1 if x.strip().lower() == 'corporate' else 0)

    # Drivetrain: FWD = 1, RWD = 2, AWD = 3
    drivetrain_mapping = encoding_information['drivetrain_mapping']
    self.data['Drivetrain'] = self.data['Drivetrain'].map(drivetrain_mapping)

    print("Data encoding completed!")

# Gắn hàm encode_data_test_set vào class DataProcessor
DataProcessor.encode_data_test_set = encode_data_test_set

#### 0.8: Data Normalization

In [None]:
def normalize_data(self):
    """
    Chuẩn hóa các feature dữ liệu:
    - Áp dụng log transformation cho 'Price' và 'Kilometer' nhằm giảm ảnh hưởng do độ lệch quy mô.
    - Sau đó dùng MinMaxScaler chuẩn hóa toàn bộ các biến số."
    """

    # Áp dụng log1p (log(1+x)) để tránh lỗi với giá trị 0
    self.data['Kilometer'] = np.log1p(self.data['Kilometer'])
    
    # Chọn các cột số để scale
    numeric_cols = self.data.select_dtypes(include=['float64', 'int64']).columns
    numeric_cols = numeric_cols.drop(['Price'])
    
    min_vals = self.data[numeric_cols].min()
    max_vals = self.data[numeric_cols].max()
    range_vals = max_vals - min_vals
    # Avoid division by zero by replacing 0 differences with 1
    range_vals[range_vals == 0] = 1
    self.data[numeric_cols] = (self.data[numeric_cols] - min_vals) / range_vals
    
    self.unique_values()
    print("Data normalization completed!")

    return range_vals, min_vals, max_vals
    
DataProcessor.normalize_data = normalize_data

def normalize_data_test_set(self, range_vals, min_vals):
    """
    Chuẩn hóa dữ liệu kiểm tra theo thông tin đã học từ tập huấn luyện:
    - Áp dụng log transformation cho 'Price' và 'Kilometer' nhằm giảm ảnh hưởng do độ lệch quy mô.
    - Sau đó dùng MinMaxScaler chuẩn hóa toàn bộ các biến số."
    """
    # Áp dụng log1p (log(1+x)) để tránh lỗi với giá trị 0
    self.data['Kilometer'] = np.log1p(self.data['Kilometer'])
    
    # Chọn các cột số để scale
    numeric_cols = self.data.select_dtypes(include=['float64', 'int64']).columns
    numeric_cols = numeric_cols.drop(['Price'])
    
    # Normalize the test set
    self.data[numeric_cols] = (self.data[numeric_cols] - min_vals) / range_vals

    self.unique_values()
    print("Data normalization completed!")

DataProcessor.normalize_data_test_set = normalize_data_test_set

## Part I: Load and Explore data (train.csv)

In this part, we will load and explore some information about the original data.

In [None]:
# Initialize the DataProcessor class and load the data (train.csv)
file_path = './data/train.csv'
data = DataProcessor(file_path)
data.load_data()

# Print summary of the data
print("\nSummary of the data:")
data.get_summary()

# First 5 rows of data
print("\nFirst 5 rows of data:")
data.head()

In [None]:
# Print information about the null values
data.null_info()

# Print information about the columns
data.get_column_initial_info()

In [None]:
# Print the unique values of some columns
data.get_some_unique_values()

## Part II: Data Preprocessing

In this part, we split data into a training set and a validation set, fill NaN values, and perform some data transformation.

#### II.0: Split dataset

In [None]:
train_data, valid_data = data.train_valid_split(valid_size=0.2, random_state=0)

#### II.1: Data cleaning

In [None]:
# Clean the data and fill missing values
train_data.clean_data()
valid_data.clean_data()

#### II.2: Data Transformation

In [None]:
# Transform the data
train_data.transform_data()
valid_data.transform_data()

#### II.3: Explore train data after preprocessing

In [None]:
train_data.get_column_after_transform_info()

In [None]:
valid_data.get_column_after_transform_info()

In [None]:
# Find models in validation set that are not in train set
train_models = train_data.data['Model'].unique()
validation_models = valid_data.data['Model'].unique()

new_models = [model for model in validation_models if model not in train_models]

print("Models in validation set but not in train set:", new_models)

In [None]:
train_data.get_some_unique_values()

In [None]:
# Print number of unique values for each column
numeric_columns, object_columns = train_data.unique_values()
print()
numeric_columns, object_columns = valid_data.unique_values()

# Create save point: after preprocessing
train_data_after_preprocessing = copy.deepcopy(train_data)
validation_data_after_preprocessing = copy.deepcopy(valid_data)

## Part III: Data visualization

In [None]:
# 1. Plot the correlation matrix
train_data.plot_corr_matrix()

In [None]:
# 2. Plot the distribution of numeric columns
train_data.plot_distribution_of_numeric_columns()

In [None]:
# 3. Box plot: Relationship between Price and Object Columns
train_data.box_plot_for_object_columns()

In [None]:
# 4. Scatter Plot: Relationship between Price and Numeric Columns
train_data.scatter_plot_for_numeric_columns()

## Part IV: Data Encoding and Data Normalization

#### IV.1: Train set and Validation set
We will use train data and validation data (after preprocessing)
- train_data_after_preprocessing
- validation_data_after_preprocessing

In [None]:
train_data = copy.deepcopy(train_data_after_preprocessing) # after preprocessing
numeric_columns, object_columns = train_data.unique_values()
print()
validation_data = copy.deepcopy(validation_data_after_preprocessing) # after preprocessing
numeric_columns, object_columns = validation_data.unique_values()

#### IV.2: Data Encoding

In [None]:
# Encode the training data 
encoding_information = train_data.encode_data()
train_data.get_column_after_transform_info()

In [None]:
# Encode the validation data
validation_data.encode_data_test_set(encoding_information)
validation_data.get_column_after_transform_info()

In [None]:
# Create save point: after encoding
train_data_after_encoding = copy.deepcopy(train_data)
validation_data_after_encoding = copy.deepcopy(validation_data)

#### IV.3: Data Normalization

##### Training set normalization

In [None]:
train_data = copy.deepcopy(train_data_after_encoding)
train_data.data.describe()

In [None]:
range_vals, min_vals, max_vals = train_data.normalize_data()
train_data_after_normalization = copy.deepcopy(train_data) # after normalization
train_data_after_normalization.data.describe()

In [None]:
range_vals, min_vals, max_vals 
# print range_vals, min_vals, max_vals  size
print(range_vals.size, min_vals.size, max_vals.size)

##### Validation set normalization

In [None]:
validation_data = copy.deepcopy(validation_data_after_encoding)
validation_data.data.describe()
validation_data.data

In [None]:
validation_data.normalize_data_test_set(range_vals, min_vals)
validation_data_after_normalization = copy.deepcopy(validation_data) # after normalization
validation_data_after_normalization.data.describe()

In [None]:
train_data_after_normalization.plot_corr_matrix(15, 10)

In [None]:
validation_data_after_normalization.plot_corr_matrix(15, 10)

## Part V: Split dataset and tools to evaluate models

#### V.1: Split dataset
Split training set into X_train and y_train. Split validation set into X_val and y_val.

In [None]:
train_data = copy.deepcopy(train_data_after_normalization)
validation_data = copy.deepcopy(validation_data_after_normalization)
numeric_columns, object_columns = train_data.numeric_columns, train_data.object_columns
print(numeric_columns, len(numeric_columns))

train_data = train_data.data
validation_data = validation_data.data

# X_train, y_train from train_data
# X_val, y_val from validation_data

X_train = train_data.drop(columns=['Price'])
y_train = train_data['Price']

X_val = validation_data.drop(columns=['Price'])
y_val = validation_data['Price']

#### V.2: Tools to evaluate the models

In [None]:
# Function to evaluate the model
def mse(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def rmse(y_true, y_pred):
    return np.sqrt(mse(y_true, y_pred))

def mae(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))

def r2_score(y_true, y_pred):
    SSE = np.sum((y_true - y_pred) ** 2)
    SST = np.sum((y_true - np.mean(y_true)) ** 2) 
    return 1 - (SSE / SST) # R^2 = 1 - SSE/SST = SSR/SST

def evaluate_model(y_true, y_pred):
    model_eval = pd.DataFrame({
        'Metric': ['Mean Squared Error', 'Root Mean Squared Error', 'Mean Absolute Error', 'R^2 Score'],
        'Value': [mse(y_true, y_pred), rmse(y_true, y_pred), mae(y_true, y_pred), r2_score(y_true, y_pred)]
    })
    return model_eval

## Part VI: Simple Linear Regression model from Statistics's point of view

Based on Correlation coefficient map, **Model** maybe the best feature for model. We will prove it.

In [None]:
# Y = beta0 + beta1*X
# beta1 = Sxx / Sxy
# beta0 = mean(Y) - beta1 * mean(X)
# Sxx = sum((X - mean(X))^2)
# Sxy = sum((X - mean(X)) * (Y - mean(Y)))

def simple_linear_regression(X, y):
    mean_X = np.mean(X)
    mean_y = np.mean(y)
    Sxx = np.sum((X - mean_X) ** 2)
    Sxy = np.sum((X - mean_X) * (y - mean_y))
    beta1 = Sxy / Sxx
    beta0 = mean_y - beta1 * mean_X
    return beta0, beta1

result = []

for F in numeric_columns.drop(['Price']):
    # Predict the price of a car based on feature F
    beta0, beta1 = simple_linear_regression(X_train[F], y_train)
    # print(f"{F} feature: Price = {beta0:.2f} + {beta1:.2f} * {F}")

    # Predict the price of a car based on feature F
    y_pred = beta0 + beta1 * X_train[F]

    # Evaluate the model on train set
    model_eval = evaluate_model(y_train, y_pred)
    result.append([F, model_eval, beta0, beta1])

Metric = ['Mean Squared Error', 'Root Mean Squared Error', 'Mean Absolute Error', 'R^2 Score']

for i, metric in enumerate(Metric):
    best = result[0]
    for res in result:
        if (res[1].iloc[i, 1] < best[1].iloc[i, 1] and metric != 'R^2 Score') or \
           (res[1].iloc[i, 1] > best[1].iloc[i, 1] and metric == 'R^2 Score'):
            best = res
    print(f"Best feature for {metric}: {best[0]}. Value: {best[1].iloc[i, 1]}")

print(f"\nConclusion: {best[0]} is the best feature for all metrics")
# => Max Power (best[0]) is the best feature for all metrics 
id_BestFeature_in_result = [i for i in range(len(result)) if result[i][0] == best[0]][0]

beta0 = result[id_BestFeature_in_result][2]
beta1 = result[id_BestFeature_in_result][3]
F = result[id_BestFeature_in_result][0]
print(f"\n{F} formula: Price = {beta0:.2f} + {beta1:.2f} * {F}")

print("\nEvaluation metrics on Training Set:")
result[id_BestFeature_in_result][1]

In [None]:
# Best model on training set:
print(f'Best feature: {result[id_BestFeature_in_result][0]}')
beta0 = result[id_BestFeature_in_result][2]
beta1 = result[id_BestFeature_in_result][3]

# Predict the price of a car based on the best feature (train set)
y_pred = (beta0 + beta1 * X_train[result[id_BestFeature_in_result][0]]).round().astype(int)
df = pd.DataFrame({'Actual': y_train, 'Predicted': y_pred})
df.head(10)

In [None]:
print("\nEvaluation metrics on Validation Set:")
y_pred = beta0 + beta1 * X_val['Model']

model_eval = evaluate_model(y_val, y_pred)
model_eval

In [None]:
# Predict the price of a car based on the best feature (train set)
y_pred = (beta0 + beta1 * X_val[result[id_BestFeature_in_result][0]]).round().astype(int)
df = pd.DataFrame({'Actual': y_val, 'Predicted': y_pred})
df.head(10)

In [None]:
# Plot line of best fit

fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Training set
axes[0].scatter(X_train['Model'], y_train, color='blue', alpha=0.5, label='Actual Prices')
sorted_idx_train = X_train['Model'].argsort()
X_sorted_train = X_train['Model'].iloc[sorted_idx_train]
y_line_train = beta0 + beta1 * X_sorted_train
axes[0].plot(X_sorted_train, y_line_train, color='red', linewidth=2, label='Predicted Line')
axes[0].set_xlabel("Model (Encoded)")
axes[0].set_ylabel("Price")
axes[0].set_title("Training Set: Actual vs Predicted")
axes[0].legend()

# Validation set
axes[1].scatter(X_val['Model'], y_val, color='green', alpha=0.5, label='Actual Prices')
sorted_idx_val = X_val['Model'].argsort()
X_sorted_val = X_val['Model'].iloc[sorted_idx_val]
y_line_val = beta0 + beta1 * X_sorted_val
axes[1].plot(X_sorted_val, y_line_val, color='orange', linewidth=2, label='Predicted Line')
axes[1].set_xlabel("Model (Encoded)")
axes[1].set_ylabel("Price")
axes[1].set_title("Validation Set: Actual vs Predicted")
axes[1].legend()

plt.tight_layout()
plt.show()

## Part VII: Multiple Linear Regression model

### VII.1 Build Model

In [None]:
class MultiLinearRegression:
    def __init__(self, X_train, y_train, X_val, y_val):
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.thetas = np.zeros(self.X_train.shape[1])
        self.train_predictions = None
        self.val_predictions = None
        self.train_losses = []
        self.val_losses = []
        self.epochs = None

    def predict(self, X):
        """Make predictions for given input data"""
        return np.dot(X, self.thetas)

    def compute_loss(self, predictions, y):
        """Calculate mean squared error"""
        return np.mean((predictions - y) ** 2) / 2

    def gradient(self, X, predictions, y):
        """Compute gradients for weight updates"""
        return np.dot(X.T, (predictions - y)) / len(y)

    def update_weights(self, learning_rate, gradient):
        """Update model parameters"""
        return self.thetas - learning_rate * gradient

    def train(self, epochs, learning_rate, log_interval, adam=False):
        """Train the model using gradient descent"""
        self.epochs = epochs
        for epoch in range(epochs):
            # Training predictions and loss
            self.train_predictions = self.predict(self.X_train)
            train_loss = self.compute_loss(self.train_predictions, self.y_train)
            self.train_losses.append(train_loss)

            # Validation predictions and loss
            self.val_predictions = self.predict(self.X_val)
            val_loss = self.compute_loss(self.val_predictions, self.y_val)
            self.val_losses.append(val_loss)

            # Update weights
            grad = self.gradient(self.X_train, self.train_predictions, self.y_train)
            self.thetas = self.update_weights(learning_rate, grad)

            if epoch % log_interval == 0:
                print(f"Epoch: {epoch} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

        return self.thetas, self.train_losses, self.val_losses

    def get_parameters(self):
        """Return current model parameters"""
        return self.thetas

    def plot_losses(self):
        plt.figure(figsize=(10, 6))
        plt.plot(range(len(self.train_losses)), self.train_losses,
                label='Training Loss', color='blue', linewidth=2)
        plt.plot(range(len(self.val_losses)), self.val_losses,
                label='Validation Loss', color='red', linewidth=2)

        plt.xlabel('Epoch', fontsize=12)
        plt.ylabel('Loss', fontsize=12)
        plt.title('Training and Validation Loss over Epochs', fontsize=14)
        plt.legend(fontsize=10)
        plt.grid(True, linestyle='--', alpha=0.7)

        # Set log scale for better visualization if losses vary greatly
        plt.yscale('log')

        plt.tight_layout()
        plt.show()

### VII.2 Cross validation functions

In [None]:
# Cross validation function
def k_fold_cross_validation(self, model_class, k, epochs, learning_rate, log_interval, kind=None, adam=False):
    losses_of_each_cross = {}
    train_set = copy.deepcopy(self)
    val_set = copy.deepcopy(self)

    for i in range(k):
        train_end_1 = int(i/k * len(self.data))
        train_start_2 = int((i + 1)/k * len(self.data))

        val_start = int(i/k * len(self.data))
        val_end = int((i + 1)/k * len(self.data))

        train_set.data = pd.concat([self.data.iloc[:train_end_1], self.data.iloc[train_start_2:]], axis=0).copy()
        val_set.data = self.data.iloc[val_start:val_end].copy()

        train_set.encode_data()
        val_set.encode_data()

        train_set.normalize_data()
        val_set.normalize_data()

        train_data = train_set.data
        val_data = val_set.data

        X_train = train_data.drop(columns=['Price'])
        y_train = train_data['Price']
        X_train, y_train = process_data(X_train, y_train, kind)

        X_val = val_data.drop(columns=['Price'])
        y_val = val_data['Price']
        X_val, y_val = process_data(X_val, y_val, kind)

        print(f"Cross {i}: ")
        multiLR = model_class(X_train, y_train, X_val, y_val)
        _, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval, adam)

        loss = {
            f'Cross {i + 1}': {
                'Train loss': train_losses,
                'Val loss': val_losses
            }
        }
        losses_of_each_cross.update(loss)

        print('\n')

    return losses_of_each_cross

DataProcessor.k_fold_cross_validation = k_fold_cross_validation

In [None]:
def plot_cross_validation(cross_validation_losses):
    k = len(cross_validation_losses)

    ncols = int(np.ceil(np.sqrt(k)))
    nrows = int(np.ceil(k / ncols))

    fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5 * ncols, 4 * nrows), sharex=True, sharey=False)

    if nrows == 1:
        axes = np.array([axes]) if ncols > 1 else np.array([axes])
    else:
        axes = np.array(axes)

    for idx, (cross, losses) in enumerate(cross_validation_losses.items()):
        train_losses = losses['Train loss']
        val_losses = losses['Val loss']

        epochs = range(1, len(train_losses) + 1)

        row = idx // ncols
        col = idx % ncols

        axes[row, col].plot(epochs, train_losses, label='Train Loss', linestyle='-', color='blue', alpha=0.7)

        axes[row, col].plot(epochs, val_losses, label='Val Loss', linestyle='--', color='orange', alpha=0.7)

        axes[row, col].set_title(f'{cross}')
        axes[row, col].set_ylabel('Loss')
        axes[row, col].legend()
        axes[row, col].grid(True)

        if row == nrows - 1:
            axes[row, col].set_xlabel('Epoch')

    for idx in range(k, nrows * ncols):
        row = idx // ncols
        col = idx % ncols
        fig.delaxes(axes[row, col])

    plt.tight_layout()
    plt.show()

### VII.3 Prepare data

In [None]:
# Process data to fit different methods
def process_data(X, y, kind: str):
    if kind == 'bias':
        X = np.hstack((np.ones(shape=(X.shape[0], 1)), X))
    elif kind == 'custom':
        y = y / 1e7
    elif kind == 'log':
        y = np.log(y)
    elif kind == 'min_max':
        min_y = np.min(y)
        max_y = np.max(y)
        y = (y - min_y) / (max_y - min_y)
    elif kind == 'standardization':
        mean_y = np.mean(y)
        std_y = np.std(y)
        y = (y - mean_y) / std_y

    return X, y

In [None]:
# Add bias
X_train_bias = np.hstack((np.ones(shape=(X_train.shape[0], 1)), X_train))
X_val_bias = np.hstack((np.ones(shape=(X_val.shape[0], 1)), X_val))

# Custom
y_train_custom = y_train / 1e7
y_val_custom = y_val / 1e7

# Log transformation
y_train_log = np.log(y_train)
y_val_log = np.log(y_val)

# Min-Max scaler
train_min = np.min(y_train)
train_max = np.max(y_train)

val_min = np.min(y_val)
val_max = np.max(y_val)

y_train_scaled = (y_train - train_min) / (train_max - train_min)
y_val_scaled = (y_val - val_min) / (val_max - val_min)

# Standardization
train_mean = np.mean(y_train)
train_std = np.std(y_train)
val_mean = np.mean(y_val)
val_std = np.std(y_val)

y_train_standardized = (y_train - train_mean) / train_std
y_val_standardized = (y_val - val_mean) / val_std

In [None]:
# List to record results
evaluations = []
evaluations_label_normalized = []

In [None]:
# Models dictionary
models = {}

We will experimenting different methods to find out which one works out the best. Base model is using all features and no bias. Next models are built to be expected to outperform this one.

### VII.4 Training

##### Without bias (base)

In [None]:
epochs = 10000
learning_rate = 0.2
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train, y_train, X_val, y_val)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['base'] = multiLR

In [None]:
# Cross validation
k = 4
kind = None
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

The sudden spike in some crosses is most likely due to noise or mismatch in distributions between train set and validation set.

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val)
evaluations.append(evaluate_model(y_val, y_pred).loc[:, 'Value'])

##### With bias

We will add a bias parameter to increase model's complexity.

In [None]:
epochs = 10000
learning_rate = 0.2
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train_bias, y_train, X_val_bias, y_val)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['bias'] = multiLR

In [None]:
# Cross validation
k = 4
kind = 'bias'
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val_bias)
evaluations.append(evaluate_model(y_val, y_pred).loc[:, 'Value'])

#### Label Normalization
So far, we have been using normalized features to predict the original labels. In this section, we will explore scaling the labels to a smaller range to facilitate more efficient training.

##### Custom

In [None]:
epochs = 100
learning_rate = 0.2
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train_bias, y_train_custom, X_val_bias, y_val_custom)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['custom'] = multiLR

In [None]:
# Cross validation
k = 4
kind = 'custom'
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

As expected, the training speed up incredibly fast, with the model converging in just approximately 50 epochs. Although it seems the loss can decrease a bit more, model has nearly reached the minimum so the loss will be updated slowly.

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val_bias)
evaluations_label_normalized.append(evaluate_model(y_val_custom, y_pred).loc[:, 'Value'])

After training, predictions have to be denormalized and evaluated again on original labels.

In [None]:
# Retransform to initial range
y_pred = multiLR.predict(X_val_bias)
y_pred_original = y_pred * 1e7
evaluations.append(evaluate_model(y_val, y_pred_original).loc[:, 'Value'])

##### Log Transformation

In [None]:
epochs = 100
learning_rate = 0.1
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train_bias, y_train_log, X_val_bias, y_val_log)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['log'] = multiLR

In [None]:
# Cross validation
k = 4
kind = 'log'
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val_bias)
evaluations_label_normalized.append(evaluate_model(y_val_log, y_pred).loc[:, 'Value'])

In [None]:
# Retransform to initial range
y_pred = multiLR.predict(X_val_bias)
y_pred_original = np.exp(y_pred)
evaluations.append(evaluate_model(y_val, y_pred_original).loc[:, 'Value'])

##### Min-Max Scaler

In [None]:
epochs = 100
learning_rate = 0.1
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train_bias, y_train_scaled, X_val_bias, y_val_scaled)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['min_max'] = multiLR

In [None]:
# Cross validation
k = 4
kind = 'min_max'
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val_bias)
evaluations_label_normalized.append(evaluate_model(y_val_scaled, y_pred).loc[:, 'Value'])

In [None]:
# Retransform to initial range
y_pred_original = y_pred * (train_max - train_min) + train_min
evaluations.append(evaluate_model(y_val, y_pred_original).loc[:, 'Value'])

##### Standardization

In [None]:
epochs = 5000
learning_rate = 0.2
log_interval = epochs / 10
thetas, train_losses, val_losses = [], [], []

In [None]:
multiLR = MultiLinearRegression(X_train_bias, y_train_standardized, X_val_bias, y_val_standardized)
thetas, train_losses, val_losses = multiLR.train(epochs, learning_rate, log_interval)
models['standardization'] = multiLR

In [None]:
# Cross validation
k = 4
kind = 'standardization'
cross_validation_losses = {}
cross_validation_losses = train_data_after_preprocessing.k_fold_cross_validation(MultiLinearRegression, k, epochs, learning_rate, log_interval, kind)

In [None]:
plot_cross_validation(cross_validation_losses)

In [None]:
# Evaluate on multiple metrics
y_pred = multiLR.predict(X_val_bias)
evaluations_label_normalized.append(evaluate_model(y_val_standardized, y_pred).loc[:, 'Value'])

In [None]:
# Retransform to initial range
y_pred_original = y_pred * val_std + val_mean
evaluations.append(evaluate_model(y_val, y_pred_original).loc[:, 'Value'])

### VII.5 Summary

In [None]:
# Evaluation summary
summary = pd.concat(evaluations, axis=1)
summary.index = ['Mean Squared Error', 'Root Mean Squared Error', 'Mean Absolute Error', 'R^2 Score']
summary.columns = ['Base', 'Bias', 'Custom', 'Log transformation', 'Min-Max scaler', 'Standardization']
summary

In [None]:
# Evaluation summary on normalized labels
summary_label_normalized = pd.concat(evaluations_label_normalized, axis=1)
summary_label_normalized.index = ['Mean Squared Error', 'Root Mean Squared Error', 'Mean Absolute Error', 'R^2 Score']
summary_label_normalized.columns = ['Custom', 'Log transformation', 'Min-Max scaler', 'Standardization']
summary_label_normalized

After experimenting with various approaches, the model with bias achieved the highest R² score. The elevated loss values can be attributed to the significant disparity in the range between the features and labels.

In [None]:
train_data_after_preprocessing.data['Price'].describe()

However, the high R² score indicates that the model can account for 96.7% of the variance in the data, demonstrating that the base model, the model with bias, and the standardization method are all performing effectively. Ultimately, the model with bias was chosen as the final result.

In [None]:
model = models['bias']
y_pred = model.predict(X_val_bias)
evaluate_model(y_val, y_pred)

## Part VIII: Polynomial Regression model

### VI.1: Model building

In [None]:
from itertools import combinations_with_replacement

In [None]:
class PolynomialFeatures:
    """
    Tạo ma trận đặc trưng đa thức.
    
    Parameters:
    -----------
    degree : int, mặc định=2
        Bậc của đa thức. 
    
    include_bias : bool, mặc định=True
        Nếu True, thêm cột toàn 1 vào ma trận (hằng số).
    
    interaction_only : bool, mặc định=False
        Nếu True, chỉ bao gồm các tương tác giữa các đặc trưng.
    """
    
    def __init__(self, degree=2, include_bias=True, interaction_only=False):
        self.degree = degree
        self.include_bias = include_bias
        self.interaction_only = interaction_only
    
    def fit(self, X, y=None):
        """
        Tính số lượng đặc trưng đầu ra.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng đầu vào.
        
        Returns
        -------
        self : object
        """
        n_samples, n_features = np.asarray(X).shape
        self.n_input_features_ = n_features
        
        # Tính số lượng đặc trưng đầu ra
        combinations = []
        for d in range(0, self.degree + 1):
            if d == 0 and not self.include_bias:
                continue
            if d == 1:  # không thay đổi các đặc trưng ban đầu
                combinations.extend(range(n_features))
                continue
                
            if self.interaction_only:
                combinations.extend(combinations_with_replacement(range(n_features), d))
            else:
                combinations.extend([c for c in combinations_with_replacement(range(n_features), d) 
                                    if len(set(c)) == 1])
        
        self.n_output_features_ = len(combinations) + (1 if self.include_bias else 0)
        return self
    
    def transform(self, X):
        """
        Chuyển đổi đặc trưng thành đặc trưng đa thức.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng đầu vào.
        
        Returns
        -------
        XP : np.ndarray, shape (n_samples, n_output_features)
            Ma trận đặc trưng đa thức.
        """
        X = np.asarray(X)
        n_samples, n_features = X.shape
        
        if n_features != self.n_input_features_:
            raise ValueError("X shape does not match training shape")
        
        # Khởi tạo ma trận đầu ra
        XP = np.ones((n_samples, 0))
        
        # Thêm hằng số nếu include_bias=True
        if self.include_bias:
            XP = np.hstack((np.ones((n_samples, 1)), XP))
        
        # Thêm đặc trưng ban đầu (bậc 1)
        if self.degree >= 1:
            XP = np.hstack((XP, X))
        
        # Tạo đặc trưng đa thức bậc cao hơn
        for d in range(2, self.degree + 1):
            if self.interaction_only:
                combs = [c for c in combinations_with_replacement(range(n_features), d)
                         if len(set(c)) > 1]
            else:
                combs = list(combinations_with_replacement(range(n_features), d))
            
            for comb in combs:
                new_col = np.ones((n_samples, 1))
                for i in comb:
                    new_col = new_col * X[:, i:i+1]
                XP = np.hstack((XP, new_col))
        
        return XP
    
    def fit_transform(self, X, y=None):
        """
        Fit và transform cùng một lúc.
        """
        return self.fit(X).transform(X)

In [None]:
class LinearRegression:
    """
    Hồi quy tuyến tính bằng OLS (Ordinary Least Squares).
    
    Parameters
    ----------
    fit_intercept : bool, mặc định=True
        Có tính hằng số hay không.
    """
    
    def __init__(self, fit_intercept=True):
        self.fit_intercept = fit_intercept
        self.coef_ = None
        self.intercept_ = None
    
    def fit(self, X, y):
        """
        Huấn luyện mô hình hồi quy tuyến tính.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng.
        
        y : array-like, shape (n_samples,)
            Vector mục tiêu.
        
        Returns
        -------
        self : object
        """
        X = np.asarray(X)
        y = np.asarray(y)
        
        # Add a column of ones for the intercept if fit_intercept=True
        if self.fit_intercept:
            X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
        else:
            X_with_intercept = X

        # Calculate coefficients using the normal equation: beta = (X^T X)^(-1) X^T y
        beta, residues, rank, s = np.linalg.lstsq(X_with_intercept, y, rcond=None)

        if self.fit_intercept:
            self.intercept_ = beta[0]
            self.coef_ = beta[1:]
        else:
            self.intercept_ = 0.0
            self.coef_ = beta
            
        return self
    
    def predict(self, X):
        """
        Dự đoán sử dụng mô hình đã huấn luyện.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng.
        
        Returns
        -------
        y_pred : array, shape (n_samples,)
            Giá trị dự đoán.
        """
        X = np.asarray(X)
        return X.dot(self.coef_) + self.intercept_

In [None]:
class PolynomialRegression:
    """
    Mô hình hồi quy đa thức.
    
    Parameters
    ----------
    degree : int, mặc định=2
        Bậc của đa thức.
    
    include_bias : bool, mặc định=True
        Có thêm cột toàn 1 vào ma trận không.
    
    interaction_only : bool, mặc định=False
        Chỉ bao gồm các tương tác giữa các đặc trưng.
    """
    
    def __init__(self, degree=2, include_bias=True, interaction_only=False):
        self.degree = degree
        self.include_bias = include_bias
        self.interaction_only = interaction_only
        self.poly = PolynomialFeatures(degree=degree, 
                                       include_bias=include_bias,
                                       interaction_only=interaction_only)
        self.linear_regression = LinearRegression(fit_intercept=False if include_bias else True)
    
    def fit(self, X, y):
        """
        Huấn luyện mô hình hồi quy đa thức.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng.
        
        y : array-like, shape (n_samples,)
            Vector mục tiêu.
        
        Returns
        -------
        self : object
        """
        X_poly = self.poly.fit_transform(X)
        self.linear_regression.fit(X_poly, y)
        return self
    
    def predict(self, X):
        """
        Dự đoán sử dụng mô hình đã huấn luyện.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Ma trận đặc trưng.
        
        Returns
        -------
        y_pred : array, shape (n_samples,)
            Giá trị dự đoán.
        """
        X_poly = self.poly.transform(X)
        return self.linear_regression.predict(X_poly)

### VI.2: Training and evaluate

In [None]:
X_train_v3 = train_data.drop(columns=['Price'])
y_train_v3 = train_data['Price']

X_val_v3 = validation_data.drop(columns=['Price'])
y_val_v3 = validation_data['Price']

model = PolynomialRegression(degree=2)
model.fit(X_train_v3, y_train_v3)
    
y_pred = model.predict(X_val_v3)
model_eval = evaluate_model(y_val_v3, y_pred)
model_eval

In [None]:
plt.figure(figsize=(18, 6))

# Plot 1: Scatter plot of predicted vs actual
plt.subplot(1, 3, 1)
plt.scatter(y_val_v3, y_pred, alpha=0.5)
plt.plot([y_val_v3.min(), y_val_v3.max()], [y_val_v3.min(), y_val_v3.max()], 'r--')
plt.xlabel('Actual Price')
plt.ylabel('Predicted Price')
plt.title('Actual vs Predicted')

# Plot 2: Same scatter plot with log scale (helps with extreme values)
plt.subplot(1, 3, 2)
plt.scatter(y_val_v3, y_pred, alpha=0.5)
plt.plot([y_val_v3.min(), y_val_v3.max()], [y_val_v3.min(), y_val_v3.max()], 'r--')
plt.xscale('log')
plt.yscale('log')
plt.xlabel('Actual Price (log scale)')
plt.ylabel('Predicted Price (log scale)')
plt.title('Actual vs Predicted - Log Scale')

# Plot 3: Histogram of errors
plt.subplot(1, 3, 3)
errors = y_pred - y_val_v3
plt.hist(errors, bins=50)
plt.xlabel('Prediction Error')
plt.ylabel('Frequency')
plt.title('Distribution of Prediction Errors')

plt.tight_layout()
plt.show()

In [None]:
class PolynomialRegressionGD:
    def __init__(self, degree=2, learning_rate=0.01, n_iterations=1000,
                 batch_size=None, random_state=None, epsilon=1e-8):
        """
        Khởi tạo mô hình Polynomial Regression.
        
        Parameters:
        -----------
        degree : int, default=2
            Bậc của đa thức
        learning_rate : float, default=0.01
            Tốc độ học (alpha)
        n_iterations : int, default=1000
            Số lần lặp tối đa (epochs)
        batch_size : int, default=None
            Kích thước batch (None = dùng toàn bộ dữ liệu)
        random_state : int, default=None
            Giá trị khởi tạo cho random
        epsilon : float, default=1e-8
            Ngưỡng hội tụ
        """
        self.degree = degree
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.batch_size = batch_size
        self.random_state = random_state
        self.epsilon = epsilon
        self.lambda_reg = 0.01
        self.weights = None
        self.history = {'loss': [], 'weights': []}
        
        if random_state is not None:
            np.random.seed(random_state)
    
    def _generate_polynomial_features(self, X):
        """
        Tạo ma trận đặc trưng đa thức từ mảng đầu vào X.
        
        Parameters:
        -----------
        X : array-like, shape (n_samples,) hoặc (n_samples, 1)
            Mảng đặc trưng đầu vào
            
        Returns:
        --------
        X_poly : ndarray, shape (n_samples, degree + 1)
            Ma trận đặc trưng đa thức (bao gồm cột bias - toàn số 1)
        """
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        
        n_samples = X.shape[0]
        X_poly = np.ones((n_samples, self.degree + 1))
        
        for i in range(1, self.degree + 1):
            X_poly[:, i] = X[:, 0] ** i
            
        return X_poly
    
    def _compute_cost(self, X, y, weights):
        """
        Tính toán hàm mất mát (MSE - Mean Squared Error).
        
        Parameters:
        -----------
        X : ndarray, shape (n_samples, n_features)
            Ma trận đặc trưng
        y : ndarray, shape (n_samples,)
            Mảng giá trị thực tế
        weights : ndarray, shape (n_features,)
            Mảng trọng số của mô hình
            
        Returns:
        --------
        float : Giá trị hàm mất mát
        """
        n_samples = X.shape[0]
        y_pred = X @ weights
        error = y_pred - y
        # Thêm thành phần regularization
        reg_term = (self.lambda_reg / (2*n_samples)) * np.sum(weights[1:]**2)
        return np.sum(error**2) / (2*n_samples) + reg_term
    
    def _compute_gradient(self, X, y, weights):
        """
        Tính toán gradient của hàm mất mát.
        
        Parameters:
        -----------
        X : ndarray, shape (n_samples, n_features)
            Ma trận đặc trưng
        y : ndarray, shape (n_samples,)
            Mảng giá trị thực tế
        weights : ndarray, shape (n_features,)
            Mảng trọng số của mô hình
            
        Returns:
        --------
        ndarray, shape (n_features,) : Gradient của hàm mất mát
        """
        n_samples = X.shape[0]
        y_pred = X @ weights
        error = y_pred - y
        gradient = (X.T @ error) / n_samples
        
        # Thêm gradient của thành phần regularization (không áp dụng cho bias)
        reg_gradient = np.zeros_like(weights)
        reg_gradient[1:] = (self.lambda_reg / n_samples) * weights[1:]
        
        return gradient + reg_gradient
    
    def _predict_with_weights(self, X, weights):
        """
        Dự đoán giá trị với trọng số cho trước.
        
        Parameters:
        -----------
        X : ndarray
            Ma trận đặc trưng
        weights : ndarray
            Mảng trọng số
            
        Returns:
        --------
        ndarray : Mảng giá trị dự đoán
        """
        return X @ weights
    
    def fit(self, X, y, verbose=False):
        """
        Huấn luyện mô hình trên dữ liệu X, y.
        
        Parameters:
        -----------
        X : array-like, shape (n_samples,) hoặc (n_samples, 1)
            Mảng đặc trưng đầu vào
        y : array-like, shape (n_samples,)
            Mảng giá trị đầu ra
        verbose : bool, default=False
            Nếu True, in ra thông tin trong quá trình huấn luyện
            
        Returns:
        --------
        self : Đối tượng
        """
        X = np.array(X)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        y = np.array(y)
        
        X_poly = self._generate_polynomial_features(X)
        n_samples, n_features = X_poly.shape
        
        self.weights = np.zeros(n_features)
        if self.batch_size is None or self.batch_size > n_samples:
            self.batch_size = n_samples
        
        for iteration in range(self.n_iterations):
            # Xáo trộn dữ liệu
            indices = np.random.permutation(n_samples)
            X_shuffled = X_poly[indices]
            y_shuffled = y[indices]
            
            for i in range(0, n_samples, self.batch_size):
                X_batch = X_shuffled[i:i + self.batch_size]
                y_batch = y_shuffled[i:i + self.batch_size]
                
                gradient = self._compute_gradient(X_batch, y_batch, self.weights)
                self.weights -= self.learning_rate * gradient
            
            current_loss = self._compute_cost(X_poly, y, self.weights)
            self.history['loss'].append(current_loss)
            self.history['weights'].append(self.weights.copy())
            
            if verbose and (iteration + 1) % max(1, self.n_iterations // 10) == 0:
                print(f"Epoch {iteration + 1}/{self.n_iterations}, Loss: {current_loss:.6f}")
            
            if iteration > 0 and abs(self.history['loss'][iteration-1] - current_loss) < self.epsilon:
                if verbose:
                    print(f"Đã hội tụ sau {iteration + 1} epochs với loss: {current_loss:.6f}")
                break
        
        return self
    
    def predict(self, X):
        """
        Dự đoán giá trị cho dữ liệu mới.
        
        Parameters:
        -----------
        X : array-like, shape (n_samples,) hoặc (n_samples, 1)
            Mảng đặc trưng đầu vào
            
        Returns:
        --------
        ndarray, shape (n_samples,) : Mảng giá trị dự đoán
        """
        if self.weights is None:
            raise ValueError("Mô hình chưa được huấn luyện.")
        
        X = np.array(X)
        X_poly = self._generate_polynomial_features(X)
        return self._predict_with_weights(X_poly, self.weights)
    
    def plot_learning_curve(self):
        """
        Vẽ đồ thị giá trị loss theo các epoch.
        """
        if not self.history['loss']:
            raise ValueError("Mô hình chưa được huấn luyện.")
        
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, len(self.history['loss']) + 1), self.history['loss'], marker='o', linestyle='-', markersize=2)
        plt.title('Đồ thị hàm mất mát theo epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss (MSE)')
        plt.grid(True)
        plt.show()
    
    def plot_regression_line(self, X, y_true):
        """
        Visualize the regression line and residuals.
        
        Parameters:
        -----------
        X : array-like, shape (n_samples, n_features)
            Mảng đặc trưng đầu vào
        y_true : array-like, shape (n_samples,)
            Mảng giá trị dự đoán
        """
        if self.weights is None:
            raise ValueError("Mô hình chưa được huấn luyện.")
        
        y_pred = self.predict(X)
    
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        ax1.scatter(y_true, y_pred, alpha=0.5, color='blue')
        
        min_val = min(y_true.min(), y_pred.min())
        max_val = max(y_true.max(), y_pred.max())
        ax1.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
        
        ax1.set_xlabel('Actual Values')
        ax1.set_ylabel('Predicted Values')
        ax1.set_title('Actual vs. Predicted')
        ax1.grid(True, alpha=0.3)
        
        residuals = y_pred - y_true
        ax2.hist(residuals, bins=30, alpha=0.7, color='green')
        ax2.axvline(x=0, color='r', linestyle='--', linewidth=2)
        ax2.set_xlabel('Residuals')
        ax2.set_ylabel('Frequency')
        ax2.set_title('Residuals Distribution')
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

In [None]:
X_train_v3 = train_data.drop(columns=['Price'])
y_train_v3 = train_data['Price']

X_val_v3 = validation_data.drop(columns=['Price'])
y_val_v3 = validation_data['Price']

print(f"\nBậc đa thức: {d}")
model = PolynomialRegressionGD(
    degree=d,
    learning_rate=0.01,
    n_iterations=50000,
    batch_size=32,
    random_state=42,
    epsilon=1e-9
)
        
model.fit(X_train_v3, y_train_v3, verbose=False)
y_pred = model.predict(X_val_v3)
        
metrics = evaluate_model(y_val_v3, y_pred)
print(metrics)
    
print("\nHệ số đa thức tìm được:")
for i, coef in enumerate(model.weights):
    print(f"θ{i}: {coef:.4f}")

model.plot_learning_curve()
model.plot_regression_line(X_val_v3, y_pred)

## Part IX: Linear Regression model with PCA

#### IX.1 Data and Heatmap

In [None]:
train_data_PCA= copy.deepcopy(train_data_after_encoding)
validation_data_PCA = copy.deepcopy(validation_data_after_encoding)

In [None]:
#Đưa về dạng data frame
if not isinstance(train_data_PCA, pd.DataFrame):
    if hasattr(train_data_PCA, "to_dataframe"):
        train_data_after_normalization_copy = train_data_PCA.to_dataframe()
    elif hasattr(train_data_PCA ,"data"):
        train_data_after_normalization_copy = pd.DataFrame(train_data_PCA.data)
    else:
        raise TypeError("train_data_after_normalization không phải DataFrame và không có phương thức phù hợp để chuyển đổi.")

In [None]:
# Tính ma trận tương quan
correlation_matrix = train_data_after_normalization_copy.corr().to_numpy()
labels = train_data_after_normalization_copy.columns

# Vẽ heatmap bằng Matplotlib
plt.figure(figsize=(23, 10))  # Điều chỉnh kích thước
plt.imshow(correlation_matrix, cmap="coolwarm", interpolation="nearest")
plt.colorbar()  # Thanh màu

# Thêm giá trị số vào mỗi ô
num_vars = len(correlation_matrix)
for i in range(num_vars):
    for j in range(num_vars):
        plt.text(j, i, f"{correlation_matrix[i, j]:.2f}", ha="center", va="center", color="black")

# Đặt tên trục
plt.xticks(np.arange(num_vars), labels, rotation=90)
plt.yticks(np.arange(num_vars), labels)

plt.title("Heatmap - Ma trận tương quan")
plt.show()

Nhận xét: Từ ma trận heatmap (Ma trận tương quan) ta thấy những bộ tương quan sau có thể giảm số chiều về 1

['Seating Capacity', 'Length', 'Height', 'Width', 'Fuel Tank Capacity']

['Engine', 'Max Power', 'Max Torque', 'Drivetrain']

['rpm at Max Power', 'rpm at Max Torque']

#### IX.2 PCA 

In [None]:
def pca(X, num_components):
 
    X_meaned = X - np.mean(X, axis=0)
    covariance_matrix = np.cov(X_meaned, rowvar=False)
    eigenvalues, eigenvectors = np.linalg.eigh(covariance_matrix)
    sorted_indices = np.argsort(eigenvalues)[::-1]
    eigenvectors_sorted = eigenvectors[:, sorted_indices]
    principal_components = eigenvectors_sorted[:, :num_components]
    X_pca = np.dot(X_meaned, principal_components)
    return X_pca

In [None]:
# Chọn các cột để áp dụng PCA
num_components = 1
#Bước 1: PCA nhung bo du lieu can
##########################################
columns_to_pca = ['Seating Capacity', 'Length', 'Height', 'Width', 'Fuel Tank Capacity']
for col in columns_to_pca:
    if col not in train_data_after_normalization_copy.columns:
        raise KeyError(f"Cột '{col}' không tồn tại trong train_data_after_normalization.")
X_pca_input = train_data_after_normalization_copy[columns_to_pca].values
X_pca_transformed = pca(X_pca_input, num_components)
##########################################
columns_to_pca1 = ['Engine', 'Max Power', 'Max Torque', 'Drivetrain']
for col in columns_to_pca1:
    if col not in train_data_after_normalization_copy.columns:
        raise KeyError(f"Cột '{col}' không tồn tại trong train_data_after_normalization.")
X_pca_input1 = train_data_after_normalization_copy[columns_to_pca1].values
X_pca_transformed1 = pca(X_pca_input1, num_components)
##########################################
columns_to_pca2 = ['rpm at Max Power', 'rpm at Max Torque']
for col in columns_to_pca2:
    if col not in train_data_after_normalization_copy.columns:
        raise KeyError(f"Cột '{col}' không tồn tại trong train_data_after_normalization.")
X_pca_input2 = train_data_after_normalization_copy[columns_to_pca2].values
X_pca_transformed2 = pca(X_pca_input2, num_components)
##########################################

train_data_after_normalization1 = train_data_after_normalization_copy.copy()
#Bước 2: Xoa tren du lieu cu
train_data_after_normalization1.drop(columns=columns_to_pca, inplace=True)
train_data_after_normalization1.drop(columns=columns_to_pca1, inplace=True)
train_data_after_normalization1.drop(columns=columns_to_pca2, inplace=True)
pca_columns1 = [f'PCA_hs_{i+1}' for i in range(num_components)]
pca_df1 = pd.DataFrame(X_pca_transformed, columns=pca_columns1)

# Bước 4: PCA trên nhóm 'cs'
pca_columns2 = [f'PCA_cs_{i+1}' for i in range(num_components)]
pca_df2 = pd.DataFrame(X_pca_transformed1, columns=pca_columns2)

# Bước 5: PCA trên nhóm 'kt'
pca_columns3 = [f'PCA_kt_{i+1}' for i in range(num_components)]
pca_df3 = pd.DataFrame(X_pca_transformed2, columns=pca_columns3)

# Bước 6: Gán dữ liệu PCA vào train_data_after_normalization1
train_data_after_normalization1 = pd.concat([train_data_after_normalization1, pca_df1, pca_df2, pca_df3], axis=1)

In [None]:
if not isinstance(validation_data_PCA , pd.DataFrame):
    if hasattr(validation_data_PCA , "to_dataframe"):
        val_data_after_normalization_copy = validation_data_PCA.to_dataframe()
     
    elif hasattr(validation_data_PCA  ,"data"):
        val_data_after_normalization_copy = pd.DataFrame(validation_data_PCA.data)
    else:
        raise TypeError("val_data_after_normalization_copy không phải DataFrame và không có phương thức phù hợp để chuyển đổi.")

In [None]:
num_components = 1
columns_to_pca = ['Seating Capacity', 'Length', 'Height', 'Width', 'Fuel Tank Capacity']
columns_to_pca1 = ['Engine', 'Max Power', 'Max Torque', 'Drivetrain']
columns_to_pca2 = ['rpm at Max Power', 'rpm at Max Torque']

val_data_after_normalization1 = val_data_after_normalization_copy.drop(columns=columns_to_pca + columns_to_pca1 + columns_to_pca2)
val_data_after_normalization1 = pd.concat([train_data_after_normalization1, pca_df1, pca_df2, pca_df3], axis=1)

X_val_pca1 = pca(val_data_after_normalization_copy[columns_to_pca].values, num_components)
X_val_pca2 = pca(val_data_after_normalization_copy[columns_to_pca1].values, num_components)
X_val_pca3 = pca(val_data_after_normalization_copy[columns_to_pca2].values, num_components)

pca_val_df1 = pd.DataFrame(X_val_pca1, columns=pca_columns1)
pca_val_df2 = pd.DataFrame(X_val_pca2, columns=pca_columns2)
pca_val_df3 = pd.DataFrame(X_val_pca3, columns=pca_columns3)

val_data_after_normalization1 = val_data_after_normalization_copy.drop(columns=columns_to_pca + columns_to_pca1 + columns_to_pca2)
val_data_after_normalization1 = pd.concat([val_data_after_normalization1, pca_val_df1, pca_val_df2, pca_val_df3], axis=1)


#### IX.3 Split data 

In [None]:
train_data_v2 = copy.deepcopy(train_data_after_normalization1)
validation_data_v2 = copy.deepcopy(val_data_after_normalization1)

numeric_features = ['Make', 'Model', 'Year', 'Kilometer', 'Fuel Type', 'Transmission',
                    'Location', 'Color', 'Owner', 'Seller Type', 'PCA_hs_1', 'PCA_cs_1', 'PCA_kt_1']

for col in numeric_features:
    mean = train_data_v2[col].mean()
    std = train_data_v2[col].std()
    
    if std == 0:
        std = 1

    train_data_v2[col] = (train_data_v2[col] - mean) / std
    validation_data_v2[col] = (validation_data_v2[col] - mean) / std


X_train_v2 = train_data_v2.drop(columns=['Price'])
y_train_v2 = train_data_v2['Price']

X_val_v2 = validation_data_v2.drop(columns=['Price'])
y_val_v2 = validation_data_v2['Price']

print("Shape of validation_data_v2 after scaling:", validation_data_v2.shape)
print("Shape of validation_data_v2 after scaling:", train_data_v2.shape)

####  IX.4 Model with PCA

##### Model Multiple Linear Regression with PCA

In [None]:
class MultipleLinearRegressionPCA:
    def __init__(self, learning_rate=0.01, n_epochs=1000, batch_size=32):
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.weights = None
        self.bias = None
        self.loss_history = []

    def compute_loss(self, y_true, y_pred):
        return np.mean((y_true - y_pred) ** 2)

    def forward(self, X):
        return np.dot(X, self.weights) + self.bias

    def backward(self, X, y_true, y_pred):
        dw = -2 / len(X) * np.dot(X.T, (y_true - y_pred))
        db = -2 / len(X) * np.sum((y_true - y_pred))
        return dw, db

    def fit(self, X_train, y_train):
        if isinstance(X_train, (pd.DataFrame, pd.Series)):
             X_train = X_train.to_numpy()
        if isinstance(y_train, (pd.DataFrame, pd.Series)):
            y_train = y_train.to_numpy()

        n_samples, n_features = X_train.shape
        self.weights = np.zeros((n_features, 1))
        self.bias = 0

        if y_train.ndim == 1:
            y_train = y_train.reshape(-1, 1)

        best_loss = float('inf')
        best_weights = None
        best_bias = None
        self.loss_history = []

        for epoch in range(self.n_epochs):
            idx = np.random.permutation(n_samples)
            X_train = X_train[idx]
            y_train = y_train[idx]

            for i in range(0, n_samples, self.batch_size):
                X_batch = X_train[i:i + self.batch_size]
                y_batch = y_train[i:i + self.batch_size]

                y_pred = self.forward(X_batch)
                dw, db = self.backward(X_batch, y_batch, y_pred)

                self.weights -= self.learning_rate * dw
                self.bias -= self.learning_rate * db

            # Tính loss trên toàn bộ tập train
            y_train_pred = self.forward(X_train)
            loss = self.compute_loss(y_train, y_train_pred)
            self.loss_history.append(loss)

            if loss < best_loss:
                best_loss = loss
                best_weights = self.weights.copy()
                best_bias = self.bias

            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}, loss: {loss:.4f}")

        self.weights = best_weights
        self.bias = best_bias
        print(f"Best model found with loss = {best_loss:.4f}")

        return y_train ,y_train_pred 

    def evaluate(self, X_test, y_test):
        if isinstance(X_test, pd.DataFrame) or isinstance(X_test, pd.Series):
            X_test = X_test.to_numpy()
        if isinstance(y_test, pd.DataFrame) or isinstance(y_test, pd.Series):
            y_test = y_test.to_numpy()

        if y_test.ndim == 1:
            y_test = y_test.reshape(-1, 1)

        y_test_pred = self.forward(X_test)
        test_loss = self.compute_loss(y_test, y_test_pred)
        print(f"Test Loss: {test_loss:.4f}")

        weights = self.weights.flatten()
        bias = self.bias
        terms = [f"{w:.2f} * x{i+1}" for i, w in enumerate(weights)]
        equation = " + ".join(terms) + f" + {bias:.2f}"

        print("\nPhương trình hồi quy tuyến tính:")
        print(f"y = {equation}")

        # Vẽ đồ thị dự đoán
        self.plot_actual_vs_predicted(y_test, y_test_pred)
        self.plot_residuals(y_test, y_test_pred)

        return test_loss, y_test_pred

    def mean_absolute_percentage_error(self, y_true, y_pred):
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

    def plot_loss_history(self):
        plt.plot(self.loss_history)
        plt.xlabel("Epochs")
        plt.ylabel("Loss")
        plt.title("Loss History")
        plt.show()

    def plot_actual_vs_predicted(self, y_true, y_pred):
        plt.scatter(y_true, y_pred, alpha=0.5)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
        plt.xlabel("Actual Values")
        plt.ylabel("Predicted Values")
        plt.title("Actual vs Predicted")
        plt.show()

    def plot_residuals(self, y_true, y_pred):
        residuals = y_true - y_pred
        plt.scatter(y_pred, residuals, alpha=0.5)
        plt.axhline(y=0, color='red', linestyle='--')
        plt.xlabel("Predicted Values")
        plt.ylabel("Residuals")
        plt.title("Residual Plot")
        plt.show()


# Train and Evaluate Model
print("\nEvaluation metrics on Training Set:")
modelPCA = MultipleLinearRegressionPCA(learning_rate=0.001, n_epochs=200)
y_train , y_train_pred = modelPCA.fit(X_train_v2, y_train_v2)
modelPCA.plot_loss_history()

In [None]:
print("\nEvaluation metrics on Training Set:")
evaluate_model(y_train, y_train_pred)

In [None]:
test_loss, y_test_pred = modelPCA.evaluate(X_val_v2, y_val_v2)

In [None]:
print("\nEvaluation metrics on Validation Set:")
evaluate_model(y_val_v2, y_test_pred.reshape(-1))

Actual vs Predicted

In [None]:
# Sao chép dữ liệu
y_test_pred_copy = np.copy(y_test_pred)
y_val_v2_copy = np.copy(y_val_v2)

# Chuyển về 1D nếu cần
y_test_pred_copy = y_test_pred_copy.ravel()
y_val_v2_copy = y_val_v2_copy.ravel()

# Tạo DataFrame
df_PCA1 = pd.DataFrame({'Predicted': y_test_pred_copy, 'Actual': y_val_v2_copy})

# Chuyển về kiểu int nếu không gây lỗi
df_PCA1 = df_PCA1.astype(int, errors='ignore')  # Dùng 'ignore' để tránh lỗi nếu có giá trị không chuyển được

In [None]:
df_PCA1.head(10)