## Analyse fréquentielle des séries de temps

### Chargement des librairies

In [1]:
# Setting root folder 
%matplotlib widget
import matplotlib
import matplotlib.pyplot as plt
import os, sys
from os import listdir
from os.path import isfile, join
parent_dir = os.path.abspath('..')

if parent_dir not in sys.path:
    sys.path.append(parent_dir)

## Imports 

# Native imports
import pandas as pd
import numpy as np
import math
import pywt
from stockwell import st


import subprocess
from shutil import copy, move
import json
import csv
import openpyxl

# Signal processing packages
from scipy.signal import detrend
from scipy.fft import fft, fftfreq, fftshift
# Peak detection
from scipy.signal import find_peaks
from scipy.signal.windows import get_window
#!pip install PeakUtils

# REGEX
import re

# Time manipulation packages
import datetime
import time

# Visualizing progress
from tqdm.notebook import tqdm
from IPython.core.debugger import set_trace

In [2]:
from IPython.display import HTML, Javascript, display

def initialize():
    display(HTML(
        '''
            <script>
                code_show = false;
                function restart_run_all(){
                    IPython.notebook.kernel.restart();
                    setTimeout(function(){
                        IPython.notebook.execute_all_cells();
                    }, 10000)
                }
                function code_toggle() {
                    if (code_show) {
                        $('div.input').hide(200);
                    } else {
                        $('div.input').show(200);
                    }
                    code_show = !code_show
                }
                restart_run_all();
            </script>
        '''
    ))

In [3]:
# =============================================================================
# SIGNAL PROCESSING ANALYSIS TOOLS
# =============================================================================


# =============================================================================
# FFT analysis tools
# =============================================================================

def next_power_of_2(x):
    return 1 if x == 0 else math.ceil(math.log2(x))

def df_FFT(y, Fs, win = 'boxcar'):
    '''
    Creates a dataframe with frequency bins and amplitudes of a given signal with windowing.
    
    Parameters
    ----------
    y : np.array
        Sample points.
    Fs : float
        Sample frequency.
    win : string, optional
        Window used. The default is 'hann'.
        Works with the following windows:
            boxcar
            triang
            blackman
            hamming
            hann
            bartlett
            flattop
            parzen
            bohman
            blackmanharris
            nuttall
            barthann
            cosine
            exponential
            tukey
            taylor

    Returns
    -------
    df_ywf : pd.DataFrame
        Table with 
            xf : frequency bins
            yf : Acceleration RMS.

    '''
    # Setting constants
    dt = 1 / Fs                         # Sample time
    N = len(y)                          # Number of samples
    
    # For computational efficiency, use nfft instead of N (algorithm is efficient in powers of 2)
    nfft = 2**next_power_of_2(N+1)    # Number of frequency points
    
    # Getting window
    w = get_window(win, N)
    
    # Performing fft
    xf = fftfreq(nfft, d = dt)
    yf_c = fft(y*w, n = nfft)
    
    yf = [0]*len(yf_c)
    
    yf[0]  = np.abs(yf_c[0])**2  
    yf[1:] = np.abs(yf_c[1:])**2
    
    # Creating export dataframe
    df_ywf = pd.DataFrame({
        'xf' : xf,
        'yf' : yf
        })
    
    return df_ywf

### Traduction en fichier .txt les fichiers brutes .dat

In [4]:
os.path.abspath("")
pathRawData =  os.path.join(os.path.abspath(""),r'..',r'data',r'rawdat_data')
pathData = os.path.join(os.path.abspath(""),r'..',r'data',r'rawtxt_data')
pathTraitement = 'Traitement'

In [5]:
# # retrieve all .dat file in current directory 
# files = [file for file in os.listdir(pathRawData) if file.endswith(".dat")]

# for file in files: 

#     if file+'.txt' not in os.listdir(pathData):
#         print(f'Processing {file}')

#         if not os.path.exists(pathTraitement):
#             os.mkdir(pathTraitement)

#         copy(
#             os.path.join(pathRawData,file), 
#             os.path.join(pathTraitement,'result.dat')
#         )

#         os.startfile('decodetest.bat')

#         time.sleep(2)

#         os.system("taskkill /im DecodeBin2Ascii.exe /f")

#         os.rename(
#             os.path.join(pathTraitement, 'result.dat.txt'),
#             os.path.join(pathTraitement, file+'.txt')
#         )

#         move(
#             os.path.join(pathTraitement,file+'.txt'),
#             pathData
#         )

#         os.remove(os.path.join(pathTraitement,'result.dat'))
        
# print('Tous les fichiers sont à jour!')

In [6]:
# =============================================================================
# READING OF A .DAT.TXT FILE
# =============================================================================

def read_GANTNER_dat_txt(fileName, export_dict = False):
    '''
    
    Lecture d'un fichier .dat.txt type VDV décodé sous la forme d'un dictionnaire contenant toutes les informations et dataFrame pandas.

    Parameters
    ----------
    fileName : str
        la localisation du fichier dat.txt à lire
    export_dict : bool, optional
        Variable permettant d'exporter ou non le dictionnaire d'information préliminaire du fichier. The default is False.

    Returns
    -------
    df_file : pd.DataFrame
        dataframe des données du dat.txt
    dict_file : dict
        dictionnaire des informations du dat.txt
        
        
    Examples
    -------
    

    '''
    import codecs
    dict_file = {}
    with codecs.open(fileName, 'r', 'windows-1252') as my_file: #'utf8', 
        for (n,line) in enumerate(my_file):
            if line.encode('utf-8').strip():
                if re.search(r'[*]+', line):
                    break
                else:
                    info_match = re.findall(r'([A-Za-z]*): ([^\/\n]*)', line) 
            
                    for key,value in info_match:
                        if key in dict_file:
                            dict_file[key].append(value.replace(" ", ""))
                        else:
                            dict_file.update({key: [value.replace(" ", "")]})
            
        df_file = pd.read_csv(fileName,
                              skiprows = range(n+1),
                              sep = '\t',
                              usecols = list(range(1,len(dict_file['Name'])+1)),
                              names = dict_file['Name'],
                              encoding='unicode_escape')        
        df_file = (df_file)
        
#     with open(fileName, 'r', encoding= 'windows-1252') as f:
                
#         # Stockage des infos préliminaires dans un dictionnaire
#         dict_file = {}

#         line = f.readline()
        
#         # Match correspond à la ligne séparatrice entre les infos prélim et les données
#         match = re.search(r'[*]+', line) # corresponde à ***************************************
        
#         # Nombre de ligne à sauter pour accéder aux données
#         n = 1
        
#         # Boucle pour extraire les données préliminaires
#         while match is None:
            
#             # Match entre key et value des infos prélim 
#             # (Seul problème avec le StartTime, à modifier)
#             info_match = re.findall(r'([A-Za-z]*): ([^\/\n]*)', line) 
            
#             for key,value in info_match:
#                 if key in dict_file:
#                     dict_file[key].append(value)
#                 else:
#                     dict_file.update({key: [value]})
#             line = f.readline()
#             match = re.search(r'[*]+', line)
#             n += 1
        
#         set_trace()
#         df_file = (
#             pd.read_csv(fileName,
#                         skiprows = range(n), 
#                         sep = '\t',
#                         usecols = list(range(1,len(dict_file['Name'])+1)),
#                         names = dict_file['Name'],
#                         encoding='unicode_escape')
#         )

    if export_dict == True:
        return df_file, dict_file
    else:
        return df_file

In [7]:
def prep_GANTNER_dat_txt(data_path, file):
    
    # Extracting date and time
    date, heure = re.findall(r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})', file)[0]
    
    file_path = os.path.join(data_path,file)
    
    # File as df
    df_jour = read_GANTNER_dat_txt(file_path)
    
    # Creating time indexes
    df_jour.loc[:,'Date_Heure'] = pd.date_range(
        start = pd.to_datetime(
            f'{date} {heure}',
            format = '%Y-%m-%d %H-%M-%S'),
        freq ='0.01S', 
        periods = len(df_jour)
        )
    
    # Reseting indexes
    df_jour.index = df_jour['Date_Heure']
    try:
        df_jour = df_jour[['accelero1(x)','accelero2(y)','PT100']].rename(
            columns = {'accelero1(x)':'Capteur_1','accelero2(y)':'Capteur_2',
                       'PT100' : 'Temperature'})
        df_jour = df_jour.apply(lambda x: x.str.replace(',','.')).astype(float)
        return df_jour
    except:
        return pd.DataFrame()

### Création de la base de données des fréquences basées sur les accélérations des capteurs 1 et 2

In [None]:
# Code qui génère Base de données_Fréquences_Capteurs_1_&_2

onlyTXT = [f for f in listdir(pathData) if f.endswith('.txt')]

# Creation du fichier vierge
df_capt_brut = pd.DataFrame(columns = ['Date_Heure','Temperature','Frequence_1_1','Frequence_2_1','Frequence_3_1','Frequence_1_2','Frequence_2_2','Frequence_3_2'])

Date_Heure = len(onlyTXT)*[0]

for i, TXT in enumerate(onlyTXT):
    date, heure = re.findall(r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})', TXT)[0]
    Date_Heure[i] = f'{date} {heure}'

c = len(onlyTXT)

fmin =  0.0  # Hz
fmax = 5.0  # Hz
fs = 100 # Hz - Sampling frequency
# 

# stock = st.st(w, fmin_samples, fmax_samples)


# Ajout des valeurs de fréquence au fichier
for i, TXT in enumerate(tqdm(onlyTXT)):
    
    # Création du dataframe du jour et de l'heure
    date, heure = re.findall(r'(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})', TXT)[0]
    df_jour = prep_GANTNER_dat_txt(pathData, TXT)
    if df_jour.empty:
        pass
    else:
        dtm = (df_jour.index[1]-df_jour.index[0]).total_seconds()
        vtm = np.array((df_jour.index-df_jour.index[0]).total_seconds())
        df_jour.loc[:,["Capteur_1","Capteur_2"]] = df_jour.loc[:,["Capteur_1","Capteur_2"]].apply(detrend)
        df = 1./dtm  # sampling step in frequency domain (Hz)
        fmin_samples = int(fmin/df)
        fmax_samples = int(fmax/df)
        stock = st.st(df_jour["Capteur_1"].values, fmin_samples, fmax_samples)
        
        extent = (vtm[0], vtm[-1], fmin, fmax)
        fig, ax = plt.subplots(2, 1, sharex=True)
        ax[0].plot(vtm, df_jour["Capteur_1"].values)
        ax[0].set(ylabel='amplitude')
        ax[1].imshow(np.abs(stock), origin='lower', extent=extent)
        ax[1].yscale('log')
        ax[1].axis('tight')
        ax[1].set(xlabel='time (s)', ylabel='frequency (Hz)')
        plt.show()
        

  0%|          | 0/957 [00:00<?, ?it/s]

In [None]:
#             y1 = df_jour.Capteur_1.to_list()
#             y2 = df_jour.Capteur_2.to_list()

#             # Performing a FFT on both sets, merging and taking positive frequency bins
#             df_ywf1 = df_FFT(y1, Fs, win = 'boxcar').rename({'yf' : 'Y1'}, axis = 'columns')
#             df_ywf2 = df_FFT(y2, Fs, win = 'boxcar').rename({'yf' : 'Y2'}, axis = 'columns')
#             df_ywf = df_ywf1.merge(df_ywf2, on = 'xf')
#             df_ywf_1 = df_ywf[(df_ywf.xf >= 0.5) & ((df_ywf.xf <= 0.7))]
#             df_ywf_2 = df_ywf[(df_ywf.xf >= 1.0) & ((df_ywf.xf <= 1.6))]
#             df_ywf_3 = df_ywf[(df_ywf.xf >= 2.0) & ((df_ywf.xf <= 2.9))]
#             idx = pd.to_datetime(f'{date} {heure}', format = "%Y-%m-%d %H-%M-%S")

#         # Capteur 1

#         # Ajout des valeurs de Fréquence_1_1
#         if((df_ywf_1.Y1.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_1_1'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks, peak_dict = find_peaks(df_ywf_1.Y1, prominence=(None, min_prominence), distance=100)
#             peaks_max_1 = df_ywf_1.xf.to_numpy()[peaks[np.argsort(peak_dict['prominences'])[::-1]]][:1]

#             if((peaks_max_1.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_1_1'] = np.nan
#             if((peaks_max_1.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_1_1'] = peaks_max_1[0]
#         # Ajout des valeurs de Fréquence_2_1
#         if((df_ywf_2.Y1.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_2_1'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks, peak_dict = find_peaks(df_ywf_2.Y1, prominence=(None, min_prominence), distance=100)
#             peaks_max_1 = df_ywf_2.xf.to_numpy()[peaks[np.argsort(peak_dict['prominences'])[::-1]]][:1]

#             if((peaks_max_1.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_2_1'] = np.nan
#             if((peaks_max_1.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_2_1'] = peaks_max_1[0]
#         # Ajout des valeurs de Fréquence_3_1
#         if((df_ywf_3.Y1.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_3_1'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks, peak_dict = find_peaks(df_ywf_3.Y1, prominence=(None, min_prominence), distance=100)
#             peaks_max_1 = df_ywf_3.xf.to_numpy()[peaks[np.argsort(peak_dict['prominences'])[::-1]]][:1]

#             if((peaks_max_1.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_3_1'] = np.nan
#             if((peaks_max_1.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_3_1'] = peaks_max_1[0]

#         # Capteur 2

#         # Ajout des valeurs de Fréquence_1_2
#         if((df_ywf_1.Y2.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_1_2'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks_2, peak_dict_2 = find_peaks(df_ywf_1.Y2, prominence=(None, min_prominence), distance=100)
#             peaks_max_2 = df_ywf_1.xf.to_numpy()[peaks_2[np.argsort(peak_dict_2['prominences'])[::-1]]][:1]

#             if((peaks_max_2.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_1_2'] = np.nan
#             if((peaks_max_2.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_1_2'] = peaks_max_2[0]
#         # Ajout des valeurs de Fréquence_2_2
#         if((df_ywf_2.Y2.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_2_2'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks_2, peak_dict_2 = find_peaks(df_ywf_2.Y2, prominence=(None, min_prominence), distance=100)
#             peaks_max_2 = df_ywf_2.xf.to_numpy()[peaks_2[np.argsort(peak_dict_2['prominences'])[::-1]]][:1]

#             if((peaks_max_2.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_2_2'] = np.nan
#             if((peaks_max_2.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_2_2'] = peaks_max_2[0]
#         # Ajout des valeurs de Fréquence_3_2
#         if((df_ywf_3.Y2.iloc[0] == 0)):
#             df_capt_brut.loc[idx, 'Date_Heure'] = idx
#             df_capt_brut.loc[idx,'Temperature'] = np.nan
#             df_capt_brut.loc[idx, 'Frequence_3_2'] = np.nan
#         else:
#             min_prominence = 1000
#             peaks_2, peak_dict_2 = find_peaks(df_ywf_3.Y2, prominence=(None, min_prominence), distance=100)
#             peaks_max_2 = df_ywf_3.xf.to_numpy()[peaks_2[np.argsort(peak_dict_2['prominences'])[::-1]]][:1]

#             if((peaks_max_2.size==0)):
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_3_2'] = np.nan
#             if((peaks_max_2.size==1)) :
#                 df_capt_brut.loc[idx, 'Date_Heure'] = idx
#                 df_capt_brut.loc[idx,'Temperature'] = df_jour.Temperature.mean()
#                 df_capt_brut.loc[idx, 'Frequence_3_2'] = peaks_max_2[0]            

In [None]:
# # Changement des types
# df_capt_brut['Date_Heure'] =  pd.to_datetime(df_capt_brut['Date_Heure'], format='%Y%M%D %H:%M:%S')
# df_capt_brut['Temperature'] = df_capt_brut['Temperature'].astype(float)
# df_capt_brut['Frequence_1_1'] = df_capt_brut['Frequence_1_1'].astype(float)
# df_capt_brut['Frequence_2_1'] = df_capt_brut['Frequence_2_1'].astype(float)
# df_capt_brut['Frequence_3_1'] = df_capt_brut['Frequence_3_1'].astype(float)
# df_capt_brut['Frequence_1_2'] = df_capt_brut['Frequence_1_2'].astype(float)
# df_capt_brut['Frequence_2_2'] = df_capt_brut['Frequence_2_2'].astype(float)
# df_capt_brut['Frequence_3_2'] = df_capt_brut['Frequence_3_2'].astype(float)

# # Création de la base de données en .txt et .csv 
# df_capt_brut.sort_values(by = 'Date_Heure').to_csv(r'Base_de_données_des_Fréquences/Base de données_Fréquences_Capteurs_1_&_2.txt', sep="\t", index=False)
# df_capt_brut.sort_values(by = 'Date_Heure').to_csv(r'Base_de_données_des_Fréquences/Base de données_Fréquences_Capteurs_1_&_2.csv', index=False)

In [None]:
%lsmagic