In [4]:
class Signal:
    '''
        Objet Signal correspondant à une courbe émise lorsqu'un camion est passé sur les capteurs
    '''
    
    def __init__(self, name, data, temp=False): #lookahead=20, delta=0.2
        '''
            Initialiation à la création d'une instance 'Signal'.
            Si temp=True, self.peaks et self.deletion_limite n'existe pas.
            Sinon, cela permet le calcul automatique des pics du signal, ainsi que la suppresion de ceux
            inutils pour notre analyse.
            __________
            param:
                - (name : str) Nom du signal
                - (data : np.array) Tableau de donnée du signal
                - (temp : bool) Définit la temporalité du signal
            __________
            return: (Objet Signal)
        '''
        
        self.name = name
        if not temp:
            self.data = data[0]
            self.df_calculate = data[1]
            self.peaks = self.calculate__peaks(int(len(self.data)/150), len(self.data)/15000) #int(len(self.data)/100), 0.2 # list des pics avec param automatisé selon signal
            self.deletion_limit = self.deletion_band(deletion=False) # float de la limit positive
        else:
            self.data = data
        
    
    def deletion_band(self, pourcentage=0.2, nbr_std=3, deletion=False):
        '''
            Calcul de la zone de pics du signal à supprimer.
            Si deletion = True, les pics inclus dans la zone seront supprimés. Pas dans le cas contraire.
            __________
            param:
                - (pourcentage : float) Pourcentage du signal à analyse pour calculer la zone
                - (nbr_std : int) Multiplicateur d'écart-type pour égalir la zone
                - (deletion : bool) True pour supprimer les points False pour simplement calculer la zone.
            __________
            return: (float) Limite positive de la zone
        '''
        
        deletion_limit = self.data.iloc[:int(pourcentage*len(self.data)),1].std()*nbr_std
        
        if deletion:
            peaks = list(self.peaks)
            for peak in peaks:
                if (peak.amplitude < deletion_limit) & (peak.amplitude > - deletion_limit): 
                    self.peaks.remove(peak)
    
        return deletion_limit
    
    
    def find_close_point(self, close_point, the_best=False, min_max=None, signal_gauche=False, last_point=None, nbr_best=10):
        '''
            Recherche du point le plus proche de 'close_point' sur le signal.
            __________
            param:
            
                - (close_point : DataFrame) Point dont on cherche la plus proche correspondance sur le signal
                - (the_best : bool) Spécifier si l'on souhaite récupérer le meilleur point sur la courbe à True
                - (nbr_best : int) Selection du nombre de point de différence à prendre entre le point de calcul
                    et celui sur la courbe
                
                -- Dans le cas du calcul fwhm --
                - (min_max: str) Definit sur la recherche du point se fait sur un min ou un max ("min ou "max)
                - (signal_gauche : bool) Définit à True si l'étude est sur la partie gauche du signal par raport au point
                    de calcul
                - (last_point : float) Correspond à la position du dernier point du signal total
            __________
            return: (DataFrame) contenant le numéro d'échantillon, l'amplitude et la position du point
        '''
        
        columns=["sample_number", "time", "amplitude"]
        points = pd.DataFrame(columns=columns)
        
        for index, row in close_point.iterrows():
            if min_max == "max":
                self.data = self.data[self.data["amplitude"] < row.amplitude]
            elif min_max == "min":
                self.data = self.data[self.data["amplitude"] > row.amplitude]
                
            if not self.data.empty:
                time_diff = abs(row.time - self.data.iloc[:,0])
                amplitude_diff = abs(row.amplitude - self.data.iloc[:,1])

                all_sort = pd.concat([time_diff, amplitude_diff], axis=1).sort_values(by=["time", "amplitude"])
                sum_diff = (all_sort.iloc[:nbr_best,0] + all_sort.iloc[:nbr_best,1]).sort_values()
                id_min = sum_diff.index[0]
                
                new_df = self.data.loc[[id_min],:].reset_index()
            
            elif not the_best & self.data.empty:
                if signal_gauche:
                    new_df = pd.DataFrame([[0, 0, row.amplitude]], columns=columns)
                else:
                    new_df = pd.DataFrame([[0, last_point, row.amplitude]], columns=columns)
                
            new_df.columns = columns
            
            points = points.append(new_df, ignore_index=True)
        return points
    
    
    def peaks_list(self, kind):
        '''
            Récupère tous les pics du signal du genre 'kind' (max, min ou all).
            __________
            param:
                - (kind : str) String ('max', 'min' ou 'all') dépendant des pics que l'on souhaite récupérer.
            __________
            return: (DataFrame) contenant l'ensemble des pics du signal du genre 'kind' selectionné.
                Affiché via Pic.to_df()
        '''
        peaks_df = pd.DataFrame()
        for peak in self.peaks:
            if peak.kind == kind:
                peaks_df = peaks_df.append(peak.to_df())
            elif kind == "all":
                peaks_df = peaks_df.append(peak.to_df())
        
        return peaks_df.reset_index(drop=True)
        
    
    def calcultate__half_max(self):
        '''
            Calcul des mi-hauteurs pour l'ensemble des pics contenus dans le signal
            __________
            return: (DataFrame) contenant les positions des points de mi-hauteurs calculés (time et amplitude)
        '''
        peaks_df = pd.DataFrame()
        
        for peak in self.peaks:
            peaks_df = pd.concat([peaks_df, peak.calcultate__half_max])

        return peaks_df
    
    
    def calculate__peaks(self, lookahead=10, delta=0.2):
        '''
            Detection de tous les pics du signal.
            __________
            param:
                - (lookahead : int) Distance entre le pic actuel déterminé et le candidat suivant
                - (delta : float) Différence minimum entre un pics et le candidat suivant
            __________
            return: (list) contenant un objet Pic par pic
        '''
        peaks_list = []
        
        peaks = peakdetect(self.data.iloc[:, 1],self.data.iloc[:, 0], lookahead=lookahead, delta=delta)
        for kind in peaks:
            if kind == peaks[0]:
                kind_name = 'max'
            else:
                kind_name = 'min'
                
            for sig in kind:
                peaks_list.append(Pic(self, sig[0], sig[1], kind_name))
        
        return peaks_list
    
    
    def show(self, courbe=True, deletion_zone=True, peaks=True, fwhm=True):
        '''
            Affichage de la courbe du signal, les limits + et - de la zone à supprimer,
            les pics et les points à mi-hauteur et leur symétrique.
            __________
            return:  plot
        '''
        plt.figure(figsize=(15, 10))
        
        ## Courbe de capteur
        if courbe:
            plt.plot(self.data.loc[:,'time'], self.data.loc[:, 'amplitude'])
        
        ## Zone de suppression des pics
        if deletion_zone:
            plt.plot([self.data.iloc[0,0], self.data.iloc[-1,0]], [self.deletion_limit, self.deletion_limit])
            plt.plot([self.data.iloc[0,0], self.data.iloc[-1,0]], [-self.deletion_limit, -self.deletion_limit])

        ## Detections des maximas et minimas
        if peaks:
            # Pics
            plt.scatter(self.peaks_list('all')['time'], self.peaks_list('all')['amplitude'])
        if fwhm:
            # Largeur
            plt.scatter(self.peaks_list('all')['time p1'], self.peaks_list('all')['amplitude p1'])
            plt.scatter(self.peaks_list('all')['time p2'], self.peaks_list('all')['amplitude p2'])

        plt.legend(['Signal', 'Limit +', 'Limit -', 'Pics', '1er point largeur', '2nd point largeur'])
        plt.xlabel("Temps")
        plt.ylabel("Amplitude")

        plt.show()
        
        
    def to_df(self):
        '''
            Affichage des informations utiles pour le modèle de prédiction pour chacun des pics:
                - Time
                - Amplitude
                - Largeur
                - Kind
            L'ordre des pics correspond au max puis aux mins dans l'ordre d'apparition de ces derniers.
            __________
            return:  (DataFrame) contenant à toutes les informations utiles
        '''
        all_df = pd.DataFrame()
        all_df['name'] = pd.Series(self.name)
        for index, peak in enumerate(self.peaks):
            keep_col = ["time", "amplitude", "fwhm", "kind"]
            df = peak.to_df()[keep_col]
            
            new_columns = []
            
            for col in df.columns:
                new_columns.append(str(col) + '_' + str(index))
                
            df.columns = new_columns
            all_df = pd.concat([all_df, df], axis=1)
            
        for elt in list_multiple_var("load_", var_to=8):
            all_df[elt] = pd.Series(self.df_calculate[elt])
            
        return all_df
        

In [None]:
class Pic:
    '''
        Objet Pic correspondant à un maximum ou un minimum d'un objet Signal
    '''
    
    def __init__(self, signal, time, amplitude, kind):
        '''
            Initialiation à la création d'une instance 'Pic'.
            Permet le calcul automatique du FWHM (Full Weight at Half Maximum) et la détermination
            du numéro d'échantillon sur la courbe.
            __________
            param:
                - (signal : Signal) Instance 'Signal' à laquelle le pic est rataché
                - (time : float) Position dans le temps à laquelle le pic est placé sur le signal
                - (amplitude : float) Position en amplitude à laquelle le pic est placé sur le signal
                - (kind : str) Genre du pic ('max' ou 'min')
            __________
            return: (Objet Pic)
        '''
        
        self.signal = signal
        self.time = time
        self.amplitude = amplitude
        self.kind = kind
        self.sample_number = self.find_sample_number()
        self.fwhm = self.calculate__fwhm() # float de la lagueur à mi-hauteur du pic
        
    
    def to_df(self):
        '''
            Conversion des informations de l'instance 'Pic' en DataFrame
            ---
            Informations données :
                - time
                - amplitude
                - sample_number
                - time p1
                - amplitude p1
                - time p2
                - amplitude p2
                - fwhm
                - kind
                - signal_name
            __________
            return: (DataFrame) contenant l'ensemble des informations du pic.
        '''
        
        if not self.fwhm.empty:
            time_p1 = self.fwhm.loc[:,'time p1'][0]
            amplitude_p1 = self.fwhm.loc[:,'amplitude p1'][0]
            time_p2 = self.fwhm.loc[:,'time p2'][0]
            amplitude_p2 = self.fwhm.loc[:,'amplitude p2'][0]
            largeur = self.fwhm.loc[:,'largeur'][0] 
        else:
            print("Le pic " + str(self.time) + " du signal " + str(self.signal.name) + " a eu un problème sur le calcul des fwhms.")
            time_p1 = np.nan
            amplitude_p1 = np.nan
            time_p2 = np.nan
            amplitude_p2 = np.nan
            largeur = np.nan
        
        
        df = pd.DataFrame(data=[[self.time, self.amplitude, self.sample_number, time_p1, amplitude_p1, time_p2,\
                                 amplitude_p2, largeur , self.kind, self.signal.name]],\
                    columns=["time", "amplitude", "sample_number", "time p1", "amplitude p1", "time p2",\
                             "amplitude p2", "fwhm", "kind", "signal_name"])

        return df
    
    
    def calcultate__half_max(self):
        '''
            Calcul des mi-hauteurs pour l'ensemble des pics contenus dans le signal
            __________
            return: (DataFrame) contenant les positions des points de mi-hauteurs calculés (time et amplitude)
        '''
        
        middle_time = self.time
        middle_amplitude = self.amplitude - self.amplitude/2

        return pd.DataFrame(data = [[middle_time, middle_amplitude]], columns=["time", "amplitude"])
    
    
    def find_sample_number(self):
        '''
            Retrouver le numéro d'échantillon du Pic se rapportant au Signal
            __________
            return: (int) correspondant au numéro d'échantillon
        '''
        
        return self.signal.data[(self.signal.data.loc[:,'time'] == self.time) &\
                                (self.signal.data.loc[:,'amplitude'] == self.amplitude)].index[0]
    
    
    def calculate__fwhm(self, coef=0.09, diviseur=0.55, delta_weight=30, scatter=False, show_derive=False):
        '''
            Calcule du FWHM (Full Weight at Half Maximum)
            __________
            param:
                - (coef : float) Chiffre définissant un coefficient pour trouver les points de rupture
                - (diviseur : float) Chiffre définissant quel hauteur prendre pour le fwhm
                - (delta_weight : int) Nombre de largeur de pic voulu lors du calcul
                - (scatter : bool) Bolean permetant l'affichage ou non des graphiques de calcule de l'ensemble des largeurs
                - (show_derive : bool) Bolean permetant l'affichage ou non des graphiques de dérivé
            __________
            return: (DataFrame - 5 entrées) contenant time et amplitude 1er point (p1) de la mi-hauteur, 
                    time et amplitude 2nd point (p2) de la mi-hauteur et la largeur entre ces 2 points.
        '''  
        
        # Calcul de l'ensemble des largeurs du max/min du pic jusqu'au min/max du signal
        result = pd.DataFrame()
        
        line = self.amplitude
        max_all = self.signal.data.amplitude.max()
        min_all = self.signal.data.amplitude.min()
        
        signal_cut_m = Signal('Tempo gauche', self.signal.data[self.signal.data['time'] <= self.time], temp=True)
        signal_cut_p = Signal('Tempo gauche', self.signal.data[self.signal.data['time'] >= self.time], temp=True)
        
        if scatter:
            plt.plot(self.signal.data['time'], self.signal.data['amplitude'])
        
        
        # Calcul différent selon que le pic est un max ou un min
        step_weight=self.signal.data['amplitude'].max()/30
        if self.kind == "max":
            while(line > min_all):
                line -= step_weight
                df = pd.DataFrame([[self.time, line]], columns=["time", "amplitude"])
                point_x = signal_cut_m.find_close_point(df, min_max=self.kind, signal_gauche=True)
                point_y = signal_cut_p.find_close_point(df, min_max=self.kind, last_point=self.signal.data.iloc[-1,0])
                result = result.append(pd.DataFrame([[point_x['time'][0], point_x['amplitude'][0], point_y['time'][0], point_y['amplitude'][0], point_y['time'].values[0] - point_x['time'].values[0]]], columns=["time p1", "amplitude p1", "time p2", "amplitude p2", "largeur"]), ignore_index=True)
                
                if scatter:
                    plt.scatter(point_x['time'].values[0], point_y['amplitude'].values[0])
                    plt.scatter(point_y['time'].values[0], point_y['amplitude'].values[0])
                
        else:
            while(line < max_all):
                line += step_weight
                df = pd.DataFrame([[self.time, line]], columns=["time", "amplitude"])
                point_x = signal_cut_m.find_close_point(df, min_max=self.kind, signal_gauche=True)
                point_y = signal_cut_p.find_close_point(df, min_max=self.kind, last_point=self.signal.data.iloc[-1,0])
                result = result.append(pd.DataFrame([[point_x['time'][0], point_x['amplitude'][0], point_y['time'][0], point_y['amplitude'][0], point_y['time'].values[0] - point_x['time'].values[0]]], columns=["time p1", "amplitude p1", "time p2", "amplitude p2", "largeur"]), ignore_index=True)
                
                if scatter:
                    plt.scatter(point_x['time'].values[0], point_y['amplitude'].values[0])
                    plt.scatter(point_y['time'].values[0], point_y['amplitude'].values[0])
                
        if scatter:
            plt.show()
        
        
        
        
        def nettoyage_largeur(df):
            '''
                Suppression des largeurs erronées. On suppose qu'une largeur x > x+1 et x > x-1 est erronée.
                __________
                param:
                    - (df : DataFrame) DataFrame de l'ensemble des largeurs dans le pic.
                __________
                return: (DataFrame) correspondant à la DataFrame précédente sans les largeurs supposées erronées.
                
            '''
            del_line = 0
            for index, row in df.iterrows():
                try:
                    if (row["largeur"] > df.loc[index+1,"largeur"]) & (row["largeur"] > df.loc[index-1,"largeur"]):
                        del_line += 1
                        df.drop(index, axis=0, inplace=True)
                except (IndexError, KeyError):
                    pass

            df = df.reset_index(drop=True)
            if del_line > 0:
                df = nettoyage_largeur(df)

            return df
        
        
        
        # Nettoyage des largeurs incohérentes et calcul de la dériver locale de chaque point
        derive = pd.DataFrame()
        all_height = nettoyage_largeur(result.reset_index(drop=True))
        for index, row in all_height.iterrows():
            try:
                derive = derive.append(pd.DataFrame([[all_height.iloc[index+1, 4] - row["largeur"]]], columns=["derive"]), ignore_index=True)
            except (IndexError, KeyError):
                pass
        
        if show_derive:
            plt.scatter(derive.index, derive['derive'])
            plt.title(str(self.signal.name)+"_"+str(self.time))
            
            plt.show()
        
        # Trouver les points de rupture de largeur hors pic
        peaks = pd.DataFrame()
        for index, elt in derive["derive"].items():
            if elt > coef:
                peaks = peaks.append(pd.DataFrame([[index, elt]]), ignore_index=True)
        
        # Récupération du point de rupture
        rupture_point = pd.DataFrame()
        for index, row in all_height.iterrows():
            try:
                if (index != all_height.index[0]):
                    if all_height.iloc[index+1,4] - row["largeur"] > peaks.iloc[0,1]-coef:
                        rupture_point = pd.DataFrame([all_height.iloc[int(index*diviseur),:]], columns=["time p1", "amplitude p1", "time p2", "amplitude p2", "largeur"])
                        break
            except (IndexError, KeyError):
                pass
                

        return rupture_point.reset_index(drop=True)