# <h1 align="center"><font color="green">Medallion Architecture and Development Environment in Databricks</font></h1>

<font color="pink">Senior Data Scientist: Dr. Eddy Giusepe Chirinos Isidro</font>

![](https://static.wixstatic.com/media/09d18d_aeac75794f8c4d089fa5266d142388d8~mv2.png/v1/fill/w_568,h_220,al_c,q_85,usm_0.66_1.00_0.01,enc_avif,quality_auto/09d18d_aeac75794f8c4d089fa5266d142388d8~mv2.png)

In the **Medallion Architecture**, each layer has a specific purpose:

| Layer | Purpose | When to Use |
|--------|-----------|-------------|
| **Raw Data** | Raw files (CSV, JSON, etc.) | First ingestion or full reprocessing |
| **Bronze** 🥉 | Raw data in an optimized format (Delta) | As the source for transformations (Silver/Gold) |
| **Silver** 🥈 | Cleaned and transformed data | For analyses and aggregations |
| **Gold** 🥇 | Consumption-ready data (dashboards, ML) | For end users |
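The sketch below makes the flow between layers concrete. It uses plain Python instead of Spark, so it runs anywhere. The toy CSV content and the function names `to_bronze`, `to_silver` and `to_gold` are hypothetical. In the rest of this notebook, the layers are Delta tables in Unity Catalog, not Python lists.

```python
import csv
import io
from statistics import mean

# Hypothetical raw file content (toy data, not the Uber dataset used later).
RAW_CSV = """fare_amount,passenger_count
7.5,1
-3.0,2
12.9,1
5.3,0
"""


def to_bronze(raw_text: str) -> list[dict]:
    """Bronze: ingest every raw row, only parsing it into typed records."""
    reader = csv.DictReader(io.StringIO(raw_text))
    return [
        {"fare_amount": float(r["fare_amount"]), "passenger_count": int(r["passenger_count"])}
        for r in reader
    ]


def to_silver(bronze: list[dict]) -> list[dict]:
    """Silver: keep only records that pass basic quality checks."""
    return [r for r in bronze if r["fare_amount"] > 0 and r["passenger_count"] > 0]


def to_gold(silver: list[dict]) -> dict:
    """Gold: a consumption-ready aggregate, e.g. for a dashboard."""
    return {
        "trips": len(silver),
        "avg_fare": round(mean(r["fare_amount"] for r in silver), 2),
    }


bronze = to_bronze(RAW_CSV)
silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {'trips': 2, 'avg_fare': 10.2}
```

Each layer reads only from the previous one. This is why Bronze is kept as the source for reprocessing Silver and Gold.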

# <font color="red">logging</font>

In [1]:
import logging

def setup_logging() -> None:
    """Configures the logging system"""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler("LOGs_Databricks.log"),
            logging.StreamHandler(),
        ],
    )


setup_logging()
logger = logging.getLogger(__name__)

# <font color="red">Connecting to Databricks</font>

In [2]:
# Run only when working locally:
import os
from dotenv import load_dotenv, find_dotenv


_ = load_dotenv(find_dotenv())

databricks_host = os.environ['DATABRICKS_HOST']
databricks_token = os.environ['DATABRICKS_TOKEN']
databricks_cluster_id = os.environ['DATABRICKS_CLUSTER_ID']

print("🔗 Databricks credentials loaded successfully!")

🔗 Databricks credentials loaded successfully!
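The cell above raises a bare `KeyError` for the first missing variable. The hypothetical helper `load_required_env` (not part of `python-dotenv`) reports every missing name at once. It assumes the variables are already in the environment, for example after `load_dotenv(find_dotenv())`.

```python
import os


def load_required_env(names: list[str]) -> dict[str, str]:
    """Return the requested environment variables, failing with one clear error if any are missing or empty."""
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}


# Usage with the same variable names as the cell above (requires them to be set):
# creds = load_required_env(["DATABRICKS_HOST", "DATABRICKS_TOKEN", "DATABRICKS_CLUSTER_ID"])
```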


Next, we use `Databricks Connect` to connect to Databricks. This lets us write and run `Spark` code locally while the data is processed remotely on the Databricks cluster.

In [3]:
from databricks.connect import DatabricksSession  # Used to connect to Databricks remotely
from pyspark.dbutils import DBUtils  # To use dbutils locally

spark = DatabricksSession.builder.remote(
    host=databricks_host,
    token=databricks_token,
    cluster_id=databricks_cluster_id,
).getOrCreate()

# Create a DBUtils instance to use dbutils locally:
dbutils = DBUtils(spark)

print("✅ Connected to Databricks successfully!")
print(f"🔧 Spark version: {spark.version}")
print("📦 DBUtils configured and ready to use!")

✅ Connected to Databricks successfully!
🔧 Spark version: 4.0.0
📦 DBUtils configured and ready to use!


In [4]:
# Run this only inside Databricks to find the cluster ID:

#databricks_cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

#print(f"Databricks Cluster ID: {databricks_cluster_id}")

# <font color="red">Listing the ``Medallion Architecture`` with Unity Catalog</font>

In Unity Catalog, the hierarchy is ``catalog.schema.table``.
Next, we list my catalogs, schemas and tables, focusing on the ``eddy_uber_taxi_v1`` catalog.

Some useful commands:

```sql
SHOW CATALOGS                    -- Lists all catalogs
SHOW SCHEMAS IN catalog_name     -- Lists the schemas in a catalog
SHOW TABLES IN catalog.schema    -- Lists the tables in a schema
SHOW VOLUMES IN catalog.schema   -- Lists the volumes in a schema
```
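The notebook interpolates names directly into f-strings. That works for simple names such as `eddy_uber_taxi_v1`. Names with special characters need backtick-quoted identifiers in Spark SQL, where a literal backtick is written as two backticks. The sketch below uses hypothetical helpers, `quote_ident` and `show_statement`, to build such statements.

```python
def quote_ident(name: str) -> str:
    """Quote a Spark SQL identifier with backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def show_statement(kind: str, *parts: str) -> str:
    """Build e.g. SHOW TABLES IN `catalog`.`schema` from its name parts."""
    allowed = {"CATALOGS", "SCHEMAS", "TABLES", "VOLUMES"}
    kind = kind.upper()
    if kind not in allowed:
        raise ValueError(f"Unsupported SHOW kind: {kind}")
    if not parts:
        return f"SHOW {kind}"
    return f"SHOW {kind} IN " + ".".join(quote_ident(p) for p in parts)


print(show_statement("tables", "eddy_uber_taxi_v1", "bronze"))
# SHOW TABLES IN `eddy_uber_taxi_v1`.`bronze`
```

The resulting string can be passed to `spark.sql(...)`, just like the literal statements in the cells below.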

In [5]:
spark.sql("SHOW CATALOGS").show()  # .show() prints and returns None; use .toPandas() to keep the result


+-----------------+
|          catalog|
+-----------------+
|      catalogeddy|
|eddy_uber_taxi_v1|
|          samples|
|           system|
|        workspace|
+-----------------+



In [6]:
spark.sql("SHOW SCHEMAS IN catalogeddy").show()

+------------------+
|      databaseName|
+------------------+
|catalogeddy_schema|
|information_schema|
+------------------+



In [7]:
spark.sql("SHOW TABLES IN catalogeddy.catalogeddy_schema").show()

+------------------+---------------+-----------+
|          database|      tableName|isTemporary|
+------------------+---------------+-----------+
|catalogeddy_schema|tabel_baby_eddy|      false|
+------------------+---------------+-----------+



Next, I work with my ``eddy_uber_taxi_v1`` catalog, which I know contains a Medallion Architecture, and list its ``schemas`` and ``tables``:

In [8]:
catalog_name = "eddy_uber_taxi_v1"

spark.sql(f"SHOW SCHEMAS IN {catalog_name}").show()  # Or use .toPandas() to keep the result

+------------------+
|      databaseName|
+------------------+
|            bronze|
|              gold|
|information_schema|
|          raw_data|
|            silver|
+------------------+



In [9]:
# List the tables in the 'bronze', 'silver' and 'gold' schemas:
medallion_layers = ['bronze', 'silver', 'gold']

for layer in medallion_layers:
    logger.info(f"🏆 TABLES IN SCHEMA '{catalog_name}.{layer}'")
    
    try:
        tables = spark.sql(f"SHOW TABLES IN {catalog_name}.{layer}").toPandas()
        if tables.empty:
            logger.warning("⚠️ No tables found")
        else:
            logger.info(tables[['database', 'tableName', 'isTemporary']])
            logger.info(f"\n📊 Total tables: {len(tables)}")
    except Exception as e:
        logger.error(f"  ❌ Error accessing schema '{layer}': {str(e)}")

2025-10-19 17:01:09,872 - __main__ - INFO - 🏆 TABLES IN SCHEMA 'eddy_uber_taxi_v1.bronze'
2025-10-19 17:01:10,710 - __main__ - INFO -   database tableName  isTemporary
0   bronze  uber_raw        False
2025-10-19 17:01:10,712 - __main__ - INFO - 
📊 Total tables: 1
2025-10-19 17:01:10,713 - __main__ - INFO - 🏆 TABLES IN SCHEMA 'eddy_uber_taxi_v1.silver'
2025-10-19 17:01:11,513 - __main__ - INFO -   database              tableName  isTemporary
0   silver  uber_col_ptbr_silver1        False
2025-10-19 17:01:11,516 - __main__ - INFO - 
📊 Total tables: 1
2025-10-19 17:01:11,517 - __main__ - INFO - 🏆 TABLES IN SCHEMA 'eddy_uber_taxi_v1.gold'


# <font color="red">Loading data from ``bronze``</font>

In [10]:
catalog_name = "eddy_uber_taxi_v1"
bronze_table = f"{catalog_name}.bronze.uber_raw"


df_bronze = spark.table(bronze_table)

logger.info(f"Bronze table shape: {df_bronze.count()} rows and {len(df_bronze.columns)} columns")


2025-10-19 17:01:15,311 - __main__ - INFO - Bronze table shape: 200000 rows and 9 columns


In [11]:
logger.info("Bronze table schema:\n")
df_bronze.printSchema()



2025-10-19 17:01:15,320 - __main__ - INFO - Bronze table schema:



root
 |-- _c0: integer (nullable = true)
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [12]:
logger.info("üëÄ Preview dos dados (6 primeiras linhas):\n")

df_bronze.show(6, truncate=False)

2025-10-19 17:01:15,342 - __main__ - INFO - üëÄ Preview dos dados (6 primeiras linhas):



+--------+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|_c0     |key                |fare_amount|pickup_datetime    |pickup_longitude  |pickup_latitude  |dropoff_longitude |dropoff_latitude |passenger_count|
+--------+-------------------+-----------+-------------------+------------------+-----------------+------------------+-----------------+---------------+
|24238194|2015-05-07 19:52:06|7.5        |2015-05-07 19:52:06|-73.99981689453125|40.73835372924805|-73.99951171875   |40.72321701049805|1              |
|27835199|2009-07-17 20:04:56|7.7        |2009-07-17 20:04:56|-73.994355        |40.728225        |-73.99471         |40.750325        |1              |
|44984355|2009-08-24 21:45:00|12.9       |2009-08-24 21:45:00|-74.005043        |40.74077         |-73.962565        |40.772647        |1              |
|25894730|2009-06-26 08:22:21|5.3        |2009-06-26 08:22:21|-73.976124        |4

# <font color="red">Renaming DataFrame Columns Before Saving to ``silver``</font>

| Method | When to Use | Pros | Cons |
|--------|-------------|------|---------|
| **1. withColumnRenamed()** | Few columns (1-3) | Simple and readable | Verbose for many columns |
| **2. Dictionary + reduce()** | Many columns, reusable code | Elegant, easy to maintain | Requires importing `reduce` |
| **3. toDF()** | Renaming ALL columns | Very fast and direct | All names must be listed in order |
| **4. select() + alias()** | Renaming + transforming data | Flexible for transformations | More verbose |

### ✅ **Recommendation:**

- **For production**: use **Method 2 (dictionary)**, which is cleaner and easier to maintain
- **For quick prototypes**: use **Method 3 (toDF())**, which is faster to write
- **If you will also transform the data**: use **Method 4 (select + alias)**
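Method 3 needs the full list of new names in the current column order. The hypothetical helper `names_for_todf` derives that list from the same kind of dictionary used by Method 2, so the two cannot drift apart. The sketch is plain Python. The Spark call appears only as a comment.

```python
def names_for_todf(columns: list[str], mapping: dict[str, str]) -> list[str]:
    """Build the full, ordered name list that DataFrame.toDF(*names) expects.

    Columns missing from `mapping` keep their original name, so the order
    always matches the current DataFrame.
    """
    return [mapping.get(c, c) for c in columns]


columns = ["_c0", "key", "fare_amount"]
mapping = {"_c0": "id", "fare_amount": "valor_corrida"}
print(names_for_todf(columns, mapping))  # ['id', 'key', 'valor_corrida']

# With Spark (not executed here): df.toDF(*names_for_todf(df.columns, mapping))
```

On Spark 3.4 or later, `DataFrame.withColumnsRenamed(mapping)` also renames columns from a dictionary in a single call, without `reduce`.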

In [13]:
from functools import reduce  # Standard-library Python, not PySpark

# Name mapping: English -> Portuguese
colunas_ptbr = {
    "_c0": "id",
    "key": "chave_temporal",
    "fare_amount": "valor_corrida",
    "pickup_datetime": "data_hora_embarque",
    "pickup_longitude": "longitude_embarque",
    "pickup_latitude": "latitude_embarque",
    "dropoff_longitude": "longitude_desembarque",
    "dropoff_latitude": "latitude_desembarque",
    "passenger_count": "numero_passageiros"
}

# Apply the renaming, one withColumnRenamed() call per dictionary key:
df_bronze_renamed = reduce(
    lambda df, old_name: df.withColumnRenamed(old_name, colunas_ptbr[old_name]),  # Function
    colunas_ptbr.keys(),  # Iterable
    df_bronze)  # Initial value: the original DataFrame

logger.info(f"📊 Old columns: {df_bronze.columns}")
logger.info(f"📊 New columns: {df_bronze_renamed.columns}")

logger.info("\n👀 Data preview:")
df_bronze_renamed.show(6, truncate=False)


2025-10-19 17:01:18,151 - __main__ - INFO - 📊 Old columns: ['_c0', 'key', 'fare_amount', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count']
2025-10-19 17:01:18,551 - __main__ - INFO - 📊 New columns: ['id', 'chave_temporal', 'valor_corrida', 'data_hora_embarque', 'longitude_embarque', 'latitude_embarque', 'longitude_desembarque', 'latitude_desembarque', 'numero_passageiros']
2025-10-19 17:01:18,552 - __main__ - INFO - 
👀 Data preview:


+--------+-------------------+-------------+-------------------+------------------+-----------------+---------------------+--------------------+------------------+
|id      |chave_temporal     |valor_corrida|data_hora_embarque |longitude_embarque|latitude_embarque|longitude_desembarque|latitude_desembarque|numero_passageiros|
+--------+-------------------+-------------+-------------------+------------------+-----------------+---------------------+--------------------+------------------+
|24238194|2015-05-07 19:52:06|7.5          |2015-05-07 19:52:06|-73.99981689453125|40.73835372924805|-73.99951171875      |40.72321701049805   |1                 |
|27835199|2009-07-17 20:04:56|7.7          |2009-07-17 20:04:56|-73.994355        |40.728225        |-73.99471            |40.750325           |1                 |
|44984355|2009-08-24 21:45:00|12.9         |2009-08-24 21:45:00|-74.005043        |40.74077         |-73.962565           |40.772647           |1                 |
|25894730|2009-0

In [14]:
# Now save the DataFrame "df_bronze_renamed" to "silver":
from pyspark.sql.functions import current_timestamp

catalog_name = "eddy_uber_taxi_v1"
schema_name = "silver"
table_name = "uber_col_ptbr_silver1"
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

logger.info(f"💾 Starting save to Silver: {full_table_name}")
logger.info(f"📊 Total records: {df_bronze_renamed.count():,}")
logger.info(f"📋 Total columns: {len(df_bronze_renamed.columns)}")

# Optional: add an audit column with the processing timestamp:
df_silver = df_bronze_renamed.withColumn("data_uber_silver1", current_timestamp())

# Save as a Delta table:
try:
    df_silver.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .option("delta.columnMapping.mode", "name") \
        .saveAsTable(full_table_name)
    
    logger.info(f"✅ Table {full_table_name} saved successfully!")
    
    # Check the created table:
    logger.info("🔍 Checking the created table:")
    spark.sql(f"DESCRIBE TABLE {full_table_name}").show()
    
    # Count the saved records:
    count = spark.table(full_table_name).count()
    logger.info(f"📊 Total records saved: {count:,}")
    
except Exception as e:
    logger.error(f"❌ Error saving to Silver: {str(e)}")
    raise


2025-10-19 17:01:20,209 - __main__ - INFO - 💾 Starting save to Silver: eddy_uber_taxi_v1.silver.uber_col_ptbr_silver1
2025-10-19 17:01:20,575 - __main__ - INFO - 📊 Total records: 200,000
2025-10-19 17:01:20,576 - __main__ - INFO - 📋 Total columns: 9
2025-10-19 17:01:25,093 - __main__ - INFO - ✅ Table eddy_uber_taxi_v1.silver.uber_col_ptbr_silver1 saved successfully!
2025-10-19 17:01:25,094 - __main__ - INFO - 🔍 Checking the created table:


+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                  id|      int|   NULL|
|      chave_temporal|timestamp|   NULL|
|       valor_corrida|   double|   NULL|
|  data_hora_embarque|timestamp|   NULL|
|  longitude_embarque|   double|   NULL|
|   latitude_embarque|   double|   NULL|
|longitude_desemba...|   double|   NULL|
|latitude_desembarque|   double|   NULL|
|  numero_passageiros|      int|   NULL|
|   data_uber_silver1|timestamp|   NULL|
+--------------------+---------+-------+



2025-10-19 17:01:26,368 - __main__ - INFO - 📊 Total records saved: 200,000


# <font color="red">Applying Some ``Business Rules``</font>

## <font color="blue">üìä REGRA DE NEG√ìCIO 1: Valida√ß√£o e Limpeza B√°sica</font>

In [17]:
logger.info("üîç Aplicando Regra de Neg√≥cio 1: Valida√ß√£o B√°sica")

# Usar nomes ORIGINAIS da tabela Bronze
df_silver_valido = spark.sql(f"""
    SELECT 
        _c0 AS id,
        key AS chave_temporal,
        fare_amount AS valor_corrida,
        pickup_datetime AS data_hora_embarque,
        pickup_longitude AS longitude_embarque,
        pickup_latitude AS latitude_embarque,
        dropoff_longitude AS longitude_desembarque,
        dropoff_latitude AS latitude_desembarque,
        passenger_count AS numero_passageiros,
        CURRENT_TIMESTAMP() AS data_processamento
    FROM {catalog_name}.bronze.uber_raw
    WHERE 
        -- Usar nomes ORIGINAIS nas condi√ß√µes
        fare_amount > 0 
        AND fare_amount < 500
        
        AND passenger_count > 0 
        AND passenger_count <= 4
        
        AND pickup_longitude BETWEEN -74.05 AND -73.75
        AND pickup_latitude BETWEEN 40.63 AND 40.85
        AND dropoff_longitude BETWEEN -74.05 AND -73.75
        AND dropoff_latitude BETWEEN 40.63 AND 40.85
        
        AND pickup_datetime <= CURRENT_TIMESTAMP()
""")

logger.info(f"‚úÖ Registros v√°lidos: {df_silver_valido.count():,}")

# Salvar no Silver
df_silver_valido.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog_name}.silver.uber_valido"
)

2025-10-19 17:06:01,945 - __main__ - INFO - üîç Aplicando Regra de Neg√≥cio 1: Valida√ß√£o B√°sica
2025-10-19 17:06:04,000 - __main__ - INFO - ‚úÖ Registros v√°lidos: 175,146
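The WHERE clause above amounts to a single predicate per row. The sketch below mirrors it in plain Python so the rule can be unit-tested outside Spark. The function name `is_valid_trip` is hypothetical, and the sketch assumes non-null inputs. In SQL, a row with NULL in any of these columns is also dropped, because the comparison evaluates to NULL rather than true.

```python
from datetime import datetime

# Same bounds as the WHERE clause above.
LON_MIN, LON_MAX = -74.05, -73.75
LAT_MIN, LAT_MAX = 40.63, 40.85


def is_valid_trip(fare: float, passengers: int,
                  pickup_lon: float, pickup_lat: float,
                  dropoff_lon: float, dropoff_lat: float,
                  pickup_at: datetime, now: datetime) -> bool:
    """Return True if a trip passes Business Rule 1 (chained <= mirrors inclusive BETWEEN)."""
    return (
        0 < fare < 500
        and 0 < passengers <= 4
        and LON_MIN <= pickup_lon <= LON_MAX
        and LAT_MIN <= pickup_lat <= LAT_MAX
        and LON_MIN <= dropoff_lon <= LON_MAX
        and LAT_MIN <= dropoff_lat <= LAT_MAX
        and pickup_at <= now
    )


# First row of the Bronze preview: passes every condition.
print(is_valid_trip(7.5, 1, -73.99981689453125, 40.73835372924805,
                    -73.99951171875, 40.72321701049805,
                    datetime(2015, 5, 7, 19, 52, 6), datetime(2025, 10, 19)))  # True
```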


## <font color="blue">üìä REGRA DE NEG√ìCIO 2: Deduplica√ß√£o e Padroniza√ß√£o</font>

In [23]:
logger.info("üîç Aplicando Regra de Neg√≥cio 2: Deduplica√ß√£o")

tb_silver_unico = spark.sql(f"""
    WITH ranked_data AS (
        SELECT 
            *,
            ROW_NUMBER() OVER (
                PARTITION BY _c0 
                ORDER BY pickup_datetime DESC
            ) AS rank_temporal
        FROM {catalog_name}.bronze.uber_raw
        WHERE 
            fare_amount > 0 
            AND passenger_count > 0
    )
    SELECT 
        _c0,
        key,
        ROUND(fare_amount, 2) AS fare_amount,
        pickup_datetime,
        DATE_FORMAT(pickup_datetime, 'dd/MM/yyyy HH:mm:ss') AS pickup_datetime_formatted,
        ROUND(pickup_longitude, 4) AS pickup_longitude,
        ROUND(pickup_latitude, 4) AS pickup_latitude,
        ROUND(dropoff_longitude, 4) AS dropoff_longitude,
        ROUND(dropoff_latitude, 4) AS dropoff_latitude,
        passenger_count,
        CURRENT_TIMESTAMP() AS processing_timestamp
    FROM ranked_data
    WHERE rank_temporal = 1
""")

logger.info(f"‚úÖ Unique records: {tb_silver_unico.count():,}")

# Save to Silver
tb_silver_unico.write.format("delta").mode("overwrite").saveAsTable(
    f"{catalog_name}.silver.uber_unique_id_deduplicated"
)

logger.info(f"‚úÖ Table {catalog_name}.silver.uber_unique saved successfully!")

2025-10-19 18:10:38,634 - __main__ - INFO - üîç Aplicando Regra de Neg√≥cio 2: Deduplica√ß√£o
2025-10-19 18:10:40,960 - __main__ - INFO - ‚úÖ Unique records: 199,269
2025-10-19 18:10:47,456 - __main__ - INFO - ‚úÖ Table eddy_uber_taxi_v1.silver.uber_unique saved successfully!
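`ROW_NUMBER() OVER (PARTITION BY _c0 ORDER BY pickup_datetime DESC)` numbers the rows in each `_c0` group from newest to oldest. Filtering on `rank_temporal = 1` therefore keeps the newest row per id. The plain-Python sketch below reproduces that logic so it is easy to test. The helper `keep_latest_per_key` and the toy rows are hypothetical, although the values are borrowed from the Bronze preview. Note that the drop from 200,000 to 199,269 rows comes from both this deduplication and the `fare_amount > 0 AND passenger_count > 0` filter, so it is not a count of duplicates alone.

```python
from datetime import datetime
from typing import Any


def keep_latest_per_key(rows: list[dict[str, Any]], key: str, order_by: str) -> list[dict[str, Any]]:
    """Keep one row per `key`: the one with the greatest `order_by` value.

    Mirrors ROW_NUMBER() OVER (PARTITION BY key ORDER BY order_by DESC) = 1.
    Ties keep the first row seen; Spark's choice among tied rows is not deterministic.
    """
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        k = row[key]
        # Replace the kept row only when this one is strictly newer.
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


rows = [
    {"_c0": 1, "pickup_datetime": datetime(2009, 7, 17, 20, 4, 56), "fare_amount": 7.7},
    {"_c0": 1, "pickup_datetime": datetime(2015, 5, 7, 19, 52, 6), "fare_amount": 7.5},
    {"_c0": 2, "pickup_datetime": datetime(2009, 8, 24, 21, 45, 0), "fare_amount": 12.9},
]
unique_rows = keep_latest_per_key(rows, "_c0", "pickup_datetime")
print([(r["_c0"], r["fare_amount"]) for r in unique_rows])  # [(1, 7.5), (2, 12.9)]
```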
