In [70]:
import numpy as np
import pandas as pd

# Exploring Chain 

In [71]:
from abc import ABC, abstractmethod
from functools import lru_cache


@lru_cache
def get_geometric_probs(n: int, p: float) -> np.ndarray:
    """Return the normalized geometric weights ``p**1, ..., p**n``.

    Args:
        n: Number of weights to produce.
        p: Base of the geometric sequence (e.g. ``1/2``).

    Returns:
        A float array of length ``n`` whose ``k``-th entry (0-based) is
        ``p**(k+1) / sum_{j=1..n} p**j``. The array is marked read-only
        because ``lru_cache`` hands the very same object to every caller;
        an in-place edit would otherwise corrupt all later results.
    """
    # Force a float base: with an integer ``p`` the powers would be computed
    # in int64 and overflow silently for large ``n``.
    weights = np.power(float(p), np.arange(1, n + 1))
    probs = weights / np.sum(weights)
    probs.setflags(write=False)
    return probs


class MarkovChain(ABC):
    """Abstract proposal chain over state vectors with one entry per city.

    Subclasses implement :meth:`next_state`, which maps the current state to
    a proposed successor state.
    """

    def __init__(self, n_cities: int) -> None:
        """Store the number of cities; it must be strictly positive."""
        self.n_cities = n_cities
        assert n_cities > 0

    @abstractmethod
    def next_state(self, state: np.ndarray) -> np.ndarray:
        """Return the state that follows ``state`` in the chain."""


class RandomWalk(MarkovChain):
    """Lazy random walk that flips at most one entry of the state per step."""

    def __init__(self, n_cities: int) -> None:
        super().__init__(n_cities)

    def next_state(self, state: np.ndarray) -> np.ndarray:
        """Propose the next state.

        With probability ``1 / (n_cities + 1)`` the chain stays put and the
        input array itself (not a copy) is returned. Otherwise one index is
        drawn uniformly from ``range(n_cities)`` and that entry is inverted
        with ``~`` in a copy of ``state``.

        Args:
            state: Current state; expected to be a boolean array so that
                ``~`` acts as logical negation.

        Returns:
            The proposed state.
        """
        u = np.random.random()
        if u < 1 / (self.n_cities + 1):
            return state

        # The index range comes from the instance; the previous code used an
        # undefined name ``n`` here.
        idx = np.random.randint(self.n_cities)
        new_state = state.copy()
        new_state[idx] = ~new_state[idx]
        return new_state


class WeightedRandomJumps(MarkovChain):
    """Proposal chain that inverts a random number of state entries per step.

    The number of inverted entries ``m`` lies in ``1..n_cities`` and is drawn
    with the probabilities returned by ``get_geometric_probs(n_cities, 1/2)``.
    """

    def __init__(self, n_cities: int) -> None:
        super().__init__(n_cities)

    def next_state(self, state: np.ndarray) -> np.ndarray:
        """Return a copy of ``state`` with ``m`` distinct random entries inverted."""
        city_count = self.n_cities

        # First draw: how many entries to invert.
        jump_probs = get_geometric_probs(city_count, 1/2)
        jump_sizes = np.arange(1, city_count + 1)
        n_flips = np.random.choice(jump_sizes, p=jump_probs)

        # Second draw: which entries, sampled without replacement.
        all_indices = np.arange(city_count)
        flipped = np.random.choice(all_indices, n_flips, replace=False)

        proposal = state.copy()
        proposal[flipped] = ~proposal[flipped]
        return proposal

# Metropolis-Hastings

In [72]:
def initalize_random_state(n):
    """Draw a random boolean vector of length ``n``.

    Each entry is ``True`` exactly when an independent uniform sample from
    ``np.random.random`` falls below 0.5.
    """
    uniform_draws = np.random.random(n)
    return uniform_draws < 0.5

In [73]:
from typing import Optional

class MetropolisHastings:
    """Run several independent Metropolis-Hastings chains side by side.

    Attributes:
        states: Current state of every chain.
        scores: Objective value of every current state, as a float array
            aligned with ``states``.
    """

    states: list[np.ndarray]
    scores: np.ndarray

    def __init__(self, n_cities: int, n_chains: int, exploring_chain: "MarkovChain", objective_function, states: Optional[list[np.ndarray]] = None):
        """Set up the chains and score their starting states.

        Args:
            n_cities: Length of each randomly initialised state. Only used
                when ``states`` is not given.
            n_chains: Number of chains to create. Only used when ``states``
                is not given.
            exploring_chain: Object whose ``next_state(state)`` proposes a
                candidate state.
            objective_function: Callable mapping a state to its score.
            states: Optional starting states, one per chain.
        """
        self.exploring_chain = exploring_chain
        self.objective_function = objective_function

        if states is not None:
            # Shallow copy: ``forward`` replaces list entries, and that must
            # not leak into the list owned by the caller.
            self.states = list(states)
        else:
            self.states = [
                initalize_random_state(n_cities)
                for _ in range(n_chains)
            ]

        # Force a float dtype so that later float scores are not truncated
        # when the initial scores happen to be integers.
        self.scores = np.array(
            [self.objective_function(state) for state in self.states],
            dtype=float,
        )

    def acceptance(self, prev_score, new_score):
        """Return the acceptance probability ``min(1, exp(new_score - prev_score))``.

        The exponent is capped at 0 before exponentiating, which yields the
        same value as clipping afterwards but cannot overflow.
        """
        return np.exp(np.minimum(new_score - prev_score, 0.0))

    def forward(self):
        """Advance every chain by one propose/accept-or-reject step."""
        for i, state in enumerate(self.states):
            new_state = self.exploring_chain.next_state(state)
            new_score = self.objective_function(new_state)
            u = np.random.random()
            if u <= self.acceptance(self.scores[i], new_score):
                self.states[i] = new_state
                self.scores[i] = new_score



# Solver

In [74]:
from typing import Callable

class Solver:
    """Search for a high-scoring subset of cities with Metropolis-Hastings.

    A state is a boolean mask over the cities. Its score is the total
    population of the selected cities minus a penalty proportional to the
    squared surrounding radius of the selection.
    """

    def __init__(self, populations: np.ndarray, distance_matrix: np.ndarray, exploring_chain_type: "type[MarkovChain]"):
        """Store the problem data and build the proposal chain.

        Args:
            populations: One value per city.
            distance_matrix: Square matrix of pairwise distances; its shape
                must be ``(len(populations), len(populations))``.
            exploring_chain_type: Class that is instantiated with
                ``n_cities=len(populations)`` to obtain the proposal chain.
        """
        self.populations = populations
        self.distance_matrix = distance_matrix

        self.n_cities = len(populations)
        self.exploring_chain = exploring_chain_type(n_cities=self.n_cities)
        assert self.distance_matrix.shape == (self.n_cities, self.n_cities)

    # TODO: Maybe use jit for that
    def get_surrounding_radius(self, state: np.ndarray):
        """Return half of the largest pairwise distance among selected cities.

        An empty selection has no pairwise distances; its radius is defined
        as 0.0 instead of letting ``max()`` fail on an empty array.
        """
        pairwise = self.distance_matrix[state].T[state]
        if pairwise.size == 0:
            return 0.0
        return pairwise.max() / 2

    def get_objective_funtion(self, lambda_: float) -> Callable[[np.ndarray], float]:
        """Build the objective for a given penalty weight ``lambda_``.

        The returned function maps a state to
        ``sum(populations[state]) - lambda_ * n_cities * pi * r**2`` where
        ``r`` is the surrounding radius of the state.
        """

        def objective_function(state: np.ndarray) -> float:
            r = self.get_surrounding_radius(state)
            value = np.sum(self.populations[state]) - lambda_ * self.n_cities * np.pi * r ** 2
            return value

        return objective_function

    def simulate_chains(self, lambda_: float = 0.1, n_chains: int = 10, steps: int = 1000):
        """Run ``n_chains`` chains for ``steps`` steps and track the best state.

        Args:
            lambda_: Penalty weight passed to the objective.
            n_chains: Number of parallel chains.
            steps: Number of steps every chain is advanced.

        Returns:
            A dict with the keys ``"final_states"``, ``"best_score"``,
            ``"best_state"``, ``"step_best_scores"`` and
            ``"step_best_state_size"``. The two ``step_*`` lists hold, per
            step, the best current score across chains and the number of
            selected cities in the state achieving it.
        """
        mh = MetropolisHastings(
            n_cities=self.n_cities,
            n_chains=n_chains,
            # Use the chain built from ``exploring_chain_type`` in __init__
            # rather than a fixed chain class, so the constructor argument
            # actually takes effect.
            exploring_chain=self.exploring_chain,
            objective_function=self.get_objective_funtion(lambda_)
        )

        best_state = None
        best_score = -np.inf

        step_best_scores = []
        step_best_state_size = []
        for _ in range(steps):
            mh.forward()
            idx = np.argmax(mh.scores)
            step_best_scores.append(mh.scores[idx])
            step_best_state_size.append(mh.states[idx].sum())
            if mh.scores[idx] > best_score:
                best_state = mh.states[idx]
                best_score = mh.scores[idx]

        return {
            "final_states": mh.states,
            "best_score": best_score,
            "best_state": best_state,
            "step_best_scores": step_best_scores,
            "step_best_state_size": step_best_state_size
        }


# Example

In [75]:
# Synthetic instance: random populations and random 2-D positions for
# N_CITIES cities, both sampled uniformly from [0, 1).
N_CITIES = 100
populations = np.random.random(N_CITIES)
positions = np.random.random((N_CITIES, 2))

# Pairwise Euclidean distances via broadcasting: offsets[i, j] is the
# coordinate difference between city i and city j. The result is symmetric
# with a zero diagonal and is stored as float32.
offsets = positions[:, np.newaxis, :] - positions[np.newaxis, :, :]
distance_matrix = np.sqrt(np.sum(offsets ** 2, axis=-1)).astype(np.float32)

In [76]:
# Build a solver for the synthetic instance, requesting RandomWalk as the
# proposal chain type.
solver = Solver(
    populations=populations,
    distance_matrix=distance_matrix,
    exploring_chain_type=RandomWalk,
)
# Run 100 chains for 20,000 steps with lambda_ = 1 and show the best score.
solver.simulate_chains(lambda_=1, n_chains=100, steps=20_000)["best_score"]

KeyboardInterrupt: 