# Assignment 1

Deadline: 19.03.2026, 12:00 CET

Lorenzo Barbero 25-743-709 lorenzo.barbero@uzh.ch


In [56]:
# Import standard libraries
import os
import timeit # To compute runtimes
from typing import Optional
from pathlib import Path
import sys
import warnings

# Repo root = parent of the folder containing this notebook (assignments/)
project_root = Path.cwd().resolve().parent  # if notebook is opened from assignments/
# If you sometimes open notebook with CWD elsewhere, use the notebook file path method (see Fix 2)

src_path = project_root / "src"

sys.path.insert(0, str(src_path))
# Import third-party libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import qpsolvers

# Import local modules
#project_root = os.path.dirname(os.path.dirname(os.getcwd()))   # Change this path if needed
src_path = os.path.join(project_root, 'qpmwp-course\\src')
sys.path.append(project_root)
sys.path.append(src_path)
from estimation.covariance import Covariance, is_pos_def, make_pos_def
from estimation.covariance import CovarianceSpecification
from estimation.expected_return import ExpectedReturn
from optimization.constraints import Constraints
from optimization.optimization import Optimization, Objective
from optimization.optimization_data import OptimizationData
from optimization.quadratic_program import QuadraticProgram, USABLE_SOLVERS
from helper_functions import simulate_correlated_gbm
from optimization.optimization import MeanVariance

In [57]:
qpsolvers.available_solvers

['cvxopt', 'daqp', 'highs', 'osqp', 'qpalm', 'quadprog']

## 1. Solver Horse Race

### 1.a)
(3 points)

Generate a synthetic dataset of dimension TxN, T=1000, N=50, and compute a vector of expected returns, q, and a covariance matrix, P, using classes ExpectedReturn and Covariance respectively.

In [58]:

# Set the dimensions
T = 1000       # Number of time steps
N = 50         # Number of assets
rnd_seed = 42  # Random seed for reproducibility

# Set random seed for reproducibility
np.random.seed(rnd_seed)

# Generate a random mean vector and covariance matrix.
# Make sure the covariance matrix is positive definite.
mu = np.random.rand(N) * 0.1  # Random mean returns between 0 and 0.1
A = np.random.rand(N, N) * 0.30
sigma = np.dot(A, A.transpose())  # Create a positive definite covariance matrix


# Generate correlated geometric Brownian motion paths and compute discrete returns
prices = simulate_correlated_gbm(mu=mu, sigma=sigma, T=T, random_seed=None)
returns = prices.pct_change().dropna()

# Compute the vector of expected returns from df using class ExpectedReturn
er = ExpectedReturn()             # create the object Expected Return
q = er.estimate(returns, False)   # Compute the expected returns given a dataset

# Compute the covariance matrix from df using class Covariance
spec = CovarianceSpecification(check_positive_definite = True) 
cov = Covariance(spec)
P = cov.estimate(returns, False)


# Display the results
print("Vector of expected returns (q):")
print(q)

print("\nCovariance matrix (P):")
print(P)

Vector of expected returns (q):
Asset_1    -0.001235
Asset_2    -0.000573
Asset_3    -0.001864
Asset_4    -0.001404
Asset_5    -0.001396
Asset_6    -0.001234
Asset_7    -0.001131
Asset_8    -0.001326
Asset_9    -0.001700
Asset_10   -0.001371
Asset_11   -0.001797
Asset_12   -0.002495
Asset_13   -0.000711
Asset_14   -0.001647
Asset_15   -0.000453
Asset_16   -0.002267
Asset_17   -0.001529
Asset_18   -0.001706
Asset_19   -0.000318
Asset_20   -0.001324
Asset_21   -0.001282
Asset_22   -0.001834
Asset_23   -0.001896
Asset_24   -0.000965
Asset_25   -0.000195
Asset_26   -0.001425
Asset_27   -0.001035
Asset_28   -0.002322
Asset_29   -0.000763
Asset_30   -0.001304
Asset_31   -0.001391
Asset_32   -0.001719
Asset_33   -0.002232
Asset_34   -0.001094
Asset_35   -0.000186
Asset_36   -0.001623
Asset_37   -0.002252
Asset_38   -0.001026
Asset_39   -0.001402
Asset_40   -0.000981
Asset_41   -0.001206
Asset_42   -0.000732
Asset_43   -0.001189
Asset_44   -0.001619
Asset_45   -0.000965
Asset_46   -0.001834
As

In [59]:
print(type(P))
print(type(q))

<class 'pandas.core.frame.DataFrame'>
<class 'pandas.core.series.Series'>


### 1.b)
(3 points)

Instantiate a constraints object by injecting column names of the return series created in 1.a) as ids and add:
- a budget constaint (i.e., asset weights have to sum to one)
- lower bounds of 0.0 for all assets
- upper bounds of 0.2 for all assets
- group contraints such that the sum of the weights of the first 15 assets is <= 0.3, the sum of assets 16 to 45 is <= 0.4 and the sum of assets 41 to 50 is <= 0.5

In [60]:
# Instantiate the Constraints class
constraints = Constraints(ids = returns.columns.tolist())

# Add budget constraint
constraints.add_budget()

# Add box constraints (i.e., lower and upper bounds)
constraints.add_box(box_type = "LongOnly", upper = 0.2)
# Add linear constraints
G = pd.DataFrame(0.0, index=["c1","c2","c3"], columns = constraints.ids)
G.loc["c1",constraints.ids[0:15]] = 1
G.loc["c2",constraints.ids[15:45]] = 1
G.loc["c3",constraints.ids[45:]] = 1

sense = pd.Series(["<=", "<=", "<="])
rhs = pd.Series([0.3, 0.4, 0.5])

constraints.add_linear(G = G, sense = sense, rhs = rhs)

# Display some columns of the G matrix to verify that the constraints have been set up correctly
constraints.linear['G'][['Asset_1', 'Asset_15', 'Asset_16', 'Asset_40', 'Asset_41', 'Asset_50']]

Unnamed: 0,Asset_1,Asset_15,Asset_16,Asset_40,Asset_41,Asset_50
c1,1.0,1.0,0.0,0.0,0.0,0.0
c2,0.0,0.0,1.0,1.0,1.0,0.0
c3,0.0,0.0,0.0,0.0,0.0,1.0


### 1.c) 
(4 points)

Solve a Mean-Variance optimization problem (using coefficients P and q in the objective function) which satisfies the above defined constraints.
Repeat the task for all open-source solvers in qpsolvers that you could install and compare the results in terms of:

- runtime
- accuracy: value of the primal problem.
- reliability: are all constraints fulfilled? Extract primal residuals, dual residuals and duality gap.

Generate a DataFrame with the solvers as column names and the following row index: 'solution_found': bool, 'objective': float, 'primal_residual': float, 'dual_residual': float, 'duality_gap': float, 'runtime': float.

Put NA's for solvers where the optimization failed for some reason.




In [61]:
# Extract the constraints in the format required by the solver
GhAb = constraints.to_GhAb()
G, h = GhAb["G"], GhAb["h"]
A, b = GhAb["A"], GhAb["b"]

# Bounds
lb = constraints.box["lower"].to_numpy(dtype=float) if constraints.box["box_type"] != "NA" else None
ub = constraints.box["upper"].to_numpy(dtype=float) if constraints.box["box_type"] != "NA" else None

# Define a dictionary to store the results in case a solver fails
result_on_fail =  {
    'solution_found': False,
    'objective_builtin': np.nan,
    'primal_residual': np.nan,
    'dual_residual' : np.nan,
    'duality_gap': np.nan,
    'runtime': np.nan,
}

# Loop over solvers, instantiate the quadratic program, solve it and store the results in a dictionary.

def get_maybe_callable(obj, name, default=np.nan, ndigits=6):
    try:
        val = getattr(obj, name, default)
        val = val() if callable(val) else val

        if isinstance(val, np.ndarray):
            val = val.item()   # safely extract scalar

        return round(float(val), ndigits)

    except Exception:
        return default

## I defined a function for the primal residuals
def primal_residual(G, h, A, b, lb, ub, x):
    """
    Max constraint violation (0 is perfect):
    - equalities: max |Ax - b|
    - inequalities: max max(Gx - h, 0)
    - bounds: max max(lb - x, 0), max max(x - ub, 0)
    """
    x = np.asarray(x, dtype=float)
    res = 0.0

    # Equality constraints Ax = b
    if A is not None and b is not None:
        res = max(res, float(np.max(np.abs(A @ x - b))))

    # Inequality constraints Gx <= h
    if G is not None and h is not None:
        res = max(res, float(np.max(np.maximum(G @ x - h, 0.0))))

    # Lower bounds lb <= x
    if lb is not None:
        res = max(res, float(np.max(np.maximum(lb - x, 0.0))))

    # Upper bounds x <= ub
    if ub is not None:
        res = max(res, float(np.max(np.maximum(x - ub, 0.0))))

    return res

## Loop 
results = {}

for solver in sorted(list(USABLE_SOLVERS)):

    out = result_on_fail.copy()

    try:
        t0 = timeit.default_timer()

        mv = MeanVariance(constraints = constraints,
                          solver_name = solver
        )
        optimization_data = OptimizationData(return_series=returns)
        mv.set_objective(optimization_data)
        mv.solve()

        t1 = timeit.default_timer()
        out["runtime"] = t1 - t0

        sol = mv.model.results.get("solution", None)

        found = bool(getattr(sol, "found", False)) if sol is not None else False
        out["solution_found"] = found

        # If solver failed, store fail template and jump to next solver
        if not found:
            results[solver] = out
            continue

        x = getattr(sol, "x", None)

        # If no solution vector, treat as fail
        if x is None or np.any(np.isnan(x)):
            results[solver] = out
            continue

        x = np.asarray(x, dtype=float)

        # Accuracy
        out["objective"] = mv.model.objective_value(x)

        # Reliability (feasibility)
        out["primal_residual"] = primal_residual(G, h, A, b, lb, ub, x)

        # Dual diagnostics (if provided by backend)
        out["dual_residual"] = get_maybe_callable(sol, "dual_residual", np.nan)
        out["duality_gap"]   = get_maybe_callable(sol, "duality_gap", np.nan)

        results[solver] = out

    except Exception as e:
        print(f"[{solver}] crashed with: {type(e).__name__}: {e}")
        results[solver] = out

In [62]:


for solver in sorted(list(USABLE_SOLVERS)):

    out = result_on_fail.copy()

    try:
        t0 = timeit.default_timer()

        mv = MeanVariance(constraints = constraints,
                          solver_name = solver
        )
        optimization_data = OptimizationData(return_series=returns)
        mv.set_objective(optimization_data)
        mv.solve()

        t1 = timeit.default_timer()
        out["runtime"] = t1 - t0

        sol = mv.results
        sol_model = mv.model.results.get("solution", None)


        found = bool(sol.get( "status", False)) if sol is not None else False
        out["solution_found"] = found

        # If solver failed, store fail template and jump to next solver
        if not found:
            results[solver] = out
            continue

        w = sol.get( "weights", None)
        x = pd.Series(w).reindex(constraints.ids).to_numpy(dtype=float)

        # If no solution vector, treat as fail
        if x is None or np.any(np.isnan(x)):
            results[solver] = out
            continue


        # Accuracy
        if not solver == ["analytical"]:
            out["objective"] = mv.model.objective_value(x) 
        else:
            out["objective"] = mv.objective_value(x)
            
        # Reliability (feasibility)
        out["primal_residual"] = primal_residual(G, h, A, b, lb, ub, x)

        # Dual diagnostics (if provided by backend)
        if not solver == "analytical":
            out["dual_residual"] = get_maybe_callable(sol_model, "dual_residual", np.nan)
            out["duality_gap"]   = get_maybe_callable(sol_model, "duality_gap", np.nan)
        
        else:
            out["dual_residual"] = 0
            out["duality_gap"] = 0

        results[solver] = out

    except Exception as e:
        print(f"[{solver}] crashed with: {type(e).__name__}: {e}")
        results[solver] = out

Print and visualize the results

In [63]:
df_results = pd.DataFrame(results).reindex(
    index=[
        "solution_found",
        "objective",
        "primal_residual",
        "dual_residual",
        "duality_gap",
        "runtime"
    ]
)

df_results

Unnamed: 0,cvxopt,daqp,osqp,qpalm,quadprog
solution_found,True,True,True,True,True
objective,0.001577,0.001577,0.001566,0.001577,0.001577
primal_residual,0.0,0.0,0.001032,0.000001,0.0
dual_residual,0.0,0.0,0.000009,0.0,0.0
duality_gap,0.0,0.0,0.000012,0.0,0.0
runtime,0.00198,0.000943,0.001693,0.001794,0.001156


## 2. Analytical Solution to Minimum-Variance Problem

(5 points)

- Create a `MinVariance` class that follows the structure of the `MeanVariance` class.
- Implement the `solve` method in `MinVariance` such that if `solver_name = 'analytical'`, the analytical solution is computed and stored within the object (if such a solution exists). If not, call the `solve` method from the parent class.
- Create a `Constraints` object by injecting the same ids as in part 1.b) and add a budget constraint.
- Instantiate a `MinVariance` object by setting `solver_name = 'analytical'` and passing instances of `Constraints` and `Covariance` as arguments.
- Create an `OptimizationData` object that contains an element `return_series`, which consists of the synthetic data generated in part 1.a).
- Solve the optimization problem using the created `MinVariance` object and compare the results to those obtained in part 1.c).


In [64]:
# Define class MinVariance
class MinVariance(Optimization):

    def __init__(self,
                 constraints: Constraints,
                 covariance: Optional[Covariance] = None,
                 **kwargs):
        super().__init__(
            constraints=constraints,
            **kwargs
        )
        self.covariance = Covariance() if covariance is None else covariance

    def set_objective(self, optimization_data: OptimizationData) -> None:
        X = optimization_data['return_series']
        covmat = self.covariance.estimate(X=X, inplace=False)
        mu = np.zeros(X.shape[1])
        self.objective = Objective(
            q = mu ,
            P = covmat * 2,
        )
        return None

    def solve(self) -> None:
        if self.params.get('solver_name') == 'analytical':
            self.solve_analytical()

            return None
        else:
            return super().solve()
        
    def solve_analytical(self) -> None:
        budget = self.constraints.budget
        Amat = np.asarray(budget["Amat"])
        rhs  = budget["rhs"]

        GhAb = self.constraints.to_GhAb()
        G, h = GhAb["G"], GhAb["h"]
        A, b = GhAb["A"], GhAb["b"]

        if b is not None:
            b = np.array(np.atleast_1d(b), dtype=float, copy=True)

        if h is not None:
            h = np.array(np.atleast_1d(h), dtype=float, copy=True)

        lb = self.constraints.box['lower'].to_numpy() if self.constraints.box['box_type'] != 'NA' else None
        ub = self.constraints.box['upper'].to_numpy() if self.constraints.box['box_type'] != 'NA' else None
        
        n_eq = 0 if A is None else A.shape[0]
        has_other_eq = n_eq > 1
        
        has_ineq = (G is not None) or (h is not None)
        has_box  = (lb is not None) or (ub is not None)
        has_bud  = (Amat is not None) and (rhs is not None)
        
        if has_ineq or has_box or has_other_eq:
            warnings.warn(
            """Analytical solution only available with budget constraints.
               Box constraints, other equality constraints and inequality constraints will be ignored.""",
                UserWarning)
        
        if not has_bud:
             raise ValueError("Budget constraints needed for analytical solution")
        if not rhs == 1:
             raise ValueError("Budget constraints must sum to 1")
        if budget.get("sense") != "=":
            raise ValueError("Analytical solution requires an equality budget constraint.")
        
        P = self.objective.coefficients["P"]
        Sigma = np.asarray(P / 2)
        z = np.linalg.solve(Sigma, Amat)
        denom = Amat @ z
        xstar = z / denom

        if not np.isfinite(denom) or abs(denom) < 1e-12:
            status = False

        ids = self.constraints.ids
        weights = pd.Series(xstar[:len(ids)], index=ids)

        status = True if np.all(np.isfinite(xstar)) else False
        

        self.results.update({
             'weights' : weights.to_dict(),
             'status': status,        })
        
        return None

    def objective_value(self) -> float:
        results = self.results
        constraints = self.constraints
        if results == None:
            raise ValueError("results is empty")

        P = self.objective.coefficients["P"] 
        q = self.objective.coefficients["q"]
        w = results["weights"]
        x = pd.Series(w).reindex(constraints.ids).to_numpy(dtype=float)
        objective = 0.5 * x.T @ P @ x
        return objective
       
        
        









# Create a constraints object with just a budget constraint
minvar_cons = Constraints(ids=returns.columns.tolist())
minvar_cons.add_budget(rhs=np.array([1.0]), sense="=")
# Instantiate the MinVariance class

# Prepare the optimization data and prepare the optimization problem

# Solve the optimization problem and print the weights

minvar_results = {}
all_methods = list(USABLE_SOLVERS) + ["analytical"]

GhAb = minvar_cons.to_GhAb()
G, h = GhAb["G"], GhAb["h"]
A, b = GhAb["A"], GhAb["b"]

# Bounds
lb = minvar_cons.box["lower"].to_numpy(dtype=float) if minvar_cons.box["box_type"] != "NA" else None
ub = minvar_cons.box["upper"].to_numpy(dtype=float) if minvar_cons.box["box_type"] != "NA" else None


for solver in sorted(all_methods):

    out = result_on_fail.copy()

    try:
        t0 = timeit.default_timer()

        minvar = MinVariance(constraints = minvar_cons,
                          solver_name = solver
        )
        optimization_data = OptimizationData(return_series=returns)
        minvar.set_objective(optimization_data)
        minvar.solve()

        t1 = timeit.default_timer()
        out["runtime"] = t1 - t0

        sol = minvar.results
        sol_model = None
        if not solver == "analytical":
            sol_model = minvar.model.results.get("solution", None)


        found = bool(sol.get( "status", False)) if sol is not None else False
        out["solution_found"] = found

        # If solver failed, store fail template and jump to next solver
        if not found:
            results[solver] = out
            continue

        w = sol.get( "weights", None)
        x = pd.Series(w).reindex(minvar_cons.ids).to_numpy(dtype=float)

        # If no solution vector, treat as fail
        if x is None or np.any(np.isnan(x)):
            results[solver] = out
            continue


        # Accuracy
        if not solver == "analytical":
            out["objective"] = minvar.model.objective_value(x) 
        else:
            out["objective"] = minvar.objective_value()
            
        # Reliability (feasibility)
        out["primal_residual"] = primal_residual(G, h, A, b, lb, ub, x)

        # Dual diagnostics (if provided by backend)
        if not solver == "analytical":
            out["dual_residual"] = get_maybe_callable(sol_model, "dual_residual", np.nan)
            out["duality_gap"]   = get_maybe_callable(sol_model, "duality_gap", np.nan)
        
        else:
            out["dual_residual"] = 0
            out["duality_gap"] = 0

        minvar_results[solver] = out

    except Exception as e:
        print(f"[{solver}] crashed with: {type(e).__name__}: {e}")
        minvar_results[solver] = out


In [65]:
df_results = pd.DataFrame(minvar_results).reindex(
    index=[
        "solution_found",
        "objective",
        "primal_residual",
        "dual_residual",
        "duality_gap",
        "runtime"
    ]
)

df_results

Unnamed: 0,analytical,cvxopt,daqp,osqp,qpalm,quadprog
solution_found,True,True,True,True,True,True
objective,0.000089,0.000089,0.000089,0.000089,0.000089,0.000089
primal_residual,0.0,0.0,0.0,0.000308,0.0,0.0
dual_residual,0,0.0,0.0,0.000474,0.0,0.0
duality_gap,0,0.0,0.0,0.000474,0.000006,0.0
runtime,0.00112,0.000976,0.001164,0.001283,0.000687,0.00043
