In [20]:
import os
import sys

# Environment for the local Spark JVM and its Python workers. This must run
# before the SparkSession is created further down.

# Java runtime used to launch Spark.
# NOTE(review): machine-specific path; adjust if Java 17 lives elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"

# Run Python workers with the same interpreter as this kernel (the driver), so
# driver and worker Python versions always match. This replaces a hard-coded,
# user-specific virtualenv path that only worked on one machine.
os.environ["PYSPARK_PYTHON"] = sys.executable

In [21]:
from pyspark.sql.functions import (
    avg,
    col,
    concat,
    current_date,
    date_format,
    lag,
    lpad,
    max,
    min,
    quarter,
    stddev,
    to_date
)
from pyspark.sql.window import Window
from pyspark.sql import SparkSession


In [22]:
# Obtain the Spark session for this ETL run. getOrCreate() returns an
# existing session if one is already active in this kernel.
spark = (
    SparkSession.builder
    .appName("ETL_IPEA")
    .getOrCreate()
)

### Loading the raw data

In [23]:
# Load the raw extract into a Spark DataFrame.
# NOTE(review): the path ends in ".csv" but is read with the Parquet reader.
# This only works if the path actually holds Parquet data (e.g. a directory
# written by Spark). If it is a real CSV file, spark.read.csv(..., header=True)
# is needed instead. Confirm against the extraction step.
raw_data_path = "../../src/data/raw/eia366.csv"
df = spark.read.format("parquet").load(raw_data_path)

In [24]:
# Current calendar year and month as integer Spark Column expressions.
# They are expressions, not Python ints: current_date() is evaluated when a
# query using them runs.
_today = current_date()
current_year, current_month = (
    date_format(_today, pattern).cast("int") for pattern in ("yyyy", "MM")
)

### Filtrando para remover os dados do mês corrente

In [25]:
# Keep every row except those belonging to the current year/month.
# Written as (NOT a) OR (NOT b), the De Morgan form of NOT (a AND b), which is
# equivalent under SQL three-valued logic (including null YEAR/MONTH values).
df = df.where(
    (col("YEAR") != current_year) | (col("MONTH") != current_month)
)

### Removendo registros com valores ausentes em "VALUE (US$)"

In [26]:
# Discard observations with no price: rows whose "VALUE (US$)" is null
# (or NaN, for floating-point columns).
df = df.dropna(how="any", subset=["VALUE (US$)"])

### Média mensal de preço do petróleo bruto

In [27]:
# Collapse the raw observations to one row per (YEAR, MONTH), holding the
# mean of "VALUE (US$)" as preco_medio_usd.
_preco_medio = avg(col("VALUE (US$)")).alias("preco_medio_usd")
df = df.groupBy(["YEAR", "MONTH"]).agg(_preco_medio)

### Criação da coluna "anomes" utilizando os campos "YEAR" e "MONTH"

In [28]:
# Build a month-level DATE column "anomes" from YEAR and MONTH.
# The text is "yyyyMM" (e.g. "202401") and is parsed with the same pattern.
df = df.withColumn(
    "anomes",
    to_date(
        concat(
            col("YEAR").cast("string"),
            # Zero-pad the month so 1 becomes "01".
            lpad(col("MONTH").cast("string"), 2, "0"),
        ),
        "yyyyMM",
    ),
)

### Definindo a janela de cálculo ordenada por data (`anomes`)

In [29]:
# Window that walks the monthly series in chronological order of "anomes".
# There is no partitionBy, so Spark moves all rows into a single partition;
# that is the source of the WindowExec warnings in the output below. This is
# acceptable only while the series stays small.
window_spec = Window.orderBy(col("anomes").asc())

### Criando lags de 1 a 6 meses

In [30]:
# Lag features: preco_medio_usd from 1 to 6 rows earlier in window_spec order.
# Column names keep the original wording: "mes" for lag 1, "meses" otherwise.
# NOTE(review): lag() counts rows, not calendar months. A missing month in
# the series would shift these lags; confirm the series has no gaps.
for _n in range(1, 7):
    _sufixo = "mes" if _n == 1 else "meses"
    df = df.withColumn(
        f"lag_{_n}_{_sufixo}_preco_medio_usd",
        lag("preco_medio_usd", _n).over(window_spec),
    )

### Média móvel dos 6 meses anteriores (excluindo o mês da própria linha)

In [31]:
# Rolling mean of preco_medio_usd over the 6 rows *before* the current one.
# The frame -6..-1 excludes the current row. At the start of the series the
# frame is shorter, and for the very first row it is empty (result: null).
df = df.withColumn(
    "media_movel_6_meses_preco_medio_usd",
    avg("preco_medio_usd").over(window_spec.rowsBetween(-6, -1)),
)

### Desvio padrão móvel dos 6 meses anteriores (excluindo o mês da própria linha)

In [32]:
# Rolling sample standard deviation (stddev == stddev_samp) of
# preco_medio_usd over the 6 preceding rows; the current row is excluded.
_seis_anteriores = window_spec.rowsBetween(-6, -1)
df = df.withColumn(
    "desvio_padrao_movel_6_meses_preco_medio_usd",
    stddev(col("preco_medio_usd")).over(_seis_anteriores),
)

### Valor mínimo e máximo dos 6 meses anteriores (excluindo o mês da própria linha)

In [33]:
# Rolling min and max of preco_medio_usd over the 6 preceding rows (current
# row excluded). `min` / `max` here are the pyspark.sql.functions imports,
# which shadow the Python builtins in this notebook.
_frame_6 = window_spec.rowsBetween(-6, -1)
df = (
    df
    .withColumn("valor_minimo_6_meses_preco_medio_usd", min("preco_medio_usd").over(_frame_6))
    .withColumn("valor_maximo_6_meses_preco_medio_usd", max("preco_medio_usd").over(_frame_6))
)

### Componentes sazonais: ano, mês e trimestre

In [34]:
# Seasonal components: calendar quarter derived from "anomes", and
# Portuguese names for the year and month columns.
df = (
    df.withColumn("trimestre", quarter(col("anomes")))
    .withColumnRenamed("YEAR", "ano")
    .withColumnRenamed("MONTH", "mes")
)

### Remoção do campo "anomes"

In [35]:
# Drop the helper DATE column. From here on, only "ano" and "mes" identify
# each month's row.
df = df.drop("anomes")

### Removendo linhas com valores nulos ou NaN (os primeiros meses não têm histórico suficiente para os lags e as janelas móveis)

In [36]:
# Drop every row that has a null (or NaN) in any column. This removes at
# least the first 6 months of the series, whose lag/rolling columns are null
# for lack of earlier history (e.g. the 6-row lag is null for the first 6 rows).
df = df.na.drop(how="any")

### Salvando dados processados

In [None]:
# Persist the feature table as Parquet, replacing any previous output in the
# target directory.
transformed_data_path = "../../src/data/processed"
df.write.format("parquet").mode("overwrite").save(transformed_data_path)

25/08/16 17:38:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 17:38:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 17:38:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 17:38:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 17:38:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 17:38:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/08/16 1

25/08/16 22:34:53 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 15891231 ms exceeds timeout 120000 ms
25/08/16 22:34:53 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/16 22:34:53 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint