In [None]:
import yfinance as yf
from qiskit_finance.data_providers import RandomDataProvider
import datetime
import numpy as np

In [None]:
#constraints

budget = 2000
risk_tolerance = 0.5 # q 
penalty = 10**4 #lambda 

In [None]:
#calculating the return
def daily_returns(yesterday, today):
    if yesterday == 0:
        return 0
    return (today - yesterday) / yesterday

def expected_return(days, list_of_returns):
    return sum(list_of_returns) / days

def variance_of_asset(list_of_returns, days):
    mean_return = expected_return(days, list_of_returns)
    return sum((r - mean_return) ** 2 for r in list_of_returns) / days

def covariance(asset1_returns, asset2_returns, days):
    mean_asset1 = expected_return(days, asset1_returns)
    mean_asset2 = expected_return(days, asset2_returns)
    return sum((asset1_returns[i] - mean_asset1) * (asset2_returns[i] - mean_asset2) for i in range(days)) / days

In [None]:
import pandas as pd

tickers = ['AAPL', 'IBM', 'NFLX', 'TSLA']
data = yf.download(tickers, start="2023-01-01", end="2024-01-01")

close_prices = data['Close']

#close_prices = close_prices.pct_change().dropna()

def compute_return_and_variance(ticker, close_prices):
    list_of_returns = []
    for i in range(1, len(close_prices[ticker])):
        daily_return = daily_returns(close_prices[ticker].iloc[i-1], close_prices[ticker].iloc[i])
        list_of_returns.append(daily_return)
    average_return = expected_return(len(list_of_returns), list_of_returns)
    variance = variance_of_asset(list_of_returns, len(list_of_returns))
    print(f"Average daily return for {ticker}: {average_return:.4f}")
    print(f"Variance of daily returns for {ticker}: {variance:.4f}")
    return average_return, variance

for ticker in tickers:
    compute_return_and_variance(ticker, close_prices)




another way of calculating returns

In [None]:
returns = close_prices.pct_change().dropna()
mean_returns = returns.mean()
for ticker in tickers:
    print(f"Average daily return for {ticker}: {mean_returns[ticker]:.4f}")
cov_matrix = returns.cov()
print("\nCovariance Matrix:")   
print(cov_matrix)

In [None]:
Sigma = cov_matrix.to_numpy()

QUBO (quadratic unconstrained binary optimization)

In [None]:
import numpy as np
last_day = close_prices.iloc[-1]
asset_prices = [(ticker, last_day[ticker]) for ticker in tickers]
print("\nAsset Prices on Last Day:")
for ticker, price in asset_prices:
    print(f"{ticker}: ${price:.2f}")

#calculate n_i max for each asset:
n_i_max_list = []
for ticker, price in asset_prices:
    n_i_max = np.floor(budget / price)
    n_i_max_list.append((ticker, n_i_max))
    print(f"Maximum number of shares for {ticker} with budget ${budget}: {n_i_max}")


In [None]:
d_i_list = []
for ticker, n_i_max in n_i_max_list:
    if n_i_max > 0:
        d_i = int(np.floor(np.log2(n_i_max)))
    else:
        d_i = 0
    d_i_list.append((ticker, d_i))
    print(f"Value of d_i for {ticker}: {d_i}")


In [None]:
d_values = [d_i for _, d_i in d_i_list]
num_assets = len(d_values)
total_bits = sum(d+ 1 for d in d_values)
print(f"\nTotal number of bits required for all assets: {total_bits}")

constructing the encoding matrix $C$

In [None]:
C = np.zeros((num_assets, total_bits))

col = 0 

for i, d in enumerate(d_values):
    for j in range(d+1):
        C[i, col] = 2 ** j
        col += 1


Calculating 
$$
P'= P/B \\
\mu' = P' \circ \mu \\
\Sigma' = (P' \circ \Sigma)^T \circ P'\\
\mu^{\prime\prime} = C^T \mu' \\
\Sigma'' = C^T \Sigma'C \\
P'' = C^T P'
$$

In [None]:
# List P of current prices of assets 
P = np.array([price for _, price in asset_prices])
P_prime = P / budget
P_double_prime = C.T @ P_prime


mu = np.array([mean_returns[ticker] for ticker in tickers])
mu_prime = P_prime * mu 

mu_prime_series = pd.Series(mu_prime, index=tickers)


print("\nScaled expected return vector µ′:")
print(mu_prime_series)



In [None]:
P_prime_sigma = P_prime * Sigma
Sigma_prime = P_prime_sigma.T * P_prime 

mu_double_prime = C.T @ mu_prime

Sigma_double_prime = C.T @ Sigma_prime @ C


Constructing the Hamiltonian

In [None]:
from qiskit.quantum_info import SparsePauliOp, Pauli

n = len(P_double_prime) #total number of binary variables
from qiskit.circuit.library import RealAmplitudes, PauliTwoDesign
from qiskit_algorithms import VQE
from qiskit_algorithms.optimizers import COBYLA
from qiskit.primitives import StatevectorEstimator, Sampler
from qiskit.quantum_info import Operator
estimator = StatevectorEstimator()

optimizer = COBYLA()

The mathematical proof can be found in QUBOtoIsing.pdf that I have in this folder

In [None]:
#first hamiltonian
pi1 = 0.5 * P_double_prime
beta1 = -0.5 * np.sum(P_double_prime) + 1
h_vec = 0.5 * mu_double_prime - 0.5 * risk_tolerance * np.sum(Sigma_double_prime, axis=1)
J_mat = -0.25 * Sigma_double_prime
const_term1 = 0.5 * np.sum(mu_double_prime) - 0.25 * risk_tolerance * np.sum(Sigma_double_prime)
pauli_dict2 = {}

for i in range(n):
    label = ['I'] * n
    label[n-i-1] = 'Z'
    pauli_str = ''.join(label)
    pauli_dict2[pauli_str] = pauli_dict2.get(pauli_str, 0) + h_vec[i]

for i in range(n):
    for j in range(i , n):
        Jij = float(J_mat[i, j])
        if Jij != 0:
            if i == j:
                identity_str = 'I' * n
                pauli_dict2[identity_str] = pauli_dict2.get(identity_str, 0) + Jij
            else:
                label = ['I'] * n
                label[n-i-1] = 'Z'
                label[n-j-1] = 'Z'
                pauli_str = ''.join(label)
                pauli_dict2[pauli_str] = pauli_dict2.get(pauli_str, 0) + 2 * Jij


for i in range(n):
    label = ['I'] * n
    label[n-i-1] = 'Z'
    pauli_str = ''.join(label)
    coeff = -2 * penalty * beta1 * pi1[i]
    pauli_dict2[pauli_str] = pauli_dict2.get(pauli_str, 0) + coeff

for i in range(n):
    for j in range(i, n):
        factor = +penalty * pi1[i] * pi1[j]
        if i == j:
            identity_str = 'I' * n
            pauli_dict2[identity_str] = pauli_dict2.get(identity_str, 0) + factor
        else:
            label = ['I'] * n
            label[n-i-1] = 'Z'
            label[n-j-1] = 'Z'
            pauli_str = ''.join(label)
            pauli_dict2[pauli_str] = pauli_dict2.get(pauli_str, 0) + 2*factor

pauli_dict2['I' * n] = pauli_dict2.get('I' * n, 0) + penalty * beta1**2 + const_term1

pauli_terms2 = list(pauli_dict2.items())

print("\nPauli terms for the Hamiltonian:")
for term, coeff in pauli_terms2:
    print(f"{term}: {coeff}")


In [None]:
hamiltonian1 = SparsePauliOp.from_list(pauli_terms2)
Operator(hamiltonian1).draw('latex')

In [None]:
ansatz = RealAmplitudes(hamiltonian1.num_qubits, reps=1, entanglement='linear')
ansatz2 = PauliTwoDesign(hamiltonian1.num_qubits, reps=1)
ansatz2.decompose().draw('mpl')

In [None]:
cost_history_dict = {
    "prev_vector": None,
    "iters": 0,
    "cost_history": [],
    "parameters": [],
}

def cost_func_vqe(parameters, ansatz, hamiltonian, estimator):
    """Return estimate of energy from estimator

    Parameters:
        params (ndarray): Array of ansatz parameters
        ansatz (QuantumCircuit): Parameterized ansatz circuit
        hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
        estimator (Estimator): Estimator primitive instance

    Returns:
        float: Energy estimate
    """

    estimator_job = estimator.run([(ansatz, hamiltonian, [parameters])])
    estimator_result = estimator_job.result()[0]

    cost = estimator_result.data.evs[0]
    cost_history_dict["iters"] += 1
    cost_history_dict["prev_vector"] = parameters
    cost_history_dict["cost_history"].append(cost)
    cost_history_dict["parameters"].append(parameters)
    return cost

In [None]:
from scipy.optimize import minimize
import numpy as np
from qiskit_aer import Aer
initial_params = np.random.uniform(low= -np.pi, high=np.pi, size=ansatz2.num_parameters)
result = minimize(cost_func_vqe, initial_params, args=(ansatz2, hamiltonian1, estimator), method="COBYLA",options={'maxiter': 50000, 'disp': True})
print(result)



$$
n = Cb
$$
Where $C$ is the conversion matrix, $b$ is the binary sequence generated from the opimized circuit and $n$ is the shares vector 

In [None]:
from qiskit.quantum_info import Statevector
from qiskit import transpile
# Assign optimal parameters to ansatz
final_circuit = ansatz2.assign_parameters(result.x)

final_circuit.measure_all()
simluator = Aer.get_backend('qasm_simulator')
transpiled_circuit = transpile(final_circuit, simluator)
job_state = simluator.run(transpiled_circuit, shots=1000)

result = job_state.result()

counts = result.get_counts(final_circuit)

bitstring = max(counts, key=counts.get)
#bitstring = bitstring[::-1]  # reverse Qiskit 

print("Most likely bitstring:", bitstring)

# Convert to vector
b = np.array([int(bit) for bit in bitstring])
n = C @ b
print("\nOptimal number of shares to buy for each asset:")
for i, ticker in enumerate(tickers):
    print(f"{ticker}: {int(n[i])} shares")

In [None]:
from qiskit.visualization import plot_histogram
plot_histogram(counts)

In [None]:

total_cost_scaled = P_double_prime.T @ b
print(f"\nTotal cost of the portfolio (scaled, normalized): {total_cost_scaled:.4f}") 
objective_function_penalty = -penalty*(P_double_prime.T @ b - 1)**2
print(f"Objective function value with penalty: {objective_function_penalty:.4f}")
if total_cost_scaled > 1:
    print("Total cost exceeds budget after scaling.")
else:
    print("Total cost is within budget after scaling.")
