# Currency Arbitrage via QAOA — Lead’s 6 Steps Implemented

This notebook adds a QAOA pipeline that matches the Team Lead’s steps (start everywhere, cost twist, mixer, sample, tune, pick winner).

In [1]:

# Environment & imports (Qiskit 2.3.0 friendly)
try:
    import qiskit
    print("Qiskit:", qiskit.__version__)
except Exception as e:
    print("Qiskit import error:", e)

try:
    import qiskit_algorithms
    print("qiskit_algorithms:", qiskit_algorithms.__version__)
except Exception as e:
    print("qiskit_algorithms import warning (install with `pip install qiskit-algorithms`):", e)

import numpy as np
import itertools
from typing import Dict, Tuple, List, Set
import matplotlib.pyplot as plt

from qiskit.quantum_info import SparsePauliOp
from qiskit_algorithms import QAOA
from qiskit_algorithms.optimizers import COBYLA, SPSA

# Primitives
from qiskit.primitives import StatevectorSampler
try:
    from qiskit.primitives import Sampler as SamplerV1
    _sampler_v1_ok = True
except Exception:
    _sampler_v1_ok = False

print("Primitives -> StatevectorSampler: True, Sampler(V1):", _sampler_v1_ok)


Qiskit: 2.3.0
qiskit_algorithms: 0.4.0
Primitives -> StatevectorSampler: True, Sampler(V1): False


In [2]:

# Variables, log weights, QUBO

def build_variables(n: int) -> Tuple[Dict[Tuple[int,int], int], List[Tuple[int,int]]]:
    idx, rev = {}, []
    k = 0
    for i in range(n):
        for j in range(n):
            if i == j:
                continue
            idx[(i, j)] = k
            rev.append((i, j))
            k += 1
    return idx, rev


def log_weight_matrix(rates: List[List[float]]) -> np.ndarray:
    n = len(rates)
    W = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if i == j:
                W[i, j] = 0.0
            else:
                rij = float(rates[i][j])
                W[i, j] = np.log(rij) if rij > 0 else -1e9
    return W


def build_qubo(W: np.ndarray, A: float, B: float, C: float):
    n = W.shape[0]
    idx, rev = build_variables(n)
    m = len(rev)
    Q = np.zeros((m, m))
    c0 = 0.0

    for (i, j), k in idx.items():
        if i != j:
            Q[k, k] += -W[i, j]

    outgoing = {i: [] for i in range(n)}
    incoming = {j: [] for j in range(n)}
    for (i, j), k in idx.items():
        outgoing[i].append(k)
        incoming[j].append(k)

    for i in range(n):
        vars_i = outgoing[i]
        for a, b in itertools.combinations(sorted(vars_i), 2):
            Q[min(a,b), max(a,b)] += 2 * A
        for a in vars_i:
            Q[a, a] += -A
        c0 += A

    for j in range(n):
        vars_j = incoming[j]
        for a, b in itertools.combinations(sorted(vars_j), 2):
            Q[min(a,b), max(a,b)] += 2 * B
        for a in vars_j:
            Q[a, a] += -B
        c0 += B

    all_vars = list(range(m))
    for a, b in itertools.combinations(sorted(all_vars), 2):
        Q[min(a,b), max(a,b)] += 2 * C
    for a in all_vars:
        Q[a, a] += -C
    c0 += C

    return Q, c0, idx, rev


In [3]:

# QUBO -> Ising

def qubo_to_ising(Q: np.ndarray, c0: float) -> Tuple[SparsePauliOp, float]:
    m = Q.shape[0]
    a = np.diag(Q).copy()
    b_terms = {}
    for i in range(m):
        for j in range(i+1, m):
            if Q[i, j] != 0.0:
                b_terms[(i, j)] = Q[i, j]

    const = c0 + 0.5 * np.sum(a) + 0.25 * np.sum(list(b_terms.values()))
    h = np.zeros(m)
    for i in range(m):
        s = 0.0
        for j in range(m):
            if j == i: continue
            u, v = (min(i, j), max(i, j))
            s += b_terms.get((u, v), 0.0)
        h[i] = -0.5 * a[i] - 0.25 * s

    paulis, coeffs = [], []
    for i in range(m):
        if abs(h[i]) > 1e-12:
            label = ['I'] * m
            label[i] = 'Z'
            paulis.append(''.join(label)[::-1])
            coeffs.append(h[i])

    for (i, j), val in b_terms.items():
        Jij = 0.25 * val
        if abs(Jij) > 1e-12:
            label = ['I'] * m
            label[i] = 'Z'
            label[j] = 'Z'
            paulis.append(''.join(label)[::-1])
            coeffs.append(Jij)

    if abs(const) > 1e-12:
        paulis.append('I' * m)
        coeffs.append(const)

    op = SparsePauliOp.from_list(list(zip(paulis, coeffs)))
    return op, const


In [4]:

# Decoding & scoring

def decode_solution(bitstr: str, rev_index: List[Tuple[int,int]]) -> Set[Tuple[int,int]]:
    bits = list(map(int, bitstr[::-1]))
    edges = set()
    for k, b in enumerate(bits):
        if b == 1:
            edges.add(rev_index[k])
    return edges


def check_constraints(edges: Set[Tuple[int,int]], n: int):
    outd = {i: 0 for i in range(n)}
    ind  = {j: 0 for j in range(n)}
    for (i, j) in edges:
        outd[i] += 1
        ind[j]  += 1
    feasible = all(v <= 1 for v in outd.values()) and all(v <= 1 for v in ind.values())
    return feasible, outd, ind


def extract_cycles(edges: Set[Tuple[int,int]], n: int) -> List[List[int]]:
    next_of = {i: None for i in range(n)}
    for (i, j) in edges:
        next_of[i] = j
    visited = [False]*n
    cycles = []
    for start in range(n):
        if visited[start]:
            continue
        cur = start
        path = []
        seen = {}
        while cur is not None and not visited[cur]:
            seen[cur] = len(path)
            path.append(cur)
            visited[cur] = True
            cur = next_of[cur]
            if cur in seen:
                cyc = path[seen[cur]:]
                if len(cyc) >= 2:
                    cycles.append(cyc)
                break
    return cycles


def cycle_return(cycle: List[int], rates: List[List[float]]) -> float:
    prod = 1.0
    for i in range(len(cycle)):
        u = cycle[i]
        v = cycle[(i+1) % len(cycle)]
        prod *= float(rates[u][v])
    return prod


def score_solution(edges: Set[Tuple[int,int]], rates: List[List[float]]):
    n = len(rates)
    cycles = extract_cycles(edges, n)
    if not cycles:
        return 1.0, []
    returns = [cycle_return(c, rates) for c in cycles]
    best = max(returns)
    return best, cycles


In [5]:

# QAOA run: optimization with StatevectorSampler, sampling with Sampler(V1)

def run_qaoa(cost_op: SparsePauliOp, p: int = 3, optimizer=None, shots: int = 1000):
    if optimizer is None:
        optimizer = COBYLA(maxiter=200)

    sv_sampler = StatevectorSampler()
    qaoa = QAOA(sampler=sv_sampler, optimizer=optimizer, reps=p)
    result = qaoa.compute_minimum_eigenvalue(operator=cost_op)

    m = cost_op.num_qubits
    prob_by_str = {}

    try:
        params = result.optimal_point
        qc = qaoa.ansatz.bind_parameters(params)
        qc_meas = qc.copy().measure_all(inplace=False)
        if globals().get('_sampler_v1_ok', False):
            sampler = SamplerV1()
            quasi = sampler.run([qc_meas], shots=shots).result().quasi_dists[0]
            prob_by_str = {format(k, f'0{m}b'): float(v) for k, v in quasi.items()}
        else:
            from qiskit.quantum_info import Statevector
            sv = Statevector.from_instruction(qc)
            probs = np.abs(sv.data)**2
            prob_by_str = {format(k, f'0{m}b'): float(probs[k]) for k in range(2**m)}
    except Exception as e:
        print("Sampling fallback:", e)
        prob_by_str = {}

    best_str = max(prob_by_str, key=prob_by_str.get) if prob_by_str else None
    return result, best_str, prob_by_str


In [6]:

# Build + Solve helpers

def build_arbitrage_cost_operator(rates: List[List[float]], penalty_scale: float = 5.0):
    W = log_weight_matrix(rates)
    maxw = max(1.0, float(np.max(np.abs(W))))
    Q, c0, idx, rev = build_qubo(W, A=penalty_scale*maxw, B=penalty_scale*maxw, C=maxw)
    cost_op, const = qubo_to_ising(Q, c0)
    return cost_op, (idx, rev), const


def solve_arbitrage_qaoa(rates: List[List[float]], currencies: List[str], p: int = 3, shots: int = 1000, optimizer=None):
    cost_op, (idx, rev), const = build_arbitrage_cost_operator(rates)
    qaoa_res, best_str, dist = run_qaoa(cost_op, p=p, optimizer=optimizer, shots=shots)

    if best_str is None:
        return {
            "best_bitstring": None,
            "distribution": dist,
            "edges": set(),
            "feasible": False,
            "out_degree": {},
            "in_degree": {},
            "cycles": [],
            "best_cycle_return": 1.0,
            "log_return": 0.0,
            "qaoa_result": qaoa_res
        }

    # Decode & score
    edges = decode_solution(best_str, rev)
    feasible, outd, ind = check_constraints(edges, n=len(currencies))
    best_return, cycles = score_solution(edges, rates)

    named_cycles = [[currencies[v] for v in cyc] for cyc in cycles]
    named_edges = {(currencies[i], currencies[j]) for (i, j) in edges}

    return {
        "best_bitstring": best_str,
        "distribution": dist,
        "edges": named_edges,
        "feasible": feasible,
        "out_degree": outd,
        "in_degree": ind,
        "cycles": named_cycles,
        "best_cycle_return": best_return,
        "log_return": float(np.log(best_return)) if best_return > 0 else float('-inf'),
        "qaoa_result": qaoa_res
    }


In [None]:

# Example run (replace with real FX data)

currencies = ["USD","EUR","JPY","GBP"]
rates = [
    [1.0,    0.92,   150.0, 0.80],
    [1.09,   1.0,    163.0, 0.86],
    [0.0067, 0.0061, 1.0,   0.0050],
    [1.25,   1.16,   200.0, 1.0]
]

res = solve_arbitrage_qaoa(rates, currencies, p=3, shots=1000, optimizer=COBYLA(maxiter=200))

print("Most frequent bitstring:", res["best_bitstring"]) 
print("Selected edges:", res["edges"]) 
print("Feasible (deg≤1):", res["feasible"]) 
print("Cycles:", res["cycles"]) 
print("Best cycle return (product):", res["best_cycle_return"], "  log-return:", res["log_return"]) 

if res["distribution"]:
    top = sorted(res["distribution"].items(), key=lambda kv: kv[1], reverse=True)[:10]
    labels, probs = zip(*top)
    plt.figure(figsize=(8,3))
    plt.bar(range(len(probs)), probs, tick_label=labels)
    plt.xticks(rotation=45, ha='right')
    plt.title("Top-10 bitstrings (QAOA sampling)")
    plt.ylabel("Probability")
    plt.tight_layout()
    plt.show()
else:
    print("No distribution available.")


  return splu(A).solve
  return spsolve(Q, P)
  self._set_intXint(row, col, x.flat[0])
