# ETL
É uma abordagem  fundamental para a movimentação e transformação de dados em pipelines de dados.

Ele consiste em 3 Etapas:

**Extract [Extração]**: Coleta dados de diversas fontes, bancos de dados Sql, NoSql, Api, Arquivos (CSV, PARQUET, JSON, XML e  etc..)

**Transform [Transformação]**: Limpeza, enriquecimento e aplicação de regra de negocio para garantir que s dados estejam estruturados e prontos para análise.

**Load [Carga]**: Gravação de Dados transformados em um destino, como um Data Warehouse, DataLake ou outro sistema de Armazenamento

##  Exemplo de ETL

Buscando dados em uma database (Sql Server)

- Database: db_farmacia_euripedes
- Schema = setor_sul
- Tabela = fato_farmacia

In [0]:
# Caso estivessemos em um outro ambiente que nao seja o databricks, seria necessario add o maven no cluster
#com.microsoft.sqlserver:mssql-jdbc

In [0]:
# Defina a URL de conexão JDBC
jdbc_url = "jdbc:sqlserver://fundacao-ejitbr.database.windows.net:1433;database=db_farmacia_euripedes"

# Defina as propriedades de conexão
properties = {
    "user": "admin_user",
    "password": "80Ab55sd@9",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Ler dados do SQL Server (Tabela)
df = spark.read.jdbc(url=jdbc_url, table="setor_sul.fato_farmacia", properties=properties)

# Mostrar os dados lidos
df.display()


## Como estruturar melhor seu codigo?
Trabalhar com POO é o caminho para ter um codigo mais legivel e escalavel.


## Criando um segredo no Azure Key Vault

https://learn.microsoft.com/en-us/azure/databricks/security/secrets/

In [0]:
class SqlConnector:
    """
    Classe Responsavel por connectar ao database SQL SERVER
    """

    # Defina as propriedades de conexão
    PROPERTIES = {
        "host_name": dbutils.secrets.get(scope='beca_2025',key='sql-host-name-farmacia'),
        "user": dbutils.secrets.get(scope="beca_2025", key="sql-user"),
        "password": dbutils.secrets.get(scope="beca_2025", key="sql-password"),
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    
    @staticmethod
    def read(database_name, table_name, filter_conditional="1=1"):
        return spark.read.jdbc(
            url=SqlConnector.PROPERTIES["host_name"] + ";database="+database_name,
            table=table_name,
            properties=SqlConnector.PROPERTIES
        ).filter(filter_conditional)

In [0]:
SqlConnector.read("db_farmacia_euripedes", "setor_sul.fato_farmacia").display()

In [0]:
SqlConnector().read("db_farmacia_euripedes","setor_sul.dim_produtos").display()

In [0]:
SqlConnector().read("db_farmacia_euripedes","setor_sul.dim_fornecedores").display()

In [0]:
SqlConnector().read("db_farmacia_euripedes","setor_sul.dim_usuarios").display()