In [74]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter
from scipy.interpolate import splprep, splev
import time
from IPython.display import clear_output
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

In [75]:
# function to create a dictionary of file list
file_names = {}
for stroke_num in range(10):
    # Generate file names for each stroke
    file_names[stroke_num] = [f"training_data_csv/{stroke_num}/stroke_{stroke_num}_{str(i).zfill(4)}.csv" for i in range(1, 101)]


In [77]:
# for vertically stacking the data
training_data = pd.DataFrame()
for digit in range(10):
    stroke_0_files = file_names[digit]
    data_0 = stroke_0_files[0]
    data_0 = pd.read_csv(data_0, header = None) 
    data_0.columns = ['x', 'y', 'z']
    data_0['obs'] = 1
    data_0['label'] = digit
    for i in range(1, len(stroke_0_files)):
        data_c = pd.read_csv(stroke_0_files[i], header = None)
        data_c.columns = ['x', 'y', 'z']
        data_c['obs'] = i + 1
        data_c['label'] = digit
        data_0 = pd.concat([data_0, data_c], axis= 0)
    training_data = pd.concat([training_data, data_0], axis=0)

In [78]:
training_data.head()

Unnamed: 0,x,y,z,obs,label
0,-8.9283,304.45,-18.469,1,0
1,-9.1475,304.36,-18.585,1,0
2,-9.8688,304.32,-18.896,1,0
3,-11.69,303.81,-19.305,1,0
4,-14.821,302.69,-19.284,1,0


In [15]:
row_counts = training_data.groupby('label').size()
print(row_counts/100)

label
0    53.16
1    39.01
2    55.17
3    61.38
4    66.50
5    62.45
6    52.33
7    45.82
8    62.11
9    54.57
dtype: float64


In [79]:
def interpolate_trajectory_by_distance(trajectory, num_points=50):
    # Calculate cumulative distance
    distances = np.sqrt(np.sum(np.diff(trajectory, axis=0) ** 2, axis=1))
    cumulative_distances = np.insert(np.cumsum(distances), 0, 0)
    # Resample based on cumulative distance
    total_distance = cumulative_distances[-1]
    new_distances = np.linspace(0, total_distance, num_points)
    interpolated = interp1d(cumulative_distances, trajectory, axis=0, kind='linear')(new_distances)
    interpolated = pd.DataFrame(interpolated)
    interpolated.columns = ['x', 'y', 'z']
    return interpolated

In [80]:
def spline_interpolation(trajectory, num_points=50):
    # Fit a spline to the trajectory
    tck, _ = splprep(trajectory.T, s=0)
    new_t = np.linspace(0, 1, num_points)
    interpolated = np.array(splev(new_t, tck)).T
    interpolated = pd.DataFrame(interpolated)
    interpolated.columns = ['x', 'y', 'z']
    return interpolated


In [81]:
def interpolate_trajectory(trajectory, num_points=50):
    t = np.linspace(0, 1, len(trajectory))  # Original time scale
    new_t = np.linspace(0, 1, num_points)   # New time scale
    interpolated = interp1d(t, trajectory, axis=0, kind='linear')(new_t)  # Interpolation
    interpolated = pd.DataFrame(interpolated)
    interpolated.columns = ['x', 'y', 'z']
    return interpolated

In [82]:
def normalisation (data):
    data = data.dropna()
    return 2 * (data - data.min()) / (data.max() - data.min()) - 1

In [83]:
def normalisation_1 (data):
    data = data.dropna()
    data1 = data - data.mean()
    data1 = data1.to_numpy()
    max_distance = np.max(np.sqrt(np.sum(data1**2, axis=-1)))
    data1 = data1/(max_distance + 1e-8)
    return pd.DataFrame(data1, columns = ['x', 'y', 'z'])

In [84]:
def smoothing (data, type='savgol_filter', window_length = 4):
    if type == 'savgol_filter':
        data['x'] = savgol_filter(data['x'], window_length, polyorder=2)
        data['y'] = savgol_filter(data['y'], window_length, polyorder=2)
        data['z'] = savgol_filter(data['z'], window_length, polyorder=2)
    elif type == 'rolling':
        data['x'] = data['x'].rolling(window=window_length, center=True).mean()
        data['y'] = data['y'].rolling(window=window_length, center=True).mean()
        data['z'] = data['z'].rolling(window=window_length, center=True).mean()
    return data

In [148]:
def transform (data):
    data = interpolate_trajectory_by_distance(data, 30)
    data = smoothing(data, 'savgol_filter', 5) 
    data = smoothing(data, 'rolling', 3)
    data = normalisation_1(data)
    data = smoothing(data, 'savgol_filter', 5) 
    data = interpolate_trajectory_by_distance(data, 300)
    return data 

In [86]:
def process_and_stack(data):
    """
    Processes the data for each (label, obs) group, applies the processing 
    function, and stacks the results into a new DataFrame.
    """
    # List to store processed results
    results = []

    # Group by label and obs
    grouped = data.groupby(['label', 'obs'])

    for (label, obs), group in grouped:
        # Extract x, y, z coordinates
        xyz = group[['x', 'y', 'z']]

        # Apply the processing function to xyz coordinates
        processed_xyz = transform(xyz)

        # Create a new DataFrame for processed points
        processed_df = pd.DataFrame(processed_xyz, columns=['x', 'y', 'z'])
        processed_df['obs'] = obs  # Add observation column
        processed_df['label'] = label  # Add label column

        # Append to results
        results.append(processed_df)
    final_df = pd.concat(results, ignore_index=True)
    return final_df

In [159]:
data =training_data[(training_data['label'] == 0) & (training_data['obs'] == 1)]
data = data[['x', 'y', 'z']]
data =interpolate_trajectory_by_distance(data, 30)
data =smoothing(data=data, type='savgol_filter', window_length=5)
data =smoothing(data=data, type='rolling', window_length=3)
data =normalisation_1(data=data)
data =smoothing(data=data, type='savgol_filter', window_length=5)
data =interpolate_trajectory_by_distance(data, 300)
data

Unnamed: 0,x,y,z
0,-0.169659,0.954791,0.012435
1,-0.182881,0.945407,0.012714
2,-0.196103,0.936023,0.012992
3,-0.209325,0.926638,0.013270
4,-0.222547,0.917254,0.013549
...,...,...,...
295,0.000131,0.963719,0.010162
296,-0.013160,0.972986,0.010829
297,-0.026451,0.982253,0.011496
298,-0.039742,0.991519,0.012163


In [118]:
process_and_stack(training_data).to_csv('training_dataset_1.csv', index= False)

In [87]:
processed_data = process_and_stack(training_data)

In [None]:
import json

def compare_json_files(file_a, file_b, output_file):
    # Read the content of both JSON files
    with open(file_a, 'r') as f_a, open(file_b, 'r') as f_b:
        data_a = json.load(f_a)
        data_b = json.load(f_b)
    
    # Initialize a dictionary to store the comparison results
    comparison_result = {}
    
    # Compare values for each key (class pair) in both dictionaries
    for key in data_a:
        if key in data_b:
            # Compare corresponding lists
            comparison_result[key] = [
                1 if a > b else 0 for a, b in zip(data_a[key], data_b[key])
            ]
    
    # Write the comparison results to a new JSON file
    with open(output_file, 'w') as f_out:
        json.dump(comparison_result, f_out, indent=4)

# Specify the input and output file paths
file_a = 'fisher_ratios_format_1.json'
file_b = 'fisher_ratios_format_2.json'
output_file = 'comparison_result.json'

# Call the function to compare the files and write the result
compare_json_files(file_a, file_b, output_file)


In [145]:
import numpy as np
from pprint import pprint

classes = np.unique(processed_data['label'].values)

fisher_ratios = {}

for i in classes:
    for j in classes:
        if i < j:
            # Assuming `data` is a DataFrame with columns: x, y, z, observation, class_label
            # Filter the data for the class you're interested in (e.g., class '0')
            class_i_data = processed_data[processed_data['label'] == i][['x', 'y', 'z']].values
            class_j_data = processed_data[processed_data['label'] == j][['x', 'y', 'z']].values

            # Reshape the data for each class
            # Each observation has 300 x, y, z points
            observations_class_i = class_i_data.reshape(100, 300, 3)  # 100 observations, 300 points, 3 coordinates
            observations_class_j = class_j_data.reshape(100, 300, 3)

            # Step 1: Calculate the mean of each class
            mean_class_i = np.mean(observations_class_i, axis=(0, 1))  # Mean over all points and observations of class '0'
            mean_class_j = np.mean(observations_class_j, axis=(0, 1))

            # Step 2: Calculate the within-class variance (average variance of points within each class)
            within_class_var_i = np.mean(np.var(observations_class_i, axis=1), axis=0)  # Variance within class '0'
            within_class_var_j = np.mean(np.var(observations_class_j, axis=1), axis=0)

            # Step 3: Calculate the between-class variance (distance between the class means)
            overall_mean = np.mean(np.vstack([observations_class_i.reshape(-1, 3), observations_class_j.reshape(-1, 3)]), axis=0)
            between_class_var = np.linalg.norm(mean_class_i - mean_class_j) ** 2

            # Step 4: Compute Fisher's Ratio
            fisher_ratio = between_class_var / (within_class_var_i + within_class_var_j)
            fisher_ratios[(i, j)] = fisher_ratio

import json

# Specify the file path
file_path = 'fisher_ratios_format_2.json'

fisher_ratios_serializable = {str(k): v.tolist() for k, v in fisher_ratios.items()}


# Open the file in write mode
with open(file_path, 'w') as f:
    json.dump(fisher_ratios_serializable, f, indent=4)


In [130]:
import numpy as np
from pprint import pprint

classes = np.unique(training_data['label'].values)

fisher_ratios_og = {}

for i in classes:
    for j in classes:
        if i < j:
            # Assuming `data` is a DataFrame with columns: x, y, z, observation, class_label
            # Filter the data for the class you're interested in (e.g., class '0')
            class_i_data = training_data[training_data['label'] == i][['x', 'y', 'z']].values
            class_j_data = training_data[training_data['label'] == j][['x', 'y', 'z']].values

            # Reshape the data for each class
            # Each observation has 300 x, y, z points
            observations_class_i = class_i_data.reshape(100, 300, 3)  # 100 observations, 300 points, 3 coordinates
            observations_class_j = class_j_data.reshape(100, 300, 3)

            # Step 1: Calculate the mean of each class
            mean_class_i = np.mean(observations_class_i, axis=(0, 1))  # Mean over all points and observations of class '0'
            mean_class_j = np.mean(observations_class_j, axis=(0, 1))

            # Step 2: Calculate the within-class variance (average variance of points within each class)
            within_class_var_i = np.mean(np.var(observations_class_i, axis=1), axis=0)  # Variance within class '0'
            within_class_var_j = np.mean(np.var(observations_class_j, axis=1), axis=0)

            # Step 3: Calculate the between-class variance (distance between the class means)
            overall_mean = np.mean(np.vstack([observations_class_i.reshape(-1, 3), observations_class_j.reshape(-1, 3)]), axis=0)
            between_class_var = np.linalg.norm(mean_class_i - mean_class_j) ** 2

            # Step 4: Compute Fisher's Ratio
            fisher_ratio = between_class_var / (within_class_var_i + within_class_var_j)
            fisher_ratios_og[(i, j)] = fisher_ratio
pprint(fisher_ratios_og, indent=4)


ValueError: cannot reshape array of size 15948 into shape (100,300,3)

In [88]:
def display_fun(data, fig, row, col):
    centroid = data.mean()
    colors = data.index
    x, y, z = data.iloc[:, 0], data.iloc[:, 1], data.iloc[:, 2]

    fig.add_trace(go.Scatter3d(
        x=x,
        y=y,
        z=z,
        mode='markers',
        marker=dict(
            size=3,
            color=colors,                # set color to an array/list of desired values
            colorscale='Plasma',   # choose a colorscale
            opacity=0.8
        ),
        showlegend = False
    ), row=row, col=col)

    fig.add_trace(go.Scatter3d(
        x=np.array(centroid[0]),
        y=np.array(centroid[1]),
        z=np.array(centroid[2]),
        mode='markers',
        marker=dict(
            size=5,
            color='red',                # set color to an array/list of desired values
            #colorscale='Plasma',   # choose a colorscale
            opacity=0.8
        ),
        showlegend = False
    ), row=row, col=col)
    #purple for starting
    #yellow for ending

    # Add the connecting line
    fig.add_trace(go.Scatter3d(
        x=x,
        y=y,
        z=z,
        mode='lines',               # Connect the points with lines
        line=dict(
            color='gray',           # Set line color
            width=1                # Thin line
        ),
        showlegend = False
    ), row=row, col=col)

    # tight layout


In [113]:
def plot_side_by_side(data):
    # Create a subplot with 1 row and 2 columns
    fig = make_subplots(rows=1, cols=2,  # 1 row, 2 columns
                        subplot_titles=('Raw Data', 'Transformed Data'),
                        specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]])

    # Display raw data in the first subplot
    display_fun(data, fig, row=1, col=1)

    # Apply transformation (you need to define `transform` function)
    transformed_data = transform(data)  # Assuming you have a `transform` function

    # Display transformed data in the second subplot
    display_fun(transformed_data, fig, row=1, col=2)

    # Update layout
    fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
    fig.show()

In [147]:
data = training_data[(training_data['label'] == 0) & (training_data['obs'] == 1)]
data = data[['x', 'y', 'z']]
plot_side_by_side(data)

In [90]:

for i in range(1, 101):
    data = training_data[(training_data['label'] == 0) & (training_data['obs'] == i)]
    data = data[['x', 'y', 'z']]
    plot_side_by_side(data)
    time.sleep(3)
    clear_output(wait=True) 
#data = transform(data)

# to check the distances between points
# print(np.sqrt(np.diff(x)**2 + np.diff(y)**2 + np.diff(z)**2))

KeyboardInterrupt: 