# Needed Modules

In [85]:
import numpy as np
import re
from typing import List, Dict, Tuple
import jsbeautifier
import os
import random

# Configuration

In [86]:
all_recorders = ['salman', 'tarab', 'negin', 'fada', 'rajab']

data_dir = '/Users/bobby/Desktop/Projects/Awesome-Fun-And-Glorious-Gloves-Of-Sign-Language/data/data with new glove'

features_index = {
  # "quat_w": 0,
  # "quat_x": 1,
  # "quat_y": 2,
  # "quat_z": 3,
  # "euler_x": 4,
  # "euler_y": 5,
  # "euler_z": 6,
  "acc_x": 7,
  "acc_y": 8,
  "acc_z": 9,
  "line_acc_x": 10,
  "line_acc_y": 11,
  "line_acc_z": 12,
  "gyro_x": 13,
  "gyro_y": 14,
  "gyro_z": 15,
  "gravity_x": 16,
  "gravity_y": 17,
  "gravity_z": 18,
  "flex_0": 19,
  "flex_1": 20,
  "flex_2": 21,
  "flex_3": 22,
  "flex_4": 23
}

n_features = len(features_index.keys())

index_feature = {value:key for key,value in features_index.items()}

features = np.array([features_index[key] for key in features_index])

labels_map = {
  "Abi": 0,
  "Sabz": 1,
  "Saal": 2,
  "Ruz": 3,
  "Faramush": 4,
  "Ast": 5,
  "Kheili": 6,
  "Tabestun": 7,
  "Bakht": 8,
  "Diruz": 9,
  "Omidvar": 10,
  "Maman": 11,
  "Baba": 12,
  "Khosh": 13,
  "Like": 14,
  "Dislike": 15
}

n_clusters = 16

index_label = {value:key for key,value in labels_map.items()}

# printing
options = jsbeautifier.default_options()
options.indent_size = 2

#pie chart colors
random.seed(42)
colors = {key: (random.random(), random.random(), random.random()) for key in labels_map}

# Loading The Dataset

First we implement a function to exract the label given a file's name:

In [87]:
def extract_file_label(filename: str) -> List[int]:
  filename = filename.split('/')[-1].split('.')[0]
  labels = re.findall('[A-Z][^A-Z]*', filename)
  return [labels_map[label] for label in labels]

extract_file_label('KheiliKheiliAbi.txt')

[6, 6, 0]

Then, we need  a function to read files and extract time series data:

In [88]:
def line2float(line: str, features: np.array = None):
  if features is None:
      return np.array([float(i) for i in line[:-2].split(" ")])
  return np.array([float(i) for i in line[:-2].split(" ")])[features]

def read_file(file:str, features: np.array = None):
    with open(file, "r") as f:
        lines = f.readlines()
    x = [[]]
    for i in range(len(lines)):
        if ";" in lines[i]:
            x.append([])
        else:
            x[-1].append(line2float(lines[i], features=features))
    if x[-1] == []:
        return x[:-1]
    return x

Now we can load the entire dataset:

In [89]:
def load_dataset(data_dir: str, recorders: List[str], features: np.array = None):
  dataset = {}
  for recorder in recorders:
    files = os.listdir(f'{data_dir}/{recorder}')
    if not recorder in dataset:
      dataset[recorder] = []
    for f in files:
      data = read_file(f'{data_dir}/{recorder}/{f}', features=features)
      label = extract_file_label(f)
      for sample in data:
        dataset[recorder].append((np.array(sample), label))
  return dataset

dataset = load_dataset(data_dir, all_recorders, features=features)
for recorder in all_recorders:
  print(recorder, len(dataset[recorder]))

salman 616
tarab 807
negin 355
fada 382
rajab 404


# Removing outliers

We observe that there are some outlier data that may cause problem in our normalization process:
- quat_y > 1.5
- euler_x > 500 or < -500
- acc_x > 100 or < -100
- acc_z > 100 or < -100
- gyro_z > 20
- gravity_y > 11

Lets see to what sample does these outliers belong:

In [90]:
# outlier_bt = {'quat_y': 1.5, 'euler_x': 500, 'acc_x': 100, 'acc_z': 100, 'gyro_z': 20, 'gravity_y': 11}
# outlier_bt = {'euler_x': 500, 'acc_x': 100, 'acc_z': 100, 'gyro_z': 20, 'gravity_y': 11}
# outlier_lt = {'euler_x': -500, 'acc_x': -100, 'acc_z': -100}

# outlier_bt = {'gyro_z': 20, 'gravity_y': 11}

outlier_bt = {'acc_x': 100, 'acc_z': 100, 'gyro_z': 20, 'gravity_y': 11}
outlier_lt = {'acc_x': -100, 'acc_z': -100}

index_relative_feature = {key : i for i, key in enumerate(features_index.keys())}

outlier_ids = dict()

for recorder in all_recorders:
  for id, sample_label_pair in enumerate(dataset[recorder]):
      sample, label = sample_label_pair
      for feature in outlier_bt:
         index = index_relative_feature[feature]
         for i in range(sample.shape[0]):
            if sample[i, index] > outlier_bt[feature]:
               print('Bigger:', recorder, id, feature, sample[i, index], label)
               if recorder not in outlier_ids:
                  outlier_ids[recorder] = []
               outlier_ids[recorder].append(id)
               break
      
      for feature in outlier_lt:
         index = index_relative_feature[feature]
         for i in range(sample.shape[0]):
            if sample[i, index] < outlier_lt[feature]:
               print('Smaller:', recorder, id, feature, sample[i, index], label)
               if recorder not in outlier_ids:
                  outlier_ids[recorder] = []
               outlier_ids[recorder].append(id)
               break

print(outlier_ids)

Smaller: salman 267 acc_z -320.45 [9, 1]
Bigger: salman 280 gyro_z 36.01 [8, 4]
Bigger: salman 366 gravity_y 38.9 [5, 1]
Bigger: salman 475 acc_x 163.71 [3, 0, 2]
Bigger: tarab 577 acc_z 322.91 [6, 8, 1]
Smaller: tarab 654 acc_x -322.89 [10, 4]
{'salman': [267, 280, 366, 475], 'tarab': [577, 654]}


Now lets remove those outliers:

In [91]:
def delete_items_from_list(the_list: list, indexes: List[int]):
  for index in sorted(indexes, reverse=True):
        del the_list[index]
      
for recorder, ids in outlier_ids.items():
      delete_items_from_list(dataset[recorder], ids)
          
for recorder in all_recorders:
  print(recorder, len(dataset[recorder]))

salman 612
tarab 805
negin 355
fada 382
rajab 404


Lets find the minimum and maximum range of each feature to use it our normalization process:

In [92]:
mins = np.ones((1, n_features), dtype=float) * (10 ** 6)
maxs = np.ones((1, n_features), dtype=float) * (-10 ** 5)

for _ , samples in dataset.items():
  for sample_label_pair in samples:
    sample, _ = sample_label_pair
    data_mins = np.min(sample, axis = 0, keepdims = True)
    data_maxs = np.max(sample, axis = 0, keepdims = True)
    mins = np.minimum(mins, data_mins)
    maxs = np.maximum(maxs, data_maxs)

print(mins)
print(maxs)

[[-39.2    -26.23   -38.68   -39.4    -17.21   -34.64   -13.3511 -28.8367
  -11.6756  -9.35    -9.8     -9.8      0.       5.     123.       5.
   97.    ]]
[[ 39.41    39.97    39.87    30.03    30.95    43.13    16.2378  29.8933
   19.7389   9.8      9.18     9.8    405.     613.     493.     568.
  380.    ]]


# Finding Nearest Samples

First, we implement a function to filter the dataset:

In [93]:
def filter_dataset(dataset: Dict[str, List[Tuple[np.array, np.array]]], recorders: List[str],
                    max_word_length: int, min_word_length: int  = 1):
  labels = []
  series = []
  series_recorders = []
  for recorder in recorders:
    for sample_label_pair in dataset[recorder]:
        sample, label = sample_label_pair
        if min_word_length <= len(label) <= max_word_length:
          sample_np = (np.array(sample) - mins) / (maxs - mins)
          series.append(sample_np.tolist())
          labels.append(label)
          series_recorders.append(recorder)

  return series, labels, series_recorders

Now we implement our distance function:

In [94]:
def my_dtw(series1, series2, r1=3, r2=3):

    def euclidean_dist(sample1, sample2):
        return np.sum(np.power(np.array(sample1) - np.array(sample2), 2))
    
    # distances = np.zeros((len(series1) + 1, len(series2) + 1), dtype=np.float64)
    distances = np.ones((len(series1) + 1, len(series2) + 1), dtype=np.float64) * np.inf
    directions = np.zeros((len(series1) + 1, len(series2) + 1), dtype=np.int8)

    for i in range(r1 + 1):
        distances[i, 0] = 0
        directions[i, 0] = 2
    
    for j in range(r2 + 1):
        distances[0, j] = 0
        directions[0, j] = 3

    for i in range(1, distances.shape[0]):
        for j in range(1, distances.shape[1]):
            min_value = distances[i - 1, j - 1]
            directions[i, j] = 1
            if distances[i - 1, j] < min_value:
                min_value = distances[i - 1, j]
                directions[i, j] = 2
            if distances[i, j - 1] < min_value:
                min_value = distances[i, j - 1]
                directions[i, j] = 3

            distances[i, j] = min_value + euclidean_dist(series1[i - 1], series2[j - 1])

    #p = (distances.shape[0] - 1, distances.shape[1] - 1)
    min_row_value = float('inf')
    min_col_value = float('inf')
    min_row_point = None
    min_col_point = None
    n, m = distances.shape[0] - 1, distances.shape[1] - 1
    for i in range(r1 + 1):
        if distances[n - i, m] < min_col_value:
            min_col_point = (n - i, m)
            min_col_value = distances[n - i, m]

    for j in range(r2 + 1):
        if distances[n, m - j] < min_row_value:
            min_row_point = (n, m - j)
            min_row_value = distances[n, m - j]
    
    p = min_col_point if min_col_value < min_row_value else min_row_point
    final_point = p
    points = []
    while p[0] > r1 or p[1] > r2:   
        points.append((p[0] - 1, p[1] - 1))
        if directions[p[0], p[1]] == 1:
            p = (p[0] - 1, p[1] - 1)
        elif directions[p[0], p[1]] == 2:
            p = (p[0] - 1, p[1])
        elif directions[p[0], p[1]] == 3:
            p = (p[0], p[1] - 1)
    points.append((p[0] - 1, p[1] - 1))
    
    points.reverse()
    return distances[final_point[0], final_point[1]], distances[1:, 1:], points

Finally, here are our functions to calculate the nearest(s) samples:

In [95]:
def find_nearests(series, labels, sample, top=1):
    '''Finds the top nearests samples to the given sample (including itself if exists in series)'''
    nearests = []
    for i, s in enumerate(series):
        dist = my_dtw(series[i], sample)[0]
        # dist = dtw_path(series[i], sample)[1]
        nearests.append((i, dist, labels[i]))

    nearests = sorted(nearests, reverse=False, key=lambda x: x[1])
    return nearests[:top]

In [96]:
def find_nearest(series, labels, sample, ignore_index=[]):
    '''Finds the nearest sample to the given sample with index.'''
    min_dist = float('inf')
    min_index = index
    min_label = labels[index]
    for i, s in enumerate(series):
        if i not in ignore_index:
            dist = my_dtw(series[i], sample)[0]
            # dist = dtw_path(series[i], sample)[1]
            if dist < min_dist:
                min_dist = dist
                min_index = i
                min_label = labels[i]
    return min_index, min_dist, min_label

### 1.Based on the data belonging to a specific recorder

Lets ge the data belonging to one user: 

In [97]:
target_recorders = ['salman']
series, labels, series_recorders = filter_dataset(dataset, target_recorders, max_word_length=1)

Now we can check the accuracy:

In [98]:
correct = 0
for i, s in enumerate(series):
    top1 = find_nearest(series, labels, series[i], ignore_index=[i])
    if top1[2] == labels[i]:
        correct += 1

print(correct / len(series))

0.9901960784313726


Finally, we can see the the data label-wise:

### 2.Based on the data belonging to all recorders

Now we choose a test recorder and try to classify his/her samples using the data belonging to other recorders:

In [99]:
test_recorder = 'salman'
other_recorders = [recorder for recorder in all_recorders if recorder != test_recorder]

test_series, test_labels, _ = filter_dataset(dataset, [test_recorder], max_word_length=1)
other_series, other_labels, _ = filter_dataset(dataset, other_recorders, max_word_length=1)

Now we can check the top-5 accuracy:

In [100]:
correct = 0
for i, s in enumerate(test_series):
    top5 = find_nearests(other_series, other_labels, s, top=5)
    for result in top5:
        if result[2] == test_labels[i]:
            correct += 1
            break

print(correct / len(test_series))

0.9607843137254902


Now we can check the top-1 accuracy:

In [101]:
correct = 0
for i, s in enumerate(test_series):
    top1 = find_nearest(other_series, other_labels, s)
    if top1[2] == test_labels[i]:
        correct += 1

print(correct / len(test_series))

0.803921568627451
