Wills Kookogey PyTorch Learning Enviromnment

## Setup and config

In [1]:
import fennec_ml as fn
import json
import os
from pathlib import Path
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms  as transforms
import matplotlib.pyplot as plt
%matplotlib inline

# hyper parameters
input_size = 12
hidden_size = 128
num_classes = 9
num_epochs = 25
batch_size = 100
learning_rate = 0.01

sequence_length = 10
num_layers = 1

In [None]:
# folder config
cwd = os.getcwd()
main_dir = os.path.dirname(cwd)
data_dir = os.path.join(main_dir, "DATA")
os.makedirs(data_dir, exist_ok=True)
excel_dir = os.path.join(data_dir, "RAW_DATA/GERALD_2D")
os.makedirs(excel_dir, exist_ok=True)
csv_dir = os.path.join(data_dir, "PROCESSED_DATA/GERALD_2D")
os.makedirs(csv_dir, exist_ok=True)

json_path = os.path.join(main_dir, "vars_of_interest.json")

# convert excel data to csv
print("Using folder_cleaner() to convert from raw excel files to useful .csv's.\nfolder_cleaner() output:")
fn.folder_cleaner(excel_dir, csv_dir, overwrite= True)

# convert csv to numpy arrays
scaled_data = fn.standardize(csv_dir)
# scaled_data = fn.normalize(csv_dir)
labels = fn.get_2D_CG_labels(csv_dir)

# segment and split data
print("\nUsing segment_and_split() to cut data into training/validtion/testing sets\nsegment_and_split() output:")
# using default 70%/15%/15% train/val/test split
dataset_dict = fn.segment_and_split(scaled_data, labels, timesteps=sequence_length)

Using folder_cleaner() to convert from raw excel files to useful .csv's.
folder_cleaner() output:
065G_AAC_L_2.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 065G_AAC_L_2.csv
084G_CCC_L_2.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 084G_CCC_L_2.csv
083G_CCC_L_3.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 083G_CCC_L_3.csv
069G_BBC_S_4.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 069G_BBC_S_4.csv
059G_AAC_L_1.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 059G_AAC_L_1.csv
081G_CCS_L_2.xlsx processed and saved to /Users/willsstoddard/Documents/Development/Python/FENNEC/FENNEC-25_26/DATA/PROCESSED_DATA as 081G_CCS_L_2.

## Check Data Shape

In [3]:
print(dataset_dict["Training_Set"]["sets"].shape)
print(dataset_dict["Training_Set"]["labels"][0])

(27716, 10, 12)
CCS


## Convert data to tensors

In [5]:
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler, TensorDataset
import numpy as np

encoder = LabelEncoder()

training_labels = dataset_dict["Training_Set"]["labels"]
encoded_labels_train = encoder.fit_transform(training_labels)

validation_labels = dataset_dict["Validation_Set"]["labels"]
encoded_labels_val = encoder.fit_transform(validation_labels)

testing_labels = dataset_dict["Testing_Set"]["labels"]
encoded_labels_test = encoder.fit_transform(testing_labels)

print(encoded_labels_train)
print(encoder.classes_)

input_train = torch.from_numpy(dataset_dict["Training_Set"]["sets"]).type(torch.float)
output_train = torch.from_numpy(encoded_labels_train).type(torch.int)
input_val = torch.from_numpy(dataset_dict["Validation_Set"]["sets"]).type(torch.float)
output_val = torch.from_numpy(encoded_labels_val).type(torch.int)
input_test = torch.from_numpy(dataset_dict["Testing_Set"]["sets"]).type(torch.float)
output_test = torch.from_numpy(encoded_labels_test).type(torch.int)

# stack inputs and outputs
train_data = TensorDataset(input_train, output_train)
val_data = TensorDataset(input_val, output_val)
test_data = TensorDataset(input_test, output_test)

# data loader
class_counts = np.bincount(output_train)
weights = 1. / class_counts
sample_weights = [weights[label] for label in output_train]
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)

train_loader = DataLoader(train_data, batch_size=batch_size, sampler=sampler)
val_loader   = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_data, batch_size=batch_size, shuffle=False)

[8 0 0 ... 8 4 3]
['AAC' 'AAP' 'AAS' 'BBC' 'BBP' 'BBS' 'CCC' 'CCP' 'CCS']


# Playground

In [7]:
from pathlib import Path
import torch
import torch.nn as nn
from torch.optim import Adam
import torchvision
import torchvision.transforms  as transforms
import matplotlib.pyplot as plt
%matplotlib inline

# Create models directory 
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)

# Create model save path
MODEL_NAME = "model_0.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# Device config
if torch.backends.mps.is_available():
    device = torch.device("mps")
    print("MPS device found.")
else:
    print("MPS device not found. Using CPU.")
    device = torch.device("cpu") # Fallback to CPU if MPS is not available

# Ben Keller 1D Model
class ResidualGRU(nn.Module):
    def __init__(self, input_size, hidden_size=128, num_layers=2, num_classes=5, dropout=0.3):
        super(ResidualGRU, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers,
                          batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc1 = nn.Linear(hidden_size, hidden_size // 2)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size // 2, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        # GRU output
        out, _ = self.gru(x)   # out: [batch, seq, hidden]
        
        # Take only the last timestep
        out = out[:, -1, :]    # [batch, hidden]

        # Residual connection (skip last layer’s features through dense layers)
        residual = out.clone()

        # Fully connected layers with dropout + residual
        out = self.fc1(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)

        return out

model = ResidualGRU(input_size=12, num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
best_val_loss = float('inf')

MPS device found.


In [None]:
# Train Loop

for epoch in range(num_epochs):
    # Training
    model.train()
    running_loss = 0
    for input, output in train_loader:
        input, output = input.to(device), output.to(device)
        optimizer.zero_grad()
        model_outputs = model(input)
        loss = criterion(model_outputs,output)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * input.size(0)
    epoch_loss = running_loss / len(train_data)

    # Validation
    model.eval()
    val_loss = 0
    correct = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs,y_batch)
            val_loss += loss.item() * X_batch.size(0)
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == y_batch).sum().item()
    val_loss /= len(val_data)
    val_acc = correct / len(val_data)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        print(f"✅ Saved best model at epoch {epoch+1}")

print("Training complete!")

Output Graphs

In [21]:
import matplotlib.pyplot as plt
from scipy import stats

correct_guesses = 0
total_guesses = 0
correctness = 0

model.load_state_dict(torch.load(MODEL_SAVE_PATH))
model.to(device)
model.eval()

for inputs, labels in test_loader:   # each iteration = 1 file dataset tensor
    inputs = inputs.to(device)

    with torch.no_grad():
        outputs = model(inputs)
        probs = torch.softmax(outputs, dim=1).cpu().numpy()

    # Average prediction for the file
    mean_preds = probs.mean(axis=0)
    mode_true = stats.mode(labels.numpy())
    predicted_class = encoder.classes_[np.argmax(mean_preds)]
    true_class = encoder.classes_[mode_true.mode]

    # print(f"File: {true_class}, Predicted: {predicted_class}")

    # tally correct guesses
    if (predicted_class == true_class):
        correct_guesses += 1
    
    total_guesses += 1

# calculate correctness
correctness = correct_guesses / total_guesses
print(f"Correctness Percentage: {correctness}")

Correctness Percentage: 0.8166666666666667


chat gpt training loop

In [None]:
num_epochs = 30

for epoch in range(num_epochs):
    ##########################
    #       TRAINING         #
    ##########################
    model.train()
    running_loss = 0
    correct = 0
    total = 0

    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * X_batch.size(0)

        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

    train_loss = running_loss / len(train_dataset)
    train_acc = correct / total

    ##########################
    #     VALIDATION         #
    ##########################
    model.eval()
    val_loss_total = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            val_loss_total += loss.item() * X_batch.size(0)

            _, predicted = torch.max(outputs, 1)
            val_total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()

    val_loss = val_loss_total / len(val_dataset)
    val_acc = val_correct / val_total

    ##########################
    #  SAVE BEST MODEL       #
    ##########################
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_residual_gru.pth")

    ##########################
    #   LOG RESULTS          #
    ##########################
    print(f"Epoch [{epoch+1}/{num_epochs}] "
          f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")


an RNN

In [None]:
class RNN(nn.Module):
    def __init__(self, input_size=12, hidden_size=32, num_layers=1, num_classes=9):
        super(RNN, self).__init__()

        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

        # x -> (batch_size, seq, input_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)

        out, _ = self.rnn(x, h0)
        # out: (batch_size, seq_length, hidden_size)
        # out: (N, 10, 128)
        out = out[:, -1, :]
        # out (N, 128)
        logits = self.fc(out)
        return logits
    

model_0 = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
untrained_pred = model_0(input_test)

# Save the model state dict
print(f"Saving model to: {MODEL_SAVE_PATH}")
torch.save(obj=model_0.state_dict(),
           f=MODEL_SAVE_PATH)


MPS device found.


AttributeError: 'RNN' object has no attribute 'num_layers'