In [7]:
import torch
import numpy as np
from typing import List, Dict, Tuple, Union, Optional

# Ising

In [8]:
def optimize_ising(
    J: torch.Tensor,
    h: torch.Tensor,
    agents: int = 10,
    steps: int = 1000,
    dtype: torch.dtype = torch.float32,
    device: str = "cpu"
) -> Tuple[torch.Tensor, float]:
    """
    Minimize an Ising energy using simulated particle dynamics.

    The energy that is minimized (and returned) is

        E(s) = -0.5 * s^T J s + h . s,    s in {-1, +1}^N

    where J is symmetrized and its diagonal is discarded. Each agent is an
    independent particle system with positions ``x`` and momenta ``y`` in
    [-1, 1]; the force on the positions is ``-dE/dx = J x - h``, so both the
    couplings and the external field steer the search.

    Args:
        J: Square matrix of spin-spin couplings
        h: Vector of external field values (one entry per spin)
        agents: Number of parallel optimization agents to run
        steps: Number of simulation steps
        dtype: Data type for calculations
        device: Computation device ("cpu" or "cuda")

    Returns:
        Tuple of (best spin configuration of shape (N,), its energy)

    Raises:
        ValueError: If J is not square or h does not have one entry per spin.
    """
    # Accept array-likes as well as tensors, then move to the target dtype/device
    if not isinstance(J, torch.Tensor):
        J = torch.tensor(J)
    if not isinstance(h, torch.Tensor):
        h = torch.tensor(h)
    J = J.to(dtype=dtype, device=device)
    h = h.to(dtype=dtype, device=device)

    if J.dim() != 2 or J.shape[0] != J.shape[1]:
        raise ValueError("J must be a square matrix")
    N = J.shape[0]
    if h.dim() != 1 or h.shape[0] != N:
        raise ValueError("h must be a vector with one entry per spin")

    # Only the symmetric, off-diagonal part of J affects the energy of +/-1 spins
    J_sym = (J + J.t()) / 2.0
    torch.diagonal(J_sym)[:] = 0.0

    # Initialize particle positions and momenta randomly in [-1, 1]
    x = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1
    y = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1

    # Simulation parameters
    dt = 0.05          # integration time step
    a0 = 1.0           # final value of the pump parameter
    pump_rate = 0.01   # a(t) grows by this much per unit of simulated time
    # Normalize the force by the coupling norm. With no couplings at all there
    # is nothing to normalize (and dividing would give inf/NaN), so use 1.
    coupling_norm = torch.sqrt((J_sym ** 2).sum()).item()
    c0 = 0.5 * (N - 1) ** 0.5 / coupling_norm if coupling_norm > 0 else 1.0
    field = h.unsqueeze(1)  # (N, 1): broadcasts over agents

    for step in range(steps):
        # Pump ramps linearly in time and is capped at a0. With the defaults
        # (dt=0.05, steps=1000) it peaks at about 0.5 * a0.
        a_t = min(dt * step * pump_rate, a0)

        # Restoring force that weakens as the pump grows
        y += dt * ((a_t - a0) * x)

        # Update position based on momentum
        x += dt * y

        # Interaction force -dE/dx = J x - h (couplings AND external field)
        y += dt * c0 * (J_sym @ x - field)

        # Inelastic walls: clamp positions and stop particles that hit a wall
        outside_range = torch.abs(x) > 1.0
        x.clamp_(-1.0, 1.0)
        y[outside_range] = 0.0

    # Discretize; x == 0 maps to +1 so every entry is a valid spin
    s = torch.where(x >= 0, 1.0, -1.0).to(dtype)

    # Per-agent energy without building an agents x agents matrix
    energies = -0.5 * (s * (J_sym @ s)).sum(dim=0) + h @ s

    # Find best solution
    best_idx = energies.argmin()
    return s[:, best_idx], energies[best_idx].item()

# QUBO

In [9]:
def optimize_qubo(
    Q: torch.Tensor,
    minimize: bool = False,
    agents: int = 128,
    steps: int = 5000,
    dtype: torch.dtype = torch.float32,
    device: str = "cpu"
) -> Tuple[torch.Tensor, float]:
    """
    Optimize a Quadratic Unconstrained Binary Optimization (QUBO) problem.

    The objective is ``b^T Q b`` over binary vectors ``b`` in {0, 1}^N. The
    search runs on spins ``s = 2 b - 1``. Because ``b_i^2 == b_i``, the
    substitution gives

        b^T Q b = 0.25 * s^T J s + 0.5 * f . s + const

    with ``J`` the symmetrized Q without its diagonal and ``f`` the row sums
    of the symmetrized Q. Both ``J`` and the linear term ``f`` drive the
    dynamics; dropping ``f`` would optimize ``s^T Q s`` instead.

    Args:
        Q: Square cost matrix for the QUBO problem (tensor or array-like)
        minimize: If True, minimize the objective; if False, maximize it
        agents: Number of parallel optimization agents to run
        steps: Number of simulation steps
        dtype: Data type for calculations
        device: Computation device ("cpu" or "cuda")

    Returns:
        Tuple of (best binary vector of shape (N,), objective value b^T Q b)

    Raises:
        ValueError: If Q is not a square matrix.
    """
    # Convert input to tensor if needed
    if not isinstance(Q, torch.Tensor):
        Q = torch.tensor(Q)

    # Move input to specified device and dtype
    Q = Q.to(device=device, dtype=dtype)
    if Q.dim() != 2 or Q.shape[0] != Q.shape[1]:
        raise ValueError("Q must be a square matrix")
    N = Q.shape[0]

    # Internally always maximize; minimization maximizes -Q
    gain = -Q if minimize else Q
    gain_sym = 0.5 * (gain + gain.t())

    # Spin-model pieces of the binary objective (see docstring)
    field = gain_sym.sum(dim=1, keepdim=True)   # (N, 1), broadcasts over agents
    coupling = gain_sym.clone()
    torch.diagonal(coupling)[:] = 0.0

    # Initialize particle positions and momenta randomly in [-1, 1]
    x = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1
    y = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1

    # Simulation parameters
    dt = 0.01          # integration time step
    a0 = 1.0           # final value of the pump parameter
    pump_rate = 0.01   # a(t) grows by this much per unit of simulated time
    # Guard the normalization: an all-zero Q would give inf * 0 = NaN, and
    # N == 1 would give c0 == 0 (no force at all).
    q_norm = torch.sqrt((Q ** 2).sum()).item()
    c0 = 0.5 * max(N - 1, 1) ** 0.5 / q_norm if q_norm > 0 else 1.0

    for step in range(steps):
        # Pump ramps linearly in time and is capped at a0. With the defaults
        # (dt=0.01, steps=5000) it peaks at about 0.5 * a0.
        a_t = min(dt * step * pump_rate, a0)

        # Restoring force that weakens as the pump grows
        y += dt * ((a_t - a0) * x)

        # Update position based on momentum
        x += dt * y

        # The force is evaluated on the discretized spins
        s = torch.sign(x)
        y += dt * c0 * (coupling @ s + field)

        # Inelastic walls: clamp positions and stop particles that hit a wall
        outside_range = torch.abs(x) > 1.0
        x.clamp_(-1.0, 1.0)
        y[outside_range] = 0.0

    # Spins -> binary; x == 0 maps to 1 so every entry is exactly 0 or 1
    binary_values = (torch.where(x >= 0, 1.0, -1.0).to(dtype) + 1) / 2

    # Per-agent objective without building an agents x agents matrix
    values = (binary_values * (gain @ binary_values)).sum(dim=0)

    # Find best solution
    best_idx = values.argmax()
    best_value = values[best_idx].item()

    # Report the value in the caller's sign convention
    if minimize:
        best_value = -best_value

    return binary_values[:, best_idx], best_value

# Markowitz

In [10]:
def optimize_portfolio(
    cov_matrix: torch.Tensor,
    ret: torch.Tensor,
    risk: float = 1.0,
    bits: int = 3,
    agents: int = 10,
    steps: int = 1000,
    dtype: torch.dtype = torch.float32,
    device: str = "cpu"
) -> Tuple[torch.Tensor, float]:
    """
    Optimize a portfolio allocation using the Markowitz model.

    The quantity maximized is

        gain(w) = w . ret - 0.5 * risk * w^T cov_matrix w

    Each asset is represented by a single spin, so every weight in the
    returned vector is either 0 or ``2**bits - 1``; ``bits`` only sets that
    maximum weight. Writing ``w = (2**bits - 1) * b`` with binary ``b`` and
    ``b = (s + 1) / 2`` turns the gain into a spin model with a coupling
    matrix and a linear field; both are used to drive the dynamics, so the
    expected returns influence the search and not only the final scoring.

    Args:
        cov_matrix: Covariance matrix of asset returns (tensor or array-like)
        ret: Vector of expected returns for each asset (tensor or array-like)
        risk: Weight factor for risk in the optimization
        bits: Sets the maximum per-asset weight to 2**bits - 1
        agents: Number of parallel optimization agents to run
        steps: Number of simulation steps
        dtype: Data type for calculations
        device: Computation device ("cpu" or "cuda")

    Returns:
        Tuple of (best portfolio weights of shape (N,), its gain)
    """
    # Convert to tensors if needed, then ALWAYS cast/move: tensor inputs in a
    # different dtype (e.g. float64) would otherwise fail in the matmuls below
    if not isinstance(cov_matrix, torch.Tensor):
        cov_matrix = torch.tensor(cov_matrix)
    if not isinstance(ret, torch.Tensor):
        ret = torch.tensor(ret)
    cov_matrix = cov_matrix.to(dtype=dtype, device=device)
    ret = ret.to(dtype=dtype, device=device)

    N = cov_matrix.shape[0]
    max_weight = float(2 ** bits - 1)

    # Binary form of the gain: b^T quad b + max_weight * ret . b
    cov_sym = 0.5 * (cov_matrix + cov_matrix.t())
    quad = -0.5 * risk * max_weight ** 2 * cov_sym

    # Spin form (b = (s + 1) / 2, b_i^2 == b_i):
    #   gain = 0.25 * s^T coupling s + 0.5 * field . s + const
    coupling = quad.clone()
    torch.diagonal(coupling)[:] = 0.0
    field = (quad.sum(dim=1) + max_weight * ret).unsqueeze(1)  # (N, 1)

    # Initialize particle positions and momenta randomly in [-1, 1]
    x = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1
    y = 2 * torch.rand(N, agents, dtype=dtype, device=device) - 1

    # Simulation parameters
    dt = 0.05          # fixed step; must not depend on the scale of cov_matrix
    a0 = 1.0           # final value of the pump parameter
    pump_rate = 0.01   # a(t) grows by this much per unit of simulated time
    # Normalize the force by the coupling norm; with no cross-covariances
    # there is nothing to normalize (and dividing would give inf/NaN).
    coupling_norm = torch.sqrt((coupling ** 2).sum()).item()
    c0 = 0.5 * (N - 1) ** 0.5 / coupling_norm if coupling_norm > 0 else 1.0

    for step in range(steps):
        # Pump ramps linearly in time and is capped at a0
        a_t = min(dt * step * pump_rate, a0)

        # Restoring force that weakens as the pump grows
        y += dt * ((a_t - a0) * x)

        # Update position based on momentum
        x += dt * y

        # Gradient of the gain with respect to the spins (up to a constant factor)
        y += dt * c0 * (coupling @ x + field)

        # Inelastic walls: clamp positions and stop particles that hit a wall
        outside_range = torch.abs(x) > 1.0
        x.clamp_(-1.0, 1.0)
        y[outside_range] = 0.0

    # Spin +1 -> weight 2**bits - 1, spin -1 -> weight 0
    w = ((torch.where(x >= 0, 1.0, -1.0).to(dtype) + 1) / 2) * (2 ** bits - 1)

    # Per-agent gain: w . ret - 0.5 * risk * w^T cov w
    gains = ret @ w - 0.5 * risk * (w * (cov_matrix @ w)).sum(dim=0)

    # Find best solution
    best_idx = gains.argmax()
    return w[:, best_idx], gains[best_idx].item()

# Knapsack

In [11]:
def optimize_knapsack(
    weights: List[int],
    prices: List[Union[int, float]],
    max_weight: int,
    agents: int = 1000,
    steps: int = 10000,
    verbose: bool = False,
    dtype: torch.dtype = torch.float32,
    device: Optional[str] = None
) -> Dict[str, Union[List[int], int, float, str]]:
    """
    Solve the knapsack problem using a physics-inspired optimization approach.

    The binary variables are one bit per item plus ``max_weight + 1`` slack
    bits ``y_k`` (one per admissible total weight k). The function minimizes

        E(b) = penalty * [ (sum_i w_i x_i - sum_k k y_k)^2 + (1 - sum_k y_k)^2 ]
               - sum_i p_i x_i

    with ``penalty = sum(prices)``. E is rewritten in spins ``s = 2 b - 1``
    (using ``b_i^2 == b_i``), which gives a coupling matrix ``J`` and a field
    ``h`` for the energy ``-0.5 * s^T J s + h . s``; the particles feel the
    force ``J x - h``. After the run only the item bits are used: agents whose
    selected items exceed ``max_weight`` are discarded and the most valuable
    remaining selection is returned.

    Args:
        weights: List of item weights
        prices: List of item prices/values
        max_weight: Maximum weight capacity
        agents: Number of parallel optimization agents to run
        steps: Number of simulation steps
        verbose: If True, print progress information
        dtype: Data type for calculations
        device: Computation device ("cuda" if available, else "cpu")

    Returns:
        Dictionary with keys "items" (selected item indices), "total_cost",
        "total_weight" and "status" ("success", or "failed" when no agent
        produced a selection within the weight limit)

    Raises:
        ValueError: If weights and prices have different lengths.
    """
    # Validate inputs
    n_items = len(weights)
    if len(prices) != n_items:
        raise ValueError("Length of weights and prices must be the same")

    # Set device
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    else:
        device = torch.device(device)

    # Penalty weight for the constraint terms
    penalty = float(sum(prices))

    # Quadratic part of E(b): penalty * b^T matrix b
    weights_array = np.array(weights).reshape(1, -1)
    range_array = np.arange(max_weight + 1).reshape(1, -1)
    matrix = np.vstack([
        np.hstack([weights_array.T @ weights_array, -weights_array.T @ range_array]),
        np.hstack([-range_array.T @ weights_array, 1 + range_array.T @ range_array]),
    ])
    quad = penalty * torch.tensor(matrix, dtype=dtype, device=device)

    # Linear part of E(b): -price on item bits, -2 * penalty on slack bits
    N = n_items + max_weight + 1
    linear_np = np.zeros(N)
    linear_np[:n_items] = -np.asarray(prices, dtype=float)
    linear_np[n_items:] = -2.0 * penalty
    linear = torch.tensor(linear_np, dtype=dtype, device=device)

    # Binary -> spin conversion:
    #   E = 0.25 * s^T quad_offdiag s + 0.5 * (quad @ 1 + linear) . s + const
    # i.e. J = -0.5 * quad_offdiag and h = 0.5 * (row sums of quad + linear)
    J = -0.5 * quad
    torch.diagonal(J)[:] = 0
    h = 0.5 * (quad.sum(dim=1) + linear)

    # Initialize particle positions and momenta randomly in [-1, 1]
    x = 2 * torch.rand(size=(J.shape[0], agents), dtype=dtype, device=device) - 1
    y = 2 * torch.rand(size=(J.shape[0], agents), dtype=dtype, device=device) - 1

    # Simulation parameters
    dt = 0.01          # integration time step
    a0 = 1.0           # final value of the pump parameter
    pump_rate = 0.01   # a(t) grows by this much per unit of simulated time
    # Guard the normalization: J is all zeros when every price is 0 (penalty
    # is 0), and dividing by its norm would turn the whole state into NaN.
    coupling_norm = torch.sqrt(torch.sum(J ** 2)).item()
    c0 = 0.5 * (J.shape[0] - 1) ** 0.5 / coupling_norm if coupling_norm > 0 else 1.0
    field = h.unsqueeze(1)  # (N, 1): broadcasts over agents
    report_every = max(steps // 10, 1)

    for step in range(steps):
        # Pump ramps linearly in time and is capped at a0
        a_t = min(dt * step * pump_rate, a0)
        if verbose and step % report_every == 0:
            print(f"step {step}/{steps}: a(t) = {a_t:.3f}")

        # Restoring force that weakens as the pump grows
        y += dt * ((a_t - a0) * x)

        # Update position based on momentum
        x += dt * y

        # Interaction force -dE/dx = J x - h
        y += dt * c0 * (J @ x - field)

        # Inelastic walls: clamp positions and stop particles that hit a wall
        outside_range = torch.abs(x) > 1.0
        x.clamp_(-1.0, 1.0)
        y[outside_range] = 0.0

    # Convert continuous positions to discrete spins and binary values
    s = torch.sign(x)
    binary_solution = (s + 1) / 2

    # Convert tensors for weights and prices
    weights_tensor = torch.tensor(weights, dtype=dtype, device=device)
    prices_tensor = torch.tensor(prices, dtype=dtype, device=device)

    # Weight and value of each agent's item selection (slack bits are ignored)
    agent_weights = (binary_solution[:n_items] * weights_tensor.unsqueeze(1)).sum(dim=0).cpu().numpy()
    agent_values = (binary_solution[:n_items] * prices_tensor.unsqueeze(1)).sum(dim=0).cpu().numpy()

    # Find feasible solutions (not exceeding max weight)
    feasible_indices = np.where(agent_weights <= max_weight)[0]
    if verbose:
        print(f"feasible agents: {len(feasible_indices)}/{agents}")

    # Prepare result dictionary
    result = {
        "items": [],
        "total_cost": 0,
        "total_weight": 0,
        "status": "not optimized"
    }

    # Find best feasible solution
    if len(feasible_indices) > 0:
        best_agent_idx = feasible_indices[np.argmax(agent_values[feasible_indices])]
        selected_items = binary_solution[:n_items, best_agent_idx] > 0.5
        result["items"] = torch.where(selected_items)[0].cpu().numpy().tolist()
        result["total_cost"] = float(agent_values[best_agent_idx])
        result["total_weight"] = float(agent_weights[best_agent_idx])
        result["status"] = "success"
    else:
        result["status"] = "failed"

    return result

# Tests

In [12]:
# Smoke tests for the four solvers.
# The RNG is seeded once here; every later call draws from the same global
# stream, so the expected results depend on the calls staying in this order
# with these agent counts.

# Test optimize_ising
torch.manual_seed(42)
J = torch.tensor([[0, -2, 3], [-2, 0, 1], [3, 1, 0]])
h = torch.tensor([1, -4, 2])
s, energy = optimize_ising(J, h)
assert torch.equal(torch.tensor([-1.0, 1.0, -1.0], dtype=torch.float32), s)
# For s = (-1, 1, -1): -0.5 * s^T J s = -4 and h . s = -7, hence -11
assert -11.0 == energy

# Test optimize_qubo
# P is the magnitude of the negative off-diagonal entries; the diagonal holds
# the per-variable rewards.
P = 15.5
Q = torch.tensor([
    [2, -P, -P, 0, 0, 0],
    [0, 2, -P, -P, 0, 0],
    [0, 0, 2, -2 * P, 0, 0],
    [0, 0, 0, 2, -P, 0],
    [0, 0, 0, 0, 4.5, -P],
    [0, 0, 0, 0, 0, 3],
])
binary_vector, objective_value = optimize_qubo(Q, minimize=False, agents=10)
assert torch.equal(
    torch.tensor([1.0, 0.0, 0.0, 1.0, 0.0, 1.0], dtype=torch.float32), binary_vector
)
# Variables 0, 3 and 5 share no off-diagonal entry, so the value is 2 + 2 + 3
assert 7.0 == objective_value

# Test optimize_portfolio
# With the default bits=3 each weight is either 0 or 7. Only the weights are
# checked; the returned gain is not asserted.
cov = torch.tensor([[1.0, 1.2, 0.7], [1.2, 1.0, -1.9], [0.7, -1.9, 1.0]])
ret = torch.tensor([0.2, 0.05, 0.17])
w, gain = optimize_portfolio(cov, ret)
assert torch.equal(torch.tensor([0.0, 7.0, 7.0]), w)

# Test optimize_knapsack
# Items 1-4 weigh 1 + 1 + 4 + 2 = 8 and are worth 2 + 1 + 10 + 2 = 15;
# adding item 0 (weight 12) would exceed the capacity of 15.
weights = [12, 1, 1, 4, 2]
prices = [4, 2, 1, 10, 2]
max_weight = 15
result = optimize_knapsack(
    weights=weights,
    prices=prices,
    max_weight=max_weight,
    verbose=True
)
assert result["items"] == [1, 2, 3, 4]
assert result["total_cost"] == 15
assert result["total_weight"] == 8