Create a train and test set (train-test ratio should be 80:20%).


In [3]:
import os
import random
import shutil
from sklearn.model_selection import train_test_split
from glob import glob
from PIL import Image
import augly.image as imaugs
import augly.utils as utils


In [5]:
def split_dataset(data_path, output_path, train_ratio=0.8):
    categories = ["dogs", "cats"]
    train_dir = os.path.join(output_path, "train")
    test_dir = os.path.join(output_path, "test")
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    
    for category in categories:
        img_paths = glob(os.path.join(data_path, category, "*.jpg"))
        train_imgs, test_imgs = train_test_split(img_paths, train_size=train_ratio, random_state=42)
        
        train_cat_dir = os.path.join(train_dir, category)
        test_cat_dir = os.path.join(test_dir, category)
        os.makedirs(train_cat_dir, exist_ok=True)
        os.makedirs(test_cat_dir, exist_ok=True)
        
        for img_path in train_imgs:
            shutil.move(img_path, train_cat_dir)
        for img_path in test_imgs:
            shutil.move(img_path, test_cat_dir)
    
    print("Dataset split completed.")

data_path = "./test"
output_path = "./data"
split_dataset(data_path, output_path)

Dataset split completed.


Create Custom Function using Augly which will perform multiple random data augmentation according to input. (At least 10 data augmentation needs to be added like rotate, cropping, blur …)


In [9]:
import augly.image as imaugs

image_path = "./data/test/dogs/dog_43.jpg"
# output_path = "your_output_path.png"

# Augmentation functions can accept image paths as input and
# always return the resulting augmented PIL Image
aug_image = imaugs.overlay_emoji(image_path, opacity=1.0, emoji_size=0.15)

# Augmentation functions can also accept PIL Images as input
aug_image = imaugs.pad_square(aug_image)

# If an output path is specified, the image will also be saved to a file
aug_image = imaugs.overlay_onto_screenshot(aug_image, output_path="./testing/output.png")
aug_image.show()

In [18]:
def augment_image(image_path):
    image = Image.open(image_path).convert("RGB")
    aug_transforms = [
        # imaugs.RandomRotate(),
        # imaugs.functional.Blur,
        # imaugs.Blur(),
        # imaugs.RandomCrop(),
        # imaugs.ColorJitter(),
        # imaugs.HFlip(),
        # imaugs.VFlip(),
        # imaugs.RandomNoise(),
        # imaugs.PerspectiveTransform(),
        # imaugs.OverlayText(text="Augmented"),
        # imaugs.Brightness(factor=random.uniform(0.5, 1.5))
        # imaugs.hflip(),
        # imaugs.crop(),
        imaugs.Resize(),
        imaugs.Blur(),
        # imaugs.ColorJitter(),
        imaugs.RandomNoise(),
        imaugs.Pixelization(),
        imaugs.Rotate(30),
        imaugs.Scale(factor=0.5),
        imaugs.Pad(30),
        lambda img: imaugs.saturation(img, factor=0.5),  # Use a lambda function to pass the image
        lambda img: imaugs.contrast(img, factor=0.5),    # Use a lambda function to pass the image
    ]
    
    augmented_images = []
    for _ in range(3):  # Apply 3 augmentations sequentially
        random.shuffle(aug_transforms)
        aug_image = image
        for aug in aug_transforms[:3]:
            aug_image = aug(aug_image)
        augmented_images.append(aug_image)
    
    return augmented_images

def augment_train_data(train_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    for category in ["dogs", "cats"]:
        category_dir = os.path.join(train_dir, category)
        output_category_dir = os.path.join(output_dir, category)
        os.makedirs(output_category_dir, exist_ok=True)
        
        train_images = glob(os.path.join(category_dir, "*.jpg"))
        
        for img_path in train_images:
            base_name = os.path.basename(img_path)
            augmented_imgs = augment_image(img_path)
            
            for i, aug_img in enumerate(augmented_imgs):
                aug_img.save(os.path.join(output_category_dir, f"aug_{i}_{base_name}"))

train_dir = os.path.join("./data", "train")
test_dir = os.path.join("./data", "test")
augmented_dir = os.path.join("./", "augmented_train")
augment_train_data(train_dir, augmented_dir)


OSError: broken data stream when writing image file

In [None]:

def show_statistics(train_dir, test_dir, augmented_dir):
    original_train_count = sum(len(glob(os.path.join(train_dir, cat, "*.jpg"))) for cat in ["dogs", "cats"])
    test_count = sum(len(glob(os.path.join(test_dir, cat, "*.jpg"))) for cat in ["dogs", "cats"])
    augmented_count = sum(len(glob(os.path.join(augmented_dir, cat, "*.jpg"))) for cat in ["dogs", "cats"])
    
    print(f"Original Train Images: {original_train_count}")
    print(f"Test Images: {test_count}")
    print(f"Augmented Images: {augmented_count}")

show_statistics(train_dir, test_dir, augmented_dir)


In [13]:
import os
import random
from PIL import Image
import augly.image as imaugs

# ------------------------------------------------------------------------------
# Helper function for a random crop augmentation.
# ------------------------------------------------------------------------------
def random_crop(img):
    """
    Randomly crops the image by choosing random margins on all sides.
    Adjusts the crop box relative to the image dimensions.
    """
    w, h = img.size
    # Define random margins (as a fraction of the image dimensions)
    left = int(random.uniform(0, 0.1) * w)
    top = int(random.uniform(0, 0.1) * h)
    right = int(random.uniform(0.9, 1.0) * w)
    bottom = int(random.uniform(0.9, 1.0) * h)
    # If augly.crop is not available, you can use PIL's crop:
    return img.crop((left, top, right, bottom))
    # Alternatively, if imaugs.crop exists, you could use:
    # return imaugs.crop(img, left=left, top=top, right=right, bottom=bottom)

# ------------------------------------------------------------------------------
# Custom augmentation function: applies a random sequence of augmentations.
# ------------------------------------------------------------------------------
def apply_random_augmentations(image, num_ops=3):
    """
    Applies a sequence of `num_ops` random augmentations (selected without replacement)
    from a list of ten possible augmentation operations.
    """
    augmentation_functions = [
        lambda img: imaugs.rotate(img, degrees=30.0),

        lambda img: imaugs.scale(img),
        
        lambda img: imaugs.saturation(img),
        
        lambda img: imaugs.blur(img, radius=random.randint(2, 5)),
        
        lambda img: imaugs.random_noise(img),

        lambda img: imaugs.color_jitter(
            img,
            brightness_factor=random.uniform(0.8, 1.2),
            contrast_factor=random.uniform(0.8, 1.2),
            saturation_factor=random.uniform(0.8, 1.2)
        ),
        
                
        lambda img: imaugs.pad_square(img),

        lambda img: imaugs.pad(img),

        lambda img: imaugs.hflip(img),

        lambda img: imaugs.pixelization(img),

    ]
    
    # Randomly select 'num_ops' operations without replacement.
    selected_ops = random.sample(augmentation_functions, num_ops)
    
    aug_img = image
    for op in selected_ops:
        aug_img = op(aug_img)
    
    return aug_img

# ------------------------------------------------------------------------------
# Function to augment images in a train directory with subdirectories.
# ------------------------------------------------------------------------------
def augment_train_set_with_subdirs(train_dir, output_dir, augmentations_per_image=2):
    """
    For every image in each subdirectory (e.g., 'cats', 'dogs') of the training directory,
    generate a specified number of augmented images using a random chain of augmentations.
    
    Parameters:
      - train_dir: Directory containing subdirectories of images.
      - output_dir: Directory where augmented images will be saved (with the same subdirectory structure).
      - augmentations_per_image: How many augmented images to generate per original image.
    """
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)
    
    # Define allowed image extensions.
    image_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".tiff"}
    
    # Iterate over each subdirectory in the training directory.
    for subdir in os.listdir(train_dir):
        subdir_path = os.path.join(train_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue  # Skip if it's not a directory.
        
        # Create a corresponding subdirectory in the output directory.
        output_subdir = os.path.join(output_dir, subdir)
        os.makedirs(output_subdir, exist_ok=True)
        
        # Process each image file in the subdirectory.
        for img_file in os.listdir(subdir_path):
            # Check if the file is an image.
            if os.path.splitext(img_file)[1].lower() not in image_extensions:
                continue
            img_path = os.path.join(subdir_path, img_file)
            
            try:
                img = Image.open(img_path)
            except Exception as e:
                print(f"Could not open {img_path}: {e}")
                continue
            
            # Create the specified number of augmented versions for each image.
            for i in range(augmentations_per_image):
                aug_img = apply_random_augmentations(img, num_ops=3)
                filename, ext = os.path.splitext(img_file)
                new_filename = f"{filename}_aug_{i}{ext}"
                output_path = os.path.join(output_subdir, new_filename)
                aug_img.save(output_path)
                print(f"Saved augmented image: {output_path}")

# ------------------------------------------------------------------------------
# Example usage:
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    # Paths to your training data and where you want the augmented images saved.
    train_directory = "./data/train"            # e.g., "./data/train"
    augmented_output_directory = "./data/augmented_train"  # e.g., "./data/augmented_train"
    
    # For each image in the subdirectories, create 2 augmented images.
    augment_train_set_with_subdirs(train_directory, augmented_output_directory, augmentations_per_image=2)


Saved augmented image: ./data/augmented_train\cats\cat_106_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_106_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_109_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_109_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_113_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_113_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_118_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_118_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_119_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_119_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_124_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_124_aug_1.jpg
Saved augmented image: ./data/augmented_train\cats\cat_156_aug_0.jpg
Saved augmented image: ./data/augmented_train\cats\cat_156_aug_1.jpg
Saved augmented image: ./data/augm

In [19]:
DIR = "./data/augmented_train/dogs"
import os

_, _, files = next(os.walk(DIR))
file_count = len(files)
file_count

112

In [27]:
DIR1 = "./data/augmented_train/cats"
DIR2 = "./data/augmented_train/dogs"
import os

_, _, files = next(os.walk(DIR))
file_count = len(files)
print(file_count)

_, _, files = next(os.walk(DIR))
file_count = len(files)
print(file_count)

112
112


In [None]:
DIR1 = "./data/train/dogs"
DIR2 = "./data/train/cats"

import os

_, _, files = next(os.walk(DIR1))
file_count = len(files)
print(file_count)
_, _, files = next(os.walk(DIR2))
file_count = len(files)
print(file_count)


56
56


In [25]:
# 224 -> augmented train
# 112 -> train original
import os
import shutil

def merge_directories(src_dir1, src_dir2, dest_dir):
    """
    Merges the contents of src_dir1 and src_dir2 into dest_dir.
    
    If files with the same name exist in both directories, the file from the second
    directory will be renamed to avoid overwriting.
    """
    # Create the destination directory if it doesn't exist.
    os.makedirs(dest_dir, exist_ok=True)

    def copy_contents(src):
        for root, dirs, files in os.walk(src):
            # Compute the relative path from the source directory.
            rel_path = os.path.relpath(root, src)
            # Define the corresponding destination directory.
            dest_subdir = os.path.join(dest_dir, rel_path)
            os.makedirs(dest_subdir, exist_ok=True)
            
            for file in files:
                src_file = os.path.join(root, file)
                dest_file = os.path.join(dest_subdir, file)
                
                # Check for filename conflicts: if the destination file exists,
                # append a counter to the filename.
                if os.path.exists(dest_file):
                    base, ext = os.path.splitext(file)
                    counter = 1
                    new_filename = f"{base}_{counter}{ext}"
                    dest_file = os.path.join(dest_subdir, new_filename)
                    while os.path.exists(dest_file):
                        counter += 1
                        new_filename = f"{base}_{counter}{ext}"
                        dest_file = os.path.join(dest_subdir, new_filename)
                
                shutil.copy2(src_file, dest_file)
                print(f"Copied {src_file} to {dest_file}")

    # First copy contents from the first source directory.
    copy_contents(src_dir1)
    # Then copy contents from the second source directory.
    copy_contents(src_dir2)

if __name__ == "__main__":
    # Define your source directories and the destination directory.
    train_dir = "./data/train"
    augmented_train_dir = "./data/augmented_train"
    final_train_dir = "./data/final_train"

    merge_directories(train_dir, augmented_train_dir, final_train_dir)
    print("Merge complete! All files are in the final_train directory.")


Copied ./data/train\cats\cat_106.jpg to ./data/final_train\cats\cat_106.jpg
Copied ./data/train\cats\cat_109.jpg to ./data/final_train\cats\cat_109.jpg
Copied ./data/train\cats\cat_113.jpg to ./data/final_train\cats\cat_113.jpg
Copied ./data/train\cats\cat_118.jpg to ./data/final_train\cats\cat_118.jpg
Copied ./data/train\cats\cat_119.jpg to ./data/final_train\cats\cat_119.jpg
Copied ./data/train\cats\cat_124.jpg to ./data/final_train\cats\cat_124.jpg
Copied ./data/train\cats\cat_156.jpg to ./data/final_train\cats\cat_156.jpg
Copied ./data/train\cats\cat_162.jpg to ./data/final_train\cats\cat_162.jpg
Copied ./data/train\cats\cat_18.jpg to ./data/final_train\cats\cat_18.jpg
Copied ./data/train\cats\cat_190.jpg to ./data/final_train\cats\cat_190.jpg
Copied ./data/train\cats\cat_203.jpg to ./data/final_train\cats\cat_203.jpg
Copied ./data/train\cats\cat_223.jpg to ./data/final_train\cats\cat_223.jpg
Copied ./data/train\cats\cat_244.jpg to ./data/final_train\cats\cat_244.jpg
Copied ./data/

In [26]:
DIR1 = "./data/final_train/dogs"
DIR2 = "./data/final_train/cats"

import os

_, _, files = next(os.walk(DIR1))
file_count = len(files)
print(file_count)
_, _, files = next(os.walk(DIR2))
file_count = len(files)
print(file_count)


168
168


So now the total train set has thrice the size of the original train set

## Task 2 

In [None]:
%pip install transformers
%pip install torch
%pip install datasets

In [35]:

from transformers import AutoImageProcessor, ResNetForImageClassification
import torch
from datasets import load_dataset

image = Image.open("./data/final_train/dogs/dog_536_aug_0.jpg")

processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")
model = ResNetForImageClassification.from_pretrained("microsoft/resnet-50")

inputs = processor(image, return_tensors="pt")

with torch.no_grad():
    logits = model(**inputs).logits

# model predicts one of the 1000 ImageNet classes
predicted_label = logits.argmax(-1).item()
print(model.config.id2label[predicted_label])


Labrador retriever


In [1]:
%pip install torchvision

Collecting torchvision
  Downloading torchvision-0.21.0-cp312-cp312-win_amd64.whl.metadata (6.3 kB)
Downloading torchvision-0.21.0-cp312-cp312-win_amd64.whl (1.6 MB)
   ---------------------------------------- 0.0/1.6 MB ? eta -:--:--
   ---------------------------------------- 1.6/1.6 MB 9.3 MB/s eta 0:00:00
Installing collected packages: torchvision
Successfully installed torchvision-0.21.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import copy
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
from transformers import ResNetForImageClassification, ResNetConfig
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
# Setting the random seeds for debubing
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)

train_dir_non_aug = "./data/train"
train_dir_aug = "./data/final_train"
test_dir = "./data/test"


In [None]:
train_transform_non_aug = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    # Using ImageNet statistics for normalization.
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# same pre processing as above
train_transform_aug = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# For the test set also same pre processing
test_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Creating datasets using ImageFolder.
train_dataset_non_aug = datasets.ImageFolder(train_dir_non_aug, transform=train_transform_non_aug)
train_dataset_aug = datasets.ImageFolder(train_dir_aug, transform=train_transform_aug)
test_dataset = datasets.ImageFolder(test_dir, transform=test_transform)

# Creating DataLoaders with batch size 32.
batch_size = 32
train_loader_non_aug = DataLoader(train_dataset_non_aug, batch_size=batch_size, shuffle=True, num_workers=4)
train_loader_aug = DataLoader(train_dataset_aug, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Determine the number of classes (for example, cats and dogs -> 2).
num_classes = len(train_dataset_non_aug.classes)
print("Number of classes:", num_classes)

Note: Initial weights of the model should be the same when training with both datasets.

- Choose (microsoft/resnet-50)model from the hugging face and initialize its new weights.

In [None]:
# Get the configuration from the Hugging Face hub.
config = ResNetConfig.from_pretrained("microsoft/resnet-50", num_labels=num_classes)

# Initialize a model with new (random) weights.
model_init = ResNetForImageClassification(config)
# Save the initial state dict.
initial_state_dict = copy.deepcopy(model_init.state_dict())

# Create two separate model instances and load the same initial weights.
model_non_aug = ResNetForImageClassification(config)
model_non_aug.load_state_dict(copy.deepcopy(initial_state_dict))

model_aug = ResNetForImageClassification(config)
model_aug.load_state_dict(copy.deepcopy(initial_state_dict))

# Move models to device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_non_aug.to(device)
model_aug.to(device)

Train model(created in the above point) on a downloaded dataset, without augmentation.

Train model(created in the above point) on a downloaded dataset, without augmentation.

Train model(created in the first point) on a downloaded dataset with augmentation.
Get the precision, recall, F1 score, and accuracy of both the models on the test set.


In [3]:



criterion = nn.CrossEntropyLoss()
optimizer_non_aug = optim.Adam(model_non_aug.parameters(), lr=1e-4)
optimizer_aug = optim.Adam(model_aug.parameters(), lr=1e-4)

num_epochs = 10

def train_model(model, optimizer, dataloader):
    model.train()
    running_loss = 0.0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        # For Hugging Face image models, the output is typically a ModelOutput object.
        outputs = model(images).logits  # logits shape: [batch_size, num_labels]
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss

def evaluate_model(model, dataloader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images).logits
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    # Compute evaluation metrics.
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)
    return accuracy, precision, recall, f1


print("----- Training model on non-augmented data -----")
for epoch in range(num_epochs):
    loss = train_model(model_non_aug, optimizer_non_aug, train_loader_non_aug)
    print(f"[Non-Aug] Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

acc_non_aug, prec_non_aug, rec_non_aug, f1_non_aug = evaluate_model(model_non_aug, test_loader)
print("\nNon-Augmented Model Metrics on Test Set:")
print(f"Accuracy:  {acc_non_aug:.4f}")
print(f"Precision: {prec_non_aug:.4f}")
print(f"Recall:    {rec_non_aug:.4f}")
print(f"F1 Score:  {f1_non_aug:.4f}")

print("\n----- Training model on augmented data -----")
for epoch in range(num_epochs):
    loss = train_model(model_aug, optimizer_aug, train_loader_aug)
    print(f"[Augmented] Epoch {epoch+1}/{num_epochs}, Loss: {loss:.4f}")

acc_aug, prec_aug, rec_aug, f1_aug = evaluate_model(model_aug, test_loader)
print("\nAugmented Model Metrics on Test Set:")
print(f"Accuracy:  {acc_aug:.4f}")
print(f"Precision: {prec_aug:.4f}")
print(f"Recall:    {rec_aug:.4f}")
print(f"F1 Score:  {f1_aug:.4f}")


Number of classes: 2
----- Training model on non-augmented data -----
[Non-Aug] Epoch 1/10, Loss: 0.7980
[Non-Aug] Epoch 2/10, Loss: 0.6556
[Non-Aug] Epoch 3/10, Loss: 0.7312
[Non-Aug] Epoch 4/10, Loss: 0.6862
[Non-Aug] Epoch 5/10, Loss: 0.6715
[Non-Aug] Epoch 6/10, Loss: 0.6266
[Non-Aug] Epoch 7/10, Loss: 0.5542
[Non-Aug] Epoch 8/10, Loss: 0.5976
[Non-Aug] Epoch 9/10, Loss: 0.5467
[Non-Aug] Epoch 10/10, Loss: 0.4535

Non-Augmented Model Metrics on Test Set:
Accuracy:  0.5000
Precision: 0.5000
Recall:    0.5000
F1 Score:  0.4556

----- Training model on augmented data -----
[Augmented] Epoch 1/10, Loss: 0.7630
[Augmented] Epoch 2/10, Loss: 0.7279
[Augmented] Epoch 3/10, Loss: 0.6928
[Augmented] Epoch 4/10, Loss: 0.6354
[Augmented] Epoch 5/10, Loss: 0.6439
[Augmented] Epoch 6/10, Loss: 0.6055
[Augmented] Epoch 7/10, Loss: 0.5516
[Augmented] Epoch 8/10, Loss: 0.4927
[Augmented] Epoch 9/10, Loss: 0.4698
[Augmented] Epoch 10/10, Loss: 0.5199

Augmented Model Metrics on Test Set:
Accuracy: 