# Homework 2: Linear Regression
## Ariv Ahuja
## DS4400 - Machine Learning

In [None]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

## Data Loading and Preprocessing

In [None]:
# Load data
train_df = pd.read_csv('housing/train.csv', index_col=0)
test_df = pd.read_csv('housing/test.csv', index_col=0)

# Drop columns: id, date, zipcode (as per instructions)
cols_to_drop = ['id', 'date', 'zipcode']
for col in cols_to_drop:
    if col in train_df.columns:
        train_df = train_df.drop(columns=[col])
    if col in test_df.columns:
        test_df = test_df.drop(columns=[col])

# Separate features and target
y_train = train_df['price'].values / 1000  # Divide by 1000 as per instructions
y_test = test_df['price'].values / 1000

X_train = train_df.drop(columns=['price'])
X_test = test_df.drop(columns=['price'])

# Store feature names
feature_names = X_train.columns.tolist()

# Scale features to mean 0 and std 1
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"Training set size: {X_train_scaled.shape[0]} samples, {X_train_scaled.shape[1]} features")
print(f"Testing set size: {X_test_scaled.shape[0]} samples")
print(f"\nFeatures: {feature_names}")

---
## Problem 2: Linear Regression using sklearn

In [None]:
# 2.1 Train multiple linear regression model
lr_model = LinearRegression()
lr_model.fit(X_train_scaled, y_train)

# Get coefficients
print("=" * 60)
print("Problem 2.1: Linear Regression Coefficients")
print("=" * 60)
print(f"\nIntercept (theta_0): {lr_model.intercept_:.4f}")
print("\nFeature Coefficients:")
for name, coef in zip(feature_names, lr_model.coef_):
    print(f"  {name:20s}: {coef:12.4f}")

In [None]:
# Training metrics
y_train_pred = lr_model.predict(X_train_scaled)
train_mse = mean_squared_error(y_train, y_train_pred)
train_r2 = r2_score(y_train, y_train_pred)

print("\nTraining Metrics:")
print(f"  MSE: {train_mse:.4f}")
print(f"  R^2: {train_r2:.4f}")

In [None]:
# 2.2 Evaluate on testing set
y_test_pred = lr_model.predict(X_test_scaled)
test_mse = mean_squared_error(y_test, y_test_pred)
test_r2 = r2_score(y_test, y_test_pred)

print("=" * 60)
print("Problem 2.2: Testing Metrics")
print("=" * 60)
print(f"  MSE: {test_mse:.4f}")
print(f"  R^2: {test_r2:.4f}")

In [None]:
# 2.3 Analysis - Top contributing features
print("=" * 60)
print("Problem 2.3: Feature Importance Analysis")
print("=" * 60)

# Sort features by absolute coefficient value
feature_importance = sorted(zip(feature_names, lr_model.coef_), 
                           key=lambda x: abs(x[1]), reverse=True)

print("\nTop 5 Most Important Features (by absolute coefficient):")
for i, (name, coef) in enumerate(feature_importance[:5], 1):
    print(f"  {i}. {name:20s}: {coef:12.4f}")

print(f"\nSummary:")
print(f"  Training MSE: {train_mse:.4f}, Testing MSE: {test_mse:.4f}")
print(f"  Training R^2: {train_r2:.4f}, Testing R^2: {test_r2:.4f}")

---
## Problem 3: Closed-Form Solution Implementation

In [None]:
class LinearRegressionClosedForm:
    """Linear Regression using closed-form solution: theta = (X^T X)^(-1) X^T y"""
    
    def __init__(self):
        self.theta = None
        
    def fit(self, X, y):
        """Fit the model using closed-form solution with pseudo-inverse for numerical stability."""
        # Add bias column (column of ones)
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        
        # Closed-form solution using pseudo-inverse: theta = (X^T X)^(-1) X^T y
        # Using pinv for numerical stability
        self.theta = np.linalg.pinv(X_b.T @ X_b) @ X_b.T @ y
        return self
    
    def predict(self, X):
        """Predict using the trained model."""
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        return X_b @ self.theta
    
    @property
    def intercept_(self):
        return self.theta[0]
    
    @property
    def coef_(self):
        return self.theta[1:]

In [None]:
# Train using closed-form solution
cf_model = LinearRegressionClosedForm()
cf_model.fit(X_train_scaled, y_train)

print("=" * 60)
print("Problem 3: Closed-Form Solution Results")
print("=" * 60)
print(f"\nIntercept (theta_0): {cf_model.intercept_:.4f}")
print("\nFeature Coefficients:")
for name, coef in zip(feature_names, cf_model.coef_):
    print(f"  {name:20s}: {coef:12.4f}")

In [None]:
# Evaluate closed-form model
y_train_pred_cf = cf_model.predict(X_train_scaled)
y_test_pred_cf = cf_model.predict(X_test_scaled)

cf_train_mse = mean_squared_error(y_train, y_train_pred_cf)
cf_train_r2 = r2_score(y_train, y_train_pred_cf)
cf_test_mse = mean_squared_error(y_test, y_test_pred_cf)
cf_test_r2 = r2_score(y_test, y_test_pred_cf)

print("\nClosed-Form Implementation Metrics:")
print(f"  Training MSE: {cf_train_mse:.4f}, R^2: {cf_train_r2:.4f}")
print(f"  Testing MSE:  {cf_test_mse:.4f}, R^2: {cf_test_r2:.4f}")

print("\nComparison with sklearn:")
print(f"  sklearn    - Train MSE: {train_mse:.4f}, Test MSE: {test_mse:.4f}")
print(f"  Closed-form - Train MSE: {cf_train_mse:.4f}, Test MSE: {cf_test_mse:.4f}")

---
## Problem 4: Polynomial Regression

In [None]:
def create_polynomial_features(X, degree):
    """Create polynomial features up to specified degree."""
    X = np.array(X).reshape(-1, 1) if X.ndim == 1 else X
    features = [X]
    for d in range(2, degree + 1):
        features.append(X ** d)
    return np.hstack(features)

# Extract sqft_living feature
sqft_living_idx = feature_names.index('sqft_living')
X_sqft_train = X_train_scaled[:, sqft_living_idx]
X_sqft_test = X_test_scaled[:, sqft_living_idx]

print("=" * 60)
print("Problem 4: Polynomial Regression Results")
print("=" * 60)
print("\nUsing feature: sqft_living")

results_poly = []
for p in [1, 2, 3, 4, 5]:
    # Create polynomial features
    X_poly_train = create_polynomial_features(X_sqft_train, p)
    X_poly_test = create_polynomial_features(X_sqft_test, p)
    
    # Fit model using closed-form solution
    poly_model = LinearRegressionClosedForm()
    poly_model.fit(X_poly_train, y_train)
    
    # Predictions
    y_train_pred_poly = poly_model.predict(X_poly_train)
    y_test_pred_poly = poly_model.predict(X_poly_test)
    
    # Metrics
    train_mse_poly = mean_squared_error(y_train, y_train_pred_poly)
    train_r2_poly = r2_score(y_train, y_train_pred_poly)
    test_mse_poly = mean_squared_error(y_test, y_test_pred_poly)
    test_r2_poly = r2_score(y_test, y_test_pred_poly)
    
    results_poly.append({
        'Degree': p,
        'Train MSE': train_mse_poly,
        'Train R^2': train_r2_poly,
        'Test MSE': test_mse_poly,
        'Test R^2': test_r2_poly
    })

# Display results table
results_df = pd.DataFrame(results_poly)
print("\nPolynomial Regression Results:")
print(results_df.to_string(index=False))

---
## Problem 5: Gradient Descent

In [None]:
class LinearRegressionGD:
    """Linear Regression using Gradient Descent."""
    
    def __init__(self, learning_rate=0.01, n_iterations=100):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.theta = None
        self.cost_history = []
        
    def fit(self, X, y):
        """Fit the model using gradient descent."""
        # Add bias column
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        n_samples, n_features = X_b.shape
        
        # Initialize theta to zeros
        self.theta = np.zeros(n_features)
        self.cost_history = []
        
        for i in range(self.n_iterations):
            # Predictions
            predictions = X_b @ self.theta
            
            # Compute gradients
            errors = predictions - y
            gradients = (2 / n_samples) * (X_b.T @ errors)
            
            # Update theta
            self.theta -= self.learning_rate * gradients
            
            # Store cost (MSE)
            cost = np.mean(errors ** 2)
            self.cost_history.append(cost)
            
        return self
    
    def predict(self, X):
        """Predict using the trained model."""
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        return X_b @ self.theta
    
    @property
    def intercept_(self):
        return self.theta[0]
    
    @property
    def coef_(self):
        return self.theta[1:]

In [None]:
print("=" * 60)
print("Problem 5: Gradient Descent Results")
print("=" * 60)

learning_rates = [0.01, 0.1, 0.5]
iterations_list = [10, 50, 100]

results_gd = []

for lr in learning_rates:
    for n_iter in iterations_list:
        gd_model = LinearRegressionGD(learning_rate=lr, n_iterations=n_iter)
        gd_model.fit(X_train_scaled, y_train)
        
        # Predictions
        y_train_pred_gd = gd_model.predict(X_train_scaled)
        y_test_pred_gd = gd_model.predict(X_test_scaled)
        
        # Metrics
        train_mse_gd = mean_squared_error(y_train, y_train_pred_gd)
        train_r2_gd = r2_score(y_train, y_train_pred_gd)
        test_mse_gd = mean_squared_error(y_test, y_test_pred_gd)
        test_r2_gd = r2_score(y_test, y_test_pred_gd)
        
        results_gd.append({
            'Learning Rate': lr,
            'Iterations': n_iter,
            'Train MSE': train_mse_gd,
            'Train R^2': train_r2_gd,
            'Test MSE': test_mse_gd,
            'Test R^2': test_r2_gd
        })

# Display results table
results_gd_df = pd.DataFrame(results_gd)
print("\nGradient Descent Results:")
print(results_gd_df.to_string(index=False))

In [None]:
# Plot convergence for different learning rates
plt.figure(figsize=(10, 6))
for lr in [0.01, 0.1]:
    gd_model = LinearRegressionGD(learning_rate=lr, n_iterations=100)
    gd_model.fit(X_train_scaled, y_train)
    plt.plot(gd_model.cost_history, label=f'LR={lr}')

plt.xlabel('Iteration')
plt.ylabel('MSE (Cost)')
plt.title('Gradient Descent Convergence')
plt.legend()
plt.grid(True)
plt.savefig('gd_convergence.png', dpi=150, bbox_inches='tight')
plt.show()

---
## Problem 6: Ridge Regularization

In [None]:
class RidgeRegressionGD:
    """Ridge Regression using Gradient Descent."""
    
    def __init__(self, learning_rate=0.01, n_iterations=1000, lambda_reg=1.0):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.lambda_reg = lambda_reg
        self.theta = None
        
    def fit(self, X, y):
        """Fit the model using gradient descent with L2 regularization."""
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        n_samples, n_features = X_b.shape
        
        self.theta = np.zeros(n_features)
        
        for i in range(self.n_iterations):
            predictions = X_b @ self.theta
            errors = predictions - y
            
            # Gradient with regularization (don't regularize bias term)
            gradients = (2 / n_samples) * (X_b.T @ errors)
            gradients[1:] += (2 * self.lambda_reg / n_samples) * self.theta[1:]
            
            self.theta -= self.learning_rate * gradients
            
        return self
    
    def predict(self, X):
        X_b = np.c_[np.ones((X.shape[0], 1)), X]
        return X_b @ self.theta
    
    @property
    def intercept_(self):
        return self.theta[0]
    
    @property
    def coef_(self):
        return self.theta[1:]

In [None]:
# Problem 6.3: Simulated data experiment
print("=" * 60)
print("Problem 6.3: Ridge Regression on Simulated Data")
print("=" * 60)

np.random.seed(42)
N = 1000

# Generate X uniformly on [-2, 2]
X_sim = np.random.uniform(-2, 2, N)

# Generate Y = 1 + 2*X + e, where e ~ N(0, 2)
e = np.random.normal(0, np.sqrt(2), N)
Y_sim = 1 + 2 * X_sim + e

# Reshape for model fitting
X_sim_reshaped = X_sim.reshape(-1, 1)

# Linear regression (no regularization) using closed-form
X_sim_b = np.c_[np.ones((N, 1)), X_sim_reshaped]
theta_lr = np.linalg.pinv(X_sim_b.T @ X_sim_b) @ X_sim_b.T @ Y_sim
y_pred_lr = X_sim_b @ theta_lr

print(f"\nLinear Regression (lambda=0):")
print(f"  Intercept: {theta_lr[0]:.4f} (true: 1.0)")
print(f"  Slope: {theta_lr[1]:.4f} (true: 2.0)")
print(f"  MSE: {mean_squared_error(Y_sim, y_pred_lr):.4f}")
print(f"  R^2: {r2_score(Y_sim, y_pred_lr):.4f}")

# Ridge regression with different lambda values using closed-form
lambdas = [1, 10, 100, 1000, 10000]
results_ridge = []

print("\nRidge Regression Results:")
print(f"{'Lambda':<12} {'Intercept':<12} {'Slope':<12} {'MSE':<12} {'R^2':<12}")
print('-'*60)

for lam in lambdas:
    # Closed-form ridge: theta = (X^T X + lambda*I)^(-1) X^T y
    I = np.eye(X_sim_b.shape[1])
    I[0, 0] = 0  # Don't regularize bias
    theta_ridge = np.linalg.pinv(X_sim_b.T @ X_sim_b + lam * I) @ X_sim_b.T @ Y_sim
    y_pred_ridge = X_sim_b @ theta_ridge
    
    mse_ridge = mean_squared_error(Y_sim, y_pred_ridge)
    r2_ridge = r2_score(Y_sim, y_pred_ridge)
    
    print(f"{lam:<12} {theta_ridge[0]:<12.4f} {theta_ridge[1]:<12.4f} {mse_ridge:<12.4f} {r2_ridge:<12.4f}")
    
    results_ridge.append({
        'Lambda': lam,
        'Intercept': theta_ridge[0],
        'Slope': theta_ridge[1],
        'MSE': mse_ridge,
        'R^2': r2_ridge
    })

In [None]:
print("\n" + "=" * 60)
print("CODE EXECUTION COMPLETE")
print("=" * 60)