In [25]:
import os
import random
import shutil
import torch
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import torch.nn.functional as F
from torch import nn
from torchvision import datasets, transforms
from torchvision.transforms import v2
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder # that can be applied on these datasets

# https://www.kaggle.com/code/vikasbhadoria/mnist-data-99-5-accuracy-using-pytorch/

In [26]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # To use to cuda GPU

# data/classifier/
# ├── train/
# │   ├── class_0/  (contains images for class 0)
# │   ├── class_1/  (contains images for class 1)
# │   ├── .../
# └── val/
#     ├── class_0/
#     ├── class_1/
#     ├── .../


In [27]:
input_dir = './data/mnist_images'  # This is the directory with all 60K .png images
train_dir = './data/classifier/train' # 10K
val_dir = './data/classifier/val' # 50K

# Create directories if they don't exist
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

for i in range(10):
    os.makedirs(os.path.join(train_dir, f'class_{i}'), exist_ok=True)
    os.makedirs(os.path.join(val_dir, f'class_{i}'), exist_ok=True)

# Collect all .png files
png_files = [f for f in os.listdir(input_dir) if f.endswith('.png')]

# Classify files by their label
class_files = {str(i): [] for i in range(10)}
for f in png_files:
    class_label = f.split('_')[0]  # Extract class from the filename (class_randomId.png)
    if class_label in class_files:
        class_files[class_label].append(f)

# Progress tracking and batch size control
total_train_size = 10000  # Target for the training set
total_val_size = 50000    # Target for the validation set
train_size_per_class = total_train_size // 10  # Target train size per class (1000)
val_size_per_class = total_val_size // 10      # Target validation size per class (5000)

# Set the deviation limit to 30 files
deviation_limit = 30

# Process each class and ensure each has similar numbers of files
for class_label, files in tqdm(class_files.items(), desc="Processing classes", total=10):
    random.shuffle(files)

    # Split into training and validation based on the batch size rules
    class_total = len(files)
    val_count = min(val_size_per_class, class_total - train_size_per_class)
    train_count = class_total - val_count

    # Ensure the deviation limit of ±30
    if abs(train_count - train_size_per_class) > deviation_limit:
        adjustment = (train_count - train_size_per_class) // abs(train_count - train_size_per_class)
        train_count = train_size_per_class + adjustment * deviation_limit
        val_count = class_total - train_count

    # Separate files into validation and training sets
    val_files = files[:val_count]
    train_files = files[val_count:val_count + train_count]

    # print(f"\nClass {class_label}: Train size {len(train_files)}, Val size {len(val_files)}")

    # Move validation files to the 'val' directory
    for file_name in tqdm(val_files, desc=f"Moving validation files for class {class_label}", leave=False):
        src_path = os.path.join(input_dir, file_name)
        dst_path = os.path.join(val_dir, f'class_{class_label}', file_name)
        shutil.move(src_path, dst_path)
    
    # Move training files to the 'train' directory
    for file_name in tqdm(train_files, desc=f"Moving training files for class {class_label}", leave=False):
        src_path = os.path.join(input_dir, file_name)
        dst_path = os.path.join(train_dir, f'class_{class_label}', file_name)
        shutil.move(src_path, dst_path)

print("Files have been successfully organized into training and validation sets.")



Processing classes: 100%|██████████| 10/10 [00:00<00:00, 367.59it/s]

Files have been successfully organized into training and validation sets.





In [28]:
# Define your transformations (Resize, Tensor conversion, and Normalization)
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    # transforms.Grayscale(num_output_channels=1),  # In case the images are not grayscale
    v2.RGB(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Use ImageFolder to load images from local directories
training_dataset = ImageFolder(root='./data/classifier/train', transform=transform)
validation_dataset = ImageFolder(root='./data/classifier/val', transform=transform)

val_dataset_size = len(validation_dataset)
# Create a subset by specifying indices
# You can select a fixed number or percentage of the dataset for the subset
subset_indices = np.random.choice(val_dataset_size, size=5000, replace=False)  # Subsample 5,000 images
validation_subset_dataset = Subset(validation_dataset, subset_indices)




# Create DataLoader for the training and validation datasets
training_loader = DataLoader(training_dataset, batch_size=100, shuffle=True)
# validation_loader = DataLoader(validation_dataset, batch_size=100, shuffle=False)
validation_loader = validation_subset_loader = DataLoader(validation_subset_dataset, batch_size=100, shuffle=True)

In [29]:
# len(training_dataset)
# len(validation_dataset)
# len(validation_subset_dataset)

In [38]:
class LeNet(nn.Module):
    def __init__(self):
      super().__init__()
      self.conv1 = nn.Conv2d(1, 20, 5, 1) # Conv layer1
      self.conv2 = nn.Conv2d(20, 50, 5, 1) # Conv layer2
      self.fc1 = nn.Linear(4*4*50, 500)    # Fully connected layer1
      self.dropout1 = nn.Dropout(0.5)   # We use dropout layer between these both FCL as they have the highest number of parameters b/t them
      self.fc2 = nn.Linear(500, 10)   # Fully connected layer2
    def forward(self, x):
      x = F.relu(self.conv1(x))  # Apply ReLu to the feature maps produced after Conv 1 layer
      x = F.max_pool2d(x, 2, 2)  # Pooling layer after Conv 1 layer
      x = F.relu(self.conv2(x))  # Apply ReLu to the feature maps produced after Conv 2 layer
      x = F.max_pool2d(x, 2, 2)  # Pooling layer after Conv 2 layer
      print(x.shape)
      batch_size = x.size(0)  # Get the batch size
      x = x.view(batch_size, -1)  # Flatten to [batch_size, 50 * 5 * 5 = 1250]

      # x = x.view(-1, 4*4*50)     # Flattening the output of CNN to feed it into Fully connected layer
      x = F.relu(self.fc1(x))   # Fully connected layer 1 with Relu
      x = self.dropout1(x)     # We use dropout layer between these both FCL as they have the highest number of parameters b/t them
      x = self.fc2(x)         # Fully connected layer 2 with no activation funct as we need raw output from CrossEntropyLoss
      return x

In [39]:
model = LeNet().to(device)
model

LeNet(
  (conv1): Conv2d(1, 20, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=800, out_features=500, bias=True)
  (dropout1): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=500, out_features=10, bias=True)
)

In [40]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)

In [41]:
epochs = 15
running_loss_history = []
running_corrects_history = []
val_running_loss_history = []
val_running_corrects_history = []

for e in range(epochs):
  
  running_loss = 0.0
  running_corrects = 0.0
  val_running_loss = 0.0
  val_running_corrects = 0.0
  
  for inputs, labels in training_loader: # As our train_loader is batch size of 100 and had input images and corresponding labels
    inputs = inputs.to(device)  # Put our inputs and labels in the device as our model is also in the device
    labels = labels.to(device)
    outputs = model(inputs)   # giving input to our model to get corresponding output
    loss = criterion(outputs, labels) # comparing out model's output to original labels
    
    optimizer.zero_grad()  ##sets the initial gradient to zero
    loss.backward()  ## The whole calculated loss is then back propogated to the model
    optimizer.step()  ## Then the weights are updated by doing their derivative w.r.t the Loss
    
    _, preds = torch.max(outputs, 1) # Then we select the max value of raw output and consider it as our prediction. We select it from 10 o/ps
    running_loss += loss.item()  # total loss of 1 epoch
    running_corrects += torch.sum(preds == labels.data) #total accuracy of 1 epoch

  else:
    with torch.no_grad(): # This we done to set no gradient as we do not need it for val as our model is already trained.
      for val_inputs, val_labels in validation_loader:
        val_inputs = val_inputs.to(device)  # Put our val_inputs and labels in the device as our model is also in the device
        val_labels = val_labels.to(device)
        val_outputs = model(val_inputs)
        val_loss = criterion(val_outputs, val_labels)
        
        _, val_preds = torch.max(val_outputs, 1)
        val_running_loss += val_loss.item()
        val_running_corrects += torch.sum(val_preds == val_labels.data)
      
    epoch_loss = running_loss/len(training_loader)
    epoch_acc = running_corrects.float()/ len(training_loader)
    running_loss_history.append(epoch_loss)
    running_corrects_history.append(epoch_acc)
    
    val_epoch_loss = val_running_loss/len(validation_loader)
    val_epoch_acc = val_running_corrects.float()/ len(validation_loader)
    val_running_loss_history.append(val_epoch_loss)
    val_running_corrects_history.append(val_epoch_acc)
    print('epoch :', (e+1))
    print('training loss: {:.4f}, acc {:.4f} '.format(epoch_loss, epoch_acc.item()))
    print('validation loss: {:.4f}, validation acc {:.4f} '.format(val_epoch_loss, val_epoch_acc.item()))

torch.Size([100, 50, 5, 5])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (100x1250 and 800x500)

In [None]:
plt.plot(running_loss_history, label='training loss')
plt.plot(val_running_loss_history, label='validation loss')
plt.legend()

In [None]:
plt.plot(running_corrects_history, label='training accuracy')
plt.plot(val_running_corrects_history, label='validation accuracy')
plt.legend()

In [None]:
import PIL.ImageOps

In [None]:
import requests
from PIL import Image

url = 'https://images.homedepot-static.com/productImages/007164ea-d47e-4f66-8d8c-fd9f621984a2/svn/architectural-mailboxes-house-letters-numbers-3585b-5-64_1000.jpg'
response = requests.get(url, stream = True)
img = Image.open(response.raw)
plt.imshow(img)

In [None]:
img = PIL.ImageOps.invert(img)  # we use Image operations from PIL to invert(i.e. make white black and vice versa)
img = img.convert('1') # we convert from RGB to Gray
img = transform(img) # Apply the transform funct we defined earlier to make our downloaded img same as what we trained on
plt.imshow(im_convert(img))

In [None]:
images = img.to(device)  # As our model is in the device
image = images[0].unsqueeze(0).unsqueeze(0)
output = model(image)
_, pred = torch.max(output, 1)
print(pred.item())

In [None]:
dataiter = iter(validation_loader)
images, labels = dataiter.next()
images = images.to(device)
labels = labels.to(device)
output = model(images)
_, preds = torch.max(output, 1)

fig = plt.figure(figsize=(25, 4))

for idx in np.arange(20):
  ax = fig.add_subplot(2, 10, idx+1, xticks=[], yticks=[])
  plt.imshow(im_convert(images[idx]))
  ax.set_title("{} ({})".format(str(preds[idx].item()), str(labels[idx].item())), color=("green" if preds[idx]==labels[idx] else "red"))