<a href="https://colab.research.google.com/github/Jayden-Nyamiaka/Machine-Learning-and-Data-Mining/blob/main/Motility_Features_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import os.path as op
from pathlib   import Path
from glob      import glob
from tqdm      import tqdm
from datetime  import datetime
import pandas as pd
import io

from sklearn.preprocessing import MinMaxScaler

import math
import csv
import json
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

In [None]:
###################################################
# !!! Remember to look at data_dictionary.txt !!! #
###################################################

# Set your target file here #######################
with open('/content/test.json', 'r') as f:
    track_data = json.load(f)
###################################################

# Track Unique IDs (UIDs) are the top-level keys of the JSON object.
track_uids = list(track_data.keys())

# How many tracks are there, and what do their UIDs look like?
print(f"n_tracks = {len(track_uids)}")
print(f"5 Example Track IDs = {track_uids[:5]}")

# Which fields are available for each track? Inspect the first one.
example_uid = track_uids[0]
print(f"Per-track keys = {track_data[example_uid].keys()}")

# What do the (t, x, y) track coordinates look like?
example_coords = np.array(track_data[example_uid]['txy'])
# Abbreviate long arrays with "..." when printing.
np.set_printoptions(threshold=10)
print(f"Coordinate array = \n{example_coords}")

# What does the label look like?
example_label = track_data[example_uid]['label']
print(f"Label = {example_label}")

n_tracks = 477
5 Example Track IDs = ['lab_19_0', 'lab_19_1', 'lab_19_2', 'lab_19_3', 'lab_19_4']
Per-track keys = dict_keys(['txy', 'label'])
Coordinate array = 
[[  1. 802. 926.]
 [  2. 802. 926.]
 [  3. 802. 926.]
 ...
 [284. 805. 917.]
 [285. 805. 917.]
 [286. 805. 917.]]
Label = None


In [None]:
# Load both splits; each JSON file is parsed into a plain Python object.
train_data = json.loads(Path('/content/train.json').read_text())
test_data = json.loads(Path('/content/test.json').read_text())

In [None]:
# Make dataframe from json files
def make_df(data):
    """Flatten a dictionary of tracks into one long-format DataFrame.

    Parameters
    ----------
    data: dict
        Maps each track UID to a dict holding a 'txy' list of (t, x, y)
        triples and a 'label' value.

    Returns
    -------
    pandas.DataFrame
        One row per track point with the columns UID, t, x, y and motile
        (the track's label repeated on every row). The index is not reset:
        it restarts at 0 for every track. When data contains no tracks an
        empty frame with the same columns is returned.
    """
    cols = ['UID', 't', 'x', 'y', 'motile']

    frames = []
    for uid, track in data.items():
        points = track['txy']
        # Build each column from its own list so that every column keeps the
        # dtype of its values (e.g. integer t next to float x / y).
        frame = pd.DataFrame({
            'UID': uid,
            't': [point[0] for point in points],
            'x': [point[1] for point in points],
            'y': [point[2] for point in points],
        })
        # Scalar assignment broadcasts the label to every row of the track.
        frame['motile'] = track['label']
        frames.append(frame)

    # pd.concat raises on an empty list, so handle "no tracks" explicitly.
    if not frames:
        return pd.DataFrame(columns=cols)

    return pd.concat(frames)

In [None]:
# Flatten both splits into long-format frames (test first, then train).
test_df, train_df = map(make_df, (test_data, train_data))
print(test_df)
#print(train_df)

          UID    t        x        y motile
0    lab_19_0    1  802.000  926.000   None
1    lab_19_0    2  802.000  926.000   None
2    lab_19_0    3  802.000  926.000   None
3    lab_19_0    4  802.000  926.000   None
4    lab_19_0    5  802.000  926.000   None
..        ...  ...      ...      ...    ...
67  lab_42_13  296   24.973  274.930   None
68  lab_42_13  297   19.973  277.764   None
69  lab_42_13  298   14.973  280.597   None
70  lab_42_13  299   11.723  281.722   None
71  lab_42_13  300    8.473  282.847   None

[78135 rows x 5 columns]


In [None]:
def template_feature(coords):
    """Template for a per-track feature function.

    Copy this stub, rename it, and replace the body with the feature
    computation. Equations in the description can be useful.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    int
        Always 0; the stub does not read its input.

    """
    placeholder_value = 0
    return placeholder_value

def mean_step_speed(coords):
    """Mean step speed of the entire track.

    The average per-step speed, i.e. the mean Euclidean distance between
    points that are adjacent in the array.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The average step speed, or 0.0 when the track has fewer than two
        points (there is no step to average).
    """
    coords = np.asarray(coords)

    # A step needs two points. Without this guard the mean of an empty list
    # of speeds would be NaN and numpy would emit a RuntimeWarning.
    if coords.ndim < 2 or coords.shape[0] < 2:
        return 0.0

    # (dx, dy) of every step. The time column is not used, so the value is a
    # distance per row.
    # NOTE(review): this is "pixels per frame" only if consecutive rows are
    # exactly one frame apart - confirm against the data dictionary.
    displacements = np.diff(coords[:, 1:], axis=0)
    speeds = np.linalg.norm(displacements, axis=1)

    return np.mean(speeds)

def stddev_step_speed(coords):
    """Standard deviation of the step speed of the entire track.

    The (population) standard deviation of the Euclidean distances between
    points that are adjacent in the array.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The stddev of the step speed, or 0.0 when the track has fewer than
        two points (there is no step to measure).
    """
    coords = np.asarray(coords)

    # A step needs two points. Without this guard the std of an empty list
    # of speeds would be NaN and numpy would emit a RuntimeWarning.
    if coords.ndim < 2 or coords.shape[0] < 2:
        return 0.0

    # (dx, dy) of every step; the time column is not used.
    displacements = np.diff(coords[:, 1:], axis=0)
    speeds = np.linalg.norm(displacements, axis=1)

    return np.std(speeds)


def track_length(coords):
    """Total path length of the track.

    Adds up the Euclidean distance of every step between points that are
    adjacent in the array.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The length of the entire track.
    """
    n_points = coords.shape[0]

    # Distance covered by each step, from point k-1 to point k
    step_lengths = [
        np.linalg.norm(coords[k, 1:] - coords[k - 1, 1:])
        for k in range(1, n_points)
    ]

    # The path length is the sum of the step lengths
    return np.sum(step_lengths)

def e2e_distance(coords):
    """End-to-end distance of the track.

    The straight-line distance between the first and the last point of the
    given track.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The end-to-end distance of the entire track.
    """
    # Vector from the first (x, y) position to the last one
    displacement = coords[-1, 1:] - coords[0, 1:]

    return np.linalg.norm(displacement)

def duration(coords):
    """Duration of the track.

    The difference between the time stamp of the last row and the time
    stamp of the first row.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    scalar
        Last time stamp minus first time stamp, in the numeric type of the
        array.
    """
    # Column 0 holds the time stamps
    times = coords[:, 0]

    return times[-1] - times[0]

def angle_between(coords):
    """Direction of the segment that joins two points.

    Parameters
    ----------
    coords: sequence
        Two points; only the first two components of each point are read.
        The callers in this file pass (x, y) pairs.

    Returns
    -------
    float
        atan2(second-component difference, first-component difference) in
        radians, i.e. the heading of the segment from the first point to the
        second one when the points are (x, y) pairs.
    """
    start, end = coords[0], coords[1]

    delta_first = end[0] - start[0]
    delta_second = end[1] - start[1]

    return math.atan2(delta_second, delta_first)

def angle_stddev(coords):
    """Standard deviation of the per-step heading of the entire track.

    The heading of a step is atan2(dy, dx) between two points that are
    adjacent in the array. The plain (non-circular) standard deviation of
    those headings is returned, so headings close to +pi and -pi count as
    far apart. A step with zero displacement has heading atan2(0, 0) = 0.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The stddev of the step headings in radians, or 0.0 when the track
        has fewer than two points (there is no step to measure).
    """
    coords = np.asarray(coords)

    # A heading needs two points. Without this guard the std of an empty
    # list of angles would be NaN and numpy would emit a RuntimeWarning.
    if coords.ndim < 2 or coords.shape[0] < 2:
        return 0.0

    # (dx, dy) of every step
    displacements = np.diff(coords[:, 1:], axis=0)

    # Heading of every step in radians
    headings = np.arctan2(displacements[:, 1], displacements[:, 0])

    return np.std(headings)
  

def track_length_sliding_window(coords, window_size):
    """Length of the track measured in hops of window_size points.

    Starting at row 0 and advancing window_size rows at a time, adds up the
    straight-line distance between the i-th and the (i + window_size)-th
    coordinates. The windows do not overlap, and a trailing remainder that
    is shorter than a full window is ignored.

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.
    window_size: int
        An integer defining the window_size of the track_length calculation

    Returns
    -------
    float
        The length of the entire track with the given window size.
    """
    n_points = coords.shape[0]

    # A hop starts at every multiple of window_size whose end point
    # (start + window_size) is still inside the track.
    hop_lengths = [
        np.linalg.norm(coords[start + window_size, 1:] - coords[start, 1:])
        for start in range(0, n_points - window_size, window_size)
    ]

    # Total length is the sum of the hop lengths
    return np.sum(hop_lengths)

def track_length_sliding_window_5(coords):
    """Length of the entire track with the window size of 5.

    Single-argument wrapper around track_length_sliding_window so that it
    can be listed as a per-track feature.

    Returns
    -------
    float
        The length of the entire track with the window size 5.
    """
    return track_length_sliding_window(coords, window_size=5)

def progressivity(coords):
    """Progressivity of the particle movement.

    Returns
    -------
    float
        The length of the track divided by its end-to-end distance.
        No guard is applied to the denominator: a track whose first and
        last positions coincide divides by zero.
    """
    path_length = track_length(coords)
    straight_line = e2e_distance(coords)

    return path_length / straight_line

def mean_signed_turning_angle(coords):
    """Circular mean of the signed turning angles along the track.

    The heading of a step is atan2(dy, dx) between two points that are
    adjacent in the array, and a turning angle is the difference between the
    headings of two consecutive steps (a2 - a1 for every triplet of
    neighbouring points). The mean is computed as
    atan2(sum(sin(a2 - a1)), sum(cos(a2 - a1))).

    Parameters
    ----------
    coords: array
        A numpy array containing the (t, x, y) coordinates of the track.

    Returns
    -------
    float
        The mean turning angle in radians, in [-pi, pi]. 0.0 is returned
        when the track has fewer than three points (no turning angle
        exists).
    """
    coords = np.asarray(coords)

    # One turning angle needs two steps, i.e. three points.
    if coords.ndim < 2 or coords.shape[0] < 3:
        return 0.0

    # Heading of every step; a zero-length step has heading atan2(0, 0) = 0.
    displacements = np.diff(coords[:, 1:], axis=0)
    headings = np.arctan2(displacements[:, 1], displacements[:, 0])

    # Signed change of heading between consecutive steps
    turns = np.diff(headings)

    # arctan2 on the two sums keeps the quadrant of the mean direction and is
    # defined when the cosine sum is zero. arctan(sum_sin / sum_cos) folds
    # every mean into (-pi/2, pi/2) and evaluates 0/0 for short tracks.
    return np.arctan2(np.sum(np.sin(turns)), np.sum(np.cos(turns)))

In [None]:
######################################################
# !!! Set your list of implemented features here !!! #
######################################################

# Every entry is a function that takes the (t, x, y) array of one track and
# returns a single value; the function name becomes the CSV column name.
FEATURE_LIST = [mean_step_speed, stddev_step_speed, track_length, e2e_distance, duration, angle_stddev, track_length_sliding_window_5]
# , progressivity, mean_signed_turning_angle]
# NOTE(review): TYPE only names the output file. The rows come from
# track_data, so make sure track_data was loaded from the matching split.
TYPE = "test"
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
#features_csv = f"/kaggle/working/{TYPE}_features_{TIMESTAMP}.csv"
features_csv = f"/content/{TYPE}_features_{TIMESTAMP}.csv"

######################################################
# You shouldn't have to modify the rest of this part #
######################################################

# Generate the feature csv
header = ['uid', 'label'] + [featfunc.__name__ for featfunc in FEATURE_LIST]

features = []

track_uids = track_data.keys()
for uid in track_uids:
    track = track_data[uid]

    # Convert the coordinate list once per track instead of once per feature.
    coords = np.array(track['txy'])

    curr_row = {
        'uid': uid,
        'label': track['label']
    }

    for featfunc in FEATURE_LIST:
        curr_row[featfunc.__name__] = featfunc(coords)

    features.append(curr_row)

# The csv module writes its own line terminators, so the file must be opened
# with newline='' to avoid doubled line endings on platforms that translate
# newlines.
with open(features_csv, 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames = header)
    writer.writeheader()
    writer.writerows(features)

print("Written to:", features_csv)

  return np.arctan(np.sum(sines)/np.sum(cosines))


Written to: /content/test_features_20230215_004144.csv
