In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from pyqubo import Array
import neal
import matplotlib.pyplot as plt
import requests


In [5]:
def download_data(assets, start, end):
    data = yf.download(assets, start=start, end=end)
    data = data.loc[:,('Adj Close', slice(None))]
    data.columns = assets 
    return data

def calculate_expected_returns(data, fillna_method=None, threshold=0.01):
    """
    Calculate the expected returns of the assets, with handling for extreme values and NaN columns.

    Args:
    data (DataFrame): The input data containing asset prices.
    fillna_method (str, optional): Method to handle NaN values ('fillna' to fill, 'median' for median fill, 
                                   or 'dropna' to drop). Defaults to None.
    threshold (float, optional): Threshold for identifying extreme returns, defaults to 1% (0.01).

    Returns:
    List: A list of expected returns.
    """
    returns = data.pct_change()

    # Fill NaN values based on the specified method
    if fillna_method == 'fillna':
        returns = returns.fillna(method='ffill').fillna(method='bfill')
    elif fillna_method == 'median':
        returns = returns.fillna(returns.median())
    elif fillna_method == 'dropna':
        returns = returns.dropna()

    # Remove columns that are still all NaN (if any)
    returns = returns.dropna(axis=1, how='all')

    # Cap and floor extreme returns at the threshold
    capped_returns = np.clip(returns, -threshold, threshold)

    # Calculate mean, handling any remaining NaN values
    return capped_returns.mean().fillna(0).tolist()

def calculate_covariance_matrix(data, fillna_method=None, threshold=0.01):
    """
    Calculate the covariance matrix of the assets' returns, with handling for extreme values and NaN columns.

    Args:
    data (DataFrame): The input data containing asset prices.
    fillna_method (str, optional): Method to handle NaN values ('fillna' to fill, 'median' for median fill, 
                                   or 'dropna' to drop). Defaults to None.
    threshold (float, optional): Threshold for identifying extreme returns, defaults to 1% (0.01).

    Returns:
    List[List]: A 2D list representing the covariance matrix.
    """
    returns = data.pct_change()

    # Fill NaN values based on the specified method
    if fillna_method == 'fillna':
        returns = returns.fillna(method='ffill').fillna(method='bfill')
    elif fillna_method == 'median':
        returns = returns.fillna(returns.median())
    elif fillna_method == 'dropna':
        returns = returns.dropna()

    # Remove columns that are still all NaN (if any)
    returns = returns.dropna(axis=1, how='all')

    # Cap and floor extreme returns at the threshold
    capped_returns = np.clip(returns, -threshold, threshold)

    # Calculate and return the covariance matrix
    return capped_returns.cov().values.tolist()

def get_latest_prices(data):
    return data.iloc[-1].tolist()  # Get the last row of the dataframe, which has the latest prices


In [3]:
# Project: Portfolio Optimization_Budget constraint
class PortfolioQUBO_v1(object):
    def __init__(self, assets, E_R, Cov, A, B, theta1, theta2, theta3):
        self.theta1 = theta1
        self.theta2 = theta2
        self.theta3 = theta3
        self.assets = len(assets)
        self.E_R = E_R  
        self.Cov = Cov
        self.A = A
        self.B = B
        self.array = Array.create('asset', shape=(self.assets), vartype='BINARY')

    def Return(self):
        # Objective: Maximize returns
        H = sum(-self.E_R[i] * self.array[i] for i in range(self.assets))
        return self.theta1 * H
        
    def Risk(self):
        # Objective: Minimize risk (as measured by portfolio variance)
        H = sum(self.Cov[i][j] * self.array[i] * self.array[j] for i in range(self.assets) for j in range(self.assets))
        return self.theta2 * H
    def Budget(self):
        # Constraint: Total budget spent should be close to B
        H = sum(self.A[i] * self.array[i] for i in range(self.assets))
        return self.theta3 * (H - self.B)**2

In [6]:
# Importing necessary libraries
import pandas as pd

# Reading data from the new CSV file
file_path = '../data_p/quantum_data.address.csv'
data = pd.read_csv(file_path)

# Assuming the CSV contains columns for assets and their historical prices
# We will process this data similarly to how the original data was processed

# Calculate expected returns, covariance matrix, and latest prices
E_R = calculate_expected_returns(data)
print("E_R:", E_R)
Cov = calculate_covariance_matrix(data)
print("Cov:", Cov)
A = get_latest_prices(data)
print("A:", A)

# Define other parameters
B = 1000
theta1, theta2, theta3 = 0.5, 0.3, 0.2

# Create QUBO object and compile
portfolio_qubo = PortfolioQUBO_v1(data.columns.tolist(), E_R, Cov, A, B, theta1, theta2, theta3)
objective = portfolio_qubo.Return() + portfolio_qubo.Risk() + portfolio_qubo.Budget()
model = objective.compile()
qubo, offset = model.to_qubo()

# Solve QUBO using Simulated Annealing Sampler
sampler = neal.SimulatedAnnealingSampler()
response = sampler.sample_qubo(qubo)

# Print results
for sample, energy in response.data(['sample', 'energy']):
    print(sample, energy)


E_R: [-7.246376811594201e-05, 1.2570459970846428e-20, -0.00019406351779261018, -0.00033899105402449415, -0.00014492753623188408, -0.00028985507246376816, -0.00033899105402449415, 0.0007317073170731708, -0.00028985507246376816, 0.0006976744186046512, -7.246376811594204e-05, 0.0, -7.246376811594201e-05, -7.246376811594201e-05, -7.246376811594201e-05, 0.0, -7.246376811594201e-05, -7.246376811594199e-05, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.0005882352941176471, 0.0, -0.0005882352941176471, -0.0007317073170731707, -0.0002439024390243903, -0.005333333333333333, 0.0007317073170731706, -0.0007317073170731707, -0.0002439024390243903, -0.005333333333333333, 0.0007317073170731706, -0.0007317073170731707, -0.0002439024390243903, -0.005333333333333333, 0.0007317073170731706, -0.0007317073170731707, -0.0002439024390243903, -0.005333333333333333, 0.0007317073170731706, -8.896017825522087e-20, 0.0, 0.0, 0.0, 0.0002564102564102564]
Cov: [[3.284142600232731e-05, 1.0218978102189785e-05, 2.175615801620977e-06

IndexError: list index out of range

In [8]:
# Project: Portfolio Optimization_Budget constraint
class PortfolioQUBO_v1(object):
    def __init__(self, assets, E_R, Cov, A, B, theta1, theta2, theta3):
        self.theta1 = theta1
        self.theta2 = theta2
        self.theta3 = theta3
        self.assets = len(assets)
        self.E_R = E_R  
        self.Cov = Cov
        self.A = A
        self.B = B
        self.array = Array.create('asset', shape=(self.assets), vartype='BINARY')

    def Return(self):
        # Objective: Maximize returns
        H = sum(-self.E_R[i] * self.array[i] for i in range(self.assets))
        return self.theta1 * H
        
    def Risk(self):
        # Objective: Minimize risk (as measured by portfolio variance)
        H = sum(self.Cov[i][j] * self.array[i] * self.array[j] for i in range(self.assets) for j in range(self.assets))
        return self.theta2 * H
    def Budget(self):
        # Constraint: Total budget spent should be close to B
        H = sum(self.A[i] * self.array[i] for i in range(self.assets))
        return self.theta3 * (H - self.B)**2

In [None]:
# PortfolioQUBO_v2_nAssets and Return constraints are added to the PortfolioQUBO_v2
class PortfolioQUBO_v2(object):
    def __init__(self, assets, Sigma, n, Mu, R_dot, lambda1, lambda2, lambda3):        
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.lambda3 = lambda3
        self.assets = assets
        self.Sigma = Sigma  
        self.n = n
        self.Mu = Mu
        self.R_dot = R_dot
        self.array = Array.create('asset', shape=(assets), vartype='BINARY')

    def Risk(self):
        """Minimize the portfolio variance."""
        H = sum(self.Sigma[i][j] * self.array[i] * self.array[j] for i in range(self.assets) for j in range(self.assets))      
        return self.lambda1 * H
  
    def nAssets(self):
        """Ensure the portfolio has exactly n assets."""
        H = sum(self.array[i] for i in range(self.assets))
        return self.lambda2 * (H - self.n)**2
    
    def Return(self):
        """Ensure the portfolio has an expected return close to R_dot."""
        H = sum(self.Mu[i] * self.array[i] for i in range(self.assets))
        return self.lambda3 * (H - self.R_dot)**2


In [None]:
assets = ['JCI', 'TGT', 'CMCSA', 'CPB', 'MO', 'APA', 'MMC', 'JPM',
              'ZION', 'PSA', 'BAX', 'BMY', 'LUV', 'PCAR', 'TXT', 'TMO',
              'DE', 'MSFT', 'HPQ', 'SEE', 'VZ', 'CNP', 'NI', 'T', 'BA',
              'AAPL', 'AMZN', 'GOOG', 'BRK-B', 'JNJ', 'TSLA', 'QCOM',
              'NVDA', 'MA','KO', 'IBM', 'PFE', 'XOM', 'DD', 'C']
data = download_data(assets, start='2015-01-01', end='2019-12-30')
    
    # Calculate expected returns, covariance matrix, and latest prices
E_R = calculate_expected_returns(data)
Cov = calculate_covariance_matrix(data)
A = get_latest_prices(data)

    # Define other parameters
B = 1000
theta1, theta2, theta3 = 0.5, 0.3, 0.2
    
    # Create QUBO object and compile
portfolio_qubo = PortfolioQUBO_v1(assets, E_R, Cov, A, B, theta1, theta2, theta3)
objective = portfolio_qubo.Return() + portfolio_qubo.Risk() + portfolio_qubo.Budget()
model = objective.compile()
qubo, offset = model.to_qubo()

    # Solve QUBO using Simulated Annealing Sampler
sampler = neal.SimulatedAnnealingSampler()
response = sampler.sample_qubo(qubo)
    
    # Print results
for sample, energy in response.data(['sample', 'energy']):
    print(sample, energy)

In [None]:
assets = ['JCI', 'TGT', 'CMCSA', 'CPB', 'MO', 'APA', 'MMC', 'JPM',
          'ZION', 'PSA', 'BAX', 'BMY', 'LUV', 'PCAR', 'TXT', 'TMO',
          'DE', 'MSFT', 'HPQ', 'SEE', 'VZ', 'CNP', 'NI', 'T', 'BA',
          'AAPL', 'AMZN', 'GOOG', 'BRK-B', 'JNJ', 'TSLA', 'QCOM',
          'NVDA', 'MA','KO', 'IBM', 'PFE', 'XOM', 'DD', 'C']

data = download_data(assets, start='2015-01-01', end='2019-12-30')

# Calculate expected returns, covariance matrix, and latest prices
E_R = calculate_expected_returns(data)
Cov = calculate_covariance_matrix(data)
A = get_latest_prices(data)

# Define other parameters
n = 10  # For instance, if you want to select 10 assets
R_dot = 0.02  # Some desired return, modify as needed
theta1, theta2, theta3 = 0.5, 0.3, 0.2

# Create QUBO object and compile using PortfolioQUBO_v2
portfolio_qubo = PortfolioQUBO_v2(len(assets), Cov, n, E_R, R_dot, theta1, theta2, theta3)
objective = portfolio_qubo.Return() + portfolio_qubo.Risk() + portfolio_qubo.nAssets()
model = objective.compile()
qubo, offset = model.to_qubo()

# Solve QUBO using Simulated Annealing Sampler
sampler = neal.SimulatedAnnealingSampler()
response = sampler.sample_qubo(qubo)

# Print results
for sample, energy in response.data(['sample', 'energy']):
    print(sample, energy)

In [None]:
# multifactor QUBO
class fm3_QUBO(object):
    def __init__(self, assets, SR, Cov, A, nbits, TF, theta1, theta2, theta3):
        self.theta1 = theta1
        self.theta2 = theta2
        self.theta3 = theta3
        self.assets = len(assets)
        self.SR = SR  
        self.Cov = Cov
        self.A = A
        self.nbits = nbits
        self.TF = TF
        self.array = Array.create('asset', shape=(self.assets, nbits), vartype='BINARY')
        
    def Return(self):
        H = sum(-self.SR[i] * sum(self.array[i][s] * 2**s for s in range(self.nbits)) for i in range(self.assets))
        return self.theta1 * H
        
    def Risk(self):
        H = 0
        for i in range(self.assets):
            for j in range(i, self.assets):
                for si in range(self.nbits):
                    for sj in range(si + 1, self.nbits):
                        H += self.Cov[i][j] * self.array[i][si] * self.array[j][sj]
        return self.theta2 * H
  
    def Budget(self):
        H = sum(sum(self.array[i][s] * 2**s for s in range(self.nbits)) for i in range(self.assets))
        return self.theta3 * (H - self.TF)**2

    


In [None]:
assets = ['JCI', 'TGT', 'CMCSA', 'CPB', 'MO', 'APA', 'MMC', 'JPM',
              'ZION', 'PSA', 'BAX', 'BMY', 'LUV', 'PCAR', 'TXT', 'TMO',
              'DE', 'MSFT', 'HPQ', 'SEE', 'VZ', 'CNP', 'NI', 'T', 'BA',
              'AAPL', 'AMZN', 'GOOG', 'BRK-B', 'JNJ', 'TSLA', 'QCOM',
              'NVDA', 'MA','KO', 'IBM', 'PFE', 'XOM', 'DD', 'C']
data = download_data(assets, start='2015-01-01', end='2019-12-30')
    
    # Calculate expected returns, covariance matrix, and latest prices
E_R = calculate_expected_returns(data)
Cov = calculate_covariance_matrix(data)
A = get_latest_prices(data)

    # Define other parameters
TF = 1000
nbits = 5
theta1, theta2, theta3 = 0.5, 0.3, 0.2
    
    # Create QUBO object and compile
portfolio_qubo = fm3_QUBO(assets, E_R, Cov, A, nbits, TF, theta1, theta2, theta3)
objective = portfolio_qubo.Return() + portfolio_qubo.Risk() + portfolio_qubo.Budget()
model = objective.compile()
qubo, offset = model.to_qubo()

    # Solve QUBO using Simulated Annealing Sampler
sampler = neal.SimulatedAnnealingSampler()
response = sampler.sample_qubo(qubo)
    
    # Print results
for sample, energy in response.data(['sample', 'energy']):
    print(sample, energy)


In [None]:
# PortfolioQUBO_v2 with multifactor 
class PortfolioQUBO_v2_multifactor(object):
    def __init__(self, assets, Sigma, n, Mu, R_dot, lambda1, lambda2, lambda3):        
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.lambda3 = lambda3
        self.assets = assets
        self.Sigma = Sigma  
        self.nbits = n
        self.Mu = Mu
        self.R_dot = R_dot
        self.array = Array.create('asset', shape=(assets,self.nbits), vartype='BINARY')

    def Risk(self):
        """Minimize the portfolio variance."""
        H = 0
        for i in range(self.assets):
            for j in range(i, self.assets):
                for si in range(self.nbits):
                    for sj in range(si + 1, self.nbits):
                        H += self.Sigma[i][j] * self.array[i][si] * self.array[j][sj]
        return self.lambda1 * H

    def Return(self):
        """Ensure the portfolio has an expected return close to R_dot."""  
        H = sum(self.Mu[i]*sum(self.array[i][s] * 2**(s) for s in range(self.nbits)) for i in range(self.assets))      
        return self.lambda3*(H - self.R_dot*(2**(self.nbits+1)-1))**2

In [None]:
assets = ['JCI', 'TGT', 'CMCSA', 'CPB', 'MO', 'APA', 'MMC', 'JPM',
          'ZION', 'PSA', 'BAX', 'BMY', 'LUV', 'PCAR', 'TXT', 'TMO',
          'DE', 'MSFT', 'HPQ', 'SEE', 'VZ', 'CNP', 'NI', 'T', 'BA',
          'AAPL', 'AMZN', 'GOOG', 'BRK-B', 'JNJ', 'TSLA', 'QCOM',
          'NVDA', 'MA','KO', 'IBM', 'PFE', 'XOM', 'DD', 'C']

data = download_data(assets, start='2015-01-01', end='2019-12-30')

# Calculate expected returns, covariance matrix, and latest prices
E_R = calculate_expected_returns(data)
Cov = calculate_covariance_matrix(data)
A = get_latest_prices(data)

# Define other parameters
nbits = 5 # #bits to express the available capital
R_dot = 0.02  # Some desired return, modify as needed
theta1, theta2, theta3 = 0.5, 0.3, 0.2

# Create QUBO object and compile using PortfolioQUBO_v2
portfolio_qubo = PortfolioQUBO_v2_multifactor(len(assets), Cov, nbits, E_R, R_dot, theta1, theta2, theta3)
objective = portfolio_qubo.Return() + portfolio_qubo.Risk() 
model = objective.compile()
qubo, offset = model.to_qubo()

# Solve QUBO using Simulated Annealing Sampler
sampler = neal.SimulatedAnnealingSampler()
response = sampler.sample_qubo(qubo)

# Print results
for sample, energy in response.data(['sample', 'energy']):
    print(sample, energy)

In [None]:
# Fetch S&P 500 tickers from Wikipedia
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
response = requests.get(url)

# Use pandas read_html to parse the website
table = pd.read_html(response.text)
df = table[0]
tickers = df['Symbol'].tolist()

In [None]:
def download_and_organize_data(assets, start_date, end_date):
    data_dict = {}
    failed_tickers = []
    successful_tickers = []

    for ticker in assets:
        try:
            # Attempt to download the data
            data = yf.download(ticker, start=start_date, end=end_date)
            if not data.empty:  # Ensure downloaded data is not empty
                successful_tickers.append(ticker)
                data_dict[ticker] = data['Adj Close']
            else:
                failed_tickers.append(ticker)
        except Exception as e:
            # Log the failed tickers and their respective errors
            print(f"Failed to download data for {ticker}. Reason: {e}")
            failed_tickers.append(ticker)

    # Organize the data
    df = pd.concat(data_dict, axis=1)
    df.columns = successful_tickers
    return df, failed_tickers


In [None]:
start_date = "2020-01-01"
end_date = "2022-01-01"
data, failed_tickers = download_and_organize_data(tickers, start_date, end_date)

print("Downloaded Data:")
print(data.head())  # Display the head of the successfully downloaded data
print("\nFailed to download data for:", failed_tickers)


In [None]:
# Calculate expected returns, covariance matrix, and the latest prices
E_R = calculate_expected_returns(data)
Cov = calculate_covariance_matrix(data)
A = get_latest_prices(data)