In [None]:
#imports

import math
import os
import pickle
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pycocotools
import torch
import torchvision
from PIL import Image
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms

In [None]:
# Strategy: train NN to predict whether a prediction keypoint set
# represents the same sign as a GT keypoint set

# Input format: Lists of keypoints in image, scaled/padded to 500x500

# Note: model architecture adapted from PyTorch tutorial docs: "Build the Neural Network"

In [None]:
#data upload: SL_combined.csv and the pickled predictions file (stroke_predictions_20210505_1819.txt) in inputs/; Sign_Images.zip in root

In [None]:
# Extract the sign images once. The standard-library zipfile module is used
# instead of the `!unzip` shell escape so the cell does not depend on IPython
# or on an `unzip` binary being installed. Existing files are overwritten,
# as `unzip -o` did.
if not os.path.isdir('Sign_Images'):
  with zipfile.ZipFile('Sign_Images.zip') as archive:
    archive.extractall()

Archive:  Sign_Images.zip
   creating: Sign_Images/
  inflating: Sign_Images/26.0.png    
  inflating: Sign_Images/47.1.png    
  inflating: Sign_Images/5.0.png     
  inflating: Sign_Images/47.0.png    
  inflating: Sign_Images/45.0.png    
  inflating: Sign_Images/47.2.png    
  inflating: Sign_Images/7.0.png     
  inflating: Sign_Images/19.0.png    
  inflating: Sign_Images/47.3.png    
  inflating: Sign_Images/24.0.png    
  inflating: Sign_Images/58.0.png    
  inflating: Sign_Images/3.0.png     
  inflating: Sign_Images/47.7.png    
  inflating: Sign_Images/41.1.png    
  inflating: Sign_Images/43.3.png    
  inflating: Sign_Images/20.0.png    
  inflating: Sign_Images/20.1.png    
  inflating: Sign_Images/43.2.png    
  inflating: Sign_Images/41.0.png    
  inflating: Sign_Images/47.6.png    
  inflating: Sign_Images/39.0.png    
  inflating: Sign_Images/47.4.png    
  inflating: Sign_Images/43.0.png    
  inflating: Sign_Images/43.1.png    
  inflating: Sign_Images/22.0.png   

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
print(f'Using {device} device')

Using cpu device


In [None]:
class Matcher(nn.Module):
    """
    Fully connected binary classifier over a flattened stack of stroke rows.

    Each row holds n_features values (X1, Y1, X2, Y2, Score, Label).
    forward() flattens everything after the batch dimension, so every sample
    must contain (n_pred + n_gt) * n_features values in total. The output is
    two raw logits per sample; no softmax is applied inside the model.

    Args:
        n_pred: number of prediction rows per sample (default 100).
        n_gt: number of ground-truth rows per sample (default 50).
        n_features: number of values per row (default 6).
    """

    def __init__(self, n_pred=100, n_gt=50, n_features=6):
        super().__init__()
        # 100*6 + 50*6 = 900 with the defaults, matching the original layout.
        in_features = (n_pred + n_gt) * n_features
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features, 512), # X1, Y1, X2, Y2, Score, Label
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 2)
        )

    def forward(self, x):
        """Flatten x to (batch, features) and return the (batch, 2) logits."""
        x = self.flatten(x)
        out = self.linear_relu_stack(x)
        return out

In [None]:
# Smoke test: push one random (1, 150, 6) sample through a freshly initialised
# model and print the resulting two-class probabilities.
random_data = torch.rand(1, 150, 6, device=device)
model = Matcher()
model.to(device)
random_result = torch.softmax(model(random_data.to(device)), dim=1)
print(random_result)

tensor([[0.4950, 0.5050]], grad_fn=<SoftmaxBackward>)


In [None]:
def strokenum(s):
  '''
  Map a stroke-type name to its integer class id:
  "Wedge" -> 1, "Winkel" -> 2, "Line" -> 3

  input: s (string, stroke label)
  raises: Exception("Error: Invalid Stroke") for any other value
  '''
  stroke_ids = (("Wedge", 1), ("Winkel", 2), ("Line", 3))
  for stroke_name, stroke_id in stroke_ids:
    if s == stroke_name:
      return stroke_id
  raise Exception("Error: Invalid Stroke")

In [None]:
# Load the ground-truth stroke table and turn it into parallel per-stroke arrays.
gt_data = pd.read_csv("inputs/SL_combined.csv")

gt_dims = gt_data[["hfile","wfile"]].to_numpy()
gt_filenames = gt_data['filename'].to_numpy()
gt_labels = np.array(list(map(strokenum, gt_data['stroke'].to_numpy())))

gt_datalen = len(gt_filenames)
gt_scores = np.ones(gt_datalen)  # every ground-truth stroke gets score 1

# Per-row factor that maps the larger of hfile/wfile to 500, repeated for the
# four coordinate columns.
_sc = 500/gt_dims.max(axis=1)
gt_scale = np.array([_sc] * 4).T

gt_keypoints = np.round(gt_scale * gt_data[['x1','y1','x2','y2']].to_numpy())
#print(np.unique(gt_filenames))

In [None]:
#gt_keypoints[gt_filenames == '19.0.png']

In [None]:
class SGDataset(object):
  """
  Groups parallel per-stroke arrays by file name.

  One item corresponds to one unique file name (in np.unique order) and
  contains every stroke whose entry in `filenames` equals that name.
  """

  def __init__(self, filenames, keypoints, labels, scores):
    self.filenames = filenames
    self.files_uq = np.unique(filenames)
    self.keypoints = keypoints
    self.labels = labels
    self.scores = scores

  def __len__(self):
    """Number of unique file names."""
    return len(self.files_uq)

  def __getitem__(self, idx):
    """
    Return (item, rows) for the idx-th unique file name.

    item: dict with 'filename', 'x1', 'y1', 'x2', 'y2', 'labels', 'scores'
    rows: array with one row per stroke and columns
          x1, y1, x2, y2, score, label
    """
    filename = self.files_uq[idx]
    row_mask = self.filenames == filename

    coords = self.keypoints[row_mask].T
    stroke_labels = self.labels[row_mask]
    stroke_scores = self.scores[row_mask]

    # Labels and scores become single-row matrices so they can be stacked
    # underneath the four coordinate rows.
    n_strokes = len(stroke_labels)
    stroke_labels = stroke_labels.reshape(1, n_strokes)
    stroke_scores = stroke_scores.reshape(1, n_strokes)

    item = {
        'filename': filename,
        'x1': coords[0],
        'y1': coords[1],
        'x2': coords[2],
        'y2': coords[3],
        'labels': stroke_labels,
        'scores': stroke_scores,
    }

    stacked = np.concatenate([coords, stroke_scores, stroke_labels], axis = 0)
    return item, stacked.T

In [None]:
# Ground-truth strokes grouped by file name.
gt_dataset = SGDataset(gt_filenames, gt_keypoints, gt_labels, gt_scores)

# Sanity check: number of files, and the row matrix shape of one item.
print(len(gt_dataset))

print(gt_dataset[2][1].shape)

98
(4, 6)


In [None]:
# Load the pickled list of per-image prediction records.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# run this on a predictions file that comes from a trusted source.
with open("inputs/stroke_predictions_20210505_1819.txt", "rb") as fp:   # Unpickling
   training_stroke_predictions = pickle.load(fp)

# Show which fields the first record provides.
training_stroke_predictions[0].keys()

dict_keys(['name', 'dim', 'scores', 'keypoints', 'labels', 'gtkeypoints', 'gtlabels'])

In [None]:
# Inspect the first record: shape of its keypoints array, then its first entry.
print(np.shape(training_stroke_predictions[0]['keypoints']))
print(training_stroke_predictions[0]['keypoints'][0])

(11, 17, 3)
[[ 82.2054   257.03955    1.      ]
 [411.76453  254.54909    1.      ]
 [181.44763  258.28482    1.      ]
 [134.63525  259.53006    1.      ]
 [407.3954   254.54909    1.      ]
 [ 82.82956  257.03955    1.      ]
 [173.95767  255.79434    1.      ]
 [407.3954   250.81335    1.      ]
 [161.47437  253.30383    1.      ]
 [160.85019  259.53006    1.      ]
 [148.99106  253.30383    1.      ]
 [137.75609  262.02054    1.      ]
 [408.64374  253.30383    1.      ]
 [109.668655 254.54909    1.      ]
 [174.58183  254.54909    1.      ]
 [109.668655 262.02054    1.      ]
 [127.769455 258.28482    1.      ]]


In [None]:
# Flatten the per-image prediction records into parallel per-stroke arrays:
# one file name, one (x1, y1, x2, y2) row, one label and one score per stroke.
pred_filenames, pred_keypoints, pred_labels, pred_scores = [], [], [], []

for pred in training_stroke_predictions:
  pred_num_strokes = len(pred['labels'])
  # Repeat the image name once per stroke so all four arrays stay aligned.
  pred_filenames.append(np.repeat(pred['name'], pred_num_strokes))
  # Keep the first two keypoints of every stroke and their first two values.
  pred_keypoints.append(pred['keypoints'][:,:2,:2])
  pred_labels.append(pred['labels'])
  pred_scores.append(pred['scores'])

pred_filenames, pred_keypoints, pred_labels, pred_scores = (
    np.concatenate(chunks)
    for chunks in (pred_filenames, pred_keypoints, pred_labels, pred_scores)
)

# Collapse each 2x2 keypoint block into a single row of four values.
pred_keypoints = pred_keypoints.reshape(len(pred_keypoints), 4)

assert len(pred_filenames) == len(pred_keypoints) == len(pred_labels) == len(pred_scores)

print(pred_scores.shape)

print(f'Filenames ({len(pred_filenames)}): {pred_filenames}')
print(f'UQFilenames ({len(np.unique(pred_filenames))}): {np.unique(pred_filenames)}')



(7723,)
Filenames (7723): ['1.0' '1.0' '1.0' ... '9.0' '9.0' '9.0']
UQFilenames (98): ['1.0' '10.0' '11.0' '12.0' '13.0' '14.0' '15.0' '15.1' '16.0' '16.1'
 '17.0' '18.0' '19.0' '2.0' '20.0' '20.1' '21.0' '22.0' '23.0' '24.0'
 '25.0' '25.1' '25.2' '25.3' '26.0' '27.0' '28.0' '28.1' '28.2' '28.3'
 '28.4' '29.0' '3.0' '30.0' '30.1' '31.0' '32.0' '32.1' '33.0' '34.0'
 '34.1' '35.0' '35.1' '36.0' '37.0' '38.0' '38.1' '39.0' '4.0' '40.0'
 '40.1' '40.2' '40.3' '41.0' '41.1' '42.0' '42.1' '43.0' '43.1' '43.2'
 '43.3' '44.0' '45.0' '46.0' '46.1' '46.2' '46.3' '47.0' '47.1' '47.2'
 '47.3' '47.4' '47.5' '47.6' '47.7' '48.0' '48.1' '48.2' '48.3' '48.4'
 '49.0' '49.1' '5.0' '50.0' '51.0' '52.0' '53.0' '54.0' '54.1' '55.0'
 '56.0' '57.0' '58.0' '6.0' '7.0' '8.0' '8.1' '9.0']


In [None]:
# with open("inputs/eval_predictions", "rb") as fp:   # Unpickling
#    eval_predictions = pickle.load(fp)

# eval_predictions[0].keys()

dict_keys(['name', 'dim', 'scores', 'keypoints', 'labels'])

In [None]:
# eval_filenames = []
# eval_keypoints = []
# eval_labels = []
# eval_scores = []

# for pred in eval_predictions:
#   eval_num_strokes = len(pred['labels'])
#   eval_filenames.append(np.repeat(pred['name'], eval_num_strokes))
#   eval_keypoints.append(pred['keypoints'][:,:2,:2])
#   eval_labels.append(pred['labels'])
#   eval_scores.append(pred['scores'])

# # print((pred_keypoints[0]))

# eval_filenames = np.concatenate(eval_filenames)
# eval_keypoints = np.concatenate(eval_keypoints)
# eval_labels = np.concatenate(eval_labels)
# eval_scores = np.concatenate(eval_scores)

# # print(np.shape(pred_keypoints))
# eval_keypoints = np.reshape(eval_keypoints, [len(eval_keypoints),4])
# # print(np.shape(pred_keypoints))


# assert (len(eval_filenames) == len(eval_keypoints))
# assert (len(eval_filenames) == len(eval_labels))
# assert (len(eval_filenames) == len(eval_scores))

# print(f'Filenames: ({len(eval_filenames)})')
# print(f'UQFilenames ({len(np.unique(eval_filenames))}): {np.unique(eval_filenames)}')



Filenames: (448)
UQFilenames (5): ['Eval1.png' 'Eval2.png' 'Eval3.png' 'Eval4.png' 'Eval5.png']


In [None]:
# Predicted strokes grouped by file name.
pred_dataset = SGDataset(pred_filenames, pred_keypoints, pred_labels, pred_scores)

# Sanity check: number of files, and the row matrix shape of one item.
print(len(pred_dataset))
print(pred_dataset[6][1].shape)

98
(51, 6)


In [None]:
# eval_dataset = SGDataset(filenames = eval_filenames,
#                          keypoints = eval_keypoints,
#                          labels = eval_labels,
#                          scores = eval_scores)

# print(eval_dataset.__len__())
# print(np.shape(eval_dataset.__getitem__(0)[1]))

5
(75, 6)


In [None]:
#tool for generating training set
def indexbool(a):
  """
  Flag the positions where the integer part of a numeric name changes.

  The first position is always True; position i > 0 is True when
  floor(float(a[i])) differs from floor(float(a[i-1])).

  input: a (sequence of numeric strings such as '15.0', '15.1')
  returns: boolean numpy array with one entry per element of a
  """
  count = len(a)
  flags = np.empty(count, dtype=bool)
  if count:
    flags[0] = True
  for pos in range(1, count):
    flags[pos] = math.floor(float(a[pos])) != math.floor(float(a[pos - 1]))
  return flags

# Sorted unique prediction names, and a mask marking the first entry of every
# group of names that share the same integer part.
uqfilenames = np.unique(pred_filenames)
unique_sign_indices = indexbool(uqfilenames)

# print(len(unique_sign_indices))
# print(len(uqfilenames))
# print(len(uqfilenames[unique_sign_indices]))
# print(unique_sign_indices)

def uqdelete(j, a):
  """
  Return a copy of the boolean mask a with the nearest True entry at or
  before index j switched to False. The input mask is not modified.

  An AssertionError is raised when j is negative or when no True entry
  exists at or before j.
  """
  pos = j
  while True:
    assert pos >= 0
    if a[pos]:
      break
    pos -= 1  # step back towards the first entry of this group

  b = np.copy(a)
  b[pos] = False
  assert(len(np.shape(b)) == 1)
  return b

In [None]:
def inputgen(n = 1000, p = 0.5, preds = pred_dataset, gts = gt_dataset):
  """
  Draw n random training samples.

  Every sample is the flattened concatenation of 100 prediction rows followed
  by 50 ground-truth rows (each truncated or zero-padded to that length).
  With probability p the prediction rows come from the same index j as the
  ground truth (label 1); otherwise they come from an index k drawn from the
  first entries of the other sign groups (label 0).

  returns: (inputs, matches) - list of 1-D arrays, list of 0/1 labels
  """
  def _fit_rows(rows, target_len):
    # Truncate to target_len rows, then zero-pad if there are fewer.
    rows = rows[:target_len]
    shortfall = target_len - len(rows)
    if shortfall > 0:
      filler = np.zeros([shortfall, np.shape(rows)[1]])
      rows = np.concatenate((rows, filler))
    return rows

  idxs = np.arange(len(preds))

  inputs, matches = [], []

  for _ in range(n):
    # The three random draws happen for every sample, in this order.
    match = (np.random.uniform() < p)
    j = np.random.choice(idxs)
    candidates = uqdelete(j, np.copy(unique_sign_indices))
    k = np.random.choice(idxs[candidates])

    gt_part = _fit_rows(gts[j][1], 50)

    pred_index = j if match else k
    matches.append(1 if match else 0)
    pred_part = _fit_rows(preds[pred_index][1], 100)

    inputs.append(np.reshape(np.concatenate((pred_part, gt_part)), -1))

  return inputs, matches

# Generate the sample pool and report its shape and the share of matches.
X, Y = inputgen(n = 10000, p = 0.5)
assert len(Y) == len(X)
print(f'Inputs: {np.shape(X)}')
print(f'Average Match: {np.average(Y)}')

Inputs: (10000, 900)
Average Match: 0.5083


In [None]:
#print(X[0])

In [None]:
class SGSamples(Dataset):
  """
  Torch dataset over a list of flattened samples X and their labels Y.

  All access goes through the X and Y passed to the constructor, so separate
  instances (for example a train and a test split) stay independent of each
  other and of any notebook-level X / Y variables.
  """

  def __init__(self, X, Y):
    self.X = X
    self.Y = Y

  def __len__(self):
    """Number of samples held by this instance."""
    return len(self.Y)

  def __getitem__(self, idx):
    """Return (features, label) as a float tensor and a long tensor."""
    xi = torch.as_tensor(self.X[idx], dtype = torch.float )
    yi = torch.as_tensor(self.Y[idx], dtype = torch.long )

    return xi, yi

In [None]:
# Split the sample pool into leading train and trailing test partitions and
# wrap each partition in a DataLoader.
batch_size = 100
train_frac = 0.9

ttpp = int(train_frac * len(X)) #train-test partition point

X_train = X[:ttpp]
X_test = X[ttpp:]
Y_train = Y[:ttpp]
Y_test = Y[ttpp:]

train_samples = SGSamples(X_train, Y_train)
test_samples = SGSamples(X_test, Y_test)

# Only the training loader is shuffled.
train_dataloader = DataLoader(train_samples, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_samples, batch_size=batch_size, shuffle=False)

In [None]:
# Pull one training batch and report its shapes and label dtype.
train_features, train_labels = next(iter(train_dataloader))
print(f"Feature batch shape: {train_features.shape}")
print(f"Labels batch shape: {train_labels.shape}")
print(train_labels.dtype)

Feature batch shape: torch.Size([100, 900])
Labels batch shape: torch.Size([100])
torch.int64


In [None]:
#Training Hyperparams

# Step size for SGD and number of passes over the training loader.
learning_rate = 1e-3
epochs = 20

# Two-class cross-entropy on the raw model logits, optimised with plain SGD
# over the parameters of the notebook-level `model`.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
# Loss of the current model on the batch fetched above
# (rp = model outputs, rt = target labels).
rp = model(train_features.to(device))
rt = train_labels.to(device)

# Last expression of the cell, so the loss tensor is displayed.
loss_fn(rp, rt)

tensor(1.0180, grad_fn=<NllLossBackward>)

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """
    Run one pass over dataloader, taking one optimizer step per batch.

    Batches are moved to the notebook-level `device` before the forward pass.
    The loss of every 20th batch is printed together with the number of
    samples processed so far.
    """
    size = len(dataloader.dataset)
    for batch, (features, targets) in enumerate(dataloader):
        # Forward pass and loss
        pred = model(features.to(device))
        loss = loss_fn(pred, targets.to(device))

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 20 != 0:
            continue
        current = batch * len(features)
        print(f"loss: {loss.item():>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    """
    Evaluate model on dataloader and print accuracy and average loss.

    loss_fn is called once per batch and its results are summed, so the sum
    is divided by the number of batches (not the number of samples) to get
    the average loss. Accuracy is the number of correct argmax predictions
    divided by the number of samples in dataloader.dataset.

    The model is switched to eval mode for the duration of the evaluation and
    its previous train/eval mode is restored afterwards. Batches are moved to
    the notebook-level `device`.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for X, y in dataloader:
                X, y = X.to(device), y.to(device)
                pred = model(X)
                test_loss += loss_fn(pred, y).item()
                correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")



In [None]:
# Move the model once; nn.Module.to() moves the parameters in place and
# returns the same module, so the loops below can reuse `model` directly.
model.to(device)

# Baseline evaluation before any training, then one train + test pass per epoch.
print("Init: \n-------------------------------")

test_loop(test_dataloader, model, loss_fn)
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
print("Done!")

Init: 
-------------------------------
Test Error: 
 Accuracy: 52.0%, Avg loss: 0.009266 

Epoch 1
-------------------------------
loss: 0.881502  [    0/10000]
loss: 0.604817  [ 2000/10000]
loss: 0.504401  [ 4000/10000]
loss: 0.430798  [ 6000/10000]
loss: 0.378260  [ 8000/10000]
Test Error: 
 Accuracy: 87.1%, Avg loss: 0.003891 

Epoch 2
-------------------------------
loss: 0.393822  [    0/10000]
loss: 0.349086  [ 2000/10000]
loss: 0.381006  [ 4000/10000]
loss: 0.332652  [ 6000/10000]
loss: 0.324324  [ 8000/10000]
Test Error: 
 Accuracy: 86.0%, Avg loss: 0.003202 

Epoch 3
-------------------------------
loss: 0.332442  [    0/10000]
loss: 0.283394  [ 2000/10000]
loss: 0.240634  [ 4000/10000]
loss: 0.231948  [ 6000/10000]
loss: 0.189712  [ 8000/10000]
Test Error: 
 Accuracy: 90.9%, Avg loss: 0.002463 

Epoch 4
-------------------------------
loss: 0.246304  [    0/10000]
loss: 0.465294  [ 2000/10000]
loss: 0.179143  [ 4000/10000]
loss: 0.212057  [ 6000/10000]
loss: 0.157496  [ 8000/

In [None]:
def nonrandom_inputgen(j, k, preds = pred_dataset, gts = gt_dataset):
  """
  Build one model input from prediction item j and ground-truth item k.

  The prediction rows are truncated or zero-padded to 100 rows and the
  ground-truth rows to 50 rows; the result is their flattened concatenation
  (predictions first).

  returns: (image, match) where match is 1 when j == k and 0 otherwise
  """
  def _fit_rows(rows, target_len):
    # Truncate to target_len rows, then zero-pad if there are fewer.
    rows = rows[:target_len]
    shortfall = target_len - len(rows)
    if shortfall > 0:
      filler = np.zeros([shortfall, np.shape(rows)[1]])
      rows = np.concatenate((rows, filler))
    return rows

  gt_part = _fit_rows(gts[k][1], 50)
  pred_part = _fit_rows(preds[j][1], 100)

  image = np.reshape(np.concatenate((pred_part, gt_part)), -1)

  return image, int(j==k)



In [None]:
# Score a single hand-picked (prediction j, ground truth k) pair with the
# trained model and print the predicted class next to the expected one.
model.eval()
j, k = 13, 0
i, m = nonrandom_inputgen(j, k)
i = torch.as_tensor([i], dtype=torch.float32)

result = torch.softmax(model(i.to(device)), dim=1).detach().cpu()
print(f'J={j}, K={k}: Guess = {torch.argmax(result)}, Correct = {m} ({result.tolist()[0],})')

J=13, K=0: Guess = 0, Correct = 0 (([0.9989519119262695, 0.0010481072822585702],))


In [None]:
def notsame(j,k):
  """
  True when j and k are different indices AND the names uqfilenames[j] and
  uqfilenames[k] have different integer parts.
  """
  distinct_index = (j != k)
  if not distinct_index:
    return distinct_index
  sign_j = math.floor(float(uqfilenames[j]))
  sign_k = math.floor(float(uqfilenames[k]))
  return sign_j != sign_k

In [None]:
#print(np.stack((uqfilenames, range(len(uqfilenames))),axis=1))

In [None]:
#Useful indices: 2 = an/DINGIR; 8 = ÌR/ARAD; 9 = ÌR, v2; 4,5 = similar

In [None]:
# def sgplot(j,k):
  
#   fig,ax = plt.subplots(1,2, figsize = [20,10])
#   # ax[0].axis('off')
#   # ax[1].axis('off')
#   plt.show()

# sgplot(4,5)