In [1]:
import numpy as np
import pandas as pd
from sklearn.covariance import LedoitWolf
import matplotlib.pyplot as plt
from matplotlib import cm
import seaborn as sns
import os
import warnings
import cvxpy as cp

In [2]:
# reproducibility
RNG = np.random.default_rng(42)
warnings.filterwarnings('ignore')

In [3]:
# ---------------------------- Data utilities ----------------------------

def make_mock_data(n_assets=50, n_days=252*3, seed=42):
    """Create mock price returns and ESG scores for demonstration.

    Returns:
      prices: pd.DataFrame shape (n_days, n_assets)
      esg: pd.Series length n_assets (0..1 normalized)
    """
    rng = np.random.default_rng(seed)  # random number generator
    
    # Create factor structure
    n_factors = max(3, n_assets // 10)
    T = n_days
    F = rng.normal(scale=0.02, size=(T, n_factors))
    loadings = rng.normal(scale=0.5, size=(n_assets, n_factors))

    specific = rng.normal(scale=0.01, size=(T, n_assets))
    returns = F.dot(loadings.T) + specific
    # add small drift
    mu_true = rng.normal(0.0005, 0.001, size=n_assets)
    returns += mu_true

    # build cumulative prices
    prices = 100 * np.exp(np.cumsum(returns, axis=0))
    dates = pd.date_range(end=pd.Timestamp.today(), periods=T, freq='B')
    prices_df = pd.DataFrame(prices, index=dates, columns=[f'A{i}' for i in range(n_assets)])

    # ESG scores
    esg_raw = rng.uniform(0, 1, size=n_assets)
    # add sector bias
    sector_bias = np.linspace(-0.2, 0.2, n_assets)
    esg = esg_raw + sector_bias * 0.1
    esg = (esg - esg.min()) / (esg.max() - esg.min())
    esg_s = pd.Series(esg, index=prices_df.columns)

    return prices_df, esg_s

In [4]:
prices, esg = make_mock_data(n_assets=50, n_days=252*3, seed=42)

## Compute mu sigma

In [5]:
def compute_mu_sigma(prices, lookback=252, shrinkage=True):
    """Estimate expected returns (mu) and covariance (Sigma) from price data.

    - mu: annualized simple mean of daily returns
    - Sigma: annualized covariance using Ledoit-Wolf if shrinkage=True
    """
    # compute daily returns
    rets = prices.pct_change().dropna()
    mu_daily = rets.mean()
    mu_annual = mu_daily * 252

    if shrinkage:
        lw = LedoitWolf().fit(rets.values)
        Sigma_daily = lw.covariance_
    else:
        Sigma_daily = np.cov(rets.values.T)

    Sigma_annual = Sigma_daily * 252
    mu = mu_annual.values
    Sigma = Sigma_annual
    assets = rets.columns.tolist()
    return mu, Sigma, assets


# Solvers

## QP Solver

In [6]:
# ---------------------------- Baseline QP ----------------------------

import cvxpy as cp
import numpy as np
import pandas as pd
from typing import Union # For type hinting

def solve_scalarized_qp(
    Sigma: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    allow_short: bool = False,
    allow_cash: bool = False, # New parameter
    solver = cp.OSQP
) -> np.ndarray:
    """Solves a convex quadratic portfolio optimization problem.

    This function finds the optimal portfolio weights 'w' that minimize a
    scalarized objective function combining portfolio variance (risk),
    expected return, and ESG score, subject to standard portfolio constraints.

    The objective function minimized is:
        w^T * Sigma * w - lamb * (mu^T * w) - eta * (esg^T * w)
    Effectively, it maximizes a utility function U = lamb*Return + eta*ESG - Risk.

    Args:
        Sigma (np.array | pd.DataFrame): Covariance matrix (n x n). Must be positive semi-definite.
        mu (np.array | pd.Series): Expected returns vector (n,).
        esg (np.array | pd.Series): ESG scores vector (n,).
        lamb (float, optional): Weight for the expected return term. Defaults to 1.0.
        eta (float, optional): Weight for the ESG score term. Defaults to 0.1.
        lb (float, optional): Lower bound for individual asset weights. Defaults to 0.0.
        ub (float, optional): Upper bound for individual asset weights. Defaults to 1.0.
        allow_short (bool, optional): If True, ignores the lb >= 0 constraint. Defaults to False.
        allow_cash (bool, optional): If True, constraints sum(weights) <= 1 instead of == 1,
                                     allowing for a portion of the portfolio to be uninvested (cash).
                                     Defaults to False (fully invested).
        solver (cp.Solver, optional): The CVXPY solver. Defaults to cp.OSQP.

    Returns:
        np.array: Optimal portfolio weights (shape n,).

    Raises:
        RuntimeError: If the solver fails.
        ValueError: If input dimensions are inconsistent.
    """
    # --- Input Validation and Conversion ---
    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    n = len(mu_np)

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    if isinstance(Sigma, pd.DataFrame):
        Sigma_np = Sigma.values
    else:
        Sigma_np = np.array(Sigma)

    if len(esg_np) != n or Sigma_np.shape != (n, n):
         raise ValueError(
             f"Dimension mismatch: n={n}, mu={len(mu_np)}, esg={len(esg_np)}, Sigma={Sigma_np.shape}"
         )
    # ----------------------------------------

    w = cp.Variable(n, name="weights")

    # --- Objective Function ---
    objective = cp.quad_form(w, Sigma_np) - lamb * (mu_np @ w) - eta * (esg_np @ w)

    # --- Constraints ---
    constraints = []

    # Budget Constraint: sum(w) == 1 (fully invested) or sum(w) <= 1 (allow cash)
    if allow_cash:
        constraints.append(cp.sum(w) <= 1)
        # If allowing cash, weights must still be non-negative unless shorting is also allowed
        if not allow_short:
             constraints.append(w >= 0) # Ensure weights don't go negative to fund cash
    else:
        constraints.append(cp.sum(w) == 1)

    # Individual Weight Bounds
    if not allow_short:
        # If sum(w) <= 1 is used, lb might technically not be needed if >= 0 already added
        # but including it handles cases where lb > 0 is desired.
        constraints.append(w >= lb)
    # If allow_short is True, we don't add w >= lb constraint.
    # The upper bound always applies.
    constraints.append(w <= ub)

    # --- Solve Problem ---
    prob = cp.Problem(cp.Minimize(objective), constraints)
    try:
        prob.solve(solver=solver, verbose=False)
        if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
             raise RuntimeError(f'QP solver failed or did not find optimal solution. Status: {prob.status}')
        if w.value is None:
             raise RuntimeError('QP solver finished but weights are None.')

        result_w = np.array(w.value).flatten()
        # Clean near-zero values
        result_w[np.abs(result_w) < 1e-8] = 0.0
        return result_w
    except Exception as e:
        raise RuntimeError(f'QP solver failed during execution: {e}')


In [7]:
# --- Example Usage ---
n_assets = 5
Sigma_example = np.cov(np.random.rand(n_assets, 100))
mu_example = np.random.rand(n_assets) * 0.1
esg_example = np.random.rand(n_assets)

# Fully invested
w_fully_invested = solve_scalarized_qp(Sigma_example, mu_example, esg_example, allow_cash=False)
print(f"Fully invested weights sum: {w_fully_invested.sum():.4f}")
print(w_fully_invested)

# Allow cash (if optimal solution involves holding cash)
w_allow_cash = solve_scalarized_qp(Sigma_example, mu_example, esg_example, allow_cash=True)
print(f"\nAllow cash weights sum: {w_allow_cash.sum():.4f}")
print(w_allow_cash)
if w_allow_cash.sum() < 0.9999:
    print(f"Cash held: {1 - w_allow_cash.sum():.2%}")

Fully invested weights sum: 1.0000
[0.20824409 0.03271252 0.         0.16669804 0.59234536]

Allow cash weights sum: 1.0000
[0.20824409 0.03271252 0.         0.16669804 0.59234536]


## CVaR solver

In [8]:
# ---------------------------- CVaR optimization ----------------------------

import cvxpy as cp
import numpy as np
import pandas as pd # Assuming pandas might be involved
from typing import Union # For type hinting

# ---------------------------- CVaR optimization ----------------------------

def solve_cvar_lp(
    returns_scenarios: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    alpha: float = 0.95,
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    allow_short: bool = False,
    allow_cash: bool = False, # New parameter
    solver = cp.OSQP
) -> np.ndarray:
    """Solves portfolio optimization minimizing CVaR - lambda*Return - eta*ESG.

    This function uses a linear programming formulation (based on Rockafellar-Uryasev)
    to find the optimal portfolio weights 'w' that minimize a weighted combination
    of Conditional Value-at-Risk (CVaR) and negative expected return and ESG score.

    The objective function minimized is approximately:
        CVaR_{alpha}(Loss) - lamb * (mu^T * w) - eta * (esg^T * w)
    where Loss = -Return.

    Args:
        returns_scenarios (np.array | pd.DataFrame): Shape (T, n) of T scenarios
                                                    for n assets. Each row is one
                                                    scenario's returns.
        mu (np.array | pd.Series): Shape (n,) of expected returns.
        esg (np.array | pd.Series): Shape (n,) of ESG scores.
        alpha (float, optional): Confidence level for CVaR (e.g., 0.95 for 95% CVaR).
                                 Defaults to 0.95.
        lamb (float, optional): Weight for the expected return term in the objective.
                                Higher values prioritize return. Defaults to 1.0.
        eta (float, optional): Weight for the ESG term in the objective. Higher values
                               prioritize ESG. Defaults to 0.1.
        lb (float, optional): Lower bound for individual asset weights. Defaults to 0.0.
        ub (float, optional): Upper bound for individual asset weights. Defaults to 1.0.
        allow_short (bool, optional): If True, ignores the non-negativity implied by lb=0.0
                                      and allows weights to be negative. Defaults to False.
        allow_cash (bool, optional): If True, constraints sum(weights) <= 1 instead of == 1,
                                     allowing for a portion of the portfolio to be uninvested (cash).
                                     Defaults to False (fully invested).
        solver (cp.Solver, optional): CVXPY solver to use (e.g., cp.OSQP, cp.ECOS, cp.SCS).
                                      Defaults to cp.OSQP.

    Returns:
        np.array: Optimal portfolio weights, shape (n,).

    Raises:
        RuntimeError: If the solver fails or does not find an optimal solution.
        ValueError: If input dimensions are inconsistent.
    """
    # --- Input Validation and Conversion ---
    if isinstance(returns_scenarios, pd.DataFrame):
        returns_scenarios_np = returns_scenarios.values
    else:
        returns_scenarios_np = np.array(returns_scenarios)

    T, n = returns_scenarios_np.shape

    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    # Check dimensions
    if len(mu_np) != n or len(esg_np) != n:
        raise ValueError(
            f"Dimension mismatch: n={n} from scenarios, mu={len(mu_np)}, esg={len(esg_np)}"
        )
    # ----------------------------------------

    # --- CVXPY Variables ---
    w = cp.Variable(n, name="weights")
    v = cp.Variable(1, name="VaR_alpha")          # Auxiliary variable for VaR
    xi = cp.Variable(T, nonneg=True, name="losses_over_VaR") # Auxiliary variables for losses exceeding VaR

    # --- Calculate Portfolio Losses per Scenario ---
    portfolio_losses = -returns_scenarios_np @ w  # Shape (T,)

    # --- Core CVaR Constraint ---
    # xi_t >= Loss_t - v  (for all t=1..T)
    # Ensures xi captures the positive part of (Loss_t - v)
    cvar_core_constraint = xi >= portfolio_losses - v

    # --- Define Constraints List ---
    constraints = [
        w <= ub,                 # Upper bound on weights
        cvar_core_constraint     # The CVaR formulation constraint
        # xi >= 0 is handled by cp.Variable(T, nonneg=True)
    ]

    # --- Budget Constraint (Allow Cash or Fully Invested) ---
    if allow_cash:
        constraints.append(cp.sum(w) <= 1)
        # If allowing cash, weights usually must still be non-negative
        # unless shorting is *also* allowed. Add non-negativity if needed.
        if not allow_short:
             constraints.append(w >= 0) # Prevent negative weights funding cash
    else:
        # Fully invested
        constraints.append(cp.sum(w) == 1)

    # --- Lower Bound / Short Selling Constraint ---
    if not allow_short:
        # Add the lower bound constraint (potentially redundant if allow_cash added w >= 0)
        # but handles cases where lb > 0 is desired.
        constraints.append(w >= lb)
    # If allow_short is True, we simply don't add w >= lb.

    # --- Objective Function ---
    # CVaR = VaR + Average Tail Loss
    cvar_term = v + (1.0 / ((1 - alpha) * T)) * cp.sum(xi)
    # Minimize: CVaR - lambda * Expected Return - eta * ESG Score
    objective = cvar_term - lamb * (mu_np @ w) - eta * (esg_np @ w)

    # --- Problem Setup & Solving ---
    prob = cp.Problem(cp.Minimize(objective), constraints)
    prob.solve(solver=cp.OSQP, verbose=False)
    if w.value is None:
        raise RuntimeError('CVaR LP solver failed')
    return np.array(w.value).reshape(-1,)

In [9]:
# --- Example Usage ---
n_assets = 5
T_scenarios = 200
returns_scenarios_example = np.random.randn(T_scenarios, n_assets) / 100
mu_example = pd.Series(returns_scenarios_example.mean(axis=0))
esg_example = np.random.rand(n_assets)

# Fully invested
w_cvar_full = solve_cvar_lp(returns_scenarios_example, mu_example, esg_example, allow_cash=False, solver=cp.OSQP)
print(f"CVaR Fully invested weights sum: {w_cvar_full.sum():.4f}")
print(np.round(w_cvar_full, 4))

# Allow cash
w_cvar_cash = solve_cvar_lp(returns_scenarios_example, mu_example, esg_example, allow_cash=True, solver=cp.OSQP)
print(f"\nCVaR Allow cash weights sum: {w_cvar_cash.sum():.4f}")
print(np.round(w_cvar_cash, 4))
if w_cvar_cash.sum() < 0.9999:
    print(f"Cash held: {1 - w_cvar_cash.sum():.2%}")

CVaR Fully invested weights sum: 1.0000
[ 0.  1. -0.  0.  0.]

CVaR Allow cash weights sum: 1.0000
[0. 1. 0. 0. 0.]


---
## NSGA-II (ignore)
This code uses the NSGA-II (Non-dominated Sorting Genetic Algorithm II), a popular evolutionary algorithm, to solve a multi-objective portfolio optimization problem. Instead of finding a single "best" portfolio (scalarized optomization), it aims to find a set of optimal portfolios representing the best possible trade-offs between three conflicting goals:
1. Minimize Risk
2. Maximize Return
3. Maximize ESG Score

It accommodates discrete cardinality constraints and scales flexibly. Itâ€™s conceptually compatible with quantum hybrid solvers that can explore these Pareto surfaces faster in high-dimensional ESG-constrained spaces

In [46]:
# ---------------------------- NSGA-II multiobjective ----------------------------

from pymoo.factory import get_sampling, get_crossover, get_mutation, get_problem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize
from pymoo.factory import get_termination
from pymoo.core.problem import ElementwiseProblem

class PortfolioProblem(ElementwiseProblem):
    """
    Defines the multi-objective portfolio optimization problem for pymoo using NSGA-II.
    """
    
    def __init__(self, Sigma, mu, esg, lb=0.0, ub=1.0, cardinality_k=None):
        self.Sigma = Sigma
        self.mu = mu
        self.esg = esg
        self.n = len(mu)
        self.lb = lb
        self.ub = ub
        self.k = cardinality_k
        super().__init__(n_var=self.n, n_obj=3, n_constr=0, xl=lb, xu=ub)

    def _evaluate(self, x, out, *args, **kwargs):
        """
        Evaluates a single candidate solution (portfolio weight vector).

        This method takes a raw weight vector `x` generated by the NSGA-II
        operators (like crossover, mutation), repairs it to ensure it represents
        a feasible portfolio (non-negative, sums to 1, meets cardinality limit),
        and then calculates the three objective function values for this
        feasible portfolio.

        Args:
            x (np.array): A candidate weight vector from the algorithm (n,).
                          May not be feasible initially.
            out (dict): A dictionary to store the evaluation results.
                        `out['F']` stores the objective values [risk, -ret, -esg].
                        `out['X']` stores the repaired, feasible weight vector.
        """
        # Ensure weights from crossover/mutation >= 0
        x_repaired = np.maximum(x, 0.0)

        # --- Normalize Weights to Sum to 1 ---
        sum_x = x_repaired.sum()
        
        # Use a small tolerance to handle potential floating point issues
        if sum_x <= 1e-8:
            # If sum is effectively zero, reset to an equal-weight portfolio
            # This avoids division by zero and keeps the process going.
            x_repaired = np.ones(self.n) / self.n
        else:
            # Normalize to ensure weights sum exactly to 1
            x_repaired = x_repaired / sum_x

        # --- Repair Step 3: Enforce Cardinality Constraint (Optional) ---
        if self.k is not None and self.k < self.n:
            # Check if k is valid
            if self.k <= 0:
                 x_repaired = np.zeros(self.n) # Portfolio with zero weight everywhere
            else:
                # Find the indices of the 'k' largest weights
                idx = np.argsort(-x_repaired)[:self.k]

                # Create a mask: 1 for top-k assets, 0 otherwise
                mask = np.zeros_like(x_repaired)
                mask[idx] = 1.0

                # Apply the mask to zero out non-top-k weights
                x_repaired = x_repaired * mask

                # --- Repair Step 3b: Re-normalize after cardinality ---
                # Ensure the remaining top-k weights still sum to 1
                sum_x_k = x_repaired.sum()
                if sum_x_k <= 1e-8:
                    # If the sum of top k became zero (e.g., all were tiny),
                    # distribute weight equally among the initially chosen k assets.
                    equal_weight_k = 1.0 / self.k
                    x_repaired[idx] = equal_weight_k # Assign equal weight only to the selected indices
                    # Ensure others remain zero if they weren't in idx
                    x_repaired[~np.isin(np.arange(self.n), idx)] = 0.0

                else:
                    x_repaired = x_repaired / sum_x_k

        # --- Calculate Objective Values using the Repaired Weights ---
        # Objective 1: Minimize Risk (Variance)
        risk = float(x_repaired.T @ self.Sigma @ x_repaired)

        # Objective 2: Minimize Negative Return (i.e., Maximize Return)
        neg_return = float(-self.mu @ x_repaired)

        # Objective 3: Minimize Negative ESG Score (i.e., Maximize ESG)
        neg_esg = float(-self.esg @ x_repaired)

        # --- Store Results for pymoo ---
        # Store objective values in the specified key 'F'
        out["F"] = [risk, neg_return, neg_esg]
        # Store the final, feasible, repaired weight vector in 'X'
        # This ensures that the solutions returned by minimize() are valid
        out["X"] = x_repaired

In [58]:
# ---------------------------- Cardinality two-stage heuristic ----------------------------

def solve_cardinality_heuristic_two_stage(
    Sigma: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    k: int,
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    gamma: float = 1e-3,
    solver = cp.OSQP
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Approximates a cardinality-constrained portfolio optimization using a two-stage heuristic.

    Directly enforcing a cardinality constraint (limiting the number of assets with
    non-zero weights to 'k') makes the optimization problem NP-hard (non-convex).
    This function implements a common heuristic to find an approximate solution:

    1.  **Stage 1 (Regularized QP):** Solve the original scalarized QP problem
        but add a small L2 (Ridge) regularization term (gamma * I) to the
        covariance matrix. This encourages smaller weights overall, potentially
        making less important assets have weights closer to zero, although it
        doesn't guarantee exact sparsity like L1 regularization would.
        The objective is: min w^T (Sigma + gamma*I) w - lamb * mu^T w - eta * esg^T w

    2.  **Stage 2 (Restricted QP):** Select the 'k' assets with the largest
        absolute weights from the Stage 1 solution. Then, re-solve the *original*
        QP problem (without regularization), but restrict the solution space by
        forcing the weights of the non-selected (n-k) assets to be exactly zero.

    Args:
        Sigma (Union[np.ndarray, pd.DataFrame]): Covariance matrix (n x n).
                                                Must be positive semi-definite.
        mu (Union[np.ndarray, pd.Series]): Expected returns vector (n,).
        esg (Union[np.ndarray, pd.Series]): ESG scores vector (n,).
        k (int): The desired maximum number of assets with non-zero weights
                 (the cardinality constraint). Must be > 0 and <= n.
        lamb (float, optional): Weight for the expected return term in the objective.
                                Defaults to 1.0.
        eta (float, optional): Weight for the ESG score term in the objective.
                               Defaults to 0.1.
        lb (float, optional): Lower bound for weights (e.g., 0.0 for long-only).
                              This bound is applied in both stages, respecting the
                              non-negativity constraint unless explicitly set lower.
                              Defaults to 0.0.
        ub (float, optional): Upper bound for weights. Defaults to 1.0.
        gamma (float, optional): The strength of the L2 (Ridge) regularization
                                 added in Stage 1. A small positive value helps
                                 stabilize the selection process. Defaults to 1e-3.
        solver (cp.Solver, optional): The CVXPY solver to use for both QP stages.
                                      Defaults to cp.OSQP.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
            - w_stage1 (np.ndarray): The raw (dense) weight vector from Stage 1 (n,).
            - w_stage2 (np.ndarray): The final sparse weight vector from Stage 2 (n,),
                                     approximately satisfying the cardinality limit.
            - selected_indices (np.ndarray): The indices of the 'k' assets selected
                                             after Stage 1.

    Raises:
        ValueError: If k is not a valid cardinality number (1 <= k <= n) or if
                    input dimensions are inconsistent after conversion.
        RuntimeError: If either of the underlying QP solvers fail or do not converge
                      to an optimal solution.
    """
    # --- Input Validation and Conversion ---
    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    n = len(mu_np)

    if not isinstance(k, int) or not (0 < k <= n):
        raise ValueError(f"Cardinality k={k} must be an integer between 1 and n={n}")

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    if isinstance(Sigma, pd.DataFrame):
        Sigma_np = Sigma.values
    else:
        Sigma_np = np.array(Sigma)

    # Dimension checks
    if len(esg_np) != n or Sigma_np.shape != (n, n):
        raise ValueError(
            f"Dimension mismatch: n={n}, mu={len(mu_np)}, esg={len(esg_np)}, Sigma={Sigma_np.shape}"
        )

    # --- Stage 1: Solve Regularized QP ---
    # Add small diagonal L2 (Ridge) regularization to Sigma
    Sigma_reg = Sigma_np + gamma * np.eye(n)

    print(f"Running Stage 1: Regularized QP (gamma={gamma})...")
    try:
        # Assuming allow_short=False if lb >= 0, consistent with typical use
        allow_short_stage1 = (lb < 0)
        w_stage1 = solve_scalarized_qp(
            Sigma_reg, mu_np, esg_np, lamb=lamb, eta=eta, lb=lb, ub=ub,
            allow_short=allow_short_stage1, solver=solver
        )
        # Ensure w_stage1 is a flat array
        w_stage1 = np.array(w_stage1).flatten()
        if len(w_stage1) != n: # Post-condition check
             raise RuntimeError("Stage 1 solver returned weights of incorrect dimension.")

    except RuntimeError as e:
        raise RuntimeError(f"Stage 1 QP solver failed: {e}")
    except Exception as e:
        raise RuntimeError(f"An unexpected error occurred in Stage 1: {e}")

    # --- Asset Selection ---
    # Select top-k assets based on the absolute magnitude of weights from Stage 1
    selected_indices = np.argsort(-np.abs(w_stage1))[:k]
    # Create a boolean mask for selected assets for efficient indexing later
    mask = np.zeros(n, dtype=bool)
    mask[selected_indices] = True
    print(f"Stage 1 finished. Selected top {k} asset indices.")

    # --- Stage 2: Re-optimize Restricted QP ---
    print("Running Stage 2: Restricted QP...")
    w_reopt = cp.Variable(n, name="w_restricted")
    objective_stage2 = cp.Minimize(
        cp.quad_form(w_reopt, Sigma_np) - lamb * (mu_np @ w_reopt) - eta * (esg_np @ w_reopt)
    )

    constraints_stage2 = [
        cp.sum(w_reopt) == 1,
        w_reopt <= ub # Upper bound applies to all potentially selected variables
    ]
    # Apply lower bound constraint (respecting potential short allowance via lb)
    constraints_stage2.append(w_reopt >= lb)

    # Enforce zero weights for non-selected assets
    # Using array slicing with the boolean mask is efficient in cvxpy
    constraints_stage2.append(w_reopt[~mask] == 0)

    prob_stage2 = cp.Problem(objective_stage2, constraints_stage2)

    try:
        prob_stage2.solve(solver=solver, verbose=False)

        if prob_stage2.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
            raise RuntimeError(f'Stage 2 Restricted QP solver failed or did not find optimal solution. Status: {prob_stage2.status}')
        if w_reopt.value is None:
            raise RuntimeError('Stage 2 Restricted QP solver finished but weights are None.')

        # Ensure the output is a flat numpy array and clean near-zero values
        w_stage2 = np.array(w_reopt.value).flatten()
        w_stage2[np.abs(w_stage2) < 1e-8] = 0.0 # Clean small values
        w_stage2[~mask] = 0.0 # Explicitly set non-selected to zero

        # Optional: Re-normalize if sum is slightly off due to solver tolerance
        current_sum = w_stage2.sum()
        if not np.isclose(current_sum, 1.0) and current_sum > 1e-8:
             print(f"Warning: Stage 2 weights sum to {current_sum:.6f}. Re-normalizing.")
             w_stage2 /= current_sum
             w_stage2[~mask] = 0.0 # Ensure zeros remain zero after normalization

    except Exception as e:
        raise RuntimeError(f'Stage 2 Restricted QP solver failed during execution: {e}')

    print("Stage 2 finished.")
    return w_stage1, w_stage2, selected_indices

In [47]:
def run_nsga2(Sigma, mu, esg, pop_size=200, generations=200, cardinality_k=None,
              cr_prob=0.9, cr_eta=15, mut_prob=None, mut_eta=20, seed=42):

    """
    Runs the NSGA-II algorithm for multi-objective portfolio optimization.

    This function sets up and executes the NSGA-II algorithm using the
    `PortfolioProblem` definition to find a set of Pareto-optimal portfolio
    allocations that represent the best trade-offs between minimizing risk
    (variance), maximizing expected return, and maximizing ESG score.

    Args:
        Sigma (np.array | pd.DataFrame): Covariance matrix of asset returns (n x n).
        mu (np.array | pd.Series): Vector of expected returns (n,).
        esg (np.array | pd.Series): Vector of ESG scores (n,).
        pop_size (int, optional): The number of individuals (portfolios) in each
                                  generation of the NSGA-II algorithm. Defaults to 200.
        generations (int, optional): The number of generations the algorithm will run.
                                     Defaults to 200.
        cardinality_k (int, optional): The maximum number of assets allowed to have
                                       non-zero weights in the portfolio. Passed to
                                       PortfolioProblem. Defaults to None (no limit).
        cr_prob (float, optional): Crossover probability for the SBX operator.
                                   Defaults to 0.9.
        cr_eta (float, optional): Eta parameter for the SBX crossover operator, controlling
                                  the distribution spread. Defaults to 15.
        mut_prob (float, optional): Mutation probability for the PM operator. If None,
                                    it defaults to 1/n (where n is the number of assets).
                                    Defaults to None.
        mut_eta (float, optional): Eta parameter for the PM mutation operator.
                                   Defaults to 20.
        seed (int, optional): Random seed for reproducibility of the results.
                              Defaults to 42.

    Returns:
        pymoo.optimize.Result: The result object from pymoo's `minimize`.
                               Key attributes include:
                               - `res.X`: A NumPy array where each row is a feasible,
                                          Pareto-optimal portfolio weight vector (n,).
                               - `res.F`: A NumPy array where each row contains the
                                          corresponding objective values
                                          [risk, -return, -esg] for the portfolios in `res.X`.
    """

    # Create the problem instance (handles input conversion)
    problem = PortfolioProblem(Sigma=Sigma, mu=mu, esg=esg, cardinality_k=cardinality_k)

    # Determine default mutation probability if not provided
    n_assets = problem.n
    if mut_prob is None:
        mut_prob = 1.0 / n_assets
    
    algorithm = NSGA2(pop_size=pop_size,
                      sampling=get_sampling('real_random'),
                      crossover=get_crossover('real_sbx'),
                      mutation=get_mutation('real_pm'),
                      eliminate_duplicates=True # Helps maintain diversity
                     )

    termination = get_termination('n_gen', generations)
    res = minimize(problem,
                   algorithm,
                   termination,
                   seed=seed,
                   save_history=False,     # Typically not needed, saves memory
                   verbose=False)

    # res.X are the solutions (normalized already by _evaluate)
    return res

## Cardinality two-stage heuristic (ignore)

In [53]:
# Assuming solve_scalarized_qp is defined elsewhere and works correctly
# from your_module import solve_scalarized_qp

# ---------------------------- Cardinality two-stage heuristic ----------------------------

def solve_cardinality_heuristic_two_stage(
    Sigma: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    k: int,
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    gamma: float = 1e-3
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Approximates a cardinality-constrained portfolio optimization using a two-stage heuristic.

    Directly enforcing a cardinality constraint (limiting the number of assets with
    non-zero weights to 'k') makes the optimization problem NP-hard (non-convex).
    This function implements a common heuristic to find an approximate solution:

    1.  **Stage 1 (Regularized QP):** Solve the original scalarized QP problem
        but add a small L2 (Ridge) regularization term (gamma * I) to the
        covariance matrix. This encourages smaller weights overall, potentially
        making less important assets have weights closer to zero, although it
        doesn't guarantee exact sparsity like L1 regularization would.
        The objective is: min w^T (Sigma + gamma*I) w - lamb * mu^T w - eta * esg^T w

    2.  **Stage 2 (Restricted QP):** Select the 'k' assets with the largest
        absolute weights from the Stage 1 solution. Then, re-solve the *original*
        QP problem (without regularization), but restrict the solution space by
        forcing the weights of the non-selected (n-k) assets to be exactly zero.

    Args:
        Sigma (Union[np.ndarray, pd.DataFrame]): Covariance matrix (n x n).
        mu (Union[np.ndarray, pd.Series]): Expected returns vector (n,).
        esg (Union[np.ndarray, pd.Series]): ESG scores vector (n,).
        k (int): The desired maximum number of assets with non-zero weights
                 (the cardinality constraint). Must be > 0 and <= n.
        lamb (float, optional): Weight for the expected return term. Defaults to 1.0.
        eta (float, optional): Weight for the ESG score term. Defaults to 0.1.
        lb (float, optional): Lower bound for weights (e.g., 0.0 for long-only).
                              Defaults to 0.0.
        ub (float, optional): Upper bound for weights. Defaults to 1.0.
        gamma (float, optional): The strength of the L2 (Ridge) regularization
                                 added in Stage 1. A small positive value.
                                 Defaults to 1e-3.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
            - w_stage1 (np.ndarray): The raw (dense) weight vector from Stage 1.
            - w_stage2 (np.ndarray): The final sparse weight vector from Stage 2,
                                     satisfying the cardinality constraint.
            - selected_indices (np.ndarray): The indices of the 'k' assets selected
                                             after Stage 1.

    Raises:
        ValueError: If k is not a valid cardinality number or if input dimensions
                    are inconsistent.
        RuntimeError: If either of the underlying QP solvers fail.
    """
    # --- Input Validation and Conversion ---
    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    n = len(mu_np)

    if not (0 < k <= n):
        raise ValueError(f"Cardinality k={k} must be between 1 and n={n}")

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    if isinstance(Sigma, pd.DataFrame):
        Sigma_np = Sigma.values
    else:
        Sigma_np = np.array(Sigma)

    if len(esg_np) != n or Sigma_np.shape != (n, n):
         raise ValueError(
             f"Dimension mismatch: n={n}, mu={len(mu_np)}, esg={len(esg_np)}, Sigma={Sigma_np.shape}"
         )
    # --- Stage 1: Solve Regularized QP ---
    # Add small diagonal L2 (Ridge) regularization
    # This helps stabilize the solution and shrinks small weights,
    # aiding the selection process, though not strictly enforcing sparsity.
    Sigma_reg = Sigma_np + gamma * np.eye(n)

    try:
        w_stage1 = solve_scalarized_qp(Sigma_reg, mu_np, esg_np, lamb=lamb, eta=eta, lb=lb, ub=ub)
    except RuntimeError as e:
        raise RuntimeError(f"Stage 1 QP solver failed: {e}")
    except Exception as e:
         raise RuntimeError(f"An unexpected error occurred in Stage 1: {e}")


    # --- Asset Selection ---
    # Select top-k assets based on the absolute magnitude of weights from Stage 1
    # Using absolute value is important if short selling was allowed in stage 1
    selected_indices = np.argsort(-np.abs(w_stage1))[:k]
    # Create a boolean mask for selected assets
    mask = np.zeros(n, dtype=bool)
    mask[selected_indices] = True

    # --- Stage 2: Re-optimize Restricted QP ---
    def reoptimize_restricted():
        """Solves the original QP restricted to the selected assets."""
        w_reopt = cp.Variable(n)
        objective = cp.quad_form(w_reopt, Sigma_np) - lamb * (mu_np @ w_reopt) - eta * (esg_np @ w_reopt)

        constraints = [
            cp.sum(w_reopt) == 1,
            w_reopt <= ub # Upper bound applies to all potentially selected variables
        ]
        # Apply lower bound only if not allowing short selling generally
        # Note: solve_scalarized_qp already handles allow_short logic,
        # but re-applying LB logic here for clarity if needed.
        # This assumes lb=0 if not allow_short was intended.
        if lb >= 0: # Check if non-negativity is implied
             constraints.append(w_reopt >= lb)
        # If lb was negative, the w_reopt >= lb allows negativity for selected assets

        # Enforce zero weights for non-selected assets
        # Create constraint w[i] == 0 for all i not in selected_indices
        zero_constraints = [w_reopt[i] == 0 for i in range(n) if not mask[i]]
        constraints.extend(zero_constraints)

        prob = cp.Problem(cp.Minimize(objective), constraints)
        try:
            prob.solve(solver=cp.OSQP, verbose=False) # Use desired solver
            if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
                raise RuntimeError(f'Restricted QP solver failed or did not find optimal solution. Status: {prob.status}')
            if w_reopt.value is None:
                 raise RuntimeError('Restricted QP solver finished but weights are None.')

            # Ensure the output is a flat numpy array
            result_w = np.array(w_reopt.value).flatten()
            # Clean up potential tiny values in zeroed-out elements
            result_w[~mask] = 0.0
            return result_w

        except Exception as e:
            # Catch potential solver errors during execution
            raise RuntimeError(f'Restricted QP solver failed during execution: {e}')

    w_stage2 = reoptimize_restricted()

    return w_stage1, w_stage2, selected_indices

# Evaluation metric

In [10]:
from typing import Optional, Dict, Union, Tuple, Any

# ---------------------------- Evaluation metrics & plotting ----------------------------

def calculate_portfolio_metrics(
    weights: np.ndarray,
    expected_returns: Union[np.ndarray, pd.Series],
    covariance_matrix: Union[np.ndarray, pd.DataFrame],
    esg_scores: Union[np.ndarray, pd.Series],
    returns_oos: Optional[Union[pd.DataFrame, pd.Series]] = None,
    annualization_factor: int = 252
    ) -> Dict[str, float]:
    """
    Calculates various performance and characteristic metrics for a given portfolio.

    Computes the expected return, risk (variance and volatility), and ESG score
    based on input parameters. Optionally calculates realized out-of-sample (OOS)
    mean return and annualized volatility if OOS returns are provided.

    Args:
        weights (np.ndarray): A 1D array of portfolio weights for n assets (shape n,).
            Weights should ideally sum to 1.
        expected_returns (Union[np.ndarray, pd.Series]): A 1D array or Series of
            expected returns for each asset (shape n,).
        covariance_matrix (Union[np.ndarray, pd.DataFrame]): The covariance matrix
            of asset returns (shape n x n).
        esg_scores (Union[np.ndarray, pd.Series]): A 1D array or Series of ESG
            scores for each asset (shape n,).
        returns_oos (Optional[Union[pd.DataFrame, pd.Series]], optional):
            Out-of-sample historical or simulated returns used to calculate realized
            performance. Should be a DataFrame (T x n) or Series if only one asset,
            where T is the number of periods. Assumes daily returns if using the
            default annualization_factor. Defaults to None.
        annualization_factor (int, optional): The factor used to annualize the
            out-of-sample volatility (e.g., 252 for daily returns, 12 for monthly).
            Defaults to 252.

    Returns:
        Dict[str, float]: A dictionary containing the calculated portfolio metrics:
            - 'expected_return': The expected portfolio return (annualized if inputs are).
            - 'variance': The portfolio variance.
            - 'volatility': The portfolio standard deviation (square root of variance).
            - 'esg_score': The weighted average ESG score of the portfolio.
            - 'oos_mean_return' (optional): The mean of the out-of-sample portfolio returns.
            - 'oos_annualized_volatility' (optional): The annualized standard deviation
              (volatility) of the out-of-sample portfolio returns.

    Raises:
        ValueError: If input dimensions are inconsistent.
    """
    # --- Input Validation and Conversion ---
    n_assets = len(weights)

    if isinstance(expected_returns, pd.Series):
        mu = expected_returns.values
    else:
        mu = np.asarray(expected_returns)

    if isinstance(covariance_matrix, pd.DataFrame):
        Sigma = covariance_matrix.values
    else:
        Sigma = np.asarray(covariance_matrix)

    if isinstance(esg_scores, pd.Series):
        esg = esg_scores.values
    else:
        esg = np.asarray(esg_scores)

    if mu.shape != (n_assets,) or esg.shape != (n_assets,) or Sigma.shape != (n_assets, n_assets):
        raise ValueError(
            f"Input dimension mismatch: weights({weights.shape}), mu({mu.shape}), "
            f"esg({esg.shape}), Sigma({Sigma.shape})"
        )
    if not np.isclose(np.sum(weights), 1.0):
        print("Warning: Portfolio weights do not sum close to 1.")

    # --- Calculate Core Metrics ---
    port_expected_return = float(mu @ weights)
    port_variance = float(weights.T @ Sigma @ weights)
    # Ensure variance is non-negative due to potential floating point errors
    port_variance = max(0, port_variance)
    port_volatility = np.sqrt(port_variance)
    port_esg_score = float(esg @ weights)

    metrics = {
        'expected_return': port_expected_return,
        'variance': port_variance,
        'volatility': port_volatility,
        'esg_score': port_esg_score
    }

    # --- Calculate Out-of-Sample Metrics (Optional) ---
    if returns_oos is not None:
        if isinstance(returns_oos, (pd.DataFrame, pd.Series)):
            oos_returns_np = returns_oos.values
        else:
            oos_returns_np = np.asarray(returns_oos)

        # Handle potential 1D array for single asset OOS returns
        if oos_returns_np.ndim == 1:
             if n_assets != 1:
                 raise ValueError("returns_oos is 1D but multiple assets exist.")
             # Reshape for consistent calculation
             oos_returns_np = oos_returns_np.reshape(-1, 1)
        elif oos_returns_np.shape[1] != n_assets:
             raise ValueError(
                 f"Dimension mismatch for returns_oos: Expected {n_assets} columns, "
                 f"got {oos_returns_np.shape[1]}"
             )

        # Calculate portfolio returns for each OOS period
        portfolio_oos_returns = oos_returns_np @ weights

        # Calculate OOS metrics
        oos_mean = float(np.mean(portfolio_oos_returns))
        oos_vol = float(np.std(portfolio_oos_returns))
        oos_annualized_vol = oos_vol * np.sqrt(annualization_factor)

        metrics['oos_mean_return'] = oos_mean
        metrics['oos_annualized_volatility'] = oos_annualized_vol

    return metrics

In [11]:
def plot_pareto(res, assets, title='Pareto front (risk, -return, -ESG)'):
    F = res.F
    risk = F[:, 0]
    neg_ret = F[:, 1]
    neg_esg = F[:, 2]
    ret = -neg_ret
    esg = -neg_esg

    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')
    p = ax.scatter(risk, ret, esg, c=ret, cmap='viridis')
    ax.set_xlabel('Risk (Variance)')
    ax.set_ylabel('Return')
    ax.set_zlabel('ESG')
    ax.set_title(title)
    fig.colorbar(p, ax=ax, label='Return')
    plt.show()

# Run Portfolio Optimization

In [12]:
def run_portfolio_optimization_suite(
    prices: pd.DataFrame,
    esg_scores: pd.Series,
    config: Dict[str, Any],
    oos_prices = None
) -> Dict[str, Any]:
    """
    Runs a suite of portfolio optimization methods based on the provided configuration.

    This function takes price and ESG data, computes necessary inputs (expected
    returns, covariance matrix), and then executes various optimization
    strategies as specified in the configuration dictionary. It evaluates
    the resulting portfolios and returns a structured dictionary of results.

    Args:
        prices (pd.DataFrame): DataFrame of historical asset prices (rows=dates, cols=assets).
        esg_scores (pd.Series): DataFrame of ESG scores (rows=assets, cols=score).
                                  Must have an index compatible with price columns.
        config (Dict[str, Any]): A dictionary controlling which optimizations to run
                                 and their parameters. Expected keys:
            'compute_inputs': { 'shrinkage': bool }
            'scalarized_qp' (optional): {
                'lamb_grid': List[float],
                'eta_grid': List[float],
                'lb': float, 'ub': float, 'allow_short': bool, 'solver': cp.Solver
            }
            'nsga2' (optional): {
                'pop_size': int, 'generations': int, 'plot': bool
            }
            'cardinality' (optional): {
                'k': int, 'lamb': float, 'eta': float
            }
            'cvar' (optional): {
                'alpha': float, 'lamb': float, 'eta': float, 'use_mu_in_obj': bool
            }
        oos_prices (Optional[pd.DataFrame], optional): Out-of-sample prices for
                                                      evaluating realized metrics.
                                                      Defaults to None.

    Returns:
        Dict[str, Any]: A dictionary containing the results of the executed
                        optimization strategies. Keys correspond to strategy names
                        (e.g., 'scalarized_qp_results', 'nsga2_result', etc.).
    """
    results = {}

    # --- 1. Compute Inputs ---
    print("Computing expected returns and covariance matrix...")
    mu, Sigma, assets = compute_mu_sigma(
        prices,
        shrinkage=config.get('compute_inputs', {}).get('shrinkage', True)
    )
    # Align ESG scores with the assets used (e.g., after dropping NaNs)
    try:
        esg_arr = esg_scores.loc[assets].values.flatten()
    except KeyError:
        raise ValueError("ESG scores index does not match price columns.")
    if len(esg_arr) != len(assets):
         raise ValueError("ESG scores could not be aligned with assets.")

    results['inputs'] = {'mu': mu, 'Sigma': Sigma, 'esg_arr': esg_arr, 'assets': assets}
    print(f"Inputs computed for {len(assets)} assets.")

    # --- Out-of-Sample Returns (if applicable) ---
    returns_oos_df = None
    if oos_prices is not None:
        returns_oos_df = oos_prices.pct_change().dropna()
        # Ensure OOS returns align with the assets used in optimization
        try:
            returns_oos_df = returns_oos_df[assets]
        except KeyError:
             print("Warning: OOS price columns do not perfectly match optimized assets. Using intersection.")
             common_assets = assets.intersection(returns_oos_df.columns)
             returns_oos_df = returns_oos_df[common_assets]
             # Re-align other inputs - this is complex, better to ensure data matches upfront
             # For simplicity here, we'll proceed but metrics might be slightly off
             # A robust implementation might filter assets earlier or raise an error

    # --- 2. Scalarized QP Grid Search ---
    if 'scalarized_qp' in config:
        print("\nRunning Scalarized QP grid search...")
        qp_config = config['scalarized_qp']
        qp_results = []
        for lamb in qp_config.get('lamb_grid', [1.0]):
            for eta in qp_config.get('eta_grid', [0.1]):
                try:
                    w = solve_scalarized_qp(
                        Sigma, mu, esg_arr,
                        lamb=lamb, eta=eta,
                        lb=qp_config.get('lb', 0.0),
                        ub=qp_config.get('ub', 1.0),
                        allow_short=qp_config.get('allow_short', False),
                        allow_cash=qp_config.get('allow_cash', False),
                        solver=qp_config.get('solver', cp.OSQP)
                    )
                    metrics = calculate_portfolio_metrics(w, mu, Sigma, esg_arr, returns_oos=returns_oos_df)
                    qp_results.append({'lambda': lamb, 'eta': eta, 'weights': w, **metrics})
                    print(f"  lambda={lamb:.2f}, eta={eta:.2f} -> "
                          f"ret={metrics['expected_return']:.4f}, "
                          f"vol={metrics['volatility']:.4f}, "
                          f"esg={metrics['esg_score']:.4f}")
                except Exception as e:
                    print(f"  Failed for lambda={lamb}, eta={eta}: {e}")
        results['scalarized_qp_results'] = qp_results

    # --- 3. NSGA-II Multi-Objective ---
    if 'nsga2' in config:
        print("\nRunning NSGA-II Multi-Objective Optimization (may take time)...")
        nsga_config = config['nsga2']
        try:
            # Need to convert Series/DataFrame back for pymoo if it expects them
            # Or ensure run_nsga2 handles numpy arrays correctly (as improved)
            nsga_res = run_nsga2(
                Sigma, mu, esg_arr,
                pop_size=nsga_config.get('pop_size', 200),
                generations=nsga_config.get('generations', 100),
                cardinality_k=nsga_config.get('cardinality_k', None) # Pass cardinality if needed
            )
            print(f"NSGA-II finished. Solutions found: {nsga_res.F.shape[0]}")
            results['nsga2_result'] = nsga_res # Store the raw pymoo result object
            if nsga_config.get('plot', False):
                 print("Plotting Pareto front...")
                 # Ensure plot_pareto can handle the pymoo result object and assets
                 plot_pareto(nsga_res, assets)
        except Exception as e:
            print(f"  NSGA-II failed: {e}")


    # --- 4. Cardinality Two-Stage Heuristic ---
    if 'cardinality' in config:
        print("\nRunning Cardinality Two-Stage Heuristic...")
        card_config = config['cardinality']
        k = card_config.get('k', 10)
        try:
            w1, w2, idx = solve_cardinality_heuristic_two_stage(
                Sigma, mu, esg_arr, k=k,
                lamb=card_config.get('lamb', 1.0),
                eta=card_config.get('eta', 0.1)
            )
            metrics_stage2 = calculate_portfolio_metrics(w2, mu, Sigma, esg_arr, returns_oos=returns_oos_df)
            print(f"  Target k={k}")
            print(f"  Stage 1 non-zero (abs > 1e-6): {np.sum(np.abs(w1) > 1e-6)}")
            print(f"  Stage 2 non-zero (abs > 1e-6): {np.sum(np.abs(w2) > 1e-6)}")
            # print(f"  Top {k} asset indices: {idx}")
            print(f"  Stage 2 Metrics: ret={metrics_stage2['expected_return']:.4f}, "
                  f"vol={metrics_stage2['volatility']:.4f}, "
                  f"esg={metrics_stage2['esg_score']:.4f}")
            results['cardinality_result'] = {
                'k': k, 'weights_stage1': w1, 'weights_stage2': w2,
                'selected_indices': idx, 'metrics': metrics_stage2
            }
        except Exception as e:
            print(f"  Cardinality heuristic failed for k={k}: {e}")

    # --- 5. CVaR Optimization ---
    if 'cvar' in config:
        print("\nRunning CVaR Optimization...")
        cvar_config = config['cvar']
        # Use historical daily returns as scenarios
        if prices.empty:
             print("  Skipping CVaR: No price data available for scenarios.")
        else:
            daily_returns = prices.pct_change().dropna()
             # Align scenarios with assets used in mu/Sigma calculation
            try:
                scenarios = daily_returns[assets].values
                if scenarios.shape[1] != len(assets):
                     raise ValueError("Scenario dimensions don't match assets")

                # Decide whether to include mu term based on config
                cvar_lamb = cvar_config.get('lamb', 0.0) if cvar_config.get('use_mu_in_obj', False) else 0.0

                w_cvar = solve_cvar_lp(
                    scenarios, mu, esg_arr, # Pass mu here
                    alpha=cvar_config.get('alpha', 0.95),
                    lamb=cvar_lamb, # Pass lambda for return term
                    eta=cvar_config.get('eta', 0.1),
                    lb=cvar_config.get('lb', 0.0),
                    ub=cvar_config.get('ub', 1.0),
                    allow_short=cvar_config.get('allow_short', False),
                    allow_cash=qp_config.get('allow_cash', False),
                    solver=cvar_config.get('solver', cp.OSQP)
                )
                metrics_cvar = calculate_portfolio_metrics(w_cvar, mu, Sigma, esg_arr, returns_oos=returns_oos_df)
                print(f"  CVaR alpha={cvar_config.get('alpha', 0.95)}, "
                      f"return lambda={cvar_lamb:.2f}, "
                      f"ESG eta={cvar_config.get('eta', 0.1):.2f}")
                print(f"  Non-zero weights: {np.sum(np.abs(w_cvar) > 1e-6)}")
                print(f"  CVaR Metrics: ret={metrics_cvar['expected_return']:.4f}, "
                      f"vol={metrics_cvar['volatility']:.4f}, "
                      f"esg={metrics_cvar['esg_score']:.4f}")
                results['cvar_result'] = {'weights': w_cvar, 'metrics': metrics_cvar}

            except Exception as e:
                print(f"  CVaR optimization failed: {e}")

    print("\nOptimization suite finished.")
    return results

In [13]:
# Example Configuration:
config_example = {
    'compute_inputs': {'shrinkage': True},
    'scalarized_qp': {
        'lamb_grid': [0.5, 1.0, 2.0],
        'eta_grid': [0.0, 0.2, 0.5],
        'lb': 0.0, 'ub': 0.2, 'allow_short': False, # Example: Max 20% per asset
        'allow_cash': True
    },
    'cvar': {
        'alpha': 0.90, 'lamb': 0.5, 'eta': 0.3, 'use_mu_in_obj': True, 'allow_cash': True
    }
}

# Assuming prices_df and esg_df are loaded pandas DataFrames
results_dict = run_portfolio_optimization_suite(prices, esg, config_example)
print("\n--- Results Summary ---")
print(results_dict.keys())

Computing expected returns and covariance matrix...
Inputs computed for 50 assets.

Running Scalarized QP grid search...
  lambda=0.50, eta=0.00 -> ret=0.6790, vol=0.1598, esg=0.5747
  lambda=0.50, eta=0.20 -> ret=0.6538, vol=0.1633, esg=0.7039
  lambda=0.50, eta=0.50 -> ret=0.5577, vol=0.1583, esg=0.8578
  lambda=1.00, eta=0.00 -> ret=0.7212, vol=0.2372, esg=0.5323
  lambda=1.00, eta=0.20 -> ret=0.7015, vol=0.2173, esg=0.6415
  lambda=1.00, eta=0.50 -> ret=0.6542, vol=0.2232, esg=0.7758
  lambda=2.00, eta=0.00 -> ret=0.7259, vol=0.2474, esg=0.5193
  lambda=2.00, eta=0.20 -> ret=0.7195, vol=0.2397, esg=0.5803
  lambda=2.00, eta=0.50 -> ret=0.7123, vol=0.2468, esg=0.6492

Running CVaR Optimization...
  CVaR alpha=0.9, return lambda=0.50, ESG eta=0.30
  Non-zero weights: 5
  CVaR Metrics: ret=0.6845, vol=0.2462, esg=0.8026

Optimization suite finished.

--- Results Summary ---
dict_keys(['inputs', 'scalarized_qp_results', 'cvar_result'])


# Benchmark

In [17]:
import timeit

setup_code = """
import numpy as np
import pandas as pd
from sklearn.covariance import LedoitWolf
import matplotlib.pyplot as plt
from matplotlib import cm
import seaborn as sns
import os
import warnings
import cvxpy as cp

# reproducibility
RNG = np.random.default_rng(42)
warnings.filterwarnings('ignore')

# ---------------------------- Data utilities ----------------------------

def make_mock_data(n_assets=50, n_days=252*3, seed=42):

    rng = np.random.default_rng(seed)  # random number generator
    
    # Create factor structure
    n_factors = max(3, n_assets // 10)
    T = n_days
    F = rng.normal(scale=0.02, size=(T, n_factors))
    loadings = rng.normal(scale=0.5, size=(n_assets, n_factors))

    specific = rng.normal(scale=0.01, size=(T, n_assets))
    returns = F.dot(loadings.T) + specific
    # add small drift
    mu_true = rng.normal(0.0005, 0.001, size=n_assets)
    returns += mu_true

    # build cumulative prices
    prices = 100 * np.exp(np.cumsum(returns, axis=0))
    dates = pd.date_range(end=pd.Timestamp.today(), periods=T, freq='B')
    prices_df = pd.DataFrame(prices, index=dates, columns=[f'A{i}' for i in range(n_assets)])

    # ESG scores
    esg_raw = rng.uniform(0, 1, size=n_assets)
    # add sector bias
    sector_bias = np.linspace(-0.2, 0.2, n_assets)
    esg = esg_raw + sector_bias * 0.1
    esg = (esg - esg.min()) / (esg.max() - esg.min())
    esg_s = pd.Series(esg, index=prices_df.columns)

    return prices_df, esg_s

prices, esg = make_mock_data(n_assets=50, n_days=252*3, seed=42)

def compute_mu_sigma(prices, lookback=252, shrinkage=True):
    # compute daily returns
    rets = prices.pct_change().dropna()
    mu_daily = rets.mean()
    mu_annual = mu_daily * 252

    if shrinkage:
        lw = LedoitWolf().fit(rets.values)
        Sigma_daily = lw.covariance_
    else:
        Sigma_daily = np.cov(rets.values.T)

    Sigma_annual = Sigma_daily * 252
    mu = mu_annual.values
    Sigma = Sigma_annual
    assets = rets.columns.tolist()
    return mu, Sigma, assets

# ---------------------------- Baseline QP ----------------------------

import cvxpy as cp
import numpy as np
import pandas as pd
from typing import Union # For type hinting

def solve_scalarized_qp(
    Sigma: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    allow_short: bool = False,
    allow_cash: bool = False, # New parameter
    solver = cp.OSQP
) -> np.ndarray:

    # --- Input Validation and Conversion ---
    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    n = len(mu_np)

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    if isinstance(Sigma, pd.DataFrame):
        Sigma_np = Sigma.values
    else:
        Sigma_np = np.array(Sigma)

    if len(esg_np) != n or Sigma_np.shape != (n, n):
         raise ValueError(
             f"Dimension mismatch: n={n}, mu={len(mu_np)}, esg={len(esg_np)}, Sigma={Sigma_np.shape}"
         )
    # ----------------------------------------

    w = cp.Variable(n, name="weights")

    # --- Objective Function ---
    objective = cp.quad_form(w, Sigma_np) - lamb * (mu_np @ w) - eta * (esg_np @ w)

    # --- Constraints ---
    constraints = []

    # Budget Constraint: sum(w) == 1 (fully invested) or sum(w) <= 1 (allow cash)
    if allow_cash:
        constraints.append(cp.sum(w) <= 1)
        # If allowing cash, weights must still be non-negative unless shorting is also allowed
        if not allow_short:
             constraints.append(w >= 0) # Ensure weights don't go negative to fund cash
    else:
        constraints.append(cp.sum(w) == 1)

    # Individual Weight Bounds
    if not allow_short:
        # If sum(w) <= 1 is used, lb might technically not be needed if >= 0 already added
        # but including it handles cases where lb > 0 is desired.
        constraints.append(w >= lb)
    # If allow_short is True, we don't add w >= lb constraint.
    # The upper bound always applies.
    constraints.append(w <= ub)

    # --- Solve Problem ---
    prob = cp.Problem(cp.Minimize(objective), constraints)
    try:
        prob.solve(solver=solver, verbose=False)
        if prob.status not in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
             raise RuntimeError(f'QP solver failed or did not find optimal solution. Status: {prob.status}')
        if w.value is None:
             raise RuntimeError('QP solver finished but weights are None.')

        result_w = np.array(w.value).flatten()
        # Clean near-zero values
        result_w[np.abs(result_w) < 1e-8] = 0.0
        return result_w
    except Exception as e:
        raise RuntimeError(f'QP solver failed during execution: {e}')

from typing import Union # For type hinting

# ---------------------------- CVaR optimization ----------------------------

def solve_cvar_lp(
    returns_scenarios: Union[np.ndarray, pd.DataFrame],
    mu: Union[np.ndarray, pd.Series],
    esg: Union[np.ndarray, pd.Series],
    alpha: float = 0.95,
    lamb: float = 1.0,
    eta: float = 0.1,
    lb: float = 0.0,
    ub: float = 1.0,
    allow_short: bool = False,
    allow_cash: bool = False, # New parameter
    solver = cp.OSQP
) -> np.ndarray:
    
    # --- Input Validation and Conversion ---
    if isinstance(returns_scenarios, pd.DataFrame):
        returns_scenarios_np = returns_scenarios.values
    else:
        returns_scenarios_np = np.array(returns_scenarios)

    T, n = returns_scenarios_np.shape

    if isinstance(mu, (pd.Series, pd.DataFrame)):
        mu_np = mu.values.flatten()
    else:
        mu_np = np.array(mu).flatten()

    if isinstance(esg, (pd.Series, pd.DataFrame)):
        esg_np = esg.values.flatten()
    else:
        esg_np = np.array(esg).flatten()

    # Check dimensions
    if len(mu_np) != n or len(esg_np) != n:
        raise ValueError(
            f"Dimension mismatch: n={n} from scenarios, mu={len(mu_np)}, esg={len(esg_np)}"
        )
    # ----------------------------------------

    # --- CVXPY Variables ---
    w = cp.Variable(n, name="weights")
    v = cp.Variable(1, name="VaR_alpha")          # Auxiliary variable for VaR
    xi = cp.Variable(T, nonneg=True, name="losses_over_VaR") # Auxiliary variables for losses exceeding VaR

    # --- Calculate Portfolio Losses per Scenario ---
    portfolio_losses = -returns_scenarios_np @ w  # Shape (T,)

    # --- Core CVaR Constraint ---
    # xi_t >= Loss_t - v  (for all t=1..T)
    # Ensures xi captures the positive part of (Loss_t - v)
    cvar_core_constraint = xi >= portfolio_losses - v

    # --- Define Constraints List ---
    constraints = [
        w <= ub,                 # Upper bound on weights
        cvar_core_constraint     # The CVaR formulation constraint
        # xi >= 0 is handled by cp.Variable(T, nonneg=True)
    ]

    # --- Budget Constraint (Allow Cash or Fully Invested) ---
    if allow_cash:
        constraints.append(cp.sum(w) <= 1)
        # If allowing cash, weights usually must still be non-negative
        # unless shorting is *also* allowed. Add non-negativity if needed.
        if not allow_short:
             constraints.append(w >= 0) # Prevent negative weights funding cash
    else:
        # Fully invested
        constraints.append(cp.sum(w) == 1)

    # --- Lower Bound / Short Selling Constraint ---
    if not allow_short:
        # Add the lower bound constraint (potentially redundant if allow_cash added w >= 0)
        # but handles cases where lb > 0 is desired.
        constraints.append(w >= lb)
    # If allow_short is True, we simply don't add w >= lb.

    # --- Objective Function ---
    # CVaR = VaR + Average Tail Loss
    cvar_term = v + (1.0 / ((1 - alpha) * T)) * cp.sum(xi)
    # Minimize: CVaR - lambda * Expected Return - eta * ESG Score
    objective = cvar_term - lamb * (mu_np @ w) - eta * (esg_np @ w)

    # --- Problem Setup & Solving ---
    prob = cp.Problem(cp.Minimize(objective), constraints)
    prob.solve(solver=cp.OSQP, verbose=False)
    if w.value is None:
        raise RuntimeError('CVaR LP solver failed')
    return np.array(w.value).reshape(-1,)

"""

# 1. Benchmark Scalarized QP
stmt_qp = """
solve_scalarized_qp(Sigma, mu, esg_arr, lamb=1.0, eta=0.1, lb=0.0, ub=1.0)
"""

# 2. Benchmark CVaR LP
stmt_cvar = """
solve_cvar_lp(scenarios, mu, esg_arr, alpha=0.95, lamb=1.0, eta=0.1)
"""


# --- Run the Benchmark ---
print("\n--- Starting Benchmark ---")
print("This may take several minutes, especially for NSGA-II.")

# Number of times to run each fast solver
n_runs_fast = 10
# Number of times to run the slow (NSGA-II) solver
n_runs_slow = 1

# --- Run QP ---
print(f"\nBenchmarking QP Solver ({n_runs_fast} runs)...")
t_qp = timeit.timeit(stmt=stmt_qp, setup=setup_code, number=n_runs_fast)
avg_t_qp = t_qp / n_runs_fast
print(f"QP Finished. Average time: {avg_t_qp:.6f} seconds")

# --- Run CVaR ---
print(f"\nBenchmarking CVaR Solver ({n_runs_fast} runs)...")
t_cvar = timeit.timeit(stmt=stmt_cvar, setup=setup_code, number=n_runs_fast)
avg_t_cvar = t_cvar / n_runs_fast
print(f"CVaR Finished. Average time: {avg_t_cvar:.6f} seconds")


# --- Print Summary ---
print("\n--- Benchmark Summary ---")
print(f"Scalarized QP (Mean-Variance): {avg_t_qp:.6f} s")
print(f"CVaR (Linear Program):        {avg_t_cvar:.6f} s")
print(f"Cardinality (2-Stage QP):     {avg_t_card:.6f} s")
print(f"NSGA-II (Evolutionary):       {avg_t_nsga2:.6f} s")



--- Starting Benchmark ---
This may take several minutes, especially for NSGA-II.

Benchmarking QP Solver (10 runs)...


NameError: name 'Sigma' is not defined