In [17]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:98% !important; }</style>"))

%load_ext autoreload
%autoreload 2

import os
# import numpy as np
# import pandas as pd
# import matplotlib.pyplot as plt
from pprint import pprint

import torch
from torchvision import transforms

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
import sys
sys.path.append('../code_base')  # Only if not installed as a package

from model_suite.utils.common import load_config
from model_suite import training 
from model_suite.dataloader import generate_dataloader
from model_suite.architectures import get_model
from model_suite.evaluation_metrics import cal_classification_metrics
from model_suite.utils.common import visualize_confusion_matrix

In [8]:
torch.cuda.empty_cache()

# Training

In [9]:
config_path = "../configs/classification.yaml"
config = load_config(config_path)
pprint(config)

{'data_loader': {'batch_size': 1, 'num_workers': 2, 'shuffle': True},
 'dataset_loc': {'dataset_type': 'classification',
                 'train': {'degradation_csv': '/cs6945share/retro_project/bdd100k/generated_segments/train.csv',
                           'img_dir': '/cs6945share/retro_project/bdd100k/images/train',
                           'random_subset': None},
                 'val': {'degradation_csv': '/cs6945share/retro_project/bdd100k/generated_segments/val.csv',
                         'img_dir': '/cs6945share/retro_project/bdd100k/images/val',
                         'random_subset': None}},
 'enable_cuda': True,
 'model': {'in_channels': 3, 'out_dim': 3},
 'results_loc': './experiment_results/',
 'training': {'learning_rate': 0.05,
              'loss_fn_name': 'CrossEntropy',
              'num_epochs': 10,
              'resume_checkpoint': None,
              'save_checkpoint_freq': 5}}


In [15]:
# # copy paste config here to edit and experiment
config={
  'data_loader': {
    'batch_size': 1,
    'num_workers': 2,
    'shuffle': True
  },
    
  'dataset_loc': {
    'dataset_type': 'classification',
    'train': {
      'degradation_csv': '/cs6945share/retro_project/bdd100k/generated_segments/train.csv',
      'img_dir': '/cs6945share/retro_project/bdd100k/images/train',
      'random_subset': None
    },
    'val': {
      'degradation_csv': '/cs6945share/retro_project/bdd100k/generated_segments/val.csv',
      'img_dir': '/cs6945share/retro_project/bdd100k/images/val',
      'random_subset': None
    }
  },
  'enable_cuda': True,
    
  'model': {
    'in_channels': 3,
    'out_dim': 3
  },
    
  'results_loc': './experiment_results/',
    
  'training': {
    'learning_rate': 0.05,
    'loss_fn_name': 'CrossEntropy',
    'num_epochs': 10,
    'resume_checkpoint': None,
    'save_checkpoint_freq': 5
  }
}

In [11]:
# get required config parameters
model_config = config["model"]
dataset_config = config["dataset_loc"]
dataloader_config = config["data_loader"]
preprocess_config = config.get("dataset_preprocessing", None)
train_config = config["training"]

In [14]:
if config["enable_cuda"]:
    training.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using DEVICE: {training.DEVICE}")

Using DEVICE: cuda


In [20]:
# update img trasnform if required

train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(p=0.5),  # Flip images horizontally with 50% probability
    transforms.ToTensor()                    # Convert the image to a tensor
])

# generate train data loader
train_loader, train_size = generate_dataloader(dataset_type=dataset_config["dataset_type"],
                                               data_loc=dataset_config["train"],
                                               dataloader_config=dataloader_config,
                                               preprocess_config=preprocess_config)

# generate validation data loader
val_loader, val_size = generate_dataloader(dataset_type=dataset_config["dataset_type"],
                                           data_loc=dataset_config["val"],
                                           dataloader_config=dataloader_config,
                                           preprocess_config=preprocess_config)

print(f"Train Dataset loaded. #samples: {train_size}")
print(f"Validation Dataset loaded. #samples: {val_size}")

FileNotFoundError: [Errno 2] No such file or directory: '/cs6945share/retro_project/bdd100k/generated_segments/train.csv'

In [None]:
# Ensure that we are getting correct data from data loaders
batch_img, degradation_values = next(iter(train_loader))

print("Image batch shape:", batch_img.shape) 
print("Degradation label shape:", degradation_values.shape) 
print("Degradation target:", degradation_values[0])

sample_img = batch_img[0].numpy()   # (c, h, w)
sample_img = sample_img.transpose(1, 2, 0)

sample_mask = batch_mask[0].numpy()   # (c, h, w)
sample_mask = np.squeeze(sample_mask) # (h, w)

fig, ax = plt.subplots(figsize=(15, 5))
ax.imshow(sample_img)
ax.set_title("input image")
plt.show()

In [23]:
# Initializing the model, loss function, and the optimizer
model_name = "cnn_sppf"
model = get_model(model_name, in_channels=model_config['in_channels'], out_channels=model_config['out_channels'])
model = model.to(training.DEVICE)

# fetch loss function as set in config
criterion = get_loss(train_config["loss_fn_name"])

## OR add your own loss fucntion like shown below
# from model_suite.focal_loss import FocalLoss
# num_classes = 3
# alpha = [1.0, 1.2, 1.2]  # Example class weights
# criterion = FocalLoss(gamma=2, alpha=alpha, task_type='multi-class', num_classes=num_classes)

# initialized optimizer as required
optimizer = optim.Adam(model.parameters(), lr=train_config["learning_rate"])

checkpoint_path = train_config["resume_checkpoint"]
if checkpoint_path is not None:
    model.load_state_dict((torch.load(checkpoint_path, weights_only=True)))
    print("Model checkpoint loaded.")

NameError: name 'model_name' is not defined

In [None]:
# train the model
train_loop(model=model, loss_fn=criterion, optimizer=optimizer,
           train_loader=train_loader, val_loader=val_loader,
           num_epochs=train_config["num_epochs"], save_path=config["results_loc"],
           checkpoint_freq=train_config["save_checkpoint_freq"])

In [None]:
# open saved learning curve
plot_saved_path = "experiment_results/train_log/learning_curve_2025-04-14_00-23-53.png"
img = load_image(plot_saved_path)
plt.imshow(img)
plt.axis("off")
plt.show()

In [None]:
# Evaluate model performance at end of training using different losses
train_metrics, train_cm = cal_classification_metrics(model, train_loader, num_classes=3)
val_metrics, val_cm = cal_classification_metrics(model, val_loader, num_classes=3)

In [None]:
stats = pd.DataFrame([train_metrics, val_metrics], index=["train", "val"]).T
display(stats)

In [27]:
visualize_confusion_matrix(train_cm=train_cm, val_cm=val_cm, label_names=["good", "slight", "severe"])

NameError: name 'train_cm' is not defined

In [None]:
csv_path = dataset_config["val"]["degradation_values_csv"]
dir_path = dataset_config["val"]["img_dir"]
df = pd.read_csv(csv_path)
df.head(3)

In [None]:
# row = df.loc[df["degradation_target"]==1].sample(1)
row = df.sample(1)
img_name, label = row["name"].values[0], row["degradation_target"].values[0]
p = os.path.join(dir_path, img_name)
if ".npy" in p:
    img = np.load(p)
else:
    img = load_image(p)

In [None]:
# Generate saliency map
print("target class", label)
saliency = generate_saliency_map(model, img, device=training.DEVICE)

# Plot the saliency map
fig, axes = plt.subplots(1, 2, figsize=(12, 12))
axes[0].imshow(img[:, :, :3])
axes[0].set_title('Original Image')
axes[0].axis('off')

# Plot the saliency map
axes[1].imshow(saliency, cmap='hot')
axes[1].set_title('Saliency Map')
axes[1].axis('off')

plt.tight_layout()
plt.show()

# Inference

In [None]:
saved_weight_path = "experiment_results/checkpoints/cnn_sppf_final_2025-03-13_18-02-43.pth"

model_name = "cnn_sppf"
model_config = {'in_channels': 3, 'out_dim': 1}

# update preprocessing according to training
resize_height, resize_width = 720, 1280

# Define the image transformations
img_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((resize_height, resize_width)),   # ensure resize is same as used during training for loaded model 
    transforms.ToTensor()
])

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using DEVICE: {DEVICE}")

# initialize and load saved model
model = load_saved_model(model_name=model_name, saved_weight_path=saved_weight_path, **model_config)
model = model.to(DEVICE)

test_img_path = "../damage_ratio_calc_data/segmented_objects/191_jpg.rf.e27c030e763e58ce48964e670158b6e7/191_jpg.rf.e27c030e763e58ce48964e670158b6e7_object_4.png"
test_img = load_image(test_img_path)
print("Test image shape:", test_img.shape)
plt.imshow(test_img)
plt.axis("off")
plt.title("Input image", fontsize=9)
plt.show()

pred_val = pred_degradation_value(model=model, test_img=test_img, img_transform=None, add_batch_dim=True, device=DEVICE)
print("Predicticted degradation value:", pred_val)