## Extract

In [None]:
import ipeadatapy as ip
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, date_format

# iniciando uma sessão do Spark
spark = SparkSession.builder \
    .appName("ETL Example") \
    .getOrCreate()

# obtenção da série temporal do preço do petróleo bruto
cod = "EIA366_PBRENT366"
eia366 = ip.timeseries(cod)

# conversão de pandas DataFrame para o DataFrame do PySpark
df = spark.createDataFrame(eia366)

# diretório para salvar os dados brutos
raw_data_path = "../data/raw"

# dados brutos em formato Parquet
df.write.mode("overwrite").parquet(raw_data_path)

# Mostrar os dados
df.show()

## Transform

In [None]:
from pyspark.sql.functions import (
    avg, 
    col, 
    concat,
    current_date,
    date_format,
    lag,
    lpad,
    max,
    min,
    quarter,
    stddev,
    to_date
)
from pyspark.sql.window import Window

# carregando os dados brutos
raw_data_path = "../data/raw"
df = spark.read.parquet(raw_data_path)

# obtendo o ano e mês corrente
current_year = date_format(current_date(), "yyyy").cast("int")
current_month = date_format(current_date(), "MM").cast("int")
df.show()

# filtrando para remover os dados do mês corrente
df = df.filter(~((col("YEAR") == current_year) & (col("MONTH") == current_month)))

# removendo registros com valores ausentes em "VALUE (US$)"
df = df.na.drop(subset=["VALUE (US$)"])

# média mensal de preço do petróleo bruto
df = df.groupBy("YEAR", "MONTH").agg(avg("VALUE (US$)").alias("preco_medio_usd"))

# criação da coluna "anomes" utilizando os campos "YEAR" e "MONTH"
df = df.withColumn("anomes", to_date(concat(col("YEAR").cast("string"), lpad(col("MONTH").cast("string"), 2, "0")), "yyyyMM"))

# ordenando o DataFrame por data
window_spec = Window.orderBy("anomes")

# criando lags de 1 a 6 meses
df = df.withColumn("lag_1_mes_preco_medio_usd", lag("preco_medio_usd", 1).over(window_spec)) \
       .withColumn("lag_2_meses_preco_medio_usd", lag("preco_medio_usd", 2).over(window_spec)) \
       .withColumn("lag_3_meses_preco_medio_usd", lag("preco_medio_usd", 3).over(window_spec)) \
       .withColumn("lag_4_meses_preco_medio_usd", lag("preco_medio_usd", 4).over(window_spec)) \
       .withColumn("lag_5_meses_preco_medio_usd", lag("preco_medio_usd", 5).over(window_spec)) \
       .withColumn("lag_6_meses_preco_medio_usd", lag("preco_medio_usd", 6).over(window_spec))

# média móvel de 6 meses
df = df.withColumn("media_movel_6_meses_preco_medio_usd", avg(col("preco_medio_usd")).over(window_spec.rowsBetween(-6, -1)))

# desvio padrão móvel de 6 meses
df = df.withColumn("desvio_padrao_movel_6_meses_preco_medio_usd", stddev(col("preco_medio_usd")).over(window_spec.rowsBetween(-6, -1)))

# valor mínimo e máximo dos últimos 6 meses
df = df.withColumn("valor_minimo_6_meses_preco_medio_usd", min(col("preco_medio_usd")).over(window_spec.rowsBetween(-6, -1))) \
       .withColumn("valor_maximo_6_meses_preco_medio_usd", max(col("preco_medio_usd")).over(window_spec.rowsBetween(-6, -1)))

# componentes sazonais: ano, mês e trimestre
df = df.withColumn("trimestre", quarter("anomes"))
df = df.withColumnRenamed("YEAR", "ano")
df = df.withColumnRenamed("MONTH", "mes")

# remoção do campo "anomes"
df = df.drop("anomes")

# removendo linhas com valores NaN que foram criados ao fazer o shift ou nas agregações
df = df.dropna()

# persistindo os dados transformados
transformed_data_path = "../data/transformed"
df.write.mode("overwrite").parquet(transformed_data_path)
df.show()

## Load

In [None]:
from pyspark.sql.types import DecimalType, IntegerType

# carregando os dados transformados
transformed_data_path = "../data/transformed"
df = spark.read.parquet(transformed_data_path)

# forçando os tipos de dados
df = df.select(
    col("ano").cast(IntegerType()),
    col("mes").cast(IntegerType()),
    col("preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_1_mes_preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_2_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_3_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_4_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_5_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("lag_6_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("media_movel_6_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("desvio_padrao_movel_6_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("valor_minimo_6_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("valor_maximo_6_meses_preco_medio_usd").cast(DecimalType(5,2)),
    col("trimestre").cast(IntegerType())
)

# persistindo os dados prontos para consumo
final_data_path = "../data/final"
df.write.mode("overwrite").parquet(final_data_path)
df.show()