# 1. Ingestão de Dados (Camada Bronze)

**Objetivo:** Extrair os dados brutos de Risco de Crédito (German Credit Data) do repositório UCI e carregá-los no Databricks.
**Saída:** Tabela Delta `german_credit_bronze`.

Nesta etapa, simulamos um processo de **ETL (Extract, Transform, Load)**:
1.  **Extract:** Baixar dados da fonte original.
2.  **Transform (Leve):** Ajustar cabeçalhos e tipagem básica.
3.  **Load:** Salvar em formato Delta para alta performance.

In [0]:
import pandas as pd
from pyspark.sql.functions import col

# Configuração da Fonte de Dados
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/german/german.data"

# Definição do Schema (Nomes das Colunas)
# O dataset original não possui cabeçalho, então definimos manualmente conforme a documentação oficial.
col_names = [
    "checking_status", "duration", "credit_history", "purpose", "credit_amount",
    "savings_status", "employment", "installment_commitment", "personal_status", "other_parties",
    "residence_since", "property_magnitude", "age", "other_payment_plans", "housing",
    "existing_credits", "job", "num_dependents", "own_telephone", "foreign_worker", "class"
]

print("Configuração inicial concluída.")

In [0]:
# 1. Leitura dos dados usando Pandas (Extract)
# Utilizamos separador de espaço (' ') conforme formato do arquivo original
print(f"Baixando dados de: {url}...")
pdf = pd.read_csv(url, sep=' ', names=col_names)

# 2. Conversão para Spark DataFrame
# Trazendo os dados para o ambiente distribuído
df_raw = spark.createDataFrame(pdf)

display(df_raw.limit(5))

In [0]:
# 3. Tratamento Inicial
# A coluna alvo 'class' original é: 1 (Bom) e 2 (Ruim).
# Ajustamos para: 0 (Bom) e 1 (Ruim).
df_bronze = df_raw.withColumn("label", 
    (col("class") - 1).cast("integer") 
).drop("class")

# --- CORREÇÃO DO ERRO ---
# Primeiro, criamos um Schema (Banco de Dados) para organizar seu portfólio.
# Isso garante que o local de salvamento exista.
spark.sql("CREATE SCHEMA IF NOT EXISTS portfolio_credit")

# 4. Gravação na Camada Bronze (Load)
# Agora salvamos explicitamente dentro do schema 'portfolio_credit'
table_name = "portfolio_credit.german_credit_bronze"

df_bronze.write.format("delta").mode("overwrite").saveAsTable(table_name)

print(f"Sucesso! Tabela '{table_name}' salva no catálogo.")

## Validação
Verificando se a tabela foi registrada corretamente no Metastore e exibindo o schema final.

In [0]:
# Verificação final
# Note que agora chamamos 'portfolio_credit.german_credit_bronze'
table_full_name = "portfolio_credit.german_credit_bronze"

print(f"Lendo tabela: {table_full_name}")
print(f"Contagem de registros: {spark.read.table(table_full_name).count()}")
display(spark.read.table(table_full_name))