
## Informações Gerais
| Informações |  Detalhes  |
|-------------|------------|
|Origem | bronze.brewery |
|Nome Tabela  | silver.brewery |

## Histórico de Atualizações
| Data | Desenvolvido por | Motivo |
|:----:|------------------|--------|
|19/04/2024 | Gabriel Alves | Criação do notebook |


In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import datetime

In [0]:
# Função para aplicar comentários à tabela e colunas
def adicionaComentariosTabela(schema, table, table_comment, column_comments):
    spark.sql(f"COMMENT ON TABLE {schema}.{table} IS '{table_comment}'")
    for col_name, comment in column_comments.items():
        sqlaux = f"ALTER TABLE {schema}.{table} CHANGE COLUMN {col_name} COMMENT '{comment}'"
        spark.sql(sqlaux)

In [0]:
# Definição de variáveis de configuração
storage_account    = "stbrewerydatalake"       # sua Storage Account no ADLS Gen2
container_name     = "brewery-project"         # nome do container do projeto no ADLS
bronze_container   = "1.bronze"                # pasta da camada Bronze (dados brutos em Delta)
silver_container   = "2.silver"                # pasta da camada Silver (dados tratados)

schema_bronze    = "bronze"                  # nome do banco de dados no metastore para a Bronze
table_bronze       = "brewery"                 # nome da tabela Delta da camada Bronze

schema_silver    = "silver"                  # nome do banco de dados no metastore para a Silver
table_silver       = "brewery"                 # nome da tabela Delta da camada Silver


# Caminho físico da camada Silver (tabela externa)
silver_path = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/{silver_container}"

In [0]:
# Leitura dos dados da Bronze e transformação
df_silver = spark.sql(f"""
    SELECT
        name,
        brewery_type,
        CASE 
            WHEN address_1 = 'Unnamed Street' THEN address_2  -- Se a coluna address_1 for igual a 'Unnamed Street', considera o valor de address_2 como endereço,
            ELSE address_1                                    -- caso contrário, utiliza o valor de address_1.
        END AS address,
        city,
        state,
        country
    FROM {schema_bronze}.{table_bronze}
""")


# Adição de timestamp de ingestão Silver
df_silver = df_silver.withColumn("silver_timestamp", current_timestamp())

In [0]:
# Criar database Silver se não existir
spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_silver}")

# Escrita como tabela Delta externa (Silver)
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("state") \
    .option("overwriteSchema", "true") \
    .option("path", silver_path) \
    .saveAsTable(f"{schema_silver}.{table_silver}")

In [0]:
# Comentários na tabela e colunas
comentario_tabela = "Camada Silver contendo dados tratados de cervejarias, com endereço padronizado e informações relevantes para análise exploratória e agregações."

comentarios_colunas = {
    "name": "Nome da cervejaria",
    "brewery_type": "Tipo de estabelecimento da cervejaria",
    "address": "Endereço principal da cervejaria, priorizando valores válidos entre address_1, address_2 e address_3",
    "city": "Cidade onde a cervejaria está localizada",
    "state": "Estado onde a cervejaria está localizada",
    "country": "País onde a cervejaria está localizada",
    "silver_timestamp": "Timestamp de carga na camada Silver"
}

adicionaComentariosTabela(
    schema_silver,
    table_silver,
    comentario_tabela,
    comentarios_colunas
)

In [0]:
# Otimiza a tabela Delta na camada Silver, consolidando pequenos arquivos em arquivos maiores
# para melhorar o desempenho de leitura em consultas analíticas
spark.sql(f'OPTIMIZE {schema_silver}.{table_silver}')

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# Verificação opcional da pasta Silver
display(dbutils.fs.ls(silver_path))

path,name,size,modificationTime
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/_delta_log/,_delta_log/,0,1747698376000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Arizona/,state=Arizona/,0,1747698380000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=California/,state=California/,0,1747698380000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Colorado/,state=Colorado/,0,1747698381000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Delaware/,state=Delaware/,0,1747698381000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Idaho/,state=Idaho/,0,1747698381000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Illinois/,state=Illinois/,0,1747698380000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Indiana/,state=Indiana/,0,1747698381000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Iowa/,state=Iowa/,0,1747698380000
abfss://brewery-project@stbrewerydatalake.dfs.core.windows.net/2.silver/state=Laois/,state=Laois/,0,1747698380000
