In [1]:
%matplotlib inline
from utils import DeepSortMock, drawDetection, random_bbox
from matplotlib import pyplot as plt
import numpy as np
import pickle
import cv2
import os
import torch
from torch import nn
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

In [6]:
!pip install torch torchvision



In [2]:
# Walk every feed in lockstep and, every 24 frames, turn each tracked identity's
# recent history into one feature row (x_data) labelled with its track ID (y_data).
data_files = ["grandma_A", "grandma_B"]
feeds = [cv2.VideoCapture(f"raw_data/videos/{file}.mp4") for file in data_files]
detectors = [DeepSortMock(f"raw_data/tracks/{file}.cleaned.pb") for file in data_files]
# Per feed: {track ID: [(frame number, normalized center), ...]}
feed_identities = [dict() for _ in data_files]

output_frames = []

# x_data rows: (frame number, one-hot feed index..., center x, center y, velocity x, velocity y)
# y_data rows: the track ID the matching x_data row was sampled from.
x_data = []
y_data = []

frames_left = True
frame_num = 0

try:
    while frames_left:
        frame_num += 1
        output_frames = []

        for i, (feed, detector, identities) in enumerate(zip(feeds, detectors, feed_identities)):
            frames_left, frame = feed.read()

            # The first feed to run out of frames ends the whole collection loop.
            if not frames_left: break

            if frame_num % 24 == 0:
                for ID in identities:
                    start, stop = identities[ID][0], identities[ID][-1]
                    # Keep only the newest sample as the start of the next window.
                    identities[ID] = [stop]

                    num_a, c_a = start
                    num_b, c_b = stop
                    # A single sample in the window gives no displacement to measure.
                    if num_a == num_b: continue

                    # Velocity in normalized units per frame. The coordinate difference
                    # has to be parenthesized before dividing by the frame gap.
                    frame_gap = num_a - num_b
                    c_d = ((c_a[0] - c_b[0]) / frame_gap, (c_a[1] - c_b[1]) / frame_gap)

                    # One-hot encoding of the feed this identity was seen in.
                    a = np.zeros(len(data_files))
                    a[i] = 1

                    feat_x = (num_b,) + tuple(a) + c_b + c_d

                    x_data.append(feat_x)
                    y_data.append(ID)

            updated_ids = set()
            people = detector.update(frame)
            # Record the center of every detection and annotate the frame for display.
            for person in people:
                p1, p2, cat, ID = person
                updated_ids.add(ID)

                # Center coordinates (x, y) in pixels
                c = ((p1[0] + (abs(p2[0] - p1[0]) / 2)), (p1[1] + (abs(p2[1] - p1[1]) / 2)))

                # Normalized centers: frame.shape is (rows, columns, channels), so x is
                # divided by the width (shape[1]) and y by the height (shape[0]).
                c_n = (c[0] / frame.shape[1], c[1] / frame.shape[0])

                if ID not in identities: identities[ID] = []
                identities[ID].append((frame_num, c_n))

                # Draw a blue (BGR) circle of radius 20 px and thickness 5 px at the center
                frame = cv2.circle(frame, (int(c[0]), int(c[1])), 20, (255, 0, 0), 5)
                frame = drawDetection(frame, (p1, p2, cat, ID), info = frame_num)

            output_frames.append(frame)
        try:
            cv2.imshow("Joined Product", np.concatenate(output_frames, axis = 1))
        except (ValueError, cv2.error):
            # Best-effort preview: ValueError means there was nothing to join (a feed
            # just ended) or the frame sizes differ; cv2.error means the window could
            # not be shown. Neither should stop feature collection.
            pass
        finally:
            cv2.waitKey(1)

finally:
    # Always release the captures and close the preview window, even on error.
    for cap in feeds:
        cap.release()

    cv2.destroyAllWindows()

In [9]:
# Each feature row contains:
#   - a categorical (one-hot) input identifying the video feed
#   - the X and Y velocity across the latest N detections
#   - the X, Y center of the most recent detection

def generate_training_data(x, y, output_size = 10000, max_frame_gap = 24 * 4):
    """Build a pairwise "same identity?" dataset by random sampling.

    Two rows of ``x`` are drawn at random (with replacement, so a row may be
    paired with itself). Pairs whose frame numbers are more than
    ``max_frame_gap`` apart are rejected; sampling repeats until
    ``output_size`` pairs have been accepted.

    Args:
        x: Sequence of feature rows. Element 0 of each row is the frame
            number; the remaining elements are the features.
        y: Sequence of identity labels, one per row of ``x``.
        output_size: Number of pairs to generate.
        max_frame_gap: Largest allowed frame-number difference within a pair.

    Returns:
        A tuple ``(x_out, y_out)`` of NumPy arrays. Each row of ``x_out`` is
        the first row's features, the absolute frame gap, then the second
        row's features. ``y_out`` has one column holding 1 when both rows
        carry the same label and 0 otherwise.

    Raises:
        ValueError: If ``x`` is empty or ``x`` and ``y`` differ in length.
    """
    if len(x) == 0:
        raise ValueError("generate_training_data needs at least one feature row")
    if len(x) != len(y):
        raise ValueError("x and y must contain the same number of rows")

    x_out = []
    y_out = []

    while len(y_out) < output_size:
        x_1, x_2 = np.random.randint(len(x), size=2)
        frame_gap = abs(x[x_1][0] - x[x_2][0])
        if frame_gap > max_frame_gap: continue

        # tuple() lets rows be lists or arrays as well as tuples.
        x_out.append(tuple(x[x_1][1:]) + (frame_gap,) + tuple(x[x_2][1:]))
        y_out.append([y[x_1] == y[x_2]])
    # The np.int alias was removed in NumPy 1.24; the builtin int is equivalent.
    return np.array(x_out), np.array(y_out, dtype=int)

In [26]:
# Siamese setup: a shared embedding network plus a learned comparison ("error") network.
# NOTE(review): this cell uses data_files, NeuralNetwork, Dense, SiameseNetwork, X and Y,
# which must already exist in the kernel when it runs.
input_size = len(data_files) + 2 + 2

# Embedding network: input_size -> 32 -> 16
net = NeuralNetwork()
for width in (input_size, 32, 16):
    net.add(Dense(width))

# Comparison network, sized from the embedding network's last layer: -> 16 -> 10 -> 1
err = NeuralNetwork()
for width in (net._layers[-1].outputSize, 16, 10, 1):
    err.add(Dense(width))

s_net = SiameseNetwork(loss_type="model")
s_net.set_main_model(net)
s_net.set_error_model(err)

net.set_training_set(X, Y)

# Training is left disabled:
# net.train(epochs = 1, batch_size = 200)
# net.predict()

In [None]:
from ml_me.architectures import NeuralNetwork
from ml_me.layers import Dense, Layer

# Small run of the ml_me network on a handful of generated pairs.
output_size = 10
split = output_size // 2
X, Y = generate_training_data(x_data, y_data, output_size=output_size)

# Layer widths 13 -> 32 -> 16 -> 1 (softmax on the last layer is not implemented).
net = NeuralNetwork()
for width in (13, 32, 16, 1):
    net.add(Dense(width))

# Rows from `split` onwards are used for fitting; the rows before it for the accuracy figure.
train_x, train_y = X[split:], Y[split:]
held_out_x, held_out_y = X[:split], Y[:split]

net.set_training_set(train_x, train_y)
net.train(epochs=1000, batch_size=200)

recall = net.get_recall()
accuracy = round(net.get_acc(held_out_x, held_out_y), 3)
print(f"Recall: {recall} Accuracy: {accuracy}")

loss_history = net.losses
plt.plot(range(len(loss_history)), loss_history)
plt.title("Loss vs. Epochs")
plt.show()


In [None]:
# Walk every feed in lockstep and, every 24 frames, turn each tracked identity's
# recent history into one feature row (x_data) labelled with its track ID (y_data).
data_files = ["grandma_A", "grandma_B"]
feeds = [cv2.VideoCapture(f"raw_data/videos/{file}.mp4") for file in data_files]
detectors = [DeepSortMock(f"raw_data/tracks/{file}.cleaned.pb") for file in data_files]
# Per feed: {track ID: [(frame number, normalized center), ...]}
feed_identities = [dict() for _ in feeds]

# DEBUG: frames shown in the preview window
output_frames = []

samples = []
# x_data rows: (frame number, one-hot feed index..., center x, center y, velocity x, velocity y)
# y_data rows: the track ID the matching x_data row was sampled from.
x_data = []
y_data = []

frames_left = True
frame_num = 0

try:
    while frames_left:
        frame_num += 1
        output_frames = []

        for i, (feed, detector, identities) in enumerate(zip(feeds, detectors, feed_identities)):
            updated_ids = set()
            frames_left, frame = feed.read()

            # The first feed to run out of frames ends the whole collection loop.
            if not frames_left: break

            if frame_num % 24 == 0:
                for ID in identities:
                    start, stop = identities[ID][0], identities[ID][-1]
                    # Keep only the newest sample as the start of the next window.
                    identities[ID] = [stop]

                    num_a, c_a = start
                    num_b, c_b = stop
                    # A single sample in the window gives no displacement to measure.
                    if num_a == num_b: continue

                    # Velocity in normalized units per frame. The coordinate difference
                    # has to be parenthesized before dividing by the frame gap.
                    frame_gap = num_a - num_b
                    c_d = ((c_a[0] - c_b[0]) / frame_gap, (c_a[1] - c_b[1]) / frame_gap)

                    # One-hot encoding of the feed this identity was seen in.
                    a = np.zeros(len(data_files))
                    a[i] = 1

                    feat_x = (num_b,) + tuple(a) + c_b + c_d

                    x_data.append(feat_x)
                    y_data.append(ID)

            people = detector.update(frame)
            # Record the center of every detection and annotate the frame for display.
            for person in people:
                p1, p2, cat, ID = person
                updated_ids.add(ID)

                # Center coordinates (x, y) in pixels
                c = ((p1[0] + (abs(p2[0] - p1[0]) / 2)), (p1[1] + (abs(p2[1] - p1[1]) / 2)))
                # Normalized centers: frame.shape is (rows, columns, channels), so x is
                # divided by the width (shape[1]) and y by the height (shape[0]).
                c_n = (c[0] / frame.shape[1], c[1] / frame.shape[0])
                # Integer pixel position for drawing
                center_coordinates = (int(c[0]), int(c[1]))
                if ID not in identities: identities[ID] = []
                identities[ID].append((frame_num, c_n))

                # Draw a blue (BGR) circle of radius 20 px and thickness 5 px at the center
                frame = cv2.circle(frame, center_coordinates, 20, (255, 0, 0), 5)
                frame = drawDetection(frame, (p1, p2, cat, ID), info = frame_num)

            output_frames.append(frame)

        try:
            cv2.imshow("Joined Product", np.concatenate(output_frames, axis = 1))
        except (ValueError, cv2.error):
            # Best-effort preview: ValueError means there was nothing to join (a feed
            # just ended) or the frame sizes differ; cv2.error means the window could
            # not be shown. Neither should stop feature collection.
            pass
        cv2.waitKey(1)

finally:
    # Always release the captures and close the preview window, even on error.
    for cap in feeds:
        cap.release()

    cv2.destroyAllWindows()

In [15]:
import os
import torch

class CustomDataset(Dataset):
    """Index-aligned pairing of detection feature rows with their labels."""

    def __init__(self, x, y):
        # x and y are stored as given; no copying or conversion happens here.
        self.detections, self.labels = x, y

    def __len__(self):
        # The label sequence defines the number of samples.
        sample_count = len(self.labels)
        return sample_count

    def __getitem__(self, idx):
        # Return the (features, label) pair stored at position idx.
        features = self.detections[idx]
        label = self.labels[idx]
        return features, label
# Build the randomly paired training set (default output_size) from the collected
# detection features (x_data) and their track IDs (y_data).
X, Y = generate_training_data(x_data, y_data)

In [None]:
'''
Multilayer Perceptron.
'''

class MLP(nn.Module):
    """Three-layer perceptron with ReLU activations.

    Args:
        input_size: Number of features in each input row. Defaults to
            32 * 32 * 3, the width that was previously hard-coded.
        num_classes: Number of output logits. Defaults to 10.
    """

    def __init__(self, input_size=32 * 32 * 3, num_classes=10):
        super().__init__()
        self.layers = nn.Sequential(
          nn.Linear(input_size, 64),
          nn.ReLU(),
          nn.Linear(64, 32),
          nn.ReLU(),
          nn.Linear(32, num_classes)
        )

    def forward(self, x):
        '''Forward pass'''
        return self.layers(x)


# Set fixed random number seed
torch.manual_seed(42)

# Wrap the generated pair features (X) and same-identity labels (Y)
dataset = CustomDataset(X, Y)

# num_workers=0 loads batches in the main process. The data is already in memory,
# and worker processes cannot import a dataset class defined in a notebook cell
# when the platform starts workers with "spawn".
trainloader = torch.utils.data.DataLoader(dataset, batch_size=10, shuffle=True, num_workers=0)

# Size the network from the data: one input per feature column, and two output
# classes because the labels are 0 (different identity) or 1 (same identity).
mlp = MLP(input_size=X.shape[1], num_classes=2)

# Define the loss function and optimizer
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(mlp.parameters(), lr=1e-4)

# Run the training loop
for epoch in range(0, 5): # 5 epochs at maximum

    # Print epoch
    print(f'Starting epoch {epoch+1}')

    # Set current loss value
    current_loss = 0.0

    # Iterate over the DataLoader for training data
    for i, data in enumerate(trainloader, 0):

        # Get inputs
        inputs, targets = data

        # NumPy float64 rows collate to double tensors, but the layers hold
        # float32 weights, so the inputs are cast to match.
        inputs = inputs.float()
        # CrossEntropyLoss expects class indices of shape (batch,) and dtype long;
        # the labels arrive with shape (batch, 1).
        targets = targets.reshape(-1).long()

        # Zero the gradients
        optimizer.zero_grad()

        # Perform forward pass
        outputs = mlp(inputs)

        # Compute loss
        loss = loss_function(outputs, targets)

        # Perform backward pass
        loss.backward()

        # Perform optimization
        optimizer.step()

        # Print the mean loss of every block of 500 mini-batches
        current_loss += loss.item()
        if i % 500 == 499:
            print('Loss after mini-batch %5d: %.3f' %
                  (i + 1, current_loss / 500))
            current_loss = 0.0

# Process is complete.
print('Training process has finished.')

In [None]:
from ml_me.architectures import NeuralNetwork
from ml_me.layers import Dense, Layer

class SiameseNetwork(NeuralNetwork):
    """Skeleton of a siamese wrapper around a main model and an optional error model.

    Args:
        loss_type: "dist" (default) or "model". An error model may only be
            attached when this is "model".

    Most of the class is still a stub: forward, backward and add do nothing,
    and train only samples a batch before delegating to the main model.
    """

    def __init__(self, loss_type = "dist"):
        # Initialize the base network state before adding the siamese-specific fields.
        super().__init__()
        self._model = None
        self._error_model = None
        self.margin = 3
        self.loss_type = loss_type

    def set_main_model(self, model):
        """Store the network used as the main model."""
        # TODO: Validate Model
        self._model = model

    def set_error_model(self, model):
        """Store the error model; only allowed when loss_type is "model"."""
        assert self.loss_type == "model", 'an error model requires loss_type="model"'
        self._error_model = model

    def add(self, *args, **kwargs):
        """Placeholder that accepts and ignores its arguments.

        NOTE(review): presumably layers belong on the models passed to
        set_main_model / set_error_model rather than on this wrapper — confirm.
        """
        pass

    def forward(self):
        """Not implemented yet."""
        pass

    def backward(self):
        """Not implemented yet."""
        pass

    def train(self, epochs = 1000, batch_size = 32, resolution = 10):
        """Sample one random batch and run a forward/backward pass on the main model.

        epochs and resolution are accepted but not used yet.
        NOTE(review): training_data / training_labels are not assigned in this
        class; presumably the base class's set_training_set provides them — confirm.
        """
        # Draw batch_size indices with replacement from the training set.
        batch_idx = np.random.choice(len(self.training_data), batch_size)
        data = np.array([self.training_data[y] for y in batch_idx])
        labels = np.array([self.training_labels[t] for t in batch_idx])
        if self.loss_type == "model":
            # TODO: loss computed by the error model
            pass
        else:
            # TODO: distance-based loss
            pass
        # TODO: feed `data` / `labels` to the model; they are sampled but unused so far.
        self._model.forward()
        self._model.backward()
        
class TripletLossLayer(Layer):
    """Skeleton of a triplet-loss layer; most methods are still stubs.

    Args:
        activation: Accepted but not used by this class.
            NOTE(review): it is not forwarded to Layer.__init__ — confirm
            whether the base class needs it to set activation_prime.
        margin: Margin stored on the layer. Defaults to 1.
        **kwargs: Passed through to Layer.__init__.
    """

    def __init__(self, activation = "relu", margin = 1, **kwargs):
        Layer.__init__(self, "Triplet", **kwargs)
        # Honor the caller's margin; it used to be overwritten with a constant 1.
        self.margin = margin
        self.outputSize = 1
        self.__frame_count = 0
        self.__embedding_memory = []
        self.__label_memory = []

    def init(self, inputSize):
        """Not implemented yet."""
        pass

    def calc_euclidian(self):
        """Not implemented yet."""
        pass

    def forward(self):
        """Not implemented yet."""
        pass

    def backward(self, labels):
        """Set a constant error of 1 and derive delta from it; labels is unused so far."""
        # TODO: Generate Triplets
        self.error = 1
        self.delta = self.error * self.activation_prime(self.outputs)
        