In [0]:
# Restart this notebook's Python process (Databricks utility) so the imports below start
# from a clean interpreter state — presumably to pick up library installs / edits to src/.
dbutils.library.restartPython()

In [0]:
from src.transforms.gold.gold_oil_features import generate_gold_oil_features_table

In [0]:
%skip
# NOTE(review): %skip appears intended to keep this cell from executing, so the gold oil
# features table is not regenerated while the exploratory cells below are iterated on.
# Remove the magic to run the (presumably table-writing) generator.
generate_gold_oil_features_table(spark)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


# --- Silver inputs: one table per (country, indicator) -------------------------------
uk_unemployment   = spark.table("oil_analytics.silver_uk_unemployment")
uk_interest_rate  = spark.table("oil_analytics.silver_uk_interest_rate")
uk_gdp            = spark.table("oil_analytics.silver_uk_gdp")
uk_cpi            = spark.table("oil_analytics.silver_uk_cpi")
fed_unemployment  = spark.table("oil_analytics.silver_fed_unemployment")
fed_interest_rate = spark.table("oil_analytics.silver_fed_interest_rate")
fed_gdp           = spark.table("oil_analytics.silver_fed_gdp")
fed_cpi           = spark.table("oil_analytics.silver_fed_cpi")

# --- Daily calendar every daily gold series is aligned to (both bounds inclusive) ----
# NOTE(review): if END_DATE lies beyond the latest Silver observation, the trailing days
# will hold forward-filled copies of the last value — confirm this is intended.
START_DATE = "2020-01-01"
END_DATE   = "2026-12-31"

# A one-row seed frame whose single array of dates is exploded into one row per day.
calendar_df = spark.range(1).select(
    explode(
        sequence(to_date(lit(START_DATE)), to_date(lit(END_DATE)))
    ).alias("calendar_date")
)

def create_gold_daily(df, calendar, col_name:str, date_col="date"):
    """
    Align a Silver dataframe to a daily calendar and forward-fill one value column.

    Every calendar day receives the most recent non-null ``col_name`` observed on or
    before that day. Observations dated *before* the first calendar day also seed the
    fill, so the start of the calendar is not left null when the series last changed
    before it (e.g. a rate with rows only on change dates, or a period-end-dated series).

    Args:
        df: source Silver dataframe.
        calendar: calendar dataframe with a ``calendar_date`` column (one row per day).
        col_name: column in ``df`` to forward-fill.
        date_col: date column of ``df`` matched against ``calendar_date`` (default ``"date"``).

    Returns:
        DataFrame with columns ``date`` (the calendar day) and ``col_name``.
        Only calendar days are returned; Silver rows outside the calendar are dropped
        after they have contributed to the fill.
    """
    # Rows without a date cannot be placed on the timeline; the original left join
    # never matched them either, so exclude them before they could seed the fill.
    source = df.where(df[date_col].isNotNull())

    # Full outer join keeps Silver rows that fall outside the calendar range so that
    # observations before the first calendar day can carry forward into it.
    joined = calendar.join(source, calendar.calendar_date == source[date_col], "full_outer")

    # Order by the calendar day when present, otherwise by the Silver row's own date.
    # NOTE(review): no partitionBy, so Spark evaluates this window in a single partition;
    # fine for a few thousand daily rows.
    fill_window = Window.orderBy(coalesce(calendar.calendar_date, source[date_col]))

    return (
        joined
        .withColumn(col_name, last(col_name, ignorenulls=True).over(fill_window))
        # Keep only calendar days now that out-of-range rows have seeded the fill.
        .where(calendar.calendar_date.isNotNull())
        .select(calendar.calendar_date.alias("date"), col_name)
    )

# Put every non-GDP Silver series on the shared daily calendar (GDP stays yearly and is
# combined separately in the cleaning cell). UK unemployment is keyed on its "to_date"
# column; all other series use the default "date" column.
uk_daily_unem_df = create_gold_daily(
    df=uk_unemployment, calendar=calendar_df, col_name="uk_unemployment_rate", date_col="to_date"
)
fed_daily_unem_df = create_gold_daily(
    df=fed_unemployment, calendar=calendar_df, col_name="us_unemployment_rate"
)

uk_daily_interest_df = create_gold_daily(
    df=uk_interest_rate, calendar=calendar_df, col_name="uk_interest_rate"
)
fed_daily_interest_df = create_gold_daily(
    df=fed_interest_rate, calendar=calendar_df, col_name="us_interest_rate"
)

uk_daily_cpi_df = create_gold_daily(
    df=uk_cpi, calendar=calendar_df, col_name="uk_cpi_rate"
)
fed_daily_cpi_df = create_gold_daily(
    df=fed_cpi, calendar=calendar_df, col_name="us_cpi_rate"
)


In [0]:
####### cleaned #######
# Combine the UK and US series for each indicator into one frame per indicator.
# Derived frames get new names instead of overwriting their inputs, so re-running this
# cell is safe (overwriting `fed_gdp` with a projection lacking "date" made a re-run fail).

# Unemployment: both sides share the daily calendar, so a left join keeps every date.
unem_joined = uk_daily_unem_df \
    .join(fed_daily_unem_df, "date", "left") \
        .orderBy(desc("date"))
clean_unemployment_df = unem_joined \
    .select("date", round("uk_unemployment_rate", 1).alias("uk_unemployment_rate"), "us_unemployment_rate")

# Interest rates. (create_gold_daily emits only date + value, so there is no
# source_system column to drop here.)
joined_interest_rate = uk_daily_interest_df.join(fed_daily_interest_df, "date", "left")
clean_interest_rate_df = joined_interest_rate.orderBy(desc("date"))

# CPI: round the US series to 1 dp into a new frame rather than reassigning the input.
fed_daily_cpi_rounded = fed_daily_cpi_df.withColumn("us_cpi_rate", round(col("us_cpi_rate"), 1))
clean_cpi_df = uk_daily_cpi_df \
    .join(fed_daily_cpi_rounded, "date", "outer") \
        .orderBy(desc("date"))

# GDP is yearly: key the US series by calendar year to match the UK frame.
# NOTE(review): assumes silver_uk_gdp already exposes a "year" column — confirm.
fed_gdp_yearly = fed_gdp.select(year("date").alias("year"), "us_gdp_rate_yoy")
joined_gdp = uk_gdp.join(fed_gdp_yearly, "year", "outer")
clean_gdp_df = joined_gdp.select("year", round("uk_gdp_rate_yoy", 1).alias("uk_gdp_rate_yoy"), "us_gdp_rate_yoy").orderBy(desc("year"))

In [0]:
# Sanity check: create_gold_daily emits exactly the calendar day plus the filled value column.
assert fed_daily_unem_df.columns == ["date", "us_unemployment_rate"], fed_daily_unem_df.columns
fed_daily_unem_df.columns

In [0]:
# Daily macro panel: unemployment, policy rates and CPI for both countries on one row
# per date, plus a calendar "year" key for joining annual series.
macro_joined = (
    clean_unemployment_df
    .join(clean_interest_rate_df, "date", "outer")
    .join(clean_cpi_df, "date", "outer")
    .withColumn("year", year("date"))
)
clean_macro_df = macro_joined.orderBy(desc("date"))

In [0]:
# clean_macro_df.display()

In [0]:
### ready to join on year
# Calendar-year mean of each daily (forward-filled) macro level, rounded to 1 dp and
# keyed by "year" so it lines up with the yearly GDP frame.
# NOTE(review): the *_yoy suffix suggests a year-over-year change, but these columns are
# annual averages of levels — consider renaming once downstream consumers are checked.
cpi_unem_yoy = clean_macro_df.groupBy(year("date").alias("year")).agg(
    *[
        round(mean(source_col), 1).alias(output_col)
        for source_col, output_col in (
            ("uk_cpi_rate", "uk_cpi_rate_yoy"),
            ("us_cpi_rate", "us_cpi_rate_yoy"),
            ("uk_unemployment_rate", "uk_unemployment_rate_yoy"),
            ("us_unemployment_rate", "us_unemployment_rate_yoy"),
        )
    ]
)

# cpi_unem_yoy.display()

In [0]:
# Planned gold macro feature columns (a trailing ### appears to mark items already derived above)
# date

# -- Levels
# interest_rate ###
# inflation_rate ###
# unemployment_rate ###
# gdp ###

# -- Growth
# inflation_yoy ###
# unemployment_yoy ###

# -- Changes
# interest_rate_change_1m
# inflation_change_1m
# unemployment_change_1m

# -- Rolling averages
# inflation_3m_avg
# interest_rate_3m_avg
# unemployment_6m_avg

# -- Regime flags
# high_inflation_regime
# tightening_cycle
# weak_growth_regime

## interest rate
    # date
    # interest_rate
    # interest_rate_change
    # rate_hike_flag
    # rate_cut_flag
    # rate_hold_flag
    # tightening_cycle


In [0]:

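# WARNING: destructive maintenance snippet — uncommenting the lines below drops all
# eight silver macro tables in the oil_analytics schema.
# SCHEMA_NAME = "oil_analytics"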

# ##### table names #####
# silver_uk_unemployment = "silver_uk_unemployment"
# silver_fed_unemployment = "silver_fed_unemployment"
# silver_uk_cpi = "silver_uk_cpi"
# silver_fed_cpi = "silver_fed_cpi"
# silver_uk_gdp = "silver_uk_gdp"
# silver_fed_gdp = "silver_fed_gdp"
# silver_uk_interest_rate = "silver_uk_interest_rate"
# silver_fed_interest_rate = "silver_fed_interest_rate"

# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_uk_unemployment}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_fed_unemployment}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_uk_cpi}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_fed_cpi}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_uk_gdp}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_fed_gdp}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_uk_interest_rate}")
# spark.sql(f"DROP TABLE IF EXISTS {SCHEMA_NAME}.{silver_fed_interest_rate}")