In [None]:
import tensorflow as tf
from pyramda import compose, curry
import os
import numpy as np
from tqdm import tqdm
from PIL import Image
import sys
from sklearn.model_selection import train_test_split
from visuazlizers.nb_graph_visualizer import show_graph, rename_nodes
from utils.curried_functions import tf_add, tf_cast, tf_multiply, filter_list

### Batch

In [None]:
def classes_to_labels(classes):
    """
    Assign a consecutive integer label to every class.

    Parameters:
    -----------
    - classes: sized collection
        Classes (for example class directory names). Only its length is used.

    Returns:
    --------
    - labels: [int]
        Labels 0 .. len(classes) - 1, so that labels[i] belongs to classes[i].
    """
    return list(range(len(classes)))

In [None]:
def load_batch(path, dirs, labels, num_per_class, image_shape):
    """
    Draw a random batch holding the same number of images for every class.

    Parameters:
    -----------
    - path: string
        Path to the image source directory on disk.
        The source directory is expected to contain one sub-directory per class.
    - dirs: [string]
        Names of the class directories inside `path`.
    - labels: [int]
        Class labels; labels[i] is the label of dirs[i].
    - num_per_class: int
        Number of images drawn at random from each class.
    - image_shape: tuple (H,W,C)
        H - image height
        W - image width
        C - number of channels

    Returns:
    --------
    - samples: ndarray (N, H, W, C)
        The drawn images, resized to (H, W), grouped class by class.
        N - number of samples (num_per_class * len(dirs))
        H - height
        W - width
        C - number of channels
    - batch_labels: ndarray (N,) of int
        Label of every sample, in the same order as `samples`.
    """
    total = num_per_class * len(dirs)
    samples = np.zeros((total, *image_shape))
    batch_labels = np.ones(total).astype(int)

    for class_idx, class_dir in enumerate(tqdm(dirs)):
        class_path = os.path.join(path, class_dir)
        # np.random.choice draws with replacement by default, so the same
        # file can show up more than once in a batch.
        chosen = np.random.choice(os.listdir(class_path), num_per_class)

        start = class_idx * num_per_class
        class_rows = slice(start, start + num_per_class)

        for offset, filename in enumerate(chosen):
            image = Image.open(os.path.join(class_path, filename))
            # PIL expects the target size as (width, height)
            resized = image.resize((image_shape[1], image_shape[0]))
            samples[start + offset, :, :, :] = np.array(resized)

        # the rows still hold their initial ones, so this writes the class label
        batch_labels[class_rows] = batch_labels[class_rows] * labels[class_idx]

    return samples, batch_labels

### Model

In [None]:
from models.deep_sort_cnn.freeze_model import _preprocess, _network_factory

def create_deep_sort_conv_graph():
    """
    Build the deep sort convolutional network in the default graph.

    Returns:
    --------
    - input_var: Tensor (N, 128, 64, 3)
        uint8 placeholder named "images".
    - features: Tensor
        First output of the network factory, exposed under the name "features".
    """
    input_var = tf.placeholder(tf.uint8, (None, 128, 64, 3), name="images")

    # preprocessing is applied image by image and is excluded from back-propagation
    float_images = tf.cast(input_var, tf.float32)
    image_var = tf.map_fn(_preprocess, float_images, back_prop=False)

    build_network = _network_factory()
    features, _ = build_network(image_var, reuse=None)

    return input_var, tf.identity(features, name="features")

In [None]:
def load_model(checkpoint_filename, input_name, output_name, **kwargs):
    """
    Load a serialized GraphDef from disk and import it into the default graph
    under the "net" name scope.

    Parameters:
    -----------
    - checkpoint_filename: string
        Path to the serialized GraphDef (.pb file) on disk.
    - input_name: string
        Name of the input variable in the graph.
        Only used when no graph_creator is given.
    - output_name: string
        Name of the output variable in the graph.
        Only used when no graph_creator is given.
    - graph_creator: function, optional (keyword argument)
        Called without arguments before the import; must return the tuple
        (inputs, outputs). When given, these tensors are returned instead of
        the ones looked up by name. Other keyword arguments are ignored.

    Returns:
    --------
    - inputs: Tensor (N, H, W, C)
        Images
        N - number of samples (None)
        H - image height
        W - image width
        C - number of channels
    - outputs: Tensor (N, E)
        Image embeddings
        N - number of samples (None)
        E - embedding size
    - graph_def: GraphDef
        The graph definition parsed from checkpoint_filename.
    """
    graph_creator = kwargs.pop('graph_creator', None)

    inputs, outputs = None, None
    if graph_creator is not None:
        # NOTE(review): the tensors returned in this case come from
        # graph_creator; nothing in this function links them to the nodes
        # imported under "net/" below - confirm that is intended.
        inputs, outputs = graph_creator()

    with tf.gfile.GFile(checkpoint_filename, "rb") as file_handle:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(file_handle.read())

    tf.import_graph_def(graph_def, name="net")

    if graph_creator is None:
        graph = tf.get_default_graph()
        inputs = graph.get_tensor_by_name("net/%s:0" % input_name)
        outputs = graph.get_tensor_by_name("net/%s:0" % output_name)

    return inputs, outputs, graph_def

In [None]:
# Import the frozen network and look its input/output tensors up by name.
inputs, outputs, graph_def = load_model(
    './models/deep_sort_cnn/mars-small128.pb',
    input_name="images",
    output_name="features",
)

In [None]:
# The renaming callback replaces the first underscore of a node name with "/"
# (presumably so the visualizer groups nodes by that prefix - confirm in rename_nodes).
tmp_def = rename_nodes(
    graph_def,
    lambda s: "/".join(s.split('_', 1)),
)
show_graph(tmp_def)

### Loss

In [None]:
def get_positive_mask(labels):
    """
    Mark every pair of samples that share a label.

    Parameters:
    -----------
    - labels: [int]
        List of labels of size N (number of samples).
    Returns:
    ----------
    - positive_mask: Tensor (N, N)
        A square boolean matrix, True at [i, j] when samples i and j have the
        same label (this includes the main diagonal) and False otherwise.
    """
    as_row = tf.expand_dims(labels, 0)     # (1, N)
    as_column = tf.expand_dims(labels, 1)  # (N, 1)

    # broadcasting compares every label with every other label
    return tf.equal(as_row, as_column)

# Point-free pipeline; the stages listed below are applied from the last one
# to the first one (tf.shape runs first, tf.logical_not last).
get_not_anchor_mask = compose(
    # flip the matrix: False on the main diagonal, True everywhere else
    tf.logical_not,
    # presumably a curried tf.cast to bool - confirm in utils.curried_functions
    tf_cast(dtype = tf.bool),
    # identity matrix with N rows
    tf.eye,
    # N, the first entry of the shape
    lambda shape: shape[0],
    # dynamic shape of the labels tensor
    tf.shape,
)

get_not_anchor_mask.__doc__ = """
    Parameters:
    -----------
    - labels: [int]
        List of labels of size N (number of samples).
    Returns:
    ----------
    - not_anchor_mask: Tensor (N, N)
        A square martix with False for all anchors and True for other samples, like
        [[0 1 1]
         [1 0 1]
         [1 1 0]]
"""

def get_anchor_positive_mask(labels):
    """
    Mark every valid (anchor, positive) pair.

    Parameters:
    -----------
    - labels: [int]
        List of labels of size N (number of samples).
    Returns:
    ----------
    - anchor_positive_mask: Tensor (N, N)
        A square float matrix with ones where two different samples share a
        label and zeros everywhere else, the main diagonal included.
    """
    not_anchor = get_not_anchor_mask(labels)
    same_label = get_positive_mask(labels)

    # a positive has the anchor's label but is not the anchor itself
    return tf.to_float(tf.logical_and(not_anchor, same_label))

# Point-free pipeline; the stages listed below are applied from the last one
# to the first one (get_positive_mask runs first, tf_add last).
# tf_add / tf_multiply are assumed to be curried tf.add / tf.multiply -
# confirm in utils.curried_functions.
get_negative_mask = compose(
    # shift everything by one: negatives become 1, positives stay huge
    tf_add(1.),
    # scale the positive entries up to the largest finite float32
    tf_multiply(np.finfo(np.float32).max),
    # 1. for positive pairs, 0. for negative pairs
    tf.to_float,
    get_positive_mask,
)

get_negative_mask.__doc__ = """
    Parameters:
    -----------
    - labels: [int]
        List of labels of size N (number of samples).
    Returns:
    ----------
    - negative_mask: Tensor (N, N)
        A square matrix with ones for all negative samples (different label) and
        a value of about the float32 maximum, standing in for infinity, for all
        positive samples (same label, main diagonal included).
"""

In [None]:
def cosine_distance(embeddings):
    """
    Compute cosine distance matrix

    Parameters:
    -----------
    - embeddings: Tensor(N, E)
        Image embeddings, outputs of the convolutional network.
        N - number of samples (None)
        E - embedding size

    Returns:
    --------
    - distances: Tensor (N, N)
        distances[i, j] = 1 - cos(embeddings[i], embeddings[j])
    """

    # Every embedding (row) has to be scaled to unit length on its own.
    # Dividing by tf.norm(embeddings) without an axis would divide by the norm
    # of the whole matrix, and the products below would not be cosines.
    # The axis is passed positionally because its keyword was renamed
    # (dim -> axis) between TensorFlow releases.
    normalized_embeddings = tf.nn.l2_normalize(embeddings, 1)

    similarities = tf.matmul(
        normalized_embeddings,
        normalized_embeddings,
        transpose_b=True,
    )

    return tf.subtract(1., similarities)

In [None]:
def compute_loss(model, metric, masks, margin):
    """
    Compute triplet loss averaged over all active triplets of the batch.

    A triplet (anchor a, positive p, negative n) is valid when a and p are
    different samples with the same label and n has a different label.
    A valid triplet is active when d(a, p) - d(a, n) + margin > 0.

    Parameters:
    -----------
    - model: tuple
        Model input tensor and model output tensor
    - metric: function
        Should take output tensor as a parameter and compute distance matrix
    - masks: tuple
        Contains two matrices:
            a square matrix with ones for all positive samples, except anchors on main diagonal,
            and zeros for all other samples;
            a square matrix with ones for all negative samples and infinity
            (any value greater than one) for all positive samples.
    - margin: float
        Minimum margin between positive and negative distance.

    Returns:
    --------
    - loss: Tensor, scalar
        Sum of the losses of the active triplets divided by their number
        (zero when no triplet is active).
    """
    _, outputs = model
    anchor_positive_mask, negative_mask = masks

    distances = metric(outputs)
    float_type = distances.dtype

    # Negatives are encoded as exactly 1 in negative_mask, everything else is
    # a huge sentinel. Turn both masks into one 0/1 validity mask instead of
    # multiplying distances by the sentinel: that product gives 0 for a zero
    # distance, a huge loss for a slightly negative one, and lets triplets
    # without a valid positive contribute margin - d(a, n).
    is_negative = tf.cast(tf.less_equal(negative_mask, 1.), float_type)
    valid_triplets = tf.multiply(
        tf.expand_dims(tf.cast(anchor_positive_mask, float_type), 2),  # [a, p, 1]
        tf.expand_dims(is_negative, 1),                                # [a, 1, n]
    )

    # triplet_loss[a, p, n] = d(a, p) - d(a, n) + margin
    triplet_loss = tf.expand_dims(distances, 2) - tf.expand_dims(distances, 1) + margin
    triplet_loss = tf.maximum(tf.multiply(valid_triplets, triplet_loss), 0.)

    num_triplets = tf.reduce_sum(tf.cast(tf.greater(triplet_loss, 0.), float_type))

    # the small constant avoids a division by zero when no triplet is active
    return tf.reduce_sum(triplet_loss) / (num_triplets + 1e-16)

### Training

In [None]:
def train(
    session,
    model, 
    source_path, 
    dirs, 
    train_labels, 
    metric, 
    optimizer,
    margin=0.2, 
    num_per_class=5, 
    num_iter=1000, 
):
    """
    Train the model with the triplet loss on randomly drawn batches.

    Parameters:
    -----------
    - session: tf.Session
        Session used to initialize the variables and to run the training steps.
    - model: tuple
        Model input tensor and model output tensor.
    - source_path: string
        Path to the image source directory, one sub-directory per class.
    - dirs: tuple
        (train_dirs, val_dirs). Only train_dirs is used here.
    - train_labels: [int]
        Class labels; train_labels[i] is the label of train_dirs[i].
    - metric: function
        Takes the output tensor and computes the distance matrix.
    - optimizer: tf.train.Optimizer
        Optimizer whose minimize() builds the training step.
    - margin: float
        Minimum margin between positive and negative distance.
    - num_per_class: int
        Number of images drawn from each class for every batch.
    - num_iter: int
        Number of training steps.

    The loss of every step is printed. All global variables are
    (re-)initialized before the first step.
    """
    train_dirs, _ = dirs
    
    inputs, outputs = model
    # int32 instead of int8: an 8-bit label wraps around above 127, which
    # makes different classes share a label once there are more classes.
    labels = tf.placeholder(
        name='labels',
        shape=(len(train_labels) * num_per_class,),
        dtype=tf.int32,
    )
    anchor_positive_mask = get_anchor_positive_mask(labels)
    negative_mask = get_negative_mask(labels)
    
    loss = compute_loss(
        model=(inputs, outputs), 
        metric=metric, 
        masks=(anchor_positive_mask, negative_mask), 
        margin=margin,
    )
    
    train_step = optimizer.minimize(loss)
    # run after minimize() so the variables created by the optimizer are initialized too
    session.run(tf.global_variables_initializer())
    
    for _ in range(num_iter):
        samples, batch_labels = load_batch(
            source_path,
            train_dirs,
            train_labels,
            num_per_class,
            image_shape=(128, 64, 3),
        )
        batch_loss, _ = session.run([loss, train_step], {
            inputs: samples,
            labels: batch_labels,
        })
        print(batch_loss)

In [None]:
# Start from an empty graph so re-running this cell does not pile up nodes.
tf.reset_default_graph()

source_path = '../input/mars/bbox_train/'

# Class directories of the training set; the listing goes through filter_list
# (presumably to drop '.DS_Store' entries - confirm the meaning of its flag).
dirs = filter_list(['.DS_Store'], False)(os.listdir(source_path))

train_dirs, val_dirs = train_test_split(dirs, test_size=0.2)
train_labels = classes_to_labels(train_dirs)

inputs, outputs, _ = load_model(
    './models/deep_sort_cnn/mars-small128.pb',
    input_name="images",
    output_name="features",
    graph_creator=create_deep_sort_conv_graph,
)

session = tf.Session()

# Short run on the first five training classes only.
train(
    session=session,
    model=[inputs, outputs],
    source_path=source_path,
    dirs=(train_dirs[:5], val_dirs),
    train_labels=train_labels[:5],
    metric=cosine_distance,
    optimizer=tf.train.AdamOptimizer(learning_rate=0.00001),
    num_iter=2,
)