## Face classification

In [1]:
import torch
from matplotlib import pyplot as plt
from torchvision import datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms.functional as TF
import torchvision.transforms as T
import os
import argparse
import numpy as np
import shutil
from glob import glob
from random import shuffle
from PIL import Image


In [2]:
# Class map
class_names = {0: 'Black', 1: 'White'}

# Split the dataset into train, test and val folders
args = {"source": "../datasets/black_white", "dest": "../datasets/black_white_splitted", "val_size": 0.1, "train_size": 0.7, "test_size": 0.2}

# Crea le cartelle di destinazione (se non esistono)
if not os.path.exists(args["dest"]):
    os.makedirs(args["dest"])

dest_train = os.path.join(args["dest"], "train")
if not os.path.exists(dest_train):
    os.makedirs(dest_train)

dest_test = os.path.join(args["dest"], "test")
if not os.path.exists(dest_test):
    os.makedirs(dest_test)

dest_val = os.path.join(args["dest"], "val")
if not os.path.exists(dest_val):
    os.makedirs(dest_val)

class_dirs = os.listdir(args["source"])
for class_dir in class_dirs:
    class_dir_path = os.path.join(args["source"], class_dir)
    num_elem = len(os.listdir(class_dir_path))
    total_indices = np.arange(num_elem)
    val_indices = np.random.choice(total_indices,
                                   size=int(num_elem*args["val_size"]),
                                   replace=False)
    total_indices = np.delete(total_indices, val_indices)
    test_indices = np.random.choice(total_indices,
                                   size=int(num_elem*args["test_size"]),
                                   replace=False)

    if not os.path.exists(os.path.join(dest_train, class_dir)):
        os.mkdir(os.path.join(dest_train, class_dir))
    if not os.path.exists(os.path.join(dest_test, class_dir)):
        os.mkdir(os.path.join(dest_test, class_dir))
    if not os.path.exists(os.path.join(dest_val, class_dir)):
        os.mkdir(os.path.join(dest_val, class_dir))
        
    for index, filename in enumerate(os.listdir(class_dir_path)):
            if index in val_indices:
                shutil.copy(os.path.join(class_dir_path, filename), os.path.join(dest_val, class_dir))
            elif index in test_indices:
                shutil.copy(os.path.join(class_dir_path, filename), os.path.join(dest_test, class_dir))
            else:
                shutil.copy(os.path.join(class_dir_path, filename), os.path.join(dest_train, class_dir))

### Image dataset transforms
The `torchvision.transforms` module includes additional classes specific for image pre-processing. Some of them are:

- `Resize`: resizes an image;
- `RandomCrop`: randomly crops an image (data augmentation during training);
- `RandomHorizontalFlip`: randomly flips an image (data augmentation during training);
- `CenterCrop`: crops the central area of an image (used in testing, as counterpart to `RandomCrop`);
- `Normalize`: performs standardization, given per-channel means and standard deviations.

Usually, to do data augmentation, you crop an image to an area which is slightly smaller than the full size.

In [3]:
# Params
resize = T.Resize(32)  
random_crop = T.RandomCrop(28)             # train
random_hor_flip = T.RandomHorizontalFlip()  # train
center_crop = T.CenterCrop(28)             # test and val
to_tensor = T.ToTensor()
normalize = T.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))

# Compose transforms
train_transform = T.Compose([resize, random_crop, random_hor_flip, to_tensor, normalize])
test_transform = T.Compose([resize, center_crop, to_tensor, normalize])
val_transform = T.Compose([resize, center_crop, to_tensor, normalize])

In [4]:
# Instantiate datasets
root_dir = "../datasets/"
train_dataset = ImageFolder(os.path.join(root_dir, "black_white_splitted", "train"), transform=train_transform) 
val_dataset = ImageFolder(os.path.join(root_dir, "black_white_splitted", "val"), transform=val_transform)
test_dataset = ImageFolder(os.path.join(root_dir, "black_white_splitted", "test"), transform=test_transform) 

# Get number of classes (we'll need it in the model)
num_classes = len(train_dataset.classes)
batch_size = 64

# Print dataset statistics
print(f"Num. classes: {num_classes}")
print(f"Num. train samples: {len(train_dataset)}")
print(f"Num. valid. samples: {len(val_dataset)}")
print(f"Num. test samples: {len(test_dataset)}")

# def loader(path):
#    print(path)
#    return PIL.Image.open(path).convert("RGB")

# Instantiate data loaders
loaders = {"train": DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True,  num_workers=4, pin_memory=True),
           "val":   DataLoader(dataset=val_dataset,   batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True),
           "test":  DataLoader(dataset=test_dataset,  batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)
          }

Num. classes: 2
Num. train samples: 100000
Num. valid. samples: 68611
Num. test samples: 91441


In [5]:
# Get image size ("size" is a property of PIL.Image)
train_dataset[0][0].size()

torch.Size([3, 28, 28])

In [6]:
# Show an image of a given class
train_dataset[train_dataset.targets.index(1)][0]

tensor([[[-1.1004, -1.0369, -0.8305,  ..., -0.7511,  0.6461,  1.3923],
         [-1.3385, -1.1004, -0.7193,  ..., -1.1321,  0.3285,  1.1065],
         [-1.1639, -0.8463, -0.4971,  ..., -1.2274, -0.6082,  0.6620],
         ...,
         [-0.6241, -0.3383, -0.1636,  ...,  1.5034,  1.4717,  1.4399],
         [-0.7670, -0.5447, -0.4177,  ...,  1.6304,  1.5669,  1.5034],
         [-0.7511, -0.5764, -0.5288,  ...,  1.6622,  1.6622,  1.6146]],

        [[-1.0806, -1.0161, -0.8063,  ..., -0.7256,  0.6946,  1.4531],
         [-1.3227, -1.0806, -0.6933,  ..., -1.1129,  0.3718,  1.1626],
         [-1.1452, -0.8224, -0.4674,  ..., -1.2097, -0.5803,  0.7107],
         ...,
         [-0.5965, -0.3060, -0.1285,  ...,  1.5660,  1.5338,  1.5015],
         [-0.7417, -0.5158, -0.3867,  ...,  1.6951,  1.6306,  1.5660],
         [-0.7256, -0.5481, -0.4997,  ...,  1.7274,  1.7274,  1.6790]],

        [[-0.8693, -0.8092, -0.6139,  ..., -0.5388,  0.7834,  1.4896],
         [-1.0947, -0.8693, -0.5087,  ..., -0

### CNN model

In [7]:
# Import
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [8]:
# Define class
class CNN(nn.Module):
    
    # Constructor
    def __init__(self):
        # Call parent constructor
        super().__init__();
        # Create convolutional layers
        self.conv_layers = nn.Sequential(
            # Layer 1
            nn.Conv2d(3, 64, kernel_size=3, padding=0, stride=1),
            nn.ReLU(),
            # Layer 2
            nn.Conv2d(64, 128, kernel_size=3, padding=0, stride=1),
            nn.ReLU(),
            # Layer 3
            nn.Conv2d(128, 128, kernel_size=3, padding=0, stride=1),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=2, stride=2),
            # Layer 4
            nn.Conv2d(128, 256, kernel_size=3, padding=0, stride=1),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=2, stride=2)
        )
        # Create fully-connected layers
        self.fc_layers = nn.Sequential(
            # FC layer
            nn.Linear(4096, 1024),
            nn.ReLU(),
            # Classification layer
            nn.Linear(1024, 2)
        )

    # Forward
    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

### Model training

In [9]:
# Select device
print(f"CUDA is available? {torch.cuda.is_available()}")
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(dev)

CUDA is available? True
cuda


In [10]:
def train(epochs, dev, lr=0.001):
    try:
        # Create model
        model = CNN()
        model = model.to(dev)
        print(model)
        # Optimizer
        optimizer = optim.SGD(model.parameters(), lr=lr)
        # Initialize history
        history_loss = {"train": [], "val": [], "test": []}
        history_accuracy = {"train": [], "val": [], "test": []}
        # Process each epoch
        for epoch in range(epochs):
            # Initialize epoch variables
            sum_loss = {"train": 0, "val": 0, "test": 0}
            sum_accuracy = {"train": 0, "val": 0, "test": 0}
            # Process each split
            for split in ["train", "val", "test"]:
                # Process each batch
                for i,(input, labels) in enumerate(loaders[split]):
                    # Move to CUDA
                    input = input.to(dev)
                    labels = labels.to(dev)
                    # Reset gradients
                    optimizer.zero_grad()
                    # Compute output
                    pred = model(input)
                    loss = F.cross_entropy(pred, labels)
                    # Update loss
                    sum_loss[split] += loss.item()
                    # Check parameter update
                    if split == "train":
                        # Compute gradients
                        loss.backward()
                        # Optimize
                        optimizer.step()
                    # Compute accuracy
                    _,pred_labels = pred.max(1)
                    batch_accuracy = (pred_labels == labels).sum().item()/input.size(0)
                    # Update accuracy
                    sum_accuracy[split] += batch_accuracy
                    
                    # print batch info
                    #if(i%10 == 0):
                    #    print(f"Batch number: {i} | Loss: {loss}")
                    
            # Compute epoch loss/accuracy
            epoch_loss = {split: sum_loss[split]/len(loaders[split]) for split in ["train", "val", "test"]}
            epoch_accuracy = {split: sum_accuracy[split]/len(loaders[split]) for split in ["train", "val", "test"]}
            # Update history
            for split in ["train", "val", "test"]:
                history_loss[split].append(epoch_loss[split])
                history_accuracy[split].append(epoch_accuracy[split])
            # Print info
            print(f"Epoch {epoch+1}:",
                  f"TrL={epoch_loss['train']:.4f},",
                  f"TrA={epoch_accuracy['train']:.4f},",
                  f"VL={epoch_loss['val']:.4f},",
                  f"VA={epoch_accuracy['val']:.4f},",
                  f"TeL={epoch_loss['test']:.4f},",
                  f"TeA={epoch_accuracy['test']:.4f},")
            
            # Save the model
            torch.save(model, "./models/face_detection_model" + str(epoch))
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        # Plot loss
        plt.title("Loss")
        for split in ["train", "val", "test"]:
            plt.plot(history_loss[split], label=split)
        plt.legend()
        plt.show()
        # Plot accuracy
        plt.title("Accuracy")
        for split in ["train", "val", "test"]:
            plt.plot(history_accuracy[split], label=split)
        plt.legend()
        plt.show()

In [None]:
train(100, dev, 0.01)

CNN(
  (conv_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU()
    (4): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
    (6): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (7): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (8): ReLU()
    (9): AvgPool2d(kernel_size=2, stride=2, padding=0)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=4096, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=2, bias=True)
  )
)
Epoch 1: TrL=0.6085, TrA=0.6717, VL=0.5876, VA=0.6915, TeL=0.5878, TeA=0.6923,


  "type " + obj.__name__ + ". It won't be checked "


Epoch 2: TrL=0.5770, TrA=0.7044, VL=0.5679, VA=0.7131, TeL=0.5678, TeA=0.7129,
Epoch 3: TrL=0.5577, TrA=0.7180, VL=0.5495, VA=0.7273, TeL=0.5502, TeA=0.7268,
Epoch 4: TrL=0.5363, TrA=0.7314, VL=0.5127, VA=0.7458, TeL=0.5133, TeA=0.7460,
Epoch 5: TrL=0.5174, TrA=0.7425, VL=0.4914, VA=0.7627, TeL=0.4920, TeA=0.7631,
Epoch 6: TrL=0.4957, TrA=0.7569, VL=0.6936, VA=0.6510, TeL=0.6947, TeA=0.6502,
Epoch 7: TrL=0.4796, TrA=0.7658, VL=0.4793, VA=0.7672, TeL=0.4791, TeA=0.7676,
Epoch 8: TrL=0.4643, TrA=0.7765, VL=0.4282, VA=0.7989, TeL=0.4286, TeA=0.7985,
Epoch 9: TrL=0.4492, TrA=0.7850, VL=0.4400, VA=0.7875, TeL=0.4402, TeA=0.7879,
Epoch 10: TrL=0.4373, TrA=0.7926, VL=0.4198, VA=0.8008, TeL=0.4202, TeA=0.8003,
Epoch 11: TrL=0.4257, TrA=0.7992, VL=0.4153, VA=0.8039, TeL=0.4151, TeA=0.8045,
Epoch 12: TrL=0.4167, TrA=0.8046, VL=0.4013, VA=0.8125, TeL=0.4014, TeA=0.8115,
Epoch 13: TrL=0.4068, TrA=0.8080, VL=0.3935, VA=0.8170, TeL=0.3937, TeA=0.8170,
Epoch 14: TrL=0.4010, TrA=0.8139, VL=0.3813, VA=

In [36]:
model = torch.load("./models/face_detection_model89")
model.eval()

CNN(
  (conv_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (3): ReLU()
    (4): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
    (6): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (7): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (8): ReLU()
    (9): AvgPool2d(kernel_size=2, stride=2, padding=0)
  )
  (fc_layers): Sequential(
    (0): Linear(in_features=4096, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=2, bias=True)
  )
)

In [37]:
# Get random image
base_path = "../datasets/UCF-101-to-image/Utrain"
classification_path = "../datasets/UCF-101-to-image-classified/Utrain"

image_paths = glob(os.path.join(base_path, "*"))
for path in image_paths:
    # Load image
    img = Image.open(path).convert("RGB")
    # Show image
    plt.imshow(img)
    plt.axis('off')

    # Transform image
    img = test_transform(img)

    # View as a batch
    img.unsqueeze_(0)
    # Copy to device
    img = img.to(dev)
    with torch.no_grad():
        # Model forward
        pred = model(img)
    # Get prediction
    _,pred = pred.max(1)
    pred = pred.item()
    print(f"Predicted: {class_names[pred]}")
    if(pred == 0):
        #copia in black
        shutil.copy(path, classification_path + "/black")
    else:
        #copia in white
        shutil.copy(path, classification_path + "/white")