In [1]:
!pip -q install yfinance==0.2.52 pandas-datareader==0.10.0 python-dotenv==1.0.1 \
               requests==2.32.3 beautifulsoup4==4.12.3 lxml==5.3.0 rapidfuzz==3.9.7 \
               nltk==3.9.1 pydantic==2.9.2 PyYAML==6.0.2
import nltk; nltk.download("punkt", quiet=True)

True

## Environment 

In [4]:
import os, pathlib, re
from dotenv import load_dotenv, find_dotenv
ROOT = pathlib.Path.cwd() / "agent_finance_nb"
ROOT.mkdir(parents=True, exist_ok=True)
os.chdir(ROOT)  # ensure .env is here or parent
load_dotenv(find_dotenv(usecwd=True) or ".env")
# strip accidental whitespace from pasted keys
for k in ["OPENAI_API_KEY","NEWSAPI_KEY","ALPHAVANTAGE_API_KEY","FRED_API_KEY"]:
    v = os.getenv(k); 
    if v: os.environ[k] = re.sub(r"\s+", "", v)
# force official API base
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"

# minimal config / memory file
(mem_path := pathlib.Path("data/memory.json")).parent.mkdir(parents=True, exist_ok=True)
if not mem_path.exists(): mem_path.write_text("[]", encoding="utf-8")

##  Rate control core (token bucket + queue + circuit breaker) + disk cache

In [7]:
# rate_control.py
import time, json, pathlib, hashlib, threading, random
from typing import Any

CACHE_DIR = pathlib.Path("data/.cache"); CACHE_DIR.mkdir(parents=True, exist_ok=True)

def _cache_path(ns: str, key: str) -> pathlib.Path:
    h = hashlib.sha256(key.encode()).hexdigest()[:40]
    p = CACHE_DIR / ns; p.mkdir(exist_ok=True)
    return p / f"{h}.json"

def cache_get(ns: str, key: str, ttl_sec: int):
    p = _cache_path(ns, key)
    if not p.exists(): return None
    if time.time() - p.stat().st_mtime > ttl_sec: return None
    try: return json.loads(p.read_text())
    except: return None

def cache_set(ns: str, key: str, obj: Any):
    _cache_path(ns, key).write_text(json.dumps(obj))

class CircuitBreaker:
    def __init__(self, fail_threshold:int=3, open_seconds:int=30):
        self.fail_threshold = fail_threshold
        self.open_seconds = open_seconds
        self.fail_count = 0
        self.open_until = 0.0
        self.lock = threading.Lock()
    def record_success(self):
        with self.lock: self.fail_count = 0; self.open_until = 0.0
    def record_failure(self):
        with self.lock:
            self.fail_count += 1
            if self.fail_count >= self.fail_threshold:
                self.open_until = time.time() + self.open_seconds
    def is_open(self): 
        with self.lock: return time.time() < self.open_until

class TokenBucket:
    def __init__(self, capacity:int, refill_per_sec:float):
        self.capacity = capacity; self.tokens = capacity
        self.refill_per_sec = refill_per_sec
        self.last = time.time(); self.lock = threading.Lock()
    def _refill(self):
        now = time.time(); dt = now - self.last; self.last = now
        self.tokens = min(self.capacity, self.tokens + dt*self.refill_per_sec)
    def take(self, tokens:float=1.0):
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens; return 0.0
            deficit = tokens - self.tokens
            wait = deficit / self.refill_per_sec
            self.tokens = 0.0
        return wait

class RequestQueue:
    def __init__(self, max_concurrent:int=1, min_gap_sec:float=0.3):
        self.sema = threading.Semaphore(max_concurrent)
        self.min_gap = min_gap_sec
        self.last_ts = 0.0
        self.lock = threading.Lock()
    def acquire(self):
        self.sema.acquire()
        with self.lock:
            gap = time.time() - self.last_ts
            if gap < self.min_gap: time.sleep(self.min_gap - gap)
    def release(self):
        with self.lock: self.last_ts = time.time()
        self.sema.release()

def honor_retry_after(resp, default_sleep):
    try:
        ra = resp.headers.get("Retry-After")
        if not ra: return default_sleep
        val = float(ra) if ra.isdigit() else default_sleep
        return max(val, default_sleep)
    except: return default_sleep

def guarded_call(name:str, bucket:TokenBucket, queue:RequestQueue, breaker:CircuitBreaker, fn, *, max_retries:int=5, base:float=0.7, max_sleep:float=12.0):
    if breaker.is_open(): raise RuntimeError(f"{name} circuit open; cooling down.")
    w = bucket.take(1.0)
    if w>0: time.sleep(w)
    queue.acquire()
    try:
        last_exc = None
        for i in range(max_retries):
            try:
                resp = fn(); breaker.record_success(); return resp
            except Exception as e:
                status = getattr(getattr(e, "response", None), "status_code", None)
                breaker.record_failure()
                if i == max_retries-1: raise
                sleep = min(max_sleep, base*(2**i))*(0.7 + 0.6*random.random())
                resp = getattr(e, "response", None)
                if resp is not None: sleep = honor_retry_after(resp, sleep)
                print(f"[{name}] retry {i+1}/{max_retries} status={status} sleep={sleep:.2f}s")
                time.sleep(sleep); last_exc = e
        raise last_exc
    finally:
        queue.release()

# Provider profiles (conservative defaults)
OPENAI_BUCKET = TokenBucket(capacity=50, refill_per_sec=50/60)
OPENAI_Q = RequestQueue(max_concurrent=1, min_gap_sec=0.25)
OPENAI_CB = CircuitBreaker(fail_threshold=2, open_seconds=15)

ALPHA_BUCKET = TokenBucket(capacity=4, refill_per_sec=4/60)     # Alpha Vantage ~5rpm free → set 4rpm
ALPHA_Q = RequestQueue(max_concurrent=1, min_gap_sec=0.6)
ALPHA_CB = CircuitBreaker(fail_threshold=2, open_seconds=30)

NEWS_BUCKET = TokenBucket(capacity=10, refill_per_sec=10/60)    # modest throttle
NEWS_Q = RequestQueue(max_concurrent=1, min_gap_sec=0.4)
NEWS_CB = CircuitBreaker(fail_threshold=2, open_seconds=20)

## LLM client

In [10]:
# llm_client.py
import os, requests, json, hashlib
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class ChatMessage:
    role: str
    content: str

def _llm_ckey(payload: dict) -> str:
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

class LLM:
    def __init__(self, model: str="gpt-4o-mini", temperature: float=0.2, base_url: Optional[str]=None):
        base = base_url or os.getenv("OPENAI_BASE_URL") or "https://api.openai.com/v1"
        if not base.startswith("https://api.openai.com"): base = "https://api.openai.com/v1"
        self.base = base.rstrip("/")
        self.model = model; self.temperature = temperature
        self.key = (os.getenv("OPENAI_API_KEY") or "").strip()
        if not self.key: raise RuntimeError("OPENAI_API_KEY missing")

    def chat(self, messages: List[Dict] | List[ChatMessage], max_tokens: int = 700) -> str:
        headers = {"Authorization": f"Bearer {self.key}", "Content-Type": "application/json"}
        payload = {
            "model": self.model,
            "temperature": self.temperature,
            "max_tokens": max_tokens,
            "messages": [{"role": (m["role"] if isinstance(m, dict) else m.role),
                          "content": (m["content"] if isinstance(m, dict) else m.content)} for m in messages],
        }
        ckey = _llm_ckey({"url": self.base + "/chat/completions", **payload})
        hit = cache_get("llm", ckey, ttl_sec=3600)
        if hit: return hit["text"]

        def _post():
            r = requests.post(self.base + "/chat/completions", json=payload, headers=headers, timeout=60, allow_redirects=False)
            if r.is_redirect and r.status_code in (307,308) and r.headers.get("Location"):
                r = requests.post(r.headers["Location"], json=payload, headers=headers, timeout=60, allow_redirects=False)
            r.raise_for_status(); return r

        r = guarded_call("openai", OPENAI_BUCKET, OPENAI_Q, OPENAI_CB, _post, max_retries=6, base=0.8, max_sleep=20.0)
        text = r.json()["choices"][0]["message"]["content"].strip()
        cache_set("llm", ckey, {"text": text})
        return text

## Data tools (Prices → Stooq→Yahoo→Alpha; NewsAPI; FRED macro)

In [13]:
# price_tool.py
import datetime as dt, requests, yfinance as yf
from pandas_datareader import data as pdr
from typing import Dict, Any
import os

ALPHA_KEY = os.getenv("ALPHAVANTAGE_API_KEY")

def _hist_to_rows(df, days:int):
    if df is None or df.empty: return []
    df = df.sort_index()
    df = df.tail(days)
    rows=[]
    for idx, row in df.iterrows():
        rows.append({
            "date": (idx.date().isoformat() if hasattr(idx, "date") else str(idx)),
            "open": float(row.get("Open", row.get("open", 0.0))),
            "high": float(row.get("High", row.get("high", 0.0))),
            "low":  float(row.get("Low",  row.get("low", 0.0))),
            "close":float(row.get("Close",row.get("close",0.0))),
            "volume": int(row.get("Volume", row.get("volume", 0))),
        })
    return rows

class PriceTool:
    def fetch(self, symbol: str, days: int = 20) -> Dict[str, Any]:
        key = f"{symbol}:{days}"
        hit = cache_get("prices", key, ttl_sec=3600)
        if hit: return hit

        # 1) Stooq (daily)
        def _stooq():
            df = pdr.DataReader(symbol, "stooq")
            return _hist_to_rows(df, days)
        try:
            rows = guarded_call("stooq", NEWS_BUCKET, NEWS_Q, NEWS_CB, _stooq, max_retries=3, base=0.6, max_sleep=6.0)
            if rows:
                out = {"symbol": symbol, "history": rows}; cache_set("prices", key, out); return out
        except Exception as e: print("[prices] stooq failed:", e)

        # 2) Yahoo
        def _yahoo():
            end = dt.date.today(); start = end - dt.timedelta(days=days*2)
            df = yf.download(symbol, start=start.isoformat(), end=end.isoformat(), progress=False)
            if df is None or df.empty:
                resp = requests.Response(); resp.status_code = 429
                raise requests.HTTPError(response=resp)
            return _hist_to_rows(df, days)
        try:
            rows = guarded_call("yahoo", NEWS_BUCKET, NEWS_Q, NEWS_CB, _yahoo, max_retries=4, base=0.8, max_sleep=10.0)
            if rows:
                out = {"symbol": symbol, "history": rows}; cache_set("prices", key, out); return out
        except Exception as e: print("[prices] yahoo failed:", e)

        # 3) Alpha Vantage (strict)
        def _alpha():
            r = requests.get(
                "https://www.alphavantage.co/query",
                params={"function":"TIME_SERIES_DAILY_ADJUSTED","symbol":symbol,"outputsize":"compact","apikey":ALPHA_KEY},
                timeout=20)
            r.raise_for_status()
            ts = r.json().get("Time Series (Daily)", {}) or {}
            rows_sorted = sorted(ts.items())[-days:]
            out=[]
            for d,v in rows_sorted:
                out.append({
                    "date": d, "open": float(v["1. open"]), "high": float(v["2. high"]),
                    "low": float(v["3. low"]), "close": float(v["4. close"]),
                    "volume": int(float(v.get("6. volume","0")))
                })
            return out
        try:
            rows = guarded_call("alpha", ALPHA_BUCKET, ALPHA_Q, ALPHA_CB, _alpha, max_retries=4, base=0.8, max_sleep=20.0)
            out = {"symbol": symbol, "history": rows}; cache_set("prices", key, out); return out
        except Exception as e:
            print("[prices] alpha failed:", e)
            return {"symbol": symbol, "history": []}

In [15]:
# news_tool.py
import os, requests
from typing import List, Dict
class NewsTool:
    def __init__(self): self.key = os.getenv("NEWSAPI_KEY")
    def fetch(self, symbol: str, limit: int = 12) -> List[Dict]:
        key = f"{symbol}:{limit}"
        hit = cache_get("news", key, ttl_sec=600)
        if hit: return hit
        def _call():
            r = requests.get("https://newsapi.org/v2/everything",
                             params={"q": symbol, "pageSize": limit, "language":"en",
                                     "sortBy":"publishedAt","apiKey": self.key},
                             timeout=30)
            r.raise_for_status()
            return [{"title": a.get("title"), "url": a.get("url"),
                     "publishedAt": a.get("publishedAt"), "source": a.get("source",{}).get("name"),
                     "content": a.get("content") or ""} for a in r.json().get("articles", [])]
        items = guarded_call("newsapi", NEWS_BUCKET, NEWS_Q, NEWS_CB, _call, max_retries=5, base=0.7, max_sleep=15.0)
        cache_set("news", key, items)
        return items[:limit]

In [17]:
# fred_tool.py
import os, requests
from typing import Dict, Any, List
FRED_KEY = os.getenv("FRED_API_KEY")
FRED_BUCKET = TokenBucket(capacity=30, refill_per_sec=30/60)
FRED_Q = RequestQueue(max_concurrent=1, min_gap_sec=0.2)
FRED_CB = CircuitBreaker(fail_threshold=2, open_seconds=10)

class FREDTool:
    BASE = "https://api.stlouisfed.org/fred"
    def _series_obs(self, sid: str) -> List[Dict[str, Any]]:
        if not FRED_KEY: return []
        params = {"series_id": sid, "api_key": FRED_KEY, "file_type": "json"}
        def _call():
            r = requests.get(f"{self.BASE}/series/observations", params=params, timeout=20)
            r.raise_for_status(); return r.json().get("observations", [])
        obs = guarded_call("fred", FRED_BUCKET, FRED_Q, FRED_CB, _call, max_retries=4, base=0.6, max_sleep=8.0)
        return [o for o in obs if o.get("value") not in (None, ".", "")]
    def latest_snapshot(self) -> Dict[str, Any]:
        series = {"UNRATE":"Unemployment Rate (%)","DGS10":"10Y Treasury Yield (%)",
                  "CPIAUCSL":"CPI (Index 1982-84=100)", "FEDFUNDS":"Fed Funds Rate (%)"}
        out={}
        for sid,name in series.items():
            key=f"fred:{sid}"; hit=cache_get("fred", key, ttl_sec=6*3600)
            if hit: out[name]=hit; continue
            obs=self._series_obs(sid)
            if obs:
                last=obs[-1]; val={"date": last.get("date"), "value": last.get("value")}
                out[name]=val; cache_set("fred", key, val)
        return out

## Prompt chain, routing, analyzers, quant, macro

In [20]:
# chain + analyzers
from nltk.tokenize import sent_tokenize
import re, html, hashlib, math
from typing import List, Dict

def _norm_title(t): 
    if not t: return ""
    t = html.unescape(t); t = re.sub(r"\s+", " ", t).strip()
    t = re.sub(r"\s*[-–—]\s*(Bloomberg|Reuters|WSJ|CNBC|Yahoo Finance|The Verge|TechCrunch)\s*$","",t,flags=re.I)
    return t

def _dedupe(items: List[Dict]) -> List[Dict]:
    seen=set(); out=[]
    for d in items:
        t=_norm_title(d.get("title")); u=(d.get("url") or "").strip()
        key=hashlib.sha256((t+"|"+u).encode()).hexdigest()[:16]
        if key in seen: continue
        seen.add(key); out.append({**d,"title":t})
    return out

def pc_ingest(news_items: List[Dict]) -> List[Dict]: return _dedupe(news_items or [])

def pc_preprocess(items: List[Dict]) -> List[Dict]:
    out=[]
    for d in items:
        t=_norm_title(d.get("title"))
        text=(d.get("content") or t or "").strip()
        text=re.sub(r"<[^>]+>"," ",text); text=re.sub(r"\s+"," ",text).strip()
        sents=sent_tokenize(text) if text else []
        out.append({**d,"title":t,"sentences":sents,"raw":text})
    return out

# classifier
EARN=r"(earnings|results|q[1-4]\b|fY\d{2}|guidance|eps|revenue|margin|outlook)"
ANALYST=r"(upgrade|downgrade|price target|initiates coverage|maintains|overweight|underweight|buy rating|sell rating|neutral)"
MACRO=r"(fed|federal reserve|cpi|inflation|ppi|yield|treasury|jobs report|payrolls|unemployment|rate hike|rate cut|dot plot)"
MA=r"(acquire|acquisition|merger|merge|takeover|buyout|M&A|spinoff|spin-off|divest)"
LEGAL=r"(lawsuit|sues|sued|settlement|probe|investigation|antitrust|FTC|DoJ|SEC|regulator|regulatory)"
PRODUCT=r"(launch|unveil|introduc(e|es|ed)|ship|rollout|preorder|prototype|roadmap|AI model|chip|GPU|iPhone|feature)"
CAPITAL=r"(buyback|repurchase|dividend|payout|special dividend|split|secondary|offering|convertible|notes)"
MGMT=r"(CEO|CFO|COO|chair|exec|resigns|steps down|appointed|names|leadership|board)"
SUPPLY=r"(supply chain|shortage|capacity|inventory|backlog|fabs?|foundry|wafer|shipment|logistics|strike)"
RISK=r"(recall|outage|security|hack|breach|downtime|fire|accident|halt|ban|sanction)"
_PATTERNS=[("earnings",EARN),("analyst",ANALYST),("macro",MACRO),("ma",MA),("legal",LEGAL),("product",PRODUCT),
           ("capital",CAPITAL),("management",MGMT),("supply",SUPPLY),("risk",RISK)]

def pc_classify(items: List[Dict]) -> List[Dict]:
    out=[]
    for d in items:
        t=(d.get("title") or "").lower(); scores=[]
        for name,pat in _PATTERNS:
            if re.search(pat,t,re.I): scores.append(name)
        label=scores[0] if scores else "news"
        out.append({**d,"label":label})
    return out

def pc_extract(items: List[Dict]) -> List[Dict]:
    out=[]
    for d in items:
        s=d.get("sentences",[]); top=sorted(s,key=len,reverse=True)[:2]
        out.append({**d,"keypoints":top})
    return out

def pc_summarize(llm, symbol: str, items: List[Dict]) -> str:
    if not items:
        return f"No recent news for {symbol}. Monitor earnings, guidance, and macro catalysts."
    bullets=[]
    for d in items[:5]:
        bullets.append(f"- [{d.get('label','news')}] {d.get('title')}")
        for kp in d.get("keypoints",[])[:2]: bullets.append(f"  • {kp}")
    prompt=(f"You are a finance analyst. Summarize these items for {symbol} in 120-180 words, "
            f"calling out catalysts and risks.\n\n"+"\n".join(bullets))
    return llm.chat([{"role":"system","content":"Be concise, neutral, decision-useful."},
                     {"role":"user","content":prompt}], max_tokens=600)

# analyzers + quant + macro
def _pct_ret(hist: List[Dict], n:int)->float:
    if len(hist)<n+1: return float("nan")
    p0=hist[-(n+1)]["close"]; p1=hist[-1]["close"]
    return (p1-p0)/p0*100.0 if p0 else float("nan")
def _sma(hist: List[Dict], n:int)->float:
    if len(hist)<n: return float("nan")
    return sum(h["close"] for h in hist[-n:])/n
def _vol(hist: List[Dict], n:int=10)->float:
    if len(hist)<n+1: return float("nan")
    import math
    rets=[]
    for i in range(-n,0):
        p0=hist[i-1]["close"]; p1=hist[i]["close"]
        if p0 and p1: rets.append(math.log(p1/p0))
    if not rets: return float("nan")
    m=sum(rets)/len(rets); var=sum((r-m)**2 for r in rets)/(len(rets)-1)
    return (var**0.5)* (252**0.5) *100.0

def build_metrics(prices: Dict)->Dict[str,float]:
    h=prices.get("history",[])
    if not h: return {}
    return {"R5%":_pct_ret(h,5),"R10%":_pct_ret(h,10),"R20%":_pct_ret(h,20),
            "SMA5":_sma(h,5),"SMA10":_sma(h,10),"SMA20":_sma(h,20),"Vol10%":_vol(h,10)}

def _fmt(x): 
    return "n/a" if (x is None or isinstance(x,float) and (math.isnan(x) or math.isinf(x))) else f"{x:.2f}"

def metrics_table(m: Dict[str,float])->str:
    if not m: return "(no price history)"
    order=["R5%","R10%","R20%","SMA5","SMA10","SMA20","Vol10%"]
    hdr="| Metric | Value |\n|---|---|\n"
    rows="\n".join([f"| {k} | {_fmt(m.get(k))}{'%' if k.endswith('%') else ''} |" for k in order])
    return hdr+rows

def analyze_news(symbol: str, docs: List[Dict]) -> str:
    titles="; ".join(d.get("title","") for d in docs[:3])
    return f"News summary for {symbol}: {titles}"

def analyze_earnings(symbol: str, docs: List[Dict]) -> str:
    titles=", ".join(d.get("title","") for d in docs[:3])
    return f"Earnings items for {symbol}: {titles}"

def analyze_market(symbol: str, docs: List[Dict], *, prices: Dict|None=None, fred: Dict|None=None) -> str:
    m=build_metrics(prices or {})
    if not m: return f"Market snapshot for {symbol}: no price history."
    trend="up" if not math.isnan(m["SMA5"]) and not math.isnan(m["SMA20"]) and m["SMA5"]>=m["SMA20"] else "down/flat"
    fred_bits=[]
    if fred:
        for k in ["10Y Treasury Yield (%)","Fed Funds Rate (%)","Unemployment Rate (%)"]:
            if k in fred: v=fred[k]; fred_bits.append(f"{k.split(' (')[0]} {v['value']} (as of {v['date']})")
    macro=" | ".join(fred_bits) if fred_bits else "macro: n/a"
    return (f"{symbol}: r5={_fmt(m['R5%'])}% r10={_fmt(m['R10%'])}% r20={_fmt(m['R20%'])}% "
            f"SMA5={_fmt(m['SMA5'])} vs SMA20={_fmt(m['SMA20'])} → trend {trend}. "
            f"Vol10={_fmt(m['Vol10%'])}%. {macro}")

SPECIALISTS={"earnings": analyze_earnings, "analyst": analyze_news, "macro": analyze_news,
             "ma": analyze_news, "legal": analyze_news, "product": analyze_news,
             "capital": analyze_news, "management": analyze_news, "supply": analyze_news,
             "risk": analyze_news, "news": analyze_news, "market": analyze_market}

def route_docs(docs: List[Dict])->Dict[str,List[Dict]]:
    buckets={}
    for d in docs:
        label=d.get("label","news")
        buckets.setdefault(label,[]).append(d)
    return buckets

def run_specialists(symbol: str, buckets: Dict[str,List[Dict]], *, prices=None, fred=None)->Dict[str,str]:
    out={}
    for label,docs in buckets.items():
        if SPECIALISTS.get(label) is analyze_market:
            out[label]=analyze_market(symbol, docs, prices=prices, fred=fred)
        else:
            out[label]=SPECIALISTS.get(label, analyze_news)(symbol, docs)
    return out

## Evaluator–Optimizer 

In [23]:
from pydantic import BaseModel, Field, ValidationError, conint
from typing import Dict, Any
import json, re

class EvalJSON(BaseModel):
    accuracy: conint(ge=1, le=5)
    specificity: conint(ge=1, le=5)
    evidence: conint(ge=1, le=5)
    risks: conint(ge=1, le=5)
    actionability: conint(ge=1, le=5)
    feedback: str = Field(default="")

WEIGHTS = {"accuracy":0.30,"specificity":0.15,"evidence":0.20,"risks":0.15,"actionability":0.20}
THRESHOLD = 78.0
MAX_OPT_PASSES = 1

RUBRIC_MSG=('Score 1–5 on accuracy, specificity, evidence, risks, actionability. '
            'Return ONLY JSON like {"accuracy":3,"specificity":3,"evidence":3,"risks":3,"actionability":3,"feedback":"..."}')

def _json_from_text(txt:str)->Dict[str,Any]:
    try: return json.loads(txt)
    except: pass
    m=re.search(r"\{.*\}",txt,re.S)
    if m:
        try: return json.loads(m.group(0))
        except: pass
    out={}
    for k in ["accuracy","specificity","evidence","risks","actionability"]:
        mm=re.search(fr'"?{k}"?\s*:\s*(\d)',txt,re.I)
        if mm: out[k]=int(mm.group(1))
    fb=re.search(r'"?feedback"?\s*:\s*"(.*)"',txt,re.S|re.I)
    out["feedback"]=(fb.group(1) if fb else txt)[:1200]
    return out

def _validate_eval(d:Dict[str,Any])->EvalJSON:
    for k in ["accuracy","specificity","evidence","risks","actionability"]:
        try: d[k]=max(1,min(5,int(d.get(k,3))))
        except: d[k]=3
    d.setdefault("feedback","")
    try: return EvalJSON(**d)
    except ValidationError: return EvalJSON(**{**{k:3 for k in ["accuracy","specificity","evidence","risks","actionability"]}, "feedback": d.get("feedback","")})

def evaluate_strict(llm, symbol:str, draft:str)->Dict[str,Any]:
    msgs=[{"role":"system","content":"You are a strict finance editor. Output JSON only."},
          {"role":"user","content": f"{RUBRIC_MSG}\n\nDRAFT for {symbol}:\n{draft}"}]
    raw=llm.chat(msgs, max_tokens=350)
    ej=_validate_eval(_json_from_text(raw))
    overall=sum(getattr(ej,k)*w for k,w in WEIGHTS.items())/5*100.0
    return {"raw":raw,"json":ej.model_dump(),"overall":round(overall,1)}

def optimize_once(llm, symbol:str, draft:str, feedback:str)->str:
    msgs=[{"role":"system","content":"You turn finance-editor feedback into concise, evidence-based briefs."},
          {"role":"user","content": (f"Improve the draft for {symbol}. Keep under 220 words. "
                                     f"Be specific, cite concrete items already present; avoid speculation.\n"
                                     f"FEEDBACK:\n{feedback}\n\nDRAFT:\n{draft}")}]
    txt=llm.chat(msgs, max_tokens=500)
    ws=txt.split(); return " ".join(ws[:220])

def eval_optimize_loop(llm, symbol:str, draft:str, *, threshold:float=THRESHOLD, max_iters:int=MAX_OPT_PASSES):
    history=[]; current=" ".join(draft.split()[:220])
    for i in range(max_iters):
        ev=evaluate_strict(llm, symbol, current); history.append(ev)
        if ev["overall"]>=threshold: return {"final":current,"evals":history}
        fb=ev["json"]["feedback"] or "Tighten, add evidence, clarify risks/catalysts."
        current=optimize_once(llm, symbol, current, fb)
    ev=evaluate_strict(llm, symbol, current); history.append(ev)
    return {"final":current,"evals":history}

## Agent + wiring + run

In [26]:
# memory
import json
class Memory:
    def __init__(self, path:str="data/memory.json"):
        self.path=path; pathlib.Path(path).parent.mkdir(parents=True, exist_ok=True)
        if not pathlib.Path(path).exists(): pathlib.Path(path).write_text("[]","utf-8")
    def remember(self, shard:dict):
        arr=json.loads(open(self.path,"r",encoding="utf-8").read()); arr.append(shard)
        open(self.path,"w",encoding="utf-8").write(json.dumps(arr,indent=2))
    def recall(self, symbol:str, limit:int=2):
        try:
            arr=json.loads(open(self.path,"r",encoding="utf-8").read())
            return [a for a in arr[::-1] if a.get("symbol")==symbol][:limit]
        except: return []

# agent
from typing import Any
class ResearchAgent:
    name="researcher"
    def __init__(self, tools:dict[str,Any], memory:Memory):
        self.tools=tools; self.memory=memory
    def plan(self, symbol:str)->list[str]:
        return ["Fetch prices","Fetch news","Prompt chain","Route to specialists","Draft brief"]
    def act(self, symbol:str, days:int=20)->dict:
        prices=self.tools["prices"].fetch(symbol, days=days)
        news=self.tools["news"].fetch(symbol, limit=12)
        fred=self.tools["fred"].latest_snapshot() if self.tools.get("fred") else {}

        items=pc_preprocess(pc_ingest(news))
        tagged=pc_classify(items)
        extracted=pc_extract(tagged)
        summary=pc_summarize(self.tools["llm"], symbol, extracted)

        buckets=route_docs(tagged)
        specialist=run_specialists(symbol, buckets, prices=prices, fred=fred)

        m=build_metrics(prices); mtable=metrics_table(m)
        market_line=analyze_market(symbol, [], prices=prices, fred=fred)

        draft=("\n".join([
            f"Prices (last {len(prices.get('history',[]))} days) fetched.",
            specialist.get("earnings",""), specialist.get("analyst",""), specialist.get("macro",""),
            specialist.get("ma",""), specialist.get("legal",""), specialist.get("product",""), specialist.get("news",""),
            market_line, "Metrics:", mtable, "News Summary:\n"+summary
        ])).strip()

        self.memory.remember({"symbol":symbol,"notes":{**specialist,"macro":fred},"last_summary":summary[:500]})
        return {"prices":prices,"news":news,"draft":draft,"macro":fred,"metrics":m}

# wire
llm = LLM(model="gpt-4o-mini", temperature=0.2)
prices = PriceTool()
news = NewsTool()
fred = FREDTool()
memory = Memory(str(mem_path))

TOOLS={"llm":llm,"prices":prices,"news":news,"fred":fred}
researcher=ResearchAgent(TOOLS, memory)

# run
symbol="AAPL"  # change as needed
plan=researcher.plan(symbol); print("== PLAN ==\n- "+"\n- ".join(plan))
res=researcher.act(symbol, days=20)
result=eval_optimize_loop(llm, symbol, res["draft"], threshold=THRESHOLD, max_iters=MAX_OPT_PASSES)

print("\n== FINAL BRIEF ==\n"+result["final"])
print("\n== MEMORY (latest) ==\n", memory.recall(symbol, limit=2))

== PLAN ==
- Fetch prices
- Fetch news
- Prompt chain
- Route to specialists
- Draft brief
[openai] retry 1/6 status=429 sleep=0.93s
[openai] retry 2/6 status=429 sleep=1.23s
[openai] retry 3/6 status=429 sleep=3.72s
[openai] retry 4/6 status=429 sleep=7.76s
[openai] retry 5/6 status=429 sleep=14.13s


HTTPError: 429 Client Error: Too Many Requests for url: https://api.openai.com/v1/chat/completions