In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)


# Input data files are expected under the local "./data" directory
# Running this cell (by clicking run or pressing Shift+Enter) lists every file found under that directory, recursively

import os
for dirname, _, filenames in os.walk('./data'):
    for filename in filenames:
        # Print the path of each file, joined onto the directory it was found in
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import ast

def str_to_list(s):
    """
    Parse a bracketed, comma-separated string such as "[1.0, 2.5]" into floats.

    :param s: str, textual list of numbers, optionally wrapped in square brackets.
    :return: list of float; empty when the string holds no numbers (e.g. "[]" or "").
    :raises ValueError: if a non-blank item cannot be parsed as a float.
    """
    # Trim outer whitespace first so the brackets really are at the ends.
    body = s.strip().strip('[]')
    # Skip blank items: "".split(',') yields [''] and float('') would raise.
    return [float(item) for item in body.split(',') if item.strip()]

# Parse the x / y / z columns into lists of floats while the CSV is being read
array_converters = {axis: str_to_list for axis in ('x', 'y', 'z')}

# Load the gesture recordings
training_data = pd.read_csv("./data/gesture_data.csv", converters=array_converters)

# Sanity check: show the first parsed 'x' entry, then its Python type
print("Training data 'x' column first row:", training_data.loc[0, 'x'], sep="\n")
print("\nType of 'x' column first row:", type(training_data.loc[0, 'x']), sep="\n")

In [None]:
# Report the shortest and longest series per axis as {axis: [min_len, max_len]}
mydict = {}
for axes in ('x', 'y', 'z'):
    lengths = [len(training_data.loc[i, axes]) for i in range(len(training_data))]
    # 9999 and 0 are the starting bounds, so they survive when no row beats them
    mydict[axes] = [min([9999] + lengths), max([0] + lengths)]

print(mydict)

In [None]:
# Preview the first rows of the loaded training data as plain text
print(training_data.head())

In [None]:
# Bring a sequence to a fixed length by cutting the tail or appending zeros
def pad_or_truncate(array, target_length=40):
    """
    Return ``array`` with exactly ``target_length`` items.

    :param array: list, sequence to resize.
    :param target_length: int, desired number of items (default 40).
    :return: the first ``target_length`` items when the input is too long, the
             input followed by zeros when it is too short, otherwise the input
             object itself.
    """
    shortfall = target_length - len(array)
    if shortfall < 0:
        return array[:target_length]
    if shortfall > 0:
        return array + [0] * shortfall
    return array

# Resize every x / y / z series with pad_or_truncate (default target length: 40 items)
for col in ["x","y","z"]:
    training_data[col] = training_data[col].apply(pad_or_truncate)

from sklearn.model_selection import train_test_split

# Split the data into 50% train, 25% test, and 25% hidden sets:
# first halve the full frame, then halve the remainder again.
# The fixed random_state makes both splits repeatable.
train_data, temp_data = train_test_split(training_data, test_size=0.5, random_state=42)
test_data, hidden_data = train_test_split(temp_data, test_size=0.5, random_state=42)

In [None]:
import random
from scipy.interpolate import CubicSpline
# Function for data augmentation using CubicSpline
def time_warping(time_series, num_operations, warp_factor):
    """
    Apply time warping to a time series with balanced insertions and deletions.

    Each insertion adds a point between two neighbours (their mean, perturbed by
    at most ``warp_factor`` times that mean); each deletion removes a randomly
    chosen interior point. The first and last samples are never inserted before,
    after, or removed, so the endpoints are preserved.

    :param time_series: 1-D sequence of numbers (list or numpy array); not modified.
    :param num_operations: Number of operations (half will be insertions and half deletions).
    :param warp_factor: Warp factor that determines the impact of operations.
    :return: np.ndarray of floats, distorted time series with the same length.
    """
    # Always work on a float ndarray copy so the caller's data is untouched and
    # the return type does not depend on whether any operation was performed.
    warped_series = np.array(time_series, dtype=float)

    # A series with fewer than 3 points has no interior index to insert at or
    # delete from (random.randint(1, len - 2) would raise), so return it as is.
    if len(warped_series) < 3:
        return warped_series

    # Ensure the number of insertions equals the number of deletions
    num_insertions = num_operations // 2
    num_deletions = num_insertions

    for _ in range(num_insertions):
        index = random.randint(1, len(warped_series) - 2)
        insertion_value = (warped_series[index - 1] + warped_series[index]) * 0.5
        warp_amount = insertion_value * warp_factor * random.uniform(-1, 1)
        warped_series = np.insert(warped_series, index, insertion_value + warp_amount)

    # The series holds at least 3 + num_insertions points here, so an interior
    # index exists for every deletion and the original length is restored.
    for _ in range(num_deletions):
        index = random.randint(1, len(warped_series) - 2)
        warped_series = np.delete(warped_series, index)

    return warped_series

def magnitude_warping(time_series, num_knots, warp_std_dev):
    """
    Applies magnitude warping to a time series using cubic splines.

    Evenly spaced control points get random scale factors drawn around 1; a
    cubic spline through them is evaluated at every sample index and the series
    is multiplied by it element-wise.

    :param time_series: 1-D sequence of numbers (list or np.array), time series to distort
    :param num_knots: int, number of control points for splines (raised to 2 if smaller)
    :param warp_std_dev: float, standard deviation for distorting the values of control points
    :return: np.array, distorted time series
    """
    series = np.asarray(time_series, dtype=float)

    # With fewer than two samples the knot positions cannot be strictly
    # increasing, so no spline can be built: return an unwarped copy.
    if len(series) < 2:
        return series.copy()

    # CubicSpline requires at least two control points.
    knot_count = max(int(num_knots), 2)

    # Generating random spline knots within a time series
    knot_positions = np.linspace(0, len(series) - 1, num=knot_count)
    knot_values = 1 + np.random.normal(0, warp_std_dev, knot_count)

    # Creating a Cubic Spline Function Through Knots
    spline = CubicSpline(knot_positions, knot_values)

    # Generating time indexes for a time series
    time_indexes = np.arange(len(series))

    # Applying distortion to a time series
    return series * spline(time_indexes)

def augment_data(x, y, z, num_augmented=1, time_warp_factor=0.05, mag_warp_std_dev=0.05):
    """
    Augments vibrational data by applying time warping and magnitude warping.

    The number of time-warp operations and the number of magnitude-warp knots
    are not parameters: each is drawn once per call from a range centred on
    half the length of ``x`` (+/- 20%) and shared by every generated sample.

    :param x: sequence of numbers, x-axis vibrational data
    :param y: sequence of numbers, y-axis vibrational data
    :param z: sequence of numbers, z-axis vibrational data
    :param num_augmented: int, number of augmented samples to generate
    :param time_warp_factor: float, factor determining the magnitude of time warping
    :param mag_warp_std_dev: float, standard deviation for magnitude warping
    :return: list of dictionaries containing augmented x, y, and z data (as lists)
    """
    original_length = len(x)

    # Define a midpoint range factor
    midpoint_factor = 0.5
    deviation_factor = 0.2

    # Calculate the midpoint and range for operations and knots
    midpoint = int(original_length * midpoint_factor)
    min_count = int(midpoint * (1 - deviation_factor))
    max_count = int(midpoint * (1 + deviation_factor))

    # Generate random values within this controlled range
    num_operations = random.randint(min_count, max_count)
    # A cubic spline needs at least two knots; for short series the range above
    # can produce 0 or 1, so clamp the drawn value.
    mag_warp_knots = max(2, random.randint(min_count, max_count))

    augmented_data = []
    for _ in range(num_augmented):
        # NOTE: every call below draws its own random positions / scale factors,
        # so the three axes are warped independently of each other.

        # Apply time warping to each axis
        warped_x = time_warping(x, num_operations=num_operations, warp_factor=time_warp_factor)
        warped_y = time_warping(y, num_operations=num_operations, warp_factor=time_warp_factor)
        warped_z = time_warping(z, num_operations=num_operations, warp_factor=time_warp_factor)

        # Apply magnitude warping to each axis
        warped_x = magnitude_warping(warped_x, num_knots=mag_warp_knots, warp_std_dev=mag_warp_std_dev)
        warped_y = magnitude_warping(warped_y, num_knots=mag_warp_knots, warp_std_dev=mag_warp_std_dev)
        warped_z = magnitude_warping(warped_z, num_knots=mag_warp_knots, warp_std_dev=mag_warp_std_dev)

        augmented_data.append({'x': warped_x.tolist(), 'y': warped_y.tolist(), 'z': warped_z.tolist()})

    return augmented_data

# Build four warped variants of every training row; all non-axis fields are
# carried over unchanged from the row they were derived from
augmented_data = []
for _, row in train_data.iterrows():
    for aug in augment_data(row['x'], row['y'], row['z'], num_augmented=4):
        new_row = row.copy()
        for axis in ('x', 'y', 'z'):
            new_row[axis] = aug[axis]
        augmented_data.append(new_row)

# Stack the original rows on top of the augmented ones under a fresh 0..n-1 index
augmented_frame = pd.DataFrame(augmented_data)
augmented_train_data = pd.concat([train_data, augmented_frame], ignore_index=True)

In [None]:
# Normalization
# Collect the per-axis minimum and maximum values over the augmented training data
min_hiddenues, max_hiddenues = {}, {}

for column in ['x', 'y', 'z']:
    # Only entries that are Python lists take part in the statistics
    list_rows = [row for row in augmented_train_data[column] if isinstance(row, list)]
    min_hiddenues[column] = np.min([np.min(row) for row in list_rows])
    max_hiddenues[column] = np.max([np.max(row) for row in list_rows])

# Normalization function
def normalize_array(arr, min_hidden, max_hidden):
    """
    Linearly rescale values so that ``min_hidden`` maps to -1 and ``max_hidden`` to 1.

    Values outside [min_hidden, max_hidden] land outside [-1, 1]; nothing is clipped.

    :param arr: iterable of numbers to rescale.
    :param min_hidden: number mapped to -1 (lower bound of the reference range).
    :param max_hidden: number mapped to 1 (upper bound of the reference range).
    :return: list with one rescaled value per input item.
    """
    span = max_hidden - min_hidden
    # A zero-width range would divide by zero; use a unit span instead so a
    # value equal to min_hidden still maps to -1.
    if span == 0:
        span = 1
    return [2 * ((x - min_hidden) / span) - 1 for x in arr]

# Rescale all three splits with the bounds taken from the augmented training
# data; each column is overwritten in place, train first, then test, then hidden
for col in ['x', 'y', 'z']:
    bounds = (min_hiddenues[col], max_hiddenues[col])
    for frame in (augmented_train_data, test_data, hidden_data):
        frame[col] = frame[col].apply(normalize_array, args=bounds)

In [None]:
# Shortest and longest series per axis, stored as {axis: [min_len, max_len]}
min_max_lengths = {}
for axes in ('x', 'y', 'z'):
    lengths = [len(augmented_train_data.loc[i, axes]) for i in range(len(augmented_train_data))]
    # 9999 and 0 are the starting bounds, so they survive when no row beats them
    min_max_lengths[axes] = [min([9999] + lengths), max([0] + lengths)]

print(min_max_lengths)

# Recompute the per-axis extremes on the current (already rescaled) training data.
# This rebinds min_hiddenues / max_hiddenues, replacing the bounds that were used
# for normalization.
list_rows_by_axis = {
    column: [row for row in augmented_train_data[column] if isinstance(row, list)]
    for column in ['x', 'y', 'z']
}
min_hiddenues = {
    column: np.min([np.min(row) for row in rows])
    for column, rows in list_rows_by_axis.items()
}
max_hiddenues = {
    column: np.max([np.max(row) for row in rows])
    for column, rows in list_rows_by_axis.items()
}

# Print one summary line per axis
for column in ['x', 'y', 'z']:
    print(f"{column} - Min: {min_hiddenues[column]}, Max: {max_hiddenues[column]}")


In [None]:
# Preview the first rows of each split as plain text:
# augmented training data, then test data, then hidden data
print(augmented_train_data.head())
print(test_data.head())
print(hidden_data.head())

In [None]:
import numpy as np

# Cast every element of the axis columns to a built-in float before saving
# (same order as before: train, test, hidden; x, y, z within each)
for frame in (augmented_train_data, test_data, hidden_data):
    for axis in ('x', 'y', 'z'):
        frame[axis] = frame[axis].apply(lambda values: list(map(float, values)))

# DataFrame.to_csv does not create missing directories, so make sure the
# output folder exists before writing (no-op when it is already there)
os.makedirs("./processed", exist_ok=True)

augmented_train_data.to_csv("./processed/processed_train_gesture_data.csv", index=False)
test_data.to_csv("./processed/processed_test_gesture_data.csv", index=False)
hidden_data.to_csv("./processed/processed_hidden_gesture_data.csv", index=False)
