# End-to-End Data Engineering & Governance Demo: Projeto Biosampa

**Tech Stack:** Databricks, PySpark, Delta Lake, Unity Catalog Concepts

### Objetivo
Este projeto simula um pipeline de dados completo (Arquitetura Medalhão) focado em **proteção ambiental**. Ingerimos dados públicos de fauna invasora da Prefeitura de SP, tratamos problemas de qualidade e implementamos uma **camada de governança automatizada** para impedir que dados sensíveis de localização de espécies sejam acessados para fins comerciais indevidos.

---

## Camada Bronze: Ingestão e Raw Data

**Desafio Técnico:** O arquivo original (CSV) possui codificação `UTF-8-SIG` (com BOM) e delimitadores locais (`;`), além de não possuir uma chave primária.
**Estratégia:**
* Utilização de **Pandas** para leitura em memória (Memory-to-Delta) para contornar restrições de escrita em disco do ambiente Community.
* Criação de ID artificial (`id_registro`) para rastreabilidade.
* Gravação em formato **Delta Lake** para garantir ACID e performance.

In [0]:
import pandas as pd
from pyspark.sql.functions import current_timestamp, lit

#configuracao inicial
spark.sql("DROP DATABASE IF EXISTS biosampa_demo CASCADE")
spark.sql("CREATE DATABASE biosampa_demo")
print("Ambiente resetado.")

#leitura com encoding CORRETO
url = "https://dados.prefeitura.sp.gov.br/dataset/9e1cbb71-21d7-4321-9039-e0d8a74f7bf9/resource/80c42fe5-e9ee-4166-aa34-90658947c5f3/download/biosampa_invasora_fauna_2022.csv"

print("Baixando e lendo dados...")
pdf = pd.read_csv(url, sep=';', encoding='utf-8-sig')

#limpeza do nome das colunas
#remove espaços extras e converte para minusculo
pdf.columns = pdf.columns.str.strip().str.lower()

#criando coluna ID baseado em indice
print("Gerando coluna de ID artificial...")
pdf['id_registro'] = pdf.index + 1

print(f"Colunas encontradas: {list(pdf.columns)}")

#geracao da raw
print("Salvando Bronze...")
df_bronze = spark.createDataFrame(pdf)

(df_bronze
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("origem", lit("prefeitura_sp"))
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("biosampa_demo.bronze_fauna_raw")
)

print("SUCESSO! Camada Bronze criada.")
display(spark.read.table("biosampa_demo.bronze_fauna_raw").limit(5))

Ambiente resetado.
Baixando e lendo dados...
Gerando coluna de ID artificial...
Colunas encontradas: ['cd_identificador', 'cd_identificador_original', 'cd_local_levantamento', 'nm_local', 'qt_especie_registrada_2021', 'qt_especie_registrada_2022', 'dt_atualizacao', 'an_biosampa', 'cd_usuario_atualizacao', 'id_registro']
Salvando Bronze...
SUCESSO! Camada Bronze criada.


cd_identificador,cd_identificador_original,cd_local_levantamento,nm_local,qt_especie_registrada_2021,qt_especie_registrada_2022,dt_atualizacao,an_biosampa,cd_usuario_atualizacao,id_registro,ingestion_timestamp,origem
413,47,REGBIO_203,Fazenda Nossa Senhora da Piedade [APA Capivari-Monos],2,2,01/04/24,2022,PRODAM,1,2026-01-05T14:35:47.475Z,prefeitura_sp
417,51,REGBIO_176,Represa Guarapiranga (aves aquaticas - Costa do Aracati),3,3,01/04/24,2022,PRODAM,2,2026-01-05T14:35:47.475Z,prefeitura_sp
545,179,REGBIO_118,PNM Jaceguava,1,2,01/04/24,2022,PRODAM,3,2026-01-05T14:35:47.475Z,prefeitura_sp
524,158,REGBIO_209,Aristocrata,0,1,01/04/24,2022,PRODAM,4,2026-01-05T14:35:47.475Z,prefeitura_sp
573,208,,Terra Indigena Tenonde Pora,0,0,01/04/24,2022,PRODAM,5,2026-01-05T14:35:47.475Z,prefeitura_sp


## Camada Silver: Limpeza e Padronização

Nesta etapa, transformamos os dados brutos em informações confiáveis.
**Regras de Qualidade Aplicadas:**
1.  **Correção de Datas:** Conversão do formato brasileiro (`dd/MM/yy`) para padrão internacional (`YYYY-MM-DD`).
2.  **Sanitização:** Remoção de espaços em branco em nomes de locais.
3.  **Enriquecimento:** Cálculo da coluna `delta_crescimento` para medir a evolução da invasão de espécies entre 2021 e 2022.
4.  **Tratamento de Nulos:** Preenchimento de valores vazios com 0 para permitir cálculos matemáticos.

In [0]:
from pyspark.sql.functions import col, trim, to_date, when

print("Processando Camada Silver...")

#leitura da raw
df_raw = spark.read.table("biosampa_demo.bronze_fauna_raw")

df_silver = (df_raw
    #renomeando colunas principais
    .withColumnRenamed("id_registro", "id_ocorrencia")
    .withColumn("local", trim(col("nm_local")))
    
    #tratando quantidades(cast para inteiro)
    .withColumn("qtd_2021", col("qt_especie_registrada_2021").cast("int"))
    .withColumn("qtd_2022", col("qt_especie_registrada_2022").cast("int"))
    
    #tratando da Data
    #usar o formato 'dd/MM/yy' para ler '01/04/24' corretamente
    .withColumn("data_ref", to_date(col("dt_atualizacao"), "dd/MM/yy"))
)

#bloco de filtro e enriquecimento
df_silver_final = (df_silver
    .filter(col("local").isNotNull())
    #preenchendo nulos com 0 para poder fazer a conta
    .na.fill(0, ["qtd_2021", "qtd_2022"])
    #criar o delta
    .withColumn("delta_crescimento", col("qtd_2022") - col("qtd_2021"))
    #selecao final para limpar colunas antigas
    .select(
        "id_ocorrencia", 
        "local", 
        "qtd_2021", 
        "qtd_2022", 
        "data_ref", 
        "delta_crescimento"
    )
)

#gravando a camada silver
df_silver_final.write.format("delta").mode("overwrite").saveAsTable("biosampa_demo.silver_fauna_clean")

print("Camada Silver processada com sucesso!")
display(df_silver_final)

Processando Camada Silver...
Camada Silver processada com sucesso!


id_ocorrencia,local,qtd_2021,qtd_2022,data_ref,delta_crescimento
1,Fazenda Nossa Senhora da Piedade [APA Capivari-Monos],2,2,2024-04-01,0
2,Represa Guarapiranga (aves aquaticas - Costa do Aracati),3,3,2024-04-01,0
3,PNM Jaceguava,1,2,2024-04-01,1
4,Aristocrata,0,1,2024-04-01,1
5,Terra Indigena Tenonde Pora,0,0,2024-04-01,0
6,Castanheiras,2,2,2024-04-01,0
7,Sitio Nakao,2,1,2024-04-01,-1
8,Mboi Mirim,0,0,2024-04-01,0
9,Jd. Tapera - Jd. Sao Luis,0,0,2024-04-01,0
10,Jardim Apura - Parque dos Bufalos,1,1,2024-04-01,0


## Camada Gold: Agregação de Negócio

Criação de KPI (Key Performance Indicator) para consumo de Dashboards.
**Pergunta de Negócio:** Quais locais públicos de São Paulo possuem a maior concentração de registros de fauna invasora?

In [0]:
from pyspark.sql.functions import sum, avg, count, desc

print("Gerando Gold...")

df_silver = spark.read.table("biosampa_demo.silver_fauna_clean")

#agregação para kpis
df_gold = (df_silver
    .groupBy("local")
    .agg(
        sum("qtd_2022").alias("total_especies_2022"),
        sum("delta_crescimento").alias("crescimento_anual"),
        count("id_ocorrencia").alias("registros_unicos")
    )
    .orderBy(desc("total_especies_2022"))
)

#salvando a camada Gold
df_gold.write.format("delta").mode("overwrite").saveAsTable("biosampa_demo.gold_kpi_fauna")

print("Sucesso na geração da camada Gold")
display(df_gold)

Gerando Gold...
Sucesso na geração da camada Gold


local,total_especies_2022,crescimento_anual,registros_unicos
Ibirapuera,24,-2,2
Aclimacao,16,0,2
Jardim da Luz,14,0,2
Cordeiro - Martin Luther King,14,0,2
Anhanguera (RVS + PU),10,0,2
Praça Orlando Gomes,10,10,2
Lajeado - Izaura Pereira De Souza Franzolin,8,0,2
Chacara do Jockey,8,0,2
Cantareira + Alberto Lofgren (Horto Florestal),8,0,2
Povo - Mario Pimenta Camargo,8,0,2


In [0]:
%sql
SELECT * FROM biosampa_demo.gold_kpi_fauna 
WHERE total_especies_2022 > 10
ORDER BY total_especies_2022 DESC

local,total_especies_2022,crescimento_anual,registros_unicos
Ibirapuera,24,-2,2
Aclimacao,16,0,2
Jardim da Luz,14,0,2
Cordeiro - Martin Luther King,14,0,2


# Módulo de Governança de Dados

A partir daqui, implementamos o controle de acesso. Em vez de gerenciar permissões manualmente, utilizamos uma abordagem **Metadata-Driven** (Guiada por Metadados).

## O Contrato de Dados (Tags & Properties)
Definimos as "Leis" que regem esta tabela. Usamos `TBLPROPERTIES` para etiquetar a tabela `silver_fauna_clean` como **Ambiental Restrito**.
* **Propósito Permitido:** Apenas `pesquisa`, `controle_pragas` e `biologia`.
* **Owner:** Secretaria do Verde.

In [0]:
%sql
--seleciona o catálogo/schema correto
USE biosampa_demo; 

--APLICAR AS TAGS DE GOVERNANÇA
ALTER TABLE silver_fauna_clean SET TAGS (
  'governance_category' = 'ambiental_restrito',
  'governance_purpose' = 'pesquisa,controle_pragas,biologia',
  'governance_restriction' = 'anonimizacao_localizacao_obrigatoria',
  'governance_owner' = 'secretaria_verde_sp@sp.gov.br'
);

SELECT "Tabela de Fauna 'Etiquetada' com regras de governança." as status;

status
Tabela de Fauna 'Etiquetada' com regras de governança.


## Validação Automatizada

Este script Python atua como um portão de segurança (Compliance Check).
Ele lê as propriedades da tabela dinamicamente e valida se o motivo da solicitação de acesso é compatível com as regras definidas acima.

* **Cenário 1:** Um Pesquisador solicita acesso (Deve ser APROVADO).
* **Cenário 2:** Uma Construtora solicita acesso comercial (Deve ser BLOQUEADO).

In [0]:
import pyspark.sql.functions as F

def check_bio_access_tags(table_name, purpose_proposed):
    print(f"Validando TAGS da tabela: {table_name}")
    print(f"Propósito declarado: {purpose_proposed.upper()}\n")
    
    try:
        #ler direto da tabela de sistema do unity catalog
        #esta tabela deve conter todas as tags de todas as tabelas
        tags_df = spark.table("system.information_schema.table_tags")
        
        #filtrar apenas as tags da nossa tabela
        my_tags = tags_df.filter(tags_df.table_name == table_name).collect()
        
        #transformar em dicionário Python para facilitar: {'tag_name': 'tag_value'}
        tags_dict = {row['tag_name']: row['tag_value'] for row in my_tags}
        
        #para ver o retorno
        if not tags_dict:
            print("AVISO: Nenhuma tag encontrada via system.information_schema.")
            print("Verifique se você está no Catálogo correto (USE CATALOG ...).")
            return
            
        #validar as regras
        allowed_purposes = tags_dict.get('governance_purpose', '')
        owner = tags_dict.get('governance_owner', 'N/A')
        
        #decisao para governanca
        if purpose_proposed.lower() in allowed_purposes.lower():
            print(f"ACESSO APROVADO")
            print("------------------------------------------------")
            print(f"Justificativa: A tag 'governance_purpose' permite '{purpose_proposed}'.")
        else:
            print(f"ACESSO NEGADO")
            print("------------------------------------------------")
            print(f"Violação de Tag: '{purpose_proposed}' não consta na lista aprovada.")
            print(f"Ação: Solicitar revisão de tags com o Owner.")
        
        print(f"Permitidos: {allowed_purposes}")
        print(f"Owner (via Tag): {owner}")

    except Exception as e:
        print(f"Erro ao acessar tags: {e}")
        print("Dica: Se der erro de 'Table not found', seu cluster não tem permissão de leitura no System Schema.")

print("Sistema de Validação via TAGS Ativo.")

Sistema de Validação via TAGS Ativo.


In [0]:
#CENARIO 1: O BIÓLOGO (Permitido)
print(">>> SIMULAÇÃO 1: Pedido de Pesquisador Credenciado <<<\n")
check_bio_access_tags('silver_fauna_clean', 'pesquisa')

print("\n" + "="*60 + "\n")

#CENARIO 2: A CONSTRUTORA (Negado)
print(">>> SIMULAÇÃO 2: Pedido Comercial (Construtora) <<<\n")
check_bio_access_tags('silver_fauna_clean', 'comercial')

>>> SIMULAÇÃO 1: Pedido de Pesquisador Credenciado <<<

Validando TAGS da tabela: silver_fauna_clean
Propósito declarado: PESQUISA

ACESSO APROVADO
------------------------------------------------
Justificativa: A tag 'governance_purpose' permite 'pesquisa'.
Permitidos: pesquisa,controle_pragas,biologia
Owner (via Tag): secretaria_verde_sp@sp.gov.br


>>> SIMULAÇÃO 2: Pedido Comercial (Construtora) <<<

Validando TAGS da tabela: silver_fauna_clean
Propósito declarado: COMERCIAL

ACESSO NEGADO
------------------------------------------------
Violação de Tag: 'comercial' não consta na lista aprovada.
Ação: Solicitar revisão de tags com o Owner.
Permitidos: pesquisa,controle_pragas,biologia
Owner (via Tag): secretaria_verde_sp@sp.gov.br


In [0]:
import datetime

#criar tabela de auditoria(log)
spark.sql("""
CREATE TABLE IF NOT EXISTS biosampa_demo.audit_access_log (
    data_hora TIMESTAMP,
    tabela STRING,
    usuario_beneficiado STRING,
    concedido_por STRING,
    acao STRING
) USING DELTA
""")

#funcao para executar o grant
def execute_access_grant(table_name, user_email):
    """
    Executa o comando SQL de Grant e registra na auditoria.
    """
    print(f"Executando liberação de acesso para: {user_email}...")
    
    try:
        #comando de acesso
        #OBS: O `IF EXISTS` para evitar erros se o user não existir no Community
        sql_command = f"GRANT SELECT ON TABLE biosampa_demo.{table_name} TO `{user_email}`"
        
        #users fictícios dão erro fazendo o simular o sucesso visualmente
        # ou tentar rodar (se falhar, tratamos o erro para não parar a demo)
        try:
            spark.sql(sql_command)
        except Exception as e:
            print(f"Simulação: O comando '{sql_command}' seria executado.")
            print("(No Community Edition, só podemos dar grant para usuários reais do workspace)")

        #registro de auditoria (controle de governanca)
        spark.sql(f"""
            INSERT INTO biosampa_demo.audit_access_log VALUES (
                current_timestamp(),
                '{table_name}',
                '{user_email}',
                current_user(),
                'GRANT SELECT - APROVADO PELO PROCESSO'
            )
        """)
        
        print("SUCESSO! Acesso concedido e logado na auditoria.")
        
    except Exception as e:
        print(f"Erro fatal: {e}")

print("Sistema de Execução de Acessos pronto.")

In [0]:
#primeiro validar
check_bio_access_tags('silver_fauna_clean', 'pesquisa')

print("\n" + "SE APROVADO, O ENGENHEIRO RODA ISSO: " + "\n")

#executar o grant
execute_access_grant('silver_fauna_clean', 'biologo_chefe@instituto.org')

print("\n" + "="*60 + "\n")

#mortar a auditoria
print("LOG DE AUDITORIA:")
display(spark.sql("SELECT * FROM biosampa_demo.audit_access_log ORDER BY data_hora DESC"))