# ResNet50 Feature Extractor on CMNIST

## Imports

In [1]:
import os
import numpy as np
import cv2
from tqdm import tqdm
import torch
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as T
from PIL import Image

## Dataset CMNIST

In [2]:
def load_images_bias(folder_path):
    """Load every image in ``folder_path`` together with its filename-encoded labels.

    Each file name is split on ``_``: the second field is parsed as the class
    label and the third field (up to its first ``.``) as the bias label.

    Entries are visited in sorted name order so the returned arrays have a
    reproducible ordering (``os.listdir`` gives no ordering guarantee).
    Hidden entries (names starting with ``.``, e.g. ``.DS_Store``) and
    anything that is not a regular file are skipped.

    Args:
        folder_path: Directory that contains the image files.

    Returns:
        A tuple ``(X, Y, Y_bias)`` of index-aligned NumPy arrays holding the
        RGB images, the class labels and the bias labels.

    Raises:
        ValueError: If OpenCV cannot decode one of the files as an image.
    """
    X = []
    Y = []
    Y_bias = []
    for filename in sorted(os.listdir(folder_path)):
        img_path = os.path.join(folder_path, filename)

        # Ignore OS metadata files and sub-directories instead of handing them to OpenCV.
        if filename.startswith('.') or not os.path.isfile(img_path):
            continue

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None; fail with the offending path
            # rather than letting cvtColor raise an opaque error.
            raise ValueError(f"Could not read image file: {img_path}")
        # OpenCV loads images as BGR; convert to RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        parts = filename.split('_')
        y = int(parts[1])
        y_bias = int(parts[2].split('.')[0])

        X.append(img)
        Y.append(y)
        Y_bias.append(y_bias)

    return np.array(X), np.array(Y), np.array(Y_bias)

### Training set

In [3]:
# Root folder of the training split.
folder_path = 'cmnist/5pct/'
# Per-digit sub-folder names: '0/', '1/', ..., '9/'.
numbers = list(map(lambda digit: str(digit) + "/", range(10)))

In [4]:
# Empty accumulators for the training images, class labels and bias labels.
X_train, Y_train, Y_train_bias = [], [], []

In [5]:
# Accumulate into cell-local lists so this cell can be re-run safely: extending
# X_train / Y_train / Y_train_bias in place fails on a second run because those
# names are rebound to NumPy arrays (which have no .extend) at the end of the cell.
# NOTE: folder_path is reassigned further down in the notebook; it must still be
# 'cmnist/5pct/' when this cell runs.
train_images, train_labels, train_bias_labels = [], [], []

for parent_folder in ['align/', 'conflict/']:
    for number in numbers:
        X_batch, Y_batch, Y_bias_batch = load_images_bias(folder_path + parent_folder + number)
        train_images.extend(X_batch)
        train_labels.extend(Y_batch)
        train_bias_labels.extend(Y_bias_batch)

X_train = np.array(train_images)
Y_train = np.array(train_labels)
Y_train_bias = np.array(train_bias_labels)

# The temporary lists are no longer needed once the arrays exist.
del train_images, train_labels, train_bias_labels

In [6]:
# Report the shapes of the three training arrays, one per line.
print(
    f"X_train shape      = {X_train.shape}",
    f"Y_train shape      = {Y_train.shape}",
    f"Y_train_bias shape = {Y_train_bias.shape}",
    sep="\n",
)

X_train shape      = (55000, 28, 28, 3)
Y_train shape      = (55000,)
Y_train_bias shape = (55000,)


### Validation set

In [7]:
# Empty accumulators for the validation images, class labels and bias labels.
X_valid, Y_valid, Y_valid_bias = [], [], []

In [8]:
# Point folder_path at the validation split; the next cell loads images directly from this folder.
folder_path = 'cmnist/5pct/valid/'

In [9]:
# The whole validation split lives in one folder, so a single call loads it.
X_batch, Y_batch, Y_bias_batch = load_images_bias(folder_path)

# Build the final arrays straight from the loaded batch. The previous version
# extended X_valid / Y_valid / Y_valid_bias in place and then rebound them to
# arrays, so re-running the cell raised AttributeError (ndarray has no .extend).
X_valid = np.array(X_batch)
Y_valid = np.array(Y_batch)
Y_valid_bias = np.array(Y_bias_batch)

In [10]:
# Report the shapes of the three validation arrays, one per line.
print(
    f"X_valid shape      = {X_valid.shape}",
    f"Y_valid shape      = {Y_valid.shape}",
    f"Y_valid_bias shape = {Y_valid_bias.shape}",
    sep="\n",
)

X_valid shape      = (5000, 28, 28, 3)
Y_valid shape      = (5000,)
Y_valid_bias shape = (5000,)


### Testing set

In [11]:
# Empty accumulators for the test images, class labels and bias labels.
X_test, Y_test, Y_test_bias = [], [], []

In [12]:
# Root folder of the test split; it contains one sub-folder per digit.
folder_path = 'cmnist/test/'

# Accumulate into cell-local lists so this cell can be re-run safely: extending
# X_test / Y_test / Y_test_bias in place fails on a second run because those
# names are rebound to NumPy arrays (which have no .extend) at the end of the cell.
test_images, test_labels, test_bias_labels = [], [], []

for number in numbers:
    X_batch, Y_batch, Y_bias_batch = load_images_bias(folder_path + number)
    test_images.extend(X_batch)
    test_labels.extend(Y_batch)
    test_bias_labels.extend(Y_bias_batch)

X_test = np.array(test_images)
Y_test = np.array(test_labels)
Y_test_bias = np.array(test_bias_labels)

# The temporary lists are no longer needed once the arrays exist.
del test_images, test_labels, test_bias_labels

In [13]:
# Report the shapes of the three test arrays, one per line.
print(
    f"X_test shape      = {X_test.shape}",
    f"Y_test shape      = {Y_test.shape}",
    f"Y_test_bias shape = {Y_test_bias.shape}",
    sep="\n",
)

X_test shape      = (10000, 28, 28, 3)
Y_test shape      = (10000,)
Y_test_bias shape = (10000,)


## Resize, to Tensor, Normalize

In [14]:
# Per-channel statistics used to normalise inputs the same way as ImageNet.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Preprocessing pipeline applied to each image before it is fed to the network:
#   1. resize from 28x28 to 224x224 (the 3 channels are kept),
#   2. convert the image to a PyTorch tensor,
#   3. normalise the tensor with the ImageNet mean and standard deviation.
imagenet_transforms = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
    T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

## ResNet50 modified

In [15]:
# Choose the compute device: Apple MPS first, then CUDA, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Pretrained ResNet50 with its default weights.
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

# Chain every child module except the last one, so the network is used as a
# feature extractor rather than producing the output of its final layer.
feature_extractor = torch.nn.Sequential(
    *list(model.children())[:-1]
)
feature_extractor.to(device)
# Inference mode; as the last expression of the cell this also displays the module.
feature_extractor.eval()

Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)


## Extracting features

In [35]:
# Number of images sent through the network per forward pass; extract_features reads this global.
batch_size = 32

In [53]:
def extract_features(X):
    """Compute one feature vector per image in ``X`` with ``feature_extractor``.

    The images are processed in chunks of the module-level ``batch_size``.
    Each image is converted to a PIL image, passed through
    ``imagenet_transforms``, stacked with the rest of its chunk, moved to
    ``device`` and run through ``feature_extractor`` with gradients disabled.

    Args:
        X: Sliceable sequence of images, each accepted by ``Image.fromarray``.

    Returns:
        A list with one 1-D NumPy feature vector per input image, in input
        order. An empty ``X`` yields an empty list.
    """
    result = []
    with torch.no_grad():
        # Walk over X in steps of batch_size.
        for start in tqdm(range(0, len(X), batch_size), desc="Processing Batches"):
            # 1. Slice out the current chunk of images.
            batch_numpy = X[start : start + batch_size]

            # 2. Preprocess every image of the chunk individually.
            batch_tensor_list = [
                imagenet_transforms(Image.fromarray(img_numpy))
                for img_numpy in batch_numpy
            ]

            # 3. Stack into a single batch tensor and move it to the model's device.
            batch_to_process = torch.stack(batch_tensor_list).to(device)

            # 4. Forward pass.
            features = feature_extractor(batch_to_process)

            # 5. Collapse everything after the batch axis into one feature axis.
            #    A bare .squeeze() would also drop the batch axis when the final
            #    chunk holds a single image, and .extend() below would then add
            #    every scalar of that vector as a separate entry.
            features_np = features.flatten(start_dim=1).cpu().numpy()

            # Add each row (one feature vector per image) to the result list.
            result.extend(features_np)
    return result

### Training set

In [54]:
# Extract features for the whole training set and stack the per-image vectors into one array.
X_train_features = np.array(extract_features(X_train))

Processing Batches: 100%|███████████████████| 1719/1719 [13:23<00:00,  2.14it/s]


In [56]:
# Sanity check: one feature row per training image is expected.
print(f"X_train_features shape: {X_train_features.shape}")

X_train_features shape: (55000, 2048)


### Validation set

In [59]:
# Extract features for the whole validation set and stack the per-image vectors into one array.
X_valid_features = np.array(extract_features(X_valid))

Processing Batches: 100%|█████████████████████| 157/157 [01:03<00:00,  2.46it/s]


In [61]:
# Sanity check: one feature row per validation image is expected.
print(f"X_valid_features shape: {X_valid_features.shape}")

X_valid_features shape: (5000, 2048)


### Testing set

In [60]:
# Extract features for the whole test set and stack the per-image vectors into one array.
X_test_features = np.array(extract_features(X_test))

Processing Batches: 100%|█████████████████████| 313/313 [02:46<00:00,  1.88it/s]


In [62]:
# Sanity check: one feature row per test image is expected.
print(f"X_test_features shape: {X_test_features.shape}")

X_test_features shape: (10000, 2048)


## Save Data to .npy Files

In [67]:
# Output directory for the extracted features and their labels.
folder_path = "features_RN/"
# np.save does not create missing directories, so make sure the target exists
# before any of the save cells below run.
os.makedirs(folder_path, exist_ok=True)

### Training set

In [90]:
# Show the shapes of the training arrays that are about to be written to disk.
print(
    "Original array shapes:",
    f"X_train shape: {X_train_features.shape}",
    f"Y_train shape: {Y_train.shape}",
    f"Y_train_bias shape: {Y_train_bias.shape}",
    sep="\n",
)

Original array shapes:
X_train shape: (55000, 2048)
Y_train shape: (55000,)
Y_train_bias shape: (55000,)


In [91]:
# Write the training features, class labels and bias labels, confirming each file in turn.
for filename, array, message in (
    ('x_train.npy', X_train_features, "'x_train.npy' saved successfully."),
    ('y_train.npy', Y_train, "'y_train.npy' saved successfully."),
    ('b_train.npy', Y_train_bias, "'b_train.npy' saved successfully."),
):
    np.save(folder_path + filename, array)
    print(message)

'x_train.npy' saved successfully.
'y_train.npy' saved successfully.
'b_train.npy' saved successfully.


In [99]:
print("Loading arrays back from .npy files to verify...")

# Read the three training files back from disk.
X_loaded, Y_loaded, Y_bias_loaded = (
    np.load(folder_path + filename)
    for filename in ('x_train.npy', 'y_train.npy', 'b_train.npy')
)

print("\nShapes of loaded arrays:")
print(f"Shape of loaded X: {X_loaded.shape}")
print(f"Shape of loaded Y: {Y_loaded.shape}")
print(f"Shape of loaded Y_bias: {Y_bias_loaded.shape}")

# Round-trip check: every reloaded array must equal its in-memory original.
assert np.array_equal(X_train_features, X_loaded)
assert np.array_equal(Y_train, Y_loaded)
assert np.array_equal(Y_train_bias, Y_bias_loaded)
print("\nVerification successful: Original and loaded arrays are identical.")

Loading arrays back from .npy files to verify...

Shapes of loaded arrays:
Shape of loaded X: (55000, 2048)
Shape of loaded Y: (55000,)
Shape of loaded Y_bias: (55000,)

Verification successful: Original and loaded arrays are identical.


### Validation set

In [93]:
# Show the shapes of the validation arrays that are about to be written to disk.
# The labels previously said "train" (copy-paste slip) although the values are the validation arrays.
print("Original array shapes:")
print(f"X_valid shape: {X_valid_features.shape}")
print(f"Y_valid shape: {Y_valid.shape}")
print(f"Y_valid_bias shape: {Y_valid_bias.shape}")

Original array shapes:
X_train shape: (5000, 2048)
Y_train shape: (5000,)
Y_train_bias shape: (5000,)


In [94]:
# Write the validation features, class labels and bias labels, confirming each file in turn.
for filename, array, message in (
    ('x_val.npy', X_valid_features, "'x_val.npy' saved successfully."),
    ('y_val.npy', Y_valid, "'y_val.npy' saved successfully."),
    ('b_val.npy', Y_valid_bias, "'b_val.npy' saved successfully."),
):
    np.save(folder_path + filename, array)
    print(message)

'x_val.npy' saved successfully.
'y_val.npy' saved successfully.
'b_val.npy' saved successfully.


In [100]:
print("Loading arrays back from .npy files to verify...")

# Read the three validation files back from disk.
X_loaded, Y_loaded, Y_bias_loaded = (
    np.load(folder_path + filename)
    for filename in ('x_val.npy', 'y_val.npy', 'b_val.npy')
)

print("\nShapes of loaded arrays:")
print(f"Shape of loaded X: {X_loaded.shape}")
print(f"Shape of loaded Y: {Y_loaded.shape}")
print(f"Shape of loaded Y_bias: {Y_bias_loaded.shape}")

# Round-trip check: every reloaded array must equal its in-memory original.
assert np.array_equal(X_valid_features, X_loaded)
assert np.array_equal(Y_valid, Y_loaded)
assert np.array_equal(Y_valid_bias, Y_bias_loaded)
print("\nVerification successful: Original and loaded arrays are identical.")

Loading arrays back from .npy files to verify...

Shapes of loaded arrays:
Shape of loaded X: (5000, 2048)
Shape of loaded Y: (5000,)
Shape of loaded Y_bias: (5000,)

Verification successful: Original and loaded arrays are identical.


### Testing set

In [96]:
# Show the shapes of the test arrays that are about to be written to disk.
# The labels previously said "train" (copy-paste slip) although the values are the test arrays.
print("Original array shapes:")
print(f"X_test shape: {X_test_features.shape}")
print(f"Y_test shape: {Y_test.shape}")
print(f"Y_test_bias shape: {Y_test_bias.shape}")

Original array shapes:
X_train shape: (10000, 2048)
Y_train shape: (10000,)
Y_train_bias shape: (10000,)


In [97]:
# Write the test features, class labels and bias labels, confirming each file in turn.
for filename, array, message in (
    ('x_test.npy', X_test_features, "'x_test.npy' saved successfully."),
    ('y_test.npy', Y_test, "'y_test.npy' saved successfully."),
    ('b_test.npy', Y_test_bias, "'b_test.npy' saved successfully."),
):
    np.save(folder_path + filename, array)
    print(message)

'x_test.npy' saved successfully.
'y_test.npy' saved successfully.
'b_test.npy' saved successfully.


In [101]:
print("Loading arrays back from .npy files to verify...")

# Read the three test files back from disk.
X_loaded, Y_loaded, Y_bias_loaded = (
    np.load(folder_path + filename)
    for filename in ('x_test.npy', 'y_test.npy', 'b_test.npy')
)

print("\nShapes of loaded arrays:")
print(f"Shape of loaded X: {X_loaded.shape}")
print(f"Shape of loaded Y: {Y_loaded.shape}")
print(f"Shape of loaded Y_bias: {Y_bias_loaded.shape}")

# Round-trip check: every reloaded array must equal its in-memory original.
assert np.array_equal(X_test_features, X_loaded)
assert np.array_equal(Y_test, Y_loaded)
assert np.array_equal(Y_test_bias, Y_bias_loaded)
print("\nVerification successful: Original and loaded arrays are identical.")

Loading arrays back from .npy files to verify...

Shapes of loaded arrays:
Shape of loaded X: (10000, 2048)
Shape of loaded Y: (10000,)
Shape of loaded Y_bias: (10000,)

Verification successful: Original and loaded arrays are identical.
