In [152]:
import numpy as np
import itertools as it
from itertools import product

from scipy.optimize import dual_annealing
from wrapdisc import Objective
from wrapdisc.var import ChoiceVar
import operator

import pandas as pd

In [4]:
# Random per-box class distributions (stand-in for the computer-vision output)
def generate_random_probabilities(num_boxes, num_states):
    """
    Draw one random probability distribution over the states per bounding box.

    Parameters:
    - num_boxes: int, how many bounding boxes (rows) to produce
    - num_states: int, how many states (columns), e.g. "tree", "glass", "car", "cup"

    Returns:
    - numpy array of shape (num_boxes, num_states); every row is non-negative
      and is divided by its own sum so that it adds up to 1.
    """
    raw_scores = np.random.rand(num_boxes, num_states)  # Uniform samples in [0, 1)
    row_totals = raw_scores.sum(axis=1, keepdims=True)  # Column vector of row sums
    return raw_scores / row_totals

# Random relational matrix between states (stand-in for the ConceptNet relations)
def generate_symmetric_matrix(num_states):
    """Return a random symmetric (num_states x num_states) matrix with a unit diagonal.

    Off-diagonal entries are the average of a uniform random matrix and its
    transpose; the diagonal (self-relation) is set to 1.
    """
    raw = np.random.rand(num_states, num_states)  # Uniform samples in [0, 1)
    symmetric = 0.5 * (raw + raw.T)  # Averaging with the transpose enforces symmetry
    symmetric[np.diag_indices(num_states)] = 1  # Self-relations get weight 1
    return symmetric

In [171]:
# Optimized Global Energy Function
def global_energy_optimized(current_states, observation, relational_matrix):
    """
    Compute the global energy of one state assignment (lower is better).

    The energy is the sum of three negative-log terms:
      1. the observation probability of the chosen state of every box,
      2. the relational weight of every unordered pair of boxes,
      3. for every tuple size n = 3 .. num_boxes, the mean pairwise weight
         over all n-tuples of boxes.

    Parameters:
    - current_states: sequence of integer state indices, one per bounding box
      (list, tuple or 1-D array; values are cast to integer indices)
    - observation: array-like of shape (num_boxes, num_states) with the
      per-box state probabilities
    - relational_matrix: array-like of shape (num_states, num_states) with the
      pairwise relation weights between states

    Returns:
    - the energy as a NumPy float; it is inf when a selected probability or
      weight is 0 (log of zero).
    """
    # Accept tuples/lists as well as arrays (decoded optimizer solutions are
    # usually tuples, which do not support the array indexing used below).
    current_states = np.asarray(current_states, dtype=np.intp)
    observation = np.asarray(observation)
    relational_matrix = np.asarray(relational_matrix)
    num_vertices = current_states.shape[0]  # Number of bounding boxes

    # 1. Observation likelihood of the chosen state of each bounding box
    obs_probs = observation[np.arange(num_vertices), current_states]
    energy = -np.sum(np.log(obs_probs))

    # 2. All 2-tuples: relational weight between the states of boxes i < j
    i_upper, j_upper = np.triu_indices(num_vertices, k=1)
    pair_weights = relational_matrix[current_states[i_upper], current_states[j_upper]]
    energy -= np.sum(np.log(pair_weights))

    # 3. All n-tuples (n >= 3): every pair of boxes occurs in the same number
    #    of n-tuples, so the mean weight over "all pairs of all n-tuples" is
    #    the mean over all pairs, for every n.  The (num_vertices - 2) tuple
    #    sizes therefore contribute the same term, and no combinations need
    #    to be enumerated.
    if num_vertices >= 3:
        energy -= (num_vertices - 2) * np.log(np.mean(pair_weights))

    return energy

In [170]:
# Brute force: evaluate the global energy of every possible state assignment
def run_global_energy_all_states(num_boxes, num_states, observation, relational_matrix):
    """
    Score every state combination and return them ordered by energy.

    Parameters:
    - num_boxes: int, number of bounding boxes
    - num_states: int, number of states each box can take
    - observation: per-box state probabilities passed on to the energy function
    - relational_matrix: state-to-state weights passed on to the energy function

    Returns:
    - DataFrame with columns "State Combination" and "Global Energy",
      sorted by ascending energy with a fresh 0..n-1 index.
    """
    # Cartesian product of the state indices, one row per combination
    state_grid = np.array(list(product(range(num_states), repeat=num_boxes)))

    scored = [
        (combo, global_energy_optimized(combo, observation, relational_matrix))
        for combo in state_grid
    ]

    table = pd.DataFrame(scored, columns=["State Combination", "Global Energy"])
    return table.sort_values(by="Global Energy", ascending=True).reset_index(drop=True)



In [163]:
# Optimized Simulated Annealing for Discrete Integer Optimization
def discrete_simulated_annealing_optimized(num_boxes, num_states, observation, relational_matrix, max_iter=1000, initial_temp=1.0, cooling_rate=0.99, energy_fn=None, seed=None):
    """
    Minimize the global energy over discrete state assignments by simulated annealing.

    Each iteration re-draws the state of one randomly chosen box, accepts the
    move with the Metropolis criterion, and multiplies the temperature by
    `cooling_rate`.

    Parameters:
    - num_boxes: int >= 1, number of bounding boxes
    - num_states: int >= 1, number of states each box can take
    - observation, relational_matrix: passed unchanged to the energy function
    - max_iter: number of proposal steps
    - initial_temp: starting temperature
    - cooling_rate: multiplicative temperature decay applied after every step
    - energy_fn: callable(states, observation, relational_matrix) -> float;
      defaults to `global_energy_optimized`
    - seed: optional seed (or anything accepted by `np.random.default_rng`)
      that makes the run reproducible

    Returns:
    - (best_states, best_energy): the lowest-energy assignment visited, as an
      integer array, and its energy.

    Raises:
    - ValueError if num_boxes or num_states is smaller than 1.
    """
    if num_boxes < 1 or num_states < 1:
        raise ValueError("num_boxes and num_states must both be at least 1")
    if energy_fn is None:
        # Resolved at call time so the function can be defined before the energy cell runs
        energy_fn = global_energy_optimized

    # A single generator drives the initial state and all proposals, so `seed`
    # controls the whole run.
    rng = np.random.default_rng(seed)

    current_states = rng.integers(0, num_states, size=num_boxes)
    current_energy = energy_fn(current_states, observation, relational_matrix)

    best_states = current_states.copy()
    best_energy = current_energy

    temperature = initial_temp

    for _ in range(max_iter):
        # Propose: copy the current assignment and re-draw the state of one box
        # (the proposal may coincide with the current state).
        candidate_states = current_states.copy()
        candidate_states[rng.integers(0, num_boxes)] = rng.integers(0, num_states)
        candidate_energy = energy_fn(candidate_states, observation, relational_matrix)

        # Metropolis criterion: always accept improvements; accept a worse (or
        # equal) candidate with probability exp(-delta / T).  At T <= 0 only
        # improvements are accepted, which also avoids dividing by zero.
        delta = candidate_energy - current_energy
        if delta < 0 or (temperature > 0 and rng.random() < np.exp(-delta / temperature)):
            current_states = candidate_states
            current_energy = candidate_energy

        # Track the best configuration seen so far (stored as its own copy)
        if current_energy < best_energy:
            best_states = current_states.copy()
            best_energy = current_energy

        # Cool down the system
        temperature *= cooling_rate

    return best_states, best_energy

In [114]:
# # Simulated Annealing WITH WRAPDISC

# Objective wrapper for discrete variables
def wrapped_energy_fn(wrapped_states):
    """
    Compute the global energy for a decoded state assignment.

    Parameters:
    - wrapped_states: sequence of state indices, one per bounding box, as
      handed over by the wrapped objective (no further decoding is done here)

    Returns:
    - the energy from `global_energy_optimized`, evaluated against the
      module-level `observation` and `relational_matrix` that exist at call time.
    """
    # Convert to an integer array: the energy function indexes with array
    # operations that a plain tuple/list does not support.
    current_states = np.asarray(wrapped_states, dtype=int)
    return global_energy_optimized(current_states, observation, relational_matrix)

# Wrap the global energy function using wrapdisc.
# One ChoiceVar per bounding box, each ranging over the available states, so the
# wrapper follows `num_boxes` instead of being fixed to three boxes.
# `states` and `num_boxes` come from the data-setup cell, which must be run first.
wrapped_objective = Objective(
    wrapped_energy_fn,
    variables=[ChoiceVar(states) for _ in range(num_boxes)],
)


In [154]:
# Problem size: three bounding boxes, each labelled with one of four states
num_boxes = 3
num_states = 4
states = list(range(num_states))  # State indices corresponding to ["Glass", "Car", "Tree", "Cup"]

# Random current state: a vector of length num_boxes with entries drawn from the states
current_states = np.random.choice(states, size=num_boxes)
print("Random Current State:", current_states, sep="\n")

# Relational probability matrix between the states (this should come from ConceptNet)
relational_matrix = generate_symmetric_matrix(num_states)
print("\nRandom Relational Probability Matrix (ConceptNet):", relational_matrix, sep="\n")

# Per-box probability distributions over the states (this should come from the computer-vision observation)
observation = generate_random_probabilities(num_boxes=num_boxes, num_states=num_states)
print("\nRandom Bounding Box Probabilities (Computer Vision):")
print(observation, "\n")

Random Current State:
[3 2 3]

Random Relational Probability Matrix (ConceptNet):
[[1.         0.64389731 0.29195695 0.42536708]
 [0.64389731 1.         0.62929592 0.616207  ]
 [0.29195695 0.62929592 1.         0.41250271]
 [0.42536708 0.616207   0.41250271 1.        ]]

Random Bounding Box Probabilities (Computer Vision):
[[0.15808451 0.22438054 0.2498798  0.36765516]
 [0.37634462 0.15045102 0.22527668 0.24792768]
 [0.29422046 0.16869235 0.45371019 0.083377  ]] 



In [173]:
# Exhaustively score every state combination; the result is sorted by ascending energy
df_all_states_results = run_global_energy_all_states(
    num_boxes, num_states, observation=observation, relational_matrix=relational_matrix
)

# Show the state combinations together with their global energy
print(df_all_states_results)

   State Combination  Global Energy
0          [2, 2, 2]       3.667498
1          [0, 0, 0]       4.045301
2          [1, 0, 0]       4.846544
3          [3, 3, 3]       4.879611
4          [1, 2, 2]       4.985313
..               ...            ...
59         [0, 3, 2]       7.977553
60         [2, 1, 3]       8.191111
61         [0, 1, 3]       8.578884
62         [2, 0, 3]       8.796420
63         [0, 2, 3]       9.767446

[64 rows x 2 columns]


In [175]:
n_runs = 100  # Number of independent annealing runs used below

# Function to run the simulated annealing multiple times and collect results
def run_simulated_annealing_multiple_times(n_runs, num_boxes, num_states, observation, relational_matrix, max_iter=1000, initial_temp=1.0, cooling_rate=0.99):
    """
    Run the discrete simulated annealing repeatedly and tally the outcomes.

    Parameters:
    - n_runs: number of independent annealing runs (0 gives an empty table)
    - num_boxes, num_states, observation, relational_matrix: forwarded to
      `discrete_simulated_annealing_optimized`
    - max_iter, initial_temp, cooling_rate: annealing settings forwarded to
      every run

    Returns:
    - DataFrame with columns "Optimized States" (tuple of states),
      "Minimized Global Energy" and "Count" (how many runs ended in that
      state), sorted by ascending energy with a fresh 0..n-1 index.
    """
    columns = ["Optimized States", "Minimized Global Energy", "Count"]
    results = {}

    for _ in range(n_runs):
        optimized_states, minimized_energy = discrete_simulated_annealing_optimized(
            num_boxes=num_boxes,
            num_states=num_states,
            observation=observation,
            relational_matrix=relational_matrix,
            max_iter=max_iter,
            initial_temp=initial_temp,
            cooling_rate=cooling_rate
        )

        # Arrays are not hashable; a tuple can serve as the dictionary key
        state_tuple = tuple(optimized_states)

        # Only the count changes on a repeat: the energy recorded for a state
        # is the one from the first run that ended in it.
        if state_tuple in results:
            results[state_tuple]['count'] += 1
        else:
            results[state_tuple] = {'minimized_energy': minimized_energy, 'count': 1}

    records = [
        {'Optimized States': states, 'Minimized Global Energy': data['minimized_energy'], 'Count': data['count']}
        for states, data in results.items()
    ]

    # Explicit columns keep the table well-formed (and sortable) even when
    # there are no records, e.g. n_runs == 0.
    df_results = pd.DataFrame(records, columns=columns)

    # Sort by minimized energy
    df_results = df_results.sort_values(by="Minimized Global Energy", ascending=True).reset_index(drop=True)

    return df_results

# Repeat the annealing n_runs times and tabulate which optima were reached, ordered by energy
df_ordered_results = run_simulated_annealing_multiple_times(
    n_runs, num_boxes, num_states, observation=observation, relational_matrix=relational_matrix
)

print(df_ordered_results)

  Optimized States  Minimized Global Energy  Count
0        (2, 2, 2)                 3.667498     69
1        (0, 0, 0)                 4.045301     27
2        (3, 3, 3)                 4.879611      4


In [161]:
# Search-space bounds, as exposed by the wrapped objective
bounds = wrapped_objective.bounds

# Minimize the wrapped energy function with SciPy's dual annealing
# (fixed seed, at most 10000 iterations)
result = dual_annealing(wrapped_objective, bounds=bounds, seed=42, maxiter=10000)

# Decode the optimizer's solution vector back into the categorical states,
# then report the states, the objective value and the termination message
decoded_solution = wrapped_objective.decode(result.x)
print("Optimized States:", decoded_solution)
print("Minimized Global Energy:", result.fun)
print("Cause of the termination:", result.message)

Optimized States: (1, 1, 1)
Minimized Global Energy: 3.32759880300248
Cause of the termination: ['Maximum number of iteration reached']


In [162]:
# IMPORTANT: the energy reported by the wrapdisc dual annealing run (result.fun) does not match
# the energy recomputed here for the decoded states, although the optimized states are not bad.
# NOTE(review): possible causes to confirm: the wrapped objective was built in an earlier cell
# and may have returned values cached before `observation` / `relational_matrix` were
# regenerated, or the cells were executed out of order. Rebuild `wrapped_objective` after the
# data cell and re-run to check.
global_energy_optimized(np.asarray(decoded_solution, dtype=int), observation, relational_matrix)

np.float64(5.168208180216731)