In [None]:
#| default_exp datasources.base
%load_ext autoreload
%autoreload 2

import sys,os
from pathlib import Path

In [None]:
# Put the parent of the current working directory first on sys.path, then
# switch the working directory to that parent's "extracao" subfolder.
# NOTE(review): presumably the notebook is started from a sibling folder of
# "extracao" (e.g. the notebooks folder) — confirm.
sys.path.insert(0, str(Path().cwd().parent))
os.chdir(Path.cwd().parent / "extracao")

# Classe Base
> Módulo para encapsular a extração e processamento comum às diferentes fontes de dados

In [None]:
# | export
import re
from dataclasses import dataclass
from functools import cached_property
from typing import Tuple, Union, List

import pandas as pd
from dotenv import find_dotenv, load_dotenv
from fastcore.xtras import Path, listify
from pyarrow import ArrowInvalid, ArrowTypeError

from extracao.constants import BW, RE_BW

In [None]:
# | export
# Load the variables from the .env file located by find_dotenv() at import time
load_dotenv(find_dotenv())

True

In [None]:
# | hide: true
# | eval:false
# Stand-in for `__file__` while working in the notebook, so that the
# `Path(__file__).parent` used for the default `Base.folder` resolves to the
# "extracao" folder next to the current working directory's parent.
# NOTE(review): presumably needed because the kernel does not define
# `__file__` — confirm.
__file__ = Path.cwd().parent / "extracao" / "datasources.py"

In [None]:
#| export
@dataclass
class Base:
    """Common reading, caching and saving logic shared by the data sources.

    Subclasses are expected to provide `stem`, `columns`, `cols_mapping`,
    `extraction` and `_format`; the versions defined here only raise
    `NotImplementedError`.
    """

    # Default folder the parquet files are read from and saved to
    folder: Union[str, Path] = Path(__file__).parent / "arquivos" / "saida"

    def _read(self, stem: str) -> pd.DataFrame:
        """Read the dataframe stored at `self.folder / {stem}.parquet.gzip`.

        Raises:
            ValueError: when the file does not exist or pyarrow raises
                `ArrowInvalid` while reading it.
        """
        file = Path(f"{self.folder}/{stem}.parquet.gzip")
        try:
            return pd.read_parquet(file)
        except (ArrowInvalid, FileNotFoundError) as e:
            raise ValueError(f"Error when reading {file}") from e

    def _save(
        self, df: pd.DataFrame, folder: Union[str, Path], stem: str
    ) -> pd.DataFrame:
        """Format, save and return a dataframe.

        Every column is cast to the pandas "string" dtype and duplicated rows
        are dropped before writing `folder / {stem}.parquet.gzip`.

        Raises:
            ArrowInvalid, ArrowTypeError: re-raised with the target file in
                the message when pyarrow cannot write the frame.
        """
        df = df.astype("string").drop_duplicates(keep="first", ignore_index=True)
        # Built outside the `try` so the name is always bound in the handler
        file = Path(f"{folder}/{stem}.parquet.gzip")
        try:
            df.to_parquet(file, compression="gzip", index=False)
        except (ArrowInvalid, ArrowTypeError) as e:
            # `e` is an exception *instance* and cannot be called; create a new
            # error of the same class that carries the file name instead.
            raise type(e)(f"Não foi possível salvar o arquivo parquet {file}") from e
        return df

    def _resolve_extraction(self) -> pd.DataFrame:
        """Return the extracted data, calling `extraction` when it is a method.

        `extraction` is a plain method here, but a subclass may expose it as a
        (cached) property; both forms are accepted.
        """
        source = self.extraction
        return source() if callable(source) else source

    @cached_property
    def df(self) -> pd.DataFrame:
        """Formatted data: read from the saved file, else extracted and formatted."""
        try:
            return self._read(self.stem)
        # `_read` reports a missing/invalid file as ValueError (ArrowInvalid is
        # itself a ValueError subclass); FileNotFoundError is kept for `_read`
        # overrides that do not wrap it.
        except (ValueError, FileNotFoundError):
            return self._format(self._resolve_extraction())

    @staticmethod
    def parse_bw(
        bw: str,  # Emission designation (bandwidth + class) encoded as a string
    ) -> Tuple[str, str]:  # Bandwidth and emission class
        """Parse the bandwidth string.

        Returns `(pd.NA, pd.NA)` when `bw` is not a string or does not match
        `RE_BW`.
        """
        # A missing value (NaN / pd.NA / None) would make `re.match` raise TypeError
        if not isinstance(bw, str):
            return pd.NA, pd.NA
        if match := re.match(RE_BW, bw):
            # Group 2 is looked up in BW to get the multiplier; groups 1 and 3
            # are joined as "<1>.<3>" when group 3 is non-empty.
            multiplier = BW[match[2]]
            if mantissa := match[3]:
                number = float(f"{match[1]}.{mantissa}")
            else:
                number = float(match[1])
            classe = match[4]
            return str(multiplier * number), str(classe)
        return pd.NA, pd.NA

    @cached_property
    def discarded(self) -> pd.DataFrame:
        """Initially empty frame with `self.columns` plus a "Log" column."""
        df = pd.DataFrame(columns=self.columns)
        df["Log"] = ""
        return df

    def append2discarded(self, dfs: Union[pd.DataFrame, List]) -> None:
        """Receive one or more dataframes and append them to the discarded dataframe."""
        # Assigning over a cached_property replaces the cached value on the instance
        self.discarded = pd.concat([self.discarded] + listify(dfs), ignore_index=True)

    @staticmethod
    def register_log(
        df: pd.DataFrame, log: str, row_filter: Union[pd.Series, None] = None
    ) -> pd.DataFrame:
        """Append `"|" + log` to the "Log" column of the selected rows.

        `df` is modified in place and also returned. When `row_filter` is None
        every row is selected. A missing "Log" column is created empty, and
        missing values in it are treated as empty strings so the message is
        not lost.
        """
        if "Log" not in df.columns:
            df["Log"] = ""
        if row_filter is None:
            row_filter = pd.Series(True, index=df.index)
        # NaN + str evaluates to NaN, which would silently drop the message
        current = df.loc[row_filter, "Log"].fillna("")
        df.loc[row_filter, "Log"] = current + "|" + log
        return df

    @property
    def columns(self):
        """Column names of the data source; must be provided by subclasses."""
        raise NotImplementedError(
            "Subclasses devem implementar a propriedade 'columns'"
        )

    @property
    def cols_mapping(self):
        """Column mapping of the data source; must be provided by subclasses."""
        raise NotImplementedError(
            "Subclasses devem implementar a propriedade 'cols_mapping'"
        )

    @property
    def stem(self):
        """File stem used to name the parquet files; must be provided by subclasses."""
        raise NotImplementedError("Subclasses devem setar a propriedade stem!")

    def extraction(self) -> pd.DataFrame:
        """Extract the raw data; must be provided by subclasses."""
        raise NotImplementedError("Subclasses devem implementar o método extract")

    def _format(
        self,
        df: pd.DataFrame,  # DataFrame with the station data
    ) -> pd.DataFrame:  # Formatted DataFrame
        """Format, clean and standardize the data returned by the database query."""
        raise NotImplementedError("Subclasses devem implementar o método _format")

    def update(self):
        """Extract and format the data again, replacing the cached `df`."""
        self.df = self._format(self._resolve_extraction())

    def save(self, folder: Union[str, Path, None] = None):
        """Save `df` and `discarded` to `folder` (defaults to `self.folder`).

        The discarded records are written with the stem `{stem}_discarded`.
        """
        if folder is None:
            folder = self.folder
        self._save(self.df, folder, self.stem)
        self._save(self.discarded, folder, f"{self.stem}_discarded")

In [None]:
# | hide
from nbdev.doclinks import nbdev_export

# Run nbdev's export step.
# NOTE(review): presumably this writes the cells tagged `export` to the module
# named by the `default_exp` directive — confirm against the nbdev docs.
nbdev_export()

In [None]:
# Scratch check: listify wraps a single DataFrame in a one-element list, which
# is what `append2discarded` relies on when it is given a lone dataframe.
listify(pd.DataFrame())

[Empty DataFrame
 Columns: []
 Index: []]