In [None]:
import os
from pathlib import Path

import cv2
import numpy as np
import torch
from sklearn.model_selection import train_test_split

import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

import torch.optim as optim
from tqdm import tqdm

In [None]:
# CNN Model Class
class CNN(nn.Module):
  """Per-frame feature extractor: three conv + max-pool stages and one linear layer.

  Expects single-channel 128x128 frames shaped (batch, 1, 128, 128) and returns
  one 128-dimensional feature vector per frame, shaped (batch, 128).
  """

  def __init__(self):
    super(CNN, self).__init__()
    # Three 3x3 convolutions; stride 1 with padding 1 keeps the spatial size unchanged
    self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(32, 64, 3, 1, 1)
    self.conv3 = nn.Conv2d(64, 128, 3, 1, 1)

    # 128 channels * (128 / 2^3)^2 spatial positions left after three 2x2 poolings
    self.fc1 = nn.Linear(128*16*16, 128)

  def forward(self, X):
    """Map frames (batch, 1, 128, 128) to feature vectors (batch, 128).

    Raises:
      RuntimeError: if the frame size does not match what fc1 was built for.
    """
    # Steps: Conv1 -> Pool -> Conv2 -> Pool -> Conv3 -> Pool -> Outputs

    # A single unbatched frame (C, H, W) is treated as a batch of one
    if X.dim() == 3:
      X = X.unsqueeze(0)

    X = F.relu(self.conv1(X))
    X = F.max_pool2d(X, kernel_size=2, stride=2)

    X = F.relu(self.conv2(X))
    X = F.max_pool2d(X, 2, 2)

    X = F.relu(self.conv3(X))
    X = F.max_pool2d(X, 2, 2)

    # Flatten everything except the batch axis. Flattening with
    # view(-1, 128*16*16) would let a wrong-sized input silently fold several
    # samples into one row; keeping the batch axis fixed makes fc1 raise instead.
    X = X.view(X.size(0), -1)

    # Fully connected layer
    X = F.relu(self.fc1(X))

    return X

In [None]:
class RNN(nn.Module):
  """LSTM sequence classifier that turns the last time step into a probability.

  Args:
    input_size: Size of each per-step feature vector.
    hidden_size: Size of the LSTM hidden state.
    output_size: Number of output units (1 for binary classification).
    num_layers: Number of stacked LSTM layers.
  """

  def __init__(self, input_size, hidden_size, output_size, num_layers=1):
    super(RNN, self).__init__()
    # batch_first=True makes inputs/outputs (batch, sequence, features)
    self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
    # Maps the final hidden output to output_size values
    self.fc = nn.Linear(hidden_size, output_size)
    # Squashes the outputs into (0, 1) so they can be read as probabilities
    self.sigmoid = nn.Sigmoid()

    # Kept as attributes because forward() needs them to size the initial states
    self.hidden_size = hidden_size
    self.num_layers = num_layers

  def forward(self, X):
    """Return sigmoid outputs of shape (batch, output_size) for X of shape (batch, sequence, input_size)."""
    # Initial hidden/cell states must be (num_layers, batch, hidden_size); a
    # hard-coded leading 1 breaks every stacked (num_layers > 1) configuration.
    # They are created on X's device and dtype.
    h0 = torch.zeros(self.num_layers, X.size(0), self.hidden_size, device=X.device, dtype=X.dtype)
    c0 = torch.zeros(self.num_layers, X.size(0), self.hidden_size, device=X.device, dtype=X.dtype)
    # out holds the top layer's hidden output for every step of the sequence
    out, _ = self.lstm(X, (h0, c0))
    # Only the last step of the sequence feeds the classifier head
    out = self.fc(out[:, -1, :])
    out = self.sigmoid(out)
    return out

In [None]:
class CNN_RNN(nn.Module):
  """Runs `cnn` on every step of a sequence and feeds the stacked results to `rnn`."""

  def __init__(self, cnn, rnn):
    super().__init__()
    self.cnn = cnn
    self.rnn = rnn

  def forward(self, X):
    """Apply the CNN per step of X (batch, sequence, c, h, w), then the RNN on the stacked features."""
    # Unpacking doubles as a check that X is 5-dimensional
    _batch, num_steps, _channels, _height, _width = X.size()
    # One CNN pass per step; each call sees a (batch, c, h, w) slice
    per_step_features = [self.cnn(X[:, step]) for step in range(num_steps)]
    # Re-insert the sequence axis at position 1
    stacked_features = torch.stack(per_step_features, dim=1)
    return self.rnn(stacked_features)

In [None]:
def prepare_frames(path, sequence_length=7):
  """Load the .png frames in `path` and group them into per-pitch sequences.

  File names are split on '_': the first field is the label ('Hit' is the
  positive class, anything else the negative class), the fourth field from the
  end is the pitch number and the second field from the end is the frame number.

  Args:
    path: Directory containing the frame images.
    sequence_length: Number of frames a pitch must have to be kept; pitches
      with any other frame count are dropped.

  Returns:
    (sequences, labels): `sequences` stacks the kept pitches, each holding its
    grayscale float32 frames (pixel values divided by 255) in ascending frame
    number; `labels` holds 1 for 'Hit' and 0 otherwise, in the same order.
    Both arrays are empty when no pitch qualifies.

  Raises:
    OSError: if a .png file cannot be decoded by OpenCV.
  """
  # (label, pitch number) -> frames collected so far. Keying on the label keeps
  # Hit and non-Hit pitches apart without relying on a numeric offset that
  # depends on how many Hit pitches exist.
  pitch_dict = {}

  for file_name in sorted(os.listdir(path)):
    # Ensure dealing with .png files
    if not file_name.endswith('.png'):
      continue

    fields = file_name.split('_')
    label = 1 if fields[0] == 'Hit' else 0
    pitch_num = int(fields[-4])
    frame_num = int(fields[-2])

    frame_path = os.path.join(path, file_name)
    image = cv2.imread(frame_path)
    if image is None:
      # imread signals failure by returning None rather than raising
      raise OSError(f'Could not read image file: {frame_path}')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Normalize pixel values for NN
    image = image.astype(np.float32) / 255.0

    pitch_key = (label, pitch_num)
    if pitch_key not in pitch_dict:
      pitch_dict[pitch_key] = {'frame': [], 'label': label}
    pitch_dict[pitch_key]['frame'].append((frame_num, image))

  sequences = []
  labels = []
  for pitch_data in pitch_dict.values():
    if len(pitch_data['frame']) != sequence_length:
      continue
    # File names sort lexicographically ("10" before "4"), so order the frames
    # by their parsed frame number instead of by listing order.
    ordered = sorted(pitch_data['frame'], key=lambda entry: entry[0])
    sequences.append(np.array([image for _, image in ordered]))
    labels.append(pitch_data['label'])

  return np.array(sequences), np.array(labels)

In [None]:
# Frames are expected under ~/Downloads/Wii_Baseball_CNN_RNN/Prepared_Frames
home = Path.home()
data_path = os.path.join(home, 'Downloads', 'Wii_Baseball_CNN_RNN', 'Prepared_Frames')

sequences, labels = prepare_frames(data_path)

# Fixed seed so Restart & Run All reproduces the same train/validation split
train_sequences, val_sequences, train_labels, val_labels = train_test_split(
    sequences, labels, test_size=0.2, random_state=42)

# prepare_frames yields (num_pitches, frames, height, width) while CNN_RNN
# unpacks (batch, sequence, channels, height, width), so the single grayscale
# channel axis belongs at index 2. Inserting it at index 1 would present the
# 7 frames to the CNN as 7 input channels.
train_sequences = torch.tensor(train_sequences, dtype=torch.float32).unsqueeze(2)
train_labels = torch.tensor(train_labels, dtype=torch.long)
val_sequences = torch.tensor(val_sequences, dtype=torch.float32).unsqueeze(2)
val_labels = torch.tensor(val_labels, dtype=torch.long)

FileNotFoundError: [Errno 2] No such file or directory: '/Users/jamesgough/Downloads/Wii_Baseball_AI_Project/Prepared_Frames'

In [None]:
# Training split: pair sequences with labels and reshuffle the batches every epoch
train_dataset = TensorDataset(train_sequences, train_labels)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)

# Validation split: same batch size, fixed order
val_dataset = TensorDataset(val_sequences, val_labels)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=32)

# Helper that reports the tensor shapes of the first batch a loader yields
def check_dataloader_shapes(dataloader, name):
    """Print the input and label shapes of the first batch from `dataloader`.

    `name` prefixes both printed lines. Nothing is printed when the loader
    yields no batches.
    """
    batches = iter(dataloader)
    try:
        # One batch is enough to learn the shapes
        batch_inputs, batch_labels = next(batches)
    except StopIteration:
        return
    print(f'{name} - Inputs shape: {batch_inputs.shape}')
    print(f'{name} - Labels shape: {batch_labels.shape}')

# Sanity check before training: print the first-batch input and label shapes
# for both the training and the validation loader
check_dataloader_shapes(train_loader, 'Train Loader')
check_dataloader_shapes(val_loader, 'Val Loader')

In [None]:
def test_cnn():
    """Smoke-test CNN: a random single-channel 128x128 frame must yield a (batch_size, 128) output."""
    # Create a CNN model
    cnn_model = CNN()

    # Create a sample input tensor with shape (batch_size, channels, height, width)
    batch_size = 1
    sample_input = torch.randn(batch_size, 1, 128, 128)  # Example input

    # Forward pass through the CNN
    output = cnn_model(sample_input)

    # Report the shape, then enforce it so a regression fails the cell instead
    # of relying on someone reading the printed value
    print(f'CNN Output shape: {output.shape}')  # Should be (batch_size, 128)
    assert tuple(output.shape) == (batch_size, 128), f'Unexpected CNN output shape: {tuple(output.shape)}'

test_cnn()

In [None]:
def test_rnn():
    """Smoke-test RNN: a random (32, 7, 128) batch must yield a (32, 1) output."""
    # Create an RNN model
    batch_size = 32
    sequence_length = 7
    input_size = 128  # Size of feature vectors produced by CNN
    hidden_size = 128
    output_size = 1  # Binary classification (Hit or No Hit)

    rnn_model = RNN(input_size, hidden_size, output_size)

    # Create a sample input tensor with shape (batch_size, sequence_length, input_size)
    sample_input = torch.randn(batch_size, sequence_length, input_size)  # Example input

    # Forward pass through the RNN
    output = rnn_model(sample_input)

    # Report the shape, then enforce it so a regression fails the cell instead
    # of relying on someone reading the printed value
    print(f'RNN Output shape: {output.shape}')  # Should be (batch_size, output_size)
    assert tuple(output.shape) == (batch_size, output_size), f'Unexpected RNN output shape: {tuple(output.shape)}'

test_rnn()

In [None]:
def test_cnn_rnn():
    """Smoke-test the combined model: a random (32, 7, 1, 128, 128) batch must yield a (32, 1) output."""
    # Create CNN and RNN models
    cnn_model = CNN()
    batch_size = 32
    sequence_length = 7
    input_size = 128  # Size of feature vectors produced by CNN
    hidden_size = 128
    output_size = 1  # Binary classification (Hit or No Hit)

    rnn_model = RNN(input_size, hidden_size, output_size)

    # Create a combined CNN-RNN model
    combined_model = CNN_RNN(cnn_model, rnn_model)

    # Create a sample input tensor with shape (batch_size, sequence_length, channels, height, width)
    sample_input = torch.randn(batch_size, sequence_length, 1, 128, 128)  # Example input

    # Forward pass through the combined CNN-RNN model
    output = combined_model(sample_input)

    # Report the shape, then enforce it so a regression fails the cell instead
    # of relying on someone reading the printed value
    print(f'Combined CNN-RNN Output shape: {output.shape}')  # Should be (batch_size, output_size)
    assert tuple(output.shape) == (batch_size, output_size), f'Unexpected CNN-RNN output shape: {tuple(output.shape)}'

test_cnn_rnn()

In [None]:
cnn_model = CNN()

sequence_length = 7   # 7 frames per pitch sequence
input_size = 128      # Size of feature vectors produced by CNN
hidden_size = 128     # Number of features in the hidden state of RNN
output_size = 1       # Binary classification (Hit or No Hit)

# RNN's constructor is (input_size, hidden_size, output_size, num_layers=1).
# The sequence length is not a constructor argument; passing it first would
# shift every value by one slot and build an LSTM expecting 7 input features.
rnn_model = RNN(input_size, hidden_size, output_size)

combined_model = CNN_RNN(cnn_model, rnn_model)

In [None]:
# Use the GPU when one is available. The check lives in the torch.cuda
# submodule; torch has no top-level cuda_is_available attribute.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
combined_model.to(device)

# Binary cross-entropy on probabilities (the model already ends in a sigmoid)
criterion = nn.BCELoss()
optimizer = optim.Adam(combined_model.parameters(), lr=0.001)

In [None]:
num_epochs = 20

for epoch in range(num_epochs):
  # ---- Training pass ----
  combined_model.train()
  train_loss = 0.0

  for inputs, labels in tqdm(train_loader, desc=f'Training Epoch {epoch+1}/{num_epochs}'):
    inputs, labels = inputs.to(device), labels.to(device).float()   # Move data to device; BCELoss needs float targets

    optimizer.zero_grad()   # Clear all gradients
    outputs = combined_model(inputs)    # Forward pass, shape (batch, 1)
    # squeeze(1) drops only the output axis: (batch, 1) -> (batch,). A bare
    # squeeze() would also drop the batch axis when the final batch holds a
    # single sample, leaving a 0-d tensor that no longer matches the labels.
    loss = criterion(outputs.squeeze(1), labels)   # Mean loss over the batch
    loss.backward()   # Calculates gradients
    optimizer.step()    # Updates model parameters

    train_loss += loss.item() * inputs.size(0)  # Batch mean * batch size = summed loss for the batch

  train_loss = train_loss / len(train_dataset)   # Average loss per sample over the epoch

  # ---- Validation pass ----
  combined_model.eval()
  val_loss = 0.0
  val_corr = 0

  with torch.no_grad():
    for inputs, labels in tqdm(val_loader, desc=f'Validation Epoch {epoch+1}/{num_epochs}'):
      inputs, labels = inputs.to(device), labels.to(device).float() # Move data to device and convert to float
      outputs = combined_model(inputs)
      probabilities = outputs.squeeze(1)   # (batch, 1) -> (batch,), batch axis always kept
      loss = criterion(probabilities, labels)
      val_loss += loss.item() * inputs.size(0)
      predictions = probabilities.round()   # Threshold the probabilities at 0.5
      val_corr += (predictions == labels).sum().item()    # Count correct predictions; .item() extracts the scalar

  val_loss = val_loss / len(val_dataset)
  val_acc = val_corr / len(val_dataset)
  val_acc_percent = val_acc * 100

  # Adjacent f-strings are concatenated as-is, so each piece carries its own
  # trailing space to keep the fields separated in the printed line
  print(f'Epoch {epoch+1}/{num_epochs}, '
        f'Train Loss: {train_loss:.4f}, '
        f'Val Loss: {val_loss:.4f}, '
        f'Val Accuracy: {val_acc_percent:.4f}%')