In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
import requests
import json

#### Dados de acesso

In [0]:
# OAuth (service principal) settings handed to dbutils.fs.mount as extra_configs.
# The client secret is read from the Databricks secret scope so that it is never
# stored in the notebook source.
# NOTE(review): the key "secret-sp-client-secret" is an assumed name and must exist in
# the "scopedbwinbevtest" scope; any secret that was ever committed in plain text
# should be rotated - TODO confirm.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "05385f3d-aff7-4b2a-a354-e55490cf3c3c",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(
        scope="scopedbwinbevtest", key="secret-sp-client-secret"
    ),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/cc1ec62c-0411-4afb-a12a-a1b2e028414a/oauth2/token",
}

In [0]:
# Storage account name and key, both fetched from the Databricks secret scope.
lake = dbutils.secrets.get(
    scope="scopedbwinbevtest",
    key="secret-datalake-name",
)
lake_key = dbutils.secrets.get(
    scope="scopedbwinbevtest",
    key="secret-key-datalake",
)

# Container names used for the mount points under /mnt/.
containers = ["bronze", "silver", "gold"]

In [0]:
# Do not echo a secret-scope value into the cell output; only report whether the
# lookup returned a non-empty value.
print(f"Nome do datalake carregado: {bool(lake)}")

In [0]:
def mount_datalake(container):
    """Mount one or more datalake containers under ``/mnt/<container>``.

    Args:
        container: A single container name (``str``) or an iterable of
            container names.

    Containers whose mount point is already listed by ``dbutils.fs.mounts()``
    are skipped, so the cell can be re-run safely. Relies on the notebook
    globals ``lake`` (storage account name) and ``configs`` (OAuth settings).

    A ``ValueError`` raised while mounting one container is printed and the
    remaining containers are still processed; any other exception propagates.
    """
    # A bare string must be treated as one name, not iterated character by character.
    names = [container] if isinstance(container, str) else list(container)
    already_mounted = {m.mountPoint.rstrip("/") for m in dbutils.fs.mounts()}

    for name in names:
        mount_point = f"/mnt/{name}"
        if mount_point in already_mounted:
            continue
        try:
            dbutils.fs.mount(
                # ADLS Gen2 URIs use the "abfss" scheme.
                source=f"abfss://{name}@{lake}.dfs.core.windows.net",
                mount_point=mount_point,
                extra_configs=configs,
            )
        except ValueError as e:
            print(f"Erro ao montar o datalake: {e}")

In [0]:
# Path under the bronze mount point where the JSON is stored.
# NOTE(review): this value ends with "/" (a directory, not a file), and the name
# json_file_path is assigned again further down the notebook - confirm intent.

json_file_path = "/mnt/bronze/"

def extract_data(output_path="/dbfs/mnt/bronze/breweries.json", timeout=30):
    """Download the brewery list from the Open Brewery DB API and save it as JSON.

    Args:
        output_path: Local (``/dbfs/...``) path of the JSON *file* to write.
            It must name a file, not a directory, and its parent directory
            must already exist.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON payload returned by the API.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        OSError: If ``output_path`` cannot be opened for writing.
    """
    url = 'https://api.openbrewerydb.org/breweries'
    # A timeout keeps the cell from hanging forever on a stalled connection.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of persisting an error body as data.
    response.raise_for_status()
    data = response.json()

    # Persist the raw payload in the bronze layer.
    with open(output_path, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file)

    return data


#### Leitura dos dados da bronze source, contrato de dados e elaboração da camada prata com salvamento em parquet

In [0]:
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType

# Data contract for the bronze JSON: every field is read as a string.
# "id" is declared non-nullable; the other fields use StructField's default.
schema = StructType(
    [StructField("id", StringType(), nullable=False)]
    + [
        StructField(field_name, StringType())
        for field_name in (
            "name",
            "brewery_type",
            "address_1",
            "address_2",
            "address_3",
            "city",
            "state_province",
            "postal_code",
            "country",
            "longitude",
            "latitude",
            "phone",
            "website_url",
            "state",
            "street",
        )
    ]
)

json_file_path = "/mnt/bronze/data_a041638e-b655-48ff-962d-004c5f15d048_a566ec44-c3db-497c-94cd-4d13888e7d60.json" 
# Read the JSON file straight into a DataFrame, applying the data contract schema.
try:
    df_bronze = spark.read.json(json_file_path, schema=schema)
except Exception as e:
    print(f"Erro ao ler o arquivo JSON: {e}")
    # Stop here: continuing with an empty DataFrame would let the later
    # mode="overwrite" writes replace existing silver/gold/SQL data with nothing.
    raise

def create_silver_layer(df_bronze):
    """Build the silver layer: ``longitude`` and ``latitude`` cast to double.

    Args:
        df_bronze: Bronze DataFrame containing ``longitude`` and ``latitude`` columns.

    Returns:
        A DataFrame with both coordinate columns replaced by their double cast.
    """
    silver = df_bronze
    for coordinate in ("longitude", "latitude"):
        silver = silver.withColumn(coordinate, col(coordinate).cast("double"))
    return silver

# Build the silver layer from the bronze DataFrame.
df_prata = create_silver_layer(df_bronze)

# Persist the silver DataFrame as Parquet, replacing any previous output
# (adjust the path as needed).
parquet_file_path = "/mnt/silver/breweries.parquet"
df_prata.write.mode("overwrite").parquet(parquet_file_path)

print(f"Arquivo Parquet salvo em: {parquet_file_path}")

#### Leitura do arquivo da camada prata e agregações para a camada gold

In [0]:
from pyspark.sql.functions import count

def create_brewery_aggregation(df_prata):
    """Count breweries per brewery type and location.

    Args:
        df_prata: Silver-layer DataFrame with ``brewery_type``, ``state`` and
            ``city`` columns.

    Returns:
        A DataFrame with one row per (brewery_type, state, city) group and the
        row count in ``quantidade_cervejarias``.
    """
    group_keys = ("brewery_type", "state", "city")
    brewery_count = count("*").alias("quantidade_cervejarias")
    grouped = df_prata.groupBy(*group_keys)
    return grouped.agg(brewery_count)

# Build the aggregated (gold) view from the silver DataFrame.
df_agregado = create_brewery_aggregation(df_prata)

# Persist the aggregated DataFrame as Parquet, replacing any previous output
# (adjust the path as needed).
parquet_file_path = "/mnt/gold/breweries.parquet"
df_agregado.write.mode("overwrite").parquet(parquet_file_path)

print(f"Arquivo Parquet salvo em: {parquet_file_path}")

#### Persistência e Disponibilização dos dados da Gold

In [0]:
# SQL credentials, fetched from the Databricks secret scope.
user = dbutils.secrets.get(
    scope="scopedbwinbevtest",
    key="secret-sql-user",
)
password = dbutils.secrets.get(
    scope="scopedbwinbevtest",
    key="secret-sql-pass",
)

# carga na tabela SQL

def sql_spark(df):
    """Write ``df`` to the ``Breweries_gold`` table over JDBC in overwrite mode.

    Args:
        df: DataFrame to write.

    Uses the notebook globals ``user`` and ``password`` as JDBC credentials.
    """
    writer = df.write.format('jdbc').mode('overwrite')
    writer = (
        writer
        .option('url', "jdbc:sqlserver://sql-serv-inbevtest.database.windows.net")
        .option("databaseName", "sqldb.inbevtest")
        .option('dbtable', 'Breweries_gold')
    )
    writer = writer.option('password', password).option('user', user)
    writer.save()

# Write the aggregated DataFrame out through sql_spark (JDBC, overwrite mode).
sql_spark(df_agregado)

#### Desmonta as camadas do DBFS

In [0]:
def unmount_datalake(containers):
    """Unmount the ``/mnt/<container>`` mount point of each given container.

    Args:
        containers: An iterable of container names, or a single name (``str``).

    Containers whose mount point is not listed by ``dbutils.fs.mounts()`` are
    skipped, so the cell can be re-run safely. A ``ValueError`` raised while
    unmounting one container is printed and the remaining containers are still
    processed; any other exception propagates.
    """
    # A bare string must be treated as one name, not iterated character by character.
    names = [containers] if isinstance(containers, str) else list(containers)
    mounted = {m.mountPoint.rstrip("/") for m in dbutils.fs.mounts()}

    for container in names:
        mount_point = f"/mnt/{container}"
        if mount_point not in mounted:
            continue
        try:
            dbutils.fs.unmount(mount_point)
        except ValueError as err:
            print(f"Erro ao desmontar o datalake: {err}")