In [8]:
import numpy as np
import pandas as pd
import time
from typing import Tuple, List, Dict, Any
import matplotlib.pyplot as plt
import dimod
from dwave.system import LeapHybridSampler
from dimod import Binary, BinaryQuadraticModel
import neal  # Simulated annealing solver

class PortfolioOptimizer:
    """
    Portfolio optimization class to solve the risk minimization problem:
    min x^T * Sigma * x
    s.t. sum(x_i) = n
         mu^T * x >= R*
    where:
    - x is a binary vector (0/1) indicating which assets are selected
    - Sigma is the covariance matrix
    - n is the number of assets to select (budget constraint)
    - mu is the expected return vector
    - R* is the minimum required return
    """
    
    def __init__(self, returns_data: pd.DataFrame):
        """
        Initialize the portfolio optimizer with historical returns data
        
        Args:
            returns_data: DataFrame with assets as columns and time periods as rows
        """
        self.returns_data = returns_data
        self.n_assets = returns_data.shape[1]
        
        # Calculate return vector (5-year return in the paper)
        self.mu = returns_data.mean().values
        
        # Calculate covariance matrix
        self.sigma = returns_data.cov().values
        
    def build_qubo(self, n: int, R_star: float, lambda_1: float, lambda_2: float) -> BinaryQuadraticModel:
        """
        Build QUBO formulation of the portfolio optimization problem
        
        Args:
            n: Number of assets to select
            R_star: Minimum required return
            lambda_1: Penalty coefficient for budget constraint
            lambda_2: Penalty coefficient for return constraint
            
        Returns:
            QUBO model as a BinaryQuadraticModel
        """
        # Initialize QUBO model
        bqm = dimod.BinaryQuadraticModel('BINARY')
        
        # Add binary variables
        variables = [Binary(f'x_{i}') for i in range(self.n_assets)]
        
        # Objective: minimize risk (x^T * Sigma * x)
        for i in range(self.n_assets):
            for j in range(self.n_assets):
                if i == j:
                    bqm.add_variable(f'x_{i}', self.sigma[i, i])
                else:
                    bqm.add_interaction(f'x_{i}', f'x_{j}', 2 * self.sigma[i, j])
        
        # Constraint 1: budget constraint (sum(x_i) = n)
        # We use (sum(x_i) - n)^2 = sum(x_i)^2 - 2n*sum(x_i) + n^2
        for i in range(self.n_assets):
            bqm.add_variable(f'x_{i}', lambda_1 * (1 - 2*n))
            for j in range(i+1, self.n_assets):
                bqm.add_interaction(f'x_{i}', f'x_{j}', 2 * lambda_1)
        
        # Constant term n^2 can be ignored in optimization
        
        # Constraint 2: return constraint (mu^T * x >= R_star)
        # We use (mu^T * x - R_star)^2 for >= constraint with slack
        if R_star > 0:
            for i in range(self.n_assets):
                # Linear terms
                bqm.add_variable(f'x_{i}', lambda_2 * (self.mu[i]**2 - 2*self.mu[i]*R_star))
                # Quadratic terms
                for j in range(i+1, self.n_assets):
                    bqm.add_interaction(f'x_{i}', f'x_{j}', 2 * lambda_2 * self.mu[i] * self.mu[j])
        
        return bqm
    
    def estimate_parameters(self, n: int) -> Tuple[float, float]:
        """
        Estimate good values for lambda_1 and lambda_2 based on the paper's approach
        
        Args:
            n: Number of assets to select
            
        Returns:
            Tuple of (lambda_1, lambda_2) estimates
        """
        # Estimate lambda_1
        # Find stock i for which the sum of the smallest n values of sigma_ij is the largest
        max_sum = 0
        for i in range(self.n_assets):
            # Get row i of sigma matrix and sort
            row = sorted(self.sigma[i])
            # Sum the smallest n values (excluding sigma_ii which is at index 0)
            sum_n = sum(row[1:n+1])
            max_sum = max(max_sum, sum_n)
        
        lambda_1_estimate = max_sum
        
        # Estimate lambda_2
        # This is a simplification of the approach in the paper
        if n < self.n_assets:
            # Average difference between smallest n sums
            sorted_sums = []
            for i in range(self.n_assets):
                # Calculate sum for each asset with the smallest n-1 other assets
                row = [(j, self.sigma[i][j]) for j in range(self.n_assets) if j != i]
                row.sort(key=lambda x: x[1])
                sorted_sums.append(sum(x[1] for x in row[:n-1]))
            
            sorted_sums.sort()
            A1 = np.mean(np.diff(sorted_sums[:n]))
            
            # Average positive difference in mu between these n stocks
            sorted_mu = sorted(self.mu)
            A2 = np.mean(np.diff(sorted_mu[-n:]))
            
            if A2 > 0:
                lambda_2_estimate = A1 / A2
            else:
                lambda_2_estimate = 0.1  # Default value if A2 is not positive
        else:
            lambda_2_estimate = 0.1  # Default value for edge case
            
        return lambda_1_estimate, lambda_2_estimate
    
    def solve_with_dwave_hybrid(self, n: int, R_star: float, 
                               lambda_1: float = None, lambda_2: float = None,
                               time_limit: float = None) -> Dict[str, Any]:
        """
        Solve the portfolio optimization problem using D-Wave's hybrid solver
        
        Args:
            n: Number of assets to select
            R_star: Minimum required return
            lambda_1: Penalty for budget constraint (estimated if None)
            lambda_2: Penalty for return constraint (estimated if None)
            time_limit: Time limit for the solver in seconds
            
        Returns:
            Dictionary with solution information
        """
        # Estimate parameters if not provided
        if lambda_1 is None or lambda_2 is None:
            est_lambda_1, est_lambda_2 = self.estimate_parameters(n)
            lambda_1 = lambda_1 if lambda_1 is not None else est_lambda_1
            lambda_2 = lambda_2 if lambda_2 is not None else est_lambda_2
        
        # Build QUBO
        bqm = self.build_qubo(n, R_star, lambda_1, lambda_2)
        
        # Create the hybrid sampler
        sampler = LeapHybridSampler()
        
        # Set time limit if provided
        kwargs = {}
        if time_limit is not None:
            kwargs['time_limit'] = time_limit
        
        # Solve the problem
        start_time = time.time()
        response = sampler.sample(bqm, **kwargs)
        end_time = time.time()
        
        # Extract solution
        best_sample = response.first.sample
        energy = response.first.energy
        
        # Convert solution to array
        solution = np.zeros(self.n_assets)
        for i in range(self.n_assets):
            if f'x_{i}' in best_sample and best_sample[f'x_{i}'] == 1:
                solution[i] = 1
        
        # Calculate metrics
        risk = self.calculate_risk(solution)
        total_return = self.calculate_return(solution)
        selected_count = int(sum(solution))
        
        return {
            'solution': solution,
            'risk': risk,
            'return': total_return,
            'selected': selected_count,
            'energy': energy,
            'time': end_time - start_time,
            'lambda_1': lambda_1,
            'lambda_2': lambda_2
        }
    
    def solve_with_simulated_annealing(self, n: int, R_star: float, 
                                     lambda_1: float = None, lambda_2: float = None,
                                     num_reads: int = 1000) -> Dict[str, Any]:
        """
        Solve using simulated annealing
        
        Args:
            n: Number of assets to select
            R_star: Minimum required return
            lambda_1: Penalty for budget constraint (estimated if None)
            lambda_2: Penalty for return constraint (estimated if None)
            num_reads: Number of annealing runs
            
        Returns:
            Dictionary with solution information
        """
        # Estimate parameters if not provided
        if lambda_1 is None or lambda_2 is None:
            est_lambda_1, est_lambda_2 = self.estimate_parameters(n)
            lambda_1 = lambda_1 if lambda_1 is not None else est_lambda_1
            lambda_2 = lambda_2 if lambda_2 is not None else est_lambda_2
        
        # Build QUBO
        bqm = self.build_qubo(n, R_star, lambda_1, lambda_2)
        
        # Create the simulated annealing sampler
        sampler = neal.SimulatedAnnealingSampler()
        
        # Solve the problem
        start_time = time.time()
        response = sampler.sample(bqm, num_reads=num_reads)
        end_time = time.time()
        
        # Extract solution
        best_sample = response.first.sample
        energy = response.first.energy
        
        # Convert solution to array
        solution = np.zeros(self.n_assets)
        for i in range(self.n_assets):
            if f'x_{i}' in best_sample and best_sample[f'x_{i}'] == 1:
                solution[i] = 1
        
        # Calculate metrics
        risk = self.calculate_risk(solution)
        total_return = self.calculate_return(solution)
        selected_count = int(sum(solution))
        
        return {
            'solution': solution,
            'risk': risk,
            'return': total_return,
            'selected': selected_count,
            'energy': energy,
            'time': end_time - start_time,
            'lambda_1': lambda_1,
            'lambda_2': lambda_2
        }
    
    def solve_with_genetic_algorithm(self, n: int, R_star: float, 
                                   population_size: int = 100, 
                                   max_generations: int = 100) -> Dict[str, Any]:
        """
        Solve using a genetic algorithm approach
        
        Args:
            n: Number of assets to select
            R_star: Minimum required return
            population_size: Size of the population
            max_generations: Maximum number of generations
            
        Returns:
            Dictionary with solution information
        """
        from deap import base, creator, tools, algorithms
        import random
        
        # Create fitness and individual types
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMin)
        
        # Initialize toolbox
        toolbox = base.Toolbox()
        
        # Define individual initialization to have exactly n ones
        def init_individual():
            ind = [0] * self.n_assets
            ones_indices = random.sample(range(self.n_assets), n)
            for idx in ones_indices:
                ind[idx] = 1
            return creator.Individual(ind)
        
        toolbox.register("individual", init_individual)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        
        # Define evaluation function
        def evaluate(individual):
            ind_array = np.array(individual)
            
            # Calculate risk
            risk = self.calculate_risk(ind_array)
            
            # Add penalties for constraint violations
            penalty = 0
            
            # Budget constraint penalty
            selected_count = sum(individual)
            if selected_count != n:
                penalty += abs(selected_count - n) * 1000
            
            # Return constraint penalty
            total_return = self.calculate_return(ind_array)
            if total_return < R_star:
                penalty += (R_star - total_return) * 1000
            
            return risk + penalty,
        
        toolbox.register("evaluate", evaluate)
        
        # Define genetic operators
        def custom_crossover(ind1, ind2):
            # Make sure offspring maintain exactly n selected assets
            while True:
                # Regular one-point crossover
                idx = random.randint(1, len(ind1) - 1)
                off1 = ind1[:idx] + ind2[idx:]
                off2 = ind2[:idx] + ind1[idx:]
                
                # Check if they have exactly n ones
                if sum(off1) == n and sum(off2) == n:
                    break
                
            return creator.Individual(off1), creator.Individual(off2)
        
        def custom_mutation(individual):
            # Swap-based mutation to maintain n selected assets
            if random.random() < 0.5:
                return individual,
                
            # Find positions of 0s and 1s
            ones_indices = [i for i, val in enumerate(individual) if val == 1]
            zeros_indices = [i for i, val in enumerate(individual) if val == 0]
            
            # Randomly select one 1 and one 0 to swap
            if ones_indices and zeros_indices:
                idx_one = random.choice(ones_indices)
                idx_zero = random.choice(zeros_indices)
                
                individual[idx_one] = 0
                individual[idx_zero] = 1
            
            return individual,
        
        toolbox.register("mate", custom_crossover)
        toolbox.register("mutate", custom_mutation)
        toolbox.register("select", tools.selTournament, tournsize=3)
        
        # Create population
        start_time = time.time()
        pop = toolbox.population(n=population_size)
        
        # Run the algorithm
        algorithms.eaSimple(pop, toolbox, cxpb=0.7, mutpb=0.2, 
                          ngen=max_generations, verbose=False)
        
        # Get best solution
        best_ind = tools.selBest(pop, 1)[0]
        end_time = time.time()
        
        # Convert to numpy array
        solution = np.array(best_ind)
        
        # Calculate metrics
        risk = self.calculate_risk(solution)
        total_return = self.calculate_return(solution)
        selected_count = int(sum(solution))
        
        return {
            'solution': solution,
            'risk': risk,
            'return': total_return,
            'selected': selected_count,
            'time': end_time - start_time
        }
    
    def calculate_risk(self, x: np.ndarray) -> float:
        """Calculate the risk (variance) of the portfolio"""
        return x @ self.sigma @ x
    
    def calculate_return(self, x: np.ndarray) -> float:
        """Calculate the expected return of the portfolio"""
        return np.dot(self.mu, x)
    
    def is_feasible(self, x: np.ndarray, n: int, R_star: float) -> bool:
        """Check if solution is feasible"""
        return sum(x) == n and self.calculate_return(x) >= R_star


def load_stock_data(file_path: str) -> pd.DataFrame:
    """Load stock returns data from CSV file"""
    df = pd.read_csv(file_path, index_col=0, parse_dates=True)
    return df


def generate_synthetic_data(n_assets: int = 100, n_periods: int = 20, seed: int = 42) -> pd.DataFrame:
    """Generate synthetic stock return data for testing"""
    np.random.seed(seed)
    
    # Generate random returns with some correlation structure
    # Create base factors (market, sectors, etc.)
    n_factors = 5
    factor_returns = np.random.normal(0.01, 0.05, (n_periods, n_factors))
    
    # Generate asset returns based on factors plus idiosyncratic returns
    asset_returns = np.zeros((n_periods, n_assets))
    
    for i in range(n_assets):
        # Random factor loadings
        factor_loadings = np.random.uniform(-1, 1, n_factors)
        factor_loadings = factor_loadings / np.sum(np.abs(factor_loadings))  # Normalize
        
        # Generate returns for this asset
        systematic_returns = factor_returns @ factor_loadings
        idiosyncratic_returns = np.random.normal(0.005, 0.03, n_periods)
        asset_returns[:, i] = systematic_returns + idiosyncratic_returns
    
    # Convert to DataFrame
    columns = [f'Asset_{i}' for i in range(n_assets)]
    index = pd.date_range('2020-01-01', periods=n_periods, freq='Q')
    returns_df = pd.DataFrame(asset_returns, index=index, columns=columns)
    
    return returns_df


def parameter_grid_search(optimizer: PortfolioOptimizer, n: int, R_star: float, 
                         lambda_1_range: List[float], lambda_2_range: List[float]) -> Dict:
    """
    Perform grid search to find optimal lambda parameters
    
    Args:
        optimizer: Portfolio optimizer instance
        n: Number of assets to select
        R_star: Minimum required return
        lambda_1_range: List of lambda_1 values to test
        lambda_2_range: List of lambda_2 values to test
    
    Returns:
        Dictionary with best parameters and results
    """
    best_risk = float('inf')
    best_params = None
    best_result = None
    
    for lambda_1 in lambda_1_range:
        for lambda_2 in lambda_2_range:
            result = optimizer.solve_with_dwave_hybrid(n, R_star, lambda_1, lambda_2)
            
            # Check if solution is feasible and better than current best
            solution_is_feasible = optimizer.is_feasible(result['solution'], n, R_star)
            
            if solution_is_feasible and result['risk'] < best_risk:
                best_risk = result['risk']
                best_params = (lambda_1, lambda_2)
                best_result = result
    
    return {
        'best_lambda_1': best_params[0],
        'best_lambda_2': best_params[1],
        'best_result': best_result
    }


def run_comparative_analysis(data_path: str = None, N: int = 100, n: int = 20, 
                           R_star: float = 2500, use_synthetic: bool = True):
    """
    Run comparative analysis of different optimization methods
    
    Args:
        data_path: Path to CSV file with returns data (if not using synthetic data)
        N: Number of assets to consider
        n: Number of assets to select
        R_star: Minimum required return
        use_synthetic: Whether to use synthetic data
    """
    # Load or generate data
    if use_synthetic:
        returns_data = generate_synthetic_data(N, 20)
    else:
        returns_data = load_stock_data(data_path)
        # If the data has more assets than N, take the first N
        if returns_data.shape[1] > N:
            returns_data = returns_data.iloc[:, :N]
    
    # Initialize optimizer
    optimizer = PortfolioOptimizer(returns_data)
    
    # Estimate lambda parameters
    est_lambda_1, est_lambda_2 = optimizer.estimate_parameters(n)
    print(f"Estimated lambda_1: {est_lambda_1:.2f}, lambda_2: {est_lambda_2:.2f}")
    
    # Define lambda ranges for grid search
    lambda_1_values = [est_lambda_1 * 0.8, est_lambda_1, est_lambda_1 * 1.2]
    lambda_2_values = [0, est_lambda_2 * 0.5, est_lambda_2, est_lambda_2 * 1.5]
    
    # Perform grid search
    print("Running parameter grid search...")
    grid_search_results = parameter_grid_search(
        optimizer, n, R_star, lambda_1_values, lambda_2_values
    )
    
    # Use best parameters from grid search
    best_lambda_1 = grid_search_results['best_lambda_1']
    best_lambda_2 = grid_search_results['best_lambda_2']
    print(f"Best lambda_1: {best_lambda_1:.2f}, lambda_2: {best_lambda_2:.2f}")
    
    # Run all solvers
    print("\nRunning D-Wave Hybrid solver...")
    dwave_results = optimizer.solve_with_dwave_hybrid(n, R_star, best_lambda_1, best_lambda_2)
    
    print("Running Simulated Annealing solver...")
    sa_results = optimizer.solve_with_simulated_annealing(n, R_star, best_lambda_1, best_lambda_2)
    
    print("Running Genetic Algorithm solver...")
    ga_results = optimizer.solve_with_genetic_algorithm(n, R_star)
    
    # Display results
    print("\nResults Summary:")
    print("-" * 60)
    print(f"{'Method':<20} {'Risk':<10} {'Return':<10} {'Selected':<10} {'Time (s)':<10}")
    print("-" * 60)
    print(f"{'D-Wave Hybrid':<20} {dwave_results['risk']:<10.2f} {dwave_results['return']:<10.2f} {dwave_results['selected']:<10} {dwave_results['time']:<10.2f}")
    print(f"{'Simulated Annealing':<20} {sa_results['risk']:<10.2f} {sa_results['return']:<10.2f} {sa_results['selected']:<10} {sa_results['time']:<10.2f}")
    print(f"{'Genetic Algorithm':<20} {ga_results['risk']:<10.2f} {ga_results['return']:<10.2f} {ga_results['selected']:<10} {ga_results['time']:<10.2f}")
    print("-" * 60)
    
    # Check if solutions are feasible
    print("\nFeasibility Check:")
    print(f"D-Wave Hybrid: {optimizer.is_feasible(dwave_results['solution'], n, R_star)}")
    print(f"Simulated Annealing: {optimizer.is_feasible(sa_results['solution'], n, R_star)}")
    print(f"Genetic Algorithm: {optimizer.is_feasible(ga_results['solution'], n, R_star)}")
    
    # Compare selected assets
    dwave_selected = np.where(dwave_results['solution'] == 1)[0]
    sa_selected = np.where(sa_results['solution'] == 1)[0]
    ga_selected = np.where(ga_results['solution'] == 1)[0]
    
    print("\nAsset Selection Overlap:")
    print(f"D-Wave/SA: {len(np.intersect1d(dwave_selected, sa_selected))}/{n}")
    print(f"D-Wave/GA: {len(np.intersect1d(dwave_selected, ga_selected))}/{n}")
    print(f"SA/GA: {len(np.intersect1d(sa_selected, ga_selected))}/{n}")


def run_dwave_benchmark(N_values: List[int], n: int = 50, R_star: float = 3500):
    """
    Run D-Wave benchmark for different problem sizes (similar to Table 4 in the paper)
    
    Args:
        N_values: List of different problem sizes to test
        n: Number of assets to select
        R_star: Minimum required return
    """
    results = []
    
    for N in N_values:
        print(f"\nRunning benchmark for N={N}, n={n}, R*={R_star}")
        
        # Generate synthetic data
        returns_data = generate_synthetic_data(N, 20)
        
        # Initialize optimizer
        optimizer = PortfolioOptimizer(returns_data)
        
        # Estimate parameters
        est_lambda_1, est_lambda_2 = optimizer.estimate_parameters(n)
        print(f"Estimated lambda_1: {est_lambda_1:.2f}, lambda_2: {est_lambda_2:.2f}")
        
        # Run solvers
        print("Running D-Wave Hybrid solver...")
        dwave_results = optimizer.solve_with_dwave_hybrid(n, R_star, est_lambda_1, est_lambda_2)
        
        print("Running Simulated Annealing solver...")
        sa_results = optimizer.solve_with_simulated_annealing(n, R_star, est_lambda_1, est_lambda_2)
        
        print("Running Genetic Algorithm solver...")
        ga_results = optimizer.solve_with_genetic_algorithm(n, R_star)
        
        # Store results
        results.append({
            'N': N,
            'n': n,
            'R_star': R_star,
            'HQPU_risk': dwave_results['risk'],
            'HQPU_time': dwave_results['time'],
            'SA_risk': sa_results['risk'],
            'SA_time': sa_results['time'],
            'GA_risk': ga_results['risk'],
            'GA_time': ga_results['time'],
            'HQPU_feasible': optimizer.is_feasible(dwave_results['solution'], n, R_star),
            'SA_feasible': optimizer.is_feasible(sa_results['solution'], n, R_star),
            'GA_feasible': optimizer.is_feasible(ga_results['solution'], n, R_star)
        })
    
    # Create DataFrame and display results
    results_df = pd.DataFrame(results)
    print("\nBenchmark Results:")
    print(results_df)
    
    # Plot results
    plt.figure(figsize=(12, 8))
    
    # Plot risk
    plt.subplot(2, 1, 1)
    plt.plot(results_df['N'], results_df['HQPU_risk'], 'o-', label='D-Wave Hybrid')
    plt.plot(results_df['N'], results_df['SA_risk'], 's-', label='Simulated Annealing')
    plt.plot(results_df['N'], results_df['GA_risk'], '^-', label='Genetic Algorithm')
    plt.xlabel('Number of assets (N)')
    plt.ylabel('Risk')
    plt.title('Risk vs. Problem Size')
    plt.legend()
    plt.grid(True)
    
    # Plot time
    plt.subplot(2, 1, 2)
    plt.plot(results_df['N'], results_df['HQPU_time'], 'o-', label='D-Wave Hybrid')
    plt.plot(results_df['N'], results_df['SA_time'], 's-', label='Simulated Annealing')
    plt.plot(results_df['N'], results_df['GA_time'], '^-', label='Genetic Algorithm')
    plt.xlabel('Number of assets (N)')
    plt.ylabel('Solution time (seconds)')
    plt.title('Solution Time vs. Problem Size')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    return results_df


def main():
    # Example usage
    print("Portfolio Optimization using D-Wave Quantum Annealer")
    print("===================================================")
    
    # Option 1: Run comparative analysis for a single problem instance
    # run_comparative_analysis(N=100, n=20, R_star=2500)
    
    # Option 2: Run benchmark for different problem sizes
    N_values = [50, 100, 150, 200]
    run_dwave_benchmark(N_values, n=25, R_star=1500)


if __name__ == "__main__":
    main()

Portfolio Optimization using D-Wave Quantum Annealer

Running benchmark for N=50, n=25, R*=1500
Estimated lambda_1: 0.00, lambda_2: 0.36
Running D-Wave Hybrid solver...


  index = pd.date_range('2020-01-01', periods=n_periods, freq='Q')


ValueError: API token not defined