In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Data-lake coordinates; every layer path below is derived from these two values.
storage_account = "storagevinicius"
container = "datalake"

# Shared abfss:// prefix for all folders in the lake.
_lake_root = "abfss://{}@{}.dfs.core.windows.net".format(container, storage_account)

# Silver inputs: adjust the folder names to match the real names used in the Silver layer.
sp500_silver_path = _lake_root + "/silver/sp500_prices"
cpi_silver_path = _lake_root + "/silver/cpiaucsl"

# Gold output: monthly S&P 500 vs CPI table.
gold_path = _lake_root + "/gold/sp500_vs_cpi_monthly"

In [0]:
# Read both Silver inputs (S&P 500 prices, then CPI) as Delta tables.
df_sp, df_cpi = (
    spark.read.format("delta").load(silver_path)
    for silver_path in (sp500_silver_path, cpi_silver_path)
)

In [0]:
# Monthly S&P 500 snapshot: one row per calendar month, taken from the latest
# date present in the data for that month.
#
# Rows whose date cannot be truncated to a month (null / unparseable) are
# dropped. Otherwise they form a null "month" group that sorts first and leaks
# its close into the earliest real month's return.
df_sp_m = (
    df_sp
    .withColumn("month", F.trunc(F.col("date"), "MM"))
    .filter(F.col("month").isNotNull())
)

# Within each month, rank days newest-first so that rn == 1 is the last day available.
w_month_last = Window.partitionBy("month").orderBy(F.col("date").desc())

sp_monthly = (
    df_sp_m
    .withColumn("rn", F.row_number().over(w_month_last))
    .filter(F.col("rn") == 1)
    .select(
        "month",
        F.col("date").alias("sp_last_trading_day"),
        F.col("close").cast("double").alias("sp500_close_eom"),
        F.col("volume").cast("double").alias("sp500_volume_eom"),
    )
    # Consecutive integer per calendar month, so "previous month" can be
    # addressed by value instead of by row position.
    .withColumn("month_idx", F.year("month") * 12 + F.month("month"))
)

# Monthly return: (close_t / close_{t-1}) - 1
# The range frame [-1, -1] selects only the row whose month_idx is exactly one
# less than the current one. If that month is missing from the data the frame is
# empty and the return is null, rather than a multi-month return mislabeled as
# monthly (which a positional lag would produce).
# The window has no partitionBy, so Spark evaluates it in a single partition;
# that is fine here because the input is already one row per month.
w_order = Window.orderBy("month_idx")
w_prev_month = w_order.rangeBetween(-1, -1)
sp_monthly = (
    sp_monthly
    .withColumn("sp500_close_prev", F.max("sp500_close_eom").over(w_prev_month))
    .withColumn("sp500_monthly_return", (F.col("sp500_close_eom") / F.col("sp500_close_prev")) - F.lit(1.0))
    .drop("sp500_close_prev", "month_idx")
)

# Sort only for the preview; downstream consumers apply their own ordering.
display(sp_monthly.orderBy("month").limit(15))

In [0]:
# CPI series keyed by calendar month, with month-over-month and year-over-year
# inflation rates.
#
# Rows whose date cannot be truncated to a month are dropped so they cannot act
# as a bogus "previous" observation.
df_cpi2 = (
    df_cpi
    .select(
        F.trunc(F.col("date"), "MM").alias("month"),
        F.col("cpiaucsl").cast("double").alias("cpi_index"),
    )
    .filter(F.col("month").isNotNull())
    # Consecutive integer per calendar month, so prior months can be addressed
    # by value instead of by row position.
    .withColumn("month_idx", F.year("month") * 12 + F.month("month"))
)

# Range frames pick the observation exactly 1 / 12 calendar months back.
# A positional lag would compare against the wrong month whenever the series
# has a missing month; with a range frame the result is null instead.
# The window has no partitionBy, so it runs in a single partition
# (presumably acceptable for a monthly series; confirm if the input grows).
w_m = Window.orderBy("month_idx")
w_1m_ago = w_m.rangeBetween(-1, -1)
w_12m_ago = w_m.rangeBetween(-12, -12)

df_cpi2 = (
    df_cpi2
    .withColumn("cpi_prev", F.max("cpi_index").over(w_1m_ago))
    .withColumn("cpi_mom_inflation", (F.col("cpi_index") / F.col("cpi_prev")) - F.lit(1.0))
    .withColumn("cpi_12m_ago", F.max("cpi_index").over(w_12m_ago))
    .withColumn("cpi_yoy_inflation", (F.col("cpi_index") / F.col("cpi_12m_ago")) - F.lit(1.0))
    .drop("cpi_prev", "cpi_12m_ago", "month_idx")
)

In [0]:
# Gold table: every S&P 500 month, enriched with CPI figures where a matching
# month exists (left join, so months without CPI keep null CPI columns).
gold_columns = [
    "month",
    "sp_last_trading_day",
    "sp500_close_eom",
    "sp500_monthly_return",
    "sp500_volume_eom",
    "cpi_index",
    "cpi_mom_inflation",
    "cpi_yoy_inflation",
]

sp_side = sp_monthly.alias("sp")
cpi_side = df_cpi2.alias("cpi")

gold = sp_side.join(cpi_side, on="month", how="left")
gold = gold.select(*gold_columns).orderBy("month")

# Preview the first 24 rows.
display(gold.limit(24))

In [0]:
# Persist the Gold table in Delta format at gold_path using overwrite mode.
gold_writer = gold.write.format("delta").mode("overwrite")
gold_writer.save(gold_path)

print("Gold salvo em:", gold_path)

In [0]:
# CSV export of the Gold table as a single part file.
# The path is built from the notebook's storage settings (`container`,
# `storage_account`) so that it follows them if they are changed, instead of
# repeating the account and container as literals.
export_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/export/sp500_vs_cpi_monthly_csv"

(
    gold
    .coalesce(1)  # collapse to one partition so a single CSV part file is produced
    # Sort inside that single partition so the row order in the file is explicit
    # rather than relying on coalesce preserving an upstream sort.
    .sortWithinPartitions("month")
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(export_path)
)

print("Export CSV pronto em:", export_path)