CRNN Base Structure (TODO Training and Accuracy)

In [10]:
# Import required libraries
import numpy as np
import torch
import torch.nn as nn

In [6]:
# Base structure of CRNN
class CRNN(nn.Module):
    def __init__(self):
        super(CRNN, self).__init__()
        self.features = nn.Sequential (
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3,3)),
            nn.BatchNorm2d(64),
            nn.ELU(),
            nn.MaxPool2d((2,2)),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3,3)),
            nn.BatchNorm2d(128),
            nn.ELU(),
            nn.MaxPool2d((2, 2)),

            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3,3)),
            nn.BatchNorm2d(256),
            nn.ELU(),
            nn.MaxPool2d((2, 2)),

            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(3,3)),
            nn.BatchNorm2d(256),
            nn.ELU(),
            nn.MaxPool2d((2, 2)),

            nn.Flatten()
        )

        W = 86 // (2**4)
        H = 96 // (2**4)
        C = 256

        totalInstruments = 3

        self.gru = nn.GRU(
            input_size=C*H, 
            hidden_size=256, 
            batch_first=True
        )
        # out_features is technically 18 in paper, but realistically 3 at the beginning
        self.fc = nn.Linear(in_features=256, out_features=totalInstruments)
        
    def forward(self, input):
        conv_output = self.features(input)
        gru_output, _ = self.gru(conv_output)

        fc_input = gru_output[:, -1, :]

        return self.fc(fc_input)

In [11]:
# Future stuff for training and accuracy of CNN
import torch.optim as optim

model = CRNN()
lossAlg = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr = 0.0001, momentum=0.9)

In [13]:
from torch.utils.data import DataLoader, Dataset, random_split
import logging, sys

sys.path.append('/Users/adarshbharathwaj/Desktop/eng100/project3/ENGR_100_Project_3/src')
from utils import *

class FrameDataset(Dataset):
    def __init__(self, npz_path):
        self.data = load_npz_file_with_condition(npz_path, max_size=1024**3)
        self.keys = [k for k in self.data.keys() if "_data" in k]

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        data_key = self.keys[idx]
        data = self.data[data_key]
        labels = self.data[f'{data_key.split("_data_")[0]}_labels']
        return torch.tensor(data.reshape(-1), dtype=torch.float32), torch.tensor(
            labels, dtype=torch.float32
        )
    
def load_npz_file_with_condition(file_path, max_size: int):
    file_size = os.path.getsize(file_path)

    if file_size > max_size:
        logging.info(
            f"File size is {file_size / (1024**2):.2f}MB. Using mmap_mode='r'."
        )
        data = np.load(file_path, mmap_mode="r", allow_pickle=True)
    else:
        logging.info(f"File size is {file_size / (1024**2):.2f}MB. Loading normally.")
        data = np.load(file_path, allow_pickle=True)

    return data

# def train_model(
#     model, train_dataloader, validation_dataloader, criterion, optimizer, epochs=5
# ) -> Tuple[list, list]:
#     train_accuracies = []
#     validation_accuracies = []

#     for epoch in range(epochs):
#         logging.info(f"Epoch {epoch+1}")
#         model.train()  # Set model to training mode
#         total_loss = 0
#         correct_predictions = 0
#         total_predictions = 0
#         for data, labels in train_dataloader:
#             optimizer.zero_grad()
#             outputs = model(data)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()

#             total_loss += loss.item()
#             predicted = torch.sigmoid(outputs) > 0.5
#             correct_predictions += (predicted == labels).float().sum()
#             total_predictions += torch.numel(labels)

#         train_accuracy = correct_predictions / total_predictions
#         logging.info(f"Loss: {total_loss}")
#         logging.info(f"Train Accuracy: {train_accuracy.item()}")
#         train_accuracies.append(train_accuracy.item())

#         # Validation phase
#         model.eval()  # Set model to evaluation mode
#         with torch.no_grad():
#             correct_predictions = 0
#             total_predictions = 0
#             for data, labels in validation_dataloader:
#                 outputs = model(data)
#                 predicted = torch.sigmoid(outputs) > 0.5
#                 correct_predictions += (predicted == labels).float().sum()
#                 total_predictions += torch.numel(labels)

#             validation_accuracy = correct_predictions / total_predictions
#             logging.info(f"Validation Accuracy: {validation_accuracy.item()}")
#             validation_accuracies.append(validation_accuracy.item())

#     return train_accuracies, validation_accuracies


# def plot_accuracy(
#     train_accuracies: list, validation_accuracies: list, epoch_count: int
# ):
#     epochs = range(1, epoch_count + 1)
#     plt.figure(figsize=(10, 6))
#     plt.plot(epochs, train_accuracies, label="Training Accuracy")
#     plt.plot(epochs, validation_accuracies, label="Validation Accuracy")
#     plt.title("Training and Validation Accuracy")
#     plt.xlabel("Epoch")
#     plt.ylabel("Accuracy")
#     plt.legend()
#     plt.show()


# def test_model(model, test_dataloader):
#     model.eval()  # Set model to evaluation mode
#     with torch.no_grad():
#         correct_predictions = 0
#         total_predictions = 0
#         for data, labels in test_dataloader:
#             outputs = model(data)
#             predicted = torch.sigmoid(outputs) > 0.5
#             correct_predictions += (predicted == labels).float().sum()
#             total_predictions += torch.numel(labels)
#         logging.info(
#             f"Test Accuracy: {(correct_predictions / total_predictions).item()}"
#         )


# def save_model(model, path: str):
#     torch.save(model.state_dict(), path)
#     logging.info(f"Model saved to {path}")


# def load_model(*parameters, path: str):
#     loaded_model = MultiLabelMLP(parameters)

#     # Then, load the saved state dict
#     loaded_model.load_state_dict(torch.load(path))

#     return loaded_model

In [16]:
# Training the model
npz_path = "data.npz"
full_dataset = FrameDataset(npz_path)

train_size = int(0.8 * len(full_dataset))
validation_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - validation_size
batch_size = 32

train_dataset, validation_dataset, test_dataset = random_split(
    full_dataset, [train_size, validation_size, test_size]
)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(
    validation_dataset, batch_size=batch_size, shuffle=True
)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

num_epochs = 20
for epoch in range(num_epochs):
    for batch in train_dataloader:
        input, labels = batch
        output = CRNN(input)
        loss = lossAlg(output.view(-1), labels.view(-1))
        optimizer.zero_grad()
        
        optimizer.step()
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

TypeError: CRNN.__init__() takes 1 positional argument but 2 were given