<a href="https://colab.research.google.com/github/cathymonkey/AlienPass/blob/main/Preprocessing_Movement_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Switch the working directory to the project folder on Drive
# (presumably so that the local `utils` module below can be imported -- confirm).
import os
os.chdir("/content/drive/MyDrive/Multimodal_Alignment")
import pandas as pd
import numpy as np
# NOTE(review): wildcard import hides where names come from. `count_files`, which
# later cells call, is presumably provided by `utils` -- confirm, and prefer
# importing it by name.
from utils import *

Mounted at /content/drive


# Overview of the movement data

- Clean the data (delete every file whose name contains "FF")
- Move the data into subfolders -- Aff, Arg, Coop
- Summary statistics (number of files per subfolder after cleaning):

  - Arg 116

     - ZT left: 27

     - ZT right: 27

     - ZR left: 31
      
     - ZR right: 31

  - Aff 80

     - ZT left: 21

     - ZT right: 21

     - ZR left: 19

     - ZR right: 19

  - Coop 80

     - ZT left: 21

     - ZT right: 21

     - ZR left: 19

     - ZR right: 19

In [2]:
# Root folder of the raw movement data and the three subfolders beneath it.
folder_path = "/content/drive/MyDrive/Multimodal_Alignment/movement_data"
subfolders = {
    name: os.path.join(folder_path, name)
    for name in ('Arg', 'Aff', 'Coop')
}

In [3]:
# Total number of files across the three subfolders (displayed as the cell output).
sum(count_files(subfolders[name]) for name in ('Arg', 'Aff', 'Coop'))

276

In [17]:
# Delete every file whose name contains "FF" and print, for each subfolder,
# how many files remain in total and per ZT/ZR x left/right group.
# NOTE: os.remove permanently deletes the files from the source folder.
for folder in subfolders.values():
  zt_l = 0
  zt_r = 0
  zr_l = 0
  zr_r = 0
  total = 0

  # sorted() only makes the processing order deterministic; the counts do not depend on it
  for file in sorted(os.listdir(folder)):
    if "FF" in file:
      # removed files must not be counted: `total` reports what is left after cleaning
      os.remove(os.path.join(folder, file))
      continue

    total += 1

    if "ZT" in file:
      if "left" in file:
        zt_l += 1
      elif "right" in file:
        zt_r += 1

    elif "ZR" in file:
      if "left" in file:
        zr_l += 1
      elif "right" in file:
        zr_r += 1

  print("="*12)
  # normpath drops a possible trailing separator so basename is never empty
  print(os.path.basename(os.path.normpath(folder)), total)
  print("ZT left:", zt_l)
  print("ZT right:", zt_r)
  print("ZR left:", zr_l)
  print("ZR right:", zr_r)


Arg 116
ZT left: 27
ZT right: 27
ZR left: 31
ZR right: 31
Aff 80
ZT left: 21
ZT right: 21
ZR left: 19
ZR right: 19
Coop 80
ZT left: 21
ZT right: 21
ZR left: 19
ZR right: 19


# Preprocessing data
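- Compute the Euclidean distance *d* between the nose and the neck: $d = \sqrt{(x_{nose} - x_{neck})^2 + (y_{nose} - y_{neck})^2}$
- The neck coordinates are not recorded directly; we compute them as the mean (midpoint) of the left and right shoulder coordinates
- Save the processed data as CSV files with the following columns: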

| Nose_x | Nose_y | Neck_x | Neck_y | Distance |
|----------|----------|----------|----------|----------|
| 0.6079339981 | 0.338481307 | 0.696541369 | 0.4657012224 | 0.1245230632 |

- **TODO:** confirm whether the coordinates are normalized to the frame, i.e. scaled to the range 0 to 1.




In [3]:
# Raw movement data root and its three subfolders (same values as the earlier config cell).
folder_path = "/content/drive/MyDrive/Multimodal_Alignment/movement_data"
subfolders = dict(
    Arg=os.path.join(folder_path, 'Arg'),
    Aff=os.path.join(folder_path, 'Aff'),
    Coop=os.path.join(folder_path, 'Coop'),
)

### Helper functions

In [5]:
def get_extracted_data(left_path, right_path):
  """Load the nose and shoulder columns from a pair of CSV files.

  Args:
    left_path: path of the first CSV file.
    right_path: path of the second CSV file.

  Returns:
    A tuple (left_data, right_data) of 2-D arrays whose columns are, in order:
    Nose_x, Nose_y, Left_shoulder_x, Left_shoulder_y, Right_shoulder_x,
    Right_shoulder_y.

  Raises:
    KeyError: if one of the six columns is missing from a file.
  """
  # the six columns kept from each file, in output order
  extracted_cols = [
      'Nose_x', 'Nose_y',
      'Left_shoulder_x', 'Left_shoulder_y',
      'Right_shoulder_x', 'Right_shoulder_y',
  ]

  # read both files first, then reduce each frame to the selected columns
  frames = [pd.read_csv(csv_path) for csv_path in (left_path, right_path)]
  left_data, right_data = (frame[extracted_cols].values for frame in frames)

  return left_data, right_data

def calculate_distance(extracted_data):
  """Return the per-row Euclidean distance between the nose and the neck.

  Args:
    extracted_data: 2-D array read as columns 0-1 = nose (x, y),
      columns 2-3 = one shoulder (x, y), columns 4-5 = the other shoulder (x, y).

  Returns:
    1-D array with one distance per row. The neck is taken as the midpoint of
    the two shoulder points.
  """
  nose_x, nose_y = extracted_data[:, 0], extracted_data[:, 1]
  first_sh_x, first_sh_y = extracted_data[:, 2], extracted_data[:, 3]
  second_sh_x, second_sh_y = extracted_data[:, 4], extracted_data[:, 5]

  # neck = midpoint between the two shoulders
  neck_x = (first_sh_x + second_sh_x) / 2
  neck_y = (first_sh_y + second_sh_y) / 2

  delta_x = nose_x - neck_x
  delta_y = nose_y - neck_y
  return np.sqrt(delta_x ** 2 + delta_y ** 2)

def save_data(data, distance, dest_path):
  """Write nose, neck and distance columns to a CSV file.

  Args:
    data: 2-D array. With six or more columns it is read as
      [Nose_x, Nose_y, Left_shoulder_x, Left_shoulder_y, Right_shoulder_x,
      Right_shoulder_y] (the layout produced by get_extracted_data) and the
      neck is computed as the midpoint of the two shoulders. With fewer
      columns, columns 2 and 3 are written unchanged as the neck coordinates.
    distance: 1-D sequence with one value per row of `data`.
    dest_path: path of the CSV file to write; its folder must already exist.

  The file has the columns Nose_x, Nose_y, Neck_x, Neck_y, Distance and no
  index column.
  """
  data = np.asarray(data)

  if data.shape[1] >= 6:
    # Columns 2-5 hold the two shoulders, not the neck. Writing columns 2 and 3
    # directly would label the left shoulder as "Neck", which is inconsistent
    # with the stored distance.
    neck_x = (data[:, 2] + data[:, 4]) / 2
    neck_y = (data[:, 3] + data[:, 5]) / 2
  else:
    # the caller already supplied neck coordinates
    neck_x = data[:, 2]
    neck_y = data[:, 3]

  results_df = pd.DataFrame({
      'Nose_x': data[:, 0],
      'Nose_y': data[:, 1],
      'Neck_x': neck_x,
      'Neck_y': neck_y,
      'Distance': distance,
  })

  # Save the results to a CSV file
  results_df.to_csv(dest_path, index=False)

def clean_data(left, right):
    """Drop every row that contains a NaN or an Inf in either array.

    Args:
        left: 2-D numeric array.
        right: 2-D numeric array.

    Returns:
        A tuple (left_timeseries, right_timeseries) holding only the rows
        that are finite in both inputs. A row removed from one array is
        removed from the other as well, because one mask is applied to both.
    """
    # a row is kept only when all of its values are finite in both arrays
    finite_in_left = np.isfinite(left).all(axis=1)
    finite_in_right = np.isfinite(right).all(axis=1)
    keep_rows = finite_in_left & finite_in_right

    return left[keep_rows], right[keep_rows]

### Preprocessing all files by pairs (left and right)

In [15]:
# Preprocess every left/right pair of CSV files in each subfolder and write the
# result under `processed_root/<subfolder name>/<original file name>`.

# destination root for the processed data
processed_root = "/content/drive/MyDrive/Multimodal_Alignment/movement_data_processed"

# `source_dir` (not `folder_path`) so the loop does not overwrite the config variable
for source_dir in subfolders.values():
  # i.e. source_dir: /content/drive/MyDrive/Multimodal_Alignment/movement_data/Arg
  category = os.path.basename(os.path.normpath(source_dir))

  # create the per-subfolder destination itself, not only the processed root,
  # otherwise writing the CSV fails on a fresh run
  dest_dir = os.path.join(processed_root, category)
  os.makedirs(dest_dir, exist_ok=True)

  csv_names = sorted(name for name in os.listdir(source_dir)
                     if name.lower().endswith(".csv"))
  available = set(csv_names)

  for left_name in csv_names:
    # i.e left: '1038_ZT_2_Arg_Video_left.csv'
    if "left" not in left_name:
      continue

    # Pair by file name instead of by position in the sorted listing, so one
    # unpaired or stray file cannot shift and mismatch all following pairs.
    # i.e. right: '1038_ZT_2_Arg_Video_right.csv'
    right_name = "right".join(left_name.rsplit("left", 1))
    if right_name not in available:
      print(f"Skipping {left_name}: no matching right file in {category}")
      continue

    dest_path_left = os.path.join(dest_dir, left_name)
    dest_path_right = os.path.join(dest_dir, right_name)

    # Existing outputs are never overwritten (delete them to force a re-run),
    # so skip the reading and computing when both are already present.
    if os.path.exists(dest_path_left) and os.path.exists(dest_path_right):
      continue

    left = os.path.join(source_dir, left_name)
    right = os.path.join(source_dir, right_name)

    # extract the data
    left_data, right_data = get_extracted_data(left, right)

    # clean the data, drop any NaNs and infs
    left_timeseries, right_timeseries = clean_data(left_data, right_data)

    # calculate distance for both left and right
    left_distance = calculate_distance(left_timeseries)
    right_distance = calculate_distance(right_timeseries)

    # save as csv files if not existed before
    if not os.path.exists(dest_path_left):
      save_data(left_timeseries, left_distance, dest_path_left)

    if not os.path.exists(dest_path_right):
      save_data(right_timeseries, right_distance, dest_path_right)

print("Process done!")

Process done!


In [16]:
# Print the number of files found in each entry of the processed-data folder,
# to compare against the counts of the raw subfolders.
processed_folder = "/content/drive/MyDrive/Multimodal_Alignment/movement_data_processed"
for entry in os.listdir(processed_folder):
  n_files = count_files(os.path.join(processed_folder, entry))
  print(f"{entry}: {n_files}")

Arg: 116
Aff: 80
Coop: 80
