# In [82]:
import numpy as np
import scipy.stats as st
import logging
from enum import Enum

# In [83]:
class Direction(Enum):
    """Sense in which an edge is traversed.

    STRAIGHT walks an edge from its left node to its right node;
    REVERSE walks it from its right node to its left node.
    """

    STRAIGHT = 1
    REVERSE = 2

class Graph():
    """Named collection of edges plus the parameters of the random walk.

    Edges are stored in a dict keyed by edge name. The graph also keeps the
    start/finish nodes and the ``alpha``/``beta``/``ro`` coefficients.
    """

    def __init__(self, start, finish,
                 alpha: np.double = np.float64(1),
                 beta: np.double = np.float64(1),
                 ro: np.double = np.float64(1), name='Graph'):
        """
        Parameters:
            - start: starting node
            - finish: objective node
            - alpha: pheromone influence (exponent applied to pheromone)
            - beta: desirability influence (exponent applied to desirability)
            - ro: stored as-is; Graph itself never reads it. Presumably the
              evaporation rate later handed to Edge.update_pheromone
              (TODO confirm against the caller).
            - name: name of the graph

        Returns:
            - new graph
        """
        self._start = start
        self._finish = finish
        self._parameters = {
            'name': name,
            'edges': {},
            'validated': False,
            'alpha': np.float64(alpha),
            'beta': np.float64(beta),
            'ro': np.float64(ro),
        }

    def __eq__(self, other):
        """Two graphs are equal when they carry the same name."""
        if not isinstance(other, Graph):
            # Let Python fall back to its default instead of raising
            # AttributeError on objects that have no ``name``.
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; hash on the same
        # key that equality uses.
        return hash(self.name)

    def __str__(self):
        """Graph name followed by one tab-indented edge name per line."""
        # Iterating the dict yields its keys, i.e. the edge names.
        return self.name + ''.join("\n\t" + str(key) for key in self.edges)

    @property
    def name(self):
        return self._parameters['name']

    @property
    def start(self):
        return self._start

    @property
    def finish(self):
        return self._finish

    @property
    def edges(self):
        """Dict mapping edge name -> edge."""
        return self._parameters['edges']

    @property
    def validated(self):
        return self._parameters['validated']

    @property
    def alpha(self):
        return self._parameters['alpha']

    @property
    def beta(self):
        return self._parameters['beta']

    @property
    def ro(self):
        return self._parameters['ro']

    def _step_probability(self, edge):
        """
        Unnormalised weight of choosing ``edge``.

        Computes

            pheromone^alpha * desirability^beta
            ---------------------------------------------------------
            (sum of all pheromones)^alpha + (sum of all desirabilities)^beta

        where both sums run over every edge stored in the graph. The
        denominator therefore does not depend on ``edge``; ``step``
        re-normalises the weights of the candidate edges afterwards.

        Parameters:
            - edge: object exposing ``pheromone`` and ``desirability``
        Returns:
            - np.float64 weight
        """
        # np.power replaces np.math.pow: the np.math alias was deprecated in
        # NumPy 1.25 and removed in NumPy 2.0.
        all_edges = list(self.edges.values())
        total_pheromone = np.sum(
            [e.pheromone for e in all_edges], dtype=np.float64)
        total_desirability = np.sum(
            [e.desirability for e in all_edges], dtype=np.float64)

        numerator = (
            np.power(np.float64(edge.pheromone), self.alpha)
            * np.power(np.float64(edge.desirability), self.beta)
        )
        denominator = (
            np.power(total_pheromone, self.alpha)
            + np.power(total_desirability, self.beta)
        )
        return np.float64(numerator / denominator)

    def add_edge(self, edge):
        """Store ``edge`` under its name; an existing name is kept and a
        warning is logged instead of overwriting it."""
        if edge.name in self.edges:
            logging.warning(
                'Edge {' + edge.name + '} already in Graph { ' + self.name + ' }'
            )
        else:
            self.edges[edge.name] = edge

    def add_edges(self, edges):
        """Add every edge of the iterable through ``add_edge``."""
        for edge in edges:
            self.add_edge(edge=edge)

    def get_edge(self, edge_name):
        """
        Look an edge up by name.

        Returns:
            - the edge, or None (after logging a critical message) when no
              edge with that name is stored
        """
        # dict.get avoids the KeyError that plain indexing raises for an
        # unknown name, so the "not in Graph" branch is actually reachable.
        edge = self.edges.get(edge_name)
        if edge is None:
            logging.critical(
                'Edge {' + edge_name + '} not in Graph { ' + self.name + ' }'
            )
            return None
        return edge

    def get_edges_from_node(self, node):
        """List of edges whose left node equals ``node``."""
        return [e for e in self.edges.values() if e.node_left == node]

    def step(self, node, direction: "Direction | None" = None):
        """
        At a given node, computes an step and
        returns the edge chosen to be traversed

        Params:
            - node: Node type, where the ant is placed
            - direction: Direction type, direction of the ant. Only
              Direction.REVERSE selects edges by their right node; any other
              value, including the default None, selects by left node
              (i.e. behaves as Direction.STRAIGHT).
        Returns:
            - edge: Edge type, edge to be traversed in the step
        Raises:
            - ValueError: when no edge leaves ``node`` in that direction
        """
        # Direction is looked up only when a direction was actually given, so
        # the default path does not depend on the enum being defined yet.
        reverse = direction is not None and direction == Direction.REVERSE
        if reverse:
            valid_edges = [e for e in self.edges.values() if e.node_right == node]
        else:
            valid_edges = [e for e in self.edges.values() if e.node_left == node]

        if not valid_edges:
            raise ValueError(
                'No edge can be traversed from node ' + str(node)
                + ' in Graph { ' + self.name + ' }'
            )

        probs = np.array(
            [self._step_probability(edge) for edge in valid_edges],
            dtype=np.float64,
        )
        total = np.sum(probs)
        # Draw an index rather than the edge itself so NumPy never has to
        # coerce the edge objects into an array.
        if total == 0:
            index = np.random.choice(len(valid_edges))  # first round is purely random
        else:
            # normalize in case they don't sum 1
            index = np.random.choice(len(valid_edges), p=probs / total)
        return valid_edges[index]


class Node():
    """Graph vertex identified by its position.

    Equality and hashing are based on ``position`` only; ``name`` is a
    display label.
    """

    def __init__(self, pos, name=None):
        """
        Parameters:
            - pos: position of the node; used for equality
            - name: optional label, defaults to 'Node [<pos> ]'
        """
        self._name = name if name is not None else \
            'Node [' + str(pos) + ' ]'
        self._parameters = {
            'pos': pos
        }

    def __eq__(self, other):
        if not isinstance(other, Node):
            # Avoid AttributeError when compared with unrelated objects.
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Keep nodes usable in sets / as dict keys; consistent with __eq__.
        return hash(self.position)

    def __str__(self):
        return self.name

    def __repr__(self):
        return 'Node(pos={!r}, name={!r})'.format(self.position, self.name)

    @property
    def name(self):
        return self._name

    @property
    def position(self):
        return self._parameters['pos']

class Edge():
    """Connection between a left node and a right node.

    Each edge carries a traversal cost, a pheromone level (initially 0) and
    a desirability fixed at construction time as ``1 / cost``.
    """

    def __init__(self, node_left, node_right, cost, name=None):
        """
        Parameters:
            - node_left: node on the left end of the edge
            - node_right: node on the right end of the edge
            - cost: cost of traversing the edge; desirability is 1 / cost,
              so it must be non-zero
            - name: optional label, defaults to
              'Edge [<left> -> <right> @ Cost <cost>]'
        Raises:
            - ValueError: when both ends are the same node (self loop)
        """
        if node_left == node_right:
            logging.critical('Edge with node ' + str(node_left) + ' @ self_loop')
            # Returning early from __init__ would hand the caller an object
            # without _name/_parameters that fails on first use; fail here.
            raise ValueError(
                'Edge with node ' + str(node_left) + ' @ self_loop')
        self._name = name if name is not None else \
            'Edge [' + str(node_left) + ' -> ' + str(node_right) + \
            ' @ Cost ' + str(cost) + ']'
        self._parameters = {
            'left': node_left,
            'right': node_right,
            'cost': cost,
            'pheromone': np.float64(0),
            'desirability': np.float64(1 / cost),
        }

    def __eq__(self, other):
        if not isinstance(other, Edge):
            # Avoid AttributeError when compared with unrelated objects.
            return NotImplemented
        return (
            self.name == other.name
            and self.node_left == other.node_left
            and self.node_right == other.node_right
        )

    def __hash__(self):
        # Equal edges always share a name, so hashing the name alone is
        # consistent with __eq__ and does not require hashable nodes.
        return hash(self.name)

    def __str__(self):
        return self.name + '{P: ' + str(self.pheromone) + ' }'

    def __repr__(self):
        return 'Edge(name={!r}, cost={!r}, pheromone={!r})'.format(
            self.name, self.cost, float(self.pheromone))

    @property
    def name(self):
        return self._name

    @property
    def node_left(self):
        return self._parameters['left']

    @property
    def node_right(self):
        return self._parameters['right']

    @property
    def cost(self):
        return self._parameters['cost']

    @property
    def pheromone(self):
        return self._parameters['pheromone']

    @property
    def desirability(self):
        return self._parameters['desirability']

    def update_pheromone(self, ro, delta):
        """
        Replace the pheromone level with ``(1 - ro) * pheromone + delta``,
        clamped so it never drops below zero.

        Parameters:
            - ro: fraction of the current pheromone that is removed
            - delta: amount added after the reduction
        """
        updated = (1 - ro) * self.pheromone + delta
        # Clamp inside the conversion so the stored value is always a
        # np.float64 (max(0, x) would store a plain int 0 when x < 0).
        self._parameters['pheromone'] = np.float64(max(0.0, updated))

    def nodes_by_direction(self, direction: "Direction"):
        """
        Given the direction in which edge is traversed,
        it returns the src_node and dst_node in order

        Parameters:
            - direction: Direction; in which direction node is traversed.
              Anything other than Direction.STRAIGHT is treated as reverse.
        Returns:
            - (src_node, dst_node) tuple
        """
        if direction == Direction.STRAIGHT:
            return (self.node_left, self.node_right)
        return (self.node_right, self.node_left)


# In [77]:
class Colony():
    """Group of ants that walk over a graph.

    The colony creates ``colony_size`` ants, all placed on the graph's
    start node. The optimisation loop itself (``start``) is still a stub.
    """

    class Ant():
        """Single walker: tracks its current node, direction, distance
        walked and the nodes it has visited."""

        def __init__(self, colony, node: "Node",
                     identifier: int = 0,
                     direction: "Direction | None" = None):
            """
            Instantiates an ant

            Params:
                - colony: Colony type, to which ant belongs
                - node: Node type, where the ant is found when instantiated
                - identifier: for debug purposes so we can recognise the ant later (might be unused)
                - direction: Direction type, either STRAIGHT or REVERSE.
                  None (the default) means Direction.STRAIGHT; it is resolved
                  at call time so the class can be defined before the enum.
            """
            if direction is None:
                direction = Direction.STRAIGHT
            self._parameters = {
                'node': node,
                'direction': direction,
                'tour_completed': False,
                'colony': colony,
                'identifier': identifier,
                'walked': np.float64(0),
                'trace': [],  # array of Node type
            }

        @property
        def node(self):
            return self._parameters['node']

        @property
        def direction(self):
            return self._parameters['direction']

        @property
        def tour_completed(self):
            return self._parameters['tour_completed']

        @property
        def colony(self):
            return self._parameters['colony']

        @property
        def identifier(self):
            return self._parameters['identifier']

        @property
        def walked(self):
            return self._parameters['walked']

        @property
        def trace(self):
            return self._parameters['trace']

        def _walk(self, distance):
            """Walks the given distance"""
            self._parameters['walked'] += np.float64(distance)

        def _visit(self, node: "Node"):
            """This method is called when a new node is reached"""
            # State lives in _parameters (that is what the ``node`` and
            # ``trace`` properties read), so update it there.
            self._parameters['node'] = node
            self._parameters['trace'].append(node)

        def update_direction(self, direction: "Direction"):
            """Set the direction used for subsequent edge traversals."""
            self._parameters['direction'] = direction

        def complete_tour(self):
            """Mark the ant's tour as completed."""
            # ``tour_completed`` is a read-only property; write the backing
            # entry instead of assigning to the property.
            self._parameters['tour_completed'] = True

        def traverse_edge(self, edge: "Edge"):
            """
            Walk ``edge`` in the ant's current direction.

            The step is skipped (and an error logged) when the edge does not
            start, in that direction, at the node the ant is on. Otherwise
            the edge cost is added to ``walked`` and the destination node
            becomes the current node and is appended to ``trace``.
            """
            src_node, dst_node = edge.nodes_by_direction(self.direction)
            if src_node != self.node:
                # Adjacent literals instead of a backslash inside the string,
                # which leaked the source indentation into the message.
                logging.error(
                    'Trying to traverse a edge with starting node '
                    'different from the one the ant is found. '
                    'Not performing the step.')
                return
            if self.tour_completed:
                # TODO
                pass
            self._walk(edge.cost)
            self._visit(dst_node)

    def __init__(self, graph: "Graph", colony_size: int):
        """
        Instantiates an ant colony optimization

        Params:
            - graph: Graph type, graph the ants walk on
            - colony_size: number of ants to create, all on graph.start
        """
        self._graph = graph
        self._colony_size = colony_size
        # Each ant gets a reference to this colony (first argument) and is
        # placed on the graph's start node.
        self._ants = [
            Colony.Ant(self, graph.start, i) for i in range(colony_size)
        ]

    @property
    def graph(self):
        return self._graph

    @property
    def colony_size(self):
        return self._colony_size

    @property
    def ants(self):
        """List of the colony's ants."""
        return self._ants

    def start(self):
        """Run the colony. Not implemented yet: currently a no-op."""
        # TODO: walk every ant from self._graph.start to self._graph.finish
        for ant in self.ants:
            # TODO
            pass

# In [80]:
# Demo: build four independent edges (no two edges share a node), put them
# in a graph and take a single step from node11.
node11 = Node(1, 'node11')
node12 = Node(2, 'node12')
edge1 = Edge(node11, node12, 3)

node21 = Node(3, 'node21')
node22 = Node(4, 'node22')
edge2 = Edge(node21, node22, 1)

node31 = Node(5, 'node31')
node32 = Node(6, 'node32')
edge3 = Edge(node31, node32, 7)

node41 = Node(7, 'node41')
node42 = Node(8, 'node42')
edge4 = Edge(node41, node42, 5)

edges = [edge1, edge2, edge3, edge4]
graph = Graph(start=node11, finish=node32, alpha=1, beta=1, ro=.15, name='MyGraph')
graph.add_edges(edges)

# node11 is the left end of edge1, so the step is taken left-to-right.
graph.step(node11, Direction.STRAIGHT)

# Out: <__main__.Edge at 0x7f2116f3d700>