<a href="https://colab.research.google.com/github/DianeMack22/econ8310-assignment3/blob/main/assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# -----------------------------
# 1. Install and import tools
# -----------------------------
!pip install -q gdown
import gdown
import os
import gzip
import shutil
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import struct

# -----------------------------
# 2. Download .gz files from Google Drive
# -----------------------------
os.makedirs("data", exist_ok=True)

# Maps each Google Drive file id to the local path its archive is saved under.
drive_files = {
    # Train images
    "1pe4h0HUyugjAvSp8-xUbK61svrkcnF4x": "data/train-images-idx3-ubyte.gz",
    # Train labels
    "12vOBpJKWuW2_y5R__Dv16IiGJ1YPJiq2": "data/train-labels-idx1-ubyte.gz",
    # Test images
    "1F7k0T5nC0XDufouFzU9QB0LPcfxcEFkx": "data/t10k-images-idx3-ubyte.gz",
    # Test labels
    "1wWYt5HKjf1s-R9XzfkaYQesuOL7BZFHM": "data/t10k-labels-idx1-ubyte.gz"
}

for file_id, dest_path in drive_files.items():
    # Archives already on disk are kept, so re-running the cell does not re-download.
    if os.path.exists(dest_path):
        continue
    gdown.download(f"https://drive.google.com/uc?id={file_id}", dest_path, quiet=False)
    # Stop here with a clear message if nothing was written, rather than letting
    # the extraction step fail later with a less helpful FileNotFoundError.
    if not os.path.exists(dest_path):
        raise RuntimeError(
            f"Download of Google Drive file {file_id} to {dest_path} did not produce a file"
        )

# -----------------------------
# 3. Extract all .gz files
# -----------------------------
gz_files = list(drive_files.values())  # use the same files we downloaded

for gz_path in gz_files:
    out_path = gz_path[:-3]  # remove '.gz'
    # An existing output is treated as a finished extraction and skipped.
    if os.path.exists(out_path):
        continue

    # Decompress into a temporary sibling file and rename it only after the copy
    # has completed. Writing straight to out_path would leave a truncated file
    # behind if the cell were interrupted, and the existence check above would
    # then skip it on every later run.
    tmp_path = out_path + ".part"
    try:
        with gzip.open(gz_path, 'rb') as f_in, open(tmp_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.replace(tmp_path, out_path)
    finally:
        # After a successful rename the temp file is gone; this only cleans up
        # a partial file left by a failed or interrupted copy.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

# Note: ChatGPT, Google, and Gemini assisted significantly with Sections 2-3, and
# had some influence in the debugging process for the rest of the code.

# -----------------------------
# 4. Define CustomMNIST Dataset
# -----------------------------
class CustomMNIST(Dataset):
    """Dataset backed by a pair of uncompressed IDX-style files (images and labels).

    Args:
        image_path: Path to the image file. It starts with four big-endian
            32-bit integers (magic, image count, rows, cols) followed by one
            unsigned byte per pixel.
        label_path: Path to the label file. It starts with two big-endian
            32-bit integers (magic, label count) followed by one unsigned byte
            per label.
        transform: Optional callable applied to the raw ``(rows, cols)`` uint8
            array of each image. When omitted, images are returned as float32
            tensors of shape ``(1, rows, cols)`` scaled by 1/255.

    Indexing returns an ``(image, label)`` pair.
    """

    def __init__(self, image_path, label_path, transform=None):
        self.images = self._read_images(image_path)
        self.labels = self._read_labels(label_path)
        self.transform = transform

        # __len__ is driven by the labels, so both arrays must have the same
        # length; otherwise a truncated image file would make the last indices
        # raise IndexError in __getitem__.
        if len(self.images) != len(self.labels):
            usable = min(len(self.images), len(self.labels))
            print(
                f"Image/label count mismatch: {len(self.images)} images, "
                f"{len(self.labels)} labels; using the first {usable}"
            )
            self.images = self.images[:usable]
            self.labels = self.labels[:usable]

    def _read_images(self, path):
        """Read the image file at ``path`` into a ``(count, rows, cols)`` uint8 array.

        ``count`` is the number of images declared in the header, reduced to the
        number of complete images actually present if the file is short. Bytes
        beyond the declared images are ignored. The magic number is read but
        not validated.
        """
        with open(path, 'rb') as f:
            _magic, num, rows, cols = struct.unpack(">IIII", f.read(16))
            image_data = np.frombuffer(f.read(), dtype=np.uint8)

        print(f"Image data shape: {image_data.shape}, Expected shape: ({num}, {rows}, {cols})")

        pixels_per_image = rows * cols
        # Number of complete images the payload can supply (guarding the
        # degenerate zero-pixel header against a division by zero).
        available = image_data.size // pixels_per_image if pixels_per_image else num
        count = min(num, available)

        if count < num:
            print(f"Reshape failed, adjusted number of images to: {count}")
        elif image_data.size > num * pixels_per_image:
            extra = image_data.size - num * pixels_per_image
            print(f"Ignoring {extra} trailing bytes after the last image")

        # Slice to an exact multiple of the image size so the reshape cannot
        # fail on either a short or an over-long payload.
        return image_data[:count * pixels_per_image].reshape((count, rows, cols))

    def _read_labels(self, path):
        """Read the label file at ``path`` into a 1-D uint8 array.

        If the payload length differs from the count declared in the header, a
        message is printed and the smaller of the two counts is used.
        """
        with open(path, 'rb') as f:
            _magic, num = struct.unpack(">II", f.read(8))
            label_data = np.frombuffer(f.read(), dtype=np.uint8)

        if label_data.size != num:
            print(f"Label count mismatch: expected {num}, got {label_data.size}")
            label_data = label_data[:min(num, label_data.size)]
        return label_data

    def __len__(self):
        """Return the number of labelled examples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at ``idx``."""
        image = self.images[idx]
        label = self.labels[idx]

        # Hand back scalar labels as plain Python ints so that batching yields
        # integer class indices instead of uint8 values. Non-scalar selections
        # (e.g. a slice) are returned as-is.
        if np.ndim(label) == 0:
            label = int(label)

        if self.transform:
            image = self.transform(image)
        else:
            # Normalize and add channel dimension [1, rows, cols]
            image = torch.tensor(image, dtype=torch.float32).unsqueeze(0) / 255.0
        return image, label

# -----------------------------
# 5. Create datasets and DataLoaders
# -----------------------------
# Each dataset pairs an extracted image file with its matching label file.
train_dataset = CustomMNIST("data/train-images-idx3-ubyte", "data/train-labels-idx1-ubyte")
test_dataset = CustomMNIST("data/t10k-images-idx3-ubyte", "data/t10k-labels-idx1-ubyte")

# Only the training batches are shuffled; the test loader keeps file order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64)

Image data shape: (47040000,), Expected shape: (60000, 28, 28)
Image data shape: (7840000,), Expected shape: (10000, 28, 28)


In [9]:
import plotly.express as px

# Position of the training example to preview
idx = 100
image, label = train_dataset[idx]

print(f"This image is labeled a {label}")

# Drop size-1 dimensions and convert to NumPy before handing the image to plotly.
preview = px.imshow(image.squeeze().numpy(), color_continuous_scale="gray")
preview.show()

This image is labeled a 8


In [13]:
# -----------------------------
# 6. Create model
# -----------------------------
import torch.nn as nn
import torch.nn.functional as F

class FirstNet(nn.Module):
  """Minimal classifier: flatten each input, then apply one linear layer with 10 outputs."""

  def __init__(self):
    super().__init__()
    # Flattens each input in the batch before it reaches the linear layer
    self.flatten = nn.Flatten()
    # The sequence holds a single linear layer with 10 outputs; being lazy,
    # its input size is inferred from the first batch it sees.
    self.linear_relu_model = nn.Sequential(nn.LazyLinear(10))

  def forward(self, x):
    """Return a 10-dimensional score vector for every item in the batch ``x``."""
    return self.linear_relu_model(self.flatten(x))


# Create an instance of our model
model = FirstNet()

In [14]:
# -----------------------------
# 7. Prepare to Train
# -----------------------------
# Training hyperparameters
learning_rate = 0.01  # step size handed to the optimizer
batch_size = 64       # NOTE(review): the DataLoaders hard-code 64 rather than reading this value
epochs = 20           # number of passes over the training data

# Cross-entropy loss, for multiclass problems
loss_fn = nn.CrossEntropyLoss()

In [15]:
# Stochastic gradient descent over the parameters of the model defined above,
# stepping at the learning rate chosen in the training setup.
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

In [16]:
def train_loop(dataloader, model, loss_fn, optimizer, log_every=10):
  """Run one pass over ``dataloader``, updating ``model`` after every batch.

  Args:
    dataloader: Iterable of ``(X, y)`` batches exposing a ``dataset`` attribute
      whose length is the total number of samples.
    model: Module being trained; it is switched to training mode.
    loss_fn: Callable taking ``(predictions, targets)`` and returning a scalar loss.
    optimizer: Optimizer that owns the model's parameters.
    log_every: Print a progress line every this many batches (default 10).
      Pass 0 or ``None`` to disable progress output.

  Returns:
    None. Progress is reported through ``print``.
  """
  size = len(dataloader.dataset)
  # Set the model to training mode, important for batch normalization & dropout layers
  model.train()

  # Running count of samples processed so far. Accumulating the real batch
  # lengths keeps the progress figure right even when the final batch is
  # smaller than the others.
  seen = 0
  for batch, (X, y) in enumerate(dataloader):
    # Compute prediction and loss
    pred = model(X)
    loss = loss_fn(pred, y)

    # Backpropagate, apply the update, then clear the gradients
    # (otherwise they are summed) in preparation for the next batch
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    seen += len(X)
    if log_every and batch % log_every == 0:
      loss_value = loss.item()
      print(f"loss: {loss_value:>7f}  [{seen:>5d}/{size:>5d}]")

In [17]:
# -----------------------------
# 8. Prepare to Train AND Test the Model
# -----------------------------
def test_loop(dataloader, model, loss_fn):
  # Set the model to evaluation mode
  model.eval()
  size = len(dataloader.dataset)
  num_batches = len(dataloader)
  test_loss, correct = 0, 0

  # Evaluating the model with torch.no_grad()
  with torch.no_grad():
    for X, y in dataloader:
      pred = model(X)
      test_loss += loss_fn(pred, y).item()
      correct += (pred.argmax(1) ==y).type(torch.float).sum().item()

  # Printing some output after a testing round
  test_loss /= num_batches
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [19]:
# -----------------------------
# 9. Train the Model
# -----------------------------
# Every epoch fits on the training batches and then reports on the test set.
for t in range(epochs):
  banner = f"Epoch {t+1} \n---------------------------"
  print(banner)
  train_loop(train_loader, model, loss_fn, optimizer)
  test_loop(test_loader, model, loss_fn)

print("Done!")

Epoch 1 
---------------------------
loss: 2.307655  [   64/60000]
loss: 2.121762  [  704/60000]
loss: 1.981479  [ 1344/60000]
loss: 1.795505  [ 1984/60000]
loss: 1.699080  [ 2624/60000]
loss: 1.682289  [ 3264/60000]
loss: 1.567694  [ 3904/60000]
loss: 1.559464  [ 4544/60000]
loss: 1.460485  [ 5184/60000]
loss: 1.394568  [ 5824/60000]
loss: 1.357782  [ 6464/60000]
loss: 1.420395  [ 7104/60000]
loss: 1.327754  [ 7744/60000]
loss: 1.259997  [ 8384/60000]
loss: 1.223877  [ 9024/60000]
loss: 1.204618  [ 9664/60000]
loss: 1.063499  [10304/60000]
loss: 1.194144  [10944/60000]
loss: 1.033257  [11584/60000]
loss: 1.083131  [12224/60000]
loss: 1.069833  [12864/60000]
loss: 1.051857  [13504/60000]
loss: 1.077788  [14144/60000]
loss: 1.025258  [14784/60000]
loss: 0.982967  [15424/60000]
loss: 0.934557  [16064/60000]
loss: 0.978111  [16704/60000]
loss: 1.052807  [17344/60000]
loss: 0.966036  [17984/60000]
loss: 0.944122  [18624/60000]
loss: 0.948546  [19264/60000]
loss: 1.026074  [19904/60000]
los

In [20]:
# -----------------------------
# 10. Saving the Model
# -----------------------------
EPOCH = epochs
PATH = "model1.pt"

# Collect everything that goes into the checkpoint, then write it in one call.
checkpoint = {
    'epoch': EPOCH,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    # NOTE(review): this stores the loss *function* object, not a loss value.
    'loss': loss_fn,
}
torch.save(checkpoint, PATH)

In [22]:
# -----------------------------
# 11. Loading Data Back In
# -----------------------------
PATH = "model1.pt"

# Fresh model and optimizer objects to receive the saved state.
model = FirstNet()
# NOTE(review): lr/momentum here differ from the optimizer used for training;
# presumably they are replaced by the saved settings when the optimizer state
# is loaded below -- confirm.
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# NOTE(review): weights_only=False lets torch.load unpickle arbitrary objects,
# so only load checkpoint files from a trusted source.
checkpoint = torch.load(PATH, weights_only=False)

model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
EPOCH, loss = checkpoint['epoch'], checkpoint['loss']

# Switch to evaluation mode; as the last expression this also displays the model.
model.eval()

FirstNet(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_model): Sequential(
    (0): LazyLinear(in_features=0, out_features=10, bias=True)
  )
)