In [8]:
import os
import gzip
import numpy as np
import json

In [9]:
Ns = 10                 # number of stocks
Nt = 9                  # number of time steps
Nq = 1                  # Number of bits in the binary representation of the stock quantities
Ntot = Ns * Nt * Nq     # Total number of qubits/binary variables
K = 15                  # Budget constraint

# Map [n, t, q] to a single index i in [0, Ntot-1]
def tnq_to_index(t, n, q):
    i = 0
    for t_prime in range(Nt):
        for n_prime in range(Ns):
            for q_prime in range(Nq):
                if (n_prime, t_prime, q_prime) == (n, t, q):
                    return i
                i += 1
    return -1  # Should never reach here if inputs are valid

def index_to_tnq(i):
    q = i % Nq
    n = (i // Nq) % Ns
    t = (i // (Nq * Ns)) % Nt
    return t, n, q

def tn_to_index(t, n):
    i = 0
    for t_prime in range(Nt):
        for n_prime in range(Ns):
            if (n_prime, t_prime) == (n, t):
                return i
            i += 1
    return -1  # Should never reach here if inputs are valid

def index_to_tn(i):
    n = i % Ns
    t = (i // Ns) % Nt
    return t, n


## Load data

In [10]:
path = f"./data/instances/po_a0{Ns}_t{Nt + 1}_orig"

path_covariance = os.path.join(path, "covariance_matrices.txt.gz")
path_prices = os.path.join(path, "stock_prices.txt.gz")

print(path_covariance)
print(path_prices)
skiplines = 9

# read
with gzip.open(path_covariance, 'rt') as f:
    covariance_matrices = []
    for line in f:
        row = [x for x in line.strip().split()]
        covariance_matrices.append(row)

with gzip.open(path_prices, 'rt') as f:
    stock_prices = []
    for line in f:
        stock_prices.append([x for x in line.strip().split()])

covariance_matrices = covariance_matrices[skiplines:]
stock_prices = stock_prices[skiplines:]

./data/instances/po_a010_t10_orig/covariance_matrices.txt.gz
./data/instances/po_a010_t10_orig/stock_prices.txt.gz


## Map stock ticker to numerical index

In [11]:
# Create mapping from stock id to index
stock_to_index = {}
index_to_stock = {}

for i, price in enumerate(stock_prices):
    if int(price[0]) == 0:
        stock_to_index[price[1]] = i
        index_to_stock[i] = price[1]

stock_to_index

{'AAPL': 0,
 'NVDA': 1,
 'MSFT': 2,
 'GOOG': 3,
 'GOOGL': 4,
 'AMZN': 5,
 'META': 6,
 'TSLA': 7,
 'AVGO': 8,
 'WMT': 9}

## Create tensors $\mu_{t,n}$ and $\Sigma_{t, n', n}$

In [12]:
prices = np.zeros((Nt + 1, Ns))
sigma = np.zeros((Nt + 1, Ns, Ns))

for row in stock_prices:
    t = int(row[0])
    stock_id = row[1]
    i = stock_to_index[stock_id]
    prices[t, i] = float(row[2])

for row in covariance_matrices:
    t = int(row[0])
    stock_id1 = row[1]
    stock_id2 = row[2]
    i = stock_to_index[stock_id1]
    j = stock_to_index[stock_id2]
    sigma[t, i, j] = float(row[3])
    sigma[t, j, i] = float(row[3])  # symmetric

# Drop first day of covariance matrices (no returns on day 0)
sigma = sigma[1:]

# Calculate log returns
mu = (prices[1:] - prices[:-1]) / prices[:-1]  # bare returns
mu = np.log(1 + mu)  # log returns

## QUBO

In [13]:
gamma = .5  # risk aversion parameter
zeta = .1  # transaction cost parameter
rho = .1  # penalty parameter for budget constraint

Cq = lambda q: 2**q / K

Q = np.zeros((Ntot, Ntot))
Q.shape
# (A) Return term
for i in range(Ntot):
    t, n, q = index_to_tnq(i)
    Q[i, i] += -mu[t, n] * Cq(q)

# (B) Risk term
# Iterate over time steps
for t in range(Nt):
    # Get indices for time step t
    indices_t = [tnq_to_index(t, n, q) for n in range(Ns) for q in range(Nq)]
    for i in range(len(indices_t)):
        for j in range(len(indices_t)):
            idx_i = indices_t[i]
            idx_j = indices_t[j]
            t_i, n_i, q_i = index_to_tnq(idx_i)
            t_j, n_j, q_j = index_to_tnq(idx_j)
            Q[idx_i, idx_j] += 0.5 * gamma * sigma[t, n_i, n_j] * Cq(q_i) * Cq(q_j)

# (C) Transaction costs
for t in range(Nt-1):
    for n in range(Ns):
        indices_t_n = [tnq_to_index(t, n, q) for q in range(Nq)]
        indices_tp1_n = [tnq_to_index(t + 1, n, q) for q in range(Nq)]

        for i in range(len(indices_t_n)):
            for j in range(len(indices_t_n)):
                idx_i = indices_t_n[i]
                idx_j = indices_t_n[j]
                t_i, n_i, q_i = index_to_tnq(idx_i)
                t_j, n_j, q_j = index_to_tnq(idx_j)
                Q[idx_i, idx_j] += zeta * Cq(q_i) * Cq(q_j)

        for i in range(len(indices_tp1_n)):
            for j in range(len(indices_tp1_n)):
                idx_i = indices_tp1_n[i]
                idx_j = indices_tp1_n[j]
                t_i, n_i, q_i = index_to_tnq(idx_i)
                t_j, n_j, q_j = index_to_tnq(idx_j)
                Q[idx_i, idx_j] += zeta * Cq(q_i) * Cq(q_j)

        for i in range(len(indices_t_n)):
            for j in range(len(indices_tp1_n)):
                idx_i = indices_t_n[i]
                idx_j = indices_tp1_n[j]
                t_i, n_i, q_i = index_to_tnq(idx_i)
                t_j, n_j, q_j = index_to_tnq(idx_j)
                Q[idx_i, idx_j] += -2 * zeta * Cq(q_i) * Cq(q_j)

# (D) Budget constraint penalty for each time step
for t in range(Nt):
    indices_t = [tnq_to_index(t, n, q) for n in range(Ns) for q in range(Nq)]
    for i in range(len(indices_t)):
        for j in range(len(indices_t)):
            idx_i = indices_t[i]
            idx_j = indices_t[j]
            t_i, n_i, q_i = index_to_tnq(idx_i)
            t_j, n_j, q_j = index_to_tnq(idx_j)
            Q[idx_i, idx_j] += rho * Cq(q_i) * Cq(q_j)
        # Linear term from (sum x_i - K)^2 expansion
        Q[idx_i, idx_i] += -2 * rho * Cq(q_i)

# Fraction of non-zero elements
nnz = np.count_nonzero(Q)
total_elements = Q.shape[0] * Q.shape[1]
fraction_nnz = nnz / total_elements
print(f"Fraction of non-zero elements in Q: {fraction_nnz:.4f}")

# Confirm Q symmetric
#assert np.allclose(Q, Q.T), "Q is not symmetric!"

# Save QUBO as .txt
np.savetxt(f"./data/QUBO/qubo_Ns{Ns}_Nt{Nt}_Nq{Nq}_K{K}_gamma{gamma}_zeta{zeta}_rho{rho}.txt", Q, fmt="%.6f")

Q.shape

def qubo(Q):
    Q = 0.5 * (Q + Q.T)
    # Return J, h
    N = Q.shape[0]
    J = {}
    h = {}
    for i in range(N):
        for j in range(i + 1, N):
            J[(i, j)] = Q[i, j] + Q[j, i]
        h[i] = Q[i, i]
    return J, h

J, h = qubo(Q)

with open(f"./data/QUBO/qubo_Ns{Ns}_Nt{Nt}_Nq{Nq}_K{K}_gamma{gamma}_zeta{zeta}_rho{rho}.json", "w") as f:
    json.dump({"J": {f"{k[0]},{k[1]}": v for k, v in J.items()}, "h": {str(k): v for k, v in h.items()}}, f, indent=4)

Fraction of non-zero elements in Q: 0.1210


## QUBO to Ising

In [14]:
def qubo_to_ising(Q):
    """
    Convert a QUBO matrix Q to Ising model parameters J and h.
    Q: numpy array (n x n), not necessarily symmetric.
    Returns:
        J: dict of (i, j): value for i < j
        h: dict of i: value
    """
    Q = np.array(Q)
    Q = (Q + Q.T) / 2

    n = Q.shape[0]
    J = {}
    h = {}

    # Couplings
    for i in range(n):
        for j in range(i + 1, n):
            if Q[i, j] != 0:
                J[(i, j)] = Q[i, j] / 2

    # Local fields
    for i in range(n):
        h[i] = sum(-Q[i, j] / 2 for j in range(n))

    # Energy shift (constant term)

    c = np.sum(Q) / 4 + np.sum(np.diag(Q)) / 4
    return J, h, c

J, h, c = qubo_to_ising(Q)

# Save as JSON
with open(f"./data/Ising/ising_Ns{Ns}_Nt{Nt}_Nq{Nq}_K{K}_gamma{gamma}_zeta{zeta}_rho{rho}.json", "w") as f:
    json.dump({"J": {f"{k[0]},{k[1]}": v for k, v in J.items()}, "h": {str(k): v for k, v in h.items()}, "c": c}, f, indent=4)