In [None]:
import os
import pathlib
from datetime import date, timedelta
from functools import reduce
from typing import NamedTuple

import polars as pl

# Raw macro-economic CSV inputs, all under one relative root.
RAW_DATA = pathlib.Path("..", "data", "raw")
CPI = RAW_DATA.joinpath("cpi_2006-2025.csv")
FED_FUNDS = RAW_DATA.joinpath("fed_funds_rate_2006-2025.csv")
GDP = RAW_DATA.joinpath("gdp_2006-2025.csv")
PRESIDENTS = RAW_DATA.joinpath("presidents_and_chairmen.csv")
UNEMPLOYMENT = RAW_DATA.joinpath("unemployment_2006-2025.csv")

# Text corpora stored as parquet (kept as plain string paths).
NEWS_DATA = "../data/filtered/07_sampled_news.parquet"
COMMUNICATIONS = "../data/processed/communications_stemmed.parquet"
COMMUNICATIONS = "../data/processed/communications_stemmed.parquet"

In [None]:
class TimeSeries(NamedTuple):
    """Location of one macro series on disk and the name it gets once loaded.

    ``date_col`` and ``data_col`` are the CSV headers to read; after loading,
    they are renamed to ``"date"`` and ``name`` respectively.
    """

    # Output column name for the series values (e.g. "cpi").
    name: str
    # CSV file holding the raw series.
    filepath: pathlib.Path
    # Header of the date column in the CSV.
    date_col: str
    # Header of the value column in the CSV.
    data_col: str

In [None]:
def read_timeseries(ts: TimeSeries) -> pl.DataFrame:
    """Load one series CSV as a two-column frame ``("date", ts.name)``.

    The date column is parsed as ``pl.Date`` and the value column as
    ``pl.Float64``. Because the full ``schema`` is passed (rather than
    ``schema_overrides``), the CSV is presumably expected to contain exactly
    these two columns — confirm if new inputs are added.
    """
    column_types = {ts.date_col: pl.Date, ts.data_col: pl.Float64}
    new_names = {ts.date_col: "date", ts.data_col: ts.name}
    return pl.read_csv(ts.filepath, schema=column_types).rename(new_names)

In [None]:
# Every series merged into the daily econ table, in join order (the first
# entry determines the date coverage of the left joins further down).
timeseries = [
    TimeSeries(name="cpi", filepath=CPI, date_col="observation_date", data_col="CPIAUCSL_PC1"),
    TimeSeries(name="funds_rate", filepath=FED_FUNDS, date_col="observation_date", data_col="FEDFUNDS"),
    TimeSeries(name="gdp", filepath=GDP, date_col="observation_date", data_col="GDP_PC1"),
    TimeSeries(name="unemployment", filepath=UNEMPLOYMENT, date_col="observation_date", data_col="UNRATE"),
]

In [None]:
# Quick look at the sampled news corpus.
df = pl.read_parquet(source=NEWS_DATA)
df

In [None]:
# Monthly CPI in the shared ("date", "cpi") layout, read through the same
# helper as the final pipeline. The one-period lag (`shift(-1)` ->
# "daily_month") is applied inside `disaggregate_timeseries`; pre-applying it
# here made that function shift a second time and silently drop an extra month.
cpi_df = read_timeseries(TimeSeries("cpi", CPI, "observation_date", "CPIAUCSL_PC1"))
cpi_df

In [None]:
def disaggregate_timeseries(df: pl.DataFrame) -> pl.DataFrame:
    """Spread a monthly series over daily rows, lagged by one observation.

    The frame must have a ``"date"`` column in first position and the value
    column second (e.g. the output of ``read_timeseries``). Each row's value
    is written onto every day from the *next* row's date (``shift(-1)``) up
    to, but not including, the first day of the following calendar month.
    Assuming consecutive monthly rows dated on the 1st, the value dated month
    M therefore covers the days of month M+1.
    NOTE(review): presumably intentional (e.g. publication lag) — confirm.

    The last row has no successor and is dropped, as is any row containing a
    null. Returns a frame with the original date and value column names.
    """
    lagged = df.with_columns(pl.col("date").shift(-1).alias("daily_month")).drop_nulls()
    date_name, value_name = lagged.columns[0], lagged.columns[1]

    day_values = []
    series_values = []
    for _, value, period_start in lagged.iter_rows():
        # First day of the calendar month after `period_start`.
        if period_start.month == 12:
            next_period = date(period_start.year + 1, 1, 1)
        else:
            next_period = date(period_start.year, period_start.month + 1, 1)
        for offset in range((next_period - period_start).days):
            day_values.append(period_start + timedelta(days=offset))
            series_values.append(value)

    return pl.DataFrame({date_name: day_values, value_name: series_values})

In [None]:
disaggregate_timeseries(df=cpi_df)  # preview: CPI spread to daily rows

In [None]:
# Monthly unemployment rate -> ("date", "unemployment"), then spread to daily rows.
unemployment_df = read_timeseries(
    TimeSeries("unemployment", UNEMPLOYMENT, "observation_date", "UNRATE")
)
disaggregate_timeseries(unemployment_df)

In [None]:
# Quarterly GDP -> ("date", "gdp"), plus `quarter_start` = date - 3 months,
# the start of the window each value is later spread over.
# NOTE(review): this treats `date` as the END of the window. If the CSV's
# observation_date is actually the first day of the quarter, the window covers
# the previous quarter — confirm against the raw file.
gdp_df = read_timeseries(TimeSeries("gdp", GDP, "observation_date", "GDP_PC1"))
gdp_df = gdp_df.with_columns(quarter_start_expr := pl.col("date").dt.offset_by("-3mo").alias("quarter_start"))
del quarter_start_expr
gdp_df

In [None]:
# Spread each quarterly GDP value over the half-open window [quarter_start, date).
# For consecutive quarterly rows, the next row's `quarter_start` equals this
# row's `date`. The previous inclusive end (`+ 1`) therefore gave that boundary
# day to two quarters, producing duplicate dates that multiply rows in any
# later left join on `date`.
daily_data = {"date": [], "gdp": []}
for row in gdp_df.iter_rows(named=True):
    start, end, gdp_value = row["quarter_start"], row["date"], row["gdp"]
    for i in range((end - start).days):  # `end` excluded: it belongs to the next quarter
        daily_data["date"].append(start + timedelta(days=i))
        daily_data["gdp"].append(gdp_value)

# Create daily DataFrame
daily_gdp = pl.DataFrame(daily_data)
assert daily_gdp["date"].n_unique() == daily_gdp.height, "overlapping GDP windows"
daily_gdp

In [None]:
def disaggregate_gdp(df: pl.DataFrame) -> pl.DataFrame:
    """Spread quarterly GDP observations over daily rows.

    Each row's ``gdp`` value is assigned to every day in the half-open window
    ``[date - 3 months, date)``. Half-open windows stop consecutive quarters
    from both claiming the boundary day, so each date appears at most once and
    later left joins on ``date`` do not duplicate rows.

    Parameters
    ----------
    df : pl.DataFrame
        Must contain a ``date`` (``pl.Date``) column and a ``gdp`` column, e.g.
        the output of ``read_timeseries`` for the GDP series. Any existing
        ``quarter_start`` column is recomputed from ``date``.

    Returns
    -------
    pl.DataFrame
        Columns ``date`` (``pl.Date``) and ``gdp`` (``pl.Float64``), one row per day.
    """
    # Derive the window start here so the function works on the plain reader
    # output too; it previously iterated the global `gdp_df` and ignored `df`.
    # NOTE(review): this treats `date` as the END of the window. If
    # observation_date is really the quarter's first day, the window covers the
    # previous quarter — confirm against the raw CSV.
    quarters = df.with_columns(
        pl.col("date").dt.offset_by("-3mo").alias("quarter_start")
    )

    daily_data = {"date": [], "gdp": []}
    for row in quarters.iter_rows(named=True):
        start, end, gdp_value = row["quarter_start"], row["date"], row["gdp"]
        for i in range((end - start).days):  # `end` excluded: next quarter owns it
            daily_data["date"].append(start + timedelta(days=i))
            daily_data["gdp"].append(gdp_value)

    # Explicit schema keeps dtypes stable even when `df` is empty.
    return pl.DataFrame(daily_data, schema={"date": pl.Date, "gdp": pl.Float64})


disaggregate_gdp(gdp_df)

In [None]:
# Load each raw series, then spread it to one row per day. GDP is quarterly
# and has its own disaggregation; the others are monthly. Comprehensions keep
# the loop variables out of the notebook namespace: the previous
# `for ts, df in ...` loop overwrote the news frame `df` loaded earlier.
dfs = [read_timeseries(ts) for ts in timeseries]
daily_ts_dfs = [
    disaggregate_gdp(series_df) if ts.name == "gdp" else disaggregate_timeseries(series_df)
    for ts, series_df in zip(timeseries, dfs)
]

In [None]:
# Left-join every daily series onto the first one in `daily_ts_dfs` (CPI,
# given the order of `timeseries`), so the result covers CPI's dates.
# A right-hand frame with repeated dates would multiply rows here.
# (`reduce` is imported in the top import cell.)
econ_data_df: pl.DataFrame = reduce(
    lambda left, right: left.join(right, on="date", how="left"),
    daily_ts_dfs,
)
econ_data_df.write_parquet("../data/processed/econ_data.parquet")
econ_data_df

In [None]:
# Attach the daily macro indicators to each news item by calendar day.
# The news `date` column is reduced to a date first (presumably it is a datetime).
df = (
    pl.read_parquet(NEWS_DATA)
    .with_columns(pl.col("date").dt.date())
    .join(econ_data_df, on="date", how="left")
)
df

In [None]:
# President / party / Fed chair, keyed by calendar day.
pres_df = pl.read_csv(
    PRESIDENTS,
    schema={
        "date": pl.Date,
        "president": pl.String,
        "party": pl.String,
        "fed_chair": pl.String,
    },
)
pres_df

In [None]:
# Add administration metadata per news day. Note: re-running this cell joins
# the metadata onto `df` a second time.
df = df.join(other=pres_df, on="date", how="left")
df

In [None]:
# Keep only the modelling columns, in a fixed order.
df = df.select(
    "date", "text",
    "cpi", "funds_rate", "gdp", "unemployment",
    "president", "party", "fed_chair",
)
df

In [None]:
df.write_parquet(file="../data/processed/news_final.parquet")  # final news dataset

In [None]:
# Communications corpus: parse both date columns, keep only date + text.
# NOTE: "Release Date" is parsed but then dropped by the select, so its parse
# only acts as a format check.
communications_df = (
    pl.read_parquet(COMMUNICATIONS)
    .with_columns(
        pl.col("Date").str.strptime(pl.Date, "%Y-%m-%d"),
        pl.col("Release Date").str.strptime(pl.Date, "%Y-%m-%d"),
    )
    .select("Date", "text")
    .rename({"Date": "date"})
)
communications_df

In [None]:
# Enrich communications with macro + administration data, drop any row with a
# missing field, and keep the inclusive window 2006-10-25 .. 2023-12-13.
# TODO: document why these bounds were chosen.
communications_metadata_df = (
    communications_df
    .join(econ_data_df, on="date", how="left")
    .join(pres_df, on="date", how="left")
    .drop_nulls()
    .filter(pl.col("date").is_between(date(2006, 10, 25), date(2023, 12, 13), closed="both"))
)
communications_metadata_df.write_parquet("../data/processed/communications_final.parquet")
communications_metadata_df

In [None]:
# Round-trip check: reload the communications dataset just written.
comms_df = pl.read_parquet(source="../data/processed/communications_final.parquet")
comms_df

In [None]:
# Output directory of one STM run. Set the STM_OUTPUT_DIR environment variable
# to point elsewhere, so the notebook is not tied to one machine's absolute path.
# NOTE: `dir` shadows the built-in `dir()`; the name is kept because the next
# cell reads `dir / "topic_dist.parquet"`.
dir = pathlib.Path(
    os.environ.get("STM_OUTPUT_DIR", "D:\\Projeto NLP\\output\\stm_fed_05_2025-06-11_22-25-46")
)
df = pl.read_parquet(dir / "theta.parquet")
df

In [None]:
# Topic distribution produced by the same STM run.
df = pl.read_parquet(source=dir.joinpath("topic_dist.parquet"))
df

In [None]:
pl.read_parquet(source="../data/processed/communications_final.parquet")  # final communications preview