# O objetivo deste notebook é realizar a leitura de arquivos parquets extraídos de um arquivo ZIP de uma única aeronave e agregar todos os arquivos .parquet em um único arquivo parquet

### Importação das bibliotecas

In [1]:

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import os
import dask.dataframe as dd

### Leitura dos arquivos parquets

In [3]:
# Abrindo todos parquets para checar as falhas

directory = "C:/Users/eduar/Downloads/failures/"

dir_list = os.listdir(directory)

for file in dir_list:
    path = directory + file
    
    df = pd.read_parquet(path, engine='auto')


    columnA = "message0418DAA-1"
    columnB = "message0422DAA-1"
    column_a_unique = df["message0418DAA-1"].unique()
    column_b_unique = df["message0422DAA-1"].unique()


    print('parquet :', file, 'coluna a :',column_a_unique, 'coluna b :', column_b_unique)

parquet : TCRF_ARCHIVE_06120033_20220527183941.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220527212542.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220527234737.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528013940.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528053141.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528084241.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528125041.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528153541.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528182441.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_06120033_20220528233241.parquet coluna a : [nan  0.] coluna b : [nan  0.]
parquet : TCRF_ARCHIVE_0612003

### Concanteção dos arquivos parquets utilizando a API Dask considerando apenas as colunas filtradas

In [4]:
def concatenate_and_count_rows_in_parquet_files(file_path, filtered_cols):
    total_rows = 0
    list_of_dfs = []  # Lista para armazenar os DataFrames lidos

    # Lista de arquivos Parquet no diretório especificado
    parquet_files = [file for file in os.listdir(file_path) if file.endswith('.parquet')]

    # Loop para ler cada arquivo Parquet, contar as linhas e armazenar o DataFrame na lista
    for file_name in parquet_files:
        full_path = os.path.join(file_path, file_name)
        df = dd.read_parquet(full_path, columns=filtered_cols)
        total_rows += len(df)
        list_of_dfs.append(df)
        print(f"Arquivo {file_name} tem {len(df)} linhas.")

    # Concatenar todos os Dask DataFrames em um único Dask DataFrame
    concatenated_df = dd.concat(list_of_dfs, interleave_partitions=True)

    return concatenated_df, total_rows

# Uso da função
file_path = "C:/Users/eduar/Downloads/failures/"
filtered_cols = ['recording_time', 'dateDay-1', 'dateMonth-1', 'dateYear-1',
                 'phaseOfFlight-1', 'message0418DAA-1', 'message0422DAA-1', 
                 'bleedFavTmCmd-1a', 'bleedFavTmCmd-1b', 'bleedFavTmCmd-2a', 
                 'bleedFavTmCmd-2b', 'bleedFavTmFbk-1a', 'bleedFavTmFbk-1b', 
                 'bleedFavTmFbk-2b', 'bleedHprsovCmdStatus-1a', 'bleedHprsovCmdStatus-1b',
                 'bleedHprsovCmdStatus-2a', 'bleedHprsovCmdStatus-2b', 'bleedHprsovOpPosStatus-1a',
                 'bleedHprsovOpPosStatus-1b', 'bleedHprsovOpPosStatus-2a', 'bleedHprsovOpPosStatus-2b', 
                 'bleedMonPress-1a', 'bleedMonPress-1b', 'bleedMonPress-2a', 'bleedMonPress-2b', 
                 'bleedOnStatus-1a', 'bleedOnStatus-1b']



resulting_df, total_rows = concatenate_and_count_rows_in_parquet_files(file_path, filtered_cols)
print(f"O número total de linhas em todos os arquivos é: {total_rows}")

Arquivo TCRF_ARCHIVE_06120033_20220527183941.parquet tem 159741 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220527212542.parquet tem 157541 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220527234737.parquet tem 81301 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528013940.parquet tem 84881 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528053141.parquet tem 152241 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528084241.parquet tem 178021 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528125041.parquet tem 105941 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528153541.parquet tem 129501 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528182441.parquet tem 116541 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220528233241.parquet tem 122541 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220529023739.parquet tem 112561 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220529102844.parquet tem 122341 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220529194201.parquet tem 15201 linhas.
Arquivo TCRF_ARCHIVE_06120033_20220529194202.parquet tem 112881 lin

#### Remoção de linhas duplicadas e remoção de valores das colunas de data

In [5]:
def remove_duplicates_except_record_time(dask_df):
    """
    Remove linhas duplicadas com base em todas as colunas, exceto 'record_time'.

    Parâmetros:
    - dask_df: DataFrame Dask de entrada.

    Retorna:
    - DataFrame Dask com linhas duplicadas removidas.
    """

    # Crie uma lista de colunas a serem consideradas ao identificar duplicatas
    columns_to_consider = [col for col in dask_df.columns if col != 'record_time']

    # Use o método drop_duplicates do Dask para remover duplicatas com base nas colunas especificadas
    #Mantém o primeiro valor duplicado por default
    unique_df = dask_df.drop_duplicates(subset=columns_to_consider)

    return unique_df

# Uso da função
unique_df = remove_duplicates_except_record_time(resulting_df)

In [7]:
# Preenchendo com NaN as datas vazias
data_df = unique_df

columns_to_fill = ['dateYear-1', 'dateMonth-1', 'dateDay-1']
data_df[columns_to_fill] = data_df[columns_to_fill].fillna(0)

In [6]:
data_df

Unnamed: 0_level_0,recording_time,dateDay-1,dateMonth-1,dateYear-1,phaseOfFlight-1,message0418DAA-1,message0422DAA-1,bleedFavTmCmd-1a,bleedFavTmCmd-1b,bleedFavTmCmd-2a,bleedFavTmCmd-2b,bleedFavTmFbk-1a,bleedFavTmFbk-1b,bleedFavTmFbk-2b,bleedHprsovCmdStatus-1a,bleedHprsovCmdStatus-1b,bleedHprsovCmdStatus-2a,bleedHprsovCmdStatus-2b,bleedHprsovOpPosStatus-1a,bleedHprsovOpPosStatus-1b,bleedHprsovOpPosStatus-2a,bleedHprsovOpPosStatus-2b,bleedMonPress-1a,bleedMonPress-1b,bleedMonPress-2a,bleedMonPress-2b,bleedOnStatus-1a,bleedOnStatus-1b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1
,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


### Salvando o DataFrame agregado em um novo arquivo parquet

In [None]:
# Exportando o parquet
data_df.to_parquet('failure_window.parquet')
