# **JPM MLCOE TSRL 2026 Q1**
---

## 0. Global Configurations
---

In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
import yfinance as yf
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
from typing import Tuple


# Render floats in pandas output with thousands separators and two decimals.
pd.set_option('display.float_format', lambda v: f"{v:,.2f}")


# Print the library versions in use so a run can be reproduced later.
print('yfinance:', yf.__version__)
print('pandas:', pd.__version__)
print('tensorflow:', tf.__version__)

## 1. Data Pipeline
---

### 1.1 Column Aliases and Identity Checks

In [None]:
# Canonical balance-sheet column name -> accepted source spellings.
# Spellings are tried in list order; the first one present in a frame wins.
BS_ALIASES = {
    "Total Assets": [
        "Total Assets",
    ],
    "Total Liab": [
        "Total Liab",
        "Total Liabilities",
        "Total Liabilities Net Minority Interest",
    ],
    "Total Stockholder Equity": [
        "Total Stockholder Equity",
        "Total Equity Gross Minority Interest",
        "Stockholders Equity",
    ],
}


def canonicalize_bs(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose balance-sheet totals use canonical names.

    For every canonical name in ``BS_ALIASES`` the first alias found among
    ``df.columns`` is renamed to that canonical name. Later aliases of the
    same canonical name, and all unrelated columns, are left untouched.
    """
    available = df.columns

    # Pair each canonical name with the first alias present (or None).
    first_hits = (
        (next((alias for alias in aliases if alias in available), None), canonical)
        for canonical, aliases in BS_ALIASES.items()
    )
    mapping = {
        alias: canonical
        for alias, canonical in first_hits
        if alias is not None
    }

    return df.rename(columns = mapping)


def identity_residual(df: pd.DataFrame) -> pd.Series:
    """Return ``Total Assets - (Total Liab + Total Stockholder Equity)`` per row.

    If either the liabilities or the equity column is absent, the frame is
    first passed through ``canonicalize_bs`` to map known aliases.

    Raises:
        KeyError: if any of the three canonical columns is still missing.
    """
    has_liab = "Total Liab" in df.columns
    has_equity = "Total Stockholder Equity" in df.columns
    if not (has_liab and has_equity):
        df = canonicalize_bs(df)

    absent = [
        name
        for name in ("Total Assets", "Total Liab", "Total Stockholder Equity")
        if name not in df
    ]
    if absent:
        raise KeyError(f"Missing columns: {absent}")

    assets = df["Total Assets"]
    claims = df["Total Liab"] + df["Total Stockholder Equity"]
    return assets - claims


def summarize_identity(resid: pd.Series) -> pd.Series:
    """Summarise a residual series as its mean, standard deviation and largest absolute value.

    The result is a Series indexed by ``"mean"``, ``"std"`` and ``"max_abs"``.
    """
    labels = ["mean", "std", "max_abs"]
    values = [resid.mean(), resid.std(), resid.abs().max()]

    return pd.Series(values, index = labels)

### 1.2 Load AAPL Data with Caching

In [None]:
# Cache directory for downloaded statements: "<parent of the working directory>/data".
DATA_DIR = Path("..").resolve() / "data"
# Create it (and any missing parents) up front; a no-op when it already exists.
DATA_DIR.mkdir(parents = True, exist_ok = True)

def _fetch_statements(tkr, freq: str):
    bs = tkr.get_balance_sheet(freq = freq)
    is_df = tkr.get_financials(freq = freq)

    if bs is None or is_df is None:
        return pd.DataFrame(), pd.DataFrame()

    bs = bs.T.sort_index()
    is_df = is_df.T.sort_index()

    return bs, is_df


def _load_cached(path: Path) -> pd.DataFrame:
    return pd.read_csv(path, index_col = 0, parse_dates = True)


def load_statements(ticker: str = "AAPL"):
    """Load balance-sheet and income-statement history for ``ticker``.

    For each frequency ("yearly", "quarterly") the pair of CSVs under
    ``DATA_DIR`` is used when both files exist and are non-empty. Otherwise
    the pair is fetched through ``_fetch_statements`` and written back to the
    cache, but only when both frames came back non-empty.

    Quarterly data is chosen only when its pair is complete and it has strictly
    more balance-sheet rows than a complete yearly pair, or when the yearly
    pair is incomplete. In every other case yearly data is chosen.

    Args:
        ticker: symbol passed to ``yf.Ticker``; its lower-cased form prefixes
            the cache file names.

    Returns:
        ``(bs, is_df, freq)``: the balance sheet after ``canonicalize_bs``,
        the income statement, and the chosen frequency string.

    Raises:
        RuntimeError: if neither frequency yields a complete statement pair.
    """
    stem = ticker.lower()
    pairs = {}
    tkr = None  # created lazily so a fully cached run never builds a Ticker

    for period in ("yearly", "quarterly"):
        bs_path = DATA_DIR / f"{stem}_balance_sheet_{period}.csv"
        is_path = DATA_DIR / f"{stem}_income_statement_{period}.csv"

        bs_part, is_part = pd.DataFrame(), pd.DataFrame()
        if bs_path.exists() and is_path.exists():
            bs_part = _load_cached(bs_path)
            is_part = _load_cached(is_path)

        if bs_part.empty or is_part.empty:
            if tkr is None:
                tkr = yf.Ticker(ticker)

            bs_part, is_part = _fetch_statements(tkr, period)

            # Persist only complete pairs so a half-failed download is retried next run.
            if not bs_part.empty and not is_part.empty:
                bs_part.to_csv(bs_path)
                is_part.to_csv(is_path)

        pairs[period] = (bs_part, is_part)

    bs_y, is_y = pairs["yearly"]
    bs_q, is_q = pairs["quarterly"]
    yearly_ok = not (bs_y.empty or is_y.empty)
    quarterly_ok = not (bs_q.empty or is_q.empty)

    # An incomplete yearly pair (e.g. balance sheet without income statement)
    # must not outrank a usable quarterly pair just because it has more rows.
    if quarterly_ok and (not yearly_ok or bs_q.shape[0] > bs_y.shape[0]):
        freq = "quarterly"
    else:
        freq = "yearly"

    bs, is_df = pairs[freq]

    if bs.empty or is_df.empty:
        raise RuntimeError(f"Failed to fetch statements for {ticker}")

    bs = canonicalize_bs(bs)

    return bs, is_df, freq


# Load (or download and cache) the AAPL statements; these three names are
# reused by the cells that follow.
bs, is_df, stmt_freq = load_statements("AAPL")


# Quick sanity report: chosen frequency, frame sizes, and how far
# Assets - (Liabilities + Equity) is from zero.
print("Statement frequency:", stmt_freq)
print("Balance sheet shape:", bs.shape)
print("Income statement shape:", is_df.shape)
print("Identity residual stats:", summarize_identity(identity_residual(bs)))
bs.head()

### 1.3 Dataset Assembly

In [None]:
def _pick(df: pd.DataFrame, options):

    for c in options:
        if c in df.columns:
            return df[c]

    raise KeyError(f'Missing columns: {options}')


def compute_features(
    bs: pd.DataFrame,
    is_df: pd.DataFrame,
    days: float = 365.0,
    growth_periods: int = 1
) -> pd.DataFrame:
    """Build the ratio, growth and size features, indexed like ``bs``.

    Columns produced, in order: ``dso``, ``dpo``, ``dih`` (balance divided by
    per-day revenue or cost), ``gross_margin``, ``op_margin``, ``net_margin``,
    ``rev_yoy``, ``cogs_yoy``, ``netinc_yoy`` (``pct_change`` over
    ``growth_periods`` rows), ``log_rev`` and ``log_assets`` (``log1p``).

    Infinite values are turned into NaN, then gaps are forward- and
    back-filled along the sorted index.

    Args:
        bs: balance sheet with one row per period.
        is_df: income statement with one row per period.
        days: number of days the flow figures are divided by.
        growth_periods: row lag used for the ``*_yoy`` columns.

    Raises:
        KeyError: (from ``_pick``) when a required line item is missing.
    """
    revenue = _pick(is_df, ['Total Revenue', 'Operating Revenue'])
    cost = _pick(is_df, ['Cost Of Revenue', 'Cost of Revenue'])
    operating_income = _pick(is_df, ['Operating Income'])
    net_income = _pick(is_df, ['Net Income'])
    receivables = _pick(bs, ['Accounts Receivable'])
    payables = _pick(bs, ['Accounts Payable'])
    inventory = _pick(bs, ['Inventory'])

    daily_sales = revenue / days
    daily_cost = cost / days

    # Insertion order of this dict fixes the output column order.
    columns = {
        'dso': receivables / daily_sales,
        'dpo': payables / daily_cost,
        'dih': inventory / daily_cost,
        'gross_margin': (revenue - cost) / revenue,
        'op_margin': operating_income / revenue,
        'net_margin': net_income / revenue,
        'rev_yoy': revenue.pct_change(periods = growth_periods),
        'cogs_yoy': cost.pct_change(periods = growth_periods),
        'netinc_yoy': net_income.pct_change(periods = growth_periods),
        'log_rev': np.log1p(revenue),
        'log_assets': np.log1p(_pick(bs, ['Total Assets'])),
    }

    # Assign column by column so every series is aligned to the balance-sheet index.
    out = pd.DataFrame(index = bs.index)
    for name, values in columns.items():
        out[name] = values

    # Division by zero yields +/-inf; treat those as missing before filling gaps.
    cleaned = out.replace([np.inf, -np.inf], np.nan)

    return cleaned.sort_index().ffill().bfill()


# Days divisor for the per-day ratios: 365 for yearly statements, otherwise 90.
period_days = 365.0 if stmt_freq == 'yearly' else 90.0
# Row lag for the *_yoy growth columns: 1 row for yearly data, 4 rows otherwise
# (presumably four quarters, i.e. the same quarter one year earlier).
growth_periods = 1 if stmt_freq == 'yearly' else 4

features = compute_features(bs, is_df, days = period_days, growth_periods = growth_periods)
features.tail()


In [None]:
# Balance-sheet totals the model predicts, plus the separate earnings target.
TARGET_BS = ['Total Assets', 'Total Liab', 'Total Stockholder Equity']
NET_INCOME_COL = 'Net Income'

targets = bs[TARGET_BS].copy()

# Align net income to the balance-sheet periods by label. Overwriting the index
# positionally would raise when the two statements have different numbers of
# periods, and would silently pair the wrong periods when only the dates differ.
# Periods absent from the income statement become NaN and are dropped below.
net_income = (
    _pick(is_df, ['Net Income'])
    .rename(NET_INCOME_COL)
    .reindex(targets.index)
)

dataset = features.join(targets, how = 'inner').join(net_income, how = 'inner')
dataset = dataset.dropna()

# Everything that is not a target column is a model feature.
FEATURE_COLS = [c for c in dataset.columns if c not in TARGET_BS + [NET_INCOME_COL]]


print('Dataset shape:', dataset.shape)
print('Feature cols:', len(FEATURE_COLS), 'Target cols:', len(TARGET_BS) + 1)

### 1.4 Scaling (z-score) for Stability

In [None]:
# One scaler per array so each can be inverted independently later.
feat_scaler = StandardScaler()
bs_scaler = StandardScaler()
earn_scaler = StandardScaler()
prev_scaler = StandardScaler()

# NOTE(review): X_feat, Y_bs, Y_earn and X_prev are not defined anywhere in the
# visible notebook - confirm the cell that builds them exists and ran first.

# Fit the scaling statistics on the leading (training) rows only, then apply
# them to every row. Fitting on all rows would leak validation-period means and
# variances into training. The 0.8 fraction mirrors the chronological split
# performed in the train/val cell.
_fit_rows = max(1, int(0.8 * len(X_feat)))

X_feat_scaled = feat_scaler.fit(X_feat[:_fit_rows]).transform(X_feat)
Y_bs_scaled = bs_scaler.fit(Y_bs[:_fit_rows]).transform(Y_bs)
Y_earn_scaled = earn_scaler.fit(Y_earn[:_fit_rows]).transform(Y_earn)
X_prev_scaled = prev_scaler.fit(X_prev[:_fit_rows]).transform(X_prev)

### 1.5 Train/Val Split on Scaled Data

In [None]:
# Chronological split: the first 80% of rows (at least one) train the model.
n = X_feat_scaled.shape[0]
train_size = max(1, int(0.8 * n))

# Validation normally starts right after the training rows. When training
# already covers every row, reuse the last training row for validation.
val_start = train_size if train_size < n else train_size - 1

X_train_feat, X_val_feat = X_feat_scaled[:train_size], X_feat_scaled[val_start:]
Y_train_bs, Y_val_bs = Y_bs_scaled[:train_size], Y_bs_scaled[val_start:]
Y_train_earn, Y_val_earn = Y_earn_scaled[:train_size], Y_earn_scaled[val_start:]
X_train_prev, X_val_prev = X_prev_scaled[:train_size], X_prev_scaled[val_start:]


print('Scaled train:', X_train_feat.shape, Y_train_bs.shape, Y_train_earn.shape)
print('Scaled val:', X_val_feat.shape, Y_val_bs.shape, Y_val_earn.shape)

## 2. TensorFlow (Pareja/Pelaez Constrained)

### 2.1 TF Model with Algebraic Generator + Earnings Head

In [None]:
class AlgebraicBS(keras.layers.Layer):
    """Roll a balance sheet forward one period through accounting identities.

    A shared dense layer turns the feature vector into predicted revenue and
    cost of revenue, bounded operating drivers (days-sales-outstanding,
    days-payable, days-inventory, depreciation and capex rates), a net margin
    and a dividend payout ratio. Those drivers are combined with the previous
    period's state to compute the next receivables, payables, inventory, PPE,
    retained earnings, liabilities and equity. Cash is the balancing plug, so
    the returned total assets equal liabilities plus equity by construction.

    A separate dense head predicts net income directly from the hidden
    representation; it is independent of the ``net_margin * revenue`` figure
    used inside the retained-earnings roll-forward.
    """

    def __init__(self):

        super().__init__()

        self.hidden = keras.layers.Dense(64, activation = 'relu')
        # ReLU keeps predicted revenue and cost non-negative.
        self.rev_head = keras.layers.Dense(1, activation = 'relu')
        self.cogs_head = keras.layers.Dense(1, activation = 'relu')
        # Raw logits for: dso, dpo, dih, depreciation rate, capex rate.
        self.drivers = keras.layers.Dense(5)
        self.margin_head = keras.layers.Dense(1)
        self.payout_head = keras.layers.Dense(1)
        self.earn_head = keras.layers.Dense(1, name = 'net_income_head')


    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor]):
        """Compute next-period balance-sheet totals and a net-income prediction.

        Args:
            inputs: ``(features, prev_state)``. ``prev_state`` is split along
                its last axis into eight equal parts, in this order:
                receivables, payables, inventory, PPE, total liabilities,
                total equity, retained earnings, revenue.

        Returns:
            ``(bs_out, earn_pred)`` where ``bs_out`` concatenates
            ``[assets, liabilities, equity]`` along the last axis and
            ``earn_pred`` is the output of the dedicated earnings head.

        Raises:
            TypeError: if ``inputs`` is not a tuple or list.
        """
        if not isinstance(inputs, (tuple, list)):
            raise TypeError("inputs must be (features, prev_state)")

        features, prev_state = inputs
        ar_prev, ap_prev, inv_prev, ppe_prev, liab_prev, equity_prev, re_prev, rev_prev = tf.split(prev_state, num_or_size_splits=8, axis=-1)
        hidden = self.hidden(features)
        rev_pred = self.rev_head(hidden)
        cogs_pred = self.cogs_head(hidden)
        drivers_raw = self.drivers(hidden)

        # Softplus keeps the day-count drivers positive; the sigmoid-based
        # rates are capped at 0.2 and the net margin is limited to (-0.5, 0.5).
        dso = tf.nn.softplus(drivers_raw[:, 0:1])
        dpo = tf.nn.softplus(drivers_raw[:, 1:2])
        dih = tf.nn.softplus(drivers_raw[:, 2:3])
        dep_rate = tf.nn.sigmoid(drivers_raw[:, 3:4]) * 0.2
        capex_rate = tf.nn.sigmoid(drivers_raw[:, 4:5]) * 0.2
        net_margin = tf.tanh(self.margin_head(hidden)) * 0.5
        div_payout = tf.nn.sigmoid(self.payout_head(hidden))

        sales_per_day = rev_pred / 365.0
        cogs_per_day = cogs_pred / 365.0

        # Working capital from days-outstanding drivers.
        ar_next = dso * sales_per_day
        ap_next = dpo * cogs_per_day
        inv_next = dih * cogs_per_day

        # Fixed assets: previous PPE plus capex less depreciation.
        dep = dep_rate * ppe_prev
        capex = capex_rate * rev_pred
        ppe_next = ppe_prev + capex - dep

        # Retained earnings grow by net income less dividends.
        net_income = net_margin * rev_pred
        earn_pred = self.earn_head(hidden)
        div = div_payout * net_income
        re_next = re_prev + net_income - div

        # Equity other than retained earnings is carried over unchanged.
        other_equity = tf.nn.relu(equity_prev - re_prev)

        equity_next = re_next + other_equity

        other_liab_prev = tf.nn.relu(liab_prev - ap_prev)

        # Revenue growth, defined as zero where previous revenue is not
        # positive. tf.where evaluates both branches, so a plain division by
        # rev_prev would still produce inf/NaN where rev_prev == 0 and poison
        # the gradients even though that branch is not selected. Dividing by a
        # safe denominator keeps the forward values unchanged and the
        # gradients finite.
        has_prev_rev = rev_prev > 0
        safe_rev_prev = tf.where(has_prev_rev, rev_prev, tf.ones_like(rev_prev))
        growth = tf.where(has_prev_rev, rev_pred / safe_rev_prev - 1.0, tf.zeros_like(rev_pred))

        # Non-payable liabilities scale with revenue growth.
        other_liab_next = other_liab_prev * (1.0 + growth)
        liab_next = ap_next + other_liab_next

        # Cash is the plug that closes the balance sheet.
        assets_wo_cash = ar_next + inv_next + ppe_next
        cash_next = equity_next + liab_next - assets_wo_cash
        assets_next = assets_wo_cash + cash_next
        bs_out = tf.concat([assets_next, liab_next, equity_next], axis = -1)


        return bs_out, earn_pred


def build_pareja_model(feat_dim: int, state_dim: int = 8):
    """Wire an ``AlgebraicBS`` layer into a two-input, two-output Keras model.

    Args:
        feat_dim: width of the ``features`` input.
        state_dim: width of the ``prev_state`` input.

    Returns:
        A ``keras.Model`` named ``bs_pareja_style`` mapping
        ``[features, prev_state]`` to ``[balance_sheet, net_income]``.
    """
    model_inputs = [
        keras.Input(shape = (feat_dim,), name = 'features'),
        keras.Input(shape = (state_dim,), name = 'prev_state'),
    ]

    balance_sheet, earnings = AlgebraicBS()(model_inputs)

    return keras.Model(
        model_inputs,
        [balance_sheet, earnings],
        name = 'bs_pareja_style',
    )


# Size both model inputs from the column counts of the scaled arrays.
pareja_model = build_pareja_model(feat_dim = X_feat_scaled.shape[1], state_dim = X_prev_scaled.shape[1])
pareja_model.summary()