<a href="https://colab.research.google.com/github/BrajeshSonar/BrajeshSonar/blob/main/Object_Recognition_with_ResNet50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch torchvision torchaudio pillow scikit-learn matplotlib mlflow

Collecting mlflow
  Downloading mlflow-3.4.0-py3-none-any.whl.metadata (30 kB)
Collecting mlflow-skinny==3.4.0 (from mlflow)
  Downloading mlflow_skinny-3.4.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-tracing==3.4.0 (from mlflow)
  Downloading mlflow_tracing-3.4.0-py3-none-any.whl.metadata (19 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting fastmcp<3,>=2.0.0 (from mlflow)
  Downloading fastmcp-2.12.3-py3-none-any.whl.metadata (17 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading databricks_sdk-0.66.0-py3-none-any.whl.metadata (39 kB)
Collecting opentelemetry-proto<3,>=1.9.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading opentelemetry_proto-1.37.0-py3-none-any.w

In [None]:
import torch, torchvision
print(torch.__version__)
print(torch.cuda.is_available())


2.8.0+cu126
True


In [None]:
from torchvision import transforms, datasets
from torch.utils.data import DataLoader

# Per-channel mean / std used to normalise inputs for the ImageNet-pretrained ResNet50
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

# Training pipeline: random flip and padded 32-pixel crop first, then upscale
# to 224, convert to a tensor and normalise.
train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

# Validation/test pipeline: deterministic, no augmentation.
val_transform = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

def get_dataloaders(batch_size=64, num_workers=2):
    """Create the CIFAR-10 training and validation data loaders.

    Both datasets are stored under ``data/`` and downloaded if missing. The
    training split uses ``train_transform`` and is shuffled; the
    ``train=False`` split uses ``val_transform`` and keeps its order.

    Args:
        batch_size (int): Samples per batch for both loaders.
        num_workers (int): Worker processes per loader.

    Returns:
        tuple: ``(train_loader, val_loader)``.
    """
    shared_loader_args = dict(batch_size=batch_size, num_workers=num_workers)

    cifar_train = datasets.CIFAR10(root="data", train=True, download=True, transform=train_transform)
    cifar_val = datasets.CIFAR10(root="data", train=False, download=True, transform=val_transform)

    return (
        DataLoader(cifar_train, shuffle=True, **shared_loader_args),
        DataLoader(cifar_val, shuffle=False, **shared_loader_args),
    )


In [None]:
train_loader, val_loader = get_dataloaders(batch_size=8)

images, labels = next(iter(train_loader))
print("Batch shape:", images.shape)   # should be [8, 3, 224, 224]
print("Labels:", labels)

100%|██████████| 170M/170M [00:03<00:00, 46.2MB/s]


Batch shape: torch.Size([8, 3, 224, 224])
Labels: tensor([8, 6, 5, 4, 7, 1, 1, 1])


In [None]:
import torch.nn as nn
import torchvision.models as models

def get_resnet50(num_classes=10, pretrained=True, freeze_backbone=False):
    """
    Build a ResNet50 whose final fully connected layer outputs ``num_classes`` logits.

    Args:
        num_classes (int): Size of the replacement classification head
            (default=10 for CIFAR-10).
        pretrained (bool): Load the IMAGENET1K_V1 weights when True, otherwise
            start from random initialisation.
        freeze_backbone (bool): When True, disable gradients for every
            parameter whose name does not contain "fc".

    Returns:
        The torchvision ResNet50 module with the new head attached.
    """
    # torchvision's `weights=` argument supersedes the old `pretrained=` flag
    weights = models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
    net = models.resnet50(weights=weights)

    # Swap the stock classification head for one sized to our label set
    net.fc = nn.Linear(net.fc.in_features, num_classes)

    if freeze_backbone:
        for param_name, param in net.named_parameters():
            if "fc" in param_name:
                continue  # the head stays trainable
            param.requires_grad = False

    return net

In [None]:
import torch

model = get_resnet50(num_classes=10, pretrained=True, freeze_backbone=True)
x = torch.randn(1, 3, 224, 224)   # fake image
out = model(x)
print("Output shape:", out.shape)   # should be [1, 10]

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


100%|██████████| 97.8M/97.8M [00:00<00:00, 179MB/s]


Output shape: torch.Size([1, 10])


In [None]:
import torch
from torch import nn, optim
from torch.cuda.amp import GradScaler # Keep import for now, will update usage
from torch.optim.lr_scheduler import StepLR

# ==== Training script ====
def run_training(epochs=10, batch_size=64, lr=0.01, freeze_backbone=True):
    """Fine-tune ResNet50 on CIFAR-10 and checkpoint the best-scoring epoch.

    Each epoch runs one training pass and one validation pass, prints both
    sets of metrics, and writes ``artifacts/best_model.pt`` whenever the
    validation accuracy improves on the best seen so far.

    Args:
        epochs (int): Number of training epochs.
        batch_size (int): Batch size passed to ``get_dataloaders``.
        lr (float): Initial SGD learning rate.
        freeze_backbone (bool): Forwarded to ``get_resnet50``.

    Returns:
        The model as it stands after the final epoch. This is not necessarily
        the best epoch; the best weights are the ones saved to
        ``artifacts/best_model.pt``.
    """
    import os  # function-scope import; previously re-imported inside the epoch loop

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    # Load data
    train_loader, val_loader = get_dataloaders(batch_size=batch_size)

    # Model, loss, optimizer
    model = get_resnet50(num_classes=10, pretrained=True, freeze_backbone=freeze_backbone).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    # LR is multiplied by 0.1 every 30 epochs, so runs of 30 epochs or fewer never decay
    scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
    # One code path for both devices: loss scaling is only active on CUDA, and
    # the deprecated torch.cuda.amp.GradScaler is no longer constructed on CPU.
    scaler = torch.amp.GradScaler(device, enabled=(device == "cuda"))

    best_acc = 0.0

    for epoch in range(1, epochs + 1):
        # Train
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device, scaler)

        # Validate
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        # Step the scheduler once per epoch, after the optimizer has stepped
        scheduler.step()

        print(f"Epoch [{epoch}/{epochs}]")
        print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"  Val   Loss: {val_loss:.4f}, Val   Acc: {val_acc:.2f}%")

        # Save best model
        if val_acc > best_acc:
            best_acc = val_acc
            # exist_ok avoids the check-then-create race of exists() + makedirs()
            os.makedirs("artifacts", exist_ok=True)
            torch.save(model.state_dict(), "artifacts/best_model.pt")
            print(f"  ✅ Saved new best model with Acc: {best_acc:.2f}%")

    print("Training complete. Best Val Acc: {:.2f}%".format(best_acc))
    return model

In [None]:
import torch
from torch.cuda.amp import autocast # Keep import for now, will update usage

def train_one_epoch(model, train_loader, criterion, optimizer, device, scaler):
    """Run one mixed-precision training pass over ``train_loader``.

    Args:
        model: Network to train; switched to train mode here.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable applied to ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        device: Target device as a string (``"cuda"``, ``"cuda:0"``, ``"cpu"``)
            or a ``torch.device``.
        scaler: Gradient scaler providing ``scale``, ``step`` and ``update``.

    Returns:
        tuple: ``(mean loss per sample, accuracy in percent)`` over the
        samples actually processed.

    Raises:
        ValueError: If the loader yields no samples.
    """
    # autocast only accepts the bare device type ("cuda"/"cpu") as a string,
    # so normalise torch.device objects and indexed strings such as "cuda:0".
    device_type = torch.device(device).type

    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        with torch.amp.autocast(device_type=device_type):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # The scaler scales the loss before backward and unscales before the step
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        batch_count = labels.size(0)
        # criterion returns a per-batch mean; weight it by batch size
        running_loss += loss.item() * batch_count
        _, predicted = outputs.max(1)
        total += batch_count
        correct += predicted.eq(labels).sum().item()

    if total == 0:
        raise ValueError("train_loader yielded no samples")

    # Average over samples seen, which stays correct if the loader drops batches
    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def evaluate(model, val_loader, criterion, device):
    """Compute loss and accuracy of ``model`` over ``val_loader`` without gradients.

    Args:
        model: Network to evaluate; switched to eval mode here.
        val_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable applied to ``(outputs, labels)``.
        device: Target device as a string (``"cuda"``, ``"cuda:0"``, ``"cpu"``)
            or a ``torch.device``.

    Returns:
        tuple: ``(mean loss per sample, accuracy in percent)`` over the
        samples actually processed.

    Raises:
        ValueError: If the loader yields no samples.
    """
    # autocast only accepts the bare device type ("cuda"/"cpu") as a string,
    # so normalise torch.device objects and indexed strings such as "cuda:0".
    device_type = torch.device(device).type

    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            with torch.amp.autocast(device_type=device_type):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            batch_count = labels.size(0)
            # criterion returns a per-batch mean; weight it by batch size
            running_loss += loss.item() * batch_count
            _, predicted = outputs.max(1)
            total += batch_count
            correct += predicted.eq(labels).sum().item()

    if total == 0:
        raise ValueError("val_loader yielded no samples")

    # Average over samples seen rather than len(dataset)
    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

In [None]:
trained_model = run_training(epochs=3, batch_size=64, lr=0.01, freeze_backbone=True)


Using device: cuda
Epoch [1/3]
  Train Loss: 0.8065, Train Acc: 72.28%
  Val   Loss: 0.6781, Val   Acc: 77.28%
  ✅ Saved new best model with Acc: 77.28%
Epoch [2/3]
  Train Loss: 0.6969, Train Acc: 76.10%
  Val   Loss: 0.6193, Val   Acc: 79.23%
  ✅ Saved new best model with Acc: 79.23%
Epoch [3/3]
  Train Loss: 0.6675, Train Acc: 77.28%
  Val   Loss: 0.7799, Val   Acc: 76.40%
Training complete. Best Val Acc: 79.23%


In [None]:
import torch
from torch import nn

# CIFAR-10 class names
CIFAR10_CLASSES = ['plane','car','bird','cat','deer','dog','frog','horse','ship','truck']

def load_model(checkpoint_path="artifacts/best_model.pt", num_classes=10, device=None):
    """Rebuild the ResNet50 classifier and restore weights from a checkpoint.

    Args:
        checkpoint_path (str): File holding a ``state_dict`` saved with ``torch.save``.
        num_classes (int): Size of the classification head to construct.
        device: Where to place the model; CUDA if available, else CPU, when None.

    Returns:
        The restored model, moved to ``device`` and set to eval mode.
    """
    target_device = device
    if target_device is None:
        target_device = "cuda" if torch.cuda.is_available() else "cpu"

    # Pretrained weights are skipped because the checkpoint overwrites every parameter
    net = get_resnet50(num_classes=num_classes, pretrained=False)
    state_dict = torch.load(checkpoint_path, map_location=target_device)
    net.load_state_dict(state_dict)
    net.to(target_device).eval()
    return net

def test_model(model, dataloader, device):
    """Evaluate ``model`` on ``dataloader`` with cross-entropy loss and print the result.

    Returns:
        tuple: ``(loss, accuracy)`` exactly as returned by ``evaluate``.
    """
    test_loss, test_acc = evaluate(model, dataloader, nn.CrossEntropyLoss(), device)
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")
    return test_loss, test_acc


In [None]:
# Pick the GPU when one is available; the same string is reused for loading and evaluation.
device = "cuda" if torch.cuda.is_available() else "cpu"
# NOTE: element [1] of get_dataloaders() is the CIFAR-10 train=False split, which is
# the same split run_training() validates on when choosing the checkpoint to save.
# The accuracy reported below is therefore not an independent held-out estimate.
test_loader = get_dataloaders(batch_size=128)[1]  # val_loader = test set
# Restore the best checkpoint written by run_training()
model = load_model("artifacts/best_model.pt", device=device)

test_loss, test_acc = test_model(model, test_loader, device)


Test Loss: 0.6193, Test Acc: 79.24%


In [None]:
from PIL import Image
# from src.utils import val_transform  # reuse transforms

def predict_image(model, image_path, device):
    """Classify a single image file and report the most probable CIFAR-10 class.

    Args:
        model: Classifier producing one logit per entry of ``CIFAR10_CLASSES``.
        image_path: Path (or file object) accepted by ``PIL.Image.open``.
        device: Device the input batch is moved to before the forward pass.

    Returns:
        tuple: ``(class name, softmax confidence)`` for the top prediction.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    # Add a leading batch dimension: [1, 3, H, W] after val_transform
    batch = val_transform(rgb_image).unsqueeze(0).to(device)

    with torch.no_grad():
        class_probs = torch.softmax(model(batch), dim=1)
        top_prob, top_index = torch.max(class_probs, 1)

    label = CIFAR10_CLASSES[top_index.item()]
    confidence = top_prob.item()
    print(f"Predicted: {label} (Confidence: {confidence:.2f})")
    return label, confidence

In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

Saving dog.jpg.png to dog.jpg.png
User uploaded file "dog.jpg.png" with length 2182946 bytes


In [None]:
# `uploaded` maps each uploaded filename to its bytes; classify the first entry only
if not uploaded:
  print("No file was uploaded.")
else:
  uploaded_filename = next(iter(uploaded))
  predict_image(model, uploaded_filename, device)

Predicted: dog (Confidence: 0.89)


In [None]:
%%writefile Dockerfile
# Base image: slim Debian-based Python 3.10
FROM python:3.10-slim

# Set working directory; the relative paths below resolve against /app
WORKDIR /app

# Copy requirements on their own first, so the dependency layer below is only
# rebuilt when requirements.txt changes
COPY requirements.txt .

# Install dependencies (--no-cache-dir keeps pip's download cache out of the image)
RUN pip install --no-cache-dir -r requirements.txt

# Copy project files from the build context. predict_app.py loads
# artifacts/best_model.pt at import time, so that file must be in the context.
COPY . .

# Expose port (must match the --port passed to uvicorn below)
EXPOSE 8000

# Run FastAPI: serve the `app` object from predict_app.py on all interfaces
CMD ["uvicorn", "predict_app:app", "--host", "0.0.0.0", "--port", "8000"]

Writing Dockerfile


In [None]:
%%writefile requirements.txt
torch
torchvision
fastapi
uvicorn
# FastAPI needs python-multipart to parse the multipart form upload behind UploadFile/File
python-multipart
Pillow

Writing requirements.txt


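The two cells above write the `Dockerfile` and `requirements.txt` files into the working directory.

To build and run the Docker container, run the following commands in a terminal on a system with Docker installed, from the directory that contains these files together with `predict_app.py` and `artifacts/best_model.pt`:

    docker build -t cifar10-resnet50-api .
    docker run --rm -p 8000:8000 cifar10-resnet50-api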

In [None]:
%%writefile predict_app.py
"""FastAPI inference service for the CIFAR-10 ResNet50 classifier.

This module is self-contained: it defines the model builder, preprocessing
pipeline and class names itself instead of reading them from a running
notebook kernel, so it can be served by uvicorn (e.g. inside the Docker image)
where no IPython session exists.
"""
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
from PIL import Image
import torch
from torch import nn
from torchvision import models, transforms

# Checkpoint written by the training notebook (a state_dict saved with torch.save)
CHECKPOINT_PATH = "artifacts/best_model.pt"

# CIFAR-10 class names, indexed by label id
CIFAR10_CLASSES = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

# Must mirror the validation preprocessing used during training
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
val_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])


def get_resnet50(num_classes=10, pretrained=False):
    """Build a ResNet50 whose final fully connected layer has ``num_classes`` outputs.

    Args:
        num_classes (int): Number of output classes.
        pretrained (bool): Load IMAGENET1K_V1 weights when True. The service
            passes False because the checkpoint supplies every parameter.
    """
    weights = models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
    net = models.resnet50(weights=weights)
    net.fc = nn.Linear(net.fc.in_features, num_classes)
    return net


app = FastAPI(title="CIFAR-10 ResNet50 Inference API")

# Load the model once at import time so every request reuses the same instance
device = "cuda" if torch.cuda.is_available() else "cpu"
model = get_resnet50(num_classes=len(CIFAR10_CLASSES), pretrained=False)
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=device))
model.to(device).eval()


def predict_image_bytes(image_bytes):
    """Classify an image supplied as a binary file-like object (or path).

    Args:
        image_bytes: Anything ``PIL.Image.open`` accepts; the endpoint passes
            the upload's underlying file object.

    Returns:
        tuple: ``(class name, confidence as float)`` for the top prediction.

    Raises:
        OSError: If the data cannot be opened or decoded as an image
            (``PIL.UnidentifiedImageError`` is a subclass).
    """
    img = Image.open(image_bytes).convert("RGB")
    x = val_transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(x)
        probs = torch.softmax(outputs, dim=1)
        conf, pred = torch.max(probs, 1)
    return CIFAR10_CLASSES[pred.item()], float(conf.item())


@app.post("/predict")
def predict(file: UploadFile = File(...)):
    """Return the predicted class and confidence for an uploaded image.

    Declared with ``def`` rather than ``async def`` so FastAPI runs the
    blocking model inference in its threadpool instead of on the event loop.
    """
    try:
        class_name, confidence = predict_image_bytes(file.file)
    except OSError as e:
        # Unreadable or non-image upload is a client error. Anything else is a
        # server fault and is left to propagate as a 500.
        return JSONResponse({"error": str(e)}, status_code=400)
    return JSONResponse({"predicted_class": class_name, "confidence": round(confidence, 2)})

Writing predict_app.py


In [None]:
!pip install mlflow

Collecting mlflow
  Downloading mlflow-3.4.0-py3-none-any.whl.metadata (30 kB)
Collecting mlflow-skinny==3.4.0 (from mlflow)
  Downloading mlflow_skinny-3.4.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-tracing==3.4.0 (from mlflow)
  Downloading mlflow_tracing-3.4.0-py3-none-any.whl.metadata (19 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting fastmcp<3,>=2.0.0 (from mlflow)
  Downloading fastmcp-2.12.4-py3-none-any.whl.metadata (19 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading databricks_sdk-0.67.0-py3-none-any.whl.metadata (39 kB)
Collecting opentelemetry-proto<3,>=1.9.0 (from mlflow-skinny==3.4.0->mlflow)
  Downloading opentelemetry_proto-1.37.0-py3-none-any.w

In [None]:
import mlflow
import mlflow.tensorflow


In [None]:
mlflow.set_experiment("Object Recognition with ResNet50")


2025/09/29 14:07:17 INFO mlflow.tracking.fluent: Experiment with name 'Object Recognition with ResNet50' does not exist. Creating a new experiment.


<Experiment: artifact_location='file:///content/mlruns/613307867315159500', creation_time=1759154837052, experiment_id='613307867315159500', last_update_time=1759154837052, lifecycle_stage='active', name='Object Recognition with ResNet50', tags={}>

In [None]:
mlflow.set_experiment("Object Recognition with ResNet50")

<Experiment: artifact_location='file:///content/mlruns/613307867315159500', creation_time=1759154837052, experiment_id='613307867315159500', last_update_time=1759154837052, lifecycle_stage='active', name='Object Recognition with ResNet50', tags={}>

In [None]:
import mlflow.pytorch  # the PyTorch flavor is what log_model below relies on

with mlflow.start_run():
    # Define parameters
    epochs = 5
    batch_size = 64
    lr = 0.01
    freeze_backbone = True

    # Log parameters
    mlflow.log_params({
        "model_name": "ResNet50",
        "epochs": epochs,
        "batch_size": batch_size,
        "lr": lr,
        "freeze_backbone": freeze_backbone,
    })

    # Train the model
    trained_model = run_training(epochs=epochs,
                                 batch_size=batch_size,
                                 lr=lr,
                                 freeze_backbone=freeze_backbone)

    # run_training() only prints its per-epoch metrics and never calls MLflow,
    # so score the returned model once here to attach metrics to the run.
    eval_device = next(trained_model.parameters()).device.type
    _, eval_loader = get_dataloaders(batch_size=batch_size)
    final_val_loss, final_val_acc = evaluate(trained_model, eval_loader, nn.CrossEntropyLoss(), eval_device)
    mlflow.log_metric("final_val_loss", final_val_loss)
    mlflow.log_metric("final_val_acc", final_val_acc)

    # Log model (the weights after the final epoch, as returned by run_training)
    mlflow.pytorch.log_model(trained_model, "resnet50_model")

Using device: cuda
Epoch [1/5]
  Train Loss: 0.8142, Train Acc: 71.97%
  Val   Loss: 0.6652, Val   Acc: 77.55%
  ✅ Saved new best model with Acc: 77.55%
Epoch [2/5]
  Train Loss: 0.6991, Train Acc: 76.05%
  Val   Loss: 0.6688, Val   Acc: 77.33%
Epoch [3/5]
  Train Loss: 0.6750, Train Acc: 77.05%
  Val   Loss: 0.6363, Val   Acc: 78.11%
  ✅ Saved new best model with Acc: 78.11%
Epoch [4/5]
  Train Loss: 0.6511, Train Acc: 77.92%
  Val   Loss: 0.6335, Val   Acc: 78.37%
  ✅ Saved new best model with Acc: 78.37%




Epoch [5/5]
  Train Loss: 0.6436, Train Acc: 78.11%
  Val   Loss: 0.6344, Val   Acc: 78.35%
Training complete. Best Val Acc: 78.37%


