In [0]:
# Biblioteca(s)
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, current_timestamp, lit, input_file_name
from pyspark.sql import functions as F

In [0]:
# Torna disponíveis todas as funções, classes e variáveis definidas em 'lib_utils' no ambiente atual
# Obs: Comando em célula separada, pois não comporta bem com o comando %run

In [0]:
%run "Workspace/programas_python/projetos_e_teste/lib_utils"

In [0]:
# Configura os caminhos de armazenamento de dados
db_name_silver = f'db_breweries_silver'
db_name_gold = f'db_breweries_gold'
api_path = "api_openbrewerydb"
table_name = "breweries"
table_name_gold = "dm_brewery_distribution_by_state_and_type"
silver_path = "dbfs:/FileStore/silver_layer"
gold_path = "dbfs:/FileStore/gold_layer"


silver_base_path = f"{silver_path}/{api_path}/{db_name_silver}"
silver_source_delta = f"{silver_base_path}/{table_name}/"
gold_base_path = f"{gold_path}/{api_path}/{db_name_gold}"
gold_sink_delta = f"{gold_base_path}/{table_name_gold}/"


In [0]:
# Lê uma tabela Delta especificada no caminho 'silver_source_delta' e habilita a mesclagem de esquema.
# Isso permite que o Spark reconheça e integre mudanças no esquema da tabela Delta durante a leitura.
df_source = spark.read.option("mergeSchema", "true").format('delta').load(silver_source_delta)
# df_source.printSchema() # Descomente para visualizar o esquema da tabela

In [0]:
df_grouped = df_source.groupBy("brewery_type", "state").agg(F.count("*").alias("soma")).orderBy(F.col("soma").desc())
# display(df_grouped) # Descomente para visualizar o DataFrame

In [0]:
# Cria colunas para possível monitoramento 
df_sink = (
    df_grouped.withColumn("dt_gold_entry_utc", current_timestamp())
)

In [0]:
# Cria o banco de dados se ele não existir, usando o nome fornecido em 'db_name_gold'
# O caminho 'silver_sink_delta' especifica o diretório onde o banco de dados será armazenado
create_database_if_not_exists(db_name_gold, gold_sink_delta)

In [0]:
# Escreve o DataFrame 'df_sink' no formato Delta Lake, sobrescrevendo dados existentes
(df_sink.write.format('delta').mode('overwrite')
         .option("mergeSchema", "true")
         .saveAsTable(f'{db_name_gold}.{table_name_gold}'))