In [35]:
from pathlib import Path
import torch
import matplotlib.pyplot as plt
import torch

from torch import nn
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torchvision import transforms

from torchvision.models import EfficientNet_V2_S_Weights, efficientnet_v2_s
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models import inception_v3, Inception_V3_Weights

from torchvision import datasets

from torch.utils.data import DataLoader

from torch import nn
from timeit import default_timer as timer

from tqdm.auto import tqdm
from typing import Dict, List, Tuple
from copy import deepcopy

In [36]:
lr1 = 0.01
lr2 = 0.001
step_size = 10
patience = 3
num_epochs = 30

In [37]:
data_path = Path("data/")
image_path = data_path / "events"
train_dir = image_path / "train"
test_dir = image_path / "test"

train_dir, test_dir
output_dir = 'output'

In [38]:

torch.cuda.empty_cache()
torch.manual_seed(42)
torch.cuda.manual_seed(42)
device = torch.device("cuda:0")
torch.cuda.set_device(0)

In [39]:
weights = EfficientNet_V2_S_Weights.DEFAULT
model = efficientnet_v2_s(weights=weights).to(device)

weights = ResNet50_Weights.DEFAULT
model = resnet50(weights=weights).to(device)

In [40]:
auto_transforms = weights.transforms()

In [41]:
train_transform = transforms.Compose([
    transforms.Resize((800, 800)),  # Resize images to (800, 800)
    transforms.RandomResizedCrop(150),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(15),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transform = transforms.Compose([
    transforms.Resize((150, 150)),  # Resize images to (80, 80)
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),  
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [42]:
train_data = datasets.ImageFolder(root=train_dir, # target folder of images
                                  transform=train_transform, # transforms to perform on data (images)
                                  target_transform=None) # transforms to perform on labels (if necessary)

test_data = datasets.ImageFolder(root=test_dir,
                                 transform=test_transform)

In [43]:
class_names = train_data.classes

In [44]:
train_dataloader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=32,shuffle=False)

In [45]:
# Get the length of class_names (one output unit for each class)
output_shape = len(class_names)
# Recreate the classifier layer and seed it to the target device

# model.classifier = torch.nn.Sequential(
#     nn.Dropout(p=0.1, inplace=True),
#     nn.Linear(in_features=1280, out_features=output_shape, bias=True),
# ).to(device)

model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
model.fc = nn.Linear(model.fc.in_features, output_shape).to(device)



In [46]:
loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1) # this is also called "criterion"/"cost function" in some places
optimizer = torch.optim.SGD(model.parameters(), lr=lr1, momentum=0.9, weight_decay=0.01)
# optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=step_size, gamma=0.1)
# warmup_scheduler = WarmupLR(scheduler, warmup_factor=0.1, warmup_iters=10, warmup_method="linear")

In [47]:
# for layer in model.features.parameters():
#     layer.requires_grad = False

for layer in model.parameters():
    layer.requires_grad = False

In [48]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
    """Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode and then
    runs through all of the required training steps (forward
    pass, loss calculation, optimizer step).

    Args:
    model: A PyTorch model to be trained.
    dataloader: A DataLoader instance for the model to be trained on.
    loss_fn: A PyTorch loss function to minimize.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of training loss and training accuracy metrics.
    In the form (train_loss, train_accuracy). For example:

    (0.1112, 0.8743)
    """
    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0

    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc


In [49]:
def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
    """Tests a PyTorch model for a single epoch.

    Turns a target PyTorch model to "eval" mode and then performs
    a forward pass on a testing dataset.

    Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
    """
    # Put model in eval mode
    model.eval() 

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [50]:
def train(model: torch.nn.Module, 
        train_dataloader: torch.utils.data.DataLoader, 
        test_dataloader: torch.utils.data.DataLoader, 
        optimizer: torch.optim.Optimizer,
        scheduler: torch.optim.lr_scheduler._LRScheduler,
        loss_fn: torch.nn.Module,
        epochs: int,
        device: torch.device,
        patience: int) -> Dict[str, List]:
  """Trains and tests a PyTorch model.

  Passes a target PyTorch models through train_step() and test_step()
  functions for a number of epochs, training and testing the model
  in the same epoch loop.

  Calculates, prints and stores evaluation metrics throughout.

  Args:
  model: A PyTorch model to be trained and tested.
  train_dataloader: A DataLoader instance for the model to be trained on.
  test_dataloader: A DataLoader instance for the model to be tested on.
  optimizer: A PyTorch optimizer to help minimize the loss function.
  loss_fn: A PyTorch loss function to calculate loss on both datasets.
  epochs: An integer indicating how many epochs to train for.
  device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
  A dictionary of training and testing loss as well as training and
  testing accuracy metrics. Each metric has a value in a list for 
  each epoch.
  In the form: {train_loss: [...],
            train_acc: [...],
            test_loss: [...],
            test_acc: [...]} 
  For example if training for epochs=2: 
            {train_loss: [2.0616, 1.0537],
            train_acc: [0.3945, 0.3945],
            test_loss: [1.2641, 1.5706],
            test_acc: [0.3400, 0.2973]} 
  """
  # Create empty results dictionary
  results = {"train_loss": [],
              "train_acc": [],
              "test_loss": [],
              "test_acc": []
  }

  
  # Make sure model on target device
  model.to(device)

  best_loss = float('inf')
  best_model_wts = deepcopy(model.state_dict())
  no_improve_epochs = 0

  # Loop through training and testing steps for a number of epochs
  for epoch in tqdm(range(epochs)):
      train_loss, train_acc = train_step(model=model,
                                        dataloader=train_dataloader,
                                        loss_fn=loss_fn,
                                        optimizer=optimizer,
                                        device=device)
      test_loss, test_acc = test_step(model=model,
        dataloader=test_dataloader,
        loss_fn=loss_fn,
        device=device)
      
      # scheduler.step()

      if test_loss < best_loss:
          best_loss = test_loss
          best_model_wts = deepcopy(model.state_dict())
          no_improve_epochs = 0
      else:
          no_improve_epochs += 1

      # if no_improve_epochs >= patience:
      #     print("Early stopping...")
      #     model.load_state_dict(best_model_wts)
      #     break

      # Print out what's happening
      print(
        f"Epoch: {epoch+1} | "
        f"train_loss: {train_loss:.4f} | "
        f"train_acc: {train_acc:.4f} | "
        f"test_loss: {test_loss:.4f} | "
        f"test_acc: {test_acc:.4f}"
      )

      # Update results dictionary
      results["train_loss"].append(train_loss)
      results["train_acc"].append(train_acc)
      results["test_loss"].append(test_loss)
      results["test_acc"].append(test_acc)

  # Return the filled results at the end of the epochs
  model.load_state_dict(best_model_wts)

In [51]:
train(
    model=model, 
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    optimizer=optimizer,
    scheduler=scheduler,
    loss_fn=loss_fn,
    epochs=num_epochs,
    device=device,
    patience=patience
)

  0%|          | 0/30 [00:00<?, ?it/s]

RuntimeError: output with shape [1, 150, 150] doesn't match the broadcast shape [3, 150, 150]

In [None]:
# for layer in model.features.parameters():
#     layer.requires_grad = True

In [None]:
# optimizer = torch.optim.SGD(model.parameters(), lr=lr2, momentum=0.9)
# scheduler = StepLR(optimizer, step_size=step_size, gamma=0.1)
# patience = 3

In [None]:
# train(
#     model=model, 
#     train_dataloader=train_dataloader,
#     test_dataloader=test_dataloader,
#     optimizer=optimizer,
#     scheduler=scheduler,
#     loss_fn=loss_fn,
#     epochs=num_epochs,
#     device=device,
#     patience=patience
# )

In [None]:
torch.save(model.state_dict(), 'weights/w1.tf')