In [10]:
import glob
import json
import os
from typing import List, Literal, Optional

import pandas as pd

In [160]:
class IngestionBronzeAirport:
    """
    A class to load the data from airports into the bronze layer.

    Attributes
    ----------
    source : Literal['VRA', 'AIR_CIA', 'icaos']
        The source of the data.
    output_format : Literal['csv', 'parquet', 'json']
        Format of the output (bronze) file.
    folder_path_incremental : str
        The path of the incremental folder.
    folder_path_history : str
        The path of the history folder; the bronze file is written here.
    file_format : Literal['csv', 'json', 'parquet']
        Format of the input files.
    pks : List[str]
        The primary keys of the table.
    order_by : str
        Column used to sort rows so that the most recent version of each
        primary key is kept.

    Methods
    -------
    get_file_paths(folder_path: str) -> List[str] :
        Get a sorted list of all files in `folder_path/<file_format>`
        with the given file_format.
    get_df_from_csv(file_paths: List[str], **kwargs) -> List[pd.DataFrame] :
        Load CSV files into dataframes.
    get_df_from_json(file_paths: List[str], **kwargs) -> List[pd.DataFrame] :
        Load JSON files into dataframes.
    get_df_from_parquet(file_paths: List[str], **kwargs) -> List[pd.DataFrame] :
        Load parquet files into dataframes.
    move_file(file_path: str) -> None :
        Moves a staged file to the raw area once it has been read.
    extract(file_paths: List[str], **kwargs) -> List[pd.DataFrame] :
        Read the files into dataframes and move the staged ones.
    consolidate_dfs(dfs: List[pd.DataFrame]) -> pd.DataFrame :
        Concatenate a list of dataframes into a single dataframe.
    upsert(df: pd.DataFrame) -> pd.DataFrame :
        Keep only the most up to date row for each primary key.
    transform(dfs: List[pd.DataFrame]) -> pd.DataFrame :
        Consolidate the dataframes and apply the upsert.
    save_file(df: pd.DataFrame, **kwargs) -> None :
        Saves the dataframe as the bronze file in output_format.
    process_incremental() -> Optional[pd.DataFrame] :
        Process the data from the incremental path.
    process_history() -> Optional[pd.DataFrame] :
        Process the data from the history path, including the bronze file.
    process_data() -> None :
        Merge the incremental data with the history and save the result.
    """

    def __init__(self,
                 source: Literal['VRA', 'AIR_CIA', 'icaos'],
                 output_format: Literal['csv', 'parquet', 'json'] = 'parquet'
                 ) -> None:
        """
        Constructs all the necessary attributes for the airport data object.

        Reads `config.json` from the current working directory and takes
        the folder paths, input format, primary keys and ordering column
        from the entry for `source`.

        Parameters
        ----------
        source : Literal['VRA', 'AIR_CIA', 'icaos']
            The source of the data.
        output_format : Literal['csv', 'parquet', 'json']
            Format of the output file.
        """

        self.source = source
        self.output_format = output_format

        # JSON text is UTF-8 by specification; don't depend on the locale.
        with open('config.json', encoding='utf-8') as f:
            config = json.load(f)

        source_config = config[source]
        self.folder_path_incremental = source_config['folder_path_incremental']
        self.folder_path_history = source_config['folder_path_history']
        self.file_format = source_config['file_format']
        self.pks = source_config['pks']
        self.order_by = source_config['order_by']

    def get_file_paths(self, folder_path: str) -> List[str]:
        """
        Get a list of all files in `folder_path/<file_format>` with the
        given file_format.

        Parameters
        ----------
        folder_path : str
            The path to the folder.

        Returns
        -------
        List[str]
            The matching file paths, sorted so processing order is
            deterministic. Empty (and a message is printed) if none exist.
        """

        search_folder = os.path.join(folder_path, self.file_format)
        file_paths = sorted(
            glob.glob(os.path.join(search_folder, f"*.{self.file_format}"))
        )
        if not file_paths:
            print(f"No {self.file_format} files found in the folder {folder_path}")

        return file_paths

    def get_df_from_csv(self, file_paths: List[str], **kwargs) -> List[pd.DataFrame]:
        """
        Load CSV files into dataframes.

        Parameters
        ----------
        file_paths : List[str]
            A list of paths to the CSV files.
        kwargs : kwargs
            Keyword arguments forwarded to `pd.read_csv`.

        Returns
        -------
        List[pd.DataFrame]
            One dataframe per file, in the order of `file_paths`.
        """

        return [pd.read_csv(path, **kwargs) for path in file_paths]

    def get_df_from_json(self, file_paths: List[str], **kwargs) -> List[pd.DataFrame]:
        """
        Load JSON files into dataframes.

        Parameters
        ----------
        file_paths : List[str]
            A list of paths to the JSON files.
        kwargs : kwargs
            Keyword arguments forwarded to `pd.read_json`.

        Returns
        -------
        List[pd.DataFrame]
            One dataframe per file, in the order of `file_paths`.
        """

        return [pd.read_json(path, **kwargs) for path in file_paths]

    def get_df_from_parquet(self, file_paths: List[str], **kwargs) -> List[pd.DataFrame]:
        """
        Load parquet files into dataframes.

        Parameters
        ----------
        file_paths : List[str]
            A list of paths to the parquet files.
        kwargs : kwargs
            Keyword arguments forwarded to `pd.read_parquet`.

        Returns
        -------
        List[pd.DataFrame]
            One dataframe per file, in the order of `file_paths`.
        """

        return [pd.read_parquet(path, **kwargs) for path in file_paths]

    def move_file(self, file_path: str) -> None:
        """
        Moves the staged file when the extraction is completed.

        The destination is `file_path` with every occurrence of 'staged'
        replaced by 'raw'; the destination folder is created if needed
        and an existing file at the destination is overwritten.

        Parameters
        ----------
        file_path : str
            File path to the source file.

        Returns
        -------
        None
        """

        new_path = file_path.replace('staged', 'raw')
        destination = os.path.dirname(new_path)
        # A bare file name has no folder to create; exist_ok avoids the
        # check-then-create race.
        if destination:
            os.makedirs(destination, exist_ok=True)
        # os.replace overwrites on every platform; os.rename fails on
        # Windows when the target already exists.
        os.replace(file_path, new_path)

    def extract(self, file_paths: List[str], **kwargs) -> List[pd.DataFrame]:
        """
        Read the files into dataframes and move the staged ones.

        Parameters
        ----------
        file_paths : List[str]
            A list of paths to the files.
        kwargs : dict
            Keyword arguments forwarded to the reader for `file_format`.

        Returns
        -------
        List[pd.DataFrame]
            A list of dataframes.
        """

        reader = getattr(self, f'get_df_from_{self.file_format}')
        dfs = reader(file_paths, **kwargs)
        # NOTE(review): staged files are moved right after reading, before
        # process_data saves the bronze file; a later failure leaves them
        # in the raw area rather than the staged one.
        for path in file_paths:
            if 'staged' in path:
                self.move_file(path)

        return dfs

    def consolidate_dfs(self, dfs: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Concatenate a list of dataframes into a single dataframe.

        Parameters
        ----------
        dfs : List[pd.DataFrame]
            A list of dataframes to concatenate.

        Returns
        -------
        pd.DataFrame
            A single dataframe.
        """

        return pd.concat(dfs)

    def upsert(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Keep only the most up to date row for each primary key.

        Rows are sorted by `order_by` descending with a stable sort, so when
        two rows share the same key and `order_by` value, the one that came
        first in `df` wins.

        Parameters
        ----------
        df : pd.DataFrame
            A dataframe to be upserted.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the most up to date rows.
        """

        return (
            df
            .sort_values(by=[self.order_by], ascending=False, kind='mergesort')
            .drop_duplicates(subset=self.pks, keep='first')
        )

    def transform(self, dfs: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Apply all transformations to the list of dataframes
        and consolidate them into a single one.

        Parameters
        ----------
        dfs : List[pd.DataFrame]
            A list of dataframes to transform; earlier frames win ties
            in the upsert.

        Returns
        -------
        pd.DataFrame
            A dataframe with all transformations applied.
        """

        df = self.consolidate_dfs(dfs)
        return self.upsert(df)

    def _bronze_file_path(self) -> str:
        """Return the path of the bronze file written by save_file."""
        return f'{self.folder_path_history}/bronze_{self.source}.{self.output_format}'

    def save_file(self, df: pd.DataFrame, **kwargs) -> None:
        """
        Saves the dataframe in the specified file format.

        The file is written to
        `<folder_path_history>/bronze_<source>.<output_format>`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to be saved as a file.
        kwargs : kwargs
            Keyword arguments forwarded to `DataFrame.to_<output_format>`.
            For JSON, `orient='records'` and `indent=4` are defaults that
            the caller may override individually.

        Returns
        -------
        None
        """

        os.makedirs(self.folder_path_history, exist_ok=True)
        filepath = self._bronze_file_path()

        if self.output_format == 'json':
            write_kwargs = {'orient': 'records', 'indent': 4, **kwargs}
            # `index=False` is only accepted for some orients (and pandas 1.x
            # rejects it for 'records'); those that never write the index
            # don't need it.
            if write_kwargs['orient'] in ('split', 'table'):
                write_kwargs.setdefault('index', False)
        else:
            write_kwargs = {'index': False, **kwargs}

        getattr(df, f'to_{self.output_format}')(filepath, **write_kwargs)

    def process_incremental(self) -> Optional[pd.DataFrame]:
        """
        Process the data from incremental path.

        Returns
        -------
        Optional[pd.DataFrame]
            A dataframe with all the data from the incremental path, or
            None when no input files were found.
        """

        file_paths_incr = self.get_file_paths(self.folder_path_incremental)
        if not file_paths_incr:
            return None

        dfs_incr = self.extract(file_paths=file_paths_incr)
        return self.transform(dfs_incr)

    def process_history(self) -> Optional[pd.DataFrame]:
        """
        Process the data from history path.

        Combines the `file_format` files under the history folder with the
        bronze file previously written by save_file (read in
        `output_format`), so earlier runs are merged with new data.

        Returns
        -------
        Optional[pd.DataFrame]
            A dataframe with all the history data, or None when there is
            none.
        """

        dfs_hist: List[pd.DataFrame] = []

        file_paths_hist = self.get_file_paths(self.folder_path_history)
        if file_paths_hist:
            dfs_hist.extend(self.extract(file_paths=file_paths_hist))

        # The bronze file lives at the history root in output_format, which
        # get_file_paths (history/<file_format>/*.<file_format>) never sees.
        bronze_path = self._bronze_file_path()
        if os.path.isfile(bronze_path):
            reader = getattr(self, f'get_df_from_{self.output_format}')
            dfs_hist.extend(reader([bronze_path]))

        if not dfs_hist:
            return None

        return self.transform(dfs_hist)

    def process_data(self) -> None:
        """
        Merge the incremental data with the history and save the result.

        Incremental rows take precedence over history rows on ties. If
        neither source has data, nothing is written.

        Returns
        -------
        None
        """

        df_incr = self.process_incremental()
        df_hist = self.process_history()

        # Incremental first so it wins ties in the stable upsert.
        dfs = [df for df in (df_incr, df_hist) if df is not None]
        if not dfs:
            print(f"No data to process for source {self.source}")
            return

        df = self.transform(dfs)
        self.save_file(df)

In [161]:
# Run the bronze ingestion for the 'icaos' source only.
ing = IngestionBronzeAirport(source='icaos')
ing.process_data()

No parquet files found in the folder /Users/USUARIO/Library/CloudStorage/OneDrive-Personal/Documentos/Casos/Eleflow/de_case_response/data/bronze/icaos


In [143]:
# Run the bronze ingestion once per configured source.
sources = ['VRA', 'AIR_CIA', 'icaos']
for source in sources:
    ing = IngestionBronzeAirport(source=source)
    ing.process_data()

No parquet files found in the folder /Users/USUARIO/Library/CloudStorage/OneDrive-Personal/Documentos/Casos/Eleflow/de_case_response/data/loaded/VRA
No parquet files found in the folder /Users/USUARIO/Library/CloudStorage/OneDrive-Personal/Documentos/Casos/Eleflow/de_case_response/data/loaded/AIR_CIA
No parquet files found in the folder /Users/USUARIO/Library/CloudStorage/OneDrive-Personal/Documentos/Casos/Eleflow/de_case_response/data/loaded/icaos
