In [24]:
import pandas as pd
import pyarrow
import os
from pyarrow import dataset as ds
def carregar_dados():
    path = "/home/leo/airflow/Projeto/data/Gold"

    if not os.path.exists(path):
        # st.warning(f"‚ö†Ô∏è Caminho n√£o encontrado: {path}")
        return pd.DataFrame()
    try:
        # Usa pyarrow.dataset para ler todas as parti√ß√µes
        dataset = ds.dataset(path, format="parquet", partitioning="hive")
        df = dataset.to_table().to_pandas()
        # st.success("‚úÖ Dados carregados com sucesso!")
        return df
    except Exception as e:
        # st.error(f"‚ö†Ô∏è Erro ao carregar dados particionados: {e}")
        return pd.DataFrame()

# üü° Aqui voc√™ chama a fun√ß√£o e armazena o resultado
df = carregar_dados()


In [1]:
import os
import zipfile
import pandas as pd
from loguru import logger

KAGGLE_DATASET = "janiobachmann/bank-marketing-dataset"
OUTPUT_DIR = "/home/leo/airflow/data/kaggle_data/bank_marketing"

def baixar_dados_bank_marketing():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    logger.info("Baixando Bank Marketing dataset...")
    os.system(f"kaggle datasets download -d {KAGGLE_DATASET} -p {OUTPUT_DIR}")
    for f in os.listdir(OUTPUT_DIR):
        if f.endswith(".zip"):
            with zipfile.ZipFile(os.path.join(OUTPUT_DIR, f), 'r') as z:
                z.extractall(OUTPUT_DIR)
            os.remove(os.path.join(OUTPUT_DIR, f))
    logger.success("Download e extra√ß√£o conclu√≠dos.")

def converter_parquet():
    df = pd.read_csv(os.path.join(OUTPUT_DIR, "bank.csv"))
    df['data_part'] = pd.to_datetime('now').strftime('%Y-%m-%d')
    df.to_parquet(f"{OUTPUT_DIR}/bank_marketing.parquet",index=False)
    logger.success("Bank Marketing salvo como Parquet.")

if __name__ == "__main__":
    baixar_dados_bank_marketing()
    converter_parquet()

[32m2025-08-15 22:47:00.050[0m | [1mINFO    [0m | [36m__main__[0m:[36mbaixar_dados_bank_marketing[0m:[36m11[0m - [1mBaixando Bank Marketing dataset...[0m


Downloading bank-marketing-dataset.zip to /home/leo/airflow/data/kaggle_data/bank_marketing


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 142k/142k [00:00<00:00, 420kB/s]
[32m2025-08-15 22:47:03.358[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mbaixar_dados_bank_marketing[0m:[36m18[0m - [32m[1mDownload e extra√ß√£o conclu√≠dos.[0m





[32m2025-08-15 22:47:04.849[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mconverter_parquet[0m:[36m24[0m - [32m[1mBank Marketing salvo como Parquet.[0m


In [18]:
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from scripts.silver_cleaner import limpar_e_enriquecer_dados_silver  # ou fun√ß√µes separadas

@pytest.fixture(scope="module")
def spark():
    spark = SparkSession.builder.master("local[1]").appName("TestSilver").getOrCreate()
    yield spark
    spark.stop()

def test_filtros_basicos(spark):
    # Supondo que voc√™ tenha uma fun√ß√£o separada que retorna df limpo
    df = limpar_e_enriquecer_dados_silver()
    
    # Verifica duplicatas
    ids = [row.id_transacao for row in df.select("id_transacao").collect()]
    assert len(ids) == len(set(ids))
    
    # Verifica valores positivos
    assert df.filter(col("valor") <= 0).count() == 0
    
    # Verifica CPF v√°lido
    assert df.filter(~col("cpf").contains(".")).count() == 0
    assert df.filter(~col("cpf").contains("-")).count() == 0
    assert df.filter(col("cpf").rlike(r"\d{3}\.\d{3}\.\d{3}-\d{2}") == False).count() == 0

def test_bancos_nome(spark):
    df = limpar_e_enriquecer_dados_silver()
    bancos_validos = ["Banco do Brasil","Bradesco","Caixa Econ√¥mica Federal","Ita√∫ Unibanco","Santander","Desconhecido"]
    assert df.select("banco_origem_nome").distinct().rdd.flatMap(lambda x: x).collect() <= set(bancos_validos)
    assert df.select("banco_destino_nome").distinct().rdd.flatMap(lambda x: x).collect() <= set(bancos_validos)


ModuleNotFoundError: No module named 'scripts'