In [None]:
# Mount Google Drive at /content/drive; the dataset paths used further down in this
# notebook live under that mount point (Colab-only step).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%pip install pandas
%pip install matplotlib
%pip install seaborn
%pip install torch
%pip install sklearn
%pip install skorch

In [None]:
# Import necessary libraries
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
import torch
import torch.nn as nn
import torch.optim as optim
from numpy import vstack,sqrt
from pandas import read_csv,to_datetime
from sklearn.metrics import mean_squared_error
from torch.utils.data import Dataset,DataLoader,random_split
from torch import Tensor
from torch.nn import ReLU,Module,MSELoss,Linear
from torch.optim import SGD
from torch.nn.init import xavier_uniform_
from tqdm import tqdm

In [None]:
###### Q1 : Exploratory Data Analysis (EDA)
# Load the data
prices_dt = '/content/drive/MyDrive/Colab Notebooks/datasets/prices-split-adjusted.csv'
df = pd.read_csv(prices_dt)

# Understand the structure of the data
print(df.describe())
# DataFrame.info() writes its report itself and returns None, so it is called directly;
# wrapping it in print() would append a stray "None" line to the output.
df.info()
print(df.isnull().sum())

# Visualize the data: pairwise relationships between the columns of the frame
sns.pairplot(df)
plt.show()

In [None]:
###### Q2 : Deep Neural Network Architecture for Regression

# dataset definition and preparation
class CSVDataset(Dataset):
    """Tabular regression dataset loaded from a CSV file.

    The first line of the file is skipped as a header. Every column except the last
    one is used as an input feature; the last column is the regression target.

    Attributes:
        X: float32 array of shape (n_rows, n_columns - 1) with the inputs.
        y: float32 array of shape (n_rows, 1) with the targets.
    """

    # load the dataset
    def __init__(self, path):
        """Read ``path`` and split it into input and target arrays.

        A non-numeric first column is treated as a date column and replaced by the
        number of whole days elapsed since the earliest date in that column. A first
        column that is already numeric is used unchanged.

        Args:
            path: Location of the CSV file (anything ``pandas.read_csv`` accepts).
        """
        # Local import so the class does not depend on a change to the import cell.
        from pandas.api.types import is_numeric_dtype

        # load the csv file as a dataframe
        df = read_csv(path, header=None, skiprows=1)  # Skip the header

        first_col = df.columns[0]
        # Only convert genuine date columns. Passing a numeric column to to_datetime
        # interprets the numbers as nanosecond timestamps, so the day difference is 0
        # for every row and the feature would be wiped out.
        if not is_numeric_dtype(df[first_col]):
            dates = to_datetime(df[first_col])
            df[first_col] = (dates - dates.min()).dt.days

        # store the inputs and outputs
        values = df.to_numpy(dtype='float32')
        self.X = values[:, :-1].copy()
        # ensure target has the right shape: one column, one row per sample
        self.y = values[:, -1].reshape((len(values), 1)).copy()

    # number of rows in the dataset
    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    # get a row at an index
    def __getitem__(self, idx):
        """Return ``[inputs, target]`` for the sample at ``idx``."""
        return [self.X[idx], self.y[idx]]

    # get indexes for train and test rows
    def get_splits(self, n_test=0.33):
        """Randomly split the dataset into train and test subsets.

        Args:
            n_test: Fraction of the rows assigned to the test subset.

        Returns:
            The ``[train, test]`` subsets produced by ``random_split``.
        """
        # determine sizes
        test_size = round(n_test * len(self.X))
        train_size = len(self.X) - test_size
        # calculate the split
        return random_split(self, [train_size, test_size])

# model definition
class MLP(Module):
    """Feed-forward regression network: n_inputs -> 10 -> 8 -> 1.

    ReLU follows each of the first two linear layers; the last layer is a plain linear
    output. All three weight matrices are initialised with Xavier-uniform.
    """

    @staticmethod
    def _xavier_linear(fan_in, fan_out):
        """Create a Linear layer and re-draw its weights with Xavier-uniform init."""
        layer = Linear(fan_in, fan_out)
        xavier_uniform_(layer.weight)
        return layer

    def __init__(self, n_inputs):
        """Build the layers for inputs with ``n_inputs`` features."""
        super().__init__()
        # Layers are created (and initialised) strictly in network order.
        self.hidden1 = self._xavier_linear(n_inputs, 10)
        self.act1 = ReLU()
        self.hidden2 = self._xavier_linear(10, 8)
        self.act2 = ReLU()
        # final layer doubles as the output layer: a single regression value
        self.hidden3 = self._xavier_linear(8, 1)

    def forward(self, X):
        """Propagate a batch ``X`` through the network and return the predictions."""
        first = self.act1(self.hidden1(X))
        second = self.act2(self.hidden2(first))
        return self.hidden3(second)

# prepare the dataset
def prepare_data(path):
    """Load the CSV at ``path`` and wrap its train/test split in data loaders.

    Returns:
        ``(train_dl, test_dl)``: both use batches of 32; only the training loader
        shuffles its samples.
    """
    train_part, test_part = CSVDataset(path).get_splits()
    loaders = (
        DataLoader(train_part, batch_size=32, shuffle=True),
        DataLoader(test_part, batch_size=32, shuffle=False),
    )
    return loaders

# train the model
def train_model(train_dl, model, n_epochs=100, lr=0.01, momentum=0.9):
    """Train ``model`` with SGD on an MSE objective and record the loss per epoch.

    Args:
        train_dl: DataLoader yielding ``(inputs, targets)`` batches.
        model: Module to optimise in place.
        n_epochs: Number of passes over ``train_dl``.
        lr: SGD learning rate.
        momentum: SGD momentum.

    Returns:
        A list with one float per epoch: the mean training loss of that epoch,
        weighted by batch size. An epoch that saw no samples records ``nan``.
    """
    size = len(train_dl.dataset)
    # define the optimization
    criterion = MSELoss()
    optimizer = SGD(model.parameters(), lr=lr, momentum=momentum)
    # make sure train-only layers (e.g. dropout) are active while fitting
    model.train()

    epoch_losses = []
    # enumerate epochs
    for epoch in tqdm(range(n_epochs), desc='Training Epochs'):
        print(f"Epoch {epoch+1}\n-------------------------------")
        running_loss, seen = 0.0, 0
        # enumerate mini batches
        for batch, (inputs, targets) in enumerate(train_dl):
            # clear the gradients
            optimizer.zero_grad()
            # compute the model output
            yhat = model(inputs)
            # calculate loss
            loss = criterion(yhat, targets)
            # credit assignment
            loss.backward()
            # update model weights
            optimizer.step()

            # Keep the float in its own name so the loss tensor is not shadowed.
            batch_loss = loss.item()
            running_loss += batch_loss * len(inputs)
            seen += len(inputs)

            if batch % 100 == 0:
                current = batch * len(inputs)
                print(f"loss: {batch_loss:>7f}  [{current:>5d}/{size:>5d}]")

        epoch_losses.append(running_loss / seen if seen else float('nan'))

    return epoch_losses

# evaluate the model
def evaluate_model(test_dl, model):
    """Compute the mean squared error of ``model`` over every batch of ``test_dl``.

    The model is switched to evaluation mode for the duration of the call, so layers
    such as dropout do not perturb the predictions, and gradients are not tracked.
    The model's previous training/eval mode is restored before returning.

    Args:
        test_dl: DataLoader yielding ``(inputs, targets)`` batches.
        model: Module to evaluate.

    Returns:
        The value returned by ``mean_squared_error(actuals, predictions)``.
    """
    predictions, actuals = [], []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, targets in test_dl:
                # evaluate the model on the test set and retrieve numpy arrays
                yhat = model(inputs).detach().numpy()
                actual = targets.numpy()
                actual = actual.reshape((len(actual), 1))
                # store
                predictions.append(yhat)
                actuals.append(actual)
    finally:
        # leave the model in the mode the caller had it in
        model.train(was_training)
    predictions, actuals = vstack(predictions), vstack(actuals)
    # calculate mse
    return mean_squared_error(actuals, predictions)

# make a prediction for one row of data
def predict(row, model):
    """Run ``model`` on a single input row and return its output as a numpy array.

    Args:
        row: Sequence of feature values for one sample.
        model: Module to evaluate.

    Returns:
        The model output for the one-row batch, detached and converted to numpy.
    """
    # wrap the row in a list so the model receives a batch containing one sample
    batch = Tensor([row])
    output = model(batch)
    return output.detach().numpy()

In [None]:
# Keep only the price columns, ordered so that 'close' is last: CSVDataset uses the
# last column as the regression target and the others as inputs.
X = df[['low', 'high', 'open', 'close']]

prices_dt_adjusted = '/content/drive/MyDrive/Colab Notebooks/datasets/prices-split-adjusted-2.csv'

# Write the file once, header included. CSVDataset skips exactly one line (the header)
# when loading. Do not re-read the file with skiprows=1 and save it again: that turns
# the first data row into the header and silently loses that sample.
X.to_csv(prices_dt_adjusted, index=False)

train_dl, test_dl = prepare_data(prices_dt_adjusted)
print(len(train_dl.dataset), len(test_dl.dataset))
# one network input per column except the target
model = MLP(X.shape[1] - 1)
train_losses = train_model(train_dl, model)

In [None]:
###### Q3 : Hyperparameter Tuning

# NOTE(review): this cell does not run against the cells visible in this notebook:
#   - `param_grid`, `X_train`, `y_train`, `X_test` and `y_test` are not defined in any
#     visible cell.
#   - `MLP` is a plain torch Module (it only defines `forward`), while GridSearchCV
#     expects a scikit-learn style estimator (fit / predict / get_params). skorch is
#     pip-installed above but never imported -- presumably the model was meant to be
#     wrapped in a skorch regressor; TODO confirm, then define the grid and the splits.
mlp = MLP(n_inputs=3)
# 3-fold cross-validated search scored by negative MSE, using all available cores.
grid_search = GridSearchCV(mlp, param_grid, cv=3, scoring='neg_mean_squared_error', n_jobs=-1)
grid_search.fit(X_train, y_train)

print("Best Parameters: ", grid_search.best_params_)
print("Best Negative Mean Squared Error: ", grid_search.best_score_)

# Score the best estimator found by the search on the held-out test data.
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
test_mse = mean_squared_error(y_test, y_pred)

print("Test Mean Squared Error with Best Model: ", test_mse)

In [None]:
###### Q4 : Visualization

# Draw the recorded training losses against their 1-based position in `train_losses`.
epoch_axis = range(1, len(train_losses) + 1)
ax = plt.gca()
ax.plot(epoch_axis, train_losses, label='Training Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Loss vs Epochs')
ax.legend()
plt.show()

In [None]:
###### Q5 : Regularization Techniques
# Regularized regression network: dropout is applied inside the model.
# NOTE(review): no L2 penalty is visible in this class -- L2 is normally configured on
# the optimizer (weight_decay); confirm where it is set.
class RegularizedRegressionNN(nn.Module):
    """Regression network input_dim -> 64 -> 32 -> 1 with dropout after each hidden layer."""

    def __init__(self, input_dim):
        """Create the three linear layers and the shared dropout module (p=0.2)."""
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        # a single Dropout module is reused after both hidden layers
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Return the prediction for batch ``x``; dropout is only active in training mode."""
        hidden = self.dropout(torch.relu(self.fc1(x)))
        hidden = self.dropout(torch.relu(self.fc2(hidden)))
        return self.fc3(hidden)