## Imports

In [1]:
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt

from fastprogress.fastprogress import master_bar, progress_bar

from jupyterthemes import jtplot

from torch.utils.data import DataLoader, Subset, Dataset
from torchsummary import summary
from torchvision.datasets import CIFAR10
from torchvision.transforms import Compose, Normalize, ToTensor
from torchvision.utils import make_grid
from torchvision import transforms

from PIL import Image

import os

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

jtplot.style(context="talk")

In [68]:
#makes github ignore all the data

with open(".gitignore", "w") as f:
    f.write("""
# Ignore image data folders
tiny-imagenet-200/
tiny-imagenet-200-grayscale/

# Ignore any .DS_Store or similar files
.DS_Store

# Ignore Python cache files
__pycache__/
*.pyc
""")


## Dataset Download

In [2]:
# Download download Tiny ImageNet directly into your Jupyter Notebook

import os
import requests
import zipfile

# set directory and download URL
url = "http://cs231n.stanford.edu/tiny-imagenet-200.zip"
output_dir = "tiny-imagenet-200"
zip_filename = "tiny-imagenet-200.zip"

# download zip file if it doesn't exist
if not os.path.exists(zip_filename):
    print("Downloading Tiny ImageNet...")
    r = requests.get(url, stream=True)
    with open(zip_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
    print("Download complete.")

# unzip if not already extracted
if not os.path.exists(output_dir):
    print("Extracting zip file...")
    with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
        zip_ref.extractall()
    print("Extraction complete.")
else:
    print("Already extracted.")



Already extracted.


In [44]:
def convert_jpegs_to_bw(source_root, dest_root):
    source_root = Path(source_root)
    dest_root = Path(dest_root)

    dest_root.mkdir(parents=True, exist_ok=True)

    # Collect all image paths first
    all_files = []
    for root, dirs, files in os.walk(source_root):
        for file in files:
            if file.lower().endswith((".jpg", ".jpeg", ".png", ".jpeg".upper(), ".jpg".upper())):
                all_files.append((root, file))

    # Convert with progress bar
    for root, file in tqdm(all_files, desc="Converting images"):
        source_path = Path(root) / file
        relative_path = source_path.relative_to(source_root)
        dest_path = dest_root / relative_path

        dest_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            img = Image.open(source_path).convert("L")
            img.save(dest_path)
        except Exception as e:
            print(f"Failed to process {source_path}: {e}")

splits = ["train", "val", "test"]
for split in splits:
    source = f"tiny-imagenet-200/{split}"
    dest = f"tiny-imagenet-200-grayscale/{split}"
    print(f"\n📂 Processing: {split}")
    convert_jpegs_to_bw(source, dest)


📂 Processing: train


Converting images:   0%|          | 0/100000 [00:00<?, ?it/s]


📂 Processing: val


Converting images:   0%|          | 0/10001 [00:00<?, ?it/s]


📂 Processing: test


Converting images:   0%|          | 0/10000 [00:00<?, ?it/s]

In [42]:
print(os.getcwd())

/home/jmpb2020/Tintelligence/notebooks


In [43]:
source_folder = "tiny-imagenet-200/train"
destination_folder = "tiny-imagenet-200-grayscale/train"

convert_jpegs_to_bw(source_folder, destination_folder)

Converting images:   0%|          | 0/100000 [00:00<?, ?it/s]

In [45]:
# # Choose a sample image from Tiny ImageNet
# sample_img_path = os.path.join("tiny-imagenet-200", "train", "n01443537", "images")
# sample_img = os.listdir(sample_img_path)[0]
# full_img_path = os.path.join(sample_img_path, sample_img)

# # Load images
# img_color = Image.open(full_img_path).convert('RGB')
# img_gray = img_color.convert('L')

# # Plot side by side
# fig, axs = plt.subplots(1, 2, figsize=(8, 4))

# axs[0].imshow(img_color)
# axs[0].set_title("Original Color")
# axs[0].axis('off')

# axs[1].imshow(img_gray, cmap='gray')
# axs[1].set_title("Grayscale Version")
# axs[1].axis('off')

# plt.tight_layout()
# plt.show()


## Dataset Utility

In [46]:
class ColorizationDataset(Dataset):
    def __init__(self, grayscale_dir, color_dir, transform_input=None, transform_target=None):
        self.grayscale_dir = grayscale_dir
        self.color_dir = color_dir
        self.transform_input = transform_input
        self.transform_target = transform_target
        self.samples = []

        for class_name in os.listdir(grayscale_dir):
            gray_class_path = os.path.join(grayscale_dir, class_name, "images")
            color_class_path = os.path.join(color_dir, class_name, "images")

            if not os.path.isdir(gray_class_path):
                continue

            for img_name in os.listdir(gray_class_path):
                gray_img_path = os.path.join(gray_class_path, img_name)
                color_img_path = os.path.join(color_class_path, img_name)

                if os.path.exists(color_img_path):
                    self.samples.append((gray_img_path, color_img_path))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        gray_path, color_path = self.samples[idx]
        gray = Image.open(gray_path).convert("L")
        color = Image.open(color_path).convert("RGB")

        if self.transform_input:
            gray = self.transform_input(gray)
        if self.transform_target:
            color = self.transform_target(color)

        return gray, color

In [47]:
def get_colorization_loaders(grayscale_path, color_path, batch_size=32):
    transform_input = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
    ])

    transform_target = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
    ])

    dataset = ColorizationDataset(grayscale_path, color_path, transform_input, transform_target)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    return loader


In [48]:
import os
print("Current Working Directory:", os.getcwd())


Current Working Directory: /home/jmpb2020/Tintelligence/notebooks


In [49]:
print("Grayscale Directory Exists:", os.path.exists("tiny-imagenet-200-grayscale/train"))

Grayscale Directory Exists: True


In [52]:
import os

print("Grayscale root exists:", os.path.exists("tiny-imagenet-200-grayscale"))
print("Grayscale train exists:", os.path.exists("tiny-imagenet-200-grayscale/train"))
print("Sample class folders:", os.listdir("../tiny-imagenet-200-grayscale/train")[:5] if os.path.exists("../tiny-imagenet-200-grayscale/train") else "❌ No /train folder")


Grayscale root exists: True
Grayscale train exists: True
Sample class folders: ❌ No /train folder


In [53]:
grayscale_path = "./tiny-imagenet-200-grayscale/train"
color_path = "./tiny-imagenet-200/train"
train_loader = get_colorization_loaders(grayscale_path, color_path, batch_size=8)


In [54]:
import os
os.listdir("tiny-imagenet-200-grayscale")

['train', 'val', 'test']

In [55]:
# def get_cifar10_data_loaders(path, batch_size, valid_batch_size=0):
#     # TINT TODO: Change this to utilize some of the training progress we made in bw_colorizer

#     # Data specific transforms
#     data_std = (0.2470, 0.2435, 0.2616)
#     data_mean = (0.4914, 0.4822, 0.4465)
#     xforms = Compose([ToTensor(), Normalize(data_mean, data_std)])

#     # Training dataset and loader
#     train_dataset = CIFAR10(root=path, train=True, download=True, transform=xforms)

#     # Set the batch size to N if batch_size is 0
#     tbs = len(train_dataset) if batch_size == 0 else batch_size
#     train_loader = DataLoader(train_dataset, batch_size=tbs, shuffle=True)

#     valid_dataset = CIFAR10(root=path, train=False, download=True, transform=xforms)

#     # Set the batch size to N if batch_size is 0
#     vbs = len(valid_dataset) if valid_batch_size == 0 else valid_batch_size
#     valid_loader = DataLoader(valid_dataset, batch_size=vbs, shuffle=True)

#     return train_loader, valid_loader

## Training Utility

In [56]:
def train_one_epoch(mb, loader, device, model, criterion, optimizer):

    model.train()

    losses = []

    num_batches = len(loader)
    dataiterator = iter(loader)

    for batch in progress_bar(range(num_batches), parent=mb):

        mb.child.comment = "Training"

        # Grab the batch of data and send it to the correct device
        X, Y = next(dataiterator)
        X, Y = X.to(device), Y.to(device)

        # Compute the output
        output = model(X)

        # Compute loss
        loss = criterion(output, Y)
        losses.append(loss.item())

        # Update parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return losses

## Validation Utility

In [57]:
def validate(mb, loader, device, model, criterion):

    model.eval()

    losses = []
    num_correct = 0

    num_classes = len(loader.dataset.classes)
    class_correct = [0] * num_classes
    class_total = [0] * num_classes

    N = len(loader.dataset)
    num_batches = len(loader)
    dataiterator = iter(loader)

    with torch.no_grad():

        batches = range(num_batches)
        batches = progress_bar(batches, parent=mb) if mb else batches
        for batch in batches:

            if mb:
                mb.child.comment = f"Validation"

            # Grab the batch of data and send it to the correct device
            X, Y = next(dataiterator)
            X, Y = X.to(device), Y.to(device)

            output = model(X)

            loss = criterion(output, Y)
            losses.append(loss.item())

            # Convert network output into predictions (one-hot -> number)
            predictions = output.argmax(dim=1)

            # Sum up total number that were correct
            comparisons = predictions == Y
            num_correct += comparisons.type(torch.float).sum().item()

            # Sum up number of correct per class
            for result, clss in zip(comparisons, Y):
                class_correct[clss] += result.item()
                class_total[clss] += 1

    accuracy = 100 * (num_correct / N)
    accuracies = {
        clss: 100 * class_correct[clss] / class_total[clss]
        for clss in range(num_classes)
    }

    return losses, accuracy, accuracies

## Loss Plotting Utility

In [58]:
def update_plots(mb, train_losses, valid_losses, epoch, num_epochs):

    # Update plot data
    max_loss = max(max(train_losses), max(valid_losses))
    min_loss = min(min(train_losses), min(valid_losses))

    x_margin = 0.2
    x_bounds = [0 - x_margin, num_epochs + x_margin]

    y_margin = 0.1
    y_bounds = [min_loss - y_margin, max_loss + y_margin]

    train_xaxis = torch.linspace(0, epoch + 1, len(train_losses))
    valid_xaxis = torch.linspace(0, epoch + 1, len(valid_losses))
    graph_data = [[train_xaxis, train_losses], [valid_xaxis, valid_losses]]

    mb.update_graph(graph_data, x_bounds, y_bounds)

## Data Loading

In [59]:
# TODO: tune the training batch size
train_batch_size = 128

# Let's use some shared space for the data (so that we don't have copies
# sitting around everywhere)
data_path = "~/data"

# Use the GPUs if they are available
# TODO: if you run into GPU memory errors you should set device to "cpu" and restart the notebook
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using '{device}' device.")

valid_batch_size = 5000
grayscale_path = "./tiny-imagenet-200-grayscale/train"
color_path = "./tiny-imagenet-200/train"
train_loader = get_colorization_loaders(grayscale_path, color_path, batch_size=8)

# Input and output sizes depend on data
class_names = sorted(os.listdir(grayscale_path))
num_classes = len(class_names)

# print(class_names)

Using 'cuda' device.


In [60]:
# # Grab a bunch of images and change the range to [0, 1]
# nprint = 64
# images = torch.tensor(train_loader.dataset.data[:nprint] / 255)
# targets = train_loader.dataset.targets[:nprint]
# labels = [f"{class_names[target]:>10}" for target in targets]

# # Create a grid of the images (make_grid expects (BxCxHxW))
# image_grid = make_grid(images.permute(0, 3, 1, 2))

# _, ax = plt.subplots(figsize=(16, 16))
# ax.imshow(image_grid.permute(1, 2, 0))
# ax.grid(None)

# images_per_row = int(nprint ** 0.5)
# for row in range(images_per_row):
#     start_index = row * images_per_row
#     print(" ".join(labels[start_index : start_index + images_per_row]))

## Model Creation

In [61]:
class NeuralNetwork(nn.Module):
    def __init__(self, layer_sizes):
        super().__init__()

        # The first "layer" just rearranges an image into a column vector
        first_layer = nn.Flatten()

        # The hidden layers include:
        # 1. a linear component (computing Z) and
        # 2. a non-linear comonent (computing A)
        # TODO: add dropout and/or batch normalization
        hidden_layers = [
            nn.Sequential(nn.Linear(nlminus1, nl), nn.ReLU())
            for nl, nlminus1 in zip(layer_sizes[1:-1], layer_sizes)
        ]

        # The output layer must be Linear without an activation. See:
        #   https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
        output_layer = nn.Linear(layer_sizes[-2], layer_sizes[-1])

        # Group all layers into the sequential container
        all_layers = [first_layer, *hidden_layers, output_layer]
        self.layers = nn.Sequential(*all_layers)

    def forward(self, X):
        return self.layers(X)

In [62]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()

        # TODO: create layers here

        # Early CNNs had the following structure:
        #    X -> [[Conv2d -> ReLU] x N -> MaxPool2d] x M
        #      -> [Linear -> ReLU] x K -> Linear
        #   Where
        #     0 ≤ N ≤ 3
        #     0 ≤ M ≤ 3
        #     0 ≤ K < 3
        #
        # The "[[Conv2d -> ReLU] x N -> MaxPool2d] x M" part extracts
        # useful features, and the "[Linear -> ReLU] x K -> Linear" part
        # performs the classification.
        self.conv_layers = nn.Sequential(
            # Conv layer 1: (3 input channels, 32 output channels)
            nn.Conv2d(3, 32, kernel_size=3, padding=1),  
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # downsample by 2

            # Conv layer 2: (32 input channels, 64 output channels)
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # downsample again
        )

        # After two max-pool layers on 32x32 input → 8x8 feature maps
        self.fc_layers = nn.Sequential(
            nn.Flatten(),  # flatten the output from conv layers
            nn.Linear(64 * 8 * 8, 128),  # fully connected layer
            nn.ReLU(),
            nn.Linear(128, 10)  # 10 output classes for CIFAR-10
        )

    def forward(self, X):

        # TODO: implement forward pass here
        X = self.conv_layers(X)
        X = self.fc_layers(X)
        return X

In [63]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Simple CNN
class ColorizationCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 64, 3, padding=1),  # grayscale input
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, 2, stride=2),  # 3-channel output
            nn.Sigmoid()  # values in [0, 1]
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


In [64]:
# TODO: try out different network widths and depths
# neurons_per_hidden_layer = [1024, 512, 256]
# layer_sizes = [num_features, *neurons_per_hidden_layer, num_classes]
# model = NeuralNetwork(layer_sizes).to(device)

# TODO: complete the CNN class in the cell above this one and then uncomment this line
# model = CNN().to(device)

# TODO: use an off-the-shell model from PyTorch
# from torchvision.models import ...
# model = ...

# TINT TODO: make the output have 3 nodes.
# from torchvision.models import resnet18
# model = resnet18(num_classes=num_classes).to(device)

# summary(model)

#TINT
# Instantiate the model
model = ColorizationCNN().to(device)
summary(model)
# pixel-wise loss
criterion = nn.MSELoss()

# TODO: try out different Adam hyperparameters
optimizer = optim.Adam(model.parameters())
criterion = nn.MSELoss()

Layer (type:depth-idx)                   Param #
├─Sequential: 1-1                        --
|    └─Conv2d: 2-1                       640
|    └─ReLU: 2-2                         --
|    └─MaxPool2d: 2-3                    --
|    └─Conv2d: 2-4                       73,856
|    └─ReLU: 2-5                         --
|    └─MaxPool2d: 2-6                    --
├─Sequential: 1-2                        --
|    └─ConvTranspose2d: 2-7              32,832
|    └─ReLU: 2-8                         --
|    └─ConvTranspose2d: 2-9              771
|    └─Sigmoid: 2-10                     --
Total params: 108,099
Trainable params: 108,099
Non-trainable params: 0


In [66]:
# Test single batch

gray_batch, color_batch = next(iter(train_loader))
gray_batch, color_batch = gray_batch.to(device), color_batch.to(device)
output = model(gray_batch)
print("Output shape:", output.shape)  # Should be [batch_size, 3, 64, 64]


Output shape: torch.Size([8, 3, 64, 64])


## Training and Analysis

In [25]:
# TODO: tune the number of epochs
num_epochs = 3

train_losses = []
valid_losses = []
accuracies = []

# A master bar for fancy output progress
mb = master_bar(range(num_epochs))
mb.names = ["Train Loss", "Valid Loss"]
mb.main_bar.comment = f"Epochs"

# Loss and accuracy prior to training
vl, accuracy, _ = validate(None, valid_loader, device, model, criterion)
valid_losses.extend(vl)
accuracies.append(accuracy)

for epoch in mb:

    tl = train_one_epoch(mb, train_loader, device, model, criterion, optimizer)
    train_losses.extend(tl)

    vl, accuracy, acc_by_class = validate(mb, valid_loader, device, model, criterion)
    valid_losses.extend(vl)
    accuracies.append(accuracy)

    update_plots(mb, train_losses, valid_losses, epoch, num_epochs)

NameError: name 'valid_loader' is not defined

In [None]:
plt.plot(accuracies, '--o')
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.xticks(range(num_epochs+1))
plt.ylim([0, 100])

max_name_len = max(len(name) for name in class_names)

print("Accuracy per class")
for clss in acc_by_class:
    class_name = class_names[clss]
    class_accuracy = acc_by_class[clss]
    print(f"  {class_name:>{max_name_len+2}}: {class_accuracy:.1f}%")

In [None]:
y_trues = []
y_preds = []
model.to(device)
for x, y in valid_loader:
    y_trues.append(y.cpu())
    y_preds.append(model(x.to(device)).argmax(dim=1).cpu())

y_true = torch.hstack(y_trues)
y_pred = torch.hstack(y_preds)

In [None]:
cm = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names).plot();
plt.grid(False)

In [None]:
# TODO: Take the three outputs and reconstruct an image