In [None]:
import os
import sys
from pathlib import Path

# Make the parent of the current working directory importable so that
# project packages living one level above the notebook can be imported.
# NOTE(review): presumably this is the repository root that contains the
# `finnews` package — confirm against the project layout.
project_root = Path.cwd().parents[0].as_posix()

# Guard against duplicate entries: re-running this cell in a live kernel
# would otherwise append the same path again on every execution.
if project_root not in sys.path:
    sys.path.append(project_root)

In [None]:
import pandas as pd
import polars as pl

from finnews.data.controllers import FNSPIDController

# Definitions

In [None]:
# Data controller used below to download the raw FNSPID data and to load
# the articles and the prices.
dc = FNSPIDController()

In [None]:
# --- Directories (relative to the notebook's working directory) ---
# Raw downloads are read from here.
root_input_dir = os.path.join("data", "raw")
# Processed articles / prices are written here.
root_output_dir = os.path.join("data", "processed")

# --- Symbol selection ---
# Number of most-covered symbols to keep in the sample.
n_symbols = 20
# Symbols excluded from the ranking because they are ETFs, not stocks.
etf_symbols = ["PMAY"]

# --- Date boundaries (ISO strings) ---
# `train_start` is the lower bound applied to articles and prices below.
train_start, val_start, test_start = "2010-01-01", "2023-06-01", "2023-09-01"

# Raw Data

Download the raw FNSPID data using the data controller.

In [None]:
# Fetch the raw FNSPID data into the raw-data directory via the controller.
dc.download_raw_data(output_dir=root_input_dir)

# Articles

Selecting a sample of stocks and articles:

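- Top `n_symbols` (20) symbols by article count, excluding the ETF symbols listed in `etf_symbols`
- Articles dated from `train_start` (2010-01-01) onwards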

In [None]:
# Load the articles from the raw-data directory. The result is used below as
# a lazy Polars query (`with_columns` / `collect` / `sink_*`).
dl = dc.get_articles(root_input_dir)

In [None]:
# Show the column names available on the articles frame.
dl.collect_schema().names()

In [None]:
# Derived columns: calendar day of the article and its length in characters.
day_expr = pl.col("date").dt.date().alias("day")
article_len_expr = pl.col("article").str.len_chars().alias("article_len")

# One output row per (day, symbol):
#   count     - number of article rows in the group
#   count_min - shortest article (in characters) in the group
# NOTE(review): newer Polars releases reportedly deprecate `streaming=True`
# in favour of `engine="streaming"` — confirm against the pinned version.
daily_counts = (
    dl
    .with_columns(day_expr, article_len_expr)
    .group_by(["day", "Stock_symbol"])
    .agg(
        count=pl.len(),
        count_min=pl.col("article_len").min(),
    )
    .collect(streaming=True)
)

In [None]:
# Convert the collected Polars result to pandas for the symbol-selection
# step below.
df_daily_counts = daily_counts.to_pandas()

In [None]:
# Row filters: drop ETF symbols and keep only days on/after the training start.
# NOTE(review): `day` is compared directly against an ISO date string — this
# assumes the column has a datetime64 dtype after `to_pandas()`; TODO confirm.
is_not_etf = ~df_daily_counts["Stock_symbol"].isin(etf_symbols)
is_in_window = df_daily_counts["day"] >= train_start

# Total article count per symbol, ranked descending, truncated to the
# `n_symbols` most-covered symbols.
df_sample = (
    df_daily_counts.loc[is_not_etf & is_in_window]
    .groupby("Stock_symbol", as_index=False)[["count"]]
    .sum()
    .sort_values(by="count", ascending=False, ignore_index=True)
    .head(n_symbols)
)

In [None]:
# Total number of article rows covered by the selected symbols.
df_sample["count"].sum()

In [None]:
# Display the selected symbols together with their article counts.
df_sample

In [None]:
# Symbol list reused below to filter both the articles and the prices.
symbols = df_sample["Stock_symbol"].tolist()

In [None]:
# Lower date bound as a plain Python datetime for the Polars comparison.
train_start_dt = pd.to_datetime(train_start).to_pydatetime()

# Predicates: article belongs to a sampled symbol and is on/after train start.
is_sampled_symbol = pl.col("Stock_symbol").is_in(symbols)
is_after_train_start = pl.col("date") >= train_start_dt

# Keep only the three columns needed downstream, exposing the ticker column
# under the shorter name `symbol`. Still lazy: nothing is computed here.
dl_sample = (
    dl
    .filter(is_sampled_symbol & is_after_train_start)
    .select(
        "date",
        pl.col("Stock_symbol").alias("symbol"),
        "article",
    )
)

In [None]:
# Ensure the output directory exists before the first write: nothing earlier
# in the notebook creates it, so a fresh checkout would otherwise fail here.
os.makedirs(root_output_dir, exist_ok=True)

# Stream the sampled articles to CSV without materialising them in memory.
dl_sample.select(["date", "symbol", "article"]).sink_csv(
    os.path.join(root_output_dir, "articles.csv")
)

In [None]:
# Same sampled articles, streamed to Parquet alongside the CSV export.
articles_parquet_path = os.path.join(root_output_dir, "articles.parquet")
dl_sample.select(["date", "symbol", "article"]).sink_parquet(articles_parquet_path)

# Prices

In [None]:
# Load prices for the sampled symbols from the raw-data directory; the result
# is used below as a pandas DataFrame (`.loc`, `.to_parquet`).
df_prices = dc.get_prices(input_dir=root_input_dir, symbols=symbols)

In [None]:
# Keep price rows on/after the training start and renumber the index from 0.
prices_from_train_start = df_prices.loc[
    df_prices["date"] >= train_start
].reset_index(drop=True)

# Persist the filtered prices next to the processed articles.
prices_from_train_start.to_parquet(
    os.path.join(root_output_dir, "prices.parquet")
)