# Low-Latency Multi-Asset Allocation Notebook

This notebook prototypes an intraday portfolio construction workflow for 26 futures assets using minute-level data, a handcrafted LSTM forecaster, and an online mean-variance optimiser that produces updated portfolio weights at every timestep.

## Objective & Approach

* **Objective:** deliver low-latency weight recommendations $w_i$ for each asset $i$ on a minute-by-minute basis.
* **Forecasting model:** a lightweight Long Short-Term Memory (LSTM) encoder feeding a two-layer feedforward network trained to predict one-step-ahead returns.
* **Allocator:** an online mean-variance optimiser that refreshes exponentially-weighted covariance estimates and solves for weights under a leverage cap.
* **Data:** synthetic but stylised minute returns for 26 assets used to stress-test the pipeline.


In [None]:
import math
import random
import statistics
from typing import Dict, List, Sequence, Tuple

random.seed(7)

ASSETS: List[str] = [
    'CLA',
    'BTC',
    'QPA',
    'EB',
    'SAA',
    'PV',
    'ZIN',
    'ZG',
    'BPA',
    'MXA',
    'CNA',
    'SFA',
    'WHA',
    'PLA',
    'KCA',
    'EUA',
    'DAA',
    'CAA',
    'LHA',
    'CTA',
    'SBA',
    'CCA',
    'NKD',
    'USAA',
    'GCA',
    'NGA',
]
MINUTES: int = 720  # 12 hours of observations
WINDOW: int = 20    # lookback window for the LSTM
SCALE: float = 1000.0  # rescale returns for stable optimisation



In [None]:
# ---------- Linear algebra helpers (pure Python, dependency-light) ----------
def zeros(length: int) -> List[float]:
    return [0.0] * length

def zero_matrix(rows: int, cols: int) -> List[List[float]]:
    return [[0.0 for _ in range(cols)] for _ in range(rows)]

def vec_add(a: Sequence[float], b: Sequence[float]) -> List[float]:
    return [x + y for x, y in zip(a, b)]

def vec_mul(a: Sequence[float], b: Sequence[float]) -> List[float]:
    return [x * y for x, y in zip(a, b)]

def vec_scalar_mul(a: Sequence[float], scalar: float) -> List[float]:
    return [scalar * x for x in a]

def mat_vec_mul(matrix: Sequence[Sequence[float]], vec: Sequence[float]) -> List[float]:
    return [sum(row[j] * vec[j] for j in range(len(vec))) for row in matrix]

def mat_T_vec(matrix: Sequence[Sequence[float]], vec: Sequence[float]) -> List[float]:
    cols = len(matrix[0]) if matrix else 0
    result = [0.0] * cols
    for r in range(len(matrix)):
        for c in range(cols):
            result[c] += matrix[r][c] * vec[r]
    return result

def outer_product(a: Sequence[float], b: Sequence[float]) -> List[List[float]]:
    return [[x * y for y in b] for x in a]

def add_matrices(a: Sequence[Sequence[float]], b: Sequence[Sequence[float]]) -> List[List[float]]:
    return [[a[i][j] + b[i][j] for j in range(len(a[0]))] for i in range(len(a))]

def scalar_matrix_mul(matrix: Sequence[Sequence[float]], scalar: float) -> List[List[float]]:
    return [[scalar * matrix[i][j] for j in range(len(matrix[0]))] for i in range(len(matrix))]

def identity_matrix(n: int, scale: float = 1.0) -> List[List[float]]:
    mat = zero_matrix(n, n)
    for i in range(n):
        mat[i][i] = scale
    return mat

def max_drawdown(series: Sequence[float]) -> float:
    peak = series[0] if series else 0.0
    max_dd = 0.0
    for value in series:
        if value > peak:
            peak = value
        drawdown = peak - value
        if drawdown > max_dd:
            max_dd = drawdown
    return max_dd

def sigmoid(x: float) -> float:
    if x >= 0:
        z = math.exp(-x)
        return 1.0 / (1.0 + z)
    else:
        z = math.exp(x)
        return z / (1.0 + z)

def tanh(x: float) -> float:
    return math.tanh(x)

def relu(x: float) -> float:
    return x if x > 0 else 0.0

def relu_derivative(x: float) -> float:
    return 1.0 if x > 0 else 0.0



In [None]:
# ---------- Synthetic minute-level multi-asset return generation ----------
ASSET_SECTORS: Dict[str, str] = {
    'CLA': 'Energy',
    'BTC': 'Crypto',
    'QPA': 'Energy',
    'EB': 'FX',
    'SAA': 'FX',
    'PV': 'Grains',
    'ZIN': 'Equity',
    'ZG': 'Equity',
    'BPA': 'FX',
    'MXA': 'FX',
    'CNA': 'Grains',
    'SFA': 'FX',
    'WHA': 'Grains',
    'PLA': 'Metals',
    'KCA': 'Softs',
    'EUA': 'FX',
    'DAA': 'FX',
    'CAA': 'FX',
    'LHA': 'Livestock',
    'CTA': 'Softs',
    'SBA': 'Softs',
    'CCA': 'Softs',
    'NKD': 'Equity',
    'USAA': 'Rates',
    'GCA': 'Metals',
    'NGA': 'Energy',
}

SECTOR_NAMES = sorted({sector for sector in ASSET_SECTORS.values()})
ASSET_PARAMS: Dict[str, Tuple[float, float, float]] = {}
for asset in ASSETS:
    beta_market = random.uniform(0.4, 1.1)
    beta_sector = random.uniform(0.6, 1.4)
    idio_vol = random.uniform(0.0004, 0.0012)
    ASSET_PARAMS[asset] = (beta_market, beta_sector, idio_vol)

minute_returns: List[List[float]] = []
prices: Dict[str, float] = {asset: 100.0 for asset in ASSETS}

for _ in range(MINUTES):
    market_shock = random.gauss(0.0, 0.0005)
    sector_shocks = {sector: random.gauss(0.0, 0.0007) for sector in SECTOR_NAMES}
    minute_vector: List[float] = []
    for asset in ASSETS:
        beta_market, beta_sector, idio_vol = ASSET_PARAMS[asset]
        sector = ASSET_SECTORS[asset]
        residual = random.gauss(0.0, idio_vol)
        ret = beta_market * market_shock + beta_sector * sector_shocks[sector] + residual
        prices[asset] *= (1.0 + ret)
        minute_vector.append(ret)
    minute_returns.append(minute_vector)

cumulative_curves: Dict[str, List[float]] = {asset: [] for asset in ASSETS}
for asset_index, asset in enumerate(ASSETS):
    total = 0.0
    path: List[float] = []
    for vec in minute_returns:
        total += vec[asset_index]
        path.append(total)
    cumulative_curves[asset] = path
print('Generated {} minute observations for {} assets.'.format(len(minute_returns), len(ASSETS)))



## Exploratory Diagnostics

We inspect the synthetic series to verify rough volatility levels and cross-sectional dispersion before fitting the model.

In [None]:
asset_stats = []
for idx, asset in enumerate(ASSETS):
    series = [row[idx] for row in minute_returns]
    avg = sum(series) / len(series)
    vol = statistics.pstdev(series)
    asset_stats.append((asset, avg, vol))
asset_stats.sort(key=lambda x: x[2], reverse=True)
print('{:<6} {:>12} {:>12}'.format('Asset', 'Mean (bp)', 'Vol (bp)'))
for asset, avg, vol in asset_stats[:10]:
    print('{:<6} {:>12.3f} {:>12.3f}'.format(asset, avg * 10000.0, vol * 10000.0))



## Sequence Construction

We transform the minute returns into overlapping windows for the LSTM. Each sample contains `WINDOW` minutes of history, scaled to stabilise gradient descent, with the following minute used as the prediction target.

In [None]:
def scale_sequence(seq: Sequence[Sequence[float]]) -> List[List[float]]:
    return [[value * SCALE for value in row] for row in seq]

def scale_vector(vec: Sequence[float]) -> List[float]:
    return [value * SCALE for value in vec]

sequences: List[List[List[float]]] = []
targets: List[List[float]] = []
raw_targets: List[List[float]] = []
for idx in range(WINDOW, len(minute_returns)):
    history = minute_returns[idx - WINDOW:idx]
    sequences.append(scale_sequence(history))
    target = minute_returns[idx]
    targets.append(scale_vector(target))
    raw_targets.append(list(target))

split_idx = int(0.7 * len(sequences))
train_sequences = sequences[:split_idx]
train_targets = targets[:split_idx]
test_sequences = sequences[split_idx:]
test_targets = targets[split_idx:]
online_raw_returns = raw_targets[split_idx:]
print('Training samples: {}, testing samples: {}'.format(len(train_sequences), len(test_sequences)))



## Handcrafted LSTM + Deep Neural Forecaster

To respect the lightweight constraint we implement the LSTM cell, activations, and feedforward layers from scratch using only the Python standard library. The network emits the next-minute return vector conditioned on the previous `WINDOW` minutes.

In [None]:
class LSTMCell:
    def __init__(self, input_dim: int, hidden_dim: int):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.W = [[random.uniform(-0.05, 0.05) for _ in range(input_dim)] for _ in range(4 * hidden_dim)]
        self.U = [[random.uniform(-0.05, 0.05) for _ in range(hidden_dim)] for _ in range(4 * hidden_dim)]
        self.b = [0.0 for _ in range(4 * hidden_dim)]

    def forward(self, inputs: Sequence[Sequence[float]], h_prev: List[float] = None, c_prev: List[float] = None):
        if h_prev is None:
            h_prev = zeros(self.hidden_dim)
        if c_prev is None:
            c_prev = zeros(self.hidden_dim)
        caches = []
        h_t = list(h_prev)
        c_t = list(c_prev)
        for x_t in inputs:
            z = vec_add(vec_add(mat_vec_mul(self.W, x_t), mat_vec_mul(self.U, h_t)), self.b)
            gate_i = [sigmoid(z[idx]) for idx in range(0, self.hidden_dim)]
            gate_f = [sigmoid(z[self.hidden_dim + idx]) for idx in range(0, self.hidden_dim)]
            gate_g = [tanh(z[2 * self.hidden_dim + idx]) for idx in range(0, self.hidden_dim)]
            gate_o = [sigmoid(z[3 * self.hidden_dim + idx]) for idx in range(0, self.hidden_dim)]
            c_new = [gate_f[idx] * c_t[idx] + gate_i[idx] * gate_g[idx] for idx in range(self.hidden_dim)]
            h_new = [gate_o[idx] * tanh(c_new[idx]) for idx in range(self.hidden_dim)]
            caches.append({
                'x': list(x_t),
                'h_prev': list(h_t),
                'c_prev': list(c_t),
                'i': gate_i,
                'f': gate_f,
                'g': gate_g,
                'o': gate_o,
                'c': list(c_new),
                'h': list(h_new)
            })
            h_t = h_new
            c_t = c_new
        return h_t, c_t, caches

    def backward(self, grad_h: Sequence[float], grad_c: Sequence[float], caches: Sequence[dict]):
        grad_W = zero_matrix(len(self.W), len(self.W[0]))
        grad_U = zero_matrix(len(self.U), len(self.U[0]))
        grad_b = [0.0 for _ in range(len(self.b))]
        dh_next = list(grad_h)
        dc_next = list(grad_c)
        for cache in reversed(caches):
            tanh_c = [tanh(value) for value in cache['c']]
            dh_total = list(dh_next)
            do = [dh_total[idx] * tanh_c[idx] for idx in range(self.hidden_dim)]
            do_raw = [do[idx] * cache['o'][idx] * (1.0 - cache['o'][idx]) for idx in range(self.hidden_dim)]
            dc = [dc_next[idx] + dh_total[idx] * cache['o'][idx] * (1.0 - tanh_c[idx] ** 2) for idx in range(self.hidden_dim)]
            df = [dc[idx] * cache['c_prev'][idx] for idx in range(self.hidden_dim)]
            df_raw = [df[idx] * cache['f'][idx] * (1.0 - cache['f'][idx]) for idx in range(self.hidden_dim)]
            di = [dc[idx] * cache['g'][idx] for idx in range(self.hidden_dim)]
            di_raw = [di[idx] * cache['i'][idx] * (1.0 - cache['i'][idx]) for idx in range(self.hidden_dim)]
            dg = [dc[idx] * cache['i'][idx] for idx in range(self.hidden_dim)]
            dg_raw = [dg[idx] * (1.0 - cache['g'][idx] ** 2) for idx in range(self.hidden_dim)]
            dz = di_raw + df_raw + dg_raw + do_raw
            outer_x = outer_product(dz, cache['x'])
            outer_h = outer_product(dz, cache['h_prev'])
            for row in range(len(self.W)):
                for col in range(len(self.W[0])):
                    grad_W[row][col] += outer_x[row][col]
            for row in range(len(self.U)):
                for col in range(len(self.U[0])):
                    grad_U[row][col] += outer_h[row][col]
            for idx in range(len(self.b)):
                grad_b[idx] += dz[idx]
            dh_prev = mat_T_vec(self.U, dz)
            dc_prev = [dc[idx] * cache['f'][idx] for idx in range(self.hidden_dim)]
            dh_next = dh_prev
            dc_next = dc_prev
        return grad_W, grad_U, grad_b

    def apply_gradients(self, grad_W: Sequence[Sequence[float]], grad_U: Sequence[Sequence[float]], grad_b: Sequence[float], lr: float):
        for row in range(len(self.W)):
            for col in range(len(self.W[0])):
                self.W[row][col] -= lr * grad_W[row][col]
        for row in range(len(self.U)):
            for col in range(len(self.U[0])):
                self.U[row][col] -= lr * grad_U[row][col]
        for idx in range(len(self.b)):
            self.b[idx] -= lr * grad_b[idx]


In [None]:
class LinearLayer:
    def __init__(self, input_dim: int, output_dim: int):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.W = [[random.uniform(-0.05, 0.05) for _ in range(input_dim)] for _ in range(output_dim)]
        self.b = [0.0 for _ in range(output_dim)]
        self.last_input: List[float] = []

    def forward(self, vec_in: Sequence[float]) -> List[float]:
        self.last_input = list(vec_in)
        return vec_add(mat_vec_mul(self.W, vec_in), self.b)

    def backward(self, grad_output: Sequence[float], lr: float) -> List[float]:
        grad_input = mat_T_vec(self.W, grad_output)
        for i in range(self.output_dim):
            for j in range(self.input_dim):
                self.W[i][j] -= lr * grad_output[i] * self.last_input[j]
            self.b[i] -= lr * grad_output[i]
        return grad_input

class DeepSequenceModel:
    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        self.lstm = LSTMCell(input_dim, hidden_dim)
        self.fc1 = LinearLayer(hidden_dim, hidden_dim)
        self.fc2 = LinearLayer(hidden_dim, output_dim)
        self.last_relu_input: List[float] = []

    def forward(self, sequence: Sequence[Sequence[float]]):
        h_last, c_last, caches = self.lstm.forward(sequence)
        z1 = self.fc1.forward(h_last)
        self.last_relu_input = list(z1)
        a1 = [relu(value) for value in z1]
        output = self.fc2.forward(a1)
        return output, caches

    def backward(self, grad_output: Sequence[float], caches: Sequence[dict], lr: float):
        grad_a1 = self.fc2.backward(grad_output, lr)
        grad_z1 = [grad_a1[i] * relu_derivative(self.last_relu_input[i]) for i in range(len(grad_a1))]
        grad_h = self.fc1.backward(grad_z1, lr)
        grad_c = zeros(len(grad_h))
        grad_W, grad_U, grad_b = self.lstm.backward(grad_h, grad_c, caches)
        self.lstm.apply_gradients(grad_W, grad_U, grad_b, lr)



In [None]:
def mean_squared_error(pred: Sequence[float], target: Sequence[float]) -> float:
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / len(pred)

def train_model(model: DeepSequenceModel, sequences: Sequence[Sequence[Sequence[float]]], targets: Sequence[Sequence[float]], epochs: int = 8, lr: float = 0.0005):
    for epoch in range(epochs):
        paired = list(zip(sequences, targets))
        random.shuffle(paired)
        total_loss = 0.0
        for seq, target in paired:
            pred, caches = model.forward(seq)
            loss = mean_squared_error(pred, target)
            grad = [2.0 * (pred[i] - target[i]) / len(pred) for i in range(len(pred))]
            model.backward(grad, caches, lr)
            total_loss += loss
        avg_loss = total_loss / len(paired)
        print('Epoch {:>2}: training loss = {:.6f}'.format(epoch + 1, avg_loss))

model = DeepSequenceModel(input_dim=len(ASSETS), hidden_dim=16, output_dim=len(ASSETS))
train_model(model, train_sequences, train_targets, epochs=10, lr=0.0003)

def evaluate(model: DeepSequenceModel, sequences: Sequence[Sequence[Sequence[float]]], targets: Sequence[Sequence[float]]):
    total_loss = 0.0
    for seq, target in zip(sequences, targets):
        pred, _ = model.forward(seq)
        total_loss += mean_squared_error(pred, target)
    return total_loss / len(sequences) if sequences else 0.0

test_loss = evaluate(model, test_sequences, test_targets)
print('Test MSE: {:.6f}'.format(test_loss))



## Online Mean-Variance Allocation

At run-time we pair the model forecasts with an exponentially-weighted covariance estimator. We solve the unconstrained mean-variance problem $\max_w \mu^\top w - \lambda w^\top \Sigma w$ with a leverage cap to maintain implementability.

In [None]:
def gauss_jordan_inverse(matrix: Sequence[Sequence[float]]) -> List[List[float]]:
    n = len(matrix)
    aug = [list(row) + [1.0 if i == j else 0.0 for j in range(n)] for i, row in enumerate(matrix)]
    for col in range(n):
        pivot_row = max(range(col, n), key=lambda r: abs(aug[r][col]))
        if abs(aug[pivot_row][col]) < 1e-9:
            raise ValueError('Matrix near-singular during inversion')
        if pivot_row != col:
            aug[col], aug[pivot_row] = aug[pivot_row], aug[col]
        pivot = aug[col][col]
        for j in range(2 * n):
            aug[col][j] /= pivot
        for row in range(n):
            if row == col:
                continue
            factor = aug[row][col]
            if factor == 0.0:
                continue
            for j in range(2 * n):
                aug[row][j] -= factor * aug[col][j]
    return [row[n:] for row in aug]

def update_covariance(current: Sequence[Sequence[float]], obs: Sequence[float], decay: float) -> List[List[float]]:
    observation_outer = outer_product(obs, obs)
    decayed = scalar_matrix_mul(current, decay)
    increment = scalar_matrix_mul(observation_outer, 1.0 - decay)
    return add_matrices(decayed, increment)

def solve_weights(mu: Sequence[float], covariance: Sequence[Sequence[float]], risk_aversion: float, leverage_cap: float) -> List[float]:
    dim = len(mu)
    regularised = [list(row) for row in covariance]
    for i in range(dim):
        regularised[i][i] += 1e-6
    inv_cov = gauss_jordan_inverse(regularised)
    raw = mat_vec_mul(inv_cov, mu)
    weights = [value / max(risk_aversion, 1e-6) for value in raw]
    leverage = sum(abs(w) for w in weights)
    if leverage > leverage_cap and leverage > 0:
        scale = leverage_cap / leverage
        weights = [w * scale for w in weights]
    return weights



In [None]:
decay = 0.94
risk_aversion = 8.0
leverage_cap = 3.0
covariance = identity_matrix(len(ASSETS), scale=1e-6)
prev_weights = [0.0 for _ in ASSETS]
realised_pnls: List[float] = []
cumulative_curve: List[float] = []
weights_history: List[List[float]] = []
turnover = 0.0
cumulative = 0.0
for seq, target_scaled, realised in zip(test_sequences, test_targets, online_raw_returns):
    forecast_scaled, _ = model.forward(seq)
    forecast = [value / SCALE for value in forecast_scaled]
    covariance = update_covariance(covariance, realised, decay)
    weights = solve_weights(forecast, covariance, risk_aversion, leverage_cap)
    pnl = sum(weights[i] * realised[i] for i in range(len(ASSETS)))
    realised_pnls.append(pnl)
    cumulative += pnl
    cumulative_curve.append(cumulative)
    weights_history.append(weights)
    turnover += sum(abs(weights[i] - prev_weights[i]) for i in range(len(ASSETS))) / 2.0
    prev_weights = weights

minutes_in_year = 252 * 390
avg_pnl = sum(realised_pnls) / len(realised_pnls)
vol_pnl = statistics.pstdev(realised_pnls) if len(realised_pnls) > 1 else 0.0
annual_return = avg_pnl * minutes_in_year
annual_vol = vol_pnl * math.sqrt(minutes_in_year)
sharpe = (annual_return / annual_vol) if annual_vol > 0 else 0.0
dd = max_drawdown(cumulative_curve) if cumulative_curve else 0.0
avg_turnover = turnover / max(len(weights_history), 1)
print('Annualised return: {:.2f} bp'.format(annual_return * 100.0))
print('Annualised vol   : {:.2f} bp'.format(annual_vol * 100.0))
print('Sharpe ratio     : {:.2f}'.format(sharpe))
print('Max drawdown     : {:.2f} bp'.format(dd * 100.0))
print('Average turnover : {:.4f}'.format(avg_turnover))

print('Latest weight snapshot:')
if weights_history:
    latest = weights_history[-1]
    for asset, weight in list(zip(ASSETS, latest))[:10]:
        print('{}: {:+.4f}'.format(asset, weight))



## Discussion & Extension Roadmap

**Lifecycle.** The prototype demonstrates that a minimal pure-Python stack can ingest minute updates, propagate them through an LSTM forecaster, and deliver refreshed weights via an online mean-variance solver. Training remains lightweight (seconds) because the architecture is compact and the optimiser avoids matrix factorisations beyond a Gauss-Jordan inverse of a 26×26 matrix.

**Potential enhancements.**
1. Replace the handcrafted autodiff with a production-grade library (e.g., PyTorch) to unlock richer architectures, ensembling, and GPU acceleration.
2. Incorporate microstructure features—order-book imbalance, realised volatility, volume—to improve the predictive signal.
3. Impose execution-aware constraints (transaction costs, market impact) and extend the allocator to handle cash targets, turnover budgets, and risk parity objectives.
4. Deploy the estimator in a streaming environment (Kafka / Redis queue) with asynchronous model refresh to meet true low-latency requirements.
