
# Portfolio Optimization: Mean–Variance (Continuous) → QUBO Selection (Binary) — Colab-Ready

This single notebook walks through two complementary approaches **on the same synthetic dataset**:

1. **Classical Mean–Variance (Markowitz)** with continuous weights  
   - Build synthetic returns, estimate $\mu$ and $\Sigma$  
   - Solve **Min-Variance** at target returns and **Max-Sharpe**  
   - Plot the **efficient frontier**
   - Optional sector constraints example

2. **QUBO Portfolio Selection (Cardinality-Constrained)** with binary picks  
   - Select exactly $K$ assets (equal-weight on selected)  
   - Build a **QUBO** energy and solve via **simulated annealing** (`neal`)  
   - Decode the selected set and compute portfolio stats

All sections are **runnable in Colab**. The QUBO part reuses the exact same $\mu$ and $\Sigma$ estimated in the mean–variance section.


In [None]:

# Install dependencies if needed (works in Colab).
# CVXPY for convex QP; dimod/neal for QUBO simulated annealing.

def _silent_imports():
    flags = {"cvxpy": False, "dimod": False, "neal": False}
    try:
        import cvxpy as cp  # noqa: F401
        flags["cvxpy"] = True
    except Exception:
        pass
    try:
        import dimod  # noqa: F401
        flags["dimod"] = True
    except Exception:
        pass
    try:
        import neal  # noqa: F401
        flags["neal"] = True
    except Exception:
        pass
    return flags

# Probe the current kernel, then install only what is missing. `%pip` is an
# IPython magic, so this cell must run inside Jupyter/Colab (not plain Python).
flags = _silent_imports()
if not flags["cvxpy"]:
    %pip -q install cvxpy
# dimod and neal are installed together if either one failed to import.
if not flags["dimod"] or not flags["neal"]:
    %pip -q install dimod neal

# Recheck: probe again so the printed flags reflect the state after any install.
flags = _silent_imports()
print("CVXPY:", flags["cvxpy"], "| dimod:", flags["dimod"], "| neal:", flags["neal"])


In [None]:

# ==== Shared Synthetic Dataset ====
import numpy as np, pandas as pd

# Seeded generator shared by every draw in this cell (keeps the data reproducible).
rng = np.random.default_rng(2025)

n_assets = 16
n_periods = 520
tickers = ["A" + str(k) for k in range(1, n_assets + 1)]

# Three-factor model: returns = factors @ loadings^T + idiosyncratic noise.
# The draw order (F, then B, then epsilon) determines the generated dataset.
F = rng.normal(loc=0, scale=1, size=(n_periods, 3))
B = rng.normal(loc=0, scale=0.6, size=(n_assets, 3))
epsilon = rng.normal(loc=0, scale=0.02, size=(n_periods, n_assets))
returns = np.matmul(F, B.T) + epsilon

# Sample moments reused by both the mean-variance and the QUBO parts.
mu = np.mean(returns, axis=0)             # per-asset sample mean return
Sigma = np.cov(returns, rowvar=False)     # asset-by-asset sample covariance

df_mu = pd.DataFrame({"ticker": tickers, "mu": mu})
df_S = pd.DataFrame(Sigma, index=tickers, columns=tickers)

print("Assets:", tickers)
df_mu.head(), df_S.iloc[:5,:5]



## Part 1 — Mean–Variance Optimization (Continuous Weights)

We solve:
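- **Min-Variance** for a grid of target returns: minimize $w^\top \Sigma w$ subject to $\mu^\top w \ge r_{\text{target}}$, a fully invested budget $\sum_i w_i = 1$, and long-only weights $w \ge 0$.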
- **Max-Sharpe** portfolio (long-only).

If **CVXPY** is missing, we use a **Dirichlet sampling heuristic** as a fallback.


In [None]:

import numpy as np

# Optional dependency: record whether CVXPY imported so the solvers defined in
# this cell can choose between the convex program and the sampling fallback.
try:
    import cvxpy as cp
except Exception:
    HAVE_CVXPY = False
else:
    HAVE_CVXPY = True

def solve_min_variance_for_return(mu, Sigma, r_target, long_only=True,
                                  n_samples=25000, seed=7):
    """Minimum-variance, fully invested portfolio that reaches a return floor.

    Solves ``min_w  w' Sigma w`` subject to ``sum(w) == 1`` and
    ``mu' w >= r_target`` (plus ``w >= 0`` when ``long_only`` is true).
    Because the return constraint is an inequality, the realized return
    ``mu @ w`` can exceed ``r_target``.

    Parameters
    ----------
    mu : array-like, shape (n,)
        Expected return per asset.
    Sigma : array-like, shape (n, n)
        Covariance matrix of asset returns.
    r_target : float
        Lower bound on the portfolio's expected return.
    long_only : bool, default True
        Adds ``w >= 0`` on the CVXPY path. The sampling fallback draws from a
        Dirichlet distribution and is therefore always long-only, whatever
        this flag says.
    n_samples : int, default 25000
        Number of random portfolios tried by the fallback (ignored by CVXPY).
    seed : int, default 7
        Seed of the fallback's random generator (ignored by CVXPY).

    Returns
    -------
    (w, variance, status)
        ``w`` is the weight vector and ``variance`` its portfolio variance.
        ``status`` is the CVXPY problem status, or ``"heuristic"`` /
        ``"infeasible"`` on the fallback path. When no portfolio is found,
        both paths return ``(None, None, status)``.
    """
    mu = np.asarray(mu, dtype=float)
    Sigma = np.asarray(Sigma, dtype=float)
    n = len(mu)

    if HAVE_CVXPY:
        w = cp.Variable(n)
        constr = [cp.sum(w) == 1, mu @ w >= r_target]
        if long_only:
            constr += [w >= 0]
        prob = cp.Problem(cp.Minimize(cp.quad_form(w, Sigma)), constr)
        prob.solve(solver=cp.SCS, verbose=False)
        if w.value is None:
            # Infeasible/unbounded: report it the same way as the fallback
            # instead of leaking the solver's +/-inf objective value.
            return None, None, prob.status
        return w.value, prob.value, prob.status

    # Fallback: keep the lowest-variance feasible portfolio among random
    # points of the simplex (uniform Dirichlet draws).
    rng = np.random.default_rng(seed)
    alpha = np.ones(n)
    best_w, best_var = None, np.inf
    for _ in range(n_samples):
        cand = rng.dirichlet(alpha)
        if mu @ cand >= r_target:
            var = cand @ Sigma @ cand
            if var < best_var:
                best_w, best_var = cand, var
    if best_w is None:
        return None, None, "infeasible"
    return best_w, best_var, "heuristic"

def solve_max_sharpe(mu, Sigma, rf=0.0, long_only=True, n_samples=60000, seed=11):
    """Portfolio with the highest Sharpe ratio ``(mu'w - rf) / sqrt(w' Sigma w)``.

    With CVXPY the ratio is maximized through the usual convex reformulation:
    minimize ``y' Sigma y`` subject to ``(mu - rf)' y == 1`` (and ``y >= 0``
    when ``long_only``), then rescale ``w = y / sum(y)``. The excess return
    is taken per unit of capital, ``(mu - rf)' y``, so the result is also
    correct for ``rf != 0``; for ``rf == 0`` this is the same constraint as
    ``mu' y == 1``.

    Parameters
    ----------
    mu : array-like, shape (n,)
        Expected return per asset.
    Sigma : array-like, shape (n, n)
        Covariance matrix of asset returns.
    rf : float, default 0.0
        Risk-free rate, in the same units as ``mu``.
    long_only : bool, default True
        Adds ``y >= 0`` on the CVXPY path. The sampling fallback draws from a
        Dirichlet distribution and is therefore always long-only.
    n_samples : int, default 60000
        Number of random portfolios tried by the fallback (ignored by CVXPY).
    seed : int, default 11
        Seed of the fallback's random generator (ignored by CVXPY).

    Returns
    -------
    (w, sharpe, status)
        Weights summing to 1, their Sharpe ratio, and ``"cvxpy"`` or
        ``"heuristic"``. ``(None, None, "failed")`` when no portfolio could
        be produced (solver returned nothing, the weights cannot be
        normalized to a positive total, or ``n_samples`` is 0).
    """
    mu = np.asarray(mu, dtype=float)
    Sigma = np.asarray(Sigma, dtype=float)
    n = len(mu)

    if HAVE_CVXPY:
        y = cp.Variable(n)
        constr = [(mu - rf) @ y == 1]
        if long_only:
            constr += [y >= 0]
        prob = cp.Problem(cp.Minimize(cp.quad_form(y, Sigma)), constr)
        prob.solve(solver=cp.SCS, verbose=False)
        y_raw = y.value
        if y_raw is None:
            return None, None, "failed"
        # Clip solver round-off below zero before normalizing.
        w1 = np.maximum(y_raw, 0) if long_only else y_raw
        total = w1.sum()
        # A zero total cannot be normalized; a negative one (possible only
        # with shorting) would flip every position and the sign of the
        # excess return, so neither is a max-Sharpe portfolio.
        if not np.isfinite(total) or total <= 1e-8:
            return None, None, "failed"
        w1 = w1 / total
        sharpe = (mu @ w1 - rf) / np.sqrt(w1 @ Sigma @ w1 + 1e-12)
        return w1, sharpe, "cvxpy"

    # Fallback: best Sharpe ratio among random points of the simplex.
    rng = np.random.default_rng(seed)
    alpha = np.ones(n)
    best_w, best_sharpe = None, -np.inf
    for _ in range(n_samples):
        cand = rng.dirichlet(alpha)
        # The 1e-12 keeps the ratio finite for a zero-variance candidate.
        s = (mu @ cand - rf) / np.sqrt(cand @ Sigma @ cand + 1e-12)
        if s > best_sharpe:
            best_w, best_sharpe = cand, s
    if best_w is None:
        return None, None, "failed"
    return best_w, best_sharpe, "heuristic"


In [None]:

import numpy as np, pandas as pd

mu_arr = df_mu["mu"].values
cov_arr = df_S.values

# Sweep return floors between the worst and the best single-asset mean.
targets = np.linspace(mu_arr.min(), mu_arr.max(), 24)

frontier_cols = ["target_return", "achieved_return", "variance", "std", "status"]
frontier = []
for r_t in targets:
    w, var, status = solve_min_variance_for_return(mu_arr, cov_arr, r_t, long_only=True)
    if w is None or var is None or not np.isfinite(var):
        continue
    # Solver round-off can yield a tiny negative variance; clamp before sqrt.
    var = max(float(var), 0.0)
    frontier.append({
        "target_return": r_t,
        # The return constraint is ">=", so the optimizer may overshoot the
        # floor (every floor below the global-minimum-variance return gives
        # the same portfolio). Record what the weights actually deliver.
        "achieved_return": float(mu_arr @ w),
        "variance": var,
        "std": var ** 0.5,
        "status": status,
    })
# Passing the columns keeps the schema stable even when no target was feasible.
df_frontier = pd.DataFrame(frontier, columns=frontier_cols)

w_sharpe, sharpe_ratio, sharpe_status = solve_max_sharpe(mu_arr, cov_arr, rf=0.0, long_only=True)
ret_sh = float(mu_arr @ w_sharpe) if w_sharpe is not None else None
vol_sh = float(np.sqrt(w_sharpe @ cov_arr @ w_sharpe)) if w_sharpe is not None else None

print("Max-Sharpe status:", sharpe_status)
print("Max-Sharpe return:", ret_sh)
print("Max-Sharpe vol:", vol_sh)
print("Max-Sharpe ratio:", sharpe_ratio)
df_frontier.head()


In [None]:

import matplotlib.pyplot as plt
import numpy as np

# Explicit Figure/Axes so the title, labels and legend are set on a known
# object and appear even when only one of the two series can be drawn.
fig, ax = plt.subplots()

if not df_frontier.empty:
    # Plot the realized return when the frontier table carries it; the target
    # is only a lower bound on the return of the solved portfolio.
    ret_col = "achieved_return" if "achieved_return" in df_frontier.columns else "target_return"
    ax.scatter(df_frontier["std"], df_frontier[ret_col], label="Min-variance portfolios")

if w_sharpe is not None:
    sharpe_vol = float(np.sqrt(w_sharpe @ df_S.values @ w_sharpe))
    sharpe_ret = float(df_mu["mu"].values @ w_sharpe)
    ax.scatter([sharpe_vol], [sharpe_ret], marker="*", s=160, label="Max-Sharpe portfolio")

ax.set_title("Efficient Frontier (std vs return)")
ax.set_xlabel("Volatility (std)")
ax.set_ylabel("Expected Return")
# Only draw a legend when at least one series was plotted.
if ax.get_legend_handles_labels()[0]:
    ax.legend()
fig.tight_layout()
plt.show()



### Optional: Sector Caps (Example)
Impose simple sector caps (toy labels) with CVXPY, if available.


In [None]:

# Optional dependency probe: the sector example only runs when CVXPY imports.
try:
    import cvxpy as cp
except Exception:
    HAVE_CVXPY = False
else:
    HAVE_CVXPY = True

# Toy sector labels assigned round-robin by asset position; each sector is
# capped at 50% of the portfolio.
sector_names = ("Tech", "Health", "Fin")
sectors = {t: sector_names[i % 3] for i, t in enumerate(df_mu["ticker"])}
sector_cap = dict(Tech=0.5, Health=0.5, Fin=0.5)

if not HAVE_CVXPY:
    print("CVXPY not available; skipping sector-constrained example.")
else:
    n = len(df_mu)
    w = cp.Variable(n)
    risk = cp.quad_form(w, df_S.values)
    ret = df_mu["mu"].values @ w

    # Fully invested, long-only, plus one cap per sector that has members.
    constr = [cp.sum(w) == 1, w >= 0]
    for s, cap in sector_cap.items():
        idx = [k for k, t in enumerate(df_mu["ticker"]) if sectors[t] == s]
        if idx:
            constr.append(cp.sum(w[idx]) <= cap)

    # Example trade-off: minimize risk - 0.5 * return
    prob = cp.Problem(cp.Minimize(risk - 0.5 * ret), constr)
    prob.solve(solver=cp.SCS, verbose=False)
    w_sec = w.value
    print("Sector-constrained status:", prob.status)



## Part 2 — QUBO Portfolio Selection (Binary, Cardinality $K$)

Using the **same $\mu$ and $\Sigma$**, we now pick **exactly $K$** assets (equal weight among selected).  
Define $z_i\in\{0,1\}$ with the energy to **minimize**:

$$
E = \lambda \frac{1}{K^2} z^\top \Sigma z \;-\; (1-\lambda)\frac{1}{K}\sum_i \mu_i z_i \;+\; A\,\Big(\sum_i z_i - K\Big)^2,
$$

where $\lambda\in[0,1]$ trades risk against return and the penalty weight $A$ must be large enough to enforce the cardinality constraint $\sum_i z_i = K$.


In [None]:

from collections import defaultdict

import numpy as np
import pandas as pd
import dimod, neal

N = len(df_mu)
K = 6
lam = 0.6
SA_SEED = 2025  # fixed annealer seed so Restart & Run All reproduces the pick

if not 0 < K <= N:
    raise ValueError(f"K must satisfy 0 < K <= N (got K={K}, N={N})")

# Penalty magnitude (tunable)
A_pen = float(10.0 * max(1e-9, np.max(np.abs(df_mu["mu"].values))))

Q = defaultdict(float)
mu_vec = df_mu["mu"].values
S = df_S.values

# Risk term: both (i, j) and (j, i) are written, so each pair i != j
# contributes 2 * coef_risk * S[i, j], as z' S z requires.
coef_risk = lam / (K**2)
for i in range(N):
    for j in range(N):
        Q[(i,j)] += coef_risk * S[i,j]

# Return term (linear on diagonal because QUBO stores linear as diag)
coef_ret = - (1 - lam) / K
for i in range(N):
    Q[(i,i)] += coef_ret * mu_vec[i]

# Cardinality penalty: A*(sum z - K)^2 = A*(sum z_i + 2 sum_{i<j} z_i z_j - 2K sum z_i + K^2)
# (z_i^2 == z_i for binaries, so the squared terms fold into the diagonal.)
for i in range(N):
    Q[(i,i)] += A_pen * (1 - 2*K)
for i in range(N):
    for j in range(i+1, N):
        Q[(i,j)] += 2 * A_pen

# The constant A*K^2 goes in as the offset so the reported energies equal the
# full objective E; a feasible selection then carries zero penalty.
bqm = dimod.BinaryQuadraticModel.from_qubo(dict(Q), offset=A_pen * K**2)

sampler = neal.SimulatedAnnealingSampler()
sampleset = sampler.sample(bqm, num_reads=4000, seed=SA_SEED)

# Records come back sorted by energy. Take the lowest-energy one that picks
# exactly K assets; fall back to the overall minimum if none is feasible.
best = next(
    (rec for rec in sampleset.data() if sum(rec.sample.values()) == K),
    sampleset.first,
)

z = np.array([best.sample.get(i, 0) for i in range(N)])
sel_idx = np.where(z==1)[0]
sel_tickers = [df_mu["ticker"].iloc[i] for i in sel_idx]

print("Selected K (should be", K, "):", int(z.sum()))
print("Selected tickers:", sel_tickers)


In [None]:

import numpy as np, pandas as pd

# Equal-weight the assets picked by the annealer and summarize the portfolio.
n_selected = z.sum()
if n_selected > 0:
    w_eq = z / n_selected  # equal weight over selected
    port_mu = float(mu @ w_eq)
    port_var = float(w_eq @ Sigma @ w_eq)
    port_std = port_var**0.5
else:
    # Nothing selected: there is no portfolio to evaluate.
    port_mu = port_std = None

# Absolute distance between the number of picks and the requested K.
viol = int(abs(int(n_selected) - K))
print("Cardinality violation (abs):", viol)
print("QUBO energy:", best.energy)
print("Portfolio expected return:", port_mu)
print("Portfolio volatility (std):", port_std)

df_selected = df_mu[["ticker"]].assign(selected=z.astype(int), mu=df_mu["mu"])
is_picked = df_selected["selected"] == 1
display(df_selected.loc[is_picked])
