In [None]:
import os
import numpy as np
import cv2
import random as rng

### Define dataset structure

The initial idea is to save the images whole, and let a get_batch function do the cutting and pairing when creating the batch. The strcuture of the dataset is going to be as follows:

* images
    * train
        * s000 (infered by the names of the images)
            * s000img1 (actually original name of the image)
            * s000img2
            * ...
        * S001 
            * etc.
    * valid
        * s700
            * etc.
    * test
        * s999
            * etc.

For now, i will skip this structure creation and just assume it, for the experimenting enviroment, I will upload a few pictures and manually define this structure, and just work with train.

In [None]:
!pwd

In [None]:
STRIP_SIZE = 256

Right now im storing the images whole. With the strips dictionary, given a strip we can get the image location and the pixels of the strip.
However, in practice, this means that an image may be read multiple times when generating batches, also, it means that we read a whole big-ass image, while only needing a small part of it. A better approach would be to store the strips already as separate images beforehand, which unfortunately may increase the dataset size.

In [None]:
def get_dataset(base_path = "images", s = "train", strips_device = 4, strip_size = 256):
    devices = {}
    strips = {}
    s_path = os.path.join(base_path, s)
    strip_num = 0
    for device in os.scandir(s_path):
        if len(device.name) != 3:
            continue
        d_num = int(device.name)
        devices[d_num] = device.path
#        imgs = os.listdir(device)
        for strip in range(strips_device): #I'm only taking 4 strips for image. (top-left 4) and assuming I always can 
            x = strip * strip_size         #Better logic should be impl. to get the max. amount of strips per img size
            y = 0                        
            strips[strip_num] = (d_num, x, y)
            strip_num += 1
    return devices, strips

In [None]:
d, s = get_dataset()

In [None]:
print(s)
print(d)

In [None]:
def get_batch(batch_size, devices, strips, imgids = False):
    #choose at random some categories (category = strip location of a given device)
    categories = rng.sample(range(0, len(strips)), k=batch_size)
    
    pairs = [np.zeros((batch_size, STRIP_SIZE, STRIP_SIZE, 3)) for i in range(2)]
    labels = np.zeros((batch_size, ))
    ids = []
    
    #the first half of the batch will be positive examples and the second negative
    labels[batch_size//2:] = 1
    for i in range(batch_size):
        cat = categories[i]
        x = strips[cat][1] # x position of the strip
        y = strips[cat][2] # y position of the strip
        imgs_paths = [os.path.join(devices[strips[cat][0]], l) for l in os.listdir(devices[strips[cat][0]])]
        if i >= batch_size // 2:
            idxs = rng.sample(range(0, len(imgs_paths)-1), k=2)
            img1 = cv2.imread(imgs_paths[idxs[0]])
            img2 = cv2.imread(imgs_paths[idxs[1]])
            pairs[0][i,:,:,:] = img1[x:x+STRIP_SIZE, y:y+STRIP_SIZE, :]
            pairs[1][i,:,:,:] = img2[x:x+STRIP_SIZE, y:y+STRIP_SIZE, :]
            ids.append((imgs_paths[idxs[0]], cat, imgs_paths[idxs[1]], cat))
        else :
            idx1 = rng.randint(0, len(imgs_paths)-1)
            cat2 = (cat + rng.randint(1, len(categories))) % len(categories) # we ensure that is a different category
            x2 = strips[cat2][1]
            y2 = strips[cat2][2]            
            imgs_paths2 = [os.path.join(devices[strips[cat2][0]], l) for l in os.listdir(devices[strips[cat2][0]])]
            idx2 = rng.randint(0, len(imgs_paths2)-1)
            img1 = cv2.imread(imgs_paths[idx1])
            img2 = cv2.imread(imgs_paths2[idx2])
            pairs[0][i,:,:,:] = img1[x:x+STRIP_SIZE, y:y+STRIP_SIZE, :]
            pairs[1][i,:,:,:] = img2[x2:x2+STRIP_SIZE, y2:y2+STRIP_SIZE, :]
            ids.append((imgs_paths[idx1], cat, imgs_paths[idx2], cat2))
            
    if imgids:
        return pairs, labels, ids
    return pairs, labels

### Dataset visualization

We will now show some random examples to ensure that we are reading the dataset correctly. Each pair of images corresponds to a pair example, Pair0.0 are negative examples (belong to different strips of different images) and Pair1.0 are positive examples (belong to the same strip coordinates and the same devices (but different pictures))

In [None]:
import matplotlib.pyplot as plt

In [None]:
pairs, labels, ids = get_batch(4,d ,s, True)
for ix, i  in enumerate(ids):
    f, ax = plt.subplots(1,2)
    f.suptitle("Pair" + str(labels[ix]) )
    ax[0].imshow(pairs[0][ix,:,:,::-1]/255)
    title = str(s[i[1]]) + "\n" +  i[0].split('/')[-1] 
    ax[0].set_title(title)
    ax[1].imshow(pairs[1][ix,:,:,::-1]/255)
    title = str(s[i[3]]) + "\n" +  i[2].split('/')[-1] 
    ax[1].set_title(title)

## Model definition 

(ref: Siamese Neural Networks for One-shot Image Recognition, Koch et al.)

In [None]:
def generate(batch_size, devices, strips, s="train"):
    while True:
        pairs, labels = get_batch(batch_size, devices, strips, False)
        yield(pairs, labels)

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K

In [4]:
def sequential_block(input_shape = (256,256,3), base_filters=64):
    model = keras.Sequential()
    model.add(layers.Conv2D(base_filters, (10,10), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*2, (7,7), activation='relu'))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*2, (4,4), activation='relu'))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*4, (4,4), activation='relu'))
#     model.add(layers.Flatten())
    model.add(layers.Dense(base_filters*32, activation='sigmoid'))
    return model

In [6]:
m = sequential_block(base_filters=64)
m.summary()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_8 (Conv2D)            (None, 247, 247, 64)      19264     
_________________________________________________________________
max_pooling2d_6 (MaxPooling2 (None, 123, 123, 64)      0         
_________________________________________________________________
conv2d_9 (Conv2D)            (None, 117, 117, 128)     401536    
_________________________________________________________________
max_pooling2d_7 (MaxPooling2 (None, 58, 58, 128)       0         
_________________________________________________________________
conv2d_10 (Conv2D)           (None, 55, 55, 128)       262272    
_________________________________________________________________
max_pooling2d_8 (MaxPooling2 (None, 27, 27, 128)       0         
_________________________________________________________________
conv2d_11 (Conv2D)           (None, 24, 24, 256)      

In [10]:
def get_siamese_model(input_shape = (256,256,3), base_filters=64):


    left_input = layers.Input(input_shape)
    right_input = layers.Input(input_shape)
    
    # Convolutional Neural Network
    model = keras.Sequential()
    model.add(layers.Conv2D(base_filters, (10,10), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*2, (7,7), activation='relu'))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*2, (4,4), activation='relu'))
    model.add(layers.MaxPooling2D())
    model.add(layers.Conv2D(base_filters*4, (4,4), activation='relu'))
#     model.add(layers.Flatten())
    model.add(layers.Dense(base_filters*64, activation='sigmoid'))
    
    # Generate the encodings (feature vectors) for the two images
    encoded_l = model(left_input)
    encoded_r = model(right_input)
    
    # Add a customized layer to compute the absolute difference between the encodings
    L1_layer = layers.Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([encoded_l, encoded_r])
    
    # Add a dense layer with a sigmoid unit to generate the similarity score
    prediction = layers.Dense(1,activation='sigmoid')(L1_distance)
    
    # Connect the inputs with the outputs
    siamese_net = keras.Model(inputs=[left_input,right_input],outputs=prediction)
    
    # return the model
    return siamese_net

In [12]:
model = get_siamese_model(base_filters=64)
model.summary()

Model: "functional_7"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
input_8 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
sequential_6 (Sequential)       (None, 24, 24, 4096) 2260288     input_7[0][0]                    
                                                                 input_8[0][0]                    
__________________________________________________________________________________________________
lambda_3 (Lambda)               (None, 24, 24, 4096) 0           sequential_6[0][0]    