In [None]:
from pydantic import BaseModel
from typing import List, Optional, Tuple, Any
from pydantic import confloat
import random
import numpy as np
import random
from copy import deepcopy
from typing import Optional, List

In [102]:
class AntColonyModelParams(BaseModel):
    """Tunable parameters for the ant-colony components defined in this file."""

    # Exponent applied to a segment's pheromone when computing its density.
    alpha: float = 3.0
    # Exponent applied to a segment's distance (the divisor of the density).
    beta: float = 2.0
    # Fraction of pheromone lost per update; validated to lie strictly in (0, 1).
    evaporation_rate: confloat(gt=0, lt=1) = 0.5
    # Pheromone level intended for segments that have not been reinforced yet.
    initial_pheromone: float = 1.0
    # Number of choose-trails / update-pheromones rounds performed by a run.
    n_iterations: int = 100

In [106]:
class Point(BaseModel):
    """A 2-D point identified by an integer index."""

    # Identifier used to match segments that connect the same pair of points.
    idx: int
    # Cartesian coordinates used for distance calculations.
    x: float
    y: float

class Segment(BaseModel):
    """A directed edge between two points together with its search state."""

    # Start and end points of the edge.
    from_p: Point
    to_p: Point
    # Length of the edge (computed with calc_distance by the code in this file).
    distance: float
    # Current pheromone level; None until a value has been assigned.
    pheromone: Optional[float] = None
    # Selection probability relative to sibling segments; None when not computed.
    probability: Optional[float] = None

class Trail(BaseModel):
    """A list of segments, optionally annotated with their total length."""

    # Segments that make up the trail.
    segments: List[Segment]
    # Sum of the segment distances; None when it has not been computed.
    total_distance: Optional[float] = None

class TrailMatrix(BaseModel):
    """Container holding a list of trails under the ``matrix`` field."""

    # The trails that form the matrix (one entry per trail).
    matrix: List[Trail]

class CollectionOfAntsTrails(BaseModel):
    """Container for a sequence of trails collected over a run."""

    # Collected trails, in the order they were recorded.
    trails: List[Trail]

In [89]:
def random_points(n_points: int, max_val: int) -> List[Tuple[int, int]]:
    """Return ``n_points`` distinct random integer points on a square grid.

    Both coordinates of every point are drawn uniformly from ``[0, max_val]``
    (inclusive). The order of the returned list is unspecified because the
    points are collected in a set.

    Args:
        n_points: Number of distinct points to generate. Values <= 0 yield
            an empty list.
        max_val: Largest allowed coordinate value.

    Returns:
        A list of ``n_points`` unique ``(x, y)`` tuples.

    Raises:
        ValueError: If the grid holds fewer than ``n_points`` distinct points.
    """
    # The grid has (max_val + 1)^2 cells; asking for more unique points than
    # that would make the rejection-sampling loop below spin forever.
    capacity = (max_val + 1) ** 2 if max_val >= 0 else 0
    if n_points > capacity:
        raise ValueError(
            f"cannot draw {n_points} distinct points from a grid with "
            f"only {capacity} cells (max_val={max_val})"
        )

    points = set()
    while len(points) < n_points:
        # Adding to a set silently drops duplicates, so we simply retry.
        points.add((random.randint(0, max_val), random.randint(0, max_val)))
    return list(points)

def eject_object(object: Any, name: str = ""):
    """Dump ``object`` as JSON into a file in the current working directory.

    The file name is the object's class name followed by ``name`` and the
    ``.json`` extension. The JSON text comes from ``object.model_dump_json()``,
    so the object must provide that method.
    """
    target = f"{object.__class__.__name__}{name}.json"
    with open(target, "w") as out:
        out.write(object.model_dump_json())

In [88]:
def calc_distance(a: Point, b: Point) -> float:
    """Return the Euclidean distance between points ``a`` and ``b``."""
    squared_length = (a.x - b.x) ** 2 + (a.y - b.y) ** 2
    return squared_length ** 0.5

def get_segment_pheromone_density(segment: Segment, model_params: AntColonyModelParams) -> float:
    """Return ``pheromone ** alpha / distance ** beta`` for ``segment``.

    ``alpha`` and ``beta`` are read from ``model_params``; the powers are
    evaluated with ``numpy.power``.
    """
    pheromone_term = np.power(segment.pheromone, model_params.alpha)
    distance_term = np.power(segment.distance, model_params.beta)
    return pheromone_term / distance_term

In [95]:
class TrailChooser:
    """
    Build a factual trail out of a probable one.

    Segments are picked one at a time with a weighted random draw whose
    weights come from the pheromone density of each candidate segment.
    Pheromone levels of re-anchored segments are read from the TrailMatrix
    passed to the constructor.
    """

    def __init__(self, matrix: TrailMatrix):
        """Store the reference matrix and create default model parameters."""
        self.matrix: TrailMatrix = matrix
        self.model_params = AntColonyModelParams()

    # ============================================================
    # Helper methods
    # ============================================================

    def _get_pheromone_from_matrix(self, segment: Segment) -> float:
        """
        Return the pheromone of the first matrix segment that connects the
        same two point indices as ``segment`` (direction is ignored).

        Matrix segments whose pheromone is unset (None) are skipped so that
        a None never leaks into the density computation. When no usable
        match exists, ``model_params.initial_pheromone`` is returned.
        """
        wanted = {segment.from_p.idx, segment.to_p.idx}

        for trail in self.matrix.matrix:
            for seg in trail.segments:
                if {seg.from_p.idx, seg.to_p.idx} == wanted and seg.pheromone is not None:
                    return seg.pheromone

        # Fallback: use the configured starting level instead of a literal.
        return self.model_params.initial_pheromone

    def _compute_total_density(self, segments: List[Segment]) -> float:
        """
        Return the sum of the pheromone densities of ``segments``.
        """
        return sum(get_segment_pheromone_density(seg, self.model_params) for seg in segments)

    def _update_segment_probabilities(self, segments: List[Segment], total_density: float):
        """
        Set ``probability`` on every segment to its share of ``total_density``.

        When ``total_density`` is not positive, every segment receives the
        same probability ``1 / len(segments)``.
        """
        for seg in segments:
            seg_density = get_segment_pheromone_density(seg, self.model_params)
            seg.probability = seg_density / total_density if total_density > 0 else 1 / len(segments)

    def _choose_segment(self, segments: List[Segment]) -> Segment:
        """
        Return one element of ``segments`` drawn at random, weighted by each
        segment's ``probability``.
        """
        rnd = random.random()
        cumulative = 0.0
        for seg in segments:
            cumulative += seg.probability
            if rnd <= cumulative:
                return seg
        # Rounding can leave the cumulative sum marginally below rnd.
        return segments[-1]

    def _update_remaining_segments(self, chosen_seg: Segment, remaining_segments: List[Segment]):
        """
        Re-anchor the remaining segments at the end of the chosen one:
        - from_p becomes a copy of chosen_seg.to_p
        - distance is recalculated for the new endpoints
        - pheromone is looked up again in self.matrix
        - probability is cleared
        """
        new_from_point = chosen_seg.to_p

        for seg in remaining_segments:
            # Copy so that segments never share a mutable Point instance.
            seg.from_p = deepcopy(new_from_point)
            seg.distance = calc_distance(seg.from_p, seg.to_p)
            seg.pheromone = self._get_pheromone_from_matrix(seg)
            seg.probability = None

    # ============================================================
    # Main selection logic
    # ============================================================

    def choose_trail(self, probable_trail: Trail) -> Trail:
        """
        Build a factual trail by repeatedly drawing a segment from the
        candidates and re-anchoring the rest at the drawn segment's end.

        ``probable_trail`` is not modified; the method works on a deep copy
        of its segments. The returned Trail carries the chosen segments in
        order (with ``probability`` cleared) and their summed distance.
        """
        available_segments = deepcopy(probable_trail.segments)
        factual_segments = []

        while available_segments:
            # Step 1: total pheromone density of the current candidates.
            total_density = self._compute_total_density(available_segments)

            # Step 2: turn densities into probabilities.
            self._update_segment_probabilities(available_segments, total_density)

            # Step 3: weighted random draw.
            chosen_seg = self._choose_segment(available_segments)

            # Step 4: record a probability-free copy of the drawn segment.
            factual_seg = deepcopy(chosen_seg)
            factual_seg.probability = None
            factual_segments.append(factual_seg)

            # Step 5: the drawn segment is no longer a candidate.
            available_segments.remove(chosen_seg)

            # Step 6: nothing left to connect.
            if not available_segments:
                break

            # Step 7: re-anchor the remaining candidates (geometry + pheromone).
            self._update_remaining_segments(chosen_seg, available_segments)

            # Step 8: a single remaining candidate needs no random draw.
            if len(available_segments) == 1:
                last_seg = deepcopy(available_segments[0])
                last_seg.probability = None
                factual_segments.append(last_seg)
                break

        total_distance = sum(seg.distance for seg in factual_segments)
        factual_trail = Trail(segments=factual_segments, total_distance=total_distance)

        # Step 9: hand back the new factual trail.
        return factual_trail


In [96]:
class ProbabilityUpdater:
    """Write per-segment selection probabilities into a TrailMatrix."""

    def __init__(self, matrix: TrailMatrix):
        """Store the matrix to update and create default model parameters."""
        self.matrix: TrailMatrix = matrix
        self.model_params = AntColonyModelParams()

    def _get_segment_probability(self, segment: Segment, trail_pheromone_density: float) -> float:
        """Return the segment's density as a share of the trail's total density.

        The caller is responsible for passing a non-zero total.
        """
        segment_pheromone_density = get_segment_pheromone_density(segment, self.model_params)
        return segment_pheromone_density / trail_pheromone_density

    def _get_trail_pheromone_density(self, trail: Trail) -> float:
        """Return the sum of the pheromone densities of the trail's segments."""
        return sum(
            get_segment_pheromone_density(segment, self.model_params)
            for segment in trail.segments
        )

    def update_probabilities(self) -> TrailMatrix:
        """Set ``probability`` on every segment of every trail, in place.

        Within a trail each segment gets its density divided by the trail's
        total density. When that total is not positive, the segments share
        the probability uniformly (``1 / len(trail.segments)``), matching the
        fallback used by ``TrailChooser._update_segment_probabilities``.

        Returns:
            The same matrix instance that was passed to the constructor.
        """
        for trail in self.matrix.matrix:
            trail_pheromone_density = self._get_trail_pheromone_density(trail)
            if trail_pheromone_density > 0:
                for segment in trail.segments:
                    segment.probability = self._get_segment_probability(
                        segment, trail_pheromone_density
                    )
            else:
                # Dividing by a zero total would store NaN probabilities.
                for segment in trail.segments:
                    segment.probability = 1 / len(trail.segments)
        return self.matrix

In [97]:
class PheromoneUpdater:
    """Apply evaporation and deposit to the pheromone of a TrailMatrix."""

    def __init__(self, matrix: TrailMatrix):
        """Store the matrix to update and create default model parameters."""
        self.matrix: TrailMatrix = matrix
        self.model_params = AntColonyModelParams()

    def _is_segment_in_trail(self, segment: Segment, trail: Trail) -> bool:
        """Return True if ``trail`` has a segment joining the same two point
        indices as ``segment``, in either direction."""
        wanted = {segment.from_p.idx, segment.to_p.idx}
        return any({seg.from_p.idx, seg.to_p.idx} == wanted for seg in trail.segments)

    def _update_pheromone(self, segment: Segment, chosen_trails: TrailMatrix) -> float:
        """Return the new pheromone level for ``segment``.

        The current level is scaled by ``1 - evaporation_rate`` and increased
        by ``1 / total_distance`` for every chosen trail that contains the
        segment. Trails whose ``total_distance`` is None or zero contribute
        nothing, because their deposit is undefined.
        """
        scattered_factor_sum = sum(
            1 / chosen_trail.total_distance
            for chosen_trail in chosen_trails.matrix
            if chosen_trail.total_distance
            and self._is_segment_in_trail(segment, chosen_trail)
        )
        retained = segment.pheromone * (1 - self.model_params.evaporation_rate)
        return retained + scattered_factor_sum

    def update_pheromones(self, chosen_trails: TrailMatrix) -> TrailMatrix:
        """Update the pheromone of every segment in the matrix, in place.

        Args:
            chosen_trails: Trails selected in the current round; each must
                carry its ``total_distance``.

        Returns:
            The same matrix instance that was passed to the constructor.
        """
        for trail in self.matrix.matrix:
            for segment in trail.segments:
                segment.pheromone = self._update_pheromone(segment, chosen_trails)
        # The signature promises a TrailMatrix; return the updated instance.
        return self.matrix

In [98]:
class TrailMatrixManager:
    """Facade that runs the probability, selection and pheromone steps on
    one TrailMatrix."""

    def __init__(self, matrix: TrailMatrix):
        """Store the managed matrix and create default model parameters."""
        self.matrix: TrailMatrix = matrix
        self.model_params = AntColonyModelParams()

    def get_trail_matrix(self) -> TrailMatrix:
        """Return the managed matrix instance."""
        return self.matrix

    def update_probabilities(self):
        """Recompute segment probabilities via ProbabilityUpdater and return
        its result."""
        return ProbabilityUpdater(self.matrix).update_probabilities()

    def choose_trails(self) -> TrailMatrix:
        """Return a new TrailMatrix holding one factual trail per trail of
        the managed matrix, each produced by TrailChooser.choose_trail."""
        trail_chooser = TrailChooser(self.matrix)
        return TrailMatrix(
            matrix=[
                trail_chooser.choose_trail(probable_trail)
                for probable_trail in self.matrix.matrix
            ]
        )

    def update_pheromones(self, chosen_trails: TrailMatrix) -> TrailMatrix:
        """Update the managed matrix's pheromone from ``chosen_trails``.

        Returns:
            The managed matrix, whose segments were updated in place.
        """
        PheromoneUpdater(self.matrix).update_pheromones(chosen_trails)
        # The updater assigns to the segments of self.matrix, so returning
        # self.matrix honours the annotated return type regardless of what
        # the updater itself returns.
        return self.matrix

In [100]:
# Generate 4 distinct random points with integer coordinates in [0, 5].
points = random_points(4, 5)
# NOTE(review): `build_trail_matrix` is not defined in the visible source; the
# only matrix builder shown is the method `AntColonyModel._build_trail_matrix`.
# Presumably left over from an earlier notebook state; confirm before running.
trail_matrix = build_trail_matrix(points)
# Dump the matrix to "<ClassName>_initial_matrix.json" in the working directory.
eject_object(trail_matrix, "_initial_matrix")

In [101]:
# Walk through one round of the algorithm by hand, dumping the state to a
# JSON file after each step.
manager = TrailMatrixManager(trail_matrix)
# Fills in `probability` on the segments of the managed matrix (in place).
manager.update_probabilities()
updated_matrix = manager.get_trail_matrix()
eject_object(updated_matrix, "_updated_probabilities")
# Builds one factual trail per trail of the matrix.
chosen_trails = manager.choose_trails()
eject_object(chosen_trails, "_chosen_trails")
# Applies evaporation and deposit to the managed matrix (in place).
manager.update_pheromones(chosen_trails)
eject_object(manager.get_trail_matrix(), "_updated_pheromones")

In [107]:
class AntColonyModel:
    """Ant-colony search over a set of 2-D points.

    The constructor builds a TrailMatrix with one trail per point; ``run``
    then repeats the probability / selection / pheromone cycle and records
    the shortest chosen trail of every iteration.
    """

    def __init__(self, points: List[tuple]):
        """Create the model for ``points``, a list of ``(x, y)`` tuples."""
        # Parameters first: the matrix builder reads initial_pheromone.
        self.model_params = AntColonyModelParams()
        self.matrix = self._build_trail_matrix(points)

    def _build_trail_matrix(self, points: List[tuple]) -> TrailMatrix:
        """Return a TrailMatrix with one trail per point.

        Points are numbered from 1 in input order. The trail of a point holds
        one segment to every other point, each starting with the configured
        ``initial_pheromone``.
        """
        point_objs = [Point(idx=i + 1, x=pt[0], y=pt[1]) for i, pt in enumerate(points)]
        initial_pheromone = self.model_params.initial_pheromone

        trails = []
        for i, from_p in enumerate(point_objs):
            segments = [
                Segment(
                    from_p=from_p,
                    to_p=to_p,
                    distance=calc_distance(from_p, to_p),
                    pheromone=initial_pheromone,
                )
                for j, to_p in enumerate(point_objs)
                if i != j
            ]
            trails.append(Trail(segments=segments))

        return TrailMatrix(matrix=trails)

    def _get_min_trail(self, trails: TrailMatrix) -> Optional[Trail]:
        """Return the trail with the smallest ``total_distance``.

        Ties resolve to the earliest trail. Returns None when ``trails`` is
        empty. Every trail must carry a ``total_distance``.
        """
        return min(trails.matrix, key=lambda trail: trail.total_distance, default=None)

    def run(self) -> Tuple[Trail, CollectionOfAntsTrails]:
        """Run ``model_params.n_iterations`` rounds of the search.

        Returns:
            A tuple of the shortest trail found in the first iteration and a
            CollectionOfAntsTrails with the shortest trail of every iteration,
            in order. The first element is None when no iteration ran.
        """
        n_iterations = self.model_params.n_iterations
        collection_of_ants_trails = []
        first_trail = None

        manager = TrailMatrixManager(self.matrix)

        for i in range(n_iterations):
            manager.update_probabilities()

            chosen_trails = manager.choose_trails()

            shortest_trail = self._get_min_trail(chosen_trails)
            collection_of_ants_trails.append(shortest_trail)
            if i == 0:
                first_trail = shortest_trail

            # Reinforce the matrix with this round's trails before the next round.
            manager.update_pheromones(chosen_trails)

        return first_trail, CollectionOfAntsTrails(trails=collection_of_ants_trails)

In [108]:
# End-to-end run: 4 distinct random points with integer coordinates in [0, 5].
points = random_points(4, 5)
model = AntColonyModel(points)
# `first_trail` is the shortest trail of the first iteration; the collection
# holds the shortest trail of every iteration.
first_trail, collection_of_ants_trails = model.run()
# Dump both results as JSON files in the working directory.
eject_object(first_trail, "_first_trail")
eject_object(collection_of_ants_trails, "_collection_of_ants_trails")
