
# Analyst-Revision Fade — Hybrid Contrarian Notebook
This notebook implements a systematic fade of analyst price-target/rating adjustments, conditioned on **fundamental disagreement** (DCF/multiples vs target), **media/news sentiment disagreement** (FinBERT), and **technical reversal confirmation** (ATR + VWAP±σ band breaks).

**Pipeline**
1) Ingest analyst revision events (Finnhub or CSV).
2) Pull daily OHLCV (Polygon) and SEC facts for simple DCF/multiples anchors.
3) Pull/score news (Alpha Vantage + FinBERT).
4) Build features (revision magnitude, disagreement metrics).
5) Filter trades by fundamentals & sentiment disagreement.
6) Confirm timing via VWAP±σ band position + momentum sign (an ATR helper is also defined, but the confirmation rule in this notebook does not call it).
7) Simulate trades and plot equity curve.


## 1) Imports & Config

In [None]:

import os, re, time, math, json, warnings, requests
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone, date
from typing import Optional
import numpy as np
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")

# API credentials are read from the environment; an unset variable yields "".
POLYGON_API_KEY   = os.getenv("POLYGON_API_KEY", "")
ALPHAVANTAGE_KEY  = os.getenv("ALPHAVANTAGE_API_KEY", "")
FINNHUB_API_KEY   = os.getenv("FINNHUB_API_KEY", "")  # optional: finnhub_price_targets returns an empty frame when blank
# Used as the User-Agent value on SEC requests (see SEC_HEADERS).
SEC_EMAIL         = os.getenv("SEC_EMAIL", "you@example.com")

# Default news look-back in months; converted to days as months * 30.
NEWS_MONTHS_BACK        = 3
# NOTE(review): not referenced by any code visible in this notebook.
SENTIMENT_CARRY_DAYS    = 5
# Rolling window (in bars) for the volume-weighted average price.
VWAP_WINDOW_DAYS        = 30
# Rolling window (in bars) for the std-dev of close minus VWAP.
VWAP_SIGMA_WINDOW_DAYS  = 20
# Default rolling window (in bars) for compute_atr.
ATR_WINDOW_DAYS         = 14
# Look-back (in bars) for the rate-of-change sign used as the momentum signal.
REV_MOM_LOOKBACK_DAYS   = 5
# Maximum holding period in the trade simulator; also used (as calendar days,
# plus a 5-day pad) to size the price download window in backtest_events.
MAX_HOLD_DAYS           = 28
# Trailing-stop distance as a fraction of the running extreme (0.05 = 5%).
TRAIL_STOP_PCT          = 0.05
# Model identifier passed to AutoTokenizer / AutoModelForSequenceClassification.
FINBERT_ID              = "yiyanghkust/finbert-tone"


## 2) HTTP Session & Utilities

In [None]:

from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

def _session():
    """Return a shared ``requests.Session`` configured with retry/backoff.

    The session is created on first use and reused afterwards, so repeated
    calls share one connection pool instead of opening (and never closing) a
    fresh session per request.

    Retries: up to 3 attempts with a 0.5 backoff factor on HTTP
    429/500/502/503/504. Only ``https://`` URLs get the retrying adapter.
    """
    shared = getattr(_session, "_shared", None)
    if shared is None:
        shared = requests.Session()
        retry_policy = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=(429, 500, 502, 503, 504),
        )
        shared.mount("https://", HTTPAdapter(max_retries=retry_policy))
        # Cache on the function object so no extra module-level name is needed.
        _session._shared = shared
    return shared

def _to_datestring(dt: datetime) -> str:
    """Format ``dt`` as a ``YYYY-MM-DD`` string."""
    return f"{dt:%Y-%m-%d}"

# Headers passed on SEC requests; the User-Agent value is SEC_EMAIL.
SEC_HEADERS = {"User-Agent": SEC_EMAIL, "Accept": "application/json"}
# Base URLs for the Polygon, Alpha Vantage and Finnhub endpoints used below.
POLY_BASE   = "https://api.polygon.io"
AV_BASE     = "https://www.alphavantage.co/query"
FINNHUB_BASE= "https://finnhub.io/api/v1"


## 3) Prices — Polygon daily OHLCV

In [None]:

def polygon_daily_ohlcv(ticker: str, start: str, end: str) -> pd.DataFrame:
    """Fetch daily aggregate bars for ``ticker`` between ``start`` and ``end``.

    Requests ``adjusted=true`` bars sorted ascending from the Polygon aggregates
    endpoint. Bar timestamps (epoch milliseconds) are converted to US/Eastern
    calendar dates, and only the first bar per date is kept.

    Returns a frame with columns ``date, open, high, low, close, volume``;
    the frame is empty (same columns) when the response has no results.
    Raises ``requests.HTTPError`` on a non-success status.
    """
    out_cols = ["date", "open", "high", "low", "close", "volume"]
    endpoint = f"{POLY_BASE}/v2/aggs/ticker/{ticker.upper()}/range/1/day/{start}/{end}"
    query = {"adjusted": "true", "sort": "asc", "limit": 50000, "apiKey": POLYGON_API_KEY}

    resp = _session().get(endpoint, params=query, timeout=30)
    resp.raise_for_status()

    bars = resp.json().get("results", []) or []
    if not bars:
        return pd.DataFrame(columns=out_cols)

    renames = {"t": "ts", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume"}
    frame = pd.DataFrame(bars)[list(renames)].rename(columns=renames)
    stamps = pd.to_datetime(frame["ts"], unit="ms", utc=True)
    frame["date"] = stamps.dt.tz_convert("US/Eastern").dt.date
    frame = frame.drop(columns=["ts"]).drop_duplicates("date")
    return frame[out_cols]


## 4) Technical Features: ATR, VWAP bands, momentum flip

In [None]:

def compute_atr(df: pd.DataFrame, n: int = ATR_WINDOW_DAYS) -> pd.Series:
    """Return the rolling simple-mean Average True Range of ``df``.

    True range per bar is the largest of high-low, |high - previous close| and
    |low - previous close|. The rolling mean over ``n`` bars needs at least
    ``max(2, n // 2)`` observations. An empty input yields an empty float Series.
    """
    if df.empty:
        return pd.Series(dtype=float)

    high = df["high"].astype(float)
    low = df["low"].astype(float)
    close = df["close"].astype(float)
    prior_close = close.shift(1)

    bar_range = high - low
    gap_from_high = (high - prior_close).abs()
    gap_from_low = (low - prior_close).abs()
    true_range = pd.concat([bar_range, gap_from_high, gap_from_low], axis=1).max(axis=1)

    min_obs = max(2, n // 2)
    return true_range.rolling(n, min_periods=min_obs).mean()

def compute_vwap_and_sigma(df: pd.DataFrame, window: int = VWAP_WINDOW_DAYS, sigma_win: int = VWAP_SIGMA_WINDOW_DAYS):
    """Return a copy of ``df`` with rolling VWAP columns added.

    Added columns:
      * ``vwap``       - rolling sum(typical price * volume) / rolling sum(volume)
                         over ``window`` bars, typical price = (high+low+close)/3.
      * ``vwap_sigma`` - population std-dev (ddof=0) of ``close - vwap`` over
                         ``sigma_win`` bars.
      * ``vwap_p``     - ``(close - vwap) / vwap_sigma``.

    Zero volume and zero sigma are treated as missing, and each rolling
    statistic needs at least 5 observations. An empty input gets the three
    columns filled with NaN.
    """
    if df.empty:
        return df.assign(vwap=np.nan, vwap_sigma=np.nan, vwap_p=np.nan)

    typical = (df["high"] + df["low"] + df["close"]) / 3.0
    traded = df["volume"].replace(0, np.nan)

    weighted_sum = (typical * traded).rolling(window, min_periods=5).sum()
    volume_sum = traded.rolling(window, min_periods=5).sum()
    vwap = weighted_sum / volume_sum

    deviation = df["close"] - vwap
    sigma = deviation.rolling(sigma_win, min_periods=5).std(ddof=0)

    return df.assign(
        vwap=vwap,
        vwap_sigma=sigma,
        vwap_p=deviation / sigma.replace(0, np.nan),
    )

def momentum_flip(df: pd.DataFrame, look=REV_MOM_LOOKBACK_DAYS) -> pd.Series:
    """Return the sign (-1, 0, +1, or NaN) of the ``look``-bar close-to-close change.

    An empty input yields an empty float Series.
    """
    if df.empty:
        return pd.Series(dtype=float)
    rate_of_change = df["close"].pct_change(periods=look)
    return np.sign(rate_of_change)


## 5) News Sentiment — Alpha Vantage + FinBERT

In [None]:

def fetch_news_single_ticker(ticker: str, days_back: int, limit: int = 100, max_retries: int = 5) -> pd.DataFrame:
    """Fetch recent news articles that mention ``ticker`` from Alpha Vantage.

    Args:
        ticker: Symbol to query (upper-cased before use).
        days_back: How many days before *now* (UTC) the search window starts.
        limit: Maximum number of feed items requested.
        max_retries: How many times to retry (sleeping 12 s each time) while the
            response carries a rate-limit ``"Note"``.

    Returns:
        One row per article, newest first, with columns
        ``dt, ticker, title, summary, url, source, text``; an empty frame if
        nothing matched. ``text`` is ``"<title>. <summary>"``.

    Raises:
        RuntimeError: if the rate-limit note is still present after all retries.

    Changes from the previous version:
      * retries are bounded instead of recursing without limit;
      * an article is emitted once, and only when its ``ticker_sentiment`` list
        names the requested symbol, so articles that mention many tickers no
        longer get extra weight in downstream averages;
      * a null ``ticker_sentiment`` no longer raises.
    """
    symbol = ticker.upper()
    start = (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y%m%dT%H%M")
    params = {
        "function": "NEWS_SENTIMENT",
        "tickers": symbol,
        "time_from": start,
        "sort": "LATEST",
        "limit": int(limit),
        "apikey": ALPHAVANTAGE_KEY,
    }

    attempts = max(int(max_retries), 0) + 1
    data = {}
    for attempt in range(attempts):
        data = _session().get(AV_BASE, params=params, timeout=30).json()
        if "Note" not in data:
            break
        if attempt + 1 < attempts:
            time.sleep(12)  # back off before asking again
    else:
        raise RuntimeError(
            f"Alpha Vantage still rate-limited for {symbol} after {attempts} attempts"
        )

    rows = []
    for item in data.get("feed", []) or []:
        mentioned = {
            str(ts.get("ticker") or "").upper()
            for ts in (item.get("ticker_sentiment") or [])
        }
        if symbol not in mentioned:
            continue
        rows.append({
            "dt": pd.to_datetime(item.get("time_published"), format="%Y%m%dT%H%M%S", utc=True, errors="coerce"),
            "ticker": symbol,
            "title": item.get("title") or "",
            "summary": item.get("summary") or "",
            "url": item.get("url"),
            "source": item.get("source"),
        })

    df = pd.DataFrame(rows)
    if df.empty:
        return df
    df["text"] = (df["title"].fillna("").str.strip() + ". " + df["summary"].fillna("").str.strip()).str.strip()
    df = df[df["text"].str.len() > 0].drop_duplicates(subset=["url", "ticker"])
    return df.sort_values("dt", ascending=False).reset_index(drop=True)

def load_finbert():
    """Return a text-classification pipeline for the ``FINBERT_ID`` model.

    The tokenizer and model are loaded on the first call only; later calls
    return the same pipeline object. Without this, every caller that asks for
    the pipeline (once per event in the backtest) would reload the model.

    The pipeline is configured to return scores for all labels and to
    truncate long inputs.
    """
    pipe = getattr(load_finbert, "_pipe", None)
    if pipe is None:
        tok = AutoTokenizer.from_pretrained(FINBERT_ID)
        mdl = AutoModelForSequenceClassification.from_pretrained(FINBERT_ID)
        pipe = TextClassificationPipeline(model=mdl, tokenizer=tok, return_all_scores=True, truncation=True)
        # Cache on the function object so no extra module-level name is needed.
        load_finbert._pipe = pipe
    return pipe

def score_news_finbert(df: pd.DataFrame, pipe, batch_size: int = 32) -> pd.DataFrame:
    """Score ``df["text"]`` with ``pipe`` and return a scored copy of ``df``.

    Args:
        df: Frame with a ``text`` column.
        pipe: Callable taking a list of strings (plus ``max_length``) and
            returning, per string, a list of ``{"label", "score"}`` dicts.
            Labels are matched case-insensitively against
            positive / neutral / negative; a missing label scores 0.0.
        batch_size: Number of texts sent to ``pipe`` per call.

    Returns:
        A new frame with ``finbert_pos``, ``finbert_neu``, ``finbert_neg`` and
        ``net_sentiment`` (= pos - neg) added. The input frame is left
        untouched (the previous version wrote the columns into it).

    Raises:
        ValueError: if ``pipe`` returns a different number of results than
            texts supplied.
    """
    if df.empty:
        return df.assign(finbert_pos=[], finbert_neu=[], finbert_neg=[], net_sentiment=[])

    out = df.copy()
    texts = out["text"].tolist()
    step = max(int(batch_size), 1)

    triples = []
    for lo in range(0, len(texts), step):
        for label_scores in pipe(texts[lo:lo + step], max_length=256):
            by_label = {entry["label"].lower(): entry["score"] for entry in label_scores}
            triples.append([
                by_label.get("positive", 0.0),
                by_label.get("neutral", 0.0),
                by_label.get("negative", 0.0),
            ])

    if len(triples) != len(texts):
        raise ValueError(
            f"pipeline returned {len(triples)} results for {len(texts)} texts"
        )

    scores = np.asarray(triples, dtype=float).reshape(-1, 3)
    out["finbert_pos"] = scores[:, 0]
    out["finbert_neu"] = scores[:, 1]
    out["finbert_neg"] = scores[:, 2]
    out["net_sentiment"] = out["finbert_pos"] - out["finbert_neg"]
    return out


## 6) SEC Facts → DCF & Multiples Anchors

In [None]:

def _zero_pad_cik(x: str) -> str:
    """Return ``x`` parsed as an integer and left-padded with zeros to 10 digits."""
    return str(int(x)).zfill(10)

def _load_ticker_table() -> dict:
    """Return a mapping of upper-case ticker -> 10-digit zero-padded CIK.

    The table is downloaded from the SEC ``company_tickers.json`` file on the
    first call and cached for later calls, so resolving many tickers does not
    re-download it each time. A fresh ``dict`` copy is returned so callers
    cannot alter the cached table.

    Raises ``requests.HTTPError`` if the download fails.
    """
    cached = getattr(_load_ticker_table, "_cache", None)
    if cached is None:
        url = "https://www.sec.gov/files/company_tickers.json"
        r = _session().get(url, headers=SEC_HEADERS, timeout=30)
        r.raise_for_status()
        cached = {
            entry["ticker"].upper(): _zero_pad_cik(str(entry["cik_str"]))
            for entry in r.json().values()
        }
        _load_ticker_table._cache = cached
    return dict(cached)

def normalize_cik(identifier: str) -> str:
    """Resolve a CIK number or ticker symbol to a 10-digit zero-padded CIK.

    The input is stripped and upper-cased, and a leading ``CIK`` prefix is
    removed. All-digit input is padded directly; anything else is looked up
    in the SEC ticker table.

    Raises ``ValueError`` when the identifier is neither numeric nor a known
    ticker.
    """
    token = str(identifier).strip().upper()
    if token.startswith("CIK"):
        token = token[3:].strip()

    if re.fullmatch(r"\d+", token) is not None:
        return _zero_pad_cik(token)

    resolved = _load_ticker_table().get(token)
    if resolved is None:
        raise ValueError(f"Could not resolve identifier to CIK: {identifier}")
    return resolved

def get_company_facts(ticker_or_cik):
    """Download the SEC XBRL ``companyfacts`` JSON for a ticker or CIK.

    Pauses 0.2 s after a successful request before returning the decoded
    payload. Raises ``ValueError`` if the identifier cannot be resolved and
    ``requests.HTTPError`` on a non-success status.
    """
    cik = normalize_cik(ticker_or_cik)
    resp = _session().get(
        f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json",
        headers=SEC_HEADERS,
        timeout=30,
    )
    resp.raise_for_status()
    time.sleep(0.2)  # brief pause between consecutive SEC requests
    return resp.json()

def _ttm_sum(items, n=4):
    """Sum the ``val`` fields of the last ``n`` entries of ``items``.

    Entries whose ``val`` is ``None`` are skipped (so fewer than ``n`` values
    may be summed). Returns ``None`` for an empty list or when every value is
    missing.
    """
    if not items: return None
    vals = [x.get("val") for x in items][-n:]
    vals = [v for v in vals if v is not None]
    return float(np.nansum(vals)) if vals else None

def _ttm_mean(items, n=4):
    """Average the ``val`` fields of the last ``n`` entries of ``items``.

    Used for quantities such as share counts, where adding periods together
    is meaningless. Same missing-value rules as ``_ttm_sum``.
    """
    if not items: return None
    vals = [x.get("val") for x in items][-n:]
    vals = [v for v in vals if v is not None]
    return float(np.nanmean(vals)) if vals else None

def build_ttm_metrics(facts: dict) -> dict:
    """Derive trailing metrics from an SEC ``companyfacts``-style payload.

    Reads ``facts["facts"]["us-gaap"][<tag>]["units"][<unit>]`` series and
    aggregates the last four entries of each.

    Returns a dict with keys ``revenue_ttm, net_income_ttm, eps_diluted_ttm,
    diluted_shares_ttm, cfo_ttm, capex_ttm, fcf_ttm, rev_per_share``; any value
    that cannot be computed is ``None``.

    Fixes relative to the previous version:
      * ``diluted_shares_ttm`` is the *average* of the last four share counts,
        not their sum (summing inflated the count about 4x and deflated every
        per-share figure built on it);
      * ``fcf_ttm`` is ``None`` when operating cash flow is missing, instead
        of being reported as ``-capex`` or ``0.0``.

    NOTE(review): the last four entries are used as found; nothing here checks
    that they are four distinct, consecutive quarterly periods - confirm
    against the shape of the series you feed in.
    """
    usgaap = facts.get("facts", {}).get("us-gaap", {})

    def get_series(tag):
        # First non-empty unit wins: USD, then USD/shares, then shares.
        units = usgaap.get(tag, {}).get("units", {})
        return units.get("USD", []) or units.get("USD/shares", []) or units.get("shares", [])

    revenue_q   = get_series("Revenues")
    ni_q        = get_series("NetIncomeLoss")
    eps_q       = usgaap.get("EarningsPerShareDiluted", {}).get("units", {}).get("USD/shares", [])
    dil_sh_q    = get_series("WeightedAverageNumberOfDilutedSharesOutstanding")
    cfo_q       = get_series("NetCashProvidedByUsedInOperatingActivities")
    capex_q     = get_series("PaymentsToAcquirePropertyPlantAndEquipment")

    revenue_ttm     = _ttm_sum(revenue_q)
    net_income_ttm  = _ttm_sum(ni_q)
    eps_ttm         = _ttm_sum(eps_q)
    diluted_sh_ttm  = _ttm_mean(dil_sh_q)   # a stock-like quantity: average, never sum
    cfo_ttm         = _ttm_sum(cfo_q)
    capex_ttm       = _ttm_sum(capex_q)
    # abs(): capex is subtracted whatever sign the filer reported it with.
    fcf_ttm         = (cfo_ttm - abs(capex_ttm or 0.0)) if cfo_ttm is not None else None

    return dict(
        revenue_ttm=revenue_ttm, net_income_ttm=net_income_ttm,
        eps_diluted_ttm=eps_ttm, diluted_shares_ttm=diluted_sh_ttm,
        cfo_ttm=cfo_ttm, capex_ttm=capex_ttm, fcf_ttm=fcf_ttm,
        rev_per_share=(revenue_ttm / diluted_sh_ttm) if (revenue_ttm and diluted_sh_ttm) else None,
    )

# Per-sector valuation multiples applied by multiples_anchor: "PE" scales TTM
# diluted EPS and "PS" scales revenue per share. "_default" is the fallback for
# any sector name not listed here.
SECTOR_MULTIPLES = {"Technology":{"PE":30.0,"PS":6.8}, "_default":{"PE":18.0,"PS":2.5}}

def multiples_anchor(metrics:dict, sector="Technology"):
    """Estimate per-share fair value from sector P/E and P/S multiples.

    Args:
        metrics: Dict providing ``eps_diluted_ttm`` and ``rev_per_share``
            (either may be missing or ``None``).
        sector: Key into ``SECTOR_MULTIPLES``; unknown sectors use ``_default``.

    Returns:
        Dict with ``pe_anchor``, ``ps_anchor``, ``fair_value_mid`` (mean of the
        available anchors, or ``None`` if neither exists) and ``assumptions``
        (the multiples used).

    An anchor is only produced from a strictly positive input. Previously a
    negative EPS produced a negative P/E "fair value" that was averaged into
    the midpoint; loss-making inputs now simply contribute no anchor.
    """
    cfg = SECTOR_MULTIPLES.get(sector, SECTOR_MULTIPLES["_default"])
    eps = metrics.get("eps_diluted_ttm")
    rps = metrics.get("rev_per_share")
    # `x > 0` is also False for NaN, so NaN inputs yield no anchor.
    pe_anchor = eps * cfg["PE"] if (eps is not None and eps > 0) else None
    ps_anchor = rps * cfg["PS"] if (rps is not None and rps > 0) else None
    anchors = [x for x in (pe_anchor, ps_anchor) if x is not None and math.isfinite(x)]
    mid = float(np.mean(anchors)) if anchors else None
    return {"pe_anchor":pe_anchor, "ps_anchor":ps_anchor, "fair_value_mid":mid, "assumptions":cfg}

def dcf_anchor(metrics:dict, years=5, g=0.04, r=0.095, g_term=0.02):
    """Per-share value from a simple two-stage discounted cash-flow model.

    Args:
        metrics: Dict providing ``fcf_ttm`` and ``diluted_shares_ttm``.
        years: Number of explicit forecast years.
        g: Annual free-cash-flow growth during the forecast years.
        r: Discount rate.
        g_term: Perpetual growth rate used for the terminal value.

    Returns:
        ``(PV of forecast FCF + PV of terminal value) / shares``, or ``None``
        when FCF or shares are missing/zero, shares are non-positive, or
        ``r <= g_term`` (the terminal-value formula divides by ``r - g_term``,
        so it is undefined or sign-flipped in that case; it previously raised
        ZeroDivisionError or returned a nonsensical value).
    """
    fcf0 = metrics.get("fcf_ttm")
    sh   = metrics.get("diluted_shares_ttm")
    if not fcf0 or not sh or sh <= 0: return None
    if r <= g_term: return None
    pv, fcf = 0.0, fcf0
    for t in range(1, years+1):
        fcf *= (1+g); pv += fcf / ((1+r)**t)
    # Terminal value: final-year FCF grown once more, capitalised at (r - g_term).
    terminal = (fcf * (1+g_term)) / (r - g_term)
    pv_term  = terminal / ((1+r)**years)
    return (pv + pv_term) / sh

def blended_fair_value(mult_mid, dcf_val, w=0.5):
    """Blend the multiples-based and DCF-based fair values.

    With both inputs present, returns ``w * mult_mid + (1 - w) * dcf_val`` as a
    float. With only one present, returns that one unchanged. With neither,
    returns ``None``.
    """
    has_multiples = mult_mid is not None
    has_dcf = dcf_val is not None
    if has_multiples and has_dcf:
        return float(w*mult_mid + (1-w)*dcf_val)
    if has_multiples:
        return mult_mid
    # Covers "DCF only" and "neither" (dcf_val is None in the latter case).
    return dcf_val


## 7) Analyst Revision Events — Finnhub or CSV

In [None]:

def finnhub_price_targets(ticker: str, months_back: int = 6) -> pd.DataFrame:
    """Fetch the current price-target snapshot for ``ticker`` from Finnhub.

    Returns a one-row frame with columns ``ticker, event_dt, old_target,
    new_target, rating_action, analyst, source``, or an empty frame when no
    API key is configured, the request does not return HTTP 200, the body is
    not a JSON object, or no target value is present.

    ``event_dt`` is today's UTC date and ``old_target`` is always NaN, because
    the response is treated as a snapshot rather than a revision history.

    Changes from the previous version:
      * uses timezone-aware ``datetime.now(timezone.utc)`` instead of the
        deprecated ``datetime.utcnow()``;
      * a non-JSON body yields an empty frame instead of raising;
      * falls back to ``targetMean`` / ``targetMedian`` when there is no
        ``target`` key.
        NOTE(review): those field names are taken from Finnhub's published
        price-target schema - confirm against your plan's actual response.
    """
    if not FINNHUB_API_KEY:
        return pd.DataFrame()

    now_utc = datetime.now(timezone.utc)
    since = (now_utc - timedelta(days=30*months_back)).strftime("%Y-%m-%d")
    url = f"{FINNHUB_BASE}/stock/price-target"
    params = {"symbol": ticker.upper(), "from": since, "token": FINNHUB_API_KEY}
    r = _session().get(url, params=params, timeout=30)
    if r.status_code != 200:
        return pd.DataFrame()

    try:
        j = r.json() or {}
    except ValueError:
        return pd.DataFrame()
    if not isinstance(j, dict):
        return pd.DataFrame()

    if "target" in j:
        new_target = j.get("target")
    else:
        new_target = next(
            (j[k] for k in ("targetMean", "targetMedian") if j.get(k) is not None),
            None,
        )
        if new_target is None:
            return pd.DataFrame()

    rows = [{
        "ticker": ticker.upper(),
        "event_dt": now_utc.date(),
        "old_target": np.nan,
        "new_target": new_target,
        "rating_action": "update",
        "analyst": j.get("lastUpdatedSource"),
        "source": "finnhub"
    }]
    return pd.DataFrame(rows)

def load_analyst_events_csv(path: str) -> pd.DataFrame:
    """Load analyst revision events from a CSV file.

    Expected columns: ticker, event_dt, old_target, new_target, rating_action,
    analyst, source. ``event_dt`` is parsed to ``datetime.date`` values; the
    target columns, when present, are coerced to numbers with unparseable
    entries becoming NaN.
    """
    events = pd.read_csv(path)
    events["event_dt"] = pd.to_datetime(events["event_dt"]).dt.date

    present_targets = [name for name in ("old_target", "new_target") if name in events.columns]
    for name in present_targets:
        events[name] = pd.to_numeric(events[name], errors="coerce")
    return events

def build_revision_features(df_events: pd.DataFrame) -> pd.DataFrame:
    """Add revision-magnitude and rating-direction features to analyst events.

    Added columns:
      * ``rev_mag_pct``  - ``(new_target / old_target - 1) * 100``; NaN when
        either target column is absent or the ratio is not finite (for example
        ``old_target == 0``).
      * ``is_upgrade``   - 1 if ``rating_action`` contains "upgrad"
        (case-insensitive), else 0.
      * ``is_downgrade`` - 1 if ``rating_action`` contains "downgrad", else 0.

    An empty input is returned unchanged; otherwise a copy is returned and the
    input is not modified.

    Fixes: a missing ``rating_action`` column used to raise AttributeError
    (the fallback was a plain ``str``, which has no ``.astype``); infinite
    magnitudes from a zero old target are now NaN.
    """
    if df_events.empty:
        return df_events
    out = df_events.copy()

    if {"old_target", "new_target"} <= set(out.columns):
        pct = (out["new_target"] / out["old_target"] - 1.0) * 100.0
        out["rev_mag_pct"] = pct.replace([np.inf, -np.inf], np.nan)
    else:
        out["rev_mag_pct"] = np.nan

    if "rating_action" in out.columns:
        action = out["rating_action"].astype(str).str.lower()
    else:
        # No rating text at all: every event is neither upgrade nor downgrade.
        action = pd.Series("", index=out.index, dtype=object)

    out["is_upgrade"] = action.str.contains("upgrad", regex=False).astype(int)
    out["is_downgrade"] = action.str.contains("downgrad", regex=False).astype(int)
    return out


## 8) Disagreement & Confirmation Rules

In [None]:

@dataclass
class DisagreementThresholds:
    """Cut-offs used by ``backtest_events`` to decide which events become trades."""
    # Fundamental disagreement multiple x: a short needs new_target > x * fair
    # value; a long needs new_target < fair value / x.
    target_over_fair_x: float = 1.2
    # Events whose |rev_mag_pct| is below this are skipped.
    min_rev_mag_pct: float   = 5.0
    # News disagreement: a short needs sentiment < -min_sent_div, a long needs
    # sentiment > +min_sent_div.
    min_sent_div: float      = 0.05
    # Width of the VWAP band, in multiples of vwap_sigma, passed to
    # technical_confirmation.
    vwap_band: float         = 1.0

def compute_fair_value_snapshot(ticker: str, sector_hint="Technology") -> Optional[float]:
    """Return a blended (50/50 multiples and DCF) per-share fair value for ``ticker``.

    Best-effort: any exception raised while fetching or computing is swallowed
    and reported as ``None``.
    """
    try:
        metrics = build_ttm_metrics(get_company_facts(ticker))
        multiples = multiples_anchor(metrics, sector=sector_hint)
        return blended_fair_value(multiples["fair_value_mid"], dcf_anchor(metrics), w=0.5)
    except Exception:
        return None

def compute_recent_news_sentiment(ticker: str, months_back: int = NEWS_MONTHS_BACK) -> Optional[float]:
    """Return the latest smoothed daily net news sentiment for ``ticker``.

    News from the last ``months_back * 30`` days is scored with FinBERT,
    averaged per calendar date, smoothed with a 3-observation rolling mean, and
    the most recent smoothed value is returned. Returns ``None`` when there is
    no news or when any step raises.

    NOTE(review): the news window is anchored at the current time by the
    fetcher, not at an event date - confirm this is acceptable when the result
    is used for historical events.
    """
    try:
        news = fetch_news_single_ticker(ticker, days_back=months_back*30, limit=100)
        if news.empty:
            return None
        scored = score_news_finbert(news, load_finbert())
        scored["date"] = scored["dt"].dt.date
        daily_mean = scored.groupby("date")["net_sentiment"].mean().sort_index()
        smoothed = daily_mean.rolling(3, min_periods=1).mean()
        return float(smoothed.iloc[-1])
    except Exception:
        return None

def technical_confirmation(df_ohlcv: pd.DataFrame, event_date: date, direction: int, vwap_band: float=1.0) -> bool:
    """Check the first bar on or after ``event_date`` against the VWAP band and momentum sign.

    Indicators are computed on bars from 60 calendar days before the event
    onward. For ``direction == -1`` the bar must close at or below
    ``vwap + vwap_band * sigma`` with a non-positive momentum sign; for any
    other direction it must close at or above ``vwap - vwap_band * sigma`` with
    a non-negative momentum sign.

    Returns ``False`` when there is no price data, no bar on or after the
    event date, or an indicator is NaN (NaN comparisons are False).
    """
    if df_ohlcv.empty:
        return False

    window_start = (pd.to_datetime(event_date) - pd.Timedelta(days=60)).date()
    recent = df_ohlcv[df_ohlcv["date"] >= window_start].copy()
    recent = compute_vwap_and_sigma(recent, window=VWAP_WINDOW_DAYS, sigma_win=VWAP_SIGMA_WINDOW_DAYS)
    recent["mom_flip"] = momentum_flip(recent, look=REV_MOM_LOOKBACK_DAYS)

    candidates = recent[recent["date"] >= event_date]
    if candidates.empty:
        return False
    bar = candidates.iloc[0]

    if direction == -1:
        band_ok = bar["close"] <= (bar["vwap"] + vwap_band * bar["vwap_sigma"])
        momentum_ok = bar["mom_flip"] <= 0
    else:
        band_ok = bar["close"] >= (bar["vwap"] - vwap_band * bar["vwap_sigma"])
        momentum_ok = bar["mom_flip"] >= 0
    return bool(band_ok and momentum_ok)


## 9) Trade Simulator (28d with trailing stop)

In [None]:

def simulate_trade_28d(df_ohlc: pd.DataFrame,
                       start_date: date,
                       direction: int,
                       tsl_pct: float = TRAIL_STOP_PCT,
                       max_days: int = MAX_HOLD_DAYS) -> dict:
    """Simulate one trade with a trailing stop and a maximum holding period.

    The trade is entered at the open of the first bar on or after
    ``start_date`` and held for at most ``max_days`` bars.

    Args:
        df_ohlc: Daily bars with ``date, open, high, low, close`` columns.
        start_date: Earliest allowed entry date.
        direction: ``1`` for long; any other value is treated as short.
        tsl_pct: Trailing-stop distance as a fraction of the running extreme.
        max_days: Maximum number of bars to hold (at least one bar is used).

    Returns:
        ``{"filled": False, "pnl_pct": nan}`` when there is no bar to enter
        on; otherwise a dict with ``filled, entry_date, exit_date, entry_price,
        exit_price, exit_reason`` ("tsl" or "time") and ``pnl_pct`` rounded to
        3 decimals.

    Fix: the stop scan is now limited to the holding window. Previously it ran
    over every available bar, so a stop hit *after* ``max_days`` replaced the
    time-based exit.

    Modelling notes: the running extreme is updated with the current bar's
    high/low before that bar's stop test, and a stop-out is filled exactly at
    the stop level.
    """
    df = df_ohlc[df_ohlc["date"] >= start_date].reset_index(drop=True)
    if df.empty:
        return {"filled": False, "pnl_pct": np.nan}

    hold_bars = min(max(int(max_days), 1), len(df))
    window = df.iloc[:hold_bars]

    entry_open = float(window.loc[0, "open"])
    entry_day  = window.loc[0, "date"]

    # Default outcome: time exit at the close of the last bar in the window.
    final_bar   = window.iloc[-1]
    exit_reason = "time"
    exit_price  = float(final_bar["close"])
    exit_day    = final_bar["date"]

    is_long = direction == 1
    extreme = entry_open  # running high for longs, running low for shorts

    for day, high, low in zip(window["date"], window["high"], window["low"]):
        high, low = float(high), float(low)
        if is_long:
            extreme = max(extreme, high)
            stop = extreme * (1 - tsl_pct)
            stopped = low <= stop
        else:
            extreme = min(extreme, low)
            stop = extreme * (1 + tsl_pct)
            stopped = high >= stop
        if stopped:
            exit_reason, exit_price, exit_day = "tsl", stop, day
            break

    if is_long:
        pnl = (exit_price / entry_open - 1.0) * 100.0
    else:
        pnl = (entry_open / exit_price - 1.0) * 100.0

    return {
        "filled": True,
        "entry_date": entry_day,
        "exit_date": exit_day,
        "entry_price": entry_open,
        "exit_price": exit_price,
        "exit_reason": exit_reason,
        "pnl_pct": float(round(pnl, 3))
    }


## 10) Backtester Orchestrator

In [None]:

def backtest_events(events: pd.DataFrame,
                    thresholds: DisagreementThresholds,
                    sector_hint="Technology") -> pd.DataFrame:
    """Run the analyst-fade filters over ``events`` and simulate surviving trades.

    For each event the direction is short (-1) when the revision is upward
    (``rev_mag_pct > 0``) or flagged ``is_upgrade``, otherwise long (+1). An
    event becomes a trade only if all of the following hold:

      1. ``new_target`` is a finite number and ``|rev_mag_pct|`` is not below
         ``thresholds.min_rev_mag_pct``;
      2. price history exists on or before the event date;
      3. the target disagrees with the blended fair value by the
         ``target_over_fair_x`` multiple;
      4. recent news sentiment disagrees with the revision by ``min_sent_div``;
      5. ``technical_confirmation`` passes on the first bar on/after the event.

    Returns one row per simulated trade with columns ``ticker, event_dt,
    direction, rev_mag_pct, new_target, fair_value, news_sent, entry_date,
    exit_date, pnl_pct, exit_reason``.

    Fixes relative to the previous version:
      * no IndexError when there is no bar on or before the event date (the
        event is skipped);
      * entry happens on the first bar *after* the confirmation bar. The
        confirmation rule reads that bar's close, so entering at the same
        bar's open used information not yet available;
      * the local filters (1) run before any network call.
    """
    recs = []
    for _, ev in events.iterrows():
        tkr = ev["ticker"]
        ev_dt = ev["event_dt"]
        rev_mag = ev.get("rev_mag_pct", 0.0)

        # (1) Local filters first, so rejected events cost no API calls.
        try:
            tgt = float(ev.get("new_target", np.nan))
        except (TypeError, ValueError):
            continue
        if not math.isfinite(tgt):
            continue
        # NOTE(review): a NaN magnitude is *not* rejected here (the comparison
        # is False), matching earlier behaviour - confirm that is intended for
        # events with no old target.
        if abs(rev_mag) < thresholds.min_rev_mag_pct:
            continue

        direction = -1 if (rev_mag > 0 or ev.get("is_upgrade", 0) == 1) else 1

        # (2) Prices: 80 calendar days of history, plus the holding window.
        end = ev_dt + timedelta(days=MAX_HOLD_DAYS+5)
        start = (ev_dt - timedelta(days=80)).isoformat()
        df_px = polygon_daily_ohlcv(tkr, start, end.isoformat())
        if df_px.empty:
            continue
        if not (df_px["date"] <= ev_dt).any():
            continue

        # (3) Fundamental disagreement between target and fair value.
        fair = compute_fair_value_snapshot(tkr, sector_hint=sector_hint)
        if fair is None:
            continue
        if direction == -1:
            disagree = tgt > thresholds.target_over_fair_x * fair
        else:
            disagree = tgt < (fair / thresholds.target_over_fair_x)
        if not disagree:
            continue

        # (4) News-sentiment disagreement.
        s_now = compute_recent_news_sentiment(tkr, months_back=NEWS_MONTHS_BACK)
        if s_now is None:
            continue
        if direction == -1:
            news_disagree = s_now < -thresholds.min_sent_div
        else:
            news_disagree = s_now > thresholds.min_sent_div
        if not news_disagree:
            continue

        # (5) Technical confirmation on the first bar on/after the event date.
        if not technical_confirmation(df_px, ev_dt, direction, vwap_band=thresholds.vwap_band):
            continue

        # Enter strictly after the confirmation bar to avoid look-ahead.
        signal_date = df_px.loc[df_px["date"] >= ev_dt, "date"].min()
        entry_from = signal_date + timedelta(days=1)

        sim = simulate_trade_28d(df_px, entry_from, direction, tsl_pct=TRAIL_STOP_PCT, max_days=MAX_HOLD_DAYS)
        if not sim["filled"]:
            continue

        recs.append({
            "ticker": tkr, "event_dt": ev_dt, "direction": direction,
            "rev_mag_pct": ev.get("rev_mag_pct", np.nan),
            "new_target": tgt, "fair_value": fair, "news_sent": s_now,
            "entry_date": sim["entry_date"], "exit_date": sim["exit_date"],
            "pnl_pct": sim["pnl_pct"], "exit_reason": sim["exit_reason"]
        })
        time.sleep(0.2)
    return pd.DataFrame(recs)


## 11) Reporting

In [None]:

def report_results(df_trades: pd.DataFrame, title="Analyst Fade Backtest"):
    """Print summary statistics and plot the compounded equity curve.

    Trades are ordered by ``entry_date`` and compounded one after another from
    ``pnl_pct``. Prints "No trades." and returns when ``df_trades`` is empty.
    Always returns ``None``.
    """
    if df_trades.empty:
        print("No trades.")
        return

    ordered = df_trades.copy().sort_values("entry_date")
    pnl = ordered["pnl_pct"]
    equity = (1.0 + pnl/100.0).cumprod()

    stats = {
        "n_trades": len(ordered),
        "avg_pnl_pct": float(pnl.mean()),
        "win_rate": float((pnl>0).mean()),
        "total_return_pct": float((equity.iloc[-1]-1.0)*100.0)
    }

    print("=== Summary ===")
    for key, value in stats.items():
        if isinstance(value, float):
            print(f"{key}: {value:.3f}")
        else:
            print(f"{key}: {value}")

    plt.figure(figsize=(10,5))
    plt.plot(equity.values)
    plt.title(title)
    plt.xlabel("Trade #")
    plt.ylabel("Cumulative Growth (×)")
    plt.grid(True)
    plt.show()


## 12) Run — choose events source (CSV or Finnhub), thresholds, backtest

In [None]:

# --- Run configuration ------------------------------------------------------
USE_CSV_EVENTS      = True                          # set False to try Finnhub (if plan supports)
CSV_EVENTS_PATH     = "analyst_events_sample.csv"   # supply your file
TICKERS             = ["AAPL","MSFT","NVDA","AMZN"] # used only for Finnhub demo

# Filter cut-offs handed to backtest_events (same values as the dataclass defaults).
thresholds = DisagreementThresholds(
    target_over_fair_x = 1.2,
    min_rev_mag_pct    = 5.0,
    min_sent_div       = 0.05,
    vwap_band          = 1.0
)

# --- Load events from the CSV, or poll Finnhub once per ticker ---------------
if USE_CSV_EVENTS:
    events = load_analyst_events_csv(CSV_EVENTS_PATH)
else:
    frames = []
    for t in TICKERS:
        df = finnhub_price_targets(t, months_back=6)
        if not df.empty:
            # Accept a frame that names its date column "dt" by renaming it.
            df = df.rename(columns={"dt":"event_dt"}) if "dt" in df.columns else df
            frames.append(df)
        time.sleep(0.2)  # short pause between per-ticker requests
    events = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# --- Feature engineering, backtest, report, and persistence ------------------
if events.empty:
    print("No events loaded. Point CSV_EVENTS_PATH to your file.")
else:
    events = build_revision_features(events)
    print(events.head())
    trades = backtest_events(events, thresholds, sector_hint="Technology")
    report_results(trades, title="Analyst Fade — Fundamentals & Sentiment-Filtered")
    # The trade log is written even when no trades were produced.
    trades.to_csv("analyst_fade_trades.csv", index=False)
    print("Saved analyst_fade_trades.csv")
