In [223]:
# Mount Google Drive
# The trajectory CSVs loaded later in this notebook are read from paths under
# /content/drive, so the drive has to be mounted there first.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [224]:
# Import libraries
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [225]:
# Load data into pandas
# All three recordings live in the same Drive folder; keep that path in one
# place so it only has to be edited once when the data moves.
DATA_DIR = "/content/drive/MyDrive/datasets/manual_trajectories"
expert = pd.read_csv(f"{DATA_DIR}/expert.csv")
pgy4 = pd.read_csv(f"{DATA_DIR}/pgy4.csv")
pgy2 = pd.read_csv(f"{DATA_DIR}/pgy2.csv")
# Order matters: featurize_trajectories labels every pull with the position of
# its trajectory in this list (expert -> 0, pgy4 -> 1, pgy2 -> 2).
trajectories = [expert, pgy4, pgy2]

# Exploratory Visualization

## Full Trajectories

In [None]:
# Plot PGY2 trajectory, one colour per pull.
# groupby visits only the pull ids that actually occur (no assumption that
# they are the contiguous integers 0..max) and slices the frame once per pull.
fig, ax = plt.subplots()
for pull_id, pull_df in pgy2.groupby('pull'):
  ax.scatter(pull_df['x'], pull_df['y'], label=f"pull {pull_id}")
ax.set(title="PGY2 trajectory", xlabel="x", ylabel="y")
ax.legend()
plt.show()

In [None]:
# Plot PGY4 trajectory, one colour per pull.
# groupby visits only the pull ids that actually occur (no assumption that
# they are the contiguous integers 0..max) and slices the frame once per pull.
fig, ax = plt.subplots()
for pull_id, pull_df in pgy4.groupby('pull'):
  ax.scatter(pull_df['x'], pull_df['y'], label=f"pull {pull_id}")
ax.set(title="PGY4 trajectory", xlabel="x", ylabel="y")
ax.legend()
plt.show()

In [None]:
# Plot expert trajectory, one colour per pull.
# groupby visits only the pull ids that actually occur (no assumption that
# they are the contiguous integers 0..max) and slices the frame once per pull.
fig, ax = plt.subplots()
for pull_id, pull_df in expert.groupby('pull'):
  ax.scatter(pull_df['x'], pull_df['y'], label=f"pull {pull_id}")
ax.set(title="Expert trajectory", xlabel="x", ylabel="y")
ax.legend()
plt.show()

## Individual Pulls

In [None]:
# Plot every pull of the expert trajectory in its own figure.
# groupby yields exactly the pulls present in the data, so nothing here
# depends on how many pulls there are or on how their ids are numbered.
for pull_id, pull_df in expert.groupby('pull'):
  fig, ax = plt.subplots()
  ax.scatter(pull_df['x'], pull_df['y'])
  ax.set(title=f"Expert - pull {pull_id}", xlabel="x", ylabel="y")
plt.show()

In [None]:
# Plot every pull of the PGY4 trajectory in its own figure.
# groupby yields exactly the pulls present in the data, so nothing here
# depends on how many pulls there are or on how their ids are numbered.
for pull_id, pull_df in pgy4.groupby('pull'):
  fig, ax = plt.subplots()
  ax.scatter(pull_df['x'], pull_df['y'])
  ax.set(title=f"PGY4 - pull {pull_id}", xlabel="x", ylabel="y")
plt.show()

In [None]:
# Plot every pull of the PGY2 trajectory in its own figure.
# groupby yields exactly the pulls present in the data, so nothing here
# depends on how many pulls there are or on how their ids are numbered.
for pull_id, pull_df in pgy2.groupby('pull'):
  fig, ax = plt.subplots()
  ax.scatter(pull_df['x'], pull_df['y'])
  ax.set(title=f"PGY2 - pull {pull_id}", xlabel="x", ylabel="y")
plt.show()

# Feature Extraction Functions

In [232]:
def pull_to_length(pull):
    """
    Count the samples recorded for a single pull.

    Args:
      pull: The dataframe for the pull; only its `x` column is inspected

    Return:
      length: the number of entries in `pull.x`
    """
    x_column = pull.x
    n_samples = len(x_column)
    return n_samples

In [233]:
def pull_to_velocities_and_accelerations(pull):
  """
  Summarise how fast the tracked point moves during a pull.

  Velocity is the Euclidean distance between consecutive (x, y) rows, i.e.
  distance travelled per frame. Acceleration is the absolute change in
  velocity between consecutive frames.

  Args:
    pull: The dataframe for the pull to convert to velocities; its `x` and
      `y` columns are read in row order

  Return:
    mean velocity: velocity calculated as distance traveled per frame
    mean acceleration: acceleration calculated as change in velocity between frames

    A value is NaN when the pull is too short to define it: fewer than 2 rows
    for the velocity, fewer than 3 rows for the acceleration.
  """
  x = np.asarray(pull.x, dtype=float)
  y = np.asarray(pull.y, dtype=float)

  # np.diff gives the frame-to-frame displacement for the whole pull at once,
  # replacing a Python loop that sliced the Series once per row.
  velocities = np.sqrt(np.square(np.diff(x)) + np.square(np.diff(y)))
  accelerations = np.abs(np.diff(velocities))

  # np.mean of an empty array is NaN but also emits a RuntimeWarning; return
  # the NaN explicitly so degenerate pulls stay quiet.
  mean_velocity = np.mean(velocities) if velocities.size else np.float64(np.nan)
  mean_acceleration = np.mean(accelerations) if accelerations.size else np.float64(np.nan)

  return (mean_velocity, mean_acceleration)

In [234]:
def pull_to_angles(pull):
  """
  Compute the angle at every interior point of a pull.

  For each run of three consecutive points (p1, p2, p3) the angle is the one
  between the vectors p1 - p2 and p3 - p2, in degrees. A straight
  continuation therefore gives 180 and a full reversal gives 0.

  Args:
    pull: The dataframe for the pull to convert to angles; its `x` and `y`
      columns are read in row order

  Return:
    angles: The angles for each triple of datapoints, as a list with
      len(pull) - 2 entries (empty when the pull has fewer than 3 rows).
      An entry is NaN when either vector has zero length, because the angle
      is undefined when consecutive points coincide.
  """
  x = np.asarray(pull.x, dtype=float)
  y = np.asarray(pull.y, dtype=float)
  if x.size < 3:
    return []

  # Vectors from each interior point back to its predecessor and forward to
  # its successor, computed for all triples at once.
  back_x, back_y = x[:-2] - x[1:-1], y[:-2] - y[1:-1]
  fwd_x, fwd_y = x[2:] - x[1:-1], y[2:] - y[1:-1]

  dot = back_x * fwd_x + back_y * fwd_y
  norms = np.hypot(back_x, back_y) * np.hypot(fwd_x, fwd_y)

  # A zero-length vector makes this 0/0; the NaN result is intentional, so
  # silence the floating-point warning rather than the value.
  with np.errstate(divide="ignore", invalid="ignore"):
    cosines = dot / norms
    # Rounding can push the cosine slightly outside [-1, 1], where arccos is
    # undefined; clamp before inverting.
    cosines = np.clip(cosines, -1.0, 1.0)
    angles = np.degrees(np.arccos(cosines))

  return list(angles)

In [252]:
def angles_to_bins(angles, num_bins=None):
  """
  Histogram angles over [0, 180] degrees using equal-width bins.

  Args:
    angles: the list of angles to bin
    num_bins: number of equal-width bins covering [0, 180]. When omitted, a
      module-level variable named `num_bins` is used if one is defined (this
      function previously read that global unconditionally); otherwise 6.
      NOTE(review): 6 is a placeholder default - confirm the intended value.

  Returns:
    histogram: histogram values (num_bins counts)
    bins: histogram bins (num_bins + 1 edges from 0 to 180)

  Raises:
    ValueError: if num_bins is smaller than 1
  """
  if num_bins is None:
    # Backward compatibility with sessions that set the bin count as a global.
    num_bins = globals().get("num_bins", 6)
  if num_bins < 1:
    raise ValueError(f"num_bins must be at least 1, got {num_bins}")

  # linspace always yields exactly num_bins bins ending at 180. The previous
  # integer step (180 // num_bins) produced the wrong number of bins or cut
  # off the top of the range whenever num_bins did not divide 180.
  bins = np.linspace(0, 180, num_bins + 1)
  return np.histogram(angles, bins)


In [250]:
def featurize_pull(pull):
  """
  Build the feature vector describing one pull.

  The vector holds the angle-histogram counts followed by three scalars:
  the pull length, the mean velocity and the mean acceleration.

  Args:
    pull: The pull dataframe to convert to a feature row

  Returns:
    row: A 1-D feature array for the pull
  """
  # Only the counts are used as features; the bin edges are discarded.
  hist, _ = angles_to_bins(pull_to_angles(pull))

  n_samples = pull_to_length(pull)
  speed, accel = pull_to_velocities_and_accelerations(pull)

  scalar_features = np.array([n_samples, speed, accel])
  return np.concatenate((hist, scalar_features))

In [247]:
def featurize_trajectory(trajectory, label):
  """
  Turn one trajectory into per-pull feature rows that all share one label.

  The trajectory is grouped by its `pull` column and every group is passed
  to featurize_pull.

  Args:
    trajectory: The trajectory dataframe to split into featurized pulls
    label: The label to assign each pull for the trajectory

  Return:
    X: The feature rows, one per pull
    y: The labels for each row (`label` repeated once per pull)
  """
  rows = []
  for _, pull_df in trajectory.groupby('pull'):
    rows.append(featurize_pull(pull_df))

  labels = np.repeat(label, len(rows))
  return np.array(rows), labels

In [248]:
def featurize_trajectories(trajectories):
  """
  Converts a list of trajectories into a feature matrix and label vector

  Every pull is labelled with the position of its trajectory in the input
  list (0 for the first trajectory, 1 for the second, ...).

  Args:
    trajectories: The list of trajectory dataframes to featurize

  Return:
    X: The feature matrix, with the rows of all trajectories stacked in
      input order
    y: The label vector, aligned with the rows of X
  """
  featurized_trajectory_list = [featurize_trajectory(t, i) for i, t in enumerate(trajectories)]
  # np.vstack replaces np.row_stack, an alias deprecated in NumPy 2.0.
  X = np.vstack([features for features, _ in featurized_trajectory_list])
  y = np.concatenate([labels for _, labels in featurized_trajectory_list])
  return X, y

# Model Fitting/Prediction

In [None]:
# Featurize every pull of every trajectory; the label of a pull is the index
# of its trajectory in `trajectories`.
X, y = featurize_trajectories(trajectories)
# Display the label vector as the cell output.
y

In [None]:
from sklearn.model_selection import StratifiedShuffleSplit

# Keep deterministic
# The splitter gets its own seed so the split does not depend on how much of
# the global NumPy random stream was consumed before this cell ran. The
# global seed is still set for any other code that draws from np.random.
np.random.seed(42)
sss = StratifiedShuffleSplit(n_splits=1, test_size=.2, random_state=42)
# A single stratified split: 80% of the pulls for training, 20% for testing.
train_ind, test_ind = next(sss.split(X, y))
X_train, X_test = X[train_ind], X[test_ind]
y_train, y_test = y[train_ind], y[test_ind]
# Print to check class balance
y_train, y_test

In [None]:
from sklearn.linear_model import LogisticRegression

# The `multi_class` argument is deprecated in recent scikit-learn releases.
# With the default solver and the three classes used here the model is
# already fitted as a multinomial one, so the argument is simply dropped.
clf = LogisticRegression(random_state=0, max_iter=1000)
clf.fit(X_train, y_train)
# Mean accuracy on the held-out pulls.
clf.score(X_test, y_test)