In [2]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

In [3]:
data_dir = "../../data/GestureData/"
file_name = "Circle_V01_Pascal_Raw_labels.csv"

In [4]:
df = pd.read_csv(data_dir + file_name)

In [5]:
df

Unnamed: 0,from,to,label,real_start,real_end,diff
0,,,1,18.7,20.45,-0.25
1,,,1,20.4,22.4,0.0
2,,,1,22.35,24.5,0.15
3,,,1,26.65,28.55,-0.1
4,,,1,30.7,32.6,-0.1
5,,,1,34.65,36.35,-0.3
6,,,1,38.65,40.55,-0.1
7,,,1,42.6,44.5,-0.1
8,,,1,46.5,48.4,-0.1
9,,,1,50.7,52.5,-0.2


In [38]:
class LabelGenerator():
    
    def __init__(self, data, raw_labels, ms_per_frame):
        
        # stores the original data and the used framerate.
        self.data = data
        self.raw_labels = raw_labels 
        self.ms_per_frame = ms_per_frame
        
        # creates label-Dataframe whose "from"/"to" columns will be used for labelling.
        self.__label_df = pd.DataFrame(
            columns = ["label","real_start","real_end"]
        )
        self.__label_df[["label","real_start","real_end"]] =\
            self.raw_labels[["label","real_start","real_end"]]
        
        self.__label_df["real_start"] = self.__label_df["real_start"].apply(lambda x: x*1000)
        self.__label_df["real_end"] = self.__label_df["real_end"].apply(lambda x: x*1000)
        
        self.__label_df["real_start"] = np.round(self.__label_df["real_start"],0).astype("int32")
        self.__label_df["real_end"] = np.round(self.__label_df["real_end"],0).astype("int32")
        
        
        # default variables
        self.__is_fitted = False
        self.__is_labeled = False
        self.__is_finalized = False
        
        
        
        
    # PUBLIC method that creates two Dataframe, __label_df and __label_info
    # __label_df --> includes the acceptable range with the columns "from" and "to".
    #              any value between "from" and "to" is an acceptable movement endpoint and can be labeled accordingly.
    #              USAGE: this data frame will be used to create the final training data set
    # __label_info --> provides additional information about how the slacks were calculated
    #              USAGE: this data frame is for debugging mainly
    def fit_range(self, tolerance_range, max_error):
        
        self.__is_fitted = False
        self.__is_labeled = False
        self.__is_finalized = False
        
        tolerance_range = self.__check_variable("tolerance_range", tolerance_range)
        max_error = self.__check_variable("max_error", max_error)
        
        diff = self.__label_df["real_end"] - self.__label_df["real_start"] - 2000
        
        lower_slack, upper_slack, tolerance_range_ind = self.__calc_range(diff)
    
        self.__label_df["from"] = (self.__label_df["real_end"] + lower_slack).astype("int32")
        self.__label_df["to"] = (self.__label_df["real_end"] + upper_slack).astype("int32")
        self.__label_df["ignore"] = (abs(diff) >= self.max_error)

    
        # creates a Dataframe to store the used slacks for each labeled sample
        self.__label_info = pd.DataFrame(
            columns=["diff","l_slack","u_slack","tol_range_indicator"]
        )
        
        self.__label_info["diff"] = diff
        self.__label_info["l_slack"] = lower_slack 
        self.__label_info["u_slack"] = upper_slack
        self.__label_info["tol_range_indicator"] = tolerance_range_ind
        
        
        self.__set_error_df()

        self.__is_fitted = True

 

    # PRIVATE METHOD
    # calculates the acceptance interval for each sample
    def __calc_range(self, diff):
        indicator = (diff >= 0)
       
        lower_slack = - indicator * diff
        upper_slack = - (~indicator) * diff

        current_range = upper_slack - lower_slack
        range_delta = self.tolerance_range - current_range
        tolerance_range_ind = (range_delta > 0)

        lower_slack = lower_slack - range_delta//2 * tolerance_range_ind
        upper_slack = upper_slack + range_delta//2 * tolerance_range_ind
        
        return lower_slack.astype("int32"), upper_slack.astype("int32"), tolerance_range_ind
    
  
    # PRIVATE method that returns default variable values if no value is provided 
    #   and sets instance variables otherwise:
    #   symmetric_slack, tolerance_range, max_error
    def __check_variable(self, identifier, value):
        
        if identifier == "tolerance_range":
            if not value:
                value = self.tolerance_range
            else:
                self.tolerance_range = value
        
        elif identifier == "max_error":
            if not value:
                value = self.max_error
            else:
                self.max_error = value
            
        return value
    
    
    # creates the cutoff Dataframe with additional information about all movements that exceeded the max_error
    #   ... specified on initialization
    # USAGE: any movement in the error_df will not yield any labeled data. In the future it might even be removed 
    #        ... completely from the data (not implemented yet)
    def __set_error_df(self):
          
        self.__error_df = self.__label_df[abs(self.__label_info["diff"])>= self.max_error]\
            [["real_start","real_end"]]
        self.__error_df["start_idx"] =\
            (self.__error_df["real_start"]//self.ms_per_frame).apply(int)
        self.__error_df["start_calc"] =\
            self.__error_df["start_idx"] * self.ms_per_frame
        self.__error_df["end_idx"] =\
            np.ceil(self.__error_df["real_end"]/self.ms_per_frame).apply(int)
        self.__error_df["end_calc"] =\
            self.__error_df["end_idx"] * self.ms_per_frame

        
    
    # calls the Error-Dataframe with additional information about all movements that exceeded the max_error
    # this method can only be called after the Error-Dataframe has been created by calling the get_error_df method
    def get_error_df(self):
        
        if not self.__is_fitted:
            raise ValueError("You have to fit the range with the fit_range method")
            
        else:
            return self.__error_df
        
 
    # PUBLIC method that creates the PRIVATE labeled-Data Dataframe. 
    # This dataframe can be called by the get_labeled_data method
    # this is the the data frame that provides a label for each wire frame from posenet
    def set_labels(self):
        
        if not self.__is_fitted:
            raise ValueError("You have to fit the range with the fit_range method")
            
        self.__is_labeled = False
        self.__is_finalized = False
            
        _T = pd.DataFrame(columns=["time"])
        _T["time"] = (self.data.index.values+1) * self.ms_per_frame
        _T["_key_"] = 0
        _l = self.__label_df[["from","to","label","ignore"]]
        _l["_key_"] = 0
        _m = _T.reset_index().merge(_l, how="left").set_index("index")
        _l = _m[(_m["time"] >= _m["from"]) & (_m["time"] <= _m["to"])].loc[:,["time","label","ignore"]]
        
        self.__labeled_data = self.data.copy()
        self.__labeled_data["label"] = _l["label"][~_l["ignore"]]
        self.__labeled_data.fillna(value={'label': 0}, inplace = True)
        self.__labeled_data["label"] = self.__labeled_data["label"].astype("int32")
        self.__labeled_data["time"] = np.round(_T["time"],0).astype("int32")
 
        self.__is_labeled = True
    
    
    # PUBLIC get-Method for the private dataset that stores the labeled data
    def get_labeled_data(self):
        
        if not self.__is_labeled:
            raise ValueError("You have to set the labels with the set_labels-method")
        else:
            return self.__labeled_data
        
    
    # provides 3D labeled data and labels for training. The instance can call X, y, feature_names and final_time
    # X --> Array with dimensions [sample size] x [timesteps per sample] x [number of features]
    # y --> vector of labels with length [sample size]
    # feature_names --> list of the names of the assiciated columns in X
    # final_time --> vector with the number of milliseconds associated with the first dimension of X ([sample size])
    def extract_training_data(self):
        
        if not self.__is_labeled:
            raise ValueError("You have to set the labels with the set_labels-method")
            
        self.__is_finalized = False
        
        steps = int(2000//self.ms_per_frame) + 1
        self.__feature_names = self.__labeled_data.columns.drop(['label','time'])
        
        _fn = self.__labeled_data.shape[0] - steps + 1
        _ln = self.__labeled_data.shape[0]
        self.__seq_end_time = self.__labeled_data.loc[(_ln-_fn):_ln,"time"].values
        
        self.__X = np.zeros((
            _fn,
            steps,
            len(self.__feature_names)
        ))
        self.__y = np.zeros(self.__labeled_data.shape[0] - steps + 1)
        _F = self.__labeled_data.loc[:,self.__feature_names].values

        for i in range(steps,_F.shape[0]+1):
            self.__X[i-steps] = _F[i-steps:i,:]
            self.__y[i-steps] = self.__labeled_data['label'][i-1] 
    
       
        self.__is_finalized = True
        
      
    
    def get_label_df(self):
        
        if not self.__is_fitted:
            raise ValueError("You have to set label_df by calling the fit_range method")
    
        else:
            return self.__label_df
        
    
    def get_label_info(self):

        if not self.__is_fitted:
            raise ValueError("You have to set label_info by calling the fit_range method")
    
        else:
            return self.__label_info
        
     
    # PUBLIC get-Methods for the private finalized data
    def get_X(self):
        
        if not self.__is_finalized:
            raise ValueError("You have to set X by calling the extract_training_data method")
            
        else:
            return self.__X
        
    
    def get_y(self):
        
        if not self.__is_finalized:
            raise ValueError("You have to set y by calling the extract_training_data method")
            
        else:
            return self.__y
        
        
    def get_feature_names(self):
        
        if not self.__is_finalized:
            raise ValueError("You have to set feature_names by calling the extract_training_data method")
            
        else:
            return self.__feature_names
        
        
    def get_sequence_end_time(self):
        
        if not self.__is_finalized:
            raise ValueError("You have to set sequence_end_time by calling the extract_training_data method")
            
        else:
            return self.__seq_end_time
        

In [39]:
lgen = LabelGenerator(
    # using dummy data here. This is supposed your wireframe data from posenet
    data = pd.DataFrame(np.zeros((8,540))).transpose(),
    
    # manually labeled "raw" gesture data with real beginning and real end of movement
    raw_labels = df[["real_start","real_end","label"]],
    
    # associated frame rate of the LabelGenerator. This instance only works with the framerate specified on instantiation
    ms_per_frame = 130    
)

lgen.fit_range(
    # the length of the tolerance range will be 400 ms
    tolerance_range = 400,
    
    # maximum acceptable difference/error in movement length compared to the theoretical movement length (2000 ms)
    # if movement length is smaller than 2000 - max_error or greater than 2000 + max_error, there will be 0-label
    max_error = 400
)

# creates the labeled data set. it can be called with the get_labeled_data method
lgen.set_labels()

# provides 3D labeled data and labels for training. The instance can call X, y, feature_names and final_time
# X --> Array with dimensions [sample size] x [timesteps per sample] x [number of features]
# y --> vector of labels with length [sample size]
# feature_names --> list of the names of the assiciated columns in X
# sequence_end_time --> vector with the number of milliseconds associated with the first dimension of X ([sample size])
lgen.extract_training_data()

In [42]:
lgen.get_label_df()

Unnamed: 0,label,real_start,real_end,from,to,ignore
0,1,18700,20450,20125,20525,False
1,1,20400,22400,22200,22600,False
2,1,22350,24500,24375,24775,False
3,1,26650,28550,28300,28700,False
4,1,30700,32600,32350,32750,False
5,1,34650,36350,36000,36400,False
6,1,38650,40550,40300,40700,False
7,1,42600,44500,44250,44650,False
8,1,46500,48400,48150,48550,False
9,1,50700,52500,52200,52600,False


In [43]:
lgen.get_label_info()

Unnamed: 0,diff,l_slack,u_slack,tol_range_indicator
0,-250,-325,75,True
1,0,-200,200,True
2,150,-125,275,True
3,-100,-250,150,True
4,-100,-250,150,True
5,-300,-350,50,True
6,-100,-250,150,True
7,-100,-250,150,True
8,-100,-250,150,True
9,-200,-300,100,True


In [44]:
lgen.get_labeled_data().head()

Unnamed: 0,0,1,2,3,4,5,6,7,label,time
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,130
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,260
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,390
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,520
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,650


In [45]:
print(len(lgen.get_y()))
print(lgen.get_X().shape)
print("")

i = 157
print(lgen.get_y()[i], '\n', lgen.get_X()[i,:,:])

525
(525, 16, 8)

1.0 
 [[0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]]


In [47]:
lgen.get_sequence_end_time()[0:10]

array([2080, 2210, 2340, 2470, 2600, 2730, 2860, 2990, 3120, 3250])

In [48]:
lgen.get_labeled_data().tail(2)

Unnamed: 0,0,1,2,3,4,5,6,7,label,time
538,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,70070
539,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,70200


In [49]:
lgen.get_feature_names()

Index([0, 1, 2, 3, 4, 5, 6, 7], dtype='object')