In [0]:
%sql
-- Create the catalog used by this notebook; no-op if it already exists.
CREATE CATALOG IF NOT EXISTS `breweries-catalog`;
-- Create the bronze-layer schema inside that catalog; no-op if it already exists.
CREATE SCHEMA IF NOT EXISTS `breweries-catalog`.bronze_layer;

In [0]:
import requests
import time
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType
)
import pyspark.sql.functions as F

# Schema for the brewery records returned by the API.
# Every column is read as a string; the boolean says whether NULLs are allowed.
brewery_columns = [
    ("id", False),
    ("name", False),
    ("brewery_type", False),
    ("address_1", True),
    ("address_2", True),
    ("address_3", True),
    ("city", False),
    ("state_province", True),
    ("postal_code", True),
    ("country", False),
    ("longitude", True),
    ("latitude", True),
    ("phone", True),
    ("website_url", True),
    ("state", False),
    ("street", True),
]

schema = StructType([
    StructField(column_name, StringType(), is_nullable)
    for column_name, is_nullable in brewery_columns
])

# Get data from API with pagination and rate limit handling.
#
# Pages are requested one after another until the API returns an empty page.
# The collected records end up in `all_data` (a list of dicts).
api_url = "https://api.openbrewerydb.org/v1/breweries"
all_data = []
page = 1
per_page = 50

# requests applies no timeout by default, so a stalled connection would block
# the notebook forever. A timeout raises requests.exceptions.Timeout instead,
# which is not caught below and therefore fails the cell.
request_timeout_s = 30
# Upper bound on consecutive HTTP 429 retries for the same page, so a
# persistent rate limit cannot turn into an endless loop.
max_rate_limit_retries = 5
# Wait used when the server sends no usable Retry-After header, and the cap
# applied when it does.
default_retry_wait_s = 2.0
max_retry_wait_s = 60.0

rate_limit_retries = 0

while True:
    try:
        resp = requests.get(
            api_url,
            params={"page": page, "per_page": per_page},
            timeout=request_timeout_s,
        )
        if resp.status_code == 429 and rate_limit_retries < max_rate_limit_retries:
            rate_limit_retries += 1
            # Honor the server's Retry-After (delay in seconds) when present;
            # other formats (e.g. an HTTP date) fall back to the default wait.
            retry_after = resp.headers.get("Retry-After", "")
            if retry_after.isdigit():
                wait_s = min(float(retry_after), max_retry_wait_s)
            else:
                wait_s = default_retry_wait_s
            time.sleep(wait_s)
            continue
        # Once the retry budget is spent, a 429 reaches this line and is
        # reported like any other HTTP error.
        resp.raise_for_status()
        page_data = resp.json()
        if not page_data:
            break  # An empty page means there is nothing left to fetch
        all_data.extend(page_data)
        page += 1
        rate_limit_retries = 0  # The retry budget is per page
        time.sleep(0.5)  # Be nice to the API
    except requests.exceptions.HTTPError as e:
        # Best effort: report the error and stop paginating. Pages fetched so
        # far stay in all_data, so the result may be a partial extract.
        print(f"HTTP error: {e}")
        break

# Build the bronze DataFrame from the fetched records using the explicit
# schema, and stamp every row with the ingestion time.
df_bronze = (
    spark.createDataFrame(all_data, schema=schema)
    .withColumn("ingestion_timestamp", F.current_timestamp())
)

In [0]:
# Data quality (DQ) checks for the bronze DataFrame.
# Raises Exception when a check fails; the DQ log below is only written when
# both checks pass.

# Check 1: the extract must not be empty.
if df_bronze.count() == 0:
    raise Exception("Data Quality Check Failed: Bronze Table is empty")

# Number of NULLs per column, computed in a single aggregation.
# F.when(...) without otherwise() yields NULL for rows that do not match and
# F.count() skips NULLs, so each output column counts the NULLs of the input
# column with the same name.
null_counts = df_bronze.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_bronze.columns
])

# Check 2: columns declared non-nullable in the schema must contain no NULLs.
# The counts are fetched once and reused, instead of running one
# filter/count job per column.
null_count_row = null_counts.first()
not_null_columns = [field.name for field in df_bronze.schema.fields if not field.nullable]
null_not_allowed = [c for c in not_null_columns if null_count_row[c] > 0]
if null_not_allowed:
    raise Exception(f"Data Quality Check Failed: Null values found in NOT NULL columns: {null_not_allowed}")

# DQ log: one row holding the per-column null counts plus the layer name.
df_bronze_dq = (
    null_counts
    .withColumn("dq_layer", F.lit("bronze"))
    # Replaces the null-count column of the same name with the log time.
    .withColumn("ingestion_timestamp", F.current_timestamp())
)

# Save the DQ information to the DQ table.
df_bronze_dq.write.mode("append").format("delta").saveAsTable("`breweries-catalog`.bronze_layer.breweries_dq")

In [0]:
# Write table
# Append the bronze DataFrame to the bronze Delta table. The target uses the
# `breweries-catalog`.bronze_layer catalog/schema that the first cell of this
# notebook creates and that the DQ log is written to, so the table lands in a
# location that is guaranteed to exist.
(
    df_bronze.write
    .format("delta")
    .mode("append")
    .saveAsTable("`breweries-catalog`.bronze_layer.breweries_bronze")
)