# Exploration

In [36]:
import os
import pandas as pd
import numpy as np
import cv2
import torch
import torchvision
import sklearn.model_selection
from collections import Counter
import matplotlib.pyplot as plt
import albumentations as A
import torchsummary
import transformers

os.getcwd()

'/home/gpievanitrapletti/genai/multimodality/clip-classifier/notebooks'

## Data

In [3]:
df = pd.read_csv('data/stairs_dataset_annotation.csv')
df['GT'].replace({'curved': 'bent', 'spiral': 'bent'}, inplace=True)

df

Unnamed: 0,filename,GT
0,stairs_001,bent
1,stairs_002,straight
2,stairs_003,bent
3,stairs_004,angular
4,stairs_005,straight
...,...,...
192,stairs_193,angular
193,stairs_194,bent
194,stairs_195,angular
195,stairs_196,straight


In [4]:
df.GT.value_counts()

GT
bent        74
straight    63
angular     60
Name: count, dtype: int64

In [5]:
IMAGE_PATH = 'data/stairs_dataset_20231124'
filepaths = sorted([os.path.join(IMAGE_PATH, fp) for fp in os.listdir(IMAGE_PATH)])
len(filepaths)

197

## Image sizes

In [6]:
# Inspect image size
sizes = [cv2.imread(filepath).shape for filepath in filepaths]
sorted_sizes = sorted(sizes, reverse=True)

In [7]:
# Larger and smaller images
sorted_sizes[0], sorted_sizes[-1]

((5312, 2988, 3), (251, 200, 3))

## CLIP instantiation

In [38]:
clip_model = transformers.CLIPModel.from_pretrained('openai/clip-vit-base-patch32')
clip_model = clip_model.eval()

In [41]:
# Batch of dummy images
input = np.random.randint(low=0, high=255, size=(16, 3, 512, 512))
input.shape

(16, 3, 512, 512)

In [42]:
processor = transformers.CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [43]:
inputs = processor(images=input, return_tensors='pt')

type(inputs), len(list(inputs.items())), inputs['pixel_values'].shape

(transformers.tokenization_utils_base.BatchEncoding,
 1,
 torch.Size([16, 3, 224, 224]))

In [14]:
# Processed input
outputs = clip_model.get_image_features(pixel_values=inputs['pixel_values'])

outputs.shape

torch.Size([16, 512])

In [15]:
# Non processed input
v = np.random.randint(low=0, high=255, size=(16, 3, 224, 224))
v = torch.tensor(v)
outputs2 = clip_model.get_image_features(pixel_values=v)
outputs2.shape

torch.Size([16, 512])

In [16]:
torch.all(outputs == outputs2)

tensor(False)

In [19]:
texts = [
    'Three Rings for the Elven-kings under the sky',
    'Seven for the Dwarf-lords in their halls of stone',
    'Nine for Mortal Men doomed to die'
]

In [30]:
text_inputs = processor(text=texts, return_tensors='pt', padding=True)

text_inputs

{'input_ids': tensor([[49406,  2097,  5751,   556,   518,   544,  1638,   268,  4232,  1798,
           518,  2390, 49407],
        [49406,  5757,   556,   518, 22643,   268, 14969,   530,   911, 18052,
           539,  2441, 49407],
        [49406,  7330,   556, 14680,  1656, 33251,   531,  2082, 49407, 49407,
         49407, 49407, 49407]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0]])}

In [34]:
text_embeddings = clip_model.get_text_features(
    input_ids=text_inputs['input_ids'],
    attention_mask=text_inputs['attention_mask'],    
)

text_embeddings.shape

torch.Size([3, 512])

In [35]:
dir(clip_model)

['T_destination',
 '__annotations__',
 '__call__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_apply',
 '_auto_class',
 '_autoset_attn_implementation',
 '_backward_compatibility_gradient_checkpointing',
 '_backward_hooks',
 '_backward_pre_hooks',
 '_buffers',
 '_call_impl',
 '_check_and_enable_flash_attn_2',
 '_check_and_enable_sdpa',
 '_compiled_call_impl',
 '_convert_head_mask_to_5d',
 '_copy_lm_head_original_to_resized',
 '_create_repo',
 '_dispatch_accelerate_model',
 '_expand_inputs_for_generation',
 '_extract_past_from_model_output',
 '_forward_hooks',
 '_forward_hooks_always_called',
 '_forward_hooks_with_k

## ResNet instantiation

In [16]:
model = torchvision.models.resnet50(weights='IMAGENET1K_V2')
# model = torch.hub.load("pytorch/vision", "resnet50", weights="IMAGENET1K_V2")
model = model.eval()

In [14]:
# Transforms applied by the model
torchvision.models.ResNet50_Weights.DEFAULT.transforms()

ImageClassification(
    crop_size=[224]
    resize_size=[232]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BILINEAR
)

In [None]:
# The inference transforms are available at ResNet50_Weights.IMAGENET1K_V2.transforms and perform 
# the following preprocessing operations: Accepts PIL.Image, batched (B, C, H, W) and single (C, H, W) 
# image torch.Tensor objects. The images are resized to resize_size=[232] using 
# interpolation=InterpolationMode.BILINEAR, followed by a central crop of crop_size=[224]. Finally 
# the values are first rescaled to [0.0, 1.0] and then normalized using mean=[0.485, 0.456, 0.406] 
# and std=[0.229, 0.224, 0.225].
# https://pytorch.org/vision/main/models/generated/torchvision.models.resnet50.html

In [15]:
# Random data
input = torch.randn((16, 3, 512, 512))

In [16]:
output = model(input)
output.shape

# torch.Size([16, 1000])

torch.Size([16, 1000])

In [17]:
# # How output changes if I remove the last layer
# # MAKES NOTEBOOK CRASH ON LIGHTNING
# n_layers_to_prune = 1
# pruned_model = torch.nn.Sequential(*list(model.children())[:-n_layers_to_prune])
# pruned_model = pruned_model.eval()
# output = pruned_model(input)
# output.shape

In [15]:
# In-depth check of the output shape of each layer
torchsummary.summary(model, input_size=(3, 224, 224), batch_size=1)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [1, 64, 112, 112]           9,408
       BatchNorm2d-2          [1, 64, 112, 112]             128
              ReLU-3          [1, 64, 112, 112]               0
         MaxPool2d-4            [1, 64, 56, 56]               0
            Conv2d-5            [1, 64, 56, 56]           4,096
       BatchNorm2d-6            [1, 64, 56, 56]             128
              ReLU-7            [1, 64, 56, 56]               0
            Conv2d-8            [1, 64, 56, 56]          36,864
       BatchNorm2d-9            [1, 64, 56, 56]             128
             ReLU-10            [1, 64, 56, 56]               0
           Conv2d-11           [1, 256, 56, 56]          16,384
      BatchNorm2d-12           [1, 256, 56, 56]             512
           Conv2d-13           [1, 256, 56, 56]          16,384
      BatchNorm2d-14           [1, 256,

In [48]:
layers = torch.nn.Sequential(*list(model.children()))

print(len(layers))

# layers

10


In [17]:
named_params = list(model.named_parameters())
len(named_params)

161

In [31]:
# Look at layer names
names = [x[0] for x in named_params]

names[:3]

['conv1.weight', 'bn1.weight', 'bn1.bias']

In [40]:
# Check if layers require grad (that is, are unfrozen)
def check_unfrozen_state(model):
    named_params = list(model.named_parameters())
    return Counter([named_param[1].requires_grad for named_param in named_params])

In [49]:
check_unfrozen_state(model)

Counter({False: 160, True: 1})

In [34]:
def freeze_layers(model, n):
    '''To freeze the layers of a model. The last `n` layers remain unfrozen.
    '''
    n_tot_layers = len(list(model.parameters()))
    idxs_layers_to_freeze = list(range(0, n_tot_layers - n))

    for i, param in enumerate(model.parameters()):
        if i in idxs_layers_to_freeze:    
            param.requires_grad = False

In [41]:
model_temp = torchvision.models.resnet50(weights='IMAGENET1K_V2')
check_unfrozen_state(model_temp)

Counter({True: 161})

In [42]:
freeze_layers(model_temp, 5)
check_unfrozen_state(model_temp)

Counter({False: 156, True: 5})

## Train-Val-Test split

In [None]:
X_train_val, X_test, y_train_val, y_test = sklearn.model_selection.train_test_split(
    df['filename'].to_numpy(), 
    df['GT'].to_numpy(), 
    test_size=0.15, 
    stratify=df['GT'].to_numpy(),
    random_state=42
)

X_train_val.shape, y_train_val.shape, X_test.shape, y_test.shape

((167,), (167,), (30,), (30,))

In [None]:
X_train, X_val, y_train, y_val = sklearn.model_selection.train_test_split(
    X_train_val, 
    y_train_val, 
    test_size=0.2, 
    stratify=y_train_val,
    random_state=42
)

X_train.shape, y_train.shape, X_val.shape, y_val.shape

((133,), (133,), (34,), (34,))

In [None]:
Counter(y_val), Counter(y_test)

(Counter({'bent': 13, 'straight': 11, 'angular': 10}),
 Counter({'bent': 11, 'straight': 10, 'angular': 9}))

In [None]:
val_df = pd.DataFrame({'filename': X_val, 'GT': y_val})

In [None]:
# Probably cleaner approach
df1, df2 = sklearn.model_selection.train_test_split(df, test_size=0.2)

df1.shape, df2.shape

((157, 2), (40, 2))

In [None]:
gts = df2.GT.to_list()
gts

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
encoder = LabelEncoder()
encoder.fit_transform(gts)

array([2, 1, 0, 2, 2, 2, 0, 2, 0, 2, 0, 0, 1, 0, 2, 1, 0, 2, 2, 0, 1, 0,
       2, 1, 2, 1, 1, 0, 2, 0, 1, 2, 1, 0, 2, 1, 1, 0, 1, 2])

In [None]:
encoded = [0 if x=='angular' else 1 if x=='bent' else 2 if x=='straight' else ValueError for x in gts]

In [None]:
torch.nn.functional.one_hot(torch.tensor(encoded[0]))

tensor([0, 0, 1])

In [None]:
x = 'angular'

0 if x=='angular' else 1 if x=='bent' else 2 if x=='straight' else ValueError

0

In [None]:
def from_gt_to_ohe(gt):
    label = 0 if gt =='angular' else 1 if gt =='bent' else 2 if gt =='straight' else ValueError
    ohe = torch.nn.functional.one_hot(torch.tensor(label), num_classes=3)
    return ohe

In [None]:
from_gt_to_ohe('angular'), from_gt_to_ohe('bent'), from_gt_to_ohe('straight')

(tensor([1, 0, 0]), tensor([0, 1, 0]), tensor([0, 0, 1]))

## Transforms

In [6]:
transforms = A.Compose([
    A.LongestMaxSize(max_size=224, interpolation=3, p=1.0),
    A.PadIfNeeded(min_height=224, min_width=224, border_mode=0, value=0, mask_value=0, p=1.0),

])

In [7]:
image = cv2.imread(filepaths[1])
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image.shape

(1080, 716, 3)

In [None]:
plt.imshow(image)

In [None]:
transformed_image = transforms(image=image)['image']
print(transformed_image.shape)
plt.imshow(transformed_image)
plt.show()

In [28]:
# train_transforms = A.Compose([
#     # Dual transforms
#     A.Resize(height=256, width=256, interpolation=3, always_apply=True),
#     A.Affine(
#         scale = (0.8, 1.2),
#         rotate = (-360, 360),
#         shear = (-20, 20),
#         p = 0.5
#     ),
#     A.HorizontalFlip(p=0.5),
#     A.VerticalFlip(p=0.5),
#     # Image only transforms
#     A.ColorJitter(
#         brightness = 0.5,
#         contrast = 0.5,
#         saturation = 0.5,
#         hue = 0,
#         p = 0.5
#     ),
#     A.CLAHE(p=0.5),
#     # A.Normalize(mean=(0.4456, 0.4436, 0.4018), std=(0.2220, 0.2154, 0.2298), p=1) # mean and std computed on this dataset.    
# ])

# val_transforms = A.Compose([
#     A.Resize(height=256, width=256, interpolation=3, always_apply=True)

In [32]:
transforms1 = A.Compose([
    A.LongestMaxSize(max_size=224, interpolation=3, p=1.0),
    A.PadIfNeeded(min_height=224, min_width=224, border_mode=0, value=0, mask_value=0, p=1.0),
    
    # Geometric
    A.Affine(
        scale = (0.8, 1.2),
        rotate = (-360, 360),
        shear = (-20, 20),
        p = 0.5
    ),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
   
    # Color
    A.ColorJitter(
        brightness = 0.7,
        contrast = 0.7,
        saturation = 0.7,
        hue = 0.7,
        p = 0.5
    ),
    A.CLAHE(p=0.5),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], p=1.0) # normalization values the ResNet was trained to. torchvision.models.ResNet50_Weights.DEFAULT.transforms()
])

In [33]:
transformed_image = transforms1(image=image)['image']
print(transformed_image.shape)
plt.imshow(transformed_image)
plt.axis('off')
plt.show()

NameError: name 'image' is not defined

In [31]:
transformed_image.min(), transformed_image.max()

(-2.117904, -0.3403921)

In [32]:
resnet_transform_info = torchvision.models.ResNet50_Weights.DEFAULT.transforms()
mean, std = resnet_transform_info.__dict__['mean'], resnet_transform_info.__dict__['std']

mean, std

([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

### Transforms with different normalization parameters for the two encoders

In [2]:
from src.transforms import CCTransforms

In [3]:
input = np.random.randint(0, 255, (512, 512, 3)).astype(np.uint8) # dummy image                       # NOTE: resizing with CV2 requires uint8!
input.shape

(512, 512, 3)

In [4]:
mytransforms = CCTransforms(encoder_type='clip')

In [5]:
transformed = mytransforms.train_transforms(image=input)['image']
transformed.shape

torch.Size([3, 224, 224])

In [8]:
# Add batch dimension
transformed_batch = transformed[None, ...]
transformed_batch.shape

torch.Size([1, 3, 224, 224])

In [9]:
# CLIP
model = transformers.CLIPModel.from_pretrained('openai/clip-vit-base-patch32')
model = model.eval()
outputs = model.get_image_features(pixel_values=transformed_batch)
outputs.shape

torch.Size([1, 512])

In [10]:
# ResNet
# (using transforms with clip normalization just to test the pipeline)
model = torchvision.models.resnet50(weights='IMAGENET1K_V2')
model = model.eval()
outputs = model(transformed_batch)
outputs.shape

torch.Size([1, 1000])

## Data loading

In [2]:
from src.datamodule import CCDataModule

In [3]:
dm = CCDataModule(encoder_type='clip')
dm.prepare_data()
dm.setup('train')

In [4]:
ds = dm.train_dataset
len(ds)

133

In [5]:
dl = dm.train_dataloader()
len(dl)

17

In [8]:
for images, labels in dl:
    break 

images.shape, labels.shape

(torch.Size([8, 3, 224, 224]), torch.Size([8, 3]))

## Module

In [1]:
from src.encoder import CLIPEncoder, ResNetEncoder
from src.classifier import CCClassifier
from src.module import CCModule

In [2]:
encoder = CLIPEncoder(version='base')
classifier = CCClassifier(encoder_type='clip')

In [3]:
model = CCModule(
    encoder=encoder,
    classifier=classifier,
    lr=0.01
)

In [7]:
input = torch.rand((4, 3, 224, 224))
input.shape

torch.Size([4, 3, 224, 224])

In [8]:
output = model(input)
output.shape

torch.Size([4, 3])

In [9]:
output

tensor([[-0.0558,  0.1430, -0.0774],
        [-0.0549,  0.1093, -0.0838],
        [-0.0397,  0.1301, -0.0762],
        [-0.0619,  0.1057, -0.0559]], grad_fn=<AddmmBackward0>)

## Loss & Metric

### Loss

In [26]:
import torchmetrics
from src.datamodule import CCDataModule

In [9]:
dm = CCDataModule(encoder_type='clip')
dm.prepare_data()
dm.setup('train')

ds = dm.train_dataset
dl = dm.train_dataloader()

len(ds), len(dl)

(133, 17)

In [34]:
for images, gts in dl:
    break 

images.shape, gts.shape

(torch.Size([8, 3, 224, 224]), torch.Size([8, 3]))

In [12]:
loss_fn = torch.nn.CrossEntropyLoss()

In [35]:
preds = torch.rand_like(gts)
preds.shape

torch.Size([8, 3])

In [36]:
gts.dtype, preds.dtype

(torch.float16, torch.float16)

In [37]:
loss_fn(input=gts.float(), target=gts)

tensor(0.5514)

In [38]:
loss_fn(input=preds.float(), target=gts)

tensor(1.1436)

### Metric

In [39]:
metric = torchmetrics.Accuracy(task="multiclass", num_classes=3)

In [58]:
gts

tensor([[0., 0., 1.],
        [0., 1., 0.],
        [0., 0., 1.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.]], dtype=torch.float16)

In [56]:
# Random preds (already thresholded)
indices = torch.randint(low=0, high=3, size=[8, ])
preds = torch.nn.functional.one_hot(indices, num_classes=3).type(torch.float16)

preds

tensor([[1., 0., 0.],
        [0., 0., 1.],
        [1., 0., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 1., 0.]], dtype=torch.float16)

In [59]:
gts_labels = torch.argmax(gts, dim=1)
gts_labels

tensor([2, 1, 2, 0, 2, 1, 1, 1])

In [60]:
preds_labels = torch.argmax(preds, dim=1)
preds_labels

tensor([0, 2, 0, 1, 1, 0, 0, 1])

In [62]:
metric(preds=gts_labels, target=gts_labels)

tensor(1.)

In [61]:
metric(preds=preds_labels, target=gts_labels)

tensor(0.1250)