In [9]:
import scipy.io as sio
import numpy as np
import os
from fastdtw import fastdtw
from sklearn.neighbors import BallTree
from feature_vector_generator import FeatureVectorEmbedder
from scipy.spatial.distance import euclidean
from sklearn.metrics import DistanceMetric
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from tensorflow.keras.models import load_model, Model
from multiprocessing import Pool
import time
import sys
os.path.abspath('')
currentdir = globals()['_dh'][0]
sys.path.append(os.path.join(currentdir, "../utils/"))
from custom_classes import PoseSample, load_embedding_samples


def keogh_lower_bound(X, Y, r=1):
    """
    Compute the Keogh lower bound between two sequences.
    """
    lower_bound = 0
    for i in range(len(X)):
        lower = min(Y[max(0, i - r) : min(len(Y), i + r)])
        upper = max(Y[max(0, i - r) : min(len(Y), i + r)])
        if X[i] > upper:
            lower_bound += (X[i] - upper) ** 2
        elif X[i] < lower:
            lower_bound += (lower - X[i]) ** 2
    return np.sqrt(lower_bound)


In [10]:

class ActionClassification(object):
    def __init__(
        self,
        embedding_dir,
        leaf_size=40,
        file_extension="mat",
        n_embeddings=341,
        n_dimensions=3,
        sliding_window_size=30,
        n_neighbors=10,
    ):
        self.n_embeddings = n_embeddings
        self.n_dimensions = n_dimensions
        self.sliding_window_size = sliding_window_size
        self.n_neighbors = n_neighbors
        self._embedding_samples, self._class_names = load_embedding_samples(
            embedding_dir=embedding_dir, file_extension=file_extension
        )

        self.max_shape = max(
            sample.embedding.shape for sample in self._embedding_samples
        )
        print("Hello")
        self._generateBallTree()

    def dtw_distances(self, args):
        """
        Calculates DTW distances between two samples. The samples are of (t,1,128) shape. The samples are reshaped to (t,128) and then the fastdtw function is used to calculate the distance.
        """
        X, Y, class_name = args
        X = np.squeeze(X, axis=1)
        Y = np.squeeze(Y, axis=1)
        # X = np.swapaxes(X.reshape(self.n_embeddings * 3, X.shape[2]), 0, 1)
        # Y = np.swapaxes(Y.reshape(self.n_embeddings * 3, Y.shape[2]), 0, 1)
        distance, path = fastdtw(X, Y)
        return distance, class_name

    def keogh_lower_bound(X, Y, r=1):
        """
        Compute the Keogh lower bound between two sequences.
        """
        lower_bound = 0
        for i in range(len(X)):
            lower = min(Y[max(0, i - r) : min(len(Y), i + r)])
            upper = max(Y[max(0, i - r) : min(len(Y), i + r)])
            if X[i] > upper:
                lower_bound += (X[i] - upper) ** 2
            elif X[i] < lower:
                lower_bound += (lower - X[i]) ** 2
        return np.sqrt(lower_bound)

    def _generateBallTree(self, leaf_size=40):
        """
        This method generates a ball tree for the embedding samples. The ball tree is used to perform knn classification. It uses the custom dtw function as a distance metric.
        """
        keogh_distance_metric = DistanceMetric.get_metric(
            "pyfunc", func=keogh_lower_bound
        )

        # Find the maximum shape of the embeddings
        max_shape = max(sample.embedding.shape for sample in self._embedding_samples)

        # Pad all embeddings to the maximum shape and flatten them
        embedding_samples = [
            np.ravel(
                np.pad(
                    np.squeeze(sample.embedding, axis=1),
                    ((0, max_shape[0] - sample.embedding.shape[0]), (0, 0)),
                )
            )
            for sample in self._embedding_samples
        ]

        self.ball_tree = BallTree(
            embedding_samples,
            metric=keogh_distance_metric,
            leaf_size=leaf_size,
        )

    def classify(self, test_sample):
        # Flatten the test_sample.embedding into a 1D array and reshape it to a 2D array
        test_sample_embedding = np.ravel(test_sample.embedding).reshape(1, -1)

        # Use the BallTree to find the training samples with the smallest Keogh lower bounds
        indices = self.ball_tree.query(test_sample_embedding, k=self.n_neighbors)

        # Compute the full DTW distance between the test sample and the remaining training samples
        with Pool(processes=4) as pool:  # Use 4 processes
            distances = pool.map(
                self.dtw_distances,
                [
                    (test_sample_embedding, self._embedding_samples[i].embedding)
                    for i in indices
                ],
            )

        # Classify the test sample based on the training sample with the smallest DTW distance
        return self._class_names[np.argmin(distances)]



In [11]:

if __name__ == "__main__":
    # Get the path to the embeddings
    
    currentdir = globals()['_dh'][0]
    embedding_path = os.path.join(currentdir, "../encoded_embeddings/")
    print("Initializing Action Classifier")
    action_classifier = ActionClassification(embedding_path)
    print("Action Classifier Initialized")
    print(action_classifier._embedding_samples[0].embedding.shape)
    X_train, X_test, y_train, y_test = train_test_split(
        [sample for sample in action_classifier._embedding_samples],
        [sample.class_name for sample in action_classifier._embedding_samples],
        test_size=0.01,
        random_state=42,
    )
    



 


Initializing Action Classifier
Embedding samples loaded using Utilities
Hello
Action Classifier Initialized
(70, 1, 128)


In [12]:
    print("Training")
    print(len(X_test))
    y_pred = []
    start_time = time.process_time()
    count = 0
    for test_sample in X_test:
        y_pred.append(action_classifier.classify(test_sample))

    end_time = time.process_time()
    print(f"Time taken: {end_time - start_time}")
    print(accuracy_score(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred))

Training
9


ValueError: query data dimension must match training data dimension