# 5r. 单公司 2025-Q4 Beat/Miss 预测

## 任务说明

使用 **4.ipynb** 训练好的 Logistic 回归模型，基于 CVX 2025 年 Q1、Q2、Q3 的 earnings call transcript，预测 **2025-Q4** 的业绩结果是 Beat 还是 Miss。

- **Transcript 路径**: `预测/transcripts/CVX/`
- **模型来源**: 4.ipynb（Logistic 回归 + StandardScaler，特征见 `data/selected_features.txt`）
- **预测逻辑**: 当前季度 call（2025-Q3）的特征 → 预测下一季度（2025-Q4）的 Beat 概率

In [2]:
import os
import re
import pickle
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd
from textstat import (
    automated_readability_index,
    coleman_liau_index,
    dale_chall_readability_score,
    flesch_reading_ease,
    flesch_kincaid_grade,
    gunning_fog,
    smog_index,
)
from sklearn.preprocessing import StandardScaler

# All locations are resolved relative to the parent of the current working directory.
PROJECT_ROOT = Path("..").resolve()
DATA_DIR = PROJECT_ROOT.joinpath("data")
PRED_DIR = PROJECT_ROOT.joinpath("预测")
TRANSCRIPTS_DIR = PRED_DIR.joinpath("transcripts", "CVX")

TICKER = "CVX"
# Quarters whose calls are used for the features and for the delta history.
TARGET_QUARTERS = [f"2025-Q{n}" for n in (1, 2, 3)]
# Quarter whose Beat/Miss outcome is being predicted.
PREDICT_QUARTER = "2025-Q4"

print(f"PROJECT_ROOT: {PROJECT_ROOT}")
print(f"TRANSCRIPTS_DIR: {TRANSCRIPTS_DIR}")
print(f"TICKER: {TICKER} | 预测季度: {PREDICT_QUARTER}")

PROJECT_ROOT: /Users/xinyuewang/Desktop/1.27
TRANSCRIPTS_DIR: /Users/xinyuewang/Desktop/1.27/预测/transcripts/CVX
TICKER: CVX | 预测季度: 2025-Q4


## 1. 解析与分段（与 3.ipynb 一致）

In [3]:
# Matches a header timestamp such as "Oct. 31, 2025, 11:00 AM ET":
# month name (optionally abbreviated with a trailing dot), day, four-digit
# year, then a 12-hour clock time followed by "ET". Case-insensitive.
DATE_LINE_PATTERN = re.compile(
    r"(?P<month>[A-Za-z]+\.?)\s+(?P<day>\d{1,2}),\s+(?P<year>\d{4}),?\s+"
    r"(?P<hour>\d{1,2}):(?P<minute>\d{2})\s*(?P<ampm>AM|PM)\s*ET",
    re.IGNORECASE,
)

# Lower-cased month spellings -> month number. Both dotted and undotted
# abbreviations are listed; "sept" is included because "Sept." is a common
# abbreviation that would otherwise make the date lookup fail.
MONTH_MAP = {
    "jan": 1, "january": 1, "jan.": 1,
    "feb": 2, "february": 2, "feb.": 2,
    "mar": 3, "march": 3, "mar.": 3,
    "apr": 4, "april": 4, "apr.": 4,
    "may": 5, "jun": 6, "june": 6, "jun.": 6,
    "jul": 7, "july": 7, "jul.": 7,
    "aug": 8, "august": 8, "aug.": 8,
    "sep": 9, "sept": 9, "september": 9, "sep.": 9, "sept.": 9,
    "oct": 10, "october": 10, "oct.": 10,
    "nov": 11, "november": 11, "nov.": 11,
    "dec": 12, "december": 12, "dec.": 12,
}

# Regex fragments that mark the start of the Q&A part of a transcript.
QA_MARKERS = [
    r"Question-and-Answer Session", r"Questions and Answers",
    r"Question and Answer Session", r"Q&A Session", r"Q & A Session",
]

def parse_quarter_from_filename(filename: str):
    """Extract the fiscal quarter encoded in a transcript filename.

    Searches (case-insensitively) for ``Q<1-4>``, whitespace, then a
    four-digit year, and returns it as ``"<year>-Q<n>"``. Returns ``None``
    when the filename contains no such pattern.
    """
    found = re.search(r"Q([1-4])\s+(\d{4})", filename, re.IGNORECASE)
    if found is None:
        return None
    quarter_digit, year_digits = found.groups()
    return f"{int(year_digits)}-Q{int(quarter_digit)}"

def parse_call_datetime_et(text: str):
    """Parse the call timestamp from the head of a transcript.

    Only the first 15 lines are inspected, and lines containing
    "Call Start" are ignored. Among DATE_LINE_PATTERN matches whose year is
    within 2000-2030, the one with the largest year is used (the first such
    match on ties).

    Returns:
        The timestamp formatted as ``"YYYY-MM-DD HH:MM"`` (24-hour clock), or
        ``None`` when no usable match exists, the month name is not in
        MONTH_MAP, or the parsed fields do not form a valid datetime.
    """
    head = [line for line in text.split("\n")[:15] if "Call Start" not in line]
    header_text = "\n".join(head)

    candidates = []
    for hit in DATE_LINE_PATTERN.finditer(header_text):
        hit_year = int(hit.group("year"))
        if 2000 <= hit_year <= 2030:
            candidates.append((hit, hit_year))
    if not candidates:
        return None

    best, _ = max(candidates, key=lambda pair: pair[1])

    # Abbreviations may carry a trailing dot ("Oct."); drop it before lookup.
    month = MONTH_MAP.get(best.group("month").lower().rstrip("."))
    if month is None:
        return None

    year = int(best.group("year"))
    day = int(best.group("day"))
    hour = int(best.group("hour"))
    minute = int(best.group("minute"))

    # Convert the 12-hour clock reading to 24-hour.
    meridiem = best.group("ampm").upper()
    if meridiem == "PM" and hour != 12:
        hour += 12
    if meridiem == "AM" and hour == 12:
        hour = 0

    try:
        return datetime(year, month, day, hour, minute).strftime("%Y-%m-%d %H:%M")
    except ValueError:
        # Out-of-range day/hour/minute values.
        return None

def find_qa_marker(text: str, skip_first_n_sentences: int = 10):
    """Locate the start of the Q&A section in a transcript.

    The first ``skip_first_n_sentences`` sentences (a sentence ends at a run
    of ``.``, ``!`` or ``?`` followed by whitespace) are excluded from the
    search. If the text has fewer sentence endings than that, or if
    ``skip_first_n_sentences`` is zero or negative, the whole text is
    searched.

    Markers are tried in QA_MARKERS order and the first marker that matches
    wins, even if a marker later in the list occurs earlier in the text.

    Returns:
        The character offset (into ``text``) of the matched marker, or
        ``None`` when no marker is found.
    """
    start = 0
    # Guard against n <= 0: indexing with ``n - 1`` would otherwise wrap
    # around to the end of the list (or raise IndexError on an empty list).
    if skip_first_n_sentences > 0:
        sentence_ends = [m.end() for m in re.finditer(r"[.!?]+\s+", text)]
        if len(sentence_ends) >= skip_first_n_sentences:
            start = sentence_ends[skip_first_n_sentences - 1]

    search_text = text[start:]
    for marker in QA_MARKERS:
        m = re.search(marker, search_text, re.IGNORECASE)
        if m:
            return start + m.start()
    return None

def split_transcript_into_segments(text: str):
    """Split a transcript into "Prepared Remarks" and "Q&A" segments.

    The split point is whatever find_qa_marker returns. When no marker is
    found, the whole text is returned as a single "Prepared Remarks" segment.

    Returns:
        A list of dicts with keys ``segment_type`` and ``text_content``
        (whitespace-stripped).
    """
    boundary = find_qa_marker(text)
    if boundary is None:
        return [{"segment_type": "Prepared Remarks", "text_content": text.strip()}]
    return [
        {"segment_type": "Prepared Remarks", "text_content": text[:boundary].strip()},
        {"segment_type": "Q&A", "text_content": text[boundary:].strip()},
    ]

## 2. 文本特征（与 3.ipynb 一致）

In [4]:
# Lower-cased personal/possessive pronouns counted by the pronoun features.
PRONOUNS_ALL = set(
    "i me my mine you your yours he him his she her hers it its "
    "we us our ours they them their theirs".split()
)
# Subset of PRONOUNS_ALL treated as plural.
PRONOUNS_PLURAL = set("we us our ours they them their theirs".split())
# Adverbs counted explicitly, in addition to the "-ly" suffix heuristic.
COMMON_ADVERBS = set(
    "very quite rather extremely highly significantly substantially "
    "slightly barely rarely frequently usually typically generally "
    "probably possibly certainly clearly obviously".split()
)
# A word is a run of ASCII letters and apostrophes.
WORD_PATTERN = re.compile(r"[A-Za-z']+")

def tokenize_words(text):
    """Lower-case `text` and return all WORD_PATTERN matches as a list."""
    lowered = text.lower()
    return WORD_PATTERN.findall(lowered)
def count_sentences(text):
    """Count the non-blank pieces left after splitting `text` on runs of '.', '!' or '?'."""
    pieces = re.split(r"[.!?]+", text)
    return len([piece for piece in pieces if piece.strip()])

def basic_features(text: str):
    """Compute word/sentence counts and simple lexical ratios for `text`.

    Returns a dict with:
        n_words: number of tokens from tokenize_words.
        n_sentences: result of count_sentences.
        avg_words_per_sentence: n_words / n_sentences (0.0 if no sentences).
        pronoun_plural_ratio: share of pronoun tokens that are plural
            (0.0 if there are no pronouns).
        adverb_ratio: share of tokens that are in COMMON_ADVERBS or are
            longer than 3 characters and end in "ly" (0.0 if no tokens).
    """
    tokens = tokenize_words(text)
    word_count = len(tokens)
    sentence_count = count_sentences(text)

    pronoun_total = 0
    plural_total = 0
    adverb_total = 0
    for token in tokens:
        if token in PRONOUNS_ALL:
            pronoun_total += 1
            if token in PRONOUNS_PLURAL:
                plural_total += 1
        if token in COMMON_ADVERBS or (len(token) > 3 and token.endswith("ly")):
            adverb_total += 1

    return {
        "n_words": word_count,
        "n_sentences": sentence_count,
        "avg_words_per_sentence": word_count / sentence_count if sentence_count > 0 else 0.0,
        "pronoun_plural_ratio": plural_total / pronoun_total if pronoun_total else 0.0,
        "adverb_ratio": adverb_total / word_count if word_count > 0 else 0.0,
    }

def readability_features(text: str):
    """Compute the seven textstat readability scores for `text`.

    Returns a dict keyed by ``ari``, ``coleman_liau``, ``dale_chall``,
    ``flesch_ease``, ``flesch_kincaid``, ``gunning_fog`` and ``smog``. A
    metric whose computation raises is stored as ``None`` instead of
    aborting the whole feature extraction.
    """
    scorers = (
        ("ari", automated_readability_index),
        ("coleman_liau", coleman_liau_index),
        ("dale_chall", dale_chall_readability_score),
        ("flesch_ease", flesch_reading_ease),
        ("flesch_kincaid", flesch_kincaid_grade),
        ("gunning_fog", gunning_fog),
        ("smog", smog_index),
    )
    out = {}
    for name, func in scorers:
        try:
            out[name] = func(text)
        except Exception:
            # Best effort per metric. Catching Exception (not a bare except)
            # lets KeyboardInterrupt / SystemExit propagate.
            out[name] = None
    return out

# Word-category dictionary CSV (presumably the Loughran-McDonald master
# dictionary, judging by the file name -- confirm). Each category column is
# turned into a set of lower-cased words whose value in that column is > 0.
LM_CSV_PATH = DATA_DIR / "LM" / "LM_MasterDictionary.csv"
if LM_CSV_PATH.exists():
    lm_df = pd.read_csv(LM_CSV_PATH)
    lm_df["Word"] = lm_df["Word"].astype(str).str.lower()

    def build_lm_set(col):
        """Return the words flagged (> 0) in column `col`; empty set if the column is absent."""
        if col not in lm_df.columns:
            return set()
        return set(lm_df.loc[lm_df[col] > 0, "Word"])

    LM_POSITIVE = build_lm_set("Positive")
    LM_NEGATIVE = build_lm_set("Negative")
    LM_UNCERTAINTY = build_lm_set("Uncertainty")
    LM_LITIGIOUS = build_lm_set("Litigious")
    LM_SUPERFLUOUS = build_lm_set("Superfluous")
    LM_INTERESTING = build_lm_set("Interesting")
    LM_MODAL_WEAK = build_lm_set("ModalWeak")
    LM_MODAL_MODERATE = build_lm_set("ModalModerate")
    LM_MODAL_STRONG = build_lm_set("ModalStrong")
    LM_CONSTRAINING = build_lm_set("Constraining")
    LM_COMPLEXITY = build_lm_set("Complexity")
else:
    import warnings

    # Without the dictionary every lm_* feature evaluates to 0; say so
    # loudly instead of silently feeding zeros to the model.
    warnings.warn(f"未找到 LM 词典: {LM_CSV_PATH}；所有 lm_* 特征将为 0")

    # One independent empty set per category (no shared object).
    LM_POSITIVE = set()
    LM_NEGATIVE = set()
    LM_UNCERTAINTY = set()
    LM_LITIGIOUS = set()
    LM_SUPERFLUOUS = set()
    LM_INTERESTING = set()
    LM_MODAL_WEAK = set()
    LM_MODAL_MODERATE = set()
    LM_MODAL_STRONG = set()
    LM_CONSTRAINING = set()
    LM_COMPLEXITY = set()

def lm_features(text: str):
    """Compute dictionary-based tone ratios for `text`.

    Each ratio is the number of tokens found in the corresponding LM_* word
    set divided by the token count (the divisor is 1 for empty text).
    Derived values:
        lm_net_sentiment = positive - negative
        lm_polarity      = positive + negative
        lm_subjectivity  = positive + negative + uncertainty + litigious
                           + superfluous + interesting

    Note: the key ``lm_litigous`` is spelled this way on purpose to keep the
    existing column name.
    """
    tokens = tokenize_words(text)
    denom = len(tokens) if tokens else 1

    def share(vocab):
        hits = 0
        for tok in tokens:
            if tok in vocab:
                hits += 1
        return hits / denom

    positive = share(LM_POSITIVE)
    negative = share(LM_NEGATIVE)
    uncertainty = share(LM_UNCERTAINTY)
    litigious = share(LM_LITIGIOUS)
    superfluous = share(LM_SUPERFLUOUS)
    interesting = share(LM_INTERESTING)

    return {
        "lm_positive": positive,
        "lm_negative": negative,
        "lm_uncertainty": uncertainty,
        "lm_litigous": litigious,
        "lm_superfluous": superfluous,
        "lm_interesting": interesting,
        "lm_modal_weak": share(LM_MODAL_WEAK),
        "lm_modal_moderate": share(LM_MODAL_MODERATE),
        "lm_modal_strong": share(LM_MODAL_STRONG),
        "lm_constraining": share(LM_CONSTRAINING),
        "lm_complexity": share(LM_COMPLEXITY),
        "lm_net_sentiment": positive - negative,
        "lm_polarity": positive + negative,
        "lm_subjectivity": positive + negative + uncertainty + litigious + superfluous + interesting,
    }

def compute_segment_features(text: str):
    """Return one dict merging basic, readability and dictionary features for `text`.

    Extractors run in the order basic_features, readability_features,
    lm_features; later extractors overwrite earlier keys on a name clash.
    """
    features = dict(basic_features(text))
    for extractor in (readability_features, lm_features):
        features.update(extractor(text))
    return features

## 3. 加载 CVX transcript，分段并计算特征

In [5]:
# Names of the per-segment features produced by compute_segment_features.
BASE_FEATURE_NAMES = [
    "n_words", "n_sentences", "avg_words_per_sentence",
    "pronoun_plural_ratio", "adverb_ratio",
    "ari", "coleman_liau", "dale_chall", "flesch_ease",
    "flesch_kincaid", "gunning_fog", "smog",
    "lm_positive", "lm_negative", "lm_uncertainty", "lm_litigous",
    "lm_superfluous", "lm_interesting",
    "lm_modal_weak", "lm_modal_moderate", "lm_modal_strong",
    "lm_constraining", "lm_complexity",
    "lm_net_sentiment", "lm_polarity", "lm_subjectivity",
]

# Build one row per (transcript, segment) for the target quarters only.
rows = []
for txt_path in sorted(TRANSCRIPTS_DIR.glob("*.txt")):
    quarter = parse_quarter_from_filename(txt_path.name)
    if quarter in TARGET_QUARTERS:
        text = txt_path.read_text(encoding="utf-8", errors="ignore")
        call_dt = parse_call_datetime_et(text)
        for seg in split_transcript_into_segments(text):
            feat = compute_segment_features(seg["text_content"])
            rows.append({
                "ticker": TICKER,
                "fiscal_quarter": quarter,
                "call_datetime_et": call_dt,
                "segment_type": seg["segment_type"],
                **feat,
            })

df_segments = pd.DataFrame(rows)
print("Segment 数:", len(df_segments))
summary_cols = ["fiscal_quarter", "segment_type", "n_words"]
print(df_segments[summary_cols].to_string())

Segment 数: 6
  fiscal_quarter      segment_type  n_words
0        2025-Q1  Prepared Remarks     1184
1        2025-Q1               Q&A     6547
2        2025-Q2  Prepared Remarks     1609
3        2025-Q2               Q&A     5924
4        2025-Q3  Prepared Remarks     1208
5        2025-Q3               Q&A     6766


## 4. 聚合为 call 级（每季度一行，_preparedremark / _QA）

In [6]:
# Collapse the per-segment rows into one row per (ticker, fiscal_quarter).
# Every base feature becomes two columns: "<feature>_preparedremark" and
# "<feature>_QA"; a missing segment yields NaN for its columns.
call_rows = []
for (ticker, fq), grp in df_segments.groupby(["ticker", "fiscal_quarter"]):
    row = {"ticker": ticker, "fiscal_quarter": fq, "call_datetime_et": grp["call_datetime_et"].iloc[0]}
    prep = grp[grp["segment_type"] == "Prepared Remarks"]
    qa = grp[grp["segment_type"] == "Q&A"]
    for f in BASE_FEATURE_NAMES:
        if f in grp.columns:
            row[f + "_preparedremark"] = prep[f].iloc[0] if len(prep) > 0 else np.nan
            row[f + "_QA"] = qa[f].iloc[0] if len(qa) > 0 else np.nan
    # Word counts default to 0 when the segment is absent. pd.isna handles
    # both NaN and None; np.isnan(None) would raise TypeError before any
    # "is None" check could run.
    for count_col in ("n_words_preparedremark", "n_words_QA"):
        if pd.isna(row.get(count_col)):
            row[count_col] = 0
    call_rows.append(row)

df_calls = pd.DataFrame(call_rows).sort_values("fiscal_quarter").reset_index(drop=True)
print("Call 数:", len(df_calls))
print(df_calls[["fiscal_quarter", "n_words_preparedremark", "n_words_QA"]].to_string())

Call 数: 3
  fiscal_quarter  n_words_preparedremark  n_words_QA
0        2025-Q1                    1184        6547
1        2025-Q2                    1609        5924
2        2025-Q3                    1208        6766


## 5. 添加 sentiment_divergence, qa_word_ratio, delta_net_sentiment, delta_complexity

In [7]:
# Gap between prepared-remarks tone and Q&A tone.
prepared_sentiment = df_calls["lm_net_sentiment_preparedremark"]
qa_sentiment = df_calls["lm_net_sentiment_QA"]
df_calls["sentiment_divergence"] = prepared_sentiment.sub(qa_sentiment)

# Q&A length relative to prepared remarks; a zero denominator becomes NaN
# so the ratio is NaN rather than inf.
prepared_word_counts = df_calls["n_words_preparedremark"].replace(0, np.nan)
df_calls["qa_word_ratio"] = df_calls["n_words_QA"].div(prepared_word_counts)

def calculate_standardized_change(group, col_name, window=4):
    """Z-score each value of ``group[col_name]`` against its preceding values.

    For position i, the history is the up-to-``window`` values immediately
    before i (in the row order of ``group``). The result at i is
    ``(value - nanmean(history)) / nanstd(history)``.

    Returns:
        A float numpy array of the same length as ``group``. An entry is NaN
        when there is no history, the history is entirely NaN, or its
        standard deviation is not positive.
    """
    series = group[col_name].values
    z_scores = np.full(len(series), np.nan)
    for pos, current in enumerate(series):
        lookback = series[max(pos - window, 0):pos]
        if len(lookback) == 0 or np.isnan(lookback).all():
            continue
        baseline = np.nanmean(lookback)
        spread = np.nanstd(lookback)
        if spread > 0:
            z_scores[pos] = (current - baseline) / spread
    return z_scores

# Temporary column: mean of prepared-remarks and Q&A net sentiment. It is the
# input of delta_net_sentiment and is dropped again at the end of this cell.
df_calls["net_sentiment_avg"] = (df_calls["lm_net_sentiment_preparedremark"] + df_calls["lm_net_sentiment_QA"]) / 2


def compute_delta(group, col_name, window=4):
    """Return calculate_standardized_change for `group` as a Series aligned to group.index."""
    result = calculate_standardized_change(group, col_name, window)
    return pd.Series(result, index=group.index)


def _delta_by_ticker(frame, col_name, window=4):
    """Compute compute_delta per ticker and return one Series indexed like `frame`.

    The groups are concatenated explicitly instead of using
    ``groupby().apply()``: when every group returns a Series with the same
    index (always the case with a single ticker), apply() pivots the result
    into a wide one-row-per-group DataFrame, and taking its first column
    keeps only the first row's value.
    """
    pieces = [
        compute_delta(grp, col_name=col_name, window=window)
        for _, grp in frame.groupby("ticker")
    ]
    if not pieces:
        return pd.Series(dtype=float)
    return pd.concat(pieces)


# delta_net_sentiment: standardized change of the averaged net sentiment.
delta_net_result = _delta_by_ticker(df_calls, "net_sentiment_avg", window=4)
df_calls["delta_net_sentiment"] = delta_net_result

# delta_complexity: standardized change of the prepared-remarks ARI score.
delta_comp_result = _delta_by_ticker(df_calls, "ari_preparedremark", window=4)
df_calls["delta_complexity"] = delta_comp_result

df_calls = df_calls.drop(columns=["net_sentiment_avg"])
print("新特征已添加")

新特征已添加


## 6. 加载模型并预测 2025-Q4 Beat 概率

In [9]:
model_path = DATA_DIR / "model.pkl"
scaler_path = DATA_DIR / "scaler.pkl"
selected_features_path = DATA_DIR / "selected_features.txt"

if not model_path.exists() or not scaler_path.exists():
    raise FileNotFoundError("请先运行 4.ipynb 并执行到「保存模型」的 cell，生成 data/model.pkl 和 data/scaler.pkl")
if not selected_features_path.exists():
    raise FileNotFoundError(f"未找到特征列表文件: {selected_features_path}")

# NOTE(review): pickle.load can execute arbitrary code; only load model and
# scaler files that were produced locally by the training notebook.
with open(model_path, "rb") as f:
    model = pickle.load(f)
with open(scaler_path, "rb") as f:
    scaler = pickle.load(f)
# One feature name per line; blank lines are ignored.
with open(selected_features_path, encoding="utf-8") as f:
    selected_features = [line.strip() for line in f if line.strip()]

# The latest configured quarter supplies the features for PREDICT_QUARTER.
# "YYYY-Qn" strings sort chronologically, so max() picks the latest one.
feature_quarter = max(TARGET_QUARTERS)
feature_rows = df_calls[df_calls["fiscal_quarter"] == feature_quarter]
if feature_rows.empty:
    raise ValueError(f"df_calls 中没有 {feature_quarter} 的 call，无法预测 {PREDICT_QUARTER}")
# Variable name kept for any later cell that refers to it; it holds the
# `feature_quarter` row.
row_2025q3 = feature_rows.iloc[0]

missing_features = [name for name in selected_features if name not in row_2025q3.index]
if missing_features:
    raise KeyError(f"df_calls 缺少模型所需特征: {missing_features}")

# NOTE(review): missing values are imputed as 0 before scaling -- confirm
# this matches how the training notebook handled NaN.
X = pd.DataFrame([row_2025q3])[selected_features].fillna(0)
X_scaled = scaler.transform(X)
# NOTE(review): assumes column 1 of predict_proba is the "Beat" class --
# confirm against model.classes_.
prob_beat = model.predict_proba(X_scaled)[0, 1]

print("=" * 60)
print(f"{TICKER} {PREDICT_QUARTER} Beat/Miss 预测")
print("=" * 60)
print(f"基于 {feature_quarter} earnings call 特征")
print(f"预测 {PREDICT_QUARTER} Beat 概率: {prob_beat:.4f}")
print(f"预测 {PREDICT_QUARTER} Miss 概率: {1 - prob_beat:.4f}")
print(f"结论: {'Beat' if prob_beat >= 0.5 else 'Miss'} (阈值 0.5)")
print("=" * 60)

CVX 2025-Q4 Beat/Miss 预测
基于 2025-Q3 earnings call 特征
预测 2025-Q4 Beat 概率: 0.5491
预测 2025-Q4 Miss 概率: 0.4509
结论: Beat (阈值 0.5)
