In [29]:
import os
from multiprocessing import Pool

import numpy as np
import pandas as pd
import plotly.graph_objects as go

from scipy.signal import find_peaks
from Tools.DataChecker import DataHandler

In [30]:
# Root directory handed to achar_feathers(), which searches it recursively
# for .feather files.
TEST_PATH = "/home/gabrielmc/Documentos/LINSE/Dados"

# Forwarded to scipy.signal.find_peaks as its `width` argument inside
# achar_ressonancias().
# NOTE(review): the name says "distance between peaks" but it is used as a
# peak width — confirm which one is intended.
DISTANCIA_PICOS = 10

In [31]:
def achar_feathers(pasta: str|os.PathLike) -> list[str]:
    buffer = []
    def loop(path):
        for i in os.listdir(path):
            p = os.path.join(path, i)
            if os.path.isdir(p):
                loop(p)
            elif i.endswith(".feather"):
                buffer.append(p)
    loop(pasta)
    
    return buffer

In [32]:
def atuador(path: str|os.PathLike) -> int:
    file_name = os.path.basename(path)
    atuador = int(file_name.split("_")[1][1])
    
    return atuador

In [33]:
def desgaste(path: str|os.PathLike) -> int:
    dir_path = os.path.dirname(path)
    dir = dir_path[dir_path.rfind('Viga'):]
    desgaste_lvl = 0 if dir.find("Intacta") > 0 else int(dir[-1])
    
    return desgaste_lvl

In [34]:
def sort_by_desgaste(feather_paths: list[str|os.PathLike]) -> list[list[str]]:
    sorted_paths = [[None] for _ in range(8)]
    for path in feather_paths:
        d = desgaste(path)
        if sorted_paths[d][0] is None:
            sorted_paths[d][0] = path
        else:
            sorted_paths[d].append(path)
    
    return sorted_paths

In [35]:
def achar_ressonancias(feather: str|os.PathLike) -> list[int]:
    dac_n = atuador(feather)
    
    data = DataHandler(feather, dac=f'dac{dac_n}', imu='imu1accz')
    data.generate_fir_freq()
    fir_freq = np.array(data.fir_freq)

    altura = np.mean(fir_freq[0]) + abs(np.mean(fir_freq[0]) * .33)
    indices_picos = find_peaks(fir_freq[0], height=altura, width=DISTANCIA_PICOS)[0]
    frequencias = [fir_freq[1, i] for i in indices_picos]
    
    return frequencias

In [36]:
# Every .feather file found under TEST_PATH, sub-directories included.
all_feathers = achar_feathers(TEST_PATH)

In [37]:
# Paths grouped by wear level: position d in the outer list holds the files
# whose desgaste() value is d.
sorted_feathers = sort_by_desgaste(all_feathers)

In [38]:
# Build one DataFrame of resonance values per wear level and stack them into
# df_ressonancias, indexed by (Desgaste, position of the file in its level).
dfs = []
# A single worker pool serves every wear level; creating a new Pool on each
# iteration only added process start-up cost.
with Pool() as pool:
    for desgaste_lvl, feathers in enumerate(sorted_feathers):
        # sort_by_desgaste marks a wear level without files as [None].
        if feathers == [None]:
            continue
        # pool.map returns results in the same order as `feathers`, so row k
        # of this level always belongs to feathers[k]. imap_unordered gave no
        # such guarantee, which made the second index level change from run
        # to run.
        ressonancias = pool.map(achar_ressonancias, feathers)
        index = pd.MultiIndex.from_product(
            [[desgaste_lvl], range(len(ressonancias))],
            names=["Desgaste", None],
        )
        # Files with fewer peaks than others are padded with NaN by pandas.
        dfs.append(pd.DataFrame(ressonancias, index=index))
# pd.concat raises ValueError when no wear level produced any file.
df_ressonancias = pd.concat(dfs)
df_ressonancias