### Pipeline Raw to Silver

In [1]:
import pandas as pd 
import pandera.pandas as pa # Validação de dados de DataFrames (semelhante ao Pydantic)
from pathlib import Path
import psycopg2
from psycopg2 import extras

#### Definições e Configurações

In [20]:
# Definição de caminhos 
DATA_PATH: Path = Path("../Data Layer/silver/transactions_cards_users_mcc_fraud.csv")
DDL_PATH: Path = Path("scripts/transactions_cards_users_mcc_fraud.sql")

# Parâmetros de conexão do Postgres

conn_params: dict = {
  "host": "localhost",
  "port": 5433,
  "dbname": "transactions",
  "user": "admin",
  "password": "admin"
}

schema: pa.DataFrameSchema = pa.DataFrameSchema(
  columns={   
    "transaction_id": pa.Column(
      pa.Int64,
      nullable=False,
      checks=pa.Check.greater_than(0)
    ),
    "date": pa.Column(
      pa.DateTime, 
      nullable=False
    ),
    "client_id": pa.Column(
      pa.Int64,
      nullable=False
    ),
    "card_id": pa.Column(
      pa.Int64,
      nullable=True
    ),
    "amount": pa.Column(
      pa.Float, 
      nullable=True,
    ),
    "mcc": pa.Column(
      pa.Int16,
      nullable=False
    ),
    "use_chip": pa.Column(pa.String, nullable=True),
    "merchant_id": pa.Column(pa.String, nullable=True),
    "merchant_city": pa.Column(pa.String, nullable=True),
    "merchant_state": pa.Column(pa.String, nullable=True),
    "zip": pa.Column(
      pa.Int32, 
      nullable=True, 
    ),
    "errors": pa.Column(pa.String, nullable=True),
    "card_brand": pa.Column(pa.String, nullable=True),
    "card_type": pa.Column(pa.String, nullable=True),
    "card_number": pa.Column(pa.String, nullable=True),
    "expires": pa.Column(pa.DateTime, nullable=True),
    "cvv": pa.Column(
      pa.Int32, 
      nullable=True,
    ),
    "has_chip": pa.Column(pa.String, nullable=True),
    "num_cards_issued": pa.Column(pa.Int16, nullable=True), 
    "credit_limit": pa.Column(pa.Float, nullable=True), 
    "acct_open_date": pa.Column(pa.DateTime, nullable=True),
    "year_pin_last_changed": pa.Column(pa.String, nullable=True), 
    "card_on_dark_web": pa.Column(pa.String, nullable=True),
        
    "current_age": pa.Column(pa.Int16, nullable=True),
    "retirement_age": pa.Column(pa.Int16, nullable=True),
    "birth_year": pa.Column(pa.Int16, nullable=True),
    "birth_month": pa.Column(pa.Int16, nullable=True), 
    "gender": pa.Column(pa.String, nullable=True),
    "address": pa.Column(pa.String, nullable=True),
    "latitude": pa.Column(pa.Float, nullable=True), 
    "longitude": pa.Column(pa.Float, nullable=True), 
    "per_capita_income": pa.Column(pa.Float, nullable=True),
    "yearly_income": pa.Column(pa.Float, nullable=True), 
    "total_debt": pa.Column(pa.Float, nullable=True), 
    "credit_score": pa.Column(pa.Int16, nullable=True), 
    "num_credit_cards": pa.Column(pa.Int16, nullable=True), 

    "mcc_description": pa.Column(pa.String, nullable=True),
    "is_fraud": pa.Column(
      pa.String,
      nullable=True,
    )
  },
  strict="filter"
)

top-level pandera module will be **removed in a future version of pandera**.
If you're using pandera to validate pandas objects, we highly recommend updating
your import:

```
# old import
import pandera as pa

# new import
import pandera.pandas as pa
```

If you're using pandera to validate objects from other compatible libraries
like pyspark or polars, see the supported libraries section of the documentation
for more information on how to import pandera:

https://pandera.readthedocs.io/en/stable/supported_libraries.html


```
```



#### Funções auxiliares

In [3]:
def convert_sql_to_string(file: Path) -> str:
  """
  Recebe: Caminho para arquivo SQL (DDL);
  Retorna: String com a instrução DDL para criação da tabela.
  """
  try:
    with open(file, 'r') as f:
      return f.read()
  except Exception as e:
    print(f"ERRO! Falha ao encontrar/ler o arquivo: {e}")
    raise

# def execute_query(sql_query: str, engine: Engine) -> bool:
#   try:
#     with engine.connect() as conn:
#       conn.execute(text(sql_query))
#       conn.commit()
#     return True
#   except Exception as e:
#     print(f"ERRO! Falha ao executar a query SQL: {e}")
#     return False
  
def clean_currency_column(df: pd.DataFrame, column: str) -> pd.DataFrame:
  """
  Remove símbolos de moeda (ex: '$' e vírgulas) e converte a coluna para float.
  """
  print(f"  -> Limpando e convertendo a coluna de moeda: {column}")
    
  df[column] = df[column].astype(str).str.replace(r'[^\d\.\-]', '', regex=True)
  df[column] = pd.to_numeric(df[column], errors='coerce')
    
  return df

def convert_to_string(df: pd.DataFrame, columns: list) -> pd.DataFrame:
  """
  Converte uma lista de colunas para o tipo 'category' para otimização de memória.
  """
  for col in columns:
    if col in df.columns:
      df[col] = df[col].astype('string')
      print(f"  -> Converteu '{col}' para 'string'.")
  return df  

#### Funções de ETL

In [17]:
def extract(path: Path) -> pd.DataFrame:
  """
  Recebe: Caminho para arquivo CSV.
  Retorna: DataFrame;
  """
  try:
    df = pd.read_csv(path)
    return df
  except Exception as e:
    print(f"ERRO! Um problema ocorreu na conversão do arquivo para DataFrame: {e}")
    return None
  
def transform_and_validate(df: pd.DataFrame) -> pd.DataFrame:
  """
  Recebe: DataFrame e realiza limpeza e conversão de tipos.
  Retorna: DataFrame validado e compatível com psycopg2.
  """
  try:
    date_cols = ['date', 'expires', 'acct_open_date']
    int16_cols = ['mcc', 'num_cards_issued', 'current_age', 'retirement_age',
                  'birth_year', 'birth_month', 'credit_score', 'num_credit_cards']
    int32_cols = ['zip', 'cvv']
    string_cols = ['merchant_id', 'card_number', 'year_pin_last_changed', 'use_chip',
                   'merchant_city', 'merchant_state', 'errors', 'card_brand', 'card_type',
                   'has_chip', 'gender', 'address', 'card_on_dark_web', 'mcc_description']
    is_fraud_col = 'is_fraud'

    # Datas
    for col in date_cols:
      df[col] = pd.to_datetime(df[col], errors='coerce')

    # Inteiros 16 bits (mantém compatível e tolerante a nulos)
    for col in int16_cols:
      df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int16')

    # Inteiros 32 bits (tolerante a nulos)
    for col in int32_cols:
      df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int32')

    # Coluna is_fraud
    df[is_fraud_col] = pd.to_numeric(df[is_fraud_col], errors='coerce').astype('Int16')

    # Strings
    for col in string_cols:
      df[col] = df[col].astype(str).replace('<NA>', pd.NA).astype('string')

    # Conversão final para tipos Python nativos, compatível com psycopg2
    df = df.astype(object).where(pd.notnull(df), None)

    return df

  except Exception as e:
    print(f"ERRO! O DataFrame não foi validado: {e}")
    return None

def load(df: pd.DataFrame, ddl_script_path: Path, conn_params: dict) -> bool:
    try:
        conn = psycopg2.connect(**conn_params)
        cur = conn.cursor()

        print(f"Lendo o arquivo DDL: {ddl_script_path}")
        ddl = ddl_script_path.read_text()

        print(f"Criando a tabela {ddl_script_path.stem}...")
        cur.execute(ddl)
        conn.commit()
        print("Tabela criada no Banco de Dados!")

        table = ddl_script_path.stem
        columns = list(df.columns)
        cols = ', '.join(columns)
        insert_query = f"INSERT INTO {table} ({cols}) VALUES ({', '.join(['%s'] * len(columns))})"

        print(f"Inserindo {len(df)} linhas...")
        tuples = [tuple(x) for x in df.itertuples(index=False, name=None)]

        extras.execute_batch(cur, insert_query, tuples, page_size=20000)
        conn.commit()

        print("LOAD no PostgreSQL concluído com sucesso.")
        return True

    except Exception as e:
        print(f"ERRO: Processo de load interrompido: {e}")
        if conn:
            conn.rollback()
        return False

    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

#### Pipeline

In [10]:
def run_pipeline(data_path: Path, ddl_script_path: Path, conn_params: dict, schema: pa.DataFrameSchema) -> bool:
  """
  Executa o Pipeline 
  """
  try:
    print("ETAPA 01: Extração de dados")
    print("Executando...")
    df_raw = extract(data_path)
    if df_raw is None: 
      print("FALHA NO PIPELINE: Extração.")
      return False
    print(f"Extração concluida, DataFrame carregado. Linhas: {len(df_raw)}")


    print("\nETAPA 02: Transformação e Validação dos dados")
    print("Executando...")
    df_silver = transform_and_validate(
      df_raw, 
    #  schema
    )
    if df_silver is None: 
      print("FALHA NO PIPELINE: Transformação e Validação.")
      return False 
    
    print("\nETAPA 03: Carregamento dos dados (LOAD)")
    success = load(df_silver, ddl_script_path, conn_params)
    if not success:
      print("FALHA NO PIPELINE: Carregamento (LOAD)")

    print("\nPIPELINE CONCLUÍDA COM SUCESSO!")
    return True
  except Exception as e: 
    print(f"ERRO GERAL: {e}")
    return False

In [21]:
run_pipeline(
    DATA_PATH, 
    DDL_PATH, 
    conn_params, 
    schema
)

ETAPA 01: Extração de dados
Executando...
Extração concluida, DataFrame carregado. Linhas: 3637703

ETAPA 02: Transformação e Validação dos dados
Executando...

ETAPA 03: Carregamento dos dados (LOAD)
Lendo o arquivo DDL: scripts/transactions_cards_users_mcc_fraud.sql
Criando a tabela transactions_cards_users_mcc_fraud...
Tabela criada no Banco de Dados!
Inserindo 3637703 linhas...
LOAD no PostgreSQL concluído com sucesso.

PIPELINE CONCLUÍDA COM SUCESSO!


True

In [22]:
DF = extract(DATA_PATH)
DF.columns.tolist()

['transaction_id',
 'date',
 'client_id',
 'card_id',
 'amount',
 'use_chip',
 'merchant_id',
 'merchant_city',
 'merchant_state',
 'zip',
 'mcc',
 'errors',
 'card_brand',
 'card_type',
 'card_number',
 'expires',
 'cvv',
 'has_chip',
 'num_cards_issued',
 'credit_limit',
 'acct_open_date',
 'year_pin_last_changed',
 'card_on_dark_web',
 'current_age',
 'retirement_age',
 'birth_year',
 'birth_month',
 'gender',
 'address',
 'latitude',
 'longitude',
 'per_capita_income',
 'yearly_income',
 'total_debt',
 'credit_score',
 'num_credit_cards',
 'mcc_description',
 'is_fraud']