In [None]:
import cv2
import os
import numpy as np
from tqdm.notebook import tqdm
import torch

torch.manual_seed(1337)
np.random.seed(1337)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Images / Data

## Standardize

* Change the size of each image to the median values of the entire image set (460 x 310)

In [2]:
IMAGES_FILEPATH = 'data/food'
PROCESSED_IMAGES_FILEPATH = 'data/food_processed'
IMAGE_DIMS = (460, 310)

def listdir_jpg(path):
    """os.listdir but only for jpg"""
    for f in os.listdir(path):
        _, extension = os.path.splitext(f)
        if extension == ".jpg":
            yield f

number_of_images = len(list(listdir_jpg(IMAGES_FILEPATH)))

print("Total number of images: {}".format(number_of_images))

Total number of images: 10000


In [3]:
def load_images():
    """Load and return standard size images."""
    should_preprocess = not (os.path.exists(PROCESSED_IMAGES_FILEPATH) \
        and len(list(listdir_jpg(PROCESSED_IMAGES_FILEPATH))) == number_of_images)
    
    image_dir = IMAGES_FILEPATH if should_preprocess else PROCESSED_IMAGES_FILEPATH
    images_gen = sorted(listdir_jpg(image_dir))
    tqdm_desc = "Loading {} images".format("raw" if should_preprocess else "processed")
    images = [cv2.imread(os.path.join(image_dir, img)) for img in tqdm(images_gen, desc=tqdm_desc)]
    
    
    if should_preprocess:
        if not os.path.exists(PROCESSED_IMAGES_FILEPATH):
            os.makedirs(PROCESSED_IMAGES_FILEPATH)
        pbar = tqdm(total=len(images), desc="Processing image size", position=0)
        for i, image in enumerate(images):
            # Preprocess
            image = cv2.resize(image, dsize=IMAGE_DIMS, interpolation=cv2.INTER_LANCZOS4)
            images[i] = image
            # Save for later
            cv2.imwrite(os.path.join(PROCESSED_IMAGES_FILEPATH, "{:05d}.jpg".format(i)), image)
            pbar.update()
    return images

images = load_images()

Loading processed images: 100%|██████████| 10000/10000 [00:38<00:00, 262.06it/s]


## Normalize images to [0, 1]

In [4]:
images_normalized = [cv2.normalize(image, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) for image in tqdm(images, desc="Normalizing images")]

print(images_normalized[0].shape)

Normalizing images: 100%|██████████| 10000/10000 [00:51<00:00, 196.05it/s]

(310, 460, 3)





## Dataset

In [5]:
from torch.utils.data import Dataset, DataLoader

class TripletImageDataset(Dataset):
    
    def __init__(self, triplet_labels_fp, images):
        """Create triplets from triplet_labels_fp"""
        
        self.images = images
        
        triplets = []
        for line in tqdm(open(triplet_labels_fp, 'r')):
            anchor, positive, negative = tuple(map(int, line.split()))
            triplets.append((anchor, positive, negative))
        
        self.triplets = triplets
        
    # Required for Map-style dataset
    def __getitem__(self, index):
        i_1, i_2, i_3 = self.triplets[index]
        return images[i_1], images[i_2], images[i_3]

    def __len__(self):
        return len(self.triplets)
    
    def _img_to_tensor(self, img):
        return img.float().permute(0, 3, 1, 2).to(device)
    
    def generate_batches(self, batch_size, shuffle=True, drop_last=True):
        dataloader = DataLoader(dataset=self, batch_size=batch_size, 
                                shuffle=shuffle, drop_last=drop_last)

        for img1, img2, img3 in dataloader:
            yield self._img_to_tensor(img1), self._img_to_tensor(img2), self._img_to_tensor(img3)

In [6]:
train_set = TripletImageDataset('data/train_triplets.txt', images_normalized)
test_set = TripletImageDataset('data/test_triplets.txt', images_normalized)

59515it [00:00, 184162.92it/s]
59544it [00:00, 383931.68it/s]


# Model

In [7]:
import torch.nn as nn

class NeuralNet(nn.Module):
    
    EMBEDDING_DIM=50
    
    def __init__(self):
        super(NeuralNet, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 7),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),
            nn.Conv2d(32, 64, 5),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),
            nn.Conv2d(64, 128, 3),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),
            nn.Conv2d(128, 256, 1),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),
            nn.Conv2d(256, 28, 1),
            nn.PReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),
        )
        
        self.fc = nn.Sequential(
            nn.Linear(52416, 512),
            nn.PReLU(),
            nn.Linear(512, NeuralNet.EMBEDDING_DIM)
        )
        
        self.apply(self.init_weights)
        
    def forward(self, x):
        x = self.conv(x)
        x = x.view(-1, 52416)
        x = self.fc(x)
        return x
    
    def init_weights(self, m):
        if isinstance(m, nn.Conv2d):
            torch.nn.init.kaiming_normal_(m.weight)

In [8]:
class TripletLoss(nn.Module):
    
    def __init__(self, margin=0.5):
        super(TripletLoss, self).__init__()
        self.margin = margin
        
    def euclidean_distance(self, x1, x2):
        return (x1 - x2).pow(2).sum(1)
    
    def forward(self, anchor, positive, negative):
        distance_pos = self.euclidean_distance(anchor, positive)
        distance_neg = self.euclidean_distance(anchor, negative)
        # Torch.relu(x) = max(0, x)
        losses = torch.relu(distance_pos - distance_neg + self.margin)
        return losses.mean()

## Training

In [15]:
class Training:
    
    def start(self, model, dataset, batch_size):
        epochs = 5
        nr_batches = len(dataset) // batch_size
        progress_bar = tqdm(desc='', total=epochs * nr_batches,
            leave=False, ncols=80
        )
        # Start the training
        model.train()
        for epoch in range(epochs):
            running_loss = []
            batches = dataset.generate_batches(batch_size)
            for batch_index, (anchor_img, positive_img, negative_img) in enumerate(batches):
                progress_bar.set_description_str("E {} | B {}".format(
                    epoch, batch_index
                ))
                
                optimizer.zero_grad()
                anchor_out = model(anchor_img)
                positive_out = model(positive_img)
                negative_out = model(negative_img)

                loss = criterion(anchor_out, positive_out, negative_out)
                loss.backward()
                optimizer.step()
                
                loss_t = loss.cpu().detach().numpy()
                running_loss.append(loss_t)
                
                progress_bar.set_postfix_str("Loss={0:.3f}".format(loss_t))
                progress_bar.update()
                
            # Save the model
            torch.save(model.state_dict(), 'trained_models/model')
            
            progress_bar.write(
                "[Epoch {}/{}]: Loss {}".format(
                    epoch+1, epochs,  np.mean(running_loss)
                )
            )
            progress_bar.refresh()

In [17]:
model = NeuralNet()
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = TripletLoss()

batch_size = 16

Training().start(model, train_set, batch_size)




  0%|                                                 | 0/18595 [00:00<?, ?it/s][A[A[A


E 0 | B 0:   0%|                                      | 0/18595 [00:00<?, ?it/s][A[A[A


E 0 | B 0:   0%|                    | 0/18595 [00:24<?, ?it/s, Loss=1154116.500][A[A[A


E 0 | B 0:   0%|        | 1/18595 [00:24<124:29:33, 24.10s/it, Loss=1154116.500][A[A[A


E 0 | B 1:   0%|        | 1/18595 [00:24<124:29:33, 24.10s/it, Loss=1154116.500][A[A[A

KeyboardInterrupt: 