In [None]:
# Quantum Portfolio Optimization with Qiskit

# !pip install qiskit qiskit-optimization qiskit-aer
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit_optimization.converters import QuadraticProgramToQubo
from qiskit.algorithms import QAOA
from qiskit_aer.primitives import Estimator
from qiskit.primitives import Sampler
from qiskit.utils import algorithm_globals
from qiskit_optimization.translators import from_docplex_mp
from docplex.mp.model import Model

import numpy as np

#  A set C of Securities 
class Security:
    def __init__(self, name, market_price, min_trade, max_trade, inventory, min_increment):
        self.name = name
        self.market_price = market_price
        self.min_trade = min_trade
        self.max_trade = max_trade
        self.inventory = inventory
        self.min_increment = min_increment

C = [
    Security('AAPL', 180.0, 10, 100, 250, 5),
    Security('GOOG', 2800.0, 1, 20, 50, 1),
    Security('TSLA', 720.0, 5, 60, 150, 5)
]

#  A set L of Risk Buckets 
#  A set J of characteristics

class Characteristic:
    def __init__(self, name, target, upper_guardrail, lower_guardrail, current_value):
        self.name = name
        self.target = target
        self.upper_guardrail = upper_guardrail
        self.lower_guardrail = lower_guardrail
        self.current_value = current_value

    def binary_status(self):
        up = int(self.current_value > self.upper_guardrail)
        down = int(self.current_value < self.lower_guardrail)
        return {'up': up, 'down': down}
    
class RiskBucket:
    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        self.bonds = set()  
        self.characteristics = {}

    def add_bond(self, bond):
        self.bonds.add(bond)

    def remove_bond(self, bond):
        self.bonds.discard(bond)

    def add_characteristic(self, characteristic: Characteristic):
        self.characteristics[characteristic.name] = characteristic

    def evaluate_characteristics(self):
        return {name: char.binary_status() for name, char in self.characteristics.items()}

    def list_bonds(self):
        return list(self.bonds)
    
class Bond:
    def __init__(self, name, rating, duration):
        self.name = name
        self.rating = rating    # e.g. 'AAA', 'BBB'
        self.duration = duration  # in years


# 🧪 Sample usage
bond1 = Bond("Bond A", "AAA", 5)
bond2 = Bond("Bond B", "BBB", 10)
bond3 = Bond("Bond C", "AAA", 3)

low_risk = RiskBucket("Low Risk")
long_term = RiskBucket("Long Duration")

low_risk.add_bond(bond1)
low_risk.add_bond(bond3)
long_term.add_bond(bond1)
long_term.add_bond(bond2)

L = [
    low_risk,
    long_term
]

#  Global variables
N= 10 # Max number of bonds in portfolio
MinResidualCashFlow = 0
MaxResidualCashFlow = 0

# Constraints
def max_bonds_constraint(portfolio, N):
    return sum([1 for b in portfolio if b.xc > 0]) <= N

def residual_cashflow_constraint(pc, mvb, Rc, xc):
    return Rc.min() / mvb <= xc <= Rc.max() / mvb

# Objective funtion

def compute_objective(securities, buckets, contribution_map):
    """
    securities: list of Security objects with .xc (0/1 or weight)
    buckets: list of RiskBucket objects
    contribution_map: dict structured as {bucket_name: {char_name: [contrib_per_security]}}
    """
    error = 0
    for bucket in buckets:
        b_name = bucket.bucket_name
        for char_name, char_obj in bucket.characteristics.items():
            contribs = contribution_map[b_name][char_name]  # List of floats per security
            x_vector = np.array([getattr(sec, 'xc', 0) for sec in securities])
            value = np.dot(contribs, x_vector)
            deviation = (value - char_obj.target) ** 2
            error += deviation
    return error


# Operators

def pauli_z_operator(index, num_qubits):
    op = 1
    for i in range(num_qubits):
        if i == index:
            op = op ^ Z
        else:
            op = op ^ I
    return op

def get_count_constraint_hamiltonian(N, num_qubits):
    H = 0
    for i in range(num_qubits):
        H += (I - Z) / 2  # y_c = (1 - Z_c) / 2
    H = (H - N) ** 2
    return H


def get_residual_cashflow_hamiltonian(securities, target, MV=100):
    num_qubits = len(securities)
    H = 0
    for i, sec in enumerate(securities):
        z = pauli_z_operator(i, num_qubits)
        coeff = sec.market_price / MV
        H += coeff * z
    H = (H - target) ** 2
    return 0.25 * H

def get_objective_hamiltonian(securities, f_matrix, P_matrix, MoIc, k_matrix, target, c):
    num_qubits = len(securities)
    H = 0
    for i in range(num_qubits):
        Z_i = pauli_z_operator(i, num_qubits)
        y_i = (I - Z_i) / 2  # y_c
        for j in range(num_qubits):
            coeff = f_matrix[i][j] * P_matrix[i][j] / (2 * c)
            H += coeff * (MoIc * y_i + k_matrix[i][j] * target)
    return H


def get_total_hamiltonian(H_obj, H_count, H_rc, H_risk, lambda1=1.0, lambda2=1.0):
    return H_obj + lambda1 * H_count + lambda2 * H_rc + H_risk


def build_quadratic_program(securities, target_cashflow, max_count, contribution_map, buckets, penalty_weights):
    """
    Creates a QuadraticProgram from given constraints and objective logic.
    """
    qp = Model(name='Quantum Portfolio Optimization')
    y = {}  # Binary decision variable y_c
    for i, sec in enumerate(securities):
        y[i] = qp.binary_var(name=f'y_{i}')

    # Risk Deviation Penalty
    total_error = 0
    for bucket in buckets:
        b_name = bucket.bucket_name
        for char_name, char_obj in bucket.characteristics.items():
            contribs = contribution_map[b_name][char_name]
            expr = qp.sum(contribs[i] * y[i] for i in range(len(securities)))
            deviation = (expr - char_obj.target) ** 2
            total_error += deviation
    qp.minimize(total_error)

    # Cardinality constraint: sum(y_c) ≤ max_count → Penalty
    count_expr = qp.sum(y[i] for i in range(len(securities)))
    qp.add_constraint(count_expr <= max_count, ctname="cardinality")

    # Residual Cash Flow Constraint (as penalty)
    cashflow_expr = qp.sum(securities[i].market_price * y[i] for i in range(len(securities)))
    residual_deviation = (cashflow_expr - target_cashflow) ** 2
    qp.minimize(qp.objective_expr + penalty_weights['residual'] * residual_deviation)

    return from_docplex_mp(qp)

def run_qaoa_optimization(qp, reps=1, seed=42):
    algorithm_globals.random_seed = seed
    qaoa = QAOA(reps=reps, sampler=Sampler())
    optimizer = MinimumEigenOptimizer(qaoa)
    result = optimizer.solve(qp)
    return result

# Example securities with dummy prices
C = [
    Security('AAPL', 180.0, 10, 100, 250, 5),
    Security('GOOG', 2800.0, 1, 20, 50, 1),
    Security('TSLA', 720.0, 5, 60, 150, 5)
]

# Dummy contribution map (1 char per bucket)
contribution_map = {
    "Low Risk": {
        "duration": [5, 10, 3]
    },
    "Long Duration": {
        "duration": [5, 10, 3]
    }
}

# Assign characteristics to buckets
char = Characteristic(name="duration", target=7, upper_guardrail=9, lower_guardrail=5, current_value=6)
low_risk = RiskBucket("Low Risk")
long_term = RiskBucket("Long Duration")
for b in [low_risk, long_term]:
    b.add_characteristic(char)
    for i in range(3): b.add_bond(f"Bond{i}")

buckets = [low_risk, long_term]

qp = build_quadratic_program(
    securities=C,
    target_cashflow=3000,
    max_count=2,
    contribution_map=contribution_map,
    buckets=buckets,
    penalty_weights={'residual': 0.01}
)

result = run_qaoa_optimization(qp)
print("Optimal selection:", result.x)
print("Objective value:", result.fval)


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

def visualize_portfolio_result(securities, result, contribution_map, buckets, target_cashflow):
    y_vals = result.x
    selected = [sec.name for i, sec in enumerate(securities) if y_vals[i] == 1]
    prices = [sec.market_price for sec in securities]
    
    df = pd.DataFrame({
        'Security': [sec.name for sec in securities],
        'Selected': y_vals,
        'Price': prices
    })

    # ✅ Plot 1: Portfolio Selection
    plt.figure(figsize=(10, 4))
    plt.bar(df['Security'], df['Selected'], color='green')
    plt.title("Selected Securities")
    plt.ylabel("Selected (1=True, 0=False)")
    plt.show()

    # ✅ Plot 2: Cash Flow
    total_cashflow = sum(prices[i] * y_vals[i] for i in range(len(securities)))
    plt.figure(figsize=(6, 4))
    plt.bar(['Target', 'Achieved'], [target_cashflow, total_cashflow], color=['blue', 'orange'])
    plt.title("Residual Cash Flow Comparison")
    plt.ylabel("USD")
    plt.show()

    # ✅ Plot 3: Risk Deviation per Characteristic
    for bucket in buckets:
        for char_name, char_obj in bucket.characteristics.items():
            contribs = contribution_map[bucket.bucket_name][char_name]
            actual = sum([contribs[i] * y_vals[i] for i in range(len(securities))])
            target = char_obj.target

            plt.figure(figsize=(5, 4))
            plt.bar(['Target', 'Actual'], [target, actual], color=['blue', 'red'])
            plt.title(f"{bucket.bucket_name} - {char_name} deviation")
            plt.ylabel("Value")
            plt.show()


visualize_portfolio_result(
    securities=C,
    result=result,
    contribution_map=contribution_map,
    buckets=buckets,
    target_cashflow=3000
)