In [1]:
!pip install pandas
!pip install ultralytics
!pip install opencv-python

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
import json
import os
import re
import subprocess
from glob import glob

import cv2
import numpy as np
import pandas as pd
from ultralytics import YOLO

In [5]:
def GetEyeCams(input_dir, about):
    """
    Ensure `left.mp4` and `right.mp4` exist in `input_dir`, generating them with ffmpeg if needed.

    Parameters
    ----------
    input_dir : str
        Directory that holds the raw recording; the per-eye videos are written here.
    about : dict
        Must contain `about['eyes']['both']`: the raw recording's filename relative to
        `input_dir`, or None when no raw recording is available.

    Returns
    -------
    dict
        The same `about` object, mutated in place. For each side whose entry is missing
        or None, `about['eyes'][side]` becomes '<side>.mp4' when that file already exists
        or was generated successfully, and None otherwise. If `about['eyes']['both']` is
        None, `about` is returned untouched after printing an error.
    """
    def GetEye(raw, side, crop, rot, cor):
        """Produce `<side>.mp4` from `raw`; return True when the file is available afterwards."""
        outname = os.path.join(input_dir, f'{side}.mp4')
        if os.path.exists(outname):
            print(f"{side}.mp4 already exists!")
            return True
        # Pass an argument list instead of a shell string so that paths containing
        # spaces or shell metacharacters reach ffmpeg as single, intact arguments.
        cmd = [
            'ffmpeg', '-i', raw,
            '-vf', f'crop={crop},rotate={rot}*(PI/180),lenscorrection={cor}',
            '-vsync', '2',
            outname,
        ]
        print(f'Attempting to generate "{outname}..."')
        try:
            # stdin is detached so ffmpeg can never block waiting for keyboard input.
            success_code = subprocess.run(cmd, stdin=subprocess.DEVNULL).returncode
        except OSError as err:
            # Raised when the ffmpeg executable cannot be launched at all.
            print(f"[ERROR] Could not launch ffmpeg: {err}")
            success_code = -1
        if success_code == 0:
            print("{}.mp4 successfully extracted".format(side))
            return True
        print(f"[ERROR] Could not produce {side} eye footage successfully...")
        return False

    if about['eyes']['both'] is None:
        # Uh oh, we don't have the raw footage. Can't do anything else
        print('[ERROR] Raw footage of the passthough eye cams not provided or is missing. Please add them!')
        return about

    raw_path = os.path.join(input_dir, about['eyes']['both'])
    # Per-eye arguments for ffmpeg's crop / rotate / lenscorrection filters.
    eye_filters = {
        'left': dict(crop='632:672:16:0', rot='21', cor='cx=0.57:cy=0.51:k1=-0.48:k2=0.2'),
        'right': dict(crop='632:672:648:0', rot='-21', cor='cx=0.43:cy=0.51:k1=-0.48:k2=0.2'),
    }
    for side, filters in eye_filters.items():
        # .get() tolerates an `about` dict that has no entry for this side yet.
        if about['eyes'].get(side) is None:
            produced = GetEye(raw_path, side=side, **filters)
            about['eyes'][side] = f'{side}.mp4' if produced else None

    return about

def _format_hhmmss(offset_ms):
    """Render a millisecond offset as an `HH:MM:SS` string (rounded to the nearest second)."""
    total_seconds = int(round(offset_ms / 1000.0))
    sign = '-' if total_seconds < 0 else ''
    # Split the remainder of each step, so 75 minutes becomes 01:15:00 rather than 01:75:00.
    hours, remainder = divmod(abs(total_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{seconds:02d}"


def _extract_trial_clip(source, start_hhmmss, end_hhmmss, destination, side):
    """Cut `start..end` out of `source` into `destination` with ffmpeg; return the path or None on failure."""
    if os.path.exists(destination):
        # Same policy as the per-eye extraction: never regenerate an existing file.
        print(f"{destination} already exists!")
        return destination
    # Argument list (no shell) so paths with spaces stay intact.
    command = ['ffmpeg', '-i', source, '-ss', start_hhmmss, '-to', end_hhmmss, '-vsync', '2', destination]
    try:
        # stdin is detached so ffmpeg can never block waiting for keyboard input.
        exit_code = subprocess.run(command, stdin=subprocess.DEVNULL).returncode
    except OSError as err:
        # Raised when the ffmpeg executable cannot be launched at all.
        print(f'[ERROR] Could not launch ffmpeg: {err}')
        exit_code = -1
    if exit_code != 0:
        print(f'[ERROR] Failed to generate {side}-eye clip: {destination} with command "{" ".join(command)}"')
        return None
    return destination


def _collect_trials(input_dir, events_df, eyes, session_end):
    """Build one summary dict per 'Trial N Start' event, cutting per-eye clips when footage is available."""
    # na=False keeps rows with a missing title out of the mask instead of breaking the indexing.
    is_trial_start = events_df['title'].str.match(r'Trial \d+ Start', na=False)
    trial_rows = events_df[is_trial_start]
    starts = [int(value) for value in trial_rows['unix_ms']]
    titles = trial_rows['title'].tolist()

    trials = []
    for position, (trial_start, title) in enumerate(zip(starts, titles)):
        trial_name = title.split()[1]
        # A trial ends where the next one starts; the last trial ends with the event log.
        trial_end = starts[position + 1] if position + 1 < len(starts) else session_end
        print(f'Parsing Trial  {trial_name}')

        hhmmss_start = hhmmss_end = None
        clips = {'left': None, 'right': None}
        if eyes['date'] is not None:
            # Trial times become offsets from the recording's start timestamp.
            video_start = eyes['date']['unix_start']
            hhmmss_start = _format_hhmmss(trial_start - video_start)
            hhmmss_end = _format_hhmmss(trial_end - video_start)
            trial_dir = os.path.join(input_dir, trial_name)
            for side in ('left', 'right'):
                if eyes[side] is None:
                    continue
                os.makedirs(trial_dir, exist_ok=True)
                clips[side] = _extract_trial_clip(
                    os.path.join(input_dir, f'{side}.mp4'),
                    hhmmss_start,
                    hhmmss_end,
                    os.path.join(trial_dir, f'{side}.mp4'),
                    side,
                )

        trials.append({
            'trial': trial_name,
            'unix_start': trial_start,
            'hhmmss_start': hhmmss_start,
            'unix_end': trial_end,
            'hhmmss_end': hhmmss_end,
            'unix_duration': trial_end - trial_start,
            'left': clips['left'],
            'right': clips['right']
        })
    return trials


def GenerateAbout(input_dir, force_overwrite=True):
    """
    Inventory a participant's directory and write the result to `about.json`.

    When given a participant's directory, we should expect to see:
    - bci/ <-- Stores all the EEG data collected from the Muse 2 Headband
    - vr.csv <-- the events file generated by the Unity instance
    - YYYY-MM-DD_HH-MM-SS_UNIX.mp4 <-- the passthrough footage collected by SCRCPY

    Steps:
    1. Load an existing `about.json` unless `force_overwrite` is set.
    2. Find the raw recording among the directory's mp4 files, parse its timestamp,
       and call `GetEyeCams` so `left.mp4` / `right.mp4` exist.
    3. Record whether `bci/eeg.csv` exists.
    4. Record whether `vr.csv` exists; if so, summarise every "Trial N Start" event and,
       when footage is available, cut a per-trial clip for each eye into `<input_dir>/<N>/`.
    5. Write everything to `<input_dir>/about.json`.

    Parameters
    ----------
    input_dir : str
        The participant's directory.
    force_overwrite : bool
        When True (the default) any existing `about.json` is ignored and rebuilt from scratch.

    Returns
    -------
    dict
        The same content that was written to `about.json`.
    """
    # Step 1: check if there's an about.json present
    about_json = {}
    about_json_filename = os.path.join(input_dir, "about.json")
    if os.path.isfile(about_json_filename) and not force_overwrite:
        with open(about_json_filename, 'r') as openfile:
            about_json = json.load(openfile)

    # Step 2: Check all mp4s in the directory. Do we have the necessary mp4 files?
    mp4s = glob(os.path.join(input_dir, "*.mp4"))
    if len(mp4s) > 0:
        stems = [os.path.splitext(os.path.basename(path))[0] for path in mp4s]
        raw_pattern = re.compile(r'(\d+)-(\d+)-(\d+)_(\d+)-(\d+)-(\d+)_(\d+)(?:_.*)?')
        eyes = {'date': None, 'both': None}
        # Longest name first: the timestamped raw recording outranks left/right.
        # Names that do not follow the timestamp convention are skipped rather than crashing.
        for stem in sorted(stems, key=len, reverse=True):
            parsed = raw_pattern.fullmatch(stem)
            if parsed is None:
                continue
            year, month, day, hour, minute, second, unix_ts = (int(part) for part in parsed.groups())
            eyes = {
                'date': {
                    'year': year,
                    'month': month,
                    'day': day,
                    'hour': hour,
                    'minute': minute,
                    'second': second,
                    "unix_start": unix_ts
                },
                "both": stem + ".mp4"
            }
            break
        eyes['left'] = None
        eyes['right'] = None
        about_json['eyes'] = eyes
        about_json = GetEyeCams(input_dir, about_json)
    else:
        about_json['eyes'] = {
            'date': None,
            'both': None,
            "left": None,
            "right": None
        }

    # Step 3: Check if 'bci/eeg.csv' is in the data
    eeg_filepath = os.path.join(input_dir, "bci", "eeg.csv")
    about_json['eeg'] = 'bci/eeg.csv' if os.path.exists(eeg_filepath) else None

    # Step 4: Check if `vr.csv` is in the data
    vr_filepath = os.path.join(input_dir, "vr.csv")
    events = about_json.setdefault('events', {})
    events['raw'] = 'vr.csv' if os.path.exists(vr_filepath) else None
    if events['raw'] is not None:
        events_df = pd.read_csv(vr_filepath)
        if events_df.empty:
            # A header-only events file has no first/last row to read timestamps from.
            events['unix_start'] = None
            events['unix_end'] = None
        else:
            events['unix_start'] = int(events_df['unix_ms'].iloc[0])
            events['unix_end'] = int(events_df['unix_ms'].iloc[-1])
        display(events_df)
        events['trials'] = _collect_trials(input_dir, events_df, about_json['eyes'], events['unix_end'])

    # Step 5: Generate the about.json, and return our results
    with open(about_json_filename, "w") as outfile:
        json.dump(about_json, outfile, indent=4)
    return about_json

class PassParser:
    """
    Helper that loads one recording session's artefacts from a single directory.

    Every attribute a method may read is initialised to None in `__init__`, so a method
    called before its prerequisite (e.g. `GetTrialTimes` before `GetEvents`) reports the
    problem instead of raising AttributeError.
    """

    def __init__(self, input_dir):
        """Remember the session directory; nothing is read from disk yet."""
        self.dir = input_dir
        # Filled in by GetEyeCams
        self.raw_footage = None
        self.raw_footage_start = None
        self.left = None
        self.right = None
        # Filled in by GetEvents
        self.raw_events = None
        self.events_df = None
        self.events_start = None
        self.events_end = None
        # Filled in by GetEEG
        self.raw_eeg = None
        self.eeg_df = None

    def _extract_eye(self, side, video_filter):
        """Return the path of `<side>.mp4`, generating it with ffmpeg if missing; None on failure."""
        output_filename = os.path.join(self.dir, f'{side}.mp4')
        if os.path.exists(output_filename):
            print("{} Eye Footage already exists at {}".format(side.capitalize(), output_filename))
            return output_filename
        # Argument list (no shell) so paths with spaces stay intact.
        command = ['ffmpeg', '-i', self.raw_footage, '-vf', video_filter, '-vsync', '2', output_filename]
        try:
            # stdin is detached so ffmpeg can never block waiting for keyboard input.
            exit_code = subprocess.run(command, stdin=subprocess.DEVNULL).returncode
        except OSError as err:
            # Raised when the ffmpeg executable cannot be launched at all.
            print("[ERROR] Could not launch ffmpeg: {}".format(err))
            exit_code = -1
        if exit_code == 0:
            print("{} Eye footage successfully extracted at {}".format(side.capitalize(), output_filename))
            return output_filename
        print("[ERROR] Could not produce {} eye footage successfully...".format(side))
        return None

    def GetEyeCams(self, raw_vid):
        """
        Split the raw recording `raw_vid` (a filename inside the session directory) into
        `left.mp4` and `right.mp4`, storing their paths in `self.left` / `self.right`
        (None for a side that could not be produced).
        """
        ### Save the raw footage
        self.raw_footage = os.path.join(self.dir, raw_vid)
        # Get initial timestamp from video name (the part after the last underscore)
        self.raw_footage_start = os.path.splitext(raw_vid)[0].split('_')[-1]
        print(self.raw_footage_start)

        ### LEFT EYE
        self.left = self._extract_eye(
            'left',
            'crop=632:672:16:0,rotate=21*(PI/180),lenscorrection=cx=0.57:cy=0.51:k1=-0.48:k2=0.2',
        )

        ### RIGHT EYE
        self.right = self._extract_eye(
            'right',
            'crop=632:672:648:0,rotate=-21*(PI/180),lenscorrection=cx=0.43:cy=0.51:k1=-0.48:k2=0.2',
        )

    def GetEvents(self, raw_events):
        """Load the events CSV `raw_events` and record its first and last `unix_ms` values."""
        self.raw_events = os.path.join(self.dir, raw_events)
        self.events_df = pd.read_csv(self.raw_events)
        self.events_start = self.events_df.iloc[0]['unix_ms']
        self.events_end = self.events_df.iloc[-1]['unix_ms']
        print(f"Events: Start @ {self.events_start}, End @ {self.events_end}")
        display(self.events_df.head())
        display(self.events_df.tail())

    def GetEEG(self, raw_eeg):
        """Load the EEG CSV `raw_eeg` into `self.eeg_df` and show its first and last rows."""
        self.raw_eeg = os.path.join(self.dir, raw_eeg)
        self.eeg_df = pd.read_csv(self.raw_eeg)
        display(self.eeg_df.head())
        display(self.eeg_df.tail())

    def GetTrialTimes(self):
        """Print the event titles and the rows whose title looks like 'Trial N Start'."""
        if self.events_df is None:
            print("[ERROR] Events from the VR application were not provided to this parser.")
            return
        print(self.events_df['title'])
        # Raw string avoids the invalid-escape warning; na=False drops rows with no title.
        print(self.events_df[self.events_df['title'].str.match(r'Trial \d+ Start', na=False)])
        
        

In [7]:
# Build about.json for the Debug4 session. force_overwrite defaults to True, so any
# existing about.json is ignored and rewritten; the returned dict is the cell's output.
GenerateAbout('../data/Debug4')

left.mp4 already exists!
right.mp4 already exists!


Unnamed: 0,unix_ms,event_type,title,description,x,y,z
0,1706113293458,Simulation,Simulation Start,,0.000000,0.000000,0.000000
1,1706113293492,Simulation,Trial 1 Start,,0.000000,0.000000,0.000000
2,1706113293492,Player,position,,0.000000,0.000000,0.000000
3,1706113293492,Player,orientation,,0.000000,0.000000,0.000000
4,1706113293492,Eye Tracking,Left,Direction,0.000000,0.000000,0.000000
...,...,...,...,...,...,...,...
146836,1706113704505,Player,position,,3.461206,1.627611,18.525040
146837,1706113704505,Player,orientation,,11.987300,197.997500,359.918500
146838,1706113704505,Eye Tracking,Left,Direction,0.307077,-0.304372,0.901699
146839,1706113704505,Eye Tracking,Right,Direction,0.256966,-0.304887,0.917067


Parsing Trial  1
Parsing Trial  2
Parsing Trial  3
Parsing Trial  4
Parsing Trial  5


{'eyes': {'date': {'year': 2024,
   'month': 1,
   'day': 24,
   'hour': 11,
   'minute': 18,
   'second': 3,
   'unix_start': 1706113083427},
  'both': '2024-01-24_11-18-03_1706113083427.mp4',
  'left': 'left.mp4',
  'right': 'right.mp4'},
 'eeg': 'bci/eeg.csv',
 'events': {'raw': 'vr.csv',
  'unix_start': 1706113293458,
  'unix_end': 1706113704505,
  'trials': [{'trial': '1',
    'unix_start': 1706113293492,
    'hhmmss_start': '00:03:30',
    'unix_end': 1706113323586,
    'hhmmss_end': '00:04:00',
    'unix_duration': 30094,
    'left': '../data/Debug4\\1\\left.mp4',
    'right': '../data/Debug4\\1\\right.mp4'},
   {'trial': '2',
    'unix_start': 1706113323586,
    'hhmmss_start': '00:04:00',
    'unix_end': 1706113415571,
    'hhmmss_end': '00:05:32',
    'unix_duration': 91985,
    'left': '../data/Debug4\\2\\left.mp4',
    'right': '../data/Debug4\\2\\right.mp4'},
   {'trial': '3',
    'unix_start': 1706113415571,
    'hhmmss_start': '00:05:32',
    'unix_end': 1706113505036,
 

In [None]:
# Exercise PassParser on the Debug4 session, one artefact at a time.
testParser = PassParser('../data/Debug4')
# Split the raw recording into left.mp4 / right.mp4 (skipped for files that already exist).
testParser.GetEyeCams('2024-01-24_11-18-03_1706113083427.mp4')
# Load the events CSV; GetTrialTimes below depends on this call.
testParser.GetEvents('11-21-33.csv')
# Load the EEG CSV and show its first and last rows.
testParser.GetEEG('2024-01-24T11-10-37-575/eeg.csv')
# Print the event titles and the "Trial N Start" rows.
testParser.GetTrialTimes()

# Before Doing Anything...

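Make sure the raw VR passthrough footage has been split into separate left-eye and right-eye videos (`left.mp4` and `right.mp4`) before running any further analysis. The split is done with `ffmpeg`, so it must be installed and available on your PATH.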

In [None]:
# NOTE(review): presumably a quick check that the ffmpeg executable is reachable from
# the notebook's shell; the functions above shell out to it. Confirm the intent.
!ffmpeg