In [1]:
import json
import os
import sqlite3
from datetime import datetime

import pandas as pd
import pytz
import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import coalesce, col
from pyspark.sql.functions import current_date, current_timestamp
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

In [2]:
# Create (or reuse) the Hive-enabled Spark session named "Breweries".
spark = (
    SparkSession.builder
    .appName("Breweries")
    .enableHiveSupport()
    .getOrCreate()
)

In [9]:
# Bronze layer: directory the raw JSON files are read from
# (a filesystem path, despite the "url" in the variable name).
input_url = "/home/jovyan/work/layers/bronze/"
# Silver layer: directory the partitioned Parquet output is written to.
output_path = "/home/jovyan/work/layers/silver/"

In [10]:
def _non_blank(df, column_name):
    """Return `column_name` of `df` with blank values replaced by NULL.

    If the column is absent from `df`, a NULL string literal is returned so
    the expression can still be used inside `coalesce`.
    """
    if column_name not in df.columns:
        return F.lit(None).cast("string")
    value = F.col(column_name)
    # `when` without `otherwise` yields NULL whenever the condition is false
    # or NULL, i.e. for empty, whitespace-only and NULL values alike.
    return F.when(F.trim(value) != "", value)


# Read only the *.json files of the bronze layer, in a single Spark pass.
# (The files are no longer parsed a second time with `json.load`: that result
# was never used and duplicated the whole dataset in driver memory.)
# "multiline" lets Spark parse files whose JSON document spans several lines.
bronze_glob = os.path.join(input_url, "*.json")
df_bronze = spark.read.option("multiline", "true").json(bronze_glob)

# Cast every column to string so the silver layer has a uniform schema.
df_spark = df_bronze.select(
    [F.col(c).cast("string").alias(c) for c in df_bronze.columns]
)

# Execution date (calendar day in the Brazil/East time zone) as YYYY-MM-DD.
exec_date = datetime.now(pytz.timezone("Brazil/East")).date().isoformat()

df_silver = (
    df_spark
    # Use 'city' as the location; fall back to 'state_province' when the city
    # is missing, NULL or blank.
    .withColumn(
        "brewery_location",
        coalesce(
            _non_blank(df_spark, "city"),
            _non_blank(df_spark, "state_province"),
        ),
    )
    # Keep one row per brewery id (which duplicate survives is not specified).
    .dropDuplicates(["id"])
    .withColumn("exec_date", F.to_date(F.lit(exec_date)))
)

# Save as Parquet, partitioned by brewery_location.
# mode("overwrite") replaces whatever is already stored under output_path.
(
    df_silver.write
    .mode("overwrite")
    .partitionBy("brewery_location")
    .parquet(output_path)
)

In [12]:
# Preview a handful of silver-layer columns.
(
    df_silver
    .select("brewery_type", "city", "country", "name", "state", "exec_date")
    .show()
)

+------------+----------------+-------------+--------------------+--------------+----------+
|brewery_type|            city|      country|                name|         state| exec_date|
+------------+----------------+-------------+--------------------+--------------+----------+
|     brewpub|      Louisville|United States|    12Degree Brewing|      Colorado|2025-08-08|
|       micro|         Houston|United States|11 Below Brewing ...|         Texas|2025-08-08|
|       micro|            Mesa|United States|12 West Brewing C...|       Arizona|2025-08-08|
|       large|          Denver|United States|10 Barrel Brewing...|      Colorado|2025-08-08|
|       micro|       Milwaukee|United States|1840 Brewing Company|     Wisconsin|2025-08-08|
|       micro|            Reno|United States|10 Torr Distillin...|        Nevada|2025-08-08|
|       micro|        Abington|United States|10th District Bre...| Massachusetts|2025-08-08|
|       micro|      Georgetown|United States|  16 Mile Brewing Co|    

In [14]:
# Function that loads the transformed CSV data into SQLite
def save_to_sqlite(
    input_path='/opt/airflow/dags/dados_transformados.csv',
    db_path='/opt/airflow/dags/meu_banco.db',
    table_name='transacoes',
):
    """Load a CSV file into a SQLite table, replacing the table if it exists.

    Args:
        input_path: Path of the UTF-8 CSV file to load.
        db_path: Path of the SQLite database file to write to.
        table_name: Name of the table that receives the CSV rows.

    Raises:
        FileNotFoundError: If `input_path` does not exist.
        Exception: Any error raised while reading the CSV or writing to
            SQLite is printed and then re-raised unchanged.
    """
    try:
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Arquivo {input_path} não encontrado!")
        # Parse the CSV before touching the database, so a read failure
        # happens while no connection is open.
        df = pd.read_csv(input_path, encoding='utf-8')
        conn = sqlite3.connect(db_path)
        try:
            df.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            # Always release the connection, even when the write fails.
            conn.close()
        print("Dados carregados no SQLite com sucesso!")
        print(f"Verificando existência do arquivo: {os.path.exists(db_path)}")
    except Exception as e:
        print(f"Erro ao salvar no SQLite: {str(e)}")
        raise

# Define the DAG
# NOTE(review): `DAG`, `PythonOperator`, `load_file` and `transform_data` are
# not imported or defined in the visible notebook cells — confirm they are
# available before this cell runs.
with DAG(
    dag_id='etl_pipeline_2',
    start_date=datetime(2025, 3, 16),
    schedule_interval=None,  # no schedule set — presumably triggered manually; confirm
    catchup=False,
) as dag:
    # One task per pipeline step, each wrapping a Python callable.
    load = PythonOperator(task_id='load_file', python_callable=load_file)
    transform = PythonOperator(task_id='transform_data', python_callable=transform_data)
    save = PythonOperator(task_id='save_to_sqlite', python_callable=save_to_sqlite)
    # Chain the tasks in order: load, then transform, then save.
    load >> transform >> save

In [19]:
#df_pandas = pd.json_normalize(dados_json)

In [None]:
#df_pandas.head()