In [467]:
# open the annotation data holding the start and end UNIX times of the task videos
# read the motion data file named after each video_id
# parse the motion data txt file so that each measurement is in a separate column and each sensor reading is in a separate row
# (one sensor at a time)
# create a timestamp for each frame in the video
# merge the frame and motion timestamps, sort by time and use the time as the index
# then interpolate the sensor measurements
# merge the sensors for each video


from pathlib import Path
import pandas as pd
import numpy as np
import argparse
import os

from IPython.display import display


pd.set_option("display.precision", 13)

In [468]:
def parse(filepath, sensor_id, max_lines=None, header_startswith='Format for each'):
    '''Parse a kinematics file and yield the readings of one sensor.

    The first line of the file is treated as a header. Every following line is
    split on the literal text 'Sensor'; the chunk at position `sensor_id` is
    split on whitespace and its first token (the sensor id) is dropped.

    Parameters
    ----------
    filepath : str or path-like
        Text file to read.
    sensor_id : int
        Position of the wanted chunk after splitting a line on 'Sensor'.
    max_lines : int, optional
        Only the first `max_lines` data lines are read. None reads the whole
        file; zero or a negative number yields nothing.
    header_startswith : str, optional
        Expected beginning of the header line. When it is set and the header
        does not match, nothing is yielded. A falsy value disables the check.

    Yields
    ------
    list of str
        The whitespace-separated fields of the selected sensor for one line.

    Raises
    ------
    IndexError
        If a non-blank line has no chunk at position `sensor_id`.
    '''

    with open(filepath, 'r') as f:

        # the header line names the columns of the sensors
        header = f.readline()
        if header_startswith and not header.startswith(header_startswith):
            return

        # stream the file instead of loading every line into memory
        for line_number, line in enumerate(f):
            # stop reading as soon as the requested number of lines is reached
            if max_lines is not None and line_number >= max_lines:
                break
            # blank lines (e.g. a trailing newline) hold no sensor data
            if not line.strip():
                continue
            # pick the requested sensor, then drop the leading sensor id
            fields = line.split('Sensor')[sensor_id].split()
            yield fields[1:]

In [469]:
# Input locations: the folder with the kinematics recordings and the CSV
# that is read into the `dfv` table.
# NOTE: `dir` shadows the Python builtin of the same name; the name is kept
# because the video loop below refers to it.
root = Path('D:\\')
dir = root.joinpath('Kinematic_data')

csv = 'C:/Users/petra/REU/unix_videos.csv'
dfv = pd.read_csv(csv)

# collect every kinematics text file found in the data folder
file_list = [txt_path for txt_path in dir.glob('*.txt')]
file_list

[WindowsPath('D:/Kinematic_data/891044.txt'),
 WindowsPath('D:/Kinematic_data/891043.txt'),
 WindowsPath('D:/Kinematic_data/891039.txt'),
 WindowsPath('D:/Kinematic_data/891038.txt')]

In [470]:

# Load the kinematics recording of every annotated video, one sensor at a time.
# After the loop, `df` and `sensor_id` hold the last sensor of the last video
# whose file was found; the interpolation cell below reads both names.

# names of the fields that parse() yields for one sensor reading
colnames = ['status', 'x', 'y', 'z', 'azimuth', 'elevation', 'roll', 'button', 'quality', 'time']

# target dtype of each numeric column (parse() yields every field as a string)
coltypes = dict(
    x=np.float32, y=np.float32, z=np.float32,
    azimuth=np.float32, elevation=np.float32, roll=np.float32,
    button=np.int8, quality=np.int8
    )

max_lines = 100000  # for testing

# loop over videos
for index, row in dfv.iterrows():
    file = int(row["video_id"])
    fname = f'{file}.txt'
    print (fname)

    filepath = dir / fname
    print(filepath)

    # videos without a kinematics file are skipped
    if not filepath.exists():
        continue

    # each file is parsed once per sensor (sensors 1 to 7)
    for sensor_id in range(1, 8):

        # build the frame in one go instead of appending row by row
        records = list(parse(filepath, sensor_id, max_lines))
        df = pd.DataFrame(records, columns=colnames)

        # convert the string fields to their numeric dtypes
        df = df.astype(coltypes)

        # the raw 'time' strings are kept; 't' is the numeric copy
        df['t'] = df['time'].astype(np.float64)
        df['Sensor'] = sensor_id

        print(df.dtypes)

891038.txt
D:\Kinematic_data\891038.txt


In [None]:
def interpolation(args = None):
    '''Interpolate the loaded motion data onto the frame times of each video.

    For every row of the UNIX-times CSV, one timestamp per video frame is
    generated between 'unix_start_time' and 'unix_end_time'. The frame times
    are outer-merged with the motion samples on 't', sorted by time, and the
    sensor columns are interpolated at the frame times. Only the frame rows
    are kept and written to '<video_id>_Sensor<sensor_id>.csv' in the current
    working directory.

    The function reads two module-level names set by the loading cell:
    `df` (motion samples with a numeric 't' column) and `sensor_id`.

    Parameters
    ----------
    args : namespace-like, optional
        Must provide `fps`. An optional `csv_unix` attribute overrides the
        default path of the UNIX-times CSV. When None, 30 fps is used.

    Raises
    ------
    AssertionError
        If the frame times of a video are not strictly inside the time range
        covered by `df`.
    '''
    # default settings; argparse.Namespace is a plain attribute container
    if args is None:
        args = argparse.Namespace(fps=30, frames=50)

    csv = getattr(args, 'csv_unix', None) or 'C:/Users/petra/REU/unix_videos.csv'
    dfv = pd.read_csv(csv)

    # columns of the motion data that are not interpolated
    excluded = {'t', 'status', 'time', 'Sensor'}

    # loop over videos - iterate through the unix start and end times for tasks
    for index, row in dfv.iterrows():
        # useful time interval
        start_timestamp = row['unix_start_time']
        end_timestamp = row['unix_end_time']

        # total number of frames
        frames = int(np.floor((end_timestamp - start_timestamp) * args.fps))
        display(frames)

        # an empty interval has no frame to interpolate at
        if frames <= 0:
            print(f'no frames between {start_timestamp} and {end_timestamp}, skipping')
            continue

        # new dataframe for the time of each frame
        f = np.arange(frames) # array of frame numbers 0,1,2...,frames-1
        t = start_timestamp + f / args.fps
        times = pd.DataFrame({'t': t, 'frame': f})

        # NOTE(review): `df` is not reloaded inside this loop, so every video
        # is checked against the same motion data; a video recorded outside
        # that time range fails here - confirm this is intended.
        assert times.t.values[0] > df.t.values[0], 'starting violation'
        assert times.t.values[-1] < df.t.values[-1], 'ending violation'

        # join the frame times with the motion data, ordered and indexed by time
        merged_df = (
            times
            .merge(df, how='outer', on='t')
            .sort_values('t')
            .set_index('t')
        )

        # sensor columns to interpolate are all but a few
        sensor_cols = [c for c in df.columns if c not in excluded]
        print (sensor_cols)

        # method='index' weights by the time index; 'linear' would ignore it
        # and treat the rows as equally spaced. Assigning the result back
        # avoids the chained in-place update, which pandas Copy-on-Write
        # does not propagate to merged_df.
        merged_df[sensor_cols] = merged_df[sensor_cols].interpolate(method='index')

        # exclude rows without frame numbers, i.e., NaN
        merged_df = merged_df[merged_df['frame'].notna()]

        display(merged_df.head())

        # Save the DataFrame to a CSV file (to_csv overwrites an existing file)
        # NOTE(review): index=False leaves the 't' index out of the file.
        video = int(row['video_id'])
        output = Path(f'''{video}_Sensor{sensor_id}.csv''')
        merged_df.to_csv(output, index=False)


interpolation()


3

AssertionError: ending violation