# 02: Sentiment Analysis of News and Its Effect on Company’s Stock

This notebook works with a single deduplicated news file produced in Notebook 01:

- `outputs_01/news_deduped_2019_2023.csv`

Pipeline:

1. Load the ticker universe from `outputs_01/selected_equities_only_2019_2023.csv`
2. Run a column-completeness audit for the news file
3. Evaluate multiple text variants with FinBERT and select the best one
4. Build daily sentiment aggregates using 8 models
5. Merge daily sentiment with daily price returns and build the master dataset
6. Save all artifacts under `outputs_01/`


## Block 1: Imports and basic settings

In [1]:
import os
import re
import gc
import json
import shutil
import warnings
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm
from pandas.util import hash_pandas_object

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

from textblob import TextBlob

import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

from IPython.display import display

warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", 180)
pd.set_option("display.width", 200)

## Block 2: Project configuration

- analysis period (2019–2023),
- data paths,
- ticker list (from Notebook 01),
- chunk parameters,
- checkpoint behavior

Everything is written to the directory set in `OUTPUT_DIR`

In [2]:
START_DATE = pd.Timestamp("2019-01-01")
END_DATE = pd.Timestamp("2023-12-31")

OUTPUT_DIR = "outputs_01"

NEWS_SOURCES = {
    "nasdaq": os.path.join(OUTPUT_DIR, "news_deduped_2019_2023.csv"),
}

SELECTED_EQUITIES_PATH = os.path.join(OUTPUT_DIR, "selected_equities_only_2019_2023.csv")

PRICES_DIR = "full_history/full_history"

CHUNK_SIZE = 200000
MAX_NEWS_PER_DAY = None

RESUME_FROM_CHECKPOINTS = True
FORCE_RECOMPUTE_CHECKPOINTS = False

SKIP_IF_DAILY_EXISTS = True
FORCE_REBUILD_DAILY = False

RESET_CHECKPOINT_DIR = False

RUN_TAG = "allnews" if (MAX_NEWS_PER_DAY is None or int(MAX_NEWS_PER_DAY) <= 0) else f"max{int(MAX_NEWS_PER_DAY)}"

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("DEVICE:", DEVICE)

os.makedirs(OUTPUT_DIR, exist_ok=True)

CHECKPOINT_DIR = os.path.join(OUTPUT_DIR, "checkpoints", RUN_TAG)
if RESET_CHECKPOINT_DIR and os.path.exists(CHECKPOINT_DIR):
    shutil.rmtree(CHECKPOINT_DIR)

os.makedirs(CHECKPOINT_DIR, exist_ok=True)

print("OUTPUT_DIR:", OUTPUT_DIR)
print("CHECKPOINT_DIR:", CHECKPOINT_DIR)


DEVICE: cpu
OUTPUT_DIR: outputs_final
CHECKPOINT_DIR: outputs_final/checkpoints/allnews


## Block 3: Input file checks and loading the ticker list

Check that the following exist:
- `selected_equities_only_2019_2023.csv`,
- the news CSV,
- the prices directory

Then load the tickers and build `TICKER_SET`

In [3]:
if not os.path.exists(SELECTED_EQUITIES_PATH):
    raise FileNotFoundError(f"Ticker file not found: {SELECTED_EQUITIES_PATH}")

missing_news = [fp for fp in NEWS_SOURCES.values() if not os.path.exists(fp)]
if missing_news:
    raise FileNotFoundError(f"News file(s) not found: {missing_news}")

if not os.path.isdir(PRICES_DIR):
    raise FileNotFoundError(f"Prices directory not found: {PRICES_DIR}")

companies = pd.read_csv(SELECTED_EQUITIES_PATH, low_memory=False)
companies["ticker"] = companies["ticker"].astype(str).str.strip()
TICKER_SET = set(companies["ticker"].tolist())

print("Tickers in universe:", len(TICKER_SET))
display(companies.head(5))


Tickers in universe: 2193
Columns in companies: ['ticker', 'company_name', 'sector', 'industry', 'exchange', 'quoteType', 'price_rows', 'price_min_date', 'price_max_date', 'coverage_pct', 'starts_in_2019', 'ends_in_2023', 'full_coverage', 'news_count', 'news_min_date', 'news_max_date']


Unnamed: 0,ticker,company_name,sector,industry,exchange,quoteType,price_rows,price_min_date,price_max_date,coverage_pct,starts_in_2019,ends_in_2023,full_coverage,news_count,news_min_date,news_max_date
0,TSLA,"Tesla, Inc.",Consumer Cyclical,Auto Manufacturers,NMS,EQUITY,1257,2019-01-02,2023-12-28,96.4,True,True,True,12462,2019-07-01 00:00:00,2023-12-16 22:00:00
1,INTC,Intel Corporation,Technology,Semiconductors,NMS,EQUITY,1257,2019-01-02,2023-12-28,96.4,True,True,True,11178,2019-01-01 00:00:00,2023-12-16 10:00:00
2,GOOG,Alphabet Inc.,Communication Services,Internet Content & Information,NMS,EQUITY,1257,2019-01-02,2023-12-28,96.4,True,True,True,11040,2019-01-02 00:00:00,2023-12-16 23:00:00
3,DIS,The Walt Disney Company,Communication Services,Entertainment,NYQ,EQUITY,1257,2019-01-02,2023-12-28,96.4,True,True,True,10597,2019-06-13 00:00:00,2023-12-16 19:00:00
4,AAPL,Apple Inc.,Technology,Consumer Electronics,NMS,EQUITY,1257,2019-01-02,2023-12-28,96.4,True,True,True,9811,2020-03-09 00:00:00,2023-12-16 22:00:00


## Block 4: News normalization and filtering (shared preprocessing)

Goal: bring every news source to a common schema and filter out everything irrelevant right away

Key rules:
- keep only rows where `Stock_symbol` is filled (this is the link to a ticker),
- parse `Date` to datetime and build the calendar `day`,
- filter by the `START_DATE` to `END_DATE` range,
- filter by tickers in `TICKER_SET`,
- `Url` is the main deduplication key; rows with an empty `Url` fall back to `(day, title, ticker)`,
- drop duplicates within a chunk by `(url, ticker)`
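
The in-chunk deduplication rule can be illustrated on a toy frame. This is a minimal sketch, not the notebook's function: the rows in `toy` are made up, and the key layout (`url||ticker`, with a `day||title||ticker` fallback for empty URLs) is an assumption read off the code in the cell that follows.

```python
import numpy as np
import pandas as pd
from pandas.util import hash_pandas_object

# Toy rows (hypothetical data, not taken from the news file).
toy = pd.DataFrame({
    "ticker": ["AAPL", "AAPL", "MSFT", "AAPL"],
    "day": pd.to_datetime(["2020-01-02", "2020-01-02", "2020-01-02", "2020-01-03"]),
    "url": ["http://x/a", "HTTP://X/A", "http://x/a", ""],
    "title": ["t1", "t1 copy", "t1", "t2"],
})

url = toy["url"].fillna("").str.strip().str.lower()
title = toy["title"].fillna("").str.strip().str.lower()
day_s = toy["day"].dt.strftime("%Y-%m-%d")
ticker = toy["ticker"].astype(str)

# Key is url||ticker when a URL exists, otherwise day||title||ticker.
key = np.where(
    (url != "").to_numpy(),
    (url + "||" + ticker).to_numpy(),
    (day_s + "||" + title + "||" + ticker).to_numpy(),
)
h = hash_pandas_object(pd.Series(key), index=False)

# A positional (NumPy) mask does not depend on the frame's index labels.
deduped = toy.loc[~h.duplicated().to_numpy()].reset_index(drop=True)
print(deduped[["ticker", "url", "title"]])
```

Row 1 is dropped because its URL matches row 0 after lowercasing, while the same URL under a different ticker (row 2) is kept.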

In [4]:
NEWS_COLS_RAW = [
    "Date", "Article_title", "Stock_symbol", "Url", "Publisher", "Author",
    "Article", "Lsa_summary", "Luhn_summary", "Textrank_summary", "Lexrank_summary",
]

def print_step(title: str):
    line = "=" * 88
    print("\n" + line)
    print(title)
    print(line)

def _clean_str(s: pd.Series) -> pd.Series:
    return s.fillna("").astype(str).str.strip()

def _normalize_news_cols(df: pd.DataFrame) -> pd.DataFrame:
    ren = {}
    if "Date" in df.columns and "date" not in df.columns:
        ren["Date"] = "date"
    if "Stock_symbol" in df.columns and "ticker" not in df.columns:
        ren["Stock_symbol"] = "ticker"
    if "Url" in df.columns and "url" not in df.columns:
        ren["Url"] = "url"
    if "Article_title" in df.columns and "title" not in df.columns:
        ren["Article_title"] = "title"
    if "Article" in df.columns and "article" not in df.columns:
        ren["Article"] = "article"
    if "Lsa_summary" in df.columns and "lsa_summary" not in df.columns:
        ren["Lsa_summary"] = "lsa_summary"
    if "Luhn_summary" in df.columns and "luhn_summary" not in df.columns:
        ren["Luhn_summary"] = "luhn_summary"
    if "Textrank_summary" in df.columns and "textrank_summary" not in df.columns:
        ren["Textrank_summary"] = "textrank_summary"
    if "Lexrank_summary" in df.columns and "lexrank_summary" not in df.columns:
        ren["Lexrank_summary"] = "lexrank_summary"
    if "Publisher" in df.columns and "publisher" not in df.columns:
        ren["Publisher"] = "publisher"
    if "Author" in df.columns and "author" not in df.columns:
        ren["Author"] = "author"
    return df.rename(columns=ren) if ren else df

def _ensure_cols(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    df = df.copy()
    for c in cols:
        if c not in df.columns:
            df[c] = np.nan
    return df

def _prep_news_chunk_base(raw: pd.DataFrame, source_tag: str) -> pd.DataFrame:
    ch = _normalize_news_cols(raw)

    need = [
        "date", "ticker", "url", "title", "article",
        "lsa_summary", "luhn_summary", "textrank_summary", "lexrank_summary",
        "publisher", "author"
    ]
    ch = _ensure_cols(ch, need)

    ch["ticker"] = _clean_str(ch["ticker"])
    ch["url"] = _clean_str(ch["url"])

    ch = ch[ch["ticker"] != ""]
    if ch.empty:
        return ch.iloc[0:0].copy()

    ch["date"] = pd.to_datetime(ch["date"], errors="coerce", utc=True).dt.tz_localize(None)
    ch = ch.dropna(subset=["date"])
    if ch.empty:
        return ch.iloc[0:0].copy()

    ch["day"] = ch["date"].dt.floor("D")
    ch = ch[(ch["day"] >= START_DATE) & (ch["day"] <= END_DATE)]
    if ch.empty:
        return ch.iloc[0:0].copy()

    ch = ch[ch["ticker"].isin(TICKER_SET)]
    if ch.empty:
        return ch.iloc[0:0].copy()

    if MAX_NEWS_PER_DAY is not None and int(MAX_NEWS_PER_DAY) > 0:
        ch = (
            ch.sort_values(["ticker", "day"], kind="mergesort")
              .groupby(["ticker", "day"], as_index=False, sort=False)
              .head(int(MAX_NEWS_PER_DAY))
        )

    url = _clean_str(ch["url"]).str.lower()
    title = _clean_str(ch["title"]).str.lower()
    day_s = ch["day"].dt.strftime("%Y-%m-%d")
    ticker = ch["ticker"].astype(str)

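    # Dedup key: url||ticker when a URL exists, otherwise day||title||ticker.
    key = np.where(url != "", (url + "||" + ticker).values, (day_s + "||" + title + "||" + ticker).values)
    h = hash_pandas_object(pd.Series(key), index=False).astype("uint64")
    # Positional mask: h carries a fresh RangeIndex that does not match ch.index after filtering.
    ch = ch.loc[~h.duplicated().to_numpy()].copy()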

    ch["source"] = source_tag

    keep_cols = [
        "ticker", "day", "url",
        "title", "article",
        "lsa_summary", "luhn_summary", "textrank_summary", "lexrank_summary",
        "publisher", "author",
        "source"
    ]
    return ch[keep_cols].reset_index(drop=True)

def build_text_variant(df: pd.DataFrame, variant: str) -> pd.Series:
    title = _clean_str(df["title"]) if "title" in df.columns else pd.Series([""] * len(df), index=df.index)
    article = _clean_str(df["article"]) if "article" in df.columns else pd.Series([""] * len(df), index=df.index)

    lsa = _clean_str(df["lsa_summary"]) if "lsa_summary" in df.columns else pd.Series([""] * len(df), index=df.index)
    luhn = _clean_str(df["luhn_summary"]) if "luhn_summary" in df.columns else pd.Series([""] * len(df), index=df.index)
    tr = _clean_str(df["textrank_summary"]) if "textrank_summary" in df.columns else pd.Series([""] * len(df), index=df.index)
    lex = _clean_str(df["lexrank_summary"]) if "lexrank_summary" in df.columns else pd.Series([""] * len(df), index=df.index)

    if variant == "title_article":
        comb = (title + ". " + article).str.replace(r"\s+", " ", regex=True).str.strip()
        out = pd.Series([""] * len(df), index=df.index, dtype="string")
        both = (title != "") & (article != "")
        only_t = (title != "") & (article == "")
        only_a = (title == "") & (article != "")
        out.loc[both] = comb.loc[both]
        out.loc[only_t] = title.loc[only_t]
        out.loc[only_a] = article.loc[only_a]
        return out.astype(str).str.replace(r"\s+", " ", regex=True).str.strip()

    if variant == "lsa_summary":
        return lsa.where(lsa != "", "")
    if variant == "luhn_summary":
        return luhn.where(luhn != "", "")
    if variant == "textrank_summary":
        return tr.where(tr != "", "")
    if variant == "lexrank_summary":
        return lex.where(lex != "", "")

    raise ValueError(f"Unknown text variant: {variant}")


## Block 5: Column-completeness audit for each dataset

We count `NaN` values **in two modes**:
1) over all rows of the file
2) **after filtering**: only rows with a `Stock_symbol`, a ticker from `TICKER_SET`, a date in 2019–2023, and a non-empty `Url`

We also try to show **5 rows** in which **all columns of interest** are filled
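
The two counting modes can be sketched on a toy frame. The data and the `universe` set below are hypothetical stand-ins (for the news file and `TICKER_SET`), and the date filter is left out to keep the example short.

```python
import pandas as pd

# Hypothetical rows; None marks a missing value.
toy = pd.DataFrame({
    "Stock_symbol": ["AAPL", None, "MSFT", "ZZZZ"],
    "Url": ["u1", "u2", None, "u4"],
    "Publisher": [None, "p", "p", None],
})
universe = {"AAPL", "MSFT"}  # stand-in for TICKER_SET

# Mode 1: NaN counts over all rows.
nan_all = toy.isna().sum()

# Mode 2: NaN counts only over rows that pass the filters.
keep = toy["Stock_symbol"].isin(universe) & toy["Url"].notna()
nan_filtered = toy.loc[keep].isna().sum()

print(pd.DataFrame({"nan_total": nan_all, "nan_after_filters": nan_filtered}))
```

Comparing the two columns shows whether a field is sparse in general or specifically within the rows the pipeline actually uses.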

In [5]:
AUDIT_COLS = [
    "Date", "Stock_symbol", "Article_title", "Url", "Publisher", "Author",
    "Lsa_summary", "Luhn_summary", "Textrank_summary", "Lexrank_summary"
]

def audit_nan_counts(file_path: str, source_tag: str, max_chunks: Optional[int] = None) -> pd.DataFrame:
    header = pd.read_csv(file_path, nrows=0, low_memory=False)
    usecols = [c for c in AUDIT_COLS if c in header.columns]
    missing = [c for c in AUDIT_COLS if c not in header.columns]

    print(f"SOURCE: {source_tag} | FILE: {file_path}")
    print("Size (MB):", round(os.path.getsize(file_path)/1024/1024, 1))
    print("Usecols:", usecols)
    if missing:
        print("Missing columns in this file:", missing)

    total = 0
    nan_total = pd.Series(0, index=usecols, dtype="int64")

    total_f = 0
    nan_f = pd.Series(0, index=usecols, dtype="int64")

    examples_full = []
    examples_full_needed = 5

    reader = pd.read_csv(file_path, usecols=usecols, chunksize=CHUNK_SIZE, low_memory=False)

    for i, raw in enumerate(tqdm(reader, desc=f"audit {source_tag}")):
        if (max_chunks is not None) and (i >= int(max_chunks)):
            break

        total += len(raw)
        nan_total += raw[usecols].isna().sum().astype("int64")

        ch = _normalize_news_cols(raw)
        ch = _ensure_cols(ch, ["date","ticker","url","title","publisher","author",
                               "lsa_summary","luhn_summary","textrank_summary","lexrank_summary"])

        ch["ticker"] = _clean_str(ch["ticker"])
        ch["url"] = _clean_str(ch["url"])
        ch = ch[ch["ticker"] != ""]
        if ch.empty:
            continue

        ch["date"] = pd.to_datetime(ch["date"], errors="coerce", utc=True).dt.tz_localize(None)
        ch = ch.dropna(subset=["date"])
        if ch.empty:
            continue
        ch["day"] = ch["date"].dt.floor("D")
        ch = ch[(ch["day"] >= START_DATE) & (ch["day"] <= END_DATE)]
        ch = ch[ch["ticker"].isin(TICKER_SET)]
        ch = ch[ch["url"] != ""]
        if ch.empty:
            continue

        total_f += len(ch)

        ch_view = ch.copy()
        remap = {
            "date": "Date", "ticker": "Stock_symbol", "url": "Url", "title": "Article_title",
            "publisher": "Publisher", "author": "Author",
            "lsa_summary": "Lsa_summary", "luhn_summary": "Luhn_summary",
            "textrank_summary": "Textrank_summary", "lexrank_summary": "Lexrank_summary",
        }
        inv = {v:k for k,v in remap.items()}

        for c in usecols:
            nc = inv.get(c)
            if (nc is not None) and (nc in ch_view.columns):
                nan_f[c] += ch_view[nc].isna().sum()
            else:
                nan_f[c] += len(ch_view)

        if len(examples_full) < examples_full_needed:
            tmp = ch_view.copy()
            ex = pd.DataFrame(index=tmp.index)
            for c in usecols:
                nc = inv.get(c)
                if (nc is not None) and (nc in tmp.columns):
                    ex[c] = tmp[nc]
                else:
                    ex[c] = np.nan

            m = pd.Series(True, index=ex.index)
            for c in usecols:
                m &= ex[c].notna() & (_clean_str(ex[c]) != "")
            full_rows = ex.loc[m].head(examples_full_needed - len(examples_full))
            if not full_rows.empty:
                examples_full.extend(full_rows.to_dict(orient="records"))

    out = pd.DataFrame({"column": usecols})
    out["nan_total"] = out["column"].map(nan_total).astype("int64")
    out["nan_total_pct"] = (out["nan_total"] / max(total,1) * 100).round(2)

    out["nan_after_filters"] = out["column"].map(nan_f).astype("int64")
    out["nan_after_filters_pct"] = (out["nan_after_filters"] / max(total_f,1) * 100).round(2)

    print("Rows total:", f"{total:,}")
    print("Rows after filters:", f"{total_f:,}", f"({(total_f/max(total,1)*100):.2f}%)")

    print("\nNaN summary:")
    display(out.sort_values("nan_after_filters_pct", ascending=False))

    print("\nExamples (5 rows) with ALL columns filled:")
    if examples_full:
        display(pd.DataFrame(examples_full)[usecols])
    else:
        print("No rows found where ALL listed columns are filled")

    return out

audit_tables = {}
for tag, fp in NEWS_SOURCES.items():
    audit_tables[tag] = audit_nan_counts(fp, tag, max_chunks=None)

SOURCE: all_external | FILE: All_external.csv
Size (MB): 5465.9
Usecols: ['Date', 'Stock_symbol', 'Article_title', 'Url', 'Publisher', 'Author', 'Lsa_summary', 'Luhn_summary', 'Textrank_summary', 'Lexrank_summary']


audit all_external: 66it [00:34,  1.91it/s]

Rows total: 13,057,514
Rows after filters: 335,004 (2.57%)

NaN summary:





Unnamed: 0,column,nan_total,nan_total_pct,nan_after_filters,nan_after_filters_pct
5,Author,11871199,90.91,335004,100.0
6,Lsa_summary,13057514,100.0,335004,100.0
7,Luhn_summary,13057514,100.0,335004,100.0
8,Textrank_summary,13057514,100.0,335004,100.0
9,Lexrank_summary,13057514,100.0,335004,100.0
0,Date,0,0.0,0,0.0
1,Stock_symbol,9804627,75.09,0,0.0
2,Article_title,1,0.0,0,0.0
3,Url,686,0.01,0,0.0
4,Publisher,9030871,69.16,0,0.0



Examples (5 rows) with ALL columns filled:
No rows found where ALL listed columns are filled
SOURCE: nasdaq | FILE: nasdaq_exteral_data.csv
Size (MB): 22156.7
Usecols: ['Date', 'Stock_symbol', 'Article_title', 'Url', 'Publisher', 'Author', 'Lsa_summary', 'Luhn_summary', 'Textrank_summary', 'Lexrank_summary']


audit nasdaq: 78it [01:39,  1.27s/it]

Rows total: 15,549,299
Rows after filters: 1,253,430 (8.06%)

NaN summary:





Unnamed: 0,column,nan_total,nan_total_pct,nan_after_filters,nan_after_filters_pct
5,Author,14362984,92.37,1253430,100.0
4,Publisher,11522656,74.1,918426,73.27
6,Lsa_summary,13057522,83.97,335005,26.73
7,Luhn_summary,13057521,83.97,335004,26.73
8,Textrank_summary,13057521,83.97,335004,26.73
9,Lexrank_summary,13057521,83.97,335004,26.73
0,Date,0,0.0,0,0.0
1,Stock_symbol,9804627,63.06,0,0.0
2,Article_title,1,0.0,0,0.0
3,Url,686,0.0,0,0.0



Examples (5 rows) with ALL columns filled:
No rows found where ALL listed columns are filled


## Block 6: Checking for duplicate URLs across sources

The same article can appear in both CSVs, so counting both would **double** its weight

What we do:
1. Compute the URL overlap (after filtering by ticker, date, and `Stock_symbol`)
2. Assess source completeness: the mean number of filled **text** fields per row (`Article_title`, `Article`, summaries)
3. Pick the **preferred source** for duplicates: if a URL appears in both, take the row from the more complete source
4. Build the list of deduplication keys `key = hash(url || ticker)` for the preferred source and save it to `outputs_final/dedup/preferred_keys__<source>.npy`

This list is then used to **skip duplicates** in the second source
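
Skipping duplicates against a saved key list comes down to a membership test on a sorted array. The sketch below shows one way to do that with `np.searchsorted`. The function name `is_member_sorted` and the integer keys are illustrative assumptions; in the notebook the keys would be `uint64` hashes of `url || ticker`.

```python
import numpy as np

def is_member_sorted(keys_sorted: np.ndarray, keys: np.ndarray) -> np.ndarray:
    """Return a boolean mask: True where a key is present in keys_sorted."""
    if keys_sorted is None or len(keys_sorted) == 0:
        return np.zeros(len(keys), dtype=bool)
    idx = np.searchsorted(keys_sorted, keys)
    # searchsorted returns len(keys_sorted) for keys above the maximum,
    # so clip the position before using it as an index.
    idx_safe = np.minimum(idx, len(keys_sorted) - 1)
    return (idx < len(keys_sorted)) & (keys_sorted[idx_safe] == keys)

# Hypothetical hashed keys of the preferred source, sorted once up front.
preferred = np.sort(np.array([10, 42, 7], dtype="uint64"))
incoming = np.array([42, 5, 99, 7], dtype="uint64")

mask = is_member_sorted(preferred, incoming)
print(mask)             # True marks rows already covered by the preferred source
print(incoming[~mask])  # keys that would be kept from the second source
```

Sorting once and binary-searching each chunk keeps memory flat compared with building a Python `set` of all keys.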

In [6]:
DEDUP_DIR = os.path.join(OUTPUT_DIR, "dedup")
os.makedirs(DEDUP_DIR, exist_ok=True)

preferred_source = list(NEWS_SOURCES.keys())[0]
preferred_keys_sorted = None

def _hash_url_ticker(df: pd.DataFrame) -> np.ndarray:
    if "url" not in df.columns or "ticker" not in df.columns:
        return np.zeros(len(df), dtype="uint64")
    key = _clean_str(df["url"]) + "||" + _clean_str(df["ticker"])
    return hash_pandas_object(key, index=False).astype("uint64").to_numpy()

def _hash_url_only(df: pd.DataFrame) -> np.ndarray:
    if "url" not in df.columns:
        return np.zeros(len(df), dtype="uint64")
    key = _clean_str(df["url"])
    return hash_pandas_object(key, index=False).astype("uint64").to_numpy()

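def _is_dup_by_keys(keys_sorted: np.ndarray, keys: np.ndarray) -> np.ndarray:
    if keys_sorted is None or len(keys_sorted) == 0:
        return np.zeros(len(keys), dtype=bool)
    idx = np.searchsorted(keys_sorted, keys)
    # Clip before indexing: searchsorted returns len(keys_sorted) for keys above the maximum.
    idx_safe = np.minimum(idx, len(keys_sorted) - 1)
    return (idx < len(keys_sorted)) & (keys_sorted[idx_safe] == keys)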



Scanning sources to assess completeness and URL overlap


scan all_external: 66it [00:45,  1.45it/s]


all_external rows_filtered: 333,251 | unique_url: 207,042 | mean_filled_text_fields: 1.0


scan nasdaq: 78it [02:24,  1.86s/it]


nasdaq rows_filtered: 1,251,664 | unique_url: 761,320 | mean_filled_text_fields: 4.669

URL overlap (url-only): 207,042
URL overlap (url,ticker): 333,177

Preferred source for duplicates: nasdaq
Other source: all_external
Saved preferred keys: outputs_final/dedup/preferred_keys__nasdaq.npy | count: 1251590


606

## Block 7: Sentiment across text variants

### Text candidates (5 variants)
- `title_article` = `Article_title + Article`
- `lsa_summary`
- `luhn_summary`
- `textrank_summary`
- `lexrank_summary`

If the field for a given variant is missing or empty, the row is **excluded** for that variant

For each source and variant we compute:
- coverage (`coverage`) = the share of post-filter rows where the variant's text is available
- the sentiment distribution from **FinBERT**:
  - mean neutrality `avg_neu_share` (the mean neutral-class probability)
  - `non_neutral = 1 - avg_neu_share`
  - intensity `mean_abs_score = mean(|p_pos - p_neg|)`

### How the best variant is chosen
Composite score:

`score = coverage * non_neutral * mean_abs_score`

The maximum is taken **separately for each source**
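
As a worked illustration of the composite score, the sketch below computes each factor from a small array of class probabilities. The numbers are invented for the example and are not FinBERT outputs; the column order (neg, neu, pos) is also an assumption made here for readability.

```python
import numpy as np

# Hypothetical class probabilities for four texts: columns are (neg, neu, pos).
probs = np.array([
    [0.70, 0.20, 0.10],
    [0.10, 0.80, 0.10],
    [0.05, 0.15, 0.80],
    [0.30, 0.40, 0.30],
])
rows_after_filters = 5  # assume one of five rows had no text for this variant

coverage = len(probs) / rows_after_filters                  # 4 / 5
avg_neu_share = probs[:, 1].mean()                          # mean neutral probability
non_neutral = 1.0 - avg_neu_share
mean_abs_score = np.abs(probs[:, 2] - probs[:, 0]).mean()   # mean |p_pos - p_neg|

composite = coverage * non_neutral * mean_abs_score
print(round(coverage, 4), round(non_neutral, 4), round(mean_abs_score, 4), round(composite, 6))
```

Because the three factors are multiplied, a variant with strong sentiment but low coverage is penalized as much as one with full coverage but mostly neutral predictions.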

In [7]:
TEXT_VARIANTS = ["title_article", "lsa_summary", "luhn_summary", "textrank_summary", "lexrank_summary"]

def _load_hf_model(model_id: str):
    tok = AutoTokenizer.from_pretrained(model_id)
    mdl = AutoModelForSequenceClassification.from_pretrained(model_id).to(DEVICE)
    mdl.eval()
    raw = getattr(mdl.config, "id2label", {})
    id2lab = {int(k): str(v).lower() for k, v in raw.items()} if isinstance(raw, dict) else {}
    return tok, mdl, id2lab

def _resolve_sentiment_slots(id2lab: dict):
    def pick(keys):
        for i, lab in id2lab.items():
            lab2 = str(lab).lower()
            for k in keys:
                if k in lab2:
                    return int(i)
        return None

    neg = pick(["negative", "neg", "bearish"])
    pos = pick(["positive", "pos", "bullish"])
    neu = pick(["neutral", "neu", "none"])
    return neg, neu, pos

def _hf_predict(texts: List[str], tok, mdl, neg_i, neu_i, pos_i,
                batch_size: int = 64, max_length: int = 192):
    n = len(texts)
    pneg = np.zeros(n, dtype=np.float32)
    pneu = np.zeros(n, dtype=np.float32)
    ppos = np.zeros(n, dtype=np.float32)

    with torch.no_grad():
        for a in range(0, n, batch_size):
            b = texts[a:a + batch_size]
            enc = tok(
                b,
                padding=True,
                truncation=True,
                max_length=int(max_length),
                return_tensors="pt"
            )
            enc = {k: v.to(DEVICE) for k, v in enc.items()}
            logits = mdl(**enc).logits
            pr = torch.softmax(logits, dim=-1).detach().cpu().numpy().astype(np.float32)

            if neg_i is None or pos_i is None:
                if pr.shape[1] == 2:
                    pneg[a:a + len(b)] = pr[:, 0]
                    ppos[a:a + len(b)] = pr[:, 1]
                    pneu[a:a + len(b)] = 0.0
                else:
                    raise ValueError("Could not determine the sentiment labels (neg/pos) for this model")
            else:
                pneg[a:a + len(b)] = pr[:, int(neg_i)]
                ppos[a:a + len(b)] = pr[:, int(pos_i)]
                pneu[a:a + len(b)] = pr[:, int(neu_i)] if neu_i is not None else 0.0

    score = (ppos - pneg).astype(np.float32)
    abs_score = np.abs(score).astype(np.float32)
    return score, abs_score, pneg, pneu, ppos

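def _is_dup_by_keys(keys_sorted: np.ndarray, keys: np.ndarray) -> np.ndarray:
    if keys_sorted is None or len(keys_sorted) == 0:
        return np.zeros(len(keys), dtype=bool)
    idx = np.searchsorted(keys_sorted, keys)
    # Clip before indexing: searchsorted can return len(keys_sorted).
    idx_safe = np.minimum(idx, len(keys_sorted) - 1)
    return (idx < len(keys_sorted)) & (keys_sorted[idx_safe] == keys)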

def eval_text_variants_finbert(source_tag: str, file_path: str,
                              preferred_keys_sorted: Optional[np.ndarray],
                              is_preferred_source: bool,
                              batch_size: int = 64, max_length: int = 192,
                              model_id: str = "ProsusAI/finbert") -> pd.DataFrame:
    print_step(f"FinBERT eval: source={source_tag}")

    tok, mdl, id2lab = _load_hf_model(model_id)
    neg_i, neu_i, pos_i = _resolve_sentiment_slots(id2lab)

    metrics = {v: {"rows_after_filters": 0, "rows_with_text": 0,
                   "len_sum": 0, "neu_sum": 0.0, "abs_score_sum": 0.0}
               for v in TEXT_VARIANTS}

    reader = pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False)

    for raw in tqdm(reader, desc=f"FinBERT variants: {source_tag}"):
        ch = _prep_news_chunk_base(raw, source_tag)
        if ch.empty:
            continue

        if (not is_preferred_source) and (preferred_keys_sorted is not None) and len(preferred_keys_sorted) > 0:
            kh = _hash_url_ticker(ch)
            dup_mask = _is_dup_by_keys(preferred_keys_sorted, kh)
            if dup_mask.any():
                ch = ch.loc[~dup_mask].reset_index(drop=True)
                if ch.empty:
                    continue

        for v in TEXT_VARIANTS:
            metrics[v]["rows_after_filters"] += len(ch)

        for v in TEXT_VARIANTS:
            text = build_text_variant(ch, v)
            m = (text != "")
            if not m.any():
                continue

            texts = text.loc[m].tolist()
            metrics[v]["rows_with_text"] += len(texts)
            metrics[v]["len_sum"] += int(pd.Series(texts).str.len().sum())

            score, abs_score, pneg, pneu, ppos = _hf_predict(
                texts, tok, mdl, neg_i, neu_i, pos_i,
                batch_size=batch_size, max_length=max_length
            )

            metrics[v]["neu_sum"] += float(np.sum(pneu))
            metrics[v]["abs_score_sum"] += float(np.sum(abs_score))

        del ch
        gc.collect()

    rows = []
    for v in TEXT_VARIANTS:
        a = metrics[v]["rows_after_filters"]
        n = metrics[v]["rows_with_text"]

        coverage = (n / a) if a else 0.0
        avg_len = (metrics[v]["len_sum"] / n) if n else 0.0
        avg_neu = (metrics[v]["neu_sum"] / n) if n else 1.0
        non_neutral = 1.0 - avg_neu
        mean_abs_score = (metrics[v]["abs_score_sum"] / n) if n else 0.0

        score = coverage * non_neutral * mean_abs_score

        rows.append({
            "source": source_tag,
            "variant": v,
            "rows_after_filters": int(a),
            "rows_with_text": int(n),
            "coverage": float(coverage),
            "avg_len_chars": float(avg_len),
            "avg_neu_share": float(avg_neu),
            "non_neutral": float(non_neutral),
            "mean_abs_score": float(mean_abs_score),
            "composite_score": float(score),
        })

    out = pd.DataFrame(rows).sort_values("composite_score", ascending=False).reset_index(drop=True)
    return out

preferred_keys_sorted = None

variant_reports = []
for tag, fp in NEWS_SOURCES.items():
    rep = eval_text_variants_finbert(
        source_tag=tag,
        file_path=fp,
        preferred_keys_sorted=preferred_keys_sorted,
        is_preferred_source=True,
        batch_size=64,
        max_length=192,
        model_id="ProsusAI/finbert"
    )
    variant_reports.append(rep)
    print("Top variants for", tag)
    display(rep.head(10))

variant_report = pd.concat(variant_reports, ignore_index=True)
report_path = os.path.join(OUTPUT_DIR, "text_variant_finbert_report.csv")
variant_report.to_csv(report_path, index=False)
print("Saved:", report_path)



FinBERT eval: source=all_external


FinBERT variants: all_external: 66it [00:44,  1.47it/s]


Top variants for all_external





Unnamed: 0,source,variant,rows_after_filters,rows_with_text,coverage,avg_len_chars,avg_neu_share,non_neutral,mean_abs_score,composite_score
0,all_external,title_article,0,0,0.0,0.0,1.0,0.0,0.0,0.0
1,all_external,lsa_summary,0,0,0.0,0.0,1.0,0.0,0.0,0.0
2,all_external,luhn_summary,0,0,0.0,0.0,1.0,0.0,0.0,0.0
3,all_external,textrank_summary,0,0,0.0,0.0,1.0,0.0,0.0,0.0
4,all_external,lexrank_summary,0,0,0.0,0.0,1.0,0.0,0.0,0.0



FinBERT eval: source=nasdaq


FinBERT variants: nasdaq: 78it [55:22:37, 2555.86s/it] 


Top variants for nasdaq





Unnamed: 0,source,variant,rows_after_filters,rows_with_text,coverage,avg_len_chars,avg_neu_share,non_neutral,mean_abs_score,composite_score
0,nasdaq,title_article,1251664,918413,0.733754,5252.162846,0.380678,0.619322,0.51912,0.235904
1,nasdaq,lsa_summary,1251664,918412,0.733753,574.253755,0.386465,0.613535,0.519983,0.234087
2,nasdaq,lexrank_summary,1251664,918413,0.733754,520.792258,0.408596,0.591404,0.501867,0.217783
3,nasdaq,luhn_summary,1251664,918413,0.733754,585.485694,0.435732,0.564268,0.47686,0.197436
4,nasdaq,textrank_summary,1251664,918413,0.733754,615.212608,0.444163,0.555837,0.470008,0.191692


Saved: outputs_final/text_variant_finbert_report.csv


## Block 8: Selecting the best text variant for each source

Based on the `text_variant_finbert_report.csv` report, we pick one best variant per source.

We also save the choice to JSON so that it is reproducible:
`outputs_final/selected_text_variants.json`
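
Picking one top-scoring row per source can also be written with `groupby` and `idxmax`. This is an alternative sketch on a made-up report, not the code used in the cell that follows; the source names and scores are hypothetical.

```python
import pandas as pd

# Hypothetical report with two sources and two variants each.
report = pd.DataFrame({
    "source": ["a", "a", "b", "b"],
    "variant": ["title_article", "lsa_summary", "title_article", "lsa_summary"],
    "composite_score": [0.24, 0.23, 0.10, 0.12],
})

# idxmax returns the row label of the highest score within each source.
best_idx = report.groupby("source")["composite_score"].idxmax()
selected = report.loc[best_idx].set_index("source")["variant"].to_dict()
print(selected)
```

When all scores for a source are tied (for example, all zero), `idxmax` returns the first row of that group, so the choice then reflects row order rather than quality.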

In [8]:
variant_report = pd.read_csv(os.path.join(OUTPUT_DIR, "text_variant_finbert_report.csv"))

selected_variants = {}
for src in variant_report["source"].unique():
    top = variant_report.loc[variant_report["source"] == src].sort_values("composite_score", ascending=False).iloc[0]
    selected_variants[src] = str(top["variant"])

print("Selected text variants per source:")
print(json.dumps(selected_variants, indent=2))

sel_path = os.path.join(OUTPUT_DIR, "selected_text_variants.json")
with open(sel_path, "w", encoding="utf-8") as f:
    json.dump({
        "preferred_source_for_duplicates": preferred_source,
        "selected_variants": selected_variants,
        "text_variants_considered": TEXT_VARIANTS,
        "date_range": [str(START_DATE.date()), str(END_DATE.date())],
    }, f, ensure_ascii=False, indent=2)

print("Saved:", sel_path)


Selected text variants per source:
{
  "all_external": "title_article",
  "nasdaq": "title_article"
}
Saved: outputs_final/selected_text_variants.json


## Block 9: Stock returns (`prices_returns_2019_2023.parquet`)

We build daily log returns:

`r_t = ln(P_t) - ln(P_{t-1})`

where `P_t` is the **Adj Close** price, so that splits and dividends are accounted for correctly

The result is saved to `outputs_final/prices_returns_2019_2023.parquet`
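
The log-return formula can be checked on a three-day toy price series. The prices below are invented for illustration only.

```python
import numpy as np
import pandas as pd

# Hypothetical adjusted close prices for one ticker.
prices = pd.Series(
    [100.0, 110.0, 99.0],
    index=pd.to_datetime(["2020-01-02", "2020-01-03", "2020-01-06"]),
)

# r_t = ln(P_t) - ln(P_{t-1}); the first observation has no predecessor, so it is NaN.
ret_log = np.log(prices).diff()

# Log returns add up over time: their sum equals ln(P_last / P_first).
total = ret_log.sum()  # NaN is skipped by default
print(ret_log)
print(total, np.log(prices.iloc[-1] / prices.iloc[0]))
```

The leading NaN per ticker is expected, which is why the stored output reports a small non-zero `ret_log NaN%`.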

In [9]:
BASE_RET_COLS = ["ticker", "date", "price", "ret_log", "volume", "price_col_used"]

def _colmap_lower(df: pd.DataFrame) -> dict:
    return {str(c).strip().lower(): c for c in df.columns}

def _pick_price_col(df: pd.DataFrame) -> str:
    cm = _colmap_lower(df)
    candidates = ["adj close", "adjusted close", "adj_close", "adjclose", "close"]
    for key in candidates:
        if key in cm:
            return cm[key]
    raise ValueError("Price column not found")

def _pick_volume_col(df: pd.DataFrame) -> Optional[str]:
    cm = _colmap_lower(df)
    for key in ["volume", "vol"]:
        if key in cm:
            return cm[key]
    return None

def build_prices_returns(prices_dir: str, out_path: str) -> pd.DataFrame:
    rows = []
    missing_files = 0
    skipped_bad = 0

    for t in tqdm(sorted(TICKER_SET), desc="Reading prices"):
        fp = os.path.join(prices_dir, f"{t}.csv")
        if not os.path.exists(fp):
            missing_files += 1
            continue

        try:
            df = pd.read_csv(
                fp,
                usecols=lambda c: str(c).strip().lower() in {"date", "adj close", "close", "volume"},
                low_memory=False
            )
        except Exception:
            skipped_bad += 1
            continue

        if "Date" in df.columns and "date" not in df.columns:
            df = df.rename(columns={"Date": "date"})
        elif "date" not in df.columns:
            cm = _colmap_lower(df)
            if "date" in cm:
                df = df.rename(columns={cm["date"]: "date"})
            else:
                skipped_bad += 1
                continue

        df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
        df = df.dropna(subset=["date"])
        df = df[(df["date"] >= START_DATE) & (df["date"] <= END_DATE)]
        df = df.drop_duplicates(subset=["date"]).sort_values("date")

        if df.empty:
            continue

        try:
            price_col = _pick_price_col(df)
        except Exception:
            skipped_bad += 1
            continue

        df["price"] = pd.to_numeric(df[price_col], errors="coerce")
        df = df.dropna(subset=["price"])
        df = df[df["price"] > 0]
        if df.empty:
            continue

        df["ret_log"] = np.log(df["price"]).diff()

        vol_col = _pick_volume_col(df)
        if vol_col is not None:
            df["volume"] = pd.to_numeric(df[vol_col], errors="coerce")
        else:
            df["volume"] = np.nan

        df["ticker"] = str(t)
        df["price_col_used"] = str(price_col)

        rows.append(df[BASE_RET_COLS])

    if not rows:
        raise ValueError("No valid price files found")

    ret = (
        pd.concat(rows, ignore_index=True)
          .sort_values(["ticker", "date"], kind="mergesort")
          .reset_index(drop=True)
    )

    ret.to_parquet(out_path, index=False)
    print("Saved:", out_path)
    if missing_files:
        print("Missing ticker files:", int(missing_files))
    if skipped_bad:
        print("Skipped due to format/read errors:", int(skipped_bad))

    return ret

RET_PATH = os.path.join(OUTPUT_DIR, "prices_returns_2019_2023.parquet")

print_step("Building price returns")
if os.path.exists(RET_PATH):
    ret = pd.read_parquet(RET_PATH)
else:
    ret = build_prices_returns(PRICES_DIR, RET_PATH)

ret = ret[BASE_RET_COLS].copy()
ret["ticker"] = ret["ticker"].astype(str).str.strip()
ret["date"] = pd.to_datetime(ret["date"], errors="coerce").dt.normalize()
ret = ret.dropna(subset=["ticker", "date"])
ret = ret[ret["ticker"].isin(TICKER_SET)].copy()
ret = ret.sort_values(["ticker", "date"], kind="mergesort").reset_index(drop=True)

print("Rows:", f"{len(ret):,}", "Tickers:", int(ret["ticker"].nunique()))
print("Date range:", ret["date"].min().date(), "—", ret["date"].max().date())
display(ret["price_col_used"].value_counts().head(10))
print("ret_log NaN%:", float(ret["ret_log"].isna().mean()*100))



Building price returns from price CSVs


Reading prices by ticker: 100%|█████████████| 2193/2193 [00:12<00:00, 175.68it/s]


Saved: outputs_final/prices_returns_2019_2023.parquet
Prices+returns: rows 2,755,903, tickers 2193
Period: 2019-01-02 — 2023-12-28
price_col_used (top):


price_col_used
adj close    2755903
Name: count, dtype: int64


ret_log NaN%: 0.07957464395517548
0.001   -0.251266
0.010   -0.097432
0.500    0.000000
0.990    0.098719
0.999    0.240152
Name: ret_log, dtype: float64


## Block 10: Market return and excess returns

To separate stock-specific moves from market-wide noise, we use a market proxy `R_m,t`.

The proxy is **SPY**.

Excess log return:

`excess_ret_log = ret_log - mkt_ret_log`

If SPY is unavailable, we fall back to the equal-weighted return across all tickers in the sample on that date.
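
One way the equal-weighted fallback could be computed is a cross-sectional mean of `ret_log` per date. This is a minimal sketch, assuming a long frame with `ticker`, `date` and `ret_log` columns like `ret`; the helper name `equal_weighted_market` and the toy data are illustrative and not part of the notebook.

```python
import numpy as np
import pandas as pd

def equal_weighted_market(ret: pd.DataFrame) -> pd.DataFrame:
    """Cross-sectional mean of ret_log per date (hypothetical helper)."""
    return (
        ret.groupby("date", sort=True)["ret_log"]
           .mean()                      # NaN returns are skipped by default
           .rename("mkt_ret_log")
           .reset_index()
    )

# Toy data: two tickers, two dates, one missing return
toy = pd.DataFrame({
    "ticker": ["A", "A", "B", "B"],
    "date": pd.to_datetime(["2019-01-03", "2019-01-04"] * 2),
    "ret_log": [0.01, -0.02, 0.03, np.nan],
})

mkt = equal_weighted_market(toy)
toy["mkt_ret_log"] = toy["date"].map(mkt.set_index("date")["mkt_ret_log"])
toy["excess_ret_log"] = toy["ret_log"] - toy["mkt_ret_log"]
print(toy)
```

With an equal-weighted proxy built from the same sample, excess returns average to zero across tickers on each date, so they measure performance relative to the sample rather than to the broad market.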

In [10]:
MARKET_TICKER = "SPY"
USE_YFINANCE_MARKET = False

def _make_unique_cols(cols):
    seen = {}
    out = []
    for c in cols:
        c = str(c).strip()
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f"{c}__{seen[c]}")
    return out

def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if isinstance(df.columns, pd.MultiIndex):
        flat = []
        for tup in df.columns.to_list():
            parts = [str(x) for x in tup if x not in [None, "", " "]]
            flat.append("_".join(parts))
        df.columns = _make_unique_cols(flat)
    else:
        df.columns = _make_unique_cols([str(c).strip() for c in df.columns])
    return df

def _find_col_by_keywords(columns, keywords):
    for c in columns:
        s = str(c).lower()
        if all(k in s for k in keywords):
            return c
    return None

def _market_returns_from_prices_dir(prices_dir: str, ticker: str) -> pd.DataFrame:
    fp = os.path.join(prices_dir, f"{ticker}.csv")
    if not os.path.exists(fp):
        raise FileNotFoundError(fp)

    df = pd.read_csv(
        fp,
        usecols=lambda c: str(c).strip().lower() in {"date", "adj close", "close"},
        low_memory=False
    )
    if "Date" in df.columns and "date" not in df.columns:
        df = df.rename(columns={"Date": "date"})
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
    df = df.dropna(subset=["date"]).drop_duplicates(subset=["date"]).sort_values("date")
    df = df[(df["date"] >= START_DATE) & (df["date"] <= END_DATE)]
    if df.empty:
        raise ValueError("Empty market series")

    price_col = _pick_price_col(df)
    df["mkt_price"] = pd.to_numeric(df[price_col], errors="coerce")
    df = df.dropna(subset=["mkt_price"]).copy()
    df["mkt_ret_log"] = np.log(df["mkt_price"]).diff()
    return df[["date", "mkt_price", "mkt_ret_log"]]

def _market_returns_from_yfinance(ticker: str) -> pd.DataFrame:
    import yfinance as yf

    df = yf.download(
        ticker,
        start=str(START_DATE.date()),
        end=str((END_DATE + pd.Timedelta(days=1)).date()),
        progress=False,
        auto_adjust=False,
        group_by="column",
    )

    if df is None or df.empty:
        raise ValueError("Empty yfinance data")

    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [str(t[0]) for t in df.columns.to_list()]

    df = df.reset_index()
    df = _flatten_columns(df)

    date_col = "Date" if "Date" in df.columns else ("Datetime" if "Datetime" in df.columns else df.columns[0])

    price_col = _find_col_by_keywords(df.columns, ["adj", "close"])
    if price_col is None:
        price_col = _find_col_by_keywords(df.columns, ["close"])
    if price_col is None:
        raise KeyError("Close column not found")

    df = df.rename(columns={date_col: "date", price_col: "mkt_price"})
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
    df["mkt_price"] = pd.to_numeric(df["mkt_price"], errors="coerce")

    df = df.dropna(subset=["date", "mkt_price"]).sort_values("date").drop_duplicates(subset=["date"])
    df = df[(df["date"] >= START_DATE) & (df["date"] <= END_DATE)]
    df["mkt_ret_log"] = np.log(df["mkt_price"]).diff()
    return df[["date", "mkt_price", "mkt_ret_log"]]

print_step("Market return + excess_ret_log")

try:
    mkt = _market_returns_from_prices_dir(PRICES_DIR, MARKET_TICKER)
    print("Market proxy:", MARKET_TICKER, "(from prices directory)")
except Exception as e:
    if USE_YFINANCE_MARKET:
        mkt = _market_returns_from_yfinance(MARKET_TICKER)
        print("Market proxy:", MARKET_TICKER, "(yfinance)")
    else:
        raise e

mkt = _flatten_columns(mkt)
mkt["date"] = pd.to_datetime(mkt["date"], errors="coerce").dt.normalize()
mkt = mkt.dropna(subset=["date"]).drop_duplicates(subset=["date"]).sort_values("date")

ret = _flatten_columns(ret)
_drop_cols = [c for c in ret.columns if str(c).startswith("mkt_") or str(c).startswith("excess_ret_log")]
ret = ret.drop(columns=_drop_cols, errors="ignore")

_mkt_map = mkt.set_index("date")["mkt_ret_log"]
ret["mkt_ret_log"] = ret["date"].map(_mkt_map)
ret["excess_ret_log"] = ret["ret_log"] - ret["mkt_ret_log"]

print("mkt_ret_log non-NaN%:", float(ret["mkt_ret_log"].notna().mean()*100))
print("excess_ret_log non-NaN%:", float(ret["excess_ret_log"].notna().mean()*100))



Market return + excess_ret_log
Market proxy: SPY (via yfinance)
Share of rows where mkt_ret_log is not NaN: 99.9%
Share of rows where excess_ret_log is not NaN: 99.9%


## Block 11: Target variables (1/2/3/5-day horizons)

We need the market's future reaction to news.

We define the targets as sums of future **excess** returns:

`y1_ex = excess_ret_log(t+1)`

`y2_ex = excess_ret_log(t+1) + excess_ret_log(t+2)`

`y3_ex = excess_ret_log(t+1) + excess_ret_log(t+2) + excess_ret_log(t+3)`

`y5_ex = excess_ret_log(t+1) + excess_ret_log(t+2) + excess_ret_log(t+3) + excess_ret_log(t+4) + excess_ret_log(t+5)`
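
The definitions above can be illustrated on a toy frame: within each ticker, `shift(-k)` pulls the value from `k` trading rows ahead, so the last rows of a ticker get NaN targets and values never leak across tickers. The data below are made up for illustration.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "ticker": ["A"] * 4 + ["B"] * 3,
    "date": pd.to_datetime(
        ["2019-01-02", "2019-01-03", "2019-01-04", "2019-01-07",
         "2019-01-02", "2019-01-03", "2019-01-04"]
    ),
    "excess_ret_log": [0.01, 0.02, -0.01, 0.03, 0.05, -0.02, 0.01],
})
toy = toy.sort_values(["ticker", "date"]).reset_index(drop=True)

g = toy.groupby("ticker", sort=False)["excess_ret_log"]

# y1: next day's excess return; y2: sum of the next two days
toy["y1_ex"] = g.shift(-1)
toy["y2_ex"] = g.shift(-1) + g.shift(-2)
print(toy)
```

Because log returns are additive over time, each multi-day target is the cumulative excess log return over that horizon.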

In [11]:
print_step("Targets y1/y2/y3/y5 from excess_ret_log")

ret = ret.sort_values(["ticker", "date"], kind="mergesort").reset_index(drop=True)
g_ex = ret.groupby("ticker", sort=False)["excess_ret_log"]

s1 = g_ex.shift(-1)
s2 = g_ex.shift(-2)
s3 = g_ex.shift(-3)
s4 = g_ex.shift(-4)
s5 = g_ex.shift(-5)

ret["y1_ex"] = s1.astype("float32")
ret["y2_ex"] = (s1 + s2).astype("float32")
ret["y3_ex"] = (s1 + s2 + s3).astype("float32")
ret["y5_ex"] = (s1 + s2 + s3 + s4 + s5).astype("float32")

for c in ["y1_ex", "y2_ex", "y3_ex", "y5_ex"]:
    print(f"{c}: NaN share = {ret[c].isna().mean()*100:.2f}%")

trade = ret[["ticker", "date"]].rename(columns={"date": "trade_date"}).copy()
trade = trade.sort_values(["ticker", "trade_date"], kind="mergesort").reset_index(drop=True)


Targets y1/y2/y3/y5 from excess_ret_log
y1_ex: NaN share = 0.08%
y2_ex: NaN share = 0.16%
y3_ex: NaN share = 0.24%
y5_ex: NaN share = 0.40%


## Block 12: Building daily sentiment aggregates (8 models) from the selected texts

Once the best text variant has been selected for each source (`selected_text_variants.json`), we build the final daily sentiment aggregates:

- for the preferred source, all news items are scored as is,
- for the second source, news items whose `(url, ticker)` pair already appeared in the preferred source are skipped,
- scores are aggregated by `(ticker, day)`, and the sources are then summed at the raw-aggregate level
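
Summing at the raw-aggregate level works because counts and score sums are additive: partial `(n, ssum)` tables from different chunks or sources can be added together and divided once at the end, which gives the same mean as a single pass. A toy sketch, where the column names `n` and `ssum` follow the code cell and the helper `raw_agg` and the data are illustrative:

```python
import pandas as pd

def raw_agg(chunk: pd.DataFrame) -> pd.DataFrame:
    """Per-(ticker, date) count and score sum for one chunk (illustrative helper)."""
    tmp = chunk[["ticker", "date"]].copy()
    tmp["n"] = 1
    tmp["ssum"] = chunk["score"].to_numpy()
    return tmp.groupby(["ticker", "date"], sort=False).sum(numeric_only=True).reset_index()

news = pd.DataFrame({
    "ticker": ["A", "A", "A", "B"],
    "date": pd.to_datetime(["2022-09-16"] * 4),
    "score": [0.5, -0.25, 0.5, 1.0],
})

# Two "chunks" that split ticker A's day across a chunk boundary
parts = [raw_agg(news.iloc[:2]), raw_agg(news.iloc[2:])]

# Reduce: add the raw sums, then divide once
total = (
    pd.concat(parts, ignore_index=True)
      .groupby(["ticker", "date"], sort=False)
      .sum(numeric_only=True)
      .reset_index()
)
total["score_mean"] = total["ssum"] / total["n"]

# Reference: mean computed in a single pass over all rows
single_pass = news.groupby(["ticker", "date"], sort=False)["score"].mean().reset_index()
print(total)
```

Averaging per-chunk means instead would weight each chunk equally regardless of how many news items it contributed, which is why the means are derived only after the final reduction.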

In [12]:
with open(os.path.join(OUTPUT_DIR, "selected_text_variants.json"), "r", encoding="utf-8") as f:
    sel_meta = json.load(f)

selected_variants = sel_meta["selected_variants"]
preferred_source = sel_meta.get("preferred_source_for_duplicates") or list(selected_variants.keys())[0]
preferred_keys_sorted = None

print("Preferred source:", preferred_source)
print("Selected variants:", selected_variants)

def _checkpoint_path(prefix: str, source_tag: str, variant: str) -> str:
    return os.path.join(CHECKPOINT_DIR, f"{prefix}__{source_tag}__{variant}__rawagg.parquet")

def _reduce_parts(parts: List[pd.DataFrame]) -> pd.DataFrame:
    # Skip empty frames: a column-less frame would otherwise raise KeyError on "ticker" below
    parts = [p for p in parts if p is not None and not p.empty]
    if not parts:
        return pd.DataFrame()
    df = pd.concat(parts, ignore_index=True)
    df["ticker"] = df["ticker"].astype(str).str.strip()
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.normalize()
    df = df.dropna(subset=["ticker", "date"])
    df = df.groupby(["ticker", "date"], sort=False).sum(numeric_only=True).reset_index()
    return df.sort_values(["ticker", "date"], kind="mergesort").reset_index(drop=True)

def _prep_news_chunk_variant(raw: pd.DataFrame, source_tag: str, variant: str,
                            skip_keys_sorted: Optional[np.ndarray]) -> pd.DataFrame:
    ch = _prep_news_chunk_base(raw, source_tag)
    if ch.empty:
        return ch.iloc[0:0].copy()

    if (skip_keys_sorted is not None) and len(skip_keys_sorted) > 0:
        kh = _hash_url_ticker(ch)
        dup_mask = _is_dup_by_keys(skip_keys_sorted, kh)
        if dup_mask.any():
            ch = ch.loc[~dup_mask].reset_index(drop=True)
            if ch.empty:
                return ch.iloc[0:0].copy()

    text = build_text_variant(ch, variant)
    m = (text != "")
    if not m.any():
        return ch.iloc[0:0].copy()

    out = ch.loc[m, ["ticker", "day"]].copy()
    out["text"] = text.loc[m].astype(str)
    return out.reset_index(drop=True)

def _vader_scorer():
    try:
        return SentimentIntensityAnalyzer()
    except Exception:
        nltk.download("vader_lexicon", quiet=True)
        return SentimentIntensityAnalyzer()

def _vader_file_rawagg(file_path: str, source_tag: str, variant: str,
                       skip_keys_sorted: Optional[np.ndarray],
                       band: float, sia: SentimentIntensityAnalyzer,
                       prefix: str = "vader") -> pd.DataFrame:
    parts = []
    buffer = []
    reader = pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False)

    for raw in tqdm(reader, desc=f"{prefix}: {source_tag}"):
        ch = _prep_news_chunk_variant(raw, source_tag, variant, skip_keys_sorted)
        if ch.empty:
            continue

        texts = ch["text"].astype(str).tolist()
        scores = np.fromiter((float(sia.polarity_scores(x)["compound"]) for x in texts), dtype=np.float32, count=len(texts))
        neg = (scores <= -float(band)).astype(np.int32)
        pos = (scores >= float(band)).astype(np.int32)

        tmp = ch[["ticker", "day"]].copy()
        tmp = tmp.rename(columns={"day": "date"})
        tmp["n"] = 1
        tmp["ssum"] = scores.astype(np.float32)
        tmp["nneg"] = neg
        tmp["npos"] = pos

        g = tmp.groupby(["ticker", "date"], sort=False).sum(numeric_only=True).reset_index()
        buffer.append(g)

        if len(buffer) >= 25:
            parts.append(_reduce_parts(buffer))
            buffer = []

    if buffer:
        parts.append(_reduce_parts(buffer))

    out = _reduce_parts(parts)
    return out

def build_vader_daily(band: float = 0.02) -> pd.DataFrame:
    sia = _vader_scorer()

    parts = []
    for tag, fp in NEWS_SOURCES.items():
        variant = selected_variants[tag]
        skip = None if tag == preferred_source else preferred_keys_sorted

        cp = _checkpoint_path("vader", tag, variant)
        if RESUME_FROM_CHECKPOINTS and os.path.exists(cp) and not FORCE_RECOMPUTE_CHECKPOINTS:
            part = pd.read_parquet(cp)
        else:
            part = _vader_file_rawagg(fp, tag, variant, skip, band=band, sia=sia)
            part.to_parquet(cp, index=False)
        parts.append(part)

    agg = _reduce_parts(parts)
    if agg.empty:
        raise ValueError("VADER: empty result")

    g = agg.copy()
    g["vader_news_count"] = g["n"].round().astype("int32")
    g["vader_score_mean"] = np.where(g["n"] > 0, g["ssum"] / g["n"], np.nan).astype("float32")
    g["vader_neg_share"] = np.where(g["n"] > 0, g["nneg"] / g["n"], np.nan).astype("float32")
    g["vader_pos_share"] = np.where(g["n"] > 0, g["npos"] / g["n"], np.nan).astype("float32")
    g["vader_neu_share"] = (1.0 - g["vader_neg_share"] - g["vader_pos_share"]).clip(0.0, 1.0).astype("float32")
    g["vader_active_score_mean"] = g["vader_score_mean"]
    g = g.drop(columns=["n", "ssum", "nneg", "npos"])

    out_path = os.path.join(OUTPUT_DIR, "daily_news_vader_2019_2023.parquet")
    g.to_parquet(out_path, index=False)
    print("Saved:", out_path, "Rows:", f"{len(g):,}")
    return g

def _textblob_file_rawagg(file_path: str, source_tag: str, variant: str,
                          skip_keys_sorted: Optional[np.ndarray],
                          band: float, prefix: str = "textblob") -> pd.DataFrame:
    parts = []
    buffer = []
    reader = pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False)

    def _tb(text: str) -> float:
        try:
            return float(TextBlob(text).sentiment.polarity)
        except Exception:
            return 0.0

    for raw in tqdm(reader, desc=f"{prefix}: {source_tag}"):
        ch = _prep_news_chunk_variant(raw, source_tag, variant, skip_keys_sorted)
        if ch.empty:
            continue

        texts = ch["text"].astype(str).tolist()
        scores = np.fromiter((_tb(x) for x in texts), dtype=np.float32, count=len(texts))
        neg = (scores <= -float(band)).astype(np.int32)
        pos = (scores >= float(band)).astype(np.int32)

        tmp = ch[["ticker", "day"]].copy()
        tmp = tmp.rename(columns={"day": "date"})
        tmp["n"] = 1
        tmp["ssum"] = scores.astype(np.float32)
        tmp["nneg"] = neg
        tmp["npos"] = pos

        g = tmp.groupby(["ticker", "date"], sort=False).sum(numeric_only=True).reset_index()
        buffer.append(g)

        if len(buffer) >= 25:
            parts.append(_reduce_parts(buffer))
            buffer = []

    if buffer:
        parts.append(_reduce_parts(buffer))

    out = _reduce_parts(parts)
    return out

def build_textblob_daily(band: float = 0.02) -> pd.DataFrame:
    parts = []
    for tag, fp in NEWS_SOURCES.items():
        variant = selected_variants[tag]
        skip = None if tag == preferred_source else preferred_keys_sorted

        cp = _checkpoint_path("textblob", tag, variant)
        if RESUME_FROM_CHECKPOINTS and os.path.exists(cp) and not FORCE_RECOMPUTE_CHECKPOINTS:
            part = pd.read_parquet(cp)
        else:
            part = _textblob_file_rawagg(fp, tag, variant, skip, band=band)
            part.to_parquet(cp, index=False)
        parts.append(part)

    agg = _reduce_parts(parts)
    if agg.empty:
        raise ValueError("TextBlob: empty result")

    g = agg.copy()
    g["textblob_news_count"] = g["n"].round().astype("int32")
    g["textblob_score_mean"] = np.where(g["n"] > 0, g["ssum"] / g["n"], np.nan).astype("float32")
    g["textblob_neg_share"] = np.where(g["n"] > 0, g["nneg"] / g["n"], np.nan).astype("float32")
    g["textblob_pos_share"] = np.where(g["n"] > 0, g["npos"] / g["n"], np.nan).astype("float32")
    g["textblob_neu_share"] = (1.0 - g["textblob_neg_share"] - g["textblob_pos_share"]).clip(0.0, 1.0).astype("float32")
    g["textblob_active_score_mean"] = g["textblob_score_mean"]
    g = g.drop(columns=["n", "ssum", "nneg", "npos"])

    out_path = os.path.join(OUTPUT_DIR, "daily_news_textblob_2019_2023.parquet")
    g.to_parquet(out_path, index=False)
    print("Saved:", out_path, "Rows:", f"{len(g):,}")
    return g

def _hf_file_rawagg(file_path: str, source_tag: str, variant: str,
                    skip_keys_sorted: Optional[np.ndarray],
                    tok, mdl, neg_i, neu_i, pos_i,
                    batch_size: int = 64, max_length: int = 192,
                    prefix: str = "hf") -> pd.DataFrame:
    parts = []
    buffer = []
    reader = pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False)

    for raw in tqdm(reader, desc=f"{prefix}: {source_tag}"):
        ch = _prep_news_chunk_variant(raw, source_tag, variant, skip_keys_sorted)
        if ch.empty:
            continue

        texts = ch["text"].astype(str).tolist()
        score, abs_score, pneg, pneu, ppos = _hf_predict(
            texts, tok, mdl, neg_i, neu_i, pos_i,
            batch_size=batch_size, max_length=max_length
        )

        mass = (ppos + pneg).astype(np.float32)
        active = (score / (mass + 1e-9)).astype(np.float32)
        pneg_a = (pneg / (mass + 1e-9)).astype(np.float32)
        ppos_a = (ppos / (mass + 1e-9)).astype(np.float32)

        tmp = ch[["ticker", "day"]].copy()
        tmp = tmp.rename(columns={"day": "date"})
        tmp["n"] = 1
        tmp["ssum"] = score.astype(np.float32)
        tmp["asum"] = active.astype(np.float32)
        tmp["neg"] = pneg.astype(np.float32)
        tmp["neu"] = pneu.astype(np.float32)
        tmp["pos"] = ppos.astype(np.float32)
        tmp["neg_a"] = pneg_a.astype(np.float32)
        tmp["pos_a"] = ppos_a.astype(np.float32)

        g = tmp.groupby(["ticker", "date"], sort=False).sum(numeric_only=True).reset_index()
        buffer.append(g)

        if len(buffer) >= 25:
            parts.append(_reduce_parts(buffer))
            buffer = []

    if buffer:
        parts.append(_reduce_parts(buffer))

    out = _reduce_parts(parts)
    return out

def build_hf_daily(model_id: str, prefix: str,
                   batch_size: int = 64, max_length: int = 192) -> pd.DataFrame:
    tok, mdl, id2lab = _load_hf_model(model_id)
    neg_i, neu_i, pos_i = _resolve_sentiment_slots(id2lab)

    parts = []
    for tag, fp in NEWS_SOURCES.items():
        variant = selected_variants[tag]
        skip = None if tag == preferred_source else preferred_keys_sorted

        cp = _checkpoint_path(prefix, tag, variant)
        if RESUME_FROM_CHECKPOINTS and os.path.exists(cp) and not FORCE_RECOMPUTE_CHECKPOINTS:
            part = pd.read_parquet(cp)
        else:
            part = _hf_file_rawagg(
                fp, tag, variant, skip,
                tok, mdl, neg_i, neu_i, pos_i,
                batch_size=batch_size, max_length=max_length,
                prefix=prefix
            )
            part.to_parquet(cp, index=False)
        parts.append(part)

    agg = _reduce_parts(parts)
    if agg.empty:
        raise ValueError(f"{prefix}: empty result")

    g = agg.copy()
    g[f"{prefix}_news_count"] = g["n"].round().astype("int32")
    g[f"{prefix}_score_mean"] = np.where(g["n"] > 0, g["ssum"] / g["n"], np.nan).astype("float32")
    g[f"{prefix}_neg_share"] = np.where(g["n"] > 0, g["neg"] / g["n"], np.nan).astype("float32")
    g[f"{prefix}_neu_share"] = np.where(g["n"] > 0, g["neu"] / g["n"], np.nan).astype("float32")
    g[f"{prefix}_pos_share"] = np.where(g["n"] > 0, g["pos"] / g["n"], np.nan).astype("float32")

    g[f"{prefix}_active_score_mean"] = np.where(g["n"] > 0, g["asum"] / g["n"], np.nan).astype("float32")
    g[f"{prefix}_active_neg_share"] = np.where(g["n"] > 0, g["neg_a"] / g["n"], np.nan).astype("float32")
    g[f"{prefix}_active_pos_share"] = np.where(g["n"] > 0, g["pos_a"] / g["n"], np.nan).astype("float32")

    g = g.drop(columns=["n", "ssum", "asum", "neg", "neu", "pos", "neg_a", "pos_a"])

    out_path = os.path.join(OUTPUT_DIR, f"daily_news_{prefix}_2019_2023.parquet")
    g.to_parquet(out_path, index=False)
    print("Saved:", out_path, "Rows:", f"{len(g):,}")

    del mdl
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return g


Preferred source: nasdaq
Selected variants: {'all_external': 'title_article', 'nasdaq': 'title_article'}
Preferred key count: 1251590


## Block 13: Running the 8 models and saving `daily_news_*.parquet`

- VADER
- TextBlob
- 6 transformer models (FinBERT and alternatives)

Result: one file per model in `outputs_final/`:
`daily_news_<model>_2019_2023.parquet`

In [13]:
print_step("Building daily sentiment aggregates (8 models)")

def load_or_build_daily(model_tag: str, builder_fn, out_path: str) -> pd.DataFrame:
    if SKIP_IF_DAILY_EXISTS and (not FORCE_REBUILD_DAILY) and os.path.exists(out_path):
        try:
            df = pd.read_parquet(out_path)
            print(f"[{model_tag}] found final file -> {out_path} | rows: {len(df):,}")
            return df
        except Exception as e:
            print(f"[{model_tag}] could not read {out_path} ({e}) -> rebuilding")

    print(f"[{model_tag}] computing/resuming from checkpoints...")
    df = builder_fn()
    if not os.path.exists(out_path):
        df.to_parquet(out_path, index=False)
        print(f"[{model_tag}] saved manually: {out_path}")
    return df

builders = {}

# Lexicon models
builders["vader"] = lambda: build_vader_daily(band=0.02)
builders["textblob"] = lambda: build_textblob_daily(band=0.02)

# Transformers
builders["finbert"] = lambda: build_hf_daily(
    model_id="ProsusAI/finbert",
    prefix="finbert",
    batch_size=64,
    max_length=192
)

builders["finbert_tone"] = lambda: build_hf_daily(
    model_id="yiyanghkust/finbert-tone",
    prefix="finbert_tone",
    batch_size=64,
    max_length=192
)

builders["finroberta"] = lambda: build_hf_daily(
    model_id="soleimanian/financial-roberta-large-sentiment",
    prefix="finroberta",
    batch_size=32,
    max_length=192
)

builders["distilroberta_finnews"] = lambda: build_hf_daily(
    model_id="mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
    prefix="distilroberta_finnews",
    batch_size=64,
    max_length=192
)

builders["deberta_finnews"] = lambda: build_hf_daily(
    model_id="mrm8488/deberta-v3-ft-financial-news-sentiment-analysis",
    prefix="deberta_finnews",
    batch_size=32,
    max_length=192
)

builders["twroberta"] = lambda: build_hf_daily(
    model_id="cardiffnlp/twitter-roberta-base-sentiment-latest",
    prefix="twroberta",
    batch_size=64,
    max_length=192
)

model_map = {}

for tag, fn in builders.items():
    out_path = os.path.join(OUTPUT_DIR, f"daily_news_{tag}_2019_2023.parquet")
    model_map[tag] = load_or_build_daily(tag, fn, out_path)

# news_count is taken from finbert (the primary model)
model_map["finbert"] = model_map["finbert"].copy()
fin_nc = "finbert_news_count"
if fin_nc in model_map["finbert"].columns:
    model_map["finbert"]["news_count"] = model_map["finbert"][fin_nc]
else:
    raise ValueError("finbert_news_count not found in finbert: check build_hf_daily/output columns.")

display(model_map["finbert"].head())


Building daily sentiment aggregates (8 models)
[vader] found final file -> outputs_final/daily_news_vader_2019_2023.parquet | rows: 432,985
[textblob] found final file -> outputs_final/daily_news_textblob_2019_2023.parquet | rows: 432,985
[finbert] found final file -> outputs_final/daily_news_finbert_2019_2023.parquet | rows: 432,985
[finbert_tone] found final file -> outputs_final/daily_news_finbert_tone_2019_2023.parquet | rows: 432,985
[finroberta] found final file -> outputs_final/daily_news_finroberta_2019_2023.parquet | rows: 432,985
[distilroberta_finnews] found final file -> outputs_final/daily_news_distilroberta_finnews_2019_2023.parquet | rows: 432,985
[deberta_finnews] found final file -> outputs_final/daily_news_deberta_finnews_2019_2023.parquet | rows: 432,985
[twroberta] found final file -> outputs_final/daily_news_twroberta_2019_2023.parquet | rows: 432,985


|  | ticker | date | finbert_news_count | finbert_score_mean | finbert_active_score_mean | finbert_neg_share | finbert_neu_share | finbert_pos_share | finbert_active_neg_share | finbert_active_pos_share | news_count |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A | 2022-09-11 | 1 | -0.059356 | -0.396502 | 0.104527 | 0.850301 | 0.045172 | 0.698251 | 0.301749 | 1 |
| 1 | A | 2022-09-15 | 1 | -0.927253 | -0.947452 | 0.952966 | 0.02132 | 0.025714 | 0.973726 | 0.026274 | 1 |
| 2 | A | 2022-09-16 | 2 | 0.056851 | 0.610298 | 0.01657 | 0.91001 | 0.07342 | 0.194851 | 0.805149 | 2 |
| 3 | A | 2022-09-29 | 1 | 0.018935 | 0.283029 | 0.023983 | 0.9331 | 0.042917 | 0.358485 | 0.641515 | 1 |
| 4 | A | 2022-10-04 | 1 | 0.585094 | 0.967716 | 0.00976 | 0.395387 | 0.594854 | 0.016142 | 0.983858 | 1 |


## Block 14: Aligning news to trading days

News is published on calendar days, but the market is closed on weekends and holidays.

We map each news `date` to the **nearest trading day forward** (the same day if it is a trading day).

If a news item comes out on a weekend, its effect usually shows up in the next trading session.
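
The forward mapping can be sketched with `pd.merge_asof` and `direction="forward"` on a toy calendar for one ticker. Several calendar days can land on the same session, so the sketch re-averages the score weighted by `news_count`. The data and the temporary `wsum` column are illustrative.

```python
import pandas as pd

# Trading calendar for one ticker: Fri 2022-09-16 and Mon 2022-09-19
trade_days = pd.DataFrame({"trade_date": pd.to_datetime(["2022-09-16", "2022-09-19"])})

# Daily news aggregates on calendar days, including a weekend
daily = pd.DataFrame({
    "date": pd.to_datetime(["2022-09-16", "2022-09-17", "2022-09-18"]),
    "news_count": [2, 1, 3],
    "score_mean": [0.10, -0.40, 0.20],
})

aligned = pd.merge_asof(
    daily.sort_values("date"),
    trade_days.sort_values("trade_date"),
    left_on="date",
    right_on="trade_date",
    direction="forward",            # same day if it trades, else the next session
    tolerance=pd.Timedelta("7D"),   # give up if no session within a week
)

# Saturday and Sunday both map to Monday: combine them weighted by news_count
aligned["wsum"] = aligned["score_mean"] * aligned["news_count"]
out = aligned.groupby("trade_date", as_index=False)[["news_count", "wsum"]].sum()
out["score_mean"] = out["wsum"] / out["news_count"]
out = out.drop(columns="wsum")
print(out)
```

Weighting by `news_count` keeps the merged value equal to the mean over the individual news items, rather than the mean of daily means.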

In [14]:
def align_daily_to_trade_per_ticker(daily_df: pd.DataFrame, day_col: str = "date") -> pd.DataFrame:
    a = daily_df.copy()
    a["ticker"] = a["ticker"].astype(str).str.strip()
    a[day_col] = pd.to_datetime(a[day_col], errors="coerce").dt.normalize()
    a = a.dropna(subset=["ticker", day_col])
    a = a.sort_values(["ticker", day_col], kind="mergesort").reset_index(drop=True)

    if "news_count" in a.columns:
        count_col = "news_count"
    else:
        nc = [c for c in a.columns if c.endswith("_news_count")]
        count_col = nc[0] if nc else None

    if count_col is None:
        count_col = "_tmp_news_count"
        a[count_col] = 1

    out_parts = []
    for tic, g in a.groupby("ticker", sort=False):
        tcal = trade.loc[trade["ticker"] == tic, ["trade_date"]].sort_values("trade_date")
        if tcal.empty:
            continue

        merged = pd.merge_asof(
            g.sort_values(day_col, kind="mergesort"),
            tcal,
            left_on=day_col,
            right_on="trade_date",
            direction="forward",
            allow_exact_matches=True,
            tolerance=pd.Timedelta("7D"),
        )

        merged = merged.dropna(subset=["trade_date"]).copy()
        if merged.empty:
            continue

        merged = merged.drop(columns=[day_col]).rename(columns={"trade_date": "date"})
        merged["date"] = pd.to_datetime(merged["date"]).dt.normalize()

        num_cols = [
            c for c in merged.columns
            if c not in ["ticker", "date", count_col] and pd.api.types.is_numeric_dtype(merged[c])
        ]

        w = pd.to_numeric(merged[count_col], errors="coerce").fillna(0).astype(float)

        tmp = merged[["ticker", "date"] + num_cols].copy()
        tmp[count_col] = w.values

        do_not_weight = {count_col}
        do_not_weight.update([c for c in num_cols if c.endswith("_news_count")])
        weighted_cols = [c for c in num_cols if c not in do_not_weight]

        for c in weighted_cols:
            tmp[c] = pd.to_numeric(tmp[c], errors="coerce").astype(float) * tmp[count_col]

        agg = tmp.groupby(["ticker", "date"], sort=False).sum(numeric_only=True)

        denom = agg[count_col].replace(0, np.nan)
        for c in weighted_cols:
            agg[c] = agg[c] / denom

        agg[count_col] = agg[count_col].fillna(0).round().astype("int32")
        out_parts.append(agg.reset_index())

    out = pd.concat(out_parts, ignore_index=True) if out_parts else a.iloc[0:0].copy()
    out = out.sort_values(["ticker", "date"], kind="mergesort").reset_index(drop=True)

    if "_tmp_news_count" in out.columns:
        out = out.drop(columns=["_tmp_news_count"])
    return out

print_step("Aligning models to trading days")

aligned = {}
for tag, df in model_map.items():
    aligned[tag] = align_daily_to_trade_per_ticker(df, day_col="date")
    dup = aligned[tag].duplicated(["ticker", "date"]).sum()
    if dup:
        raise ValueError(f"{tag}: duplicate (ticker, date) rows after alignment: {dup}")

print("Rows after alignment:")
for tag, df in aligned.items():
    print(f"  - {tag}: {len(df):,}")


Aligning models to trading days
Rows after alignment:
  - vader: 410,311
  - textblob: 410,311
  - finbert: 410,311
  - finbert_tone: 410,311
  - finroberta: 410,311
  - distilroberta_finnews: 410,311
  - deberta_finnews: 410,311
  - twroberta: 410,311


## Block 15: Building the master dataset

We take the returns `ret` (including excess returns and the targets y1/y2/y3/y5) and the aligned sentiment aggregates from the 8 models.

We create:
- `news_n` = number of news items (from finbert)
- `has_news_today` = indicator for a day with news

On days without news, sentiment is filled with 0 so that all trading days can be used in the models.
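
A toy sketch of the zero-fill step, assuming a single sentiment column. The indicator `has_news_today` is what keeps "no news" distinguishable from "news with a score of exactly 0" after the fill. The data are made up for illustration.

```python
import pandas as pd

returns = pd.DataFrame({
    "ticker": ["A", "A", "A"],
    "date": pd.to_datetime(["2022-09-15", "2022-09-16", "2022-09-19"]),
    "ret_log": [0.01, -0.02, 0.005],
})
sentiment = pd.DataFrame({
    "ticker": ["A"],
    "date": pd.to_datetime(["2022-09-16"]),
    "news_count": [2],
    "finbert_score_mean": [0.06],
})

# Left join keeps every trading day; days without news get NaN sentiment
master = returns.merge(sentiment, on=["ticker", "date"], how="left")
master["news_n"] = master["news_count"].fillna(0).astype("int32")
master["has_news_today"] = (master["news_n"] > 0).astype("int8")

# Fill sentiment with 0 only on rows without news
no_news = master["has_news_today"] == 0
master.loc[no_news, "finbert_score_mean"] = (
    master.loc[no_news, "finbert_score_mean"].fillna(0.0)
)
print(master)
```

In a downstream regression, including `has_news_today` alongside the filled score lets the model estimate a separate baseline for days without news.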

In [15]:
print_step("Building master")

master = ret.copy()

master = master.merge(aligned["finbert"], on=["ticker", "date"], how="left")

for tag in [k for k in aligned.keys() if k != "finbert"]:
    cols = [c for c in aligned[tag].columns if c not in ["ticker", "date"]]
    master = master.merge(aligned[tag][["ticker", "date"] + cols], on=["ticker", "date"], how="left")

if "sector" in companies.columns:
    master = master.merge(companies[["ticker", "sector"]], on="ticker", how="left")

master["news_n"] = pd.to_numeric(master.get("news_count"), errors="coerce").fillna(0).astype("int32")
master["has_news_today"] = (master["news_n"] > 0).astype("int8")

master = master.replace([np.inf, -np.inf], np.nan)

sent_cols = [c for c in master.columns if any(
    c.endswith(suf) for suf in [
        "_score_mean", "_active_score_mean",
        "_neg_share", "_neu_share", "_pos_share",
        "_active_neg_share", "_active_pos_share"
    ]
)]

mask_no_news = master["has_news_today"] == 0
master.loc[mask_no_news, sent_cols] = master.loc[mask_no_news, sent_cols].fillna(0.0)

master = master.sort_values(["ticker", "date"], kind="mergesort").reset_index(drop=True)

print(f"MASTER: rows {len(master):,}, tickers {master['ticker'].nunique()}, columns {master.shape[1]}")
print(f"Period: {master['date'].min().date()} — {master['date'].max().date()}")

display(master.head())


Building master
MASTER: rows 2,755,903, tickers 2193, columns 76
Period: 2019-01-02 — 2023-12-28


Unnamed: 0,ticker,date,price,ret_log,volume,price_col_used,mkt_ret_log,excess_ret_log,y1_ex,y2_ex,y3_ex,y5_ex,finbert_news_count,finbert_score_mean,finbert_active_score_mean,finbert_neg_share,finbert_neu_share,finbert_pos_share,finbert_active_neg_share,finbert_active_pos_share,news_count,vader_score_mean,vader_neg_share,vader_pos_share,vader_neu_share,vader_active_score_mean,vader_news_count,textblob_score_mean,textblob_neg_share,textblob_pos_share,textblob_neu_share,textblob_active_score_mean,textblob_news_count,finbert_tone_score_mean,finbert_tone_active_score_mean,finbert_tone_neg_share,finbert_tone_neu_share,finbert_tone_pos_share,finbert_tone_active_neg_share,finbert_tone_active_pos_share,finbert_tone_news_count,finroberta_score_mean,finroberta_active_score_mean,finroberta_neg_share,finroberta_neu_share,finroberta_pos_share,finroberta_active_neg_share,finroberta_active_pos_share,finroberta_news_count,distilroberta_finnews_score_mean,distilroberta_finnews_active_score_mean,distilroberta_finnews_neg_share,distilroberta_finnews_neu_share,distilroberta_finnews_pos_share,distilroberta_finnews_active_neg_share,distilroberta_finnews_active_pos_share,distilroberta_finnews_news_count,deberta_finnews_score_mean,deberta_finnews_active_score_mean,deberta_finnews_neg_share,deberta_finnews_neu_share,deberta_finnews_pos_share,deberta_finnews_active_neg_share,deberta_finnews_active_pos_share,deberta_finnews_news_count,twroberta_score_mean,twroberta_active_score_mean,twroberta_neg_share,twroberta_neu_share,twroberta_pos_share,twroberta_active_neg_share,twroberta_active_pos_share,twroberta_news_count,sector,news_n,has_news_today
0,A,2019-01-02,64.968681,,2113300.0,adj close,,,-0.013383,-0.012302,0.000856,0.022114,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,Healthcare,0,0
1,A,2019-01-03,62.575249,-0.037536,5383900.0,adj close,-0.024152,-0.013383,0.001081,0.014239,0.019441,0.041318,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,Healthcare,0,0
2,A,2019-01-04,64.741203,0.034028,3123700.0,adj close,0.032947,0.001081,0.013158,0.01836,0.034416,0.046694,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,Healthcare,0,0
3,A,2019-01-07,66.115929,0.021012,3235100.0,adj close,0.007854,0.013158,0.005202,0.021258,0.027079,0.030665,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,Healthcare,0,0
4,A,2019-01-08,67.085175,0.014553,1578100.0,adj close,0.009351,0.005202,0.016056,0.021877,0.028334,0.026179,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,,Healthcare,0,0
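
The five rows above are consistent with two relationships: `excess_ret_log` equals `ret_log - mkt_ret_log`, and each `y{h}_ex` equals the sum of the next `h` daily excess log returns. This is inferred from the printed values only, so the sketch below is a plausibility check rather than the notebook's definition of these columns. The helper name `forward_cum_excess` is hypothetical, and the sixth value of the series is taken from `y1_ex` of the last displayed row on the assumption that it is the next day's excess return.

```python
import numpy as np
import pandas as pd

# Excess log returns copied from the displayed rows for ticker A.
# The sixth value is y1_ex of the last displayed row
# (assumption: it equals the next day's excess return).
excess = pd.Series([np.nan, -0.013383, 0.001081, 0.013158, 0.005202, 0.016056])

def forward_cum_excess(s: pd.Series, h: int) -> pd.Series:
    """Sum of the next h values of s (rows t+1 .. t+h); NaN if the window is incomplete."""
    return s.rolling(h).sum().shift(-h)

# Hypothetical reconstruction of the forward targets for horizons 1, 2, 3 and 5.
targets = pd.DataFrame({f"y{h}_ex": forward_cum_excess(excess, h) for h in (1, 2, 3, 5)})
print(targets.round(6).head(1))

# Excess return of the second displayed row: stock log return minus market log return.
excess_check = -0.037536 - (-0.024152)
print(round(excess_check, 6))
```

Under these assumptions the first row reproduces the displayed `y1_ex`, `y2_ex`, `y3_ex` and `y5_ex` values for 2019-01-02, and `excess_check` matches the displayed `excess_ret_log` up to rounding in the last printed digit.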


## Block 16: Saving and validating the results

The master dataset is written to `returns_sentiment_enhanced.parquet` inside `OUTPUT_DIR` (the recorded run below used `outputs_final/`).

Sanity checks:
- no duplicate `(ticker, date)` pairs
- all required columns are present
- share of days with news

In [16]:
print_step("Checks and saving")

# Each (ticker, date) pair must appear exactly once in the master table
dup_master = master.duplicated(subset=["ticker", "date"]).sum()
print("Duplicates (ticker, date):", int(dup_master))
if dup_master:
    raise ValueError("Duplicates found in master; fix them before modeling.")

required = [
    "ticker", "date", "ret_log", "mkt_ret_log", "excess_ret_log",
    "y1_ex", "y2_ex", "y3_ex", "y5_ex",
    "has_news_today", "news_n",
    "finbert_score_mean", "vader_score_mean", "textblob_score_mean",
]
missing = [c for c in required if c not in master.columns]
if missing:
    raise ValueError(f"Missing required columns: {missing}")

print("All required columns are present")
print("Share of days with news (has_news_today):", f"{master['has_news_today'].mean()*100:.1f}%")

MASTER_PATH = os.path.join(OUTPUT_DIR, "returns_sentiment_enhanced.parquet")
master.to_parquet(MASTER_PATH, index=False)
print(f"Master saved: {MASTER_PATH}")


Checks and saving
Duplicates (ticker, date): 0
All required columns are present
Share of days with news (has_news_today): 14.9%
Master saved: outputs_final/returns_sentiment_enhanced.parquet


## Block 17: 20 examples (news + price + returns)

Take the 10 most negative and the 10 most positive ticker-days by the chosen sentiment column and show sample news texts for each case (up to 3 per case).
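
A minimal sketch of this selection on a toy frame, assuming the same column names as `master`. The helper `pick_extremes` and the toy data are hypothetical, and the sketch uses `nsmallest`/`nlargest` in place of the `sort_values(...).head(k)` calls in the cell below, which selects the same rows when there are no ties at the cut-off.

```python
import pandas as pd

# Toy stand-in for master: one row per ticker-day (values are made up).
toy = pd.DataFrame({
    "ticker": ["AAA", "BBB", "CCC", "DDD", "EEE", "FFF"],
    "has_news_today": [1, 1, 1, 1, 0, 1],
    "news_n": [3, 2, 1, 4, 0, 2],
    "finbert_score_mean": [-0.9, 0.8, -0.95, 0.1, 0.0, 0.6],
})

def pick_extremes(df: pd.DataFrame, sent_col: str, k: int, min_news: int) -> pd.DataFrame:
    """Return the k lowest and k highest rows by sent_col among days with enough news."""
    cand = df[
        (df["has_news_today"] == 1)
        & (df["news_n"] >= min_news)
        & df[sent_col].notna()
    ]
    neg = cand.nsmallest(k, sent_col)   # most negative days first
    pos = cand.nlargest(k, sent_col)    # most positive days first
    return pd.concat([neg, pos], ignore_index=True)

picked = pick_extremes(toy, "finbert_score_mean", k=2, min_news=2)
print(picked[["ticker", "news_n", "finbert_score_mean"]])
```

Here `CCC` has the lowest score but is excluded because it has only one news item, which is the role of `MIN_NEWS`. If fewer than `2 * k` candidates survive the filters, the negative and positive halves overlap and the same ticker-day appears twice.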

In [17]:
SAMPLE_N = 20
MIN_NEWS = 2
SENT_COL = "finbert_score_mean"
MAX_HEADLINES_PER_CASE = 3

cand = master.loc[
    (master["has_news_today"] == 1) &
    (master["news_n"] >= MIN_NEWS) &
    (master[SENT_COL].notna())
].copy()

k = SAMPLE_N // 2
neg_cases = cand.sort_values(SENT_COL, ascending=True).head(k)
pos_cases = cand.sort_values(SENT_COL, ascending=False).head(k)
cases = pd.concat([neg_cases, pos_cases], ignore_index=True)

cases = cases[[
    "ticker", "date", "price", "ret_log", "excess_ret_log",
    "y1_ex", "y3_ex", "news_n", SENT_COL
]].copy()

tickers_need = set(cases["ticker"].unique())
date_min = (cases["date"].min() - pd.Timedelta(days=7)).normalize()
date_max = (cases["date"].max() + pd.Timedelta(days=1)).normalize()

print("Tickers in examples:", len(tickers_need))
print("Date window:", date_min.date(), "to", date_max.date())

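def load_news_texts_subset_per_source(source_tag: str, file_path: str,
                                      tickers_set: set, dmin: pd.Timestamp, dmax: pd.Timestamp,
                                      variant: str,
                                      skip_keys_sorted: Optional[np.ndarray]) -> pd.DataFrame:
    """Stream one news file in chunks and return texts for the given tickers and date window.

    Rows whose key from `_hash_url_ticker` is found in `skip_keys_sorted` are dropped
    as duplicates; pass None to keep every row.
    """
    out = []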
    reader = pd.read_csv(file_path, chunksize=CHUNK_SIZE, low_memory=False)

    for raw in tqdm(reader, desc=f"Read texts: {source_tag}"):
        ch = _prep_news_chunk_base(raw, source_tag)
        if ch.empty:
            continue

        ch = ch[ch["ticker"].isin(tickers_set)]
        if ch.empty:
            continue

        ch = ch[(ch["day"] >= dmin) & (ch["day"] <= dmax)]
        if ch.empty:
            continue

        if (skip_keys_sorted is not None) and len(skip_keys_sorted) > 0:
            kh = _hash_url_ticker(ch)
            dup_mask = _is_dup_by_keys(skip_keys_sorted, kh)
            if dup_mask.any():
                ch = ch.loc[~dup_mask].reset_index(drop=True)
                if ch.empty:
                    continue

        text = build_text_variant(ch, variant)
        m = (text != "")
        ch = ch.loc[m, ["ticker","day","url","source"]].copy()
        ch["text"] = text.loc[m].astype(str)

        out.append(ch)

    if not out:
        return pd.DataFrame(columns=["ticker","day","url","source","text"])
    return pd.concat(out, ignore_index=True)

texts_parts = []
for tag, fp in NEWS_SOURCES.items():
    var = selected_variants[tag]
    skip = None if tag == preferred_source else preferred_keys_sorted
    part = load_news_texts_subset_per_source(tag, fp, tickers_need, date_min, date_max, var, skip)
    texts_parts.append(part)

news_subset = pd.concat(texts_parts, ignore_index=True) if texts_parts else pd.DataFrame()
print("News rows found (after filters):", len(news_subset))

trade_small = trade[trade["ticker"].isin(tickers_need)].copy()
trade_small = trade_small.sort_values(["ticker", "trade_date"], kind="mergesort")

# Map each news day to the same or the next trading date of its ticker (at most 7 days ahead)
mapped_parts = []
for tic, g in news_subset.groupby("ticker", sort=False):
    tcal = trade_small.loc[trade_small["ticker"] == tic, ["trade_date"]].sort_values("trade_date")
    if tcal.empty:
        continue

    merged = pd.merge_asof(
        g.sort_values("day", kind="mergesort"),
        tcal,
        left_on="day",
        right_on="trade_date",
        direction="forward",
        allow_exact_matches=True,
        tolerance=pd.Timedelta("7D"),
    )
    merged = merged.dropna(subset=["trade_date"]).copy()
    if merged.empty:
        continue

    merged["date"] = pd.to_datetime(merged["trade_date"]).dt.normalize()
    mapped_parts.append(merged[["ticker", "date", "text", "source"]])

mapped = pd.concat(mapped_parts, ignore_index=True) if mapped_parts else pd.DataFrame(columns=["ticker","date","text","source"])

def pack_texts(s):
    texts = s.tolist()[:MAX_HEADLINES_PER_CASE]
    texts = [(t[:220] + ("…" if len(t) > 220 else "")) for t in texts]
    return " | ".join(texts)

examples = (
    mapped.groupby(["ticker", "date"], sort=False)["text"]
    .apply(pack_texts)
    .reset_index()
    .rename(columns={"text": "example_news"})
)

cases = cases.merge(examples, on=["ticker","date"], how="left")

print("Share of cases with matched texts:", f"{cases['example_news'].notna().mean()*100:.1f}%")
display(cases.sort_values(SENT_COL))

Tickers in examples: 19
Date window: 2019-11-19 to 2023-12-01


Read texts: all_external: 66it [00:44,  1.49it/s]
Read texts: nasdaq: 78it [02:22,  1.83s/it]

News rows found (after filters): 11982
Share of cases with matched texts: 100.0%
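
The cell above assigns each news item to a trading date with `pd.merge_asof(..., direction="forward", tolerance=pd.Timedelta("7D"))`. The sketch below shows what that call does on a toy calendar. The trading dates and news rows are made up for illustration, with one weekend and one mid-week gap standing in for non-trading days.

```python
import pandas as pd

# Hypothetical trading calendar for one ticker: Friday, Monday, Wednesday.
trade_cal = pd.DataFrame(
    {"trade_date": pd.to_datetime(["2023-06-30", "2023-07-03", "2023-07-05"])}
)

# Hypothetical news days: a trading day, a Saturday, a non-trading Tuesday,
# and a day with no trading date in the calendar within the next 7 days.
news = pd.DataFrame({
    "day": pd.to_datetime(["2023-06-30", "2023-07-01", "2023-07-04", "2023-07-20"]),
    "text": ["Friday story", "Saturday story", "Tuesday story", "Late story"],
})

mapped = pd.merge_asof(
    news.sort_values("day"),               # left keys must be sorted
    trade_cal.sort_values("trade_date"),   # right keys must be sorted
    left_on="day",
    right_on="trade_date",
    direction="forward",                   # pick the first trade_date >= day
    allow_exact_matches=True,              # news on a trading day stays on that day
    tolerance=pd.Timedelta("7D"),          # give up if the gap exceeds 7 days
)
print(mapped)
```

The Friday story stays on 2023-06-30, the Saturday story moves to 2023-07-03, the Tuesday story moves to 2023-07-05, and the last row gets `NaT` in `trade_date`, which is why the cell drops rows with a missing `trade_date` after the merge.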





Unnamed: 0,ticker,date,price,ret_log,excess_ret_log,y1_ex,y3_ex,news_n,finbert_score_mean,example_news
0,RCUS,2022-02-01,31.469999,0.02152,0.014786,-0.055839,-0.058613,2,-0.969681,"Gilead earnings hurt by legal settlement, other charges. By Deena Beasley Feb 1 (Reuters) - Gilead Sciences Inc GILD.O on Tuesday posted lower-than-expected fourth-quarter earn..."
1,VIV,2020-05-19,7.789398,-0.026766,-0.016442,-0.000943,0.023319,2,-0.969272,"TIM shares drop after core profit guidance omitted. By Elvira Pollina MILAN, May 19 (Reuters) - Shares in Telecom Italia (TIM) TLIT.MI fell sharply on Tuesday as Italy's bigges..."
2,CMC,2023-02-03,55.986725,0.009413,0.020099,-0.005069,-0.001542,2,-0.969266,"LyondellBasell's (LYB) Q4 Earnings Beat, Sales Lag Estimates. LyondellBasell Industries N.V. LYB recorded earnings of $353 million or $1.07 per share in the fourth quarter of 2..."
3,CRK,2023-06-30,11.336234,-0.003442,-0.015174,-0.010678,-0.045819,2,-0.96908,US drillers cut oil and gas rigs for ninth week in a row -Baker Hughes. Adds Haynesville and Williston basin counts June 30 (Reuters) - U.S. energy firms this week cut the numb...
4,BKR,2023-06-30,31.254986,0.00794,-0.003791,-0.001466,0.007883,2,-0.96908,US drillers cut oil and gas rigs for ninth week in a row -Baker Hughes. Adds Haynesville and Williston basin counts June 30 (Reuters) - U.S. energy firms this week cut the numb...
5,HPE,2023-04-10,15.708621,0.012531,0.011506,-0.000891,-0.011249,2,-0.968973,"Global PC shipments slide in Q1, Apple takes biggest hit - IDC. Adds graphic April 10 (Reuters) - Global shipments of personal computers (PCs) fell by 29% in the first quarter ..."
6,CRK,2023-06-16,10.231929,-0.019859,-0.016447,-0.019945,-0.011578,2,-0.968785,US drillers cut oil and gas rigs for seventh week in a row - Baker Hughes. Adds rig reductions in Marcellus and Permian basins June 16 (Reuters) - U.S. energy firms this week c...
7,REZI,2022-11-02,15.83,-0.389118,-0.363702,0.002104,0.02435,2,-0.968765,"Consumer Sector Update for 11/02/2022: TUP, REZI, LL. Consumer stocks were broadly lower in Wednesday trading, with the SPDR Consumer Staples Select Sector ETF (XLP) dropping 0..."
8,AIG,2022-10-20,50.340004,-0.017209,-0.008788,0.002632,-0.00603,2,-0.968749,"Blackstone's earnings fall 16% on sharp drop in asset sales. By Chibuike Oguh NEW YORK, Oct 20 (Reuters) - Blackstone Inc BX.N, the world's largest alternative asset manager, s..."
9,EQNR,2023-10-23,32.608925,-0.014517,-0.012782,-0.021939,0.002443,2,-0.968742,"Norwegian Sept gas output lags forecasts, hits 4-year low on outages. Adds comparisons, graphic, detail in paragraphs 4-7 OSLO, Oct 23 (Reuters) - Norway's natural gas output f..."
