In [4]:
# A simpler approach would be to skip this preprocessing and replay the raw events, releases included,
# which would avoid the hotkey, double-click and drag detection altogether.
class Preprocessing():
    """Convert a raw keyboard/mouse recording into a table of replayable actions.

    The recording is a DataFrame with (at least) the columns ``timestamp``,
    ``px``, ``py``, ``event`` and ``trajectory``. Events are strings such as
    ``'pressed Key.ctrl'`` or ``'released Button.left'``.

    NOTE: the detection logic does index arithmetic on row labels (``ix - 1``,
    ``ix + 1``, "consecutive" labels), so the input is expected to carry a
    default integer index (0, 1, 2, ...).
    """

    def __init__(self, length_th, minpixels_th, dt_th, maxpixels_th):
        """Store the detection thresholds.

        Args:
            length_th: minimum number of trajectory points on a left-button
                release for it to count as a drag.
            minpixels_th: minimum displacement (on x or y) between press and
                release for it to count as a drag.
            dt_th: maximum timestamp difference between two left-button
                presses for them to count as a double click.
            maxpixels_th: maximum displacement (on x and y) between two
                left-button presses for them to count as a double click.
        """
        # The replay quality depends on how well these thresholds are calibrated.
        # Drags
        self.length_th = length_th
        self.minpixels_th = minpixels_th
        # Double clicks
        self.dt_th = dt_th
        self.maxpixels_th = maxpixels_th

    @staticmethod
    def _event_mask(sample, *needles):
        """Return a boolean array flagging rows whose event contains any needle."""
        flags = sample['event'].map(lambda event: any(needle in event for needle in needles))
        # astype(bool) keeps the mask usable when the frame is empty (map then yields object dtype).
        return flags.astype(bool).to_numpy()

    def run(self, sample):
        """Run the full pipeline and return the actions table.

        The input frame is not modified. The result contains only pressed and
        scroll events (with the ``'pressed '`` prefix removed), the columns
        ``drag2px``/``drag2py`` added by the drag detection, and a ``delay``
        column holding the timestamp difference to the next action (0.0 for
        the last one). An input without any action yields an empty frame.
        """
        sample = self.replace_hotkeys(sample)                                   # Detect hotkeys (returns a copy)
        sample['trajectory'] = sample['trajectory'].map(self.string2list)       # Trajectory strings -> lists
        sample = self.replace_drags(sample)                                     # Detect drag events
        actions = sample[self._event_mask(sample, 'pressed', 'Scroll.')].copy()  # Keep pressed and scroll events
        actions = self.replace_doubleclicks(actions)                            # Detect double clicks
        actions['event'] = actions['event'].map(lambda event: event.replace('pressed ', ''))
        timestamps = actions['timestamp'].to_numpy()
        # Trim to len(actions) so that an empty table does not receive a stray 0.0.
        actions['delay'] = np.append(timestamps[1:] - timestamps[:-1], 0.0)[:len(actions)]
        return actions.reset_index(drop=True)

    def replace_hotkeys(self, sample):
        """Return a copy of ``sample`` where each hotkey span is collapsed.

        Key insight: consecutive pressed rows form one hotkey. The number of
        rows (releases) between two groups of consecutive pressed rows tells
        how many of the most recently pressed keys were let go, i.e. which
        keys are still held (``basepressed``) when the next group starts.
        For every group, the last pressed row is rewritten as
        ``'pressed key1+key2+...'`` and every other row of the span is dropped.
        """
        samplecopy = sample.copy()
        drop_ixs = []
        for ix, ixN in self.find_hotkeys(sample):
            span = sample.loc[ix:ixN]  # label slice, ixN included
            ixs_pressed = span.index[self._event_mask(span, 'pressed')].tolist()
            pgroups = self.group_consecutive(ixs_pressed)
            keep_ixs = set()
            basepressed = []
            for i, group in enumerate(pgroups):
                pgroup = basepressed + group
                # Key names are read from the untouched input; only group-final rows get rewritten.
                keys = [sample.loc[k, 'event'].replace('pressed ', '') for k in pgroup]
                samplecopy.loc[pgroup[-1], 'event'] = 'pressed ' + '+'.join(keys)
                keep_ixs.add(pgroup[-1])
                if i + 1 < len(pgroups):
                    # Groups are never adjacent, so di >= 1 and the slice below is well defined.
                    di = pgroups[i + 1][0] - pgroup[-1] - 1
                    basepressed = pgroup[:-di]
            drop_ixs.extend(k for k in span.index if k not in keep_ixs)
        return samplecopy.drop(drop_ixs)

    def group_consecutive(self, numbers):
        """Split a sorted list of integers into runs of consecutive values.

        ``[0, 1, 3]`` -> ``[[0, 1], [3]]``; an empty list yields ``[]``.
        """
        groups = []
        for number in numbers:
            if groups and number == groups[-1][-1] + 1:
                groups[-1].append(number)
            else:
                groups.append([number])
        return groups

    def find_hotkeys(self, sample):
        """Return ``(ix, ixN)`` label pairs delimiting hotkey spans.

        ``ix`` is a pressed row and ``ixN`` the first later row releasing the
        same key. The span counts as a hotkey when ``ixN - ix >= 3``.
        A pressed key that is never released is skipped.
        """
        pending = sample.index[self._event_mask(sample, 'pressed')].tolist()
        hotkeys_indexes = []
        while len(pending) >= 2:  # The final pressed row (the closing Key.esc) is never examined
            ix = pending[0]
            released1 = sample.loc[ix, 'event'].replace('pressed', 'released')
            following = sample.loc[ix:, 'event']
            release_ixs = following.index[(following == released1).to_numpy()]
            if len(release_ixs) == 0:
                # No matching release recorded: cannot be a hotkey span.
                pending.pop(0)
                continue
            ixN = int(release_ixs[0])
            # 3 keeps plain typing from being detected as hotkeys. Careful: this is a weak condition.
            if ixN - ix >= 3:
                hotkeys_indexes.append((ix, ixN))
            # Every pressed row inside the span has been accounted for.
            pending = [p for p in pending if not ix <= p < ixN]
        return hotkeys_indexes

    def string2list(self, string):
        """Convert a trajectory string into a list of ``(time, px, py)`` tuples.

        The expected text form is ``'[(t, x, y), (t, x, y), ...]'``. Anything
        that is not a string, and the empty list ``'[]'``, yields ``[]``.
        """
        if not isinstance(string, str):
            return []
        chunks = string.strip('][').split('), (')
        if chunks[0] == '':
            return []
        triples = [chunk.strip(')(').split(', ') for chunk in chunks]
        return [(float(time), int(px), int(py)) for time, px, py in triples]

    def replace_drags(self, sample):
        """Return a copy of ``sample`` with drag presses encoded.

        Adds the columns ``drag2px``/``drag2py`` (None by default). For each
        drag, the press row becomes ``'pressed Button.left.drag'`` and receives
        the position and trajectory of the following (release) row.
        """
        samplecopy = sample.copy()
        samplecopy['drag2px'] = len(samplecopy)*[None]
        samplecopy['drag2py'] = len(samplecopy)*[None]
        for ix in self.find_drags(sample):
            samplecopy.loc[ix, 'event'] = 'pressed Button.left.drag'
            samplecopy.loc[ix, 'drag2px'] = sample.loc[ix+1, 'px']
            samplecopy.loc[ix, 'drag2py'] = sample.loc[ix+1, 'py']
            # .at is needed to store a whole list in a single cell.
            samplecopy.at[ix, 'trajectory'] = sample.loc[ix+1, 'trajectory']
        return samplecopy

    def find_drags(self, sample):
        """Return the labels of the rows preceding left-button releases that are drags.

        A release is a drag when its trajectory has at least ``length_th``
        points and the pointer moved at least ``minpixels_th`` pixels on x or y
        since the previous row. Releases without a previous row are ignored.
        """
        drags_indexes = []
        for ix in sample.index[self._event_mask(sample, 'released Button.left')]:
            if ix - 1 not in sample.index:
                # The preceding row is missing (e.g. removed with a hotkey span).
                continue
            moved_x = abs(sample.loc[ix-1, 'px'] - sample.loc[ix, 'px'])
            moved_y = abs(sample.loc[ix-1, 'py'] - sample.loc[ix, 'py'])
            long_enough = len(sample.loc[ix, 'trajectory']) >= self.length_th
            if long_enough and (moved_x >= self.minpixels_th or moved_y >= self.minpixels_th):
                drags_indexes.append(ix-1)
        return drags_indexes

    def replace_doubleclicks(self, sample):
        """Return a copy of ``sample`` with double clicks encoded.

        The first press of each pair becomes ``'pressed Button.left.double'``
        and the second press row is dropped.
        """
        samplecopy = sample.copy()
        second_clicks = []
        for ix0, ix1 in self.find_doubleclicks(sample):
            samplecopy.loc[ix0, 'event'] = 'pressed Button.left.double'
            second_clicks.append(ix1)
        return samplecopy.drop(second_clicks)

    def find_doubleclicks(self, sample):
        """Return ``(first, second)`` label pairs of left-button double clicks.

        Two successive left-button presses form a double click when they are
        at most ``dt_th`` apart in time and at most ``maxpixels_th`` apart on
        both axes. A press belongs to at most one pair.
        """
        presses = sample.index[self._event_mask(sample, 'pressed Button.left')].tolist()
        dclicks_indexes = []
        pos = 0
        while pos + 1 < len(presses):
            ix0, ix1 = presses[pos], presses[pos + 1]
            dt = sample.loc[ix1, 'timestamp'] - sample.loc[ix0, 'timestamp']
            dx = abs(sample.loc[ix0, 'px'] - sample.loc[ix1, 'px'])
            dy = abs(sample.loc[ix0, 'py'] - sample.loc[ix1, 'py'])
            if (dt <= self.dt_th) and (dx <= self.maxpixels_th) and (dy <= self.maxpixels_th):
                dclicks_indexes.append((ix0, ix1))
                pos += 2  # both presses are consumed by the pair
            else:
                pos += 1
        return dclicks_indexes

from glob import glob 
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Pick one recorded sample, show the hotkey spans found in it and display the
# preprocessed actions table (the last expression is what the notebook renders).
ix_sample = 0
sample_paths = glob('../data/*/*.csv')
for path in sample_paths:
    print(path)
if not sample_paths:
    # Fail with an explicit message instead of an IndexError on the lookup below.
    raise FileNotFoundError("no sample found matching '../data/*/*.csv'")
sample_path = sample_paths[ix_sample]
print('usamos este:',sample_path)
sample = pd.read_csv(sample_path)

processing = Preprocessing(length_th = 5, minpixels_th = 1, dt_th = 0.22, maxpixels_th = 0)
print(processing.find_hotkeys(sample))

processing.replace_hotkeys(sample)
processing.run(sample)



../data/sample1/raw_pcdata_2024-03-29_11-49-45.csv
usamos este: ../data/sample1/raw_pcdata_2024-03-29_11-49-45.csv
[(0, 3), (4, 7), (8, 17), (18, 23), (24, 29), (30, 33)]


Unnamed: 0,timestamp,img_path,px,py,event,trajectory,drag2px,drag2py,delay
0,1.524696,data/sample1/screen0000000001_1.48639631399419...,1427,249,Key.ctrl_r+Button.left,[],,,3.77216
1,5.296856,data/sample1/screen0000000005_5.26867441495414...,1197,285,Key.ctrl_r+Button.left,[],,,6.712439
2,12.009295,data/sample1/screen0000000009_11.9824571780045...,680,124,Key.ctrl_r+Button.left,[],,,0.171544
3,12.180838,data/sample1/screen0000000011_12.1505428369855...,680,124,Key.ctrl_r+Button.left,[],,,0.941226
4,13.122064,data/sample1/screen0000000013_13.0865030529676...,740,58,Key.ctrl_r+Button.left,[],,,0.189987
5,13.312051,data/sample1/screen0000000015_13.2765589460032...,740,58,Key.ctrl_r+Button.left,[],,,3.705256
6,17.017306,data/sample1/screen0000000019_16.9881464299978...,562,117,Key.ctrl+Button.left,[],,,0.199459
7,17.216765,data/sample1/screen0000000021_17.1769822799833...,562,117,Key.ctrl+Button.left,[],,,3.325152
8,20.541917,data/sample1/screen0000000025_20.5186704429797...,584,52,Key.ctrl+Button.left,[],,,0.179843
9,20.72176,data/sample1/screen0000000027_20.6907563429558...,584,52,Key.ctrl+Button.left,[],,,2.579849


In [None]:
### Version 2 
    def replace_hotkeys(self, sample):
        """Return a copy of ``sample`` with hotkey spans merged into combined events.

        For every ``(ix, ixN)`` pair reported by ``find_hotkeys``, the pressed
        rows found in the slice ``ix:ixN`` are inspected and one of two layouts
        is handled; any other layout leaves the span untouched.
        NOTE(review): ``sample[ix:ixN]`` and ``sample['event'][ix:ixN]`` are plain
        integer slices, which presumably match the row labels only for a default
        0..n-1 index -- confirm against the caller.
        """
        samplecopy = sample.copy()
        for ix, ixN in self.find_hotkeys(samplecopy):
            # Labels of the pressed rows inside the span (ixN itself is excluded by the slice).
            ixs_pressed = sample[ix:ixN].index[sample['event'][ix:ixN].map(lambda x: 'pressed' in x)]
            # Start the combined event with the first pressed key of the span.
            newevent = 'pressed ' + samplecopy.loc[ixs_pressed[0], 'event'].replace('pressed ', '')
            # Layout 1: event1 + ... + eventN (all pressed rows have consecutive labels)
            if self.are_pressedkeys_consecutive(ixs_pressed.tolist()): 
                for ix_pressed in ixs_pressed[1:]: 
                    newevent += '+' + samplecopy.loc[ix_pressed, 'event'].replace('pressed ', '')
                # The last pressed row carries the combined event ...
                samplecopy.loc[ixs_pressed[-1], 'event'] = newevent
                # ... and the other pressed rows plus every 'released' row in ix..ixN are dropped.
                remove_ixs = ixs_pressed[:-1].tolist() + sample[ix:ixN+1].index[sample['event'][ix:ixN+1].map(lambda x: 'released' in x)].tolist()
                samplecopy = samplecopy.drop(remove_ixs)
            # Layout 2: event1 + {event2 + ... + eventN} (pressed rows after the first are spaced by 2)
            elif self.are_sequentialhotkeys(ixs_pressed[1:].tolist()):
                for ix_pressed in ixs_pressed[1:]:
                    # Each later pressed row becomes 'pressed event1+<its own key>'.
                    samplecopy.loc[ix_pressed, 'event'] = newevent + '+' + samplecopy.loc[ix_pressed, 'event'].replace('pressed ', '')
                    # The row right after it is dropped (presumably that key's release -- TODO confirm).
                    samplecopy = samplecopy.drop([ix_pressed+1])
                # Finally drop the first pressed row and the closing row ixN.
                samplecopy = samplecopy.drop([ixs_pressed[0], ixN])
            # Other layouts are not handled, e.g. event1 + event2 + {event3 + ... + eventN}
            else:
                # Nothing is changed for an unrecognised layout.
                pass
        return samplecopy 
    
    def are_sequentialhotkeys(self, numbers_sorted):
        """Tell whether every value is exactly 2 above the one before it.

        Lists with fewer than two values are trivially accepted.
        """
        neighbours = zip(numbers_sorted, numbers_sorted[1:])
        return all(current + 2 == following for current, following in neighbours)
    
    def are_pressedkeys_consecutive(self, numbers_sorted):
        """Tell whether every value is exactly 1 above the one before it.

        Lists with fewer than two values are trivially accepted.
        """
        neighbours = zip(numbers_sorted, numbers_sorted[1:])
        return all(current + 1 == following for current, following in neighbours)
    
    def find_hotkeys(self, sample):
        """Return ``(ix, ixN)`` label pairs delimiting hotkey spans.

        ``ix`` is a pressed row and ``ixN`` the first later row releasing the
        same key. The span counts as a hotkey when ``ixN - ix >= 3``.
        A pressed key that is never released is skipped.
        """
        is_pressed = sample['event'].map(lambda x: 'pressed' in x).astype(bool).to_numpy()
        ixs_pressed = sample.index[is_pressed].tolist()
        hotkeys_indexes = []
        while len(ixs_pressed) >= 2:  # The final pressed row (the closing Key.esc) is never examined
            ix = ixs_pressed[0]
            released1 = sample.loc[ix, 'event'].replace('pressed', 'released')
            following = sample.loc[ix:, 'event']  # label-based slice starting at ix
            release_ixs = following.index[(following == released1).to_numpy()]
            if len(release_ixs) == 0:
                # No matching release recorded: cannot be a hotkey span.
                ixs_pressed.pop(0)
                continue
            ixN = int(release_ixs[0])
            # 3 keeps plain typing from being detected as hotkeys. Careful: this is a weak condition.
            if ixN - ix >= 3:
                hotkeys_indexes.append((ix, ixN))
            # Every pressed row inside the span has been accounted for.
            ixs_pressed = [p for p in ixs_pressed if not ix <= p < ixN]
        return hotkeys_indexes

In [None]:
### Version 1
