In [1]:
# Importing the libraries needed
import pandas as pd
import numpy as np
import gc
import os
import copy

In [45]:
# Aggregation method applied to each column when a segment of flight rows is
# collapsed into one summary row. Entries are grouped by method, but insertion
# order is kept exactly as listed because the key order becomes the column
# order of the aggregated output.
methods = {
    **dict.fromkeys([
        'amscHprsovDrivF-1a', 'amscHprsovDrivF-1b', 'amscHprsovDrivF-2b',
        'amscPrsovDrivF-1a', 'amscPrsovDrivF-1b', 'amscPrsovDrivF-2b',
        'basBleedLowPressF-1a', 'basBleedLowPressF-2b',
        'basBleedLowTempF-1a', 'basBleedLowTempF-2b',
        'basBleedOverPressF-1a', 'basBleedOverPressF-2b',
        'basBleedOverTempF-1a', 'basBleedOverTempF-2b',
    ], 'sum'),
    **dict.fromkeys([
        'bleedFavTmCmd-1a', 'bleedFavTmCmd-1b', 'bleedFavTmCmd-2a', 'bleedFavTmCmd-2b',
        'bleedFavTmFbk-1a', 'bleedFavTmFbk-1b', 'bleedFavTmFbk-2b',
    ], 'mode'),
    **dict.fromkeys([
        'bleedHprsovCmdStatus-1a', 'bleedHprsovCmdStatus-1b',
        'bleedHprsovCmdStatus-2a', 'bleedHprsovCmdStatus-2b',
        'bleedHprsovOpPosStatus-1a', 'bleedHprsovOpPosStatus-1b',
        'bleedHprsovOpPosStatus-2a', 'bleedHprsovOpPosStatus-2b',
    ], 'sum'),
    **dict.fromkeys([
        'bleedMonPress-1a', 'bleedMonPress-1b', 'bleedMonPress-2a', 'bleedMonPress-2b',
    ], 'mode'),
    **dict.fromkeys([
        'bleedOnStatus-1a', 'bleedOnStatus-1b', 'bleedOnStatus-2b',
        'bleedOverpressCas-2a', 'bleedOverpressCas-2b',
    ], 'sum'),
    **dict.fromkeys([
        'bleedPrecoolDiffPress-1a', 'bleedPrecoolDiffPress-1b',
        'bleedPrecoolDiffPress-2a', 'bleedPrecoolDiffPress-2b',
    ], 'mean'),
    **dict.fromkeys([
        'bleedPrsovClPosStatus-1a', 'bleedPrsovClPosStatus-2a',
        'bleedPrsovFbk-1a', 'bleedPrsovFbk-1b', 'bleedPrsovFbk-2b',
        'flightMinutes',
        'message0418DAA-1', 'message0422DAA-1',
    ], 'sum'),
}

In [22]:
# Aggregate one column of a segment according to its configured method
def process_column(col_name, method, dataframe):
    """Reduce ``dataframe[col_name]`` to a single value using ``method``.

    Parameters
    ----------
    col_name : str
        Name of the column to aggregate.
    method : str
        ``'max'``, ``'sum'``, ``'mean'``, ``'mode'``, or ``'answer'``
        (number of rows whose value is non-null and non-zero).
    dataframe : pandas.DataFrame
        Frame containing ``col_name``. It is not modified.

    Returns
    -------
    scalar
        The aggregated value. For ``'mode'`` this is the first value returned
        by ``Series.mode()``, or NaN when the column has no non-null values.

    Raises
    ------
    ValueError
        If ``method`` is not one of the names above (checked before the
        column is looked up).
    KeyError
        If ``col_name`` is not a column of ``dataframe``.
    """
    known_methods = ('max', 'sum', 'answer', 'mode', 'mean')
    if method not in known_methods:
        raise ValueError(f"Unknown method {method} for column {col_name}")

    values = dataframe[col_name]
    if method == 'max':
        return values.max()
    if method == 'sum':
        return values.sum()
    if method == 'mean':
        return values.mean()
    if method == 'answer':
        # NaN != 0 evaluates True, so nulls must be excluded explicitly.
        # Count on a temporary mask instead of overwriting the caller's column.
        return (values.notna() & values.ne(0)).sum()

    # method == 'mode'
    modes = values.mode()
    # mode() drops NaN, so an empty or all-null column yields no modes;
    # positional .iloc avoids depending on the result's index labels.
    return modes.iloc[0] if len(modes) else np.nan

In [46]:
def _summarize_segment(segment, methods):
    """Reduce one segment DataFrame to ``{column: aggregated value}`` via ``process_column``."""
    return {col: process_column(col, method, segment) for col, method in methods.items()}


def _split_segments(rows, is_error, pending):
    """Cut ``rows`` into segments that each end on (and include) an error row.

    Parameters
    ----------
    rows : pandas.DataFrame
        Rows of one file, in file order.
    is_error : pandas.Series of bool
        Same length as ``rows``; True marks a row that closes a segment.
    pending : list of pandas.DataFrame
        Chunks left open before this file; they are prepended to the first
        segment. The list is not modified.

    Returns
    -------
    (closed, still_open)
        ``closed`` is a list of complete segment DataFrames; ``still_open`` is
        the list of chunks after the last error row (possibly empty).
    """
    closed = []
    still_open = list(pending)
    start = 0
    # Positional indices of the error rows, so the DataFrame index is irrelevant.
    for end in np.flatnonzero(is_error.to_numpy()):
        still_open.append(rows.iloc[start:end + 1])
        closed.append(pd.concat(still_open, ignore_index=True))
        still_open = []
        start = end + 1
    if start < len(rows):
        still_open.append(rows.iloc[start:])
    return closed, still_open


def aggregate_error_data(dirpath, methods, output_directory):
    """Aggregate the rows of every parquet file in ``dirpath`` into error segments.

    A segment is the run of consecutive rows ending at (and including) a row
    where ``message0418DAA-1`` or ``message0422DAA-1`` is non-zero. Each
    segment becomes one output row, with every column reduced by
    ``process_column`` using the method configured in ``methods``. Rows left
    after the last error row are emitted as a final segment.

    Parameters
    ----------
    dirpath : str
        Directory containing the ``*.parquet`` input files. Files are read in
        sorted filename order so the output is reproducible.
    methods : dict[str, str]
        Maps each output column to an aggregation method accepted by
        ``process_column``; its key order fixes the output column order.
    output_directory : str
        Directory where ``<basename(dirpath)>_error_aggregated.parquet`` is
        written; created if missing.

    Notes
    -----
    A file that cannot be read, or lacks either message column, is reported
    with ``print`` and skipped; it contributes no segments.
    """
    # Formatting the output file name
    output_file = os.path.join(output_directory, os.path.basename(dirpath) + "_error_aggregated.parquet")

    columns = list(methods.keys())
    aggregated_data = []
    # Chunks seen since the last error row.
    # NOTE(review): these carry over into the next file, so one segment can
    # span two input files - confirm this is intended.
    pending = []

    for filename in sorted(os.listdir(dirpath)):
        if not filename.endswith('.parquet'):
            continue
        file_path = os.path.join(dirpath, filename)
        try:
            df = pd.read_parquet(file_path)
            print(f"Processing file {file_path}")
            # NOTE(review): NaN != 0 is True, so a missing message value also
            # closes a segment - confirm this is intended.
            is_error = (df['message0418DAA-1'] != 0) | (df['message0422DAA-1'] != 0)
            # Keep exactly the configured columns, in `methods` order; columns
            # absent from the file become NaN.
            rows = df.reindex(columns=columns)
            closed, still_open = _split_segments(rows, is_error, pending)
            summaries = [_summarize_segment(segment, methods) for segment in closed]
        except Exception as e:
            # Python 3 exceptions have no `.message`; format the exception itself.
            print(f"Error reading the file {file_path}: {e}")
            continue
        # Commit only once the whole file succeeded, so a bad file leaves no
        # partial segments behind.
        aggregated_data.extend(summaries)
        pending = still_open

    # Rows after the final error row form one last segment.
    if pending:
        aggregated_data.append(_summarize_segment(pd.concat(pending, ignore_index=True), methods))

    # columns= keeps the output schema even when no segment was produced.
    result_df = pd.DataFrame(aggregated_data, columns=columns)
    os.makedirs(output_directory, exist_ok=True)
    result_df.to_parquet(output_file)


In [47]:
# Paths to the data files. The base directory can be overridden with the
# AZUL_OUTPUT_DIR environment variable so the notebook can run outside the
# original author's machine; by default it is the original checkout path.
OUTPUT_BASE_DIR = os.environ.get(
    "AZUL_OUTPUT_DIR",
    "/Users/henriquematias/Documents/GitHub/Inteli-Modulo-7/Projeto_Grupo1_Inteli_Azul/notebooks/Output",
)
directory_path = os.path.join(OUTPUT_BASE_DIR, "Voos_em_1_linha")
output_directory = os.path.join(OUTPUT_BASE_DIR, "Erros_agregados")

# Making sure that the output directory exists
os.makedirs(output_directory, exist_ok=True)

# Writes <output_directory>/Voos_em_1_linha_error_aggregated.parquet.
# To aggregate several flight directories, call aggregate_error_data once per
# directory (for example while walking the tree with os.walk).
aggregate_error_data(directory_path, methods, output_directory)

Processing file /Users/henriquematias/Documents/GitHub/Inteli-Modulo-7/Projeto_Grupo1_Inteli_Azul/notebooks/Output/Voos_em_1_linha/06120021_consolidated.parquet
   amscHprsovDrivF-1a  amscHprsovDrivF-1b  amscHprsovDrivF-2b  \
1                 0.0                 0.0                 0.0   
2                 0.0                 0.0                 0.0   
3                 0.0                 0.0                 0.0   
4                 0.0                 0.0                 0.0   
5                 0.0                 0.0                 0.0   

   amscPrsovDrivF-1a  amscPrsovDrivF-1b  amscPrsovDrivF-2b  \
1                0.0                0.0                0.0   
2                0.0                0.0                0.0   
3                0.0                0.0                0.0   
4                0.0                0.0                0.0   
5                0.0                0.0                0.0   

   basBleedLowPressF-1a  basBleedLowPressF-2b  basBleedLowTempF-1a  \
1    