# Regime Aligned Fundamental Screener

This agentic workflow is designed to assess whether current economic conditions are favorable for a specific company, based on fundamental financial criteria. It introduces an economic regime-aware screening process, using a prompt-chaining structure and optional human-in-the-loop inputs. This use case demonstrates how reasoning-capable LLM agents can blend macroeconomic awareness, financial data retrieval, and conditional logic to generate informed assessments.

## Workflow Summary

This use case follows a **router workflow pattern**, with regime-specific branching. It can be broken down into the following steps:

1. **Get Ticker** 
2. **Pull Financial Data**
3. **Route to Economic Regime**
4. **Calculate Regime-Specific Metrics**
5. **Provide Data Validation Commentary Using a ReAct Agent**
6. **Evaluate against Criteria and Provide Score**

Each step can be more or less autonomous, depending on user preference.

![Economic Regime Fundamentals](../assets/svgs/economic_regime_fundamentals.drawio.svg)
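
The routing step in the summary above can be pictured as a plain dispatch table. The sketch below is illustrative only and does not use LlamaIndex: `REGIME_METRICS` and `route` are hypothetical names, and the metric lists simply mirror the non-financials criteria used in this notebook.

```python
# Illustrative router sketch. `REGIME_METRICS` and `route` are hypothetical
# names and are not part of the workflow defined in this notebook.
REGIME_METRICS = {
    "Expansionary": ["Revenue Growth (YoY)", "EBITDA Margin", "Net Debt / EBITDA"],
    "Inflationary": ["Gross Margin Trend (YoY, bps)", "Inventory Turnover (x)", "Interest Coverage"],
    "Stagflationary": ["EBITDA Margin Volatility (stdev)", "Gross Margin Trend (YoY, bps)", "Net Debt / EBITDA"],
    "Recession": ["Cash & ST Inv. / Total Debt", "Interest Coverage", "DSO Change (YoY, days)"],
}


def route(regime: str) -> list[str]:
    """Return the metrics to compute for a regime, or raise on an unknown one."""
    try:
        return REGIME_METRICS[regime]
    except KeyError:
        raise ValueError(f"Unknown regime: {regime!r}") from None


print(route("Recession"))
```

In the actual workflow the same branching is expressed through typed events, so each regime gets its own step rather than a dictionary entry.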

### Install Dependencies

In [None]:
%pip install llama-index-core llama-index-utils-workflow llama-index-llms-openai yfinance pandas numpy python-dotenv

## Single Ticker Workflow


## Criteria Variables
First, we define the criteria for each economic regime. We separate the criteria for financial and non-financial companies because the metrics used to assess regime fit tend to differ between sectors. The workflow could be extended to more sectors; we use only Financials and Non-Financials for simplicity.

### Non-financials Criteria

In [6]:
EXPANSIONARY_CRITERIA = """
| Metric                 | Favorable | Neutral   | Unfavorable | Rationale                                                                 |
|------------------------|-----------|-----------|-------------|---------------------------------------------------------------------------|
| Revenue Growth (YoY)   | > 8%      | 5% – 8%   | < 5%        | Strong top-line growth captures upside in expansion without inflation drag. |
| EBITDA Margin          | > 20%     | 15% – 20% | < 15%       | Healthy margins indicate efficient scaling and operating leverage.        |
| Net Debt / EBITDA      | < 1.5x    | 1.5x–3.0x | > 3.0x      | Some leverage is fine in growth phases, but excess adds downside risk.    |"""

INFLATIONARY_CRITERIA = """
| Metric                               | Favorable | Neutral   | Unfavorable | Rationale                                                                 |
|--------------------------------------|-----------|-----------|-------------|---------------------------------------------------------------------------|
| Gross Margin Trend (YoY, bps)        | > +100    | 0 to +100 | < 0         | Sustained gross margin expansion signals pricing power vs. rising inputs. |
| Inventory Turnover (x)               | > 8x      | 5x – 8x   | < 5x        | Faster turns reduce exposure to rapidly rising input costs.               |
| Interest Coverage (EBIT / IntExp)    | > 6x      | 3x – 6x   | < 3x        | Rising rates + inflation stress weak coverage; resilient firms stay >6x.  |"""

STAGFLATIONARY_CRITERIA = """
| Metric                                    | Favorable | Neutral   | Unfavorable | Rationale                                                                  |
|-------------------------------------------|-----------|-----------|-------------|----------------------------------------------------------------------------|
| EBITDA Margin Volatility (stdev)          | < 4%      | 4% – 6%   | > 6%        | Operational stability matters when growth slows and costs rise.            |
| Gross Margin Trend (YoY, bps)             | > +50     | 0 to +50  | < 0         | Ability to hold or expand gross margin indicates pricing power.            |
| Net Debt / EBITDA                         | < 1.0x    | 1.0x–2.0x | > 2.0x      | Low leverage mitigates refinancing and spread-widening risk.               |"""

RECESSION_CRITERIA = """
| Metric                                               | Favorable | Neutral   | Unfavorable | Rationale                                                                 |
|------------------------------------------------------|-----------|-----------|-------------|---------------------------------------------------------------------------|
| Cash & ST Inv. / Total Debt                          | > 50%     | 20%–50%   | < 20%       | A strong cash cushion supports liquidity when prices and demand fall.     |
| Interest Coverage (EBIT / IntExp)                    | > 8x      | 4x – 8x   | < 4x        | High coverage protects in credit-tight, revenue-weak environments.        |
| DSO Change (Days Sales Outstanding, YoY change, days)| < +5      | +5 to +15 | > +15       | Rising DSO signals collection stress and customer weakness.               |"""

OUTLIER_TABLE = """
| Metric | Q1 | Q3 | IQR | Extreme Lower Bound (Q1 − 1.5×IQR) | Extreme Upper Bound (Q3 + 1.5×IQR) | Notes |
|--------|----|----|-----|------------------------------------|------------------------------------|-------|
| Revenue Growth (YoY) | -5% | +25% | 30 pp | -50% | +70% | Very large negative or very high growth warrants checking seasonality, M&A, one-offs. |
| EBITDA Margin | 10% | 30% | 20 pp | -20% | 60% | Implausibly high margins or deep losses need verification of classification/adjustments. |
| Net Debt / EBITDA | 1.0x | 4.0x | 3.0x | -3.5x | 8.5x | High leverage or large net cash (negative) should be validated for definitions and normalization. |
| Gross Margin Trend (YoY, bps) | -100 | +100 | 200 bps | -400 bps | +400 bps | Large swings could reflect mix, pricing, or recognition timing anomalies. |
| Inventory Turnover (x) | 3.0x | 8.0x | 5.0x | -4.5x (floor 0) | 15.5x | Extremely low implies stale inventory; extremely high could mask stockouts or accounting quirks. |
| Interest Coverage (EBIT / Interest) | 3.0x | 15.0x | 12.0x | -15x (floor 0) | 33x | Very low coverage is a risk; very high may hide near-zero interest expense. |
| EBITDA Margin Volatility (std) | 1.0% | 5.0% | 4.0 pp | -5.0% (floor 0) | 11.0% | Excessive volatility suggests instability; near-zero (if seen) might prompt sanity check. |
| Cash & ST Inv. / Total Debt | 0.2x | 1.0x | 0.8x | -1.0x (floor 0) | 2.2x | Very low ratio signals liquidity stress; very high may reflect conservative balance sheet or classification issues. |
| DSO Change (YoY, days) | -5d | +5d | 10d | -20d | +20d | Large positive = slowing collections; large negative = aggressive behavior or write-offs. |"""

### Financials Criteria

In [7]:
EXPANSIONARY_CRITERIA_FINANCIALS = """
| Metric              | Favorable | Neutral      | Unfavorable | Rationale                                                              |
|---------------------|-----------|--------------|-------------|------------------------------------------------------------------------|
| PPNR Growth (YoY)   | > 8%      | 3% – 8%      | < 3%        | Core earnings momentum critical in expansion.                          |
| Efficiency Ratio    | < 55%     | 55% – 65%    | > 65%       | Leaner cost structure scales better in growth phases.                  |
| ROE                 | > 12%     | 8% – 12%     | < 8%        | Strong profitability relative to equity capital.                       |
"""

INFLATIONARY_CRITERIA_FINANCIALS = """
| Metric             | Favorable | Neutral       | Unfavorable | Rationale                                                               |
|--------------------|-----------|---------------|-------------|-------------------------------------------------------------------------|
| NII Growth (YoY)   | > 10%     | 3% – 10%      | < 3%        | Rising rates should boost NII in inflationary regimes.                  |
| Efficiency Ratio Δ | ≤ -200bps | -200 to +200  | > +200bps   | Cost control is critical as wage/tech inflation pressures rise.         |
| Equity / Assets    | > 10%     | 7% – 10%      | < 7%        | Strong capital buffers mitigate volatility in higher rate environments. |
"""

STAGFLATIONARY_CRITERIA_FINANCIALS = """
| Metric                       | Favorable | Neutral     | Unfavorable | Rationale                                                               |
|------------------------------|-----------|-------------|-------------|-------------------------------------------------------------------------|
| PPNR Growth Volatility (quarterly)  | < 5%      | 5% – 8%     | > 8%        | Stable *growth* in core earnings is vital when growth is weak and costs rise. |
| ROA                          | > 1.0%    | 0.6% – 1.0% | < 0.6%      | Asset efficiency matters in sluggish macro conditions.                  |
| Equity / Assets              | > 10%     | 7% – 10%    | < 7%        | Stronger capital adequacy helps offset market/credit shocks.            |
"""

RECESSION_CRITERIA_FINANCIALS = """
| Metric          | Favorable | Neutral      | Unfavorable | Rationale                                                              |
|-----------------|-----------|--------------|-------------|------------------------------------------------------------------------|
| Equity / Assets | > 10%     | 7% – 10%     | < 7%        | Thick capital cushion protects against credit losses in downturns.     |
| Efficiency Ratio| < 60%     | 60% – 70%    | > 70%       | Cost discipline becomes critical as revenues weaken.                   |
| PPNR / Assets   | > 1.2%    | 0.8% – 1.2%  | < 0.8%      | Strong pre-provision earnings relative to assets = resilience in stress.|
"""

OUTLIER_TABLE_FINANCIALS = """
| Metric                  | Q1    | Q3    | IQR   | Extreme Lower Bound (Q1 − 1.5×IQR) | Extreme Upper Bound (Q3 + 1.5×IQR) | Notes |
|-------------------------|-------|-------|-------|------------------------------------|------------------------------------|-------|
| PPNR Growth (YoY)       | -2%   | +15%  | 17 pp | -27.5%                             | +40.5%                             | Very weak or very high growth may reflect credit cycle extremes, NII shocks, or unusual expenses. |
| Efficiency Ratio        | 50%   | 70%   | 20 pp | 20% (floor 0)                      | 100%+                              | <20% may be misclassified revenues; >100% signals unsustainable costs. |
| ROE                     | 5%    | 15%   | 10 pp | -10% (floor 0)                     | +30%                               | Very low ROE suggests weak profitability; very high may reflect leverage or one-offs. |
| ROA                     | 0.4%  | 1.2%  | 0.8 pp| -0.8% (floor 0)                    | +2.4%                              | Extreme values warrant review of asset base or income recognition. |
| Equity / Assets         | 6%    | 12%   | 6 pp  | -3% (floor 0)                      | +21%                               | Very low = undercapitalization; very high may indicate niche/mix-driven business models. |
| PPNR Growth Volatility (stdev, quarterly)| 2%    | 6%    | 4 pp  | -4% (floor 0)                      | +12%                               | Quarterly growth vol better than annual with sparse data.         |
| PPNR / Assets           | 0.6%  | 1.5%  | 0.9 pp| -0.75% (floor 0)                   | +2.85%                             | Extreme low = weak core earnings; extreme high may reflect temporary windfalls or abnormal quarters. |
| NII Growth (YoY)        | -3%   | +12%  | 15 pp | -25.5%                             | +34.5%                             | Very high/low growth may reflect rate shocks, asset/liability mismatches, or trading income noise. |
"""

## Event Models
Next, we define the events that pass between steps in our workflow. This follows the workflow-building pattern described in the [LlamaIndex Workflow documentation](https://docs.llamaindex.ai/en/stable/understanding/workflows/).

In [None]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Context,
    step,
    Event,
    InputRequiredEvent,
    HumanResponseEvent,
)

# --- Events -------------------------------------------------------
class ProcessTicker(Event):
    ticker: str

class PullFinancialData(Event):
    ticker: str

class ExpansionRoute(Event):
    ticker: str

class InflationRoute(Event):
    ticker: str

class StagflationRoute(Event):
    ticker: str

class RecessionRoute(Event):
    ticker: str

class MetricsEvent(Event):
    ticker: str

class DataCommentary(Event):
    ticker: str

class ResultEvent(Event):
    ticker: str

## State Models
Our workflow needs memory across events, so we define state models that let variables persist from one step to the next.

In [9]:
from pydantic import BaseModel, Field
from typing import Optional, Tuple

class RegimeCriteria(BaseModel):
    Expansionary: str = Field(default=EXPANSIONARY_CRITERIA)
    Inflationary: str = Field(default=INFLATIONARY_CRITERIA)
    Stagflationary: str = Field(default=STAGFLATIONARY_CRITERIA)
    Recession: str = Field(default=RECESSION_CRITERIA)

class Financials(BaseModel):
    Ann_BalanceSheet: dict | None = Field(default=None)
    Ann_IncomeStatement: dict | None = Field(default=None)
    Qtr_BalanceSheet: dict | None = Field(default=None)
    Qtr_IncomeStatement: dict | None = Field(default=None)

class Metrics(BaseModel):
    MetricData: dict = Field(default_factory=dict)

class LLMEvaluation(BaseModel):
    explanation: Optional[str] = None
    score: Optional[int] = None

class State(BaseModel):
    EconomicRegime: Optional[str] = Field(default=None)
    Ticker: Optional[str] = Field(default=None)
    Sector: Optional[str] = Field(default=None)
    FinancialData: Financials = Field(default_factory=Financials)
    ScreeningCriteria: RegimeCriteria = Field(default_factory=RegimeCriteria)
    MetricData: Metrics = Field(default_factory=Metrics)
    DataCommentary: Optional[str] = Field(default=None)
    EvaluationResult: Optional[LLMEvaluation] = None

## Metric Calculations
We now need to define the functions to calculate the various metrics used to assess the criteria.

### Non-financial Metrics


In [10]:
import yfinance as yf
import numpy as np
import pandas as pd

# --- Metrics -----------------------------------------------------------

# --- Revenue Growth (YoY) ---
def revenue_growth(ann_incstm, qtr_incstm):
    ann_rev = ann_incstm.loc['TotalRevenue', :]
    qtr_rev = qtr_incstm.loc['TotalRevenue', :]
    ann_rev_g = (ann_rev.iloc[0] - ann_rev.iloc[1]) / ann_rev.iloc[1]
    qtr_rev_g = (qtr_rev.iloc[0] - qtr_rev.iloc[1]) / qtr_rev.iloc[1]
    return {
        "Annual Revenue": ann_rev,
        "Quarter Revenue": qtr_rev,
        "Annual Revenue Growth": ann_rev_g,
        "Quarter Revenue Growth": qtr_rev_g,
    }

# --- EBITDA Margin ---
def ebitda_margin(ann_incstm, qtr_incstm):
    ann_rev = ann_incstm.loc['TotalRevenue', :]
    ann_ebitda = ann_incstm.loc['EBITDA', :]
    qtr_rev = qtr_incstm.loc['TotalRevenue', :]
    qtr_ebitda = qtr_incstm.loc['EBITDA', :]

    ann_margin = ann_ebitda / ann_rev
    qtr_margin = qtr_ebitda / qtr_rev

    return {
        "Annual EBITDA": ann_ebitda,
        "Annual Revenue": ann_rev,
        "Annual EBITDA Margin": ann_margin,
        "Quarter EBITDA": qtr_ebitda,
        "Quarter Revenue": qtr_rev,
        "Quarter EBITDA Margin": qtr_margin,
        "Annual EBITDA Margin (Latest)": ann_margin.iloc[0],
        "Quarter EBITDA Margin (Latest)": qtr_margin.iloc[0],
    }

# --- Net Debt / EBITDA ---
def net_debt_to_ebitda(ann_incstm, ann_bs):
    # Latest period is read from columns[0] (columns assumed latest-first)
    net_debt_latest = ann_bs.at['NetDebt', ann_bs.columns[0]]
    ebitda_latest = ann_incstm.at['EBITDA', ann_incstm.columns[0]]
    ratio_latest = net_debt_latest / ebitda_latest

    # Also return series (aligned on columns)
    net_debt_series = ann_bs.loc['NetDebt', :]
    ebitda_series = ann_incstm.loc['EBITDA', :]
    with np.errstate(divide='ignore', invalid='ignore'):
        ratio_series = net_debt_series / ebitda_series

    return {
        "Annual Net Debt": net_debt_series,
        "Annual EBITDA": ebitda_series,
        "Annual Net Debt / EBITDA (series)": ratio_series,
        "Net Debt / EBITDA (Latest)": ratio_latest,
    }

# --- EBITDA Margin Volatility (annual std) ---
def ebitda_margin_volatility(ann_incstm):
    ann_rev = ann_incstm.loc['TotalRevenue', :]
    ann_ebitda = ann_incstm.loc['EBITDA', :]
    ann_margin = ann_ebitda / ann_rev
    # Sample standard deviation of the annual EBITDA margin series
    ann_margin_vol = ann_margin.sort_index().std()

    return {
        "Annual EBITDA Margin (series)": ann_margin,
        "Annual EBITDA Margin Volatility (stdev)": ann_margin_vol,
    }


# --- Gross margin Trend (bps) ---
def gross_margin_trend_bps(ann_incstm, qtr_incstm):
    # Annual (YoY)
    ann_rev = ann_incstm.loc['TotalRevenue', :]
    ann_cogs = ann_incstm.loc['CostOfRevenue', :]
    ann_gm = ((ann_rev - ann_cogs) / ann_rev).sort_index()
    ann_trend = ann_gm - ann_gm.shift(1)
    ann_trend_avg_bps = ann_trend.mean() * 10000.0

    # Quarterly (QoQ)
    qtr_rev = qtr_incstm.loc['TotalRevenue', :]
    qtr_cogs = qtr_incstm.loc['CostOfRevenue', :]
    qtr_gm = ((qtr_rev-qtr_cogs) / qtr_rev).sort_index()
    qtr_trend = qtr_gm - qtr_gm.shift(1)
    qtr_trend_avg_bps = qtr_trend.mean() * 10000.0

    return {
        "Annual Gross Margin (series)": ann_gm,
        "Annual GM Trend (YoY, fraction)": ann_trend,
        "Annual GM Trend Avg (bps)": ann_trend_avg_bps,
        "Quarter Gross Margin (series)": qtr_gm,
        "Quarter GM Trend (QoQ, fraction)": qtr_trend,
        "Quarter GM Trend Avg (bps)": qtr_trend_avg_bps,
    }


# --- Inventory Turnover (x) ---
def inventory_turnover(ann_incstm, ann_bs):
    cogs = ann_incstm.loc['CostOfRevenue', :]
    inv = ann_bs.loc['Inventory', :]

    if inv.dropna().shape[0] >= 2:
        avg_inv_latest_two = inv.iloc[:2].mean()  # latest window (cols assumed latest-first)
    else:
        avg_inv_latest_two = inv.iloc[0]

    turnover_latest = (cogs.iloc[0] / avg_inv_latest_two) if avg_inv_latest_two != 0 else np.nan

    # Optional: turnover series using a two-period rolling average computed in
    # chronological order, so each year is averaged with the prior year
    avg_inv_series = inv.sort_index().rolling(2, min_periods=1).mean()
    with np.errstate(divide='ignore', invalid='ignore'):
        turnover_series = cogs / avg_inv_series

    return {
        "Annual COGS": cogs,
        "Annual Inventory": inv,
        "Average Inventory (latest two)": avg_inv_latest_two,
        "Inventory Turnover (series)": turnover_series,
        "Inventory Turnover (Latest)": turnover_latest,
    }

# --- Interest Coverage (EBIT / |Interest Expense|) ---
def interest_coverage(ann_incstm):
    ebit = ann_incstm.loc['EBIT', :]
    int_exp = ann_incstm.loc['InterestExpense', :]

    denom_latest = abs(int_exp.iloc[0])
    coverage_latest = (ebit.iloc[0] / denom_latest) if denom_latest != 0 else float('inf')

    with np.errstate(divide='ignore', invalid='ignore'):
        coverage_series = ebit / abs(int_exp)

    return {
        "Annual EBIT": ebit,
        "Annual Interest Expense": int_exp,
        "Interest Coverage (series)": coverage_series,
        "Interest Coverage (Latest)": coverage_latest,
    }

# --- Cash & ST Inv. / Total Debt ---
def cash_to_debt(ann_bs):
    cash = ann_bs.loc['CashAndCashEquivalents', :]
    try:
        sti = ann_bs.loc['ShortTermInvestments', :]
    except KeyError:
        # If not present, treat as zero series (same index/columns)
        sti = cash * 0
    total_debt = ann_bs.loc['TotalDebt', :]

    cash_sti = cash + sti
    with np.errstate(divide='ignore', invalid='ignore'):
        ratio_series = cash_sti / total_debt

    den_latest = total_debt.iloc[0]
    ratio_latest = (cash_sti.iloc[0] / den_latest) if den_latest != 0 else float('inf')

    return {
        "Annual Cash": cash,
        "Annual Short Term Investments": sti,
        "Annual Total Debt": total_debt,
        "Cash+STI (series)": cash_sti,
        "Cash+STI / Total Debt (series)": ratio_series,
        "Cash+STI / Total Debt (Latest)": ratio_latest,
    }

# --- DSO Change (YoY, days) ---
def dso_change_yoy(ann_incstm, ann_bs, days=365):
    ar = ann_bs.loc['AccountsReceivable', :]
    rev = ann_incstm.loc['TotalRevenue', :]

    dso_series = (ar / rev.replace(0, np.nan)) * days
    dso_series = dso_series.dropna()
    if dso_series.shape[0] < 2:
        delta_latest = np.nan
    else:
        delta_latest = dso_series.iloc[0] - dso_series.iloc[1]

    return {
        "Annual Accounts Receivable": ar,
        "Annual Revenue": rev,
        "DSO (series, days)": dso_series,
        "DSO Change (YoY, days) Latest": delta_latest,
    }
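
As a worked illustration of two of the formulas above, the sketch below applies them to made-up figures using plain floats instead of yfinance DataFrames. The helper names `yoy_growth` and `dso` and all input numbers are hypothetical; as in the functions above, sequences are ordered latest-first.

```python
# Hypothetical figures, ordered latest-first like the statement columns.
revenue = [1200.0, 1000.0]      # latest year, prior year
receivables = [150.0, 100.0]    # latest year, prior year


def yoy_growth(series):
    """(latest - prior) / prior for a latest-first sequence."""
    latest, prior = series[0], series[1]
    return (latest - prior) / prior


def dso(ar, rev, days=365):
    """Days sales outstanding: receivables as a share of revenue, in days."""
    return ar / rev * days


growth = yoy_growth(revenue)
dso_change = dso(receivables[0], revenue[0]) - dso(receivables[1], revenue[1])

print(f"Revenue growth: {growth:.1%}")
print(f"DSO change: {dso_change:+.1f} days")
```

With these inputs revenue grows 20% and DSO rises by roughly 9 days, which would land in the Favorable band of the expansionary table and the Neutral band (+5 to +15) of the recession table respectively.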

### Financial Metrics

In [113]:
# --- PPNR ---
def ppnr(ann_incstm):
    nii = ann_incstm.loc['NetInterestIncome', :]
    total_rev = ann_incstm.loc['TotalRevenue', :]
    nonint_income = total_rev - nii

    # Not all tickers have both SG&A and OtherNonInterestExpense, so guard
    sga = ann_incstm.loc['SellingGeneralAndAdministration', :] if 'SellingGeneralAndAdministration' in ann_incstm.index else 0
    other = ann_incstm.loc['OtherNonInterestExpense', :] if 'OtherNonInterestExpense' in ann_incstm.index else 0
    nonint_expense = sga + other

    ppnr_series = (nii + nonint_income - nonint_expense).sort_index(ascending=False)
    ppnr_latest = ppnr_series.iloc[0]
    ppnr_growth = (ppnr_series.iloc[0] - ppnr_series.iloc[1]) / ppnr_series.iloc[1]

    return {
        "PPNR (series)": ppnr_series,
        "PPNR (Latest)": ppnr_latest,
        "PPNR Growth YoY (Latest)": ppnr_growth,
    }

# --- Efficiency Ratio ---
def efficiency_ratio(ann_incstm):
    nii = ann_incstm.loc['NetInterestIncome', :]
    total_rev = ann_incstm.loc['TotalRevenue', :]
    nonint_income = total_rev - nii

    sga = ann_incstm.loc['SellingGeneralAndAdministration', :] if 'SellingGeneralAndAdministration' in ann_incstm.index else 0
    other = ann_incstm.loc['OtherNonInterestExpense', :] if 'OtherNonInterestExpense' in ann_incstm.index else 0
    nonint_expense = sga + other

    eff_series = (nonint_expense / (nii + nonint_income)).sort_index(ascending=False)
    eff_latest = eff_series.iloc[0]
    eff_delta = (eff_series.iloc[0] - eff_series.iloc[1]) * 10000.0

    return {
        "Efficiency Ratio (series)": eff_series,
        "Efficiency Ratio (Latest)": eff_latest,
        "Efficiency Ratio Δ YoY (bps) Latest": eff_delta,
    }

# --- NII Growth (YoY) ---
def nii_growth_yoy(ann_incstm):
    nii = (ann_incstm.loc['NetInterestIncome', :]).sort_index(ascending=False)
    nii_growth = (nii.iloc[0] - nii.iloc[1]) / nii.iloc[1]
    return {
        "Net Interest Income (series)": nii,
        "NII Growth YoY (Latest)": nii_growth,
    }

# --- PPNR Growth Volatility (std of QoQ growth on quarterly PPNR) ---
def ppnr_growth_volatility_qtr(qtr_incstm):
    nii = qtr_incstm.loc['NetInterestIncome', :]
    total_rev = qtr_incstm.loc['TotalRevenue', :]
    nonint_income = total_rev - nii

    sga = qtr_incstm.loc['SellingGeneralAndAdministration', :] if 'SellingGeneralAndAdministration' in qtr_incstm.index else 0
    other = qtr_incstm.loc['OtherNonInterestExpense', :] if 'OtherNonInterestExpense' in qtr_incstm.index else 0
    nonint_expense = sga + other

    ppnr_series = (nii + nonint_income - nonint_expense).sort_index()   # oldest→latest
    ppnr_growth = ppnr_series / ppnr_series.shift(1) - 1.0
    vol = ppnr_growth.std()

    return {
        "Quarterly PPNR": ppnr_series,
        "Quarterly PPNR Growth": ppnr_growth,
        "PPNR Growth Volatility (stdev, quarterly)": vol,
    }

# --- ROE / ROA ---
def roe_roa(ann_incstm, ann_bs):
    net_inc = ann_incstm.loc['NetIncome', :]
    equity = ann_bs.loc['TotalEquityGrossMinorityInterest', :]
    assets = ann_bs.loc['TotalAssets', :]

    roe_series = (net_inc / equity).sort_index(ascending=False)
    roa_series = (net_inc / assets).sort_index(ascending=False)

    return {
        "ROE (series)": roe_series,
        "ROE (Latest)": roe_series.iloc[0],
        "ROA (series)": roa_series,
        "ROA (Latest)": roa_series.iloc[0],
    }

# --- Equity / Assets ---
def equity_to_assets(ann_bs):
    equity = ann_bs.loc['TotalEquityGrossMinorityInterest', :]
    assets = ann_bs.loc['TotalAssets', :]
    ea_series = (equity / assets).sort_index(ascending=False)
    return {
        "Equity / Assets (series)": ea_series,
        "Equity / Assets (Latest)": ea_series.iloc[0],
    }

# --- PPNR / Assets ---
def ppnr_to_assets(ann_incstm, ann_bs):
    pp_series = ppnr(ann_incstm)["PPNR (series)"]
    assets = ann_bs.loc['TotalAssets', :]
    pa_series = (pp_series / assets).sort_index(ascending=False)
    return {
        "PPNR / Assets (series)": pa_series,
        "PPNR / Assets (Latest)": pa_series.iloc[0],
    }
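
To make the bank metrics above concrete, the sketch below recomputes PPNR, the efficiency ratio, and its year-over-year change from made-up scalar inputs. The helper names `ppnr_value` and `efficiency` and all figures are hypothetical; the formulas follow the ones used in `ppnr` and `efficiency_ratio` above.

```python
def ppnr_value(nii, nonint_income, nonint_expense):
    """Pre-provision net revenue: total revenue less non-interest expense."""
    return nii + nonint_income - nonint_expense


def efficiency(nii, nonint_income, nonint_expense):
    """Non-interest expense as a share of total revenue (lower is better)."""
    return nonint_expense / (nii + nonint_income)


# Hypothetical bank, two fiscal years
latest = dict(nii=600.0, nonint_income=400.0, nonint_expense=580.0)
prior = dict(nii=560.0, nonint_income=390.0, nonint_expense=589.0)

ppnr_growth = ppnr_value(**latest) / ppnr_value(**prior) - 1.0
eff_delta_bps = (efficiency(**latest) - efficiency(**prior)) * 10000.0

print(f"PPNR latest: {ppnr_value(**latest):.0f}")
print(f"PPNR growth: {ppnr_growth:.1%}")
print(f"Efficiency ratio change: {eff_delta_bps:+.0f} bps")
```

Here the efficiency ratio improves from 62% to 58%, a change of about -400 bps, while PPNR grows by roughly 16%.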

## Helper Functions
These functions are used to assist in the execution of other functions or events

In [11]:
# --- Helpers -------------------------------------------------------
def to_df(x):
    if x is None:
        return pd.DataFrame()
    if isinstance(x, pd.DataFrame):
        return x
    return pd.DataFrame(x)

def get_financials(state) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Returns: ann_inc, qtr_inc, ann_bs, qtr_bs as DataFrames extracted from the
    state.FinancialData list of FinData models.
    """
    ticker = state.Ticker
    financials = state.FinancialData

    if financials is None:
        if not state.FinancialData:
            raise KeyError(f"state.FinancialData is empty; no financial record available for {ticker}")

    ann_inc = to_df(financials.Ann_IncomeStatement)
    qtr_inc = to_df(financials.Qtr_IncomeStatement)
    ann_bs  = to_df(financials.Ann_BalanceSheet)
    qtr_bs  = to_df(financials.Qtr_BalanceSheet)

    return ann_inc, qtr_inc, ann_bs, qtr_bs

def regime_to_criteria(state: State, regime: str, is_financials:bool) -> str:
    if is_financials:
        mapping = {
            "Expansion": EXPANSIONARY_CRITERIA_FINANCIALS,
            "Inflation": INFLATIONARY_CRITERIA_FINANCIALS,
            "Stagflation": STAGFLATIONARY_CRITERIA_FINANCIALS,
            "Recession": RECESSION_CRITERIA_FINANCIALS,
        }
    else:
        mapping = {
            "Expansion": state.ScreeningCriteria.Expansionary,
            "Inflation": state.ScreeningCriteria.Inflationary,
            "Stagflation": state.ScreeningCriteria.Stagflationary,
            "Recession": state.ScreeningCriteria.Recession,
        }
    return mapping.get(regime, "")

async def save_metrics(ctx, metrics_dict):
    """Merge metrics_dict into state.MetricData.MetricData (append/update)."""
    async with ctx.store.edit_state() as st:
        # State is a Pydantic model, so check the attribute rather than using `in`
        if getattr(st, "MetricData", None) is None:
            st.MetricData = Metrics()
        base = st.MetricData.MetricData or {}
        base.update(metrics_dict)
        st.MetricData.MetricData = base

def calculate_financials_metrics(regime, ann_inc, ann_bs, qtr_inc):
    if regime == 'Expansionary':
        pp = ppnr(ann_inc)
        ef = efficiency_ratio(ann_inc)
        rr = roe_roa(ann_inc, ann_bs)  # or roe_roa_avg(ann_inc, ann_bs)

        metrics_expansion_fin = {
            "PPNR Growth (YoY)": pp["PPNR Growth YoY (Latest)"],
            "Efficiency Ratio": ef["Efficiency Ratio (Latest)"],
            "ROE": rr["ROE (Latest)"],
            "supplemental info": {
                "PPNR": {k: str(v) for k, v in pp.items() if k != "PPNR Growth YoY (Latest)"},
                "Efficiency Ratio": {k: str(v) for k, v in ef.items() if k != "Efficiency Ratio (Latest)"},
                "ROE/ROA": {k: str(v) for k, v in rr.items() if k != "ROE (Latest)"},
            },
        }
        return metrics_expansion_fin

    elif regime == "Inflationary":
        ng = nii_growth_yoy(ann_inc)
        ef = efficiency_ratio(ann_inc)
        ea = equity_to_assets(ann_bs)

        metrics_inflation_fin = {
            "NII Growth (YoY)": ng["NII Growth YoY (Latest)"],
            "Efficiency Ratio Δ (YoY, bps)": ef["Efficiency Ratio Δ YoY (bps) Latest"],
            "Equity / Assets": ea["Equity / Assets (Latest)"],
            "supplemental info": {
                "NII": {k: str(v) for k, v in ng.items() if k != "NII Growth YoY (Latest)"},
                "Efficiency Ratio": {k: str(v) for k, v in ef.items() if k != "Efficiency Ratio Δ YoY (bps) Latest"},
                "Equity / Assets": {k: str(v) for k, v in ea.items() if k != "Equity / Assets (Latest)"},
            },
        }
        return metrics_inflation_fin

    elif regime == "Stagflationary":
        pgv = ppnr_growth_volatility_qtr(qtr_inc)   # <-- quarterly
        rr  = roe_roa(ann_inc, ann_bs)              # or roe_roa_avg(ann_inc, ann_bs)
        ea  = equity_to_assets(ann_bs)

        metrics_stagflation_fin = {
            "PPNR Growth Volatility (stdev, quarterly)": pgv["PPNR Growth Volatility (stdev, quarterly)"],
            "ROA": rr["ROA (Latest)"],
            "Equity / Assets": ea["Equity / Assets (Latest)"],
            "supplemental info": {
                "PPNR Growth": {k: str(v) for k, v in pgv.items() if k != "PPNR Growth Volatility (stdev, quarterly)"},
                "ROE/ROA": {k: str(v) for k, v in rr.items() if k != "ROA (Latest)"},
                "Equity / Assets": {k: str(v) for k, v in ea.items() if k != "Equity / Assets (Latest)"},
            },
        }
        return metrics_stagflation_fin

    elif regime == "Recession":
        ea = equity_to_assets(ann_bs)
        ef = efficiency_ratio(ann_inc)
        pa = ppnr_to_assets(ann_inc, ann_bs)  # or ppnr_to_assets_avg(ann_inc, ann_bs)

        metrics_recession_fin = {
            "Equity / Assets": ea["Equity / Assets (Latest)"],
            "Efficiency Ratio": ef["Efficiency Ratio (Latest)"],
            "PPNR / Assets": pa["PPNR / Assets (Latest)"],
            "supplemental info": {
                "Equity / Assets": {k: str(v) for k, v in ea.items() if k != "Equity / Assets (Latest)"},
                "Efficiency Ratio": {k: str(v) for k, v in ef.items() if k != "Efficiency Ratio (Latest)"},
                "PPNR / Assets": {k: str(v) for k, v in pa.items() if k != "PPNR / Assets (Latest)"},
            },
        }
        return metrics_recession_fin

    else:
        print("Invalid regime. Please try again.")
        return None
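
Each branch of `calculate_financials_metrics` packages its results the same way: one headline number per metric, plus everything else stringified under "supplemental info", presumably so the supporting series can be dropped into a prompt as text. The sketch below isolates that pattern; `split_headline` is a hypothetical helper and the input values are made up.

```python
def split_headline(result: dict, headline_key: str) -> tuple[object, dict[str, str]]:
    """Separate one headline value from a metric dict and stringify the rest."""
    headline = result[headline_key]
    supplemental = {k: str(v) for k, v in result.items() if k != headline_key}
    return headline, supplemental


# Made-up output shaped like equity_to_assets(), with a list standing in for a Series
ea = {
    "Equity / Assets (series)": [0.11, 0.10],
    "Equity / Assets (Latest)": 0.11,
}

value, extra = split_headline(ea, "Equity / Assets (Latest)")
metrics = {
    "Equity / Assets": value,
    "supplemental info": {"Equity / Assets": extra},
}
print(metrics)
```

Keeping the headline values numeric while stringifying the rest leaves the top-level dict easy to compare against thresholds.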
        

## Workflow Steps
We now define the steps that the workflow will run.

In [14]:
import asyncio
import json
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import Workflow
from llama_index.core.agent.workflow import ReActAgent
from dotenv import load_dotenv

load_dotenv()

# --- Workflow Declaration ------------------------------------------

class RegimeScreeningWorkflow(Workflow): pass

# --- Steps ---------------------------------------------------------

# Step 1:  Get Ticker

@step(workflow=RegimeScreeningWorkflow)
async def start_workflow(ctx: Context[State], ev: StartEvent) -> ProcessTicker:
    regime_resp = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id="EconomicRegime",
        waiter_event=InputRequiredEvent(
            prefix="Select regime by number: 0=Expansionary, 1=Inflationary, 2=Stagflationary, 3=Recession"
        ),
    )
    regime_opts = ['Expansionary', 'Inflationary', 'Stagflationary', 'Recession']
    # Reject anything that is not a valid index into regime_opts
    # (negative numbers included) instead of continuing with no regime set.
    try:
        idx = int(regime_resp.response.strip())
        if idx not in range(len(regime_opts)):
            raise IndexError(idx)
        regime_choice = regime_opts[idx]
    except (ValueError, IndexError) as exc:
        raise ValueError("Invalid regime selection; expected a number from 0 to 3.") from exc

    ticker_resp = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id="Ticker",
        waiter_event=InputRequiredEvent(prefix="Input a ticker for the stock you want to screen:"),
    )
    ticker = str(ticker_resp.response.strip())
    
    async with ctx.store.edit_state() as st:
        st.EconomicRegime = regime_choice
        st.Ticker = ticker
    return ProcessTicker(ticker=ticker)

# Step 2: Pull Financial Data

@step(workflow=RegimeScreeningWorkflow)
async def pull_financial_data(ctx: Context[State], ev: ProcessTicker) -> PullFinancialData:
    ticker = ev.ticker
    print(f"Fetching financials for {ticker} via yfinance...")
    stock = await asyncio.to_thread(yf.Ticker, ticker)
    sector = await asyncio.to_thread(lambda: stock.info['sector'])
    ann_income_df = await asyncio.to_thread(lambda: stock.get_financials(freq='yearly'))
    ann_bs_df = await asyncio.to_thread(lambda: stock.get_balance_sheet(freq='yearly'))
    qtr_income_df = await asyncio.to_thread(lambda: stock.get_financials(freq='quarterly'))
    qtr_bs_df = await asyncio.to_thread(lambda: stock.get_balance_sheet(freq='quarterly'))

    async with ctx.store.edit_state() as st:
        st.FinancialData = Financials(
                    Ann_BalanceSheet=ann_bs_df.to_dict(),
                    Ann_IncomeStatement=ann_income_df.to_dict(),
                    Qtr_BalanceSheet=qtr_bs_df.to_dict(),
                    Qtr_IncomeStatement=qtr_income_df.to_dict()
                )
        st.Sector = sector
    print("Financial data stored.")
    return PullFinancialData(ticker=ticker)

# Step 3: Route based on Regime

@step(workflow=RegimeScreeningWorkflow)
async def regime_router(ctx: Context[State], ev: PullFinancialData) -> ExpansionRoute | InflationRoute | StagflationRoute | RecessionRoute:
    state = await ctx.store.get_state()
    regime = state.EconomicRegime
    
    if regime == "Expansionary":
        return ExpansionRoute(ticker=ev.ticker)
    elif regime == "Inflationary":
        return InflationRoute(ticker=ev.ticker)
    elif regime == "Stagflationary":
        return StagflationRoute(ticker=ev.ticker)
    elif regime == "Recession":
        return RecessionRoute(ticker=ev.ticker)
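    else:
        print("Invalid regime. Please try again.")
        # Re-emit a StartEvent so the user is prompted again; this step emits no route.
        ctx.send_event(StartEvent())
        return None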

# Step 4: Calculate Metrics based on Regime - ExpansionMetrics
        
@step(workflow=RegimeScreeningWorkflow)
async def ExpansionMetrics(ctx: Context[State], ev: ExpansionRoute) -> MetricsEvent:
    state = await ctx.store.get_state()
    ann_inc, qtr_inc, ann_bs, qtr_bs = get_financials(state)

    if state.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,ann_inc,ann_bs,qtr_inc)
    else:
        rg  = revenue_growth(ann_inc, qtr_inc)
        em  = ebitda_margin(ann_inc, qtr_inc)
        nd  = net_debt_to_ebitda(ann_inc, ann_bs)
        metrics = {
        "Revenue Growth (YoY)": rg["Annual Revenue Growth"],
        "EBITDA Margin": em["Annual EBITDA Margin (Latest)"],
        "Net Debt / EBITDA": nd["Net Debt / EBITDA (Latest)"],
        "supplemental info": {
            "Revenue Growth": {k: str(v) for k, v in rg.items() if k != "Annual Revenue Growth"},
            "EBITDA Margin": {k: str(v) for k, v in em.items() if k != "Annual EBITDA Margin (Latest)"},
            "Net Debt / EBITDA": {k: str(v) for k, v in nd.items() if k != "Net Debt / EBITDA (Latest)"},
            },
        }
    print("=" *50)
    print("METRICS")
    print("="*50)
    print(str(metrics))
    await save_metrics(ctx, metrics)
    return MetricsEvent(ticker=ev.ticker)

# Step 4: Calculate Metrics based on Regime - InflationMetrics

@step(workflow=RegimeScreeningWorkflow)
async def InflationMetrics(ctx: Context[State], ev: InflationRoute) -> MetricsEvent:
    state = await ctx.store.get_state()
    ann_inc, qtr_inc, ann_bs, qtr_bs = get_financials(state)

    if state.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,ann_inc,ann_bs,qtr_inc)
    else:
        gmt  = gross_margin_trend_bps(ann_inc, qtr_inc)
        it  = inventory_turnover(ann_inc, ann_bs)
        ic  = interest_coverage(ann_inc)

        metrics = {
            "Gross Margin Trend (YoY, bps)": gmt["Annual GM Trend Avg (bps)"],
            "Inventory Turnover (x)": it["Inventory Turnover (Latest)"],
            "Interest Coverage (EBIT/InterestExpense)": ic["Interest Coverage (Latest)"],
            "supplemental info": {
                "Gross Margin Trend": {k: str(v) for k, v in gmt.items() if k != "Annual GM Trend Avg (bps)"},
                "Inventory Turnover (x)": {k: str(v) for k, v in it.items() if k != "Inventory Turnover (Latest)"},
                "Interest Coverage (EBIT/InterestExpense)": {k: str(v) for k, v in ic.items() if k != "Interest Coverage (Latest)"},
            },
        }
    print("=" *50)
    print("METRICS")
    print("="*50)
    print(str(metrics))
    await save_metrics(ctx, metrics)
    return MetricsEvent(ticker=ev.ticker)

# Step 4: Calculate Metrics based on Regime - StagflationMetrics

@step(workflow=RegimeScreeningWorkflow)
async def StagflationMetrics(ctx: Context[State], ev: StagflationRoute) -> MetricsEvent:
    state = await ctx.store.get_state()
    ann_inc, qtr_inc, ann_bs, qtr_bs = get_financials(state)

    if state.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,ann_inc,ann_bs,qtr_inc)
    else:
        emv = ebitda_margin_volatility(ann_inc)
        gmt  = gross_margin_trend_bps(ann_inc, qtr_inc)
        nd  = net_debt_to_ebitda(ann_inc, ann_bs)

        metrics = {
            "EBITDA Margin Volatility": emv["Annual EBITDA Margin Volatility (stdev)"],
            "Gross Margin Trend (YoY, bps)": gmt["Annual GM Trend Avg (bps)"],
            "Net Debt / EBITDA": nd["Net Debt / EBITDA (Latest)"],
            "supplemental info": {
                "EBITDA Margin Volatility": {k: str(v) for k, v in emv.items() if k != "Annual EBITDA Margin Volatility (stdev)"},
                "Gross Margin Trend": {k: str(v) for k, v in gmt.items() if k != "Annual GM Trend Avg (bps)"},
                "Net Debt / EBITDA": {k: str(v) for k, v in nd.items() if k != "Net Debt / EBITDA (Latest)"},
            },
        }
    print("=" *50)
    print("METRICS")
    print("="*50)
    print(str(metrics))
    await save_metrics(ctx, metrics)
    return MetricsEvent(ticker=ev.ticker)

# Step 4: Calculate Metrics based on Regime - RecessionMetrics

@step(workflow=RegimeScreeningWorkflow)
async def RecessionMetrics(ctx: Context[State], ev: RecessionRoute) -> MetricsEvent:
    state = await ctx.store.get_state()
    ann_inc, qtr_inc, ann_bs, qtr_bs = get_financials(state)

    if state.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,ann_inc,ann_bs,qtr_inc)
    else:
        ctd = cash_to_debt(ann_bs)
        ic  = interest_coverage(ann_inc)
        dso = dso_change_yoy(ann_inc, ann_bs)

        metrics = {
            "Cash & ST Inv. / Total Debt": ctd["Cash+STI / Total Debt (Latest)"],
            "Interest Coverage (EBIT/InterestExpense)": ic["Interest Coverage (Latest)"],
            "DSO Change (YoY, days)": dso["DSO Change (YoY, days) Latest"],
            "supplemental info": {
                "Cash & ST Inv. / Total Debt": {k: str(v) for k, v in ctd.items() if k != "Cash+STI / Total Debt (Latest)"},
                "Interest Coverage (EBIT/InterestExpense)": {k: str(v) for k, v in ic.items() if k != "Interest Coverage (Latest)"},
                "DSO Change (YoY, days)": {k: str(v) for k, v in dso.items() if k != "DSO Change (YoY, days) Latest"},
            },
        }
    print("=" *50)
    print("METRICS")
    print("="*50)
    print(str(metrics))
    await save_metrics(ctx, metrics)
    return MetricsEvent(ticker=ev.ticker)

# Step 5: Data Validation using React Agent

@step(workflow=RegimeScreeningWorkflow)
async def data_validation(ctx: Context[State], ev: MetricsEvent) -> DataCommentary | StopEvent:
    state = await ctx.store.get_state()
    metrics = state.MetricData.MetricData or {}
    sector = state.Sector
    # Financial Services tickers use the financials-specific outlier table.
    outlier_table = OUTLIER_TABLE_FINANCIALS if sector == 'Financial Services' else OUTLIER_TABLE
    
    if not metrics:
        return StopEvent(result="No financial metrics available for evaluation.")
    
    llm = OpenAI(model="gpt-4.1-mini",temperature=0)
    calls=[]
    
    def get_annual_income_data():
        """Get annual income statement data"""
        result = to_df(state.FinancialData.Ann_IncomeStatement)
        calls.append('Called Annual Income')
        return result
    
    def get_quarterly_income_data():
        """Get quarterly income statement data"""
        result = to_df(state.FinancialData.Qtr_IncomeStatement)
        calls.append('Called Quarterly Income')
        return result
    
    def get_annual_balance_data():
        """Get annual balance sheet data"""
        result = to_df(state.FinancialData.Ann_BalanceSheet)
        calls.append('Called Annual Balance Sheet')
        return result
    
    def get_quarterly_balance_data():
        """Get quarterly balance sheet data"""
        result = to_df(state.FinancialData.Qtr_BalanceSheet)
        calls.append('Called Quarterly Balance Sheet')
        return result
    
    tools = [
        get_annual_income_data,
        get_quarterly_income_data, 
        get_annual_balance_data,
        get_quarterly_balance_data,
    ]
    
    agent = ReActAgent(tools=tools, llm=llm, verbose=True)

    # Separate the prompt fragments explicitly; adjacent string literals are joined with no space.
    prompt = (
        "Evaluate the following financial metrics for accuracy, completeness, and nuance.\n\n"
        f"{json.dumps(metrics, indent=2)}\n\n"
        "Look at the outlier table:\n\n"
        f"{outlier_table}\n\n"
        "Comment on any metrics that fall in the extreme upper or lower bounds by inspecting the input data through your tools, "
        "and determine whether the metrics are reasonable given the input data or whether odd or missing items are affecting their calculation. "
        "Lastly, comment on any nuance affecting these metrics that is important for interpreting their value. "
        "For example, if gross margin volatility is in the extreme upper bound but the volatility comes mostly from the upside (extremely high gross margin values), include this nuance in the commentary. "
        "YOU MUST LIST ALL THE TOOLS YOU HAVE ACCESS TO AND CALL AT LEAST ONE!!!"
    )
    
    resp = await agent.run(prompt)

    print(calls)
    print("=" * 50)
    print("DATA VALIDATION COMMENTARY")
    print("=" * 50)
    print(resp)
    
    async with ctx.store.edit_state() as st:
        st.DataCommentary = str(resp)  # store the agent's reply as plain text
    
    return DataCommentary(ticker=ev.ticker)

# Step 6: Evaluation against Criteria

@step(workflow=RegimeScreeningWorkflow)
async def evaluate_financials(ctx: Context[State], ev: DataCommentary) -> StopEvent:
    state = await ctx.store.get_state()
    regime = state.EconomicRegime or ""
    is_financials = state.Sector == 'Financial Services'
    criteria = regime_to_criteria(state, regime, is_financials)
    metrics = state.MetricData.MetricData or {}
    if not metrics:
        return StopEvent(result=f"No financial metrics available for evaluation (Ticker:{state.Ticker}).")
    data_commentary = state.DataCommentary
    llm = OpenAI(model="gpt-4.1-mini",temperature=0)
    sllm = llm.as_structured_llm(LLMEvaluation)
    prompt = (
        f"Evaluate the following financial metrics based on the criteria for the {regime} regime:\n\n"
        f"{json.dumps(metrics, indent=2)}\n\n"
        f"Criteria:\n{criteria}\n\n"
        "Look at the data validation commentary to determine whether the data is accurate and complete, or whether there is nuance you need to incorporate in your evaluation.\n\n"
        f"Data Validation Commentary:\n{data_commentary}\n\n"
        "Provide an explanation and a score from 0 to 100, where 0 is poor fitness for the regime and 100 is excellent. Put your response in JSON format {'explanation':<your explanation>,'score':<your score>}"
    )
    resp = await sllm.acomplete(prompt)
    evaluation: LLMEvaluation = resp.raw
    print("=" *50)
    print("SCORE EVALUATION")
    print("="*50)
    print(evaluation.explanation)

    async with ctx.store.edit_state() as st:
        st.EvaluationResult = LLMEvaluation(
            explanation=evaluation.explanation,
            score=evaluation.score,
        )

    return StopEvent(result=evaluation)
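
Steps 1 and 3 above both depend on turning a menu number into a regime name and then into a route. The sketch below isolates that logic in plain Python so it can be tested without running the workflow. It is an illustration under assumptions, not the notebook's implementation: `parse_regime_choice`, `route_for`, and the `Route` dataclass are hypothetical names, and `Route` only stands in for the workflow's route events.

```python
from dataclasses import dataclass
from typing import Optional

REGIME_OPTS = ["Expansionary", "Inflationary", "Stagflationary", "Recession"]

# Regime name -> name of the route event used in the workflow above.
ROUTE_NAMES = {
    "Expansionary": "ExpansionRoute",
    "Inflationary": "InflationRoute",
    "Stagflationary": "StagflationRoute",
    "Recession": "RecessionRoute",
}


@dataclass
class Route:
    """Hypothetical stand-in for a route event (illustration only)."""
    name: str
    ticker: str


def parse_regime_choice(raw: str) -> Optional[str]:
    """Return the regime name for a menu number, or None if the input is invalid."""
    try:
        idx = int(raw.strip())
    except ValueError:
        return None
    # Reject negatives explicitly: REGIME_OPTS[-1] would silently select "Recession".
    if 0 <= idx < len(REGIME_OPTS):
        return REGIME_OPTS[idx]
    return None


def route_for(regime: str, ticker: str) -> Optional[Route]:
    """Look up the route for a regime; None signals an unknown regime."""
    name = ROUTE_NAMES.get(regime)
    return Route(name=name, ticker=ticker) if name else None
```

A dictionary lookup keeps the regime list and the routing table in one place, so adding a fifth regime would mean adding one entry rather than another `elif` branch.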

## Run Workflow
Now we can run the workflow for any ticker available on Yahoo Finance.

In [None]:

from llama_index.core.agent.workflow import ToolCallResult

w = RegimeScreeningWorkflow(timeout=600, verbose=True)

handler = w.run() 

async for ev in handler.stream_events():
    if isinstance(ev, ToolCallResult):
        print(f"\nCall {ev.tool_name} with {ev.tool_kwargs}\nReturned: {ev.tool_output}")
    elif isinstance(ev, InputRequiredEvent):
        user_text = input(ev.prefix if ev.prefix else "Enter value: ")
        handler.ctx.send_event(HumanResponseEvent(response=user_text))

result = await handler

In [None]:
print(result.score)

62

# Extend to Parallelized Multi-ticker Workflow
The following extends the workflow so that multiple tickers are screened in parallel. We reuse the single-ticker workflow by making it the parent class of the new workflow.
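
The parallel version follows a fan-out/fan-in shape: one event is emitted per ticker, the per-ticker steps run concurrently, and a final step waits until every ticker has reported back. The sketch below shows that shape with plain `asyncio` only. It is a simplified illustration, not the workflow itself: `screen_one` and `screen_many` are hypothetical names, `screen_one` does no real screening, and the semaphore only approximates the role `num_workers` plays on a step.

```python
import asyncio
from typing import Dict, List, Set


async def screen_one(ticker: str) -> dict:
    """Hypothetical stand-in for the per-ticker pipeline (fetch, metrics, evaluation)."""
    await asyncio.sleep(0)  # yield control, as a network or LLM call would
    return {"ticker": ticker, "status": "done"}  # placeholder, no real evaluation


async def screen_many(tickers: List[str], max_workers: int = 10) -> dict:
    sem = asyncio.Semaphore(max_workers)  # caps how many tickers run at once
    completed: Set[str] = set()
    evaluations: Dict[str, dict] = {}

    async def worker(ticker: str) -> None:
        async with sem:
            evaluations[ticker] = await screen_one(ticker)
        completed.add(ticker)

    # Fan-out: start one worker per ticker and wait for all of them.
    await asyncio.gather(*(worker(t) for t in tickers))

    # Fan-in: only build the combined result once every ticker has finished.
    if completed != set(tickers):
        raise RuntimeError("some tickers did not finish")
    return {"tickers": tickers, "evaluations": evaluations}


# In a notebook cell: out = await screen_many(["AAPL", "NVDA"])
# In a script:        out = asyncio.run(screen_many(["AAPL", "NVDA"]))
```

The `completed` set mirrors the `completed` field kept on the parent state below: the combined result is produced only when its size matches the number of requested tickers.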


## State Models

In [66]:
from pydantic import BaseModel, Field
from typing import Dict, List, Optional

class TickerState(BaseModel):
    Ticker: str
    Sector: Optional[str] = Field(default=None)
    FinData: Financials = Field(default_factory=Financials)
    MetricData: Metrics = Field(default_factory=Metrics)
    DataCommentary: Optional[str] = None
    EvaluationResult: Optional[LLMEvaluation] = None

class ParentState(BaseModel):
    EconomicRegime: Optional[str] = None
    ScreeningCriteria: RegimeCriteria = Field(default_factory=RegimeCriteria)
    Tickers: List[str] = Field(default_factory=list)
    children: Dict[str, TickerState] = Field(default_factory=dict)
    completed: set[str] = Field(default_factory=set)
    
class ResultEvent(Event):
    ticker: str

## Steps

In [None]:
# --- Workflow Declaration ------------------------------------------   

class ParallelRegimeScreeningWorkflow(RegimeScreeningWorkflow): pass

# --- Steps ---------------------------------------------------------
@step(workflow=ParallelRegimeScreeningWorkflow)
async def start_workflow(ctx: Context[ParentState], ev: StartEvent) -> None | ProcessTicker:
    regime_resp = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id="EconomicRegime",
        waiter_event=InputRequiredEvent(
            prefix="Select regime by number: 0=Expansionary, 1=Inflationary, 2=Stagflationary, 3=Recession"
        ),
    )
    regime_opts = ['Expansionary', 'Inflationary', 'Stagflationary', 'Recession']
    try:
        regime_choice = regime_opts[int(regime_resp.response.strip())]
    except Exception:
        print("Invalid regime. Try again.")
        return ctx.send_event(StartEvent())

    tickers_resp = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_id="Tickers",
        waiter_event=InputRequiredEvent(prefix="Enter comma-separated tickers (e.g. aapl,nvda,jnj):"),
    )
    tickers = [t.strip().upper() for t in tickers_resp.response.split(",") if t.strip()]

    async with ctx.store.edit_state() as st:
        st.EconomicRegime = regime_choice
        st.Tickers = tickers
        st.children = {t: TickerState(Ticker=t) for t in tickers}
        st.completed = set()

    for t in tickers:
        ctx.send_event(ProcessTicker(ticker=t))

@step(workflow=ParallelRegimeScreeningWorkflow, num_workers=10)
async def pull_financial_data(ctx: Context[ParentState], ev: ProcessTicker) -> PullFinancialData:
    t = ev.ticker
    print(f"Fetching financials for {t} via yfinance...")
    stock = await asyncio.to_thread(yf.Ticker, t)
    sector = await asyncio.to_thread(lambda: stock.info['sector'])
    ann_income_df = await asyncio.to_thread(lambda: stock.get_financials(freq='yearly'))
    ann_bs_df    = await asyncio.to_thread(lambda: stock.get_balance_sheet(freq='yearly'))
    qtr_income_df= await asyncio.to_thread(lambda: stock.get_financials(freq='quarterly'))
    qtr_bs_df    = await asyncio.to_thread(lambda: stock.get_balance_sheet(freq='quarterly'))

    async with ctx.store.edit_state() as st:
        child = st.children[t]
        child.Sector = sector
        child.FinData = Financials(
            Ann_BalanceSheet=ann_bs_df.to_dict(),
            Ann_IncomeStatement=ann_income_df.to_dict(),
            Qtr_BalanceSheet=qtr_bs_df.to_dict(),
            Qtr_IncomeStatement=qtr_income_df.to_dict(),
        )
        st.children[t] = child  
    return PullFinancialData(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def regime_router(ctx: Context[ParentState], ev: PullFinancialData) -> ExpansionRoute | InflationRoute | StagflationRoute | RecessionRoute:
    regime = (await ctx.store.get_state()).EconomicRegime
    t = ev.ticker
    if regime == "Expansionary":
        return ExpansionRoute(ticker=t)
    if regime == "Inflationary":
        return InflationRoute(ticker=t)
    if regime == "Stagflationary":
        return StagflationRoute(ticker=t)
    if regime == "Recession":
        return RecessionRoute(ticker=t)
    print("Invalid regime in state; no route emitted for this ticker.")
    return None

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def expansion_metrics(ctx: Context[ParentState], ev: ExpansionRoute) -> MetricsEvent:
    t = ev.ticker
    state = await ctx.store.get_state()
    child = state.children[t]
    ann_inc = child.FinData.Ann_IncomeStatement
    qtr_inc = child.FinData.Qtr_IncomeStatement
    ann_bs  = child.FinData.Ann_BalanceSheet

    if child.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,to_df(ann_inc),to_df(ann_bs),to_df(qtr_inc))
    else:
        rg  = revenue_growth(to_df(ann_inc), to_df(qtr_inc))
        em  = ebitda_margin(to_df(ann_inc), to_df(qtr_inc))
        nd  = net_debt_to_ebitda(to_df(ann_inc), to_df(ann_bs))
        metrics = {
        "Revenue Growth (YoY)": rg["Annual Revenue Growth"],
        "EBITDA Margin": em["Annual EBITDA Margin (Latest)"],
        "Net Debt / EBITDA": nd["Net Debt / EBITDA (Latest)"],
        "supplemental info": {
            "Revenue Growth": {k: str(v) for k, v in rg.items() if k != "Annual Revenue Growth"},
            "EBITDA Margin": {k: str(v) for k, v in em.items() if k != "Annual EBITDA Margin (Latest)"},
            "Net Debt / EBITDA": {k: str(v) for k, v in nd.items() if k != "Net Debt / EBITDA (Latest)"},
            },
        }
    async with ctx.store.edit_state() as st:
        st.children[t].MetricData.MetricData = metrics

    print(f"[{t}] metrics computed")
    return MetricsEvent(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def inflation_metrics(ctx: Context[ParentState], ev: InflationRoute) -> MetricsEvent:
    t = ev.ticker
    state = await ctx.store.get_state()
    child = state.children[t]
    ann_inc = child.FinData.Ann_IncomeStatement
    qtr_inc = child.FinData.Qtr_IncomeStatement
    ann_bs  = child.FinData.Ann_BalanceSheet

    if child.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,to_df(ann_inc),to_df(ann_bs),to_df(qtr_inc))
    else:
        gmt  = gross_margin_trend_bps(to_df(ann_inc), to_df(qtr_inc))
        it  = inventory_turnover(to_df(ann_inc), to_df(ann_bs))
        ic  = interest_coverage(to_df(ann_inc))

        metrics = {
            "Gross Margin Trend (YoY, bps)": gmt["Annual GM Trend Avg (bps)"],
            "Inventory Turnover (x)": it["Inventory Turnover (Latest)"],
            "Interest Coverage (EBIT/InterestExpense)": ic["Interest Coverage (Latest)"],
            "supplemental info": {
                "Gross Margin Trend": {k: str(v) for k, v in gmt.items() if k != "Annual GM Trend Avg (bps)"},
                "Inventory Turnover (x)": {k: str(v) for k, v in it.items() if k != "Inventory Turnover (Latest)"},
                "Interest Coverage (EBIT/InterestExpense)": {k: str(v) for k, v in ic.items() if k != "Interest Coverage (Latest)"},
            },
        }
    async with ctx.store.edit_state() as st:
        st.children[t].MetricData.MetricData = metrics

    print(f"[{t}] metrics computed")
    return MetricsEvent(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def stagflation_metrics(ctx: Context[ParentState], ev: StagflationRoute) -> MetricsEvent:
    t = ev.ticker
    state = await ctx.store.get_state()
    child = state.children[t]
    ann_inc = child.FinData.Ann_IncomeStatement
    qtr_inc = child.FinData.Qtr_IncomeStatement
    ann_bs  = child.FinData.Ann_BalanceSheet

    if child.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,to_df(ann_inc),to_df(ann_bs),to_df(qtr_inc))
    else:
        emv = ebitda_margin_volatility(to_df(ann_inc))
        gmt  = gross_margin_trend_bps(to_df(ann_inc), to_df(qtr_inc))
        nd  = net_debt_to_ebitda(to_df(ann_inc), to_df(ann_bs))

        metrics = {
            "EBITDA Margin Volatility": emv["Annual EBITDA Margin Volatility (stdev)"],
            "Gross Margin Trend (YoY, bps)": gmt["Annual GM Trend Avg (bps)"],
            "Net Debt / EBITDA": nd["Net Debt / EBITDA (Latest)"],
            "supplemental info": {
                "EBITDA Margin Volatility": {k: str(v) for k, v in emv.items() if k != "Annual EBITDA Margin Volatility (stdev)"},
                "Gross Margin Trend": {k: str(v) for k, v in gmt.items() if k != "Annual GM Trend Avg (bps)"},
                "Net Debt / EBITDA": {k: str(v) for k, v in nd.items() if k != "Net Debt / EBITDA (Latest)"},
            },
        }
    async with ctx.store.edit_state() as st:
        st.children[t].MetricData.MetricData = metrics

    print(f"[{t}] metrics computed")
    return MetricsEvent(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def recession_metrics(ctx: Context[ParentState], ev: RecessionRoute) -> MetricsEvent:
    t = ev.ticker
    state = await ctx.store.get_state()
    child = state.children[t]
    ann_inc = child.FinData.Ann_IncomeStatement
    ann_bs  = child.FinData.Ann_BalanceSheet
    qtr_inc = child.FinData.Qtr_IncomeStatement

    if child.Sector == 'Financial Services':
        metrics = calculate_financials_metrics(state.EconomicRegime,to_df(ann_inc),to_df(ann_bs),to_df(qtr_inc))
    else:
        ctd = cash_to_debt(to_df(ann_bs))
        ic  = interest_coverage(to_df(ann_inc))
        dso = dso_change_yoy(to_df(ann_inc), to_df(ann_bs))

        metrics = {
            "Cash & ST Inv. / Total Debt": ctd["Cash+STI / Total Debt (Latest)"],
            "Interest Coverage (EBIT/InterestExpense)": ic["Interest Coverage (Latest)"],
            "DSO Change (YoY, days)": dso["DSO Change (YoY, days) Latest"],
            "supplemental info": {
                "Cash & ST Inv. / Total Debt": {k: str(v) for k, v in ctd.items() if k != "Cash+STI / Total Debt (Latest)"},
                "Interest Coverage (EBIT/InterestExpense)": {k: str(v) for k, v in ic.items() if k != "Interest Coverage (Latest)"},
                "DSO Change (YoY, days)": {k: str(v) for k, v in dso.items() if k != "DSO Change (YoY, days) Latest"},
            },
        }
    async with ctx.store.edit_state() as st:
        st.children[t].MetricData.MetricData = metrics

    print(f"[{t}] metrics computed")
    return MetricsEvent(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def data_validation(ctx:Context[ParentState],ev: MetricsEvent) -> DataCommentary:
    t = ev.ticker
    state = await ctx.store.get_state()
    child = state.children[t]
    sector = child.Sector
    # Financial Services tickers use the financials-specific outlier table.
    outlier_table = OUTLIER_TABLE_FINANCIALS if sector == 'Financial Services' else OUTLIER_TABLE
    metrics = child.MetricData.MetricData
    if not metrics:
        # Emit the event anyway so the fan-in in combine_scores can still complete.
        return DataCommentary(ticker=t)

    ann_inc = child.FinData.Ann_IncomeStatement
    qtr_inc = child.FinData.Qtr_IncomeStatement
    ann_bs  = child.FinData.Ann_BalanceSheet
    qtr_bs  = child.FinData.Qtr_BalanceSheet

    llm = OpenAI(model="gpt-4.1", temperature=0)
    calls=[]
    
    def get_annual_income_data():
        """Get annual income statement data"""
        result = to_df(ann_inc)
        calls.append('Called Annual Income')
        return result
    
    def get_quarterly_income_data():
        """Get quarterly income statement data"""
        result = to_df(qtr_inc)
        calls.append('Called Quarterly Income')
        return result
    
    def get_annual_balance_data():
        """Get annual balance sheet data"""
        result = to_df(ann_bs)
        calls.append('Called Annual Balance Sheet')
        return result
    
    def get_quarterly_balance_data():
        """Get quarterly balance sheet data"""
        result = to_df(qtr_bs)
        calls.append('Called Quarterly Balance Sheet')
        return result
    
    tools = [
        get_annual_income_data,
        get_quarterly_income_data, 
        get_annual_balance_data,
        get_quarterly_balance_data,
    ]
    
    agent = ReActAgent(tools=tools, llm=llm, verbose=True)

    # Separate the prompt fragments explicitly; adjacent string literals are joined with no space.
    prompt = (
        "Evaluate the following financial metrics for accuracy, completeness, and nuance.\n\n"
        f"{json.dumps(metrics, indent=2)}\n\n"
        "Look at the outlier table:\n\n"
        f"{outlier_table}\n\n"
        "Comment on any metrics that fall in the extreme upper or lower bounds by inspecting the input data through your tools, "
        "and determine whether the metrics are reasonable given the input data or whether odd or missing items are affecting their calculation. "
        "Lastly, comment on any nuance affecting these metrics that is important for interpreting their value. "
        "For example, if gross margin volatility is in the extreme upper bound but the volatility comes mostly from the upside (extremely high gross margin values), include this nuance in the commentary. "
        "YOU MUST LIST ALL THE TOOLS YOU HAVE ACCESS TO AND CALL AT LEAST ONE!!!"
    )
    
    resp = await agent.run(prompt)

    print(calls)
    async with ctx.store.edit_state() as st:
        st.children[t].DataCommentary = str(resp)  # DataCommentary is declared as Optional[str]

    print(f"[{t}] data validation commentary saved")
    return DataCommentary(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow,num_workers=10)
async def evaluate_financials(ctx: Context[ParentState], ev: DataCommentary) -> ResultEvent:
    t = ev.ticker
    state = await ctx.store.get_state()
    regime = state.EconomicRegime or ""
    child = state.children[t]
    is_financials = child.Sector == 'Financial Services'
    metrics = child.MetricData.MetricData or {}
    print(metrics)

    if not metrics:
        # still emit a result so fan-in can proceed
        async with ctx.store.edit_state() as st:
            st.children[t].EvaluationResult = LLMEvaluation(explanation="No metrics", score=0.0)
        return ResultEvent(ticker=t)

    criteria = regime_to_criteria(state, regime, is_financials)
    data_commentary = child.DataCommentary or ""
    llm = OpenAI(model="gpt-4.1",temperature=0, request_timeout=180.0)  # 3-minute timeout
    sllm = llm.as_structured_llm(LLMEvaluation)
    prompt = (
        f"Evaluate the following financial metrics based on the criteria for the {regime} regime:\n\n"
        f"{json.dumps(metrics, indent=2)}\n\n"
        f"Criteria:\n{criteria}\n\n"
        "Look at the data validation commentary to determine whether the data is accurate and complete, or whether there is nuance you need to incorporate in your evaluation.\n\n"
        f"Data Validation Commentary:\n{data_commentary}\n\n"
        "Add nuance, but ground your evaluation heavily in the bands provided in the criteria and in the metrics. "
        "Provide an explanation and a score from 0 to 100, where 0 is poor fitness for the regime and 100 is excellent. Put your response in JSON format {'explanation':<your explanation>,'score':<your score>}"
    )
    resp = await sllm.acomplete(prompt)
    evaluation: LLMEvaluation = resp.raw

    async with ctx.store.edit_state() as st:
        st.children[t].EvaluationResult = LLMEvaluation(
            explanation=evaluation.explanation,
            score=evaluation.score,
        )

    print(f"[{t}] evaluation saved")
    return ResultEvent(ticker=t)

@step(workflow=ParallelRegimeScreeningWorkflow)
async def combine_scores(ctx: Context[ParentState], ev: ResultEvent) -> StopEvent | None:
    t = ev.ticker
    async with ctx.store.edit_state() as st:
        st.completed.add(t)
        done = len(st.completed)
        total = len(st.Tickers)

    print(f"Completed {done}/{total}")

    if done == total:
        state = await ctx.store.get_state()
        
        result = {
            "regime": state.EconomicRegime,
            "tickers": state.Tickers,
            "evaluations": {
                tk: (
                    state.children[tk].EvaluationResult.model_dump()
                    if state.children[tk].EvaluationResult else None
                )
                for tk in state.Tickers
            },
            "metrics": {
                tk: state.children[tk].MetricData.MetricData
                for tk in state.Tickers
            },
        }
        return StopEvent(result=result)

    return None
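
The metric steps in both workflows build the same dictionary shape: one headline value per metric, plus a `"supplemental info"` entry holding the remaining values as strings. A possible way to factor out that repetition is sketched below. `build_metrics` is a hypothetical helper, not part of the notebook, and the numbers in the example are made up for illustration rather than taken from any real output.

```python
from typing import Any, Dict, Iterable, Mapping, Tuple

# (headline label, supplemental label, metric function output, key of the headline value)
MetricSpec = Tuple[str, str, Mapping[str, Any], str]


def build_metrics(specs: Iterable[MetricSpec]) -> Dict[str, Any]:
    """Assemble the metrics dict in the shape used by the metric steps above."""
    metrics: Dict[str, Any] = {}
    supplemental: Dict[str, Dict[str, str]] = {}
    for headline_label, supplemental_label, values, headline_key in specs:
        # The headline value is stored as-is under its display label.
        metrics[headline_label] = values[headline_key]
        # Everything else is stringified and grouped under the supplemental label.
        supplemental[supplemental_label] = {
            k: str(v) for k, v in values.items() if k != headline_key
        }
    metrics["supplemental info"] = supplemental
    return metrics


# Illustrative input only; a real call would pass the output of revenue_growth(...).
rg = {"Annual Revenue Growth": 0.02, "Quarter Revenue Growth": -0.01}
example = build_metrics([
    ("Revenue Growth (YoY)", "Revenue Growth", rg, "Annual Revenue Growth"),
])
```

With a helper like this, each regime step would only need to list its three metric specs, which makes mismatched headline keys easier to spot.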


## Run The Workflow

In [None]:
w = ParallelRegimeScreeningWorkflow(timeout=600, verbose=True)

handler = w.run()  

async for ev in handler.stream_events():
    if isinstance(ev, InputRequiredEvent):
        user_text = input(ev.prefix if ev.prefix else "Enter value: ")
        handler.ctx.send_event(HumanResponseEvent(response=user_text))

result = await handler

In [None]:
result
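
Because `combine_scores` returns a plain dictionary keyed by ticker, the result is easy to post-process. The sketch below ranks tickers by score; `rank_by_score` is a hypothetical helper, and the sample input uses placeholder tickers and scores rather than real workflow output.

```python
from typing import Any, Dict, List, Tuple


def rank_by_score(result: Dict[str, Any]) -> List[Tuple[str, float]]:
    """Return (ticker, score) pairs sorted from highest to lowest score.

    Tickers whose evaluation is None (no result stored) are skipped.
    """
    evaluations = result.get("evaluations", {})
    scored = [
        (ticker, evaluation["score"])
        for ticker, evaluation in evaluations.items()
        if evaluation is not None and evaluation.get("score") is not None
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Placeholder data in the same shape as the workflow result (not real output).
sample = {
    "regime": "Expansionary",
    "tickers": ["AAA", "BBB", "CCC"],
    "evaluations": {
        "AAA": {"explanation": "placeholder", "score": 40},
        "BBB": {"explanation": "placeholder", "score": 75},
        "CCC": None,
    },
}
ranking = rank_by_score(sample)  # [("BBB", 75), ("AAA", 40)]
```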

# Test Importing
To reuse these workflows from other Jupyter notebooks, we saved them as Python files in this directory so they can be imported like a package. The cells below test that import.

In [None]:
import sys, os
sys.path.append(os.path.abspath(".."))

In [None]:
from regime_workflows import RegimeScreeningWorkflow
from llama_index.core.workflow import InputRequiredEvent,HumanResponseEvent

w = RegimeScreeningWorkflow(timeout=600, verbose=True)

handler = w.run() 

async for ev in handler.stream_events():
    if isinstance(ev, InputRequiredEvent):
        user_text = input(ev.prefix if ev.prefix else "Enter value: ")
        handler.ctx.send_event(HumanResponseEvent(response=user_text))

result = await handler

Running step start_workflow
Step start_workflow produced event ProcessTicker
Running step pull_financial_data
Fetching financials for aapl via yfinance...
Financial data stored.
Step pull_financial_data produced event PullFinancialData
Running step regime_router
Step regime_router produced event ExpansionRoute
Running step ExpansionMetrics
METRICS
{'Revenue Growth (YoY)': np.float64(0.020219940775141214), 'EBITDA Margin': np.float64(0.3443707085043538), 'Net Debt / EBITDA': np.float64(0.5694744580836323), 'supplemental info': {'Revenue Growth': {'Annual Revenue': '2024-09-30    3.910350e+11\n2023-09-30    3.832850e+11\n2022-09-30    3.943280e+11\n2021-09-30    3.658170e+11\n2020-09-30             NaN\nName: TotalRevenue, dtype: float64', 'Quarter Revenue': '2025-06-30    9.403600e+10\n2025-03-31    9.535900e+10\n2024-12-31    1.243000e+11\n2024-09-30    9.493000e+10\n2024-06-30    8.577700e+10\nName: TotalRevenue, dtype: float64', 'Quarter Revenue Growth': '-0.013873887100326136'}, 'EB

In [None]:
from regime_workflows import ParallelRegimeScreeningWorkflow
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent

w = ParallelRegimeScreeningWorkflow(timeout=600, verbose=True)

# Start the parallel workflow; run() returns a handler that can be streamed and awaited.
handler = w.run()

# Answer each human-in-the-loop prompt as the workflow emits it.
async for ev in handler.stream_events():
    if isinstance(ev, InputRequiredEvent):
        user_text = input(ev.prefix if ev.prefix else "Enter value: ")
        handler.ctx.send_event(HumanResponseEvent(response=user_text))

# Wait for all tickers to finish and collect the combined result.
result = await handler

Running step start_workflow
Step start_workflow produced no event
Running step pull_financial_data
Fetching financials for AAPL via yfinance...
Running step pull_financial_data
Fetching financials for NVDA via yfinance...
Step pull_financial_data produced event PullFinancialData
Running step regime_router
Step regime_router produced event ExpansionRoute
Running step expansion_metrics
[AAPL] metrics computed
Step expansion_metrics produced event MetricsEvent
Running step data_validation
Step pull_financial_data produced event PullFinancialData
Running step regime_router
Step regime_router produced event ExpansionRoute
Running step expansion_metrics
[NVDA] metrics computed
Step expansion_metrics produced event MetricsEvent
Running step data_validation
[NVDA] data validation commentary saved
Step data_validation produced event DataCommentary
Running step evaluate_financials
{'Revenue Growth (YoY)': np.float64(1.1420340763599357), 'EBITDA Margin': np.float64(0.6600688138424639), 'Net Debt 

In [4]:
result

{'regime': 'Expansionary',
 'tickers': ['AAPL', 'NVDA'],
 'evaluations': {'AAPL': {'explanation': 'The financial metrics for the Expansionary regime are well-aligned with the criteria and show strong fitness. Revenue Growth (YoY) at 2.02% is modest but positive and within normal bounds, though recent quarterly softness suggests some seasonality or short-term headwinds. EBITDA Margin at 34.44% is high and stable, indicating strong operational efficiency without volatility or outliers. Net Debt / EBITDA at 0.57x is low, reflecting a conservative capital structure supported by substantial cash holdings, which reduces financial risk. The data is accurate and consistent with the input, with no extreme values or anomalies. The main nuances to consider are the recent quarterly revenue softness, the sustainability of high EBITDA margins, and the impact of low leverage on capital efficiency. Overall, these metrics demonstrate excellent suitability for an Expansionary regime, balancing growth, p
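
The full `result` dictionary is long, so a short per-ticker summary can be easier to read. Below is a minimal sketch that assumes only the keys visible in the output above (`regime`, `tickers`, `evaluations`, and `explanation` within each evaluation). The `summarize` helper is hypothetical, and the sample data is a placeholder with the same shape, not real workflow output.

```python
import textwrap


def summarize(result: dict, width: int = 80) -> list[str]:
    """Return one line per ticker: regime, ticker, and a shortened explanation."""
    lines = []
    for ticker in result["tickers"]:
        evaluation = result["evaluations"].get(ticker, {})
        explanation = evaluation.get("explanation", "(no explanation)")
        short = textwrap.shorten(explanation, width=width, placeholder="...")
        lines.append(f"[{result['regime']}] {ticker}: {short}")
    return lines


# Placeholder data shaped like the workflow result; NVDA is deliberately
# left without an evaluation to show the fallback text.
sample = {
    "regime": "Expansionary",
    "tickers": ["AAPL", "NVDA"],
    "evaluations": {
        "AAPL": {"explanation": "Placeholder explanation text for AAPL."},
    },
}
for line in summarize(sample):
    print(line)
```

Calling `summarize(result)` on the real workflow output would follow the same pattern, provided the result keeps the shape shown above.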