In [None]:
from typing import List, Dict, Tuple, Callable
import os
import gc
import traceback
import warnings
from pdb import set_trace

import sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
def l1_norm(x: np.ndarray) -> float:
    """ Computes the L1 norm 

        Args:
            x: A 1D or 2D column vector 

        Return:
            A float corresponding to the L1 norm
    """
    lms = np.linalg.norm(x, 1)
    return lms

In [None]:
def squared_l2_norm(x: np.ndarray) -> float:
    """ Computes the squared L2 norm 

        Args:
            x: A 1D or 2D column vector 

        Return:
            A float corresponding to the squared L2 norm
    """
    ln = np.linalg.norm(x, ord=2)
    return ln**2

In [None]:
crop_df = pd.read_csv(r'C:\Users\hanna\Fall2024\ITCS3156\Projects\crop_yield.csv')

In [None]:
crop_df.info() 

In [None]:
crop_df.describe()

In [None]:
import seaborn as sns
rng = np.random.RandomState(0)
indices = rng.choice(np.arange(len(crop_df)), size=500, replace=False)
sns.pairplot(
    data=crop_df.iloc[indices],
    y_vars='Yield_tons_per_hectare',
    x_vars=list(crop_df.drop('Yield_tons_per_hectare',axis=1).columns),
    hue="Yield_tons_per_hectare",
)
plt.show()

In [None]:
crops_ohe = crop_df
categorical_columns = ['Region','Soil_Type','Crop','Fertilizer_Used','Irrigation_Used', 'Weather_Condition']
for col in categorical_columns:
    col_ohe = pd.get_dummies(crop_df[col], prefix=col)
    crops_ohe = pd.concat((crops_ohe, col_ohe), axis=1).drop(col, axis=1)

In [None]:
def poly_features(X: np.ndarray, degree: int) -> np.ndarray:
    """ Compute polynomial features for pass data

        Args:
            X: Matrix of input data for which polynomial features
                will be computed for.

            degree: The degree of the polynomial which will be computed.

        Return:
            A matrix containing the original data and the new polynomial data.
    """
   
    X_poly = X.copy()
    for i in range(2, degree+1):  
        x_stack = X**i
        X_poly = np.hstack([X_poly, x_stack]) 
    return X_poly

In [None]:
from sklearn.model_selection import train_test_split
def get_train_valid_test_data(
    X: np.ndarray, 
    y: np.ndarray, 
) -> Tuple[np.ndarray]:
    """ Randomizes and then splits the data into train, validation, and test sets.

        Args:
            X: Data given as a 2D matrix

            y: Labels given as a vector 
    """
    X_trn, y_trn, X_vld, y_vld, X_tst, y_tst= None, None, None, None, None, None
    X_trn,X_tst,y_trn, y_tst = train_test_split(X,y,test_size=0.2, random_state=42)
    X_trn, X_vld, y_trn, y_vld = train_test_split(X_trn,y_trn,test_size=0.2, random_state=42)
    return X_trn, y_trn, X_vld, y_vld, X_tst, y_tst

In [None]:
from sklearn import preprocessing
def get_preprocessed_data(degree: int)  -> Tuple[np.ndarray]:
    """ Gets preprocessed data for training, validation, and testing

        Args:
            degree: The degree to use when computing polynomial features. 
        
        Return:
            A tuple of NumPy arrays where indices 0-1 
            contain the training data/targets, indices 2-3
            contain the validation data/targets, and 4-5
            contain the testing data/targets.
    """
    X = crops_ohe.drop('Yield_tons_per_hectare', axis=1).values
    y = crops_ohe['Yield_tons_per_hectare'].values

    X_poly  = poly_features(X, degree)
    
    X_trn, y_trn, X_vld, y_vld, X_tst, y_tst = get_train_valid_test_data(X_poly, y)
    
    scaler = preprocessing.StandardScaler().fit(X_trn)
    X_trn = scaler.transform(X_trn)
    X_tst = scaler.transform(X_tst)
    X_vld = scaler.transform(X_vld)
    
    bias = np.ones((X_trn.shape[0], 1))  
    X_trn= np.hstack([bias, X_trn])
    bias = np.ones((X_tst.shape[0], 1))  
    X_tst = np.hstack([bias, X_tst])
    bias = np.ones((X_vld.shape[0], 1))  
    X_vld = np.hstack([bias, X_vld])

    return X_trn, y_trn.reshape(-1, 1), X_vld, y_vld.reshape(-1, 1), X_tst, y_tst.reshape(-1, 1)

In [None]:
from sklearn.metrics import root_mean_squared_error as rmse
from sklearn.metrics import mean_squared_error as mse

In [None]:
class RidgeOrdinaryLeastSquares():
    """ Perfroms ordinary least squares regression
    
        Attributes:

            lamb (float): Regularization parameter for controlling
                L2 regularization.

            w: Vector of weights 

    """
    def __init__(self, lamb: float):
        self.lamb = lamb
        self.w = None
        
    def fit(self, X: np.ndarray, y: np.ndarray) -> object:
        """ Train OLS to learn optimal weights

            Args:
                X: Training data given as a 2D matrix

                y: Training labels given as a 1D vector

             Returns:
                The class's own object reference. 
        """
        n_features = X.shape[1]
        regularization_matrix = self.lamb * np.eye(n_features)
        regularization_matrix[0, 0] = 0 
        self.w = np.linalg.inv(X.T @ X + regularization_matrix) @ X.T @ y
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """ Make predictions using learned weights

            Args:
                X: Testing data given as a 2D matrix

            Returns:
                A 2D column vector of predictions for each data sample in X
        """
        predictions = X @ self.w
        return predictions.reshape(-1, 1)

In [None]:
X_trn, y_trn, _, _, X_tst, y_tst = get_preprocessed_data(
    degree=5
)

rols = RidgeOrdinaryLeastSquares(lamb = 0.00006)
rols.fit(X_trn, y_trn)
y_hat_tst = rols.predict(X_tst)
test_mse = mse(y_tst, y_hat_tst)
print("mse: ",test_mse)
test_rmse = rmse(y_tst, y_hat_tst)
print("rmse: ",test_rmse)

plt.plot(y_tst, 'ob', label='Targets')
plt.plot(y_hat_tst, 'xr', label='Predictions')
plt.xlabel("Data Sample Index")
plt.ylabel("Error in MPa")
plt.title("Test Targets vs Predictions")
plt.legend()
plt.legend(bbox_to_anchor=(1.3, 1.00))
plt.show()

print(f"Weights:\n{rols.w.flatten()}")

In [None]:
def get_batches(
    data_len: int, 
    batch_size: int = 32,
) -> List[np.ndarray]:
    """ Generates mini-batches based on the data indexes
        
        Args:
            data_len: Length of the data or number of data samples 
                in the data. This is used to generate the indices of
                the data.
            
            batch_size: Size of each mini-batch where the last mini-batch
                might be smaller than the rest if the batch_size does not 
                evenly divide the data length.

        Returns:
            A list of NumPy array's holding the indices of batches
    """
    indices = np.arange(data_len)
    np.random.shuffle(indices)
    batches = [indices[i:i+batch_size] for i in range(0, data_len, batch_size)]

    return batches

In [None]:
class RidgeLeastMeanSquares():
    """ Performs ridge regression using least mean squares (gradient descent)
    
        Attributes:

            alpha: learning rate or step size

            lamb (float): Regularization parameter for controlling
                L2 regularization.
                
            batch_size: Size of mini-batches for mini-batch gradient
                descent.
            
            epochs: Number of epochs to run for mini-batch
                gradient descent
                
            seed: Seed to be used for NumPy's RandomState class
                or universal seed np.random.seed() function.

            w: 1D vector of weights 

            trn_error: Stores the training error for each epoch.

            vld_error: Stores the validation error for each epoch.
    """

    def __init__(
        self, 
        alpha: float,
        lamb: float, 
        batch_size: int,
        seed: int = 0,
        epochs: int = 1,
    ):
        self.alpha = alpha
        self.lamb = lamb
        self.batch_size = batch_size
        self.epochs = epochs
        self.seed = seed
        self.w = None
        self.trn_error = None
        self.vld_error = None
    
    def fit(
         self, X: np.ndarray, 
         y: np.ndarray, 
         X_vld: np.ndarray=None, 
         y_vld: np.ndarray=None
     ) -> object:
        """ Train LMS to learn weights

            Args:
                X: Training data given as a 2D matrix

                y: Training labels given as a 2D column vector
                
            Returns:
                The class's own object reference. 
        """
        np.random.seed(self.seed)
        self.trn_error = []
        self.vld_error = []
        self.w = np.random.rand(X.shape[1],1)
        self.lamb = np.full(self.w.shape, self.lamb)
        self.lamb[0] = 0 
        for e in range(self.epochs):
            batches = get_batches(len(X), batch_size=self.batch_size)
            
            for i in batches:
                X_batch = X[i]
                y_batch = y[i]
                preds = self.predict(X[i])
                error = preds - y[i]
                mean_grad = (np.dot(X_batch.T, error) / len(X_batch)) + self.lamb * self.w
                self.w -= mean_grad * self.alpha
        
            trn_preds = self.predict(X)
            trn_error = rmse(y, trn_preds)
            self.trn_error.append(trn_error)
    
            if X_vld is not None and y_vld is not None:
                vld_preds = self.predict(X_vld)
                vld_error = rmse(y_vld, vld_preds)
                self.vld_error.append(vld_error)
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """ Make predictions using learned weights

            Args:
                X: Testing data given as a 2D matrix

            Returns:
                A 2D column vector of predictions for each data sample in X
        """
        return np.dot(X, self.w)

In [None]:
X_trn, y_trn, X_vld, y_vld, X_tst, y_tst = get_preprocessed_data(
    degree=4
)

rlms = RidgeLeastMeanSquares(
        lamb=0.000001,
        alpha=.099, 
        batch_size=100, 
        epochs=6750,
        seed=42
)
rlms.fit(X_trn, y_trn)
y_hat_tst = rlms.predict(X_tst)
print(f"Test y_hat shape: {y_hat_tst.shape}")
tst_rmse = rmse(y_tst, y_hat_tst)
print(f"Test RMSE: {tst_rmse}")

adjusted_r2 = 1 - (1 - r2) * ((len(y_test) - 1) / (len(y_test) - X_test.shape[1] - 1))
print(adjusted_r2)

plt.plot(rlms.trn_error, label='train')
plt.plot(rlms.vld_error, label='valid')
plt.title("LMS Learning Curve")
plt.xlabel("Epochs")
plt.ylabel("Error")
plt.legend(bbox_to_anchor=(1.2, 1.00))
plt.show()

plt.plot(y_tst, 'ob', label='Targets')
plt.plot(y_hat_tst, 'xr', label='Predictions')
plt.xlabel("Data Sample Index")
plt.ylabel("Error in MPa")
plt.title("Test Targets vs Predictions")
plt.legend()
plt.legend(bbox_to_anchor=(1.3, 1.00))
plt.show()

print(f"Weights:\n{rlms.w.flatten()}")