In [8]:
import heapq
import functools
import itertools

from copy import copy
from collections import deque, defaultdict
from dataclasses import dataclass
from typing import Set, Tuple, List, Dict, Any

In [9]:
@dataclass(frozen=True)
class DirectedEdge:
    _from: int
    _to: int
    _weight: int

In [10]:
class EdgeWeightedDiagraph:
    V: int
    _adj: Tuple[Set[DirectedEdge]]

    def __init__(self, V: int):
        self.V = V
        self.adj = tuple(set() for _ in range(V))
        self.edges = set()

    def add_edge(self, e: DirectedEdge) -> None:
        self.adj[e._from].add(e)
        self.edges.add(e)

In [11]:
class IndexPriorityQueue:
    _data: List[Tuple[int, Any]]
    _entry_finder: Dict[int, Any]

    _TOMB_STONE = float('inf')

    def __init__(self):
        self._data = []
        self._entry_finder = {}

    def is_empty(self) ->bool:
        return len(self._entry_finder) == 0

    def contains(self, key: Any) -> bool:
        return key in self._entry_finder

    def insert(self, key: Any, val: Any, priority: int) -> None:
        if self.contains(key):
            raise KeyError('key already exists in entry')

        entry = [priority, key, val]

        self._entry_finder[key] = entry
        heapq.heappush(self._data, entry)

    def remove(self, key: Any) -> Any:
        entry = self._entry_finder.pop(key)
        val, entry[-1] = entry[-1], IndexPriorityQueue._TOMB_STONE
        return val

    def update(self, key: Any, priority: int) -> None:
        if not self.contains(key):
            raise KeyError('key not found')

        self.insert(key, self.remove(key), priority)

    def pop(self) -> Any:
        while self._entry_finder:
            priority, key, val = heapq.heappop(self._data)

            if val is not IndexPriorityQueue._TOMB_STONE:
                del self._entry_finder[key]

                return key, val

        raise KeyError('pop from an empty priority queue')

In [12]:
pq = IndexPriorityQueue()

pq.insert(2, "YOLO", 2)
pq.insert(3, "YOLO AGAIN", 3)

print(pq._data)
print(pq._entry_finder)

pq.update(3, 1)

print(pq._data)
print(pq._entry_finder)

print(pq.pop())
print(pq.pop())

print(pq._data)
print(pq._entry_finder)

[[2, 2, 'YOLO'], [3, 3, 'YOLO AGAIN']]
{2: [2, 2, 'YOLO'], 3: [3, 3, 'YOLO AGAIN']}
[[1, 3, 'YOLO AGAIN'], [3, 3, inf], [2, 2, 'YOLO']]
{2: [2, 2, 'YOLO'], 3: [1, 3, 'YOLO AGAIN']}
(3, 'YOLO AGAIN')
(2, 'YOLO')
[[3, 3, inf]]
{}


In [13]:
class DijkstraSP:
    def __init__(self, G: EdgeWeightedDiagraph, start: int, end: int):
        self.G = G
        self.edge_to = [-1] * self.G.V
        self.dist_to = [float('inf')] * self.G.V

        self._start = start
        self._end = end

        self._pq = IndexPriorityQueue()

        self._execute()

    def _relax(self, edge: DirectedEdge) -> None:
        v, w = edge._from, edge._to
        optimized_cost = self.dist_to[v] + edge._weight

        if optimized_cost < self.dist_to[w]:
            self.dist_to[w] = optimized_cost
            self.edge_to[w] = edge

        if self._pq.contains(w):
            self._pq.update(key=w, priority=optimized_cost)
        else:
            self._pq.insert(key=w, val=optimized_cost, priority=optimized_cost)

    def _execute(self):
        self.dist_to[self._start] = 0
        self._pq.insert(key=self._start, val=0, priority=0)

        while not self._pq.is_empty():
            v, _ = self._pq.pop()

            if v == self._end:
                return

            for edge in self.G.adj[v]:
                self._relax(edge)

    def cost(self) -> float:
        return self.dist_to[self._end]

    def route(self) -> Tuple[DirectedEdge]:
        if self.cost() == float('inf'):
            return tuple()

        end = self._end
        result = deque()

        while end != self._start:
            edge = self.edge_to[end]

            result.appendleft(edge)

            end = edge._from

        return tuple(result)

In [14]:
G = EdgeWeightedDiagraph(8)

G.add_edge(DirectedEdge(0, 1, 5))
G.add_edge(DirectedEdge(0, 4, 9))
G.add_edge(DirectedEdge(0, 7, 8))
G.add_edge(DirectedEdge(1, 2, 12))
G.add_edge(DirectedEdge(1, 3, 15))
G.add_edge(DirectedEdge(1, 7, 4))
G.add_edge(DirectedEdge(2, 3, 3))
G.add_edge(DirectedEdge(2, 6, 11))
G.add_edge(DirectedEdge(3, 6, 9))
G.add_edge(DirectedEdge(4, 5, 4))
G.add_edge(DirectedEdge(4, 6, 20))
G.add_edge(DirectedEdge(4, 7, 5))
G.add_edge(DirectedEdge(5, 2, 1))
G.add_edge(DirectedEdge(5, 6, 13))
G.add_edge(DirectedEdge(7, 5, 6))
G.add_edge(DirectedEdge(7, 2, 7))

In [15]:
dsp = DijkstraSP(G, 0, 6)

print(dsp.cost())
print('----------------')
print(dsp.route())

25
----------------
(DirectedEdge(_from=0, _to=4, _weight=9), DirectedEdge(_from=4, _to=5, _weight=4), DirectedEdge(_from=5, _to=2, _weight=1), DirectedEdge(_from=2, _to=6, _weight=11))


In [16]:
class FordBellman:
    def __init__(self, G: EdgeWeightedDiagraph, start: int):
        self.G = G
        self.edge_to = [-1] * self.G.V
        self.dist_to = [float('inf')] * self.G.V

        self._start = start

        self._execute()

    def _relax(self, edge: DirectedEdge) -> bool:
        v, w = edge._from, edge._to
        optimized_cost = self.dist_to[v] + edge._weight

        if optimized_cost < self.dist_to[w]:
            self.dist_to[w] = optimized_cost
            self.edge_to[w] = edge

            return True

        return False

    def _execute(self):
        self.dist_to[self._start] = 0

        for _ in range(self.G.V):
            has_any_optimization = False

            for v in range(self.G.V):
                for edge in self.G.adj[v]:
                    has_any_optimization |= self._relax(edge)

            if not has_any_optimization:
                return

    def has_negative_cycle(self) -> bool:
        for v in range(self.G.V):
            for edge in self.G.adj[v]:
                if self._relax(edge):
                    return True

        return False

    def cost(self, end: int) -> float:
        return self.dist_to[end]

    def route(self, end: int) -> Tuple[DirectedEdge]:
        if self.cost(end) == float('inf'):
            return tuple()

        result = deque()

        while end != self._start:
            edge = self.edge_to[end]

            result.appendleft(edge)

            end = edge._from

        return tuple(result)

In [17]:
fb = FordBellman(G, 0)

print(fb.cost(6))
print('----------------')
print(fb.route(6))
print('----------------')
print(fb.has_negative_cycle())

25
----------------
(DirectedEdge(_from=0, _to=4, _weight=9), DirectedEdge(_from=4, _to=5, _weight=4), DirectedEdge(_from=5, _to=2, _weight=1), DirectedEdge(_from=2, _to=6, _weight=11))
----------------
False


In [18]:
class FloydWarshall:
    def __init__(self, G: EdgeWeightedDiagraph):
        self.G = G
        self.trace = defaultdict(lambda: -1)
        self.dist_to = defaultdict(lambda: float('inf'))

        self._execute()

    def _execute(self):
        for edge in self.G.edges:
            self.dist_to[edge._from, edge._to] = edge._weight
            self.trace[edge._from, edge._to] = edge._from

        for k in range(self.G.V):
            for v in range(self.G.V):
                for w in range(self.G.V):
                    optimized_cost = self.dist_to[v, k] + self.dist_to[k, w]

                    if optimized_cost < self.dist_to[v, w]:
                        self.dist_to[v, w] = optimized_cost
                        self.trace[v, w] = self.trace[k, w]

    def cost(self, start: str, end: str) -> float:
        return self.dist_to[start, end]

    def route(self, start: str, end: str) -> Tuple[str]:
        if self.trace[start, end] == -1:
            return tuple()

        path = deque([end])

        while start != end:
            end = self.trace[start, end]
            path.appendleft(end)
            
        return tuple(path)

In [19]:
fb = FloydWarshall(G)

print(fb.cost(0, 6))
print('----------------')
print(fb.route(0, 6))

25
----------------
(0, 4, 5, 2, 6)


## Detect negative cycle for directed graph

In [11]:
class DirectedGraph:
    def __init__(self, V: int):
        self.V = V
        self.graph = defaultdict(set)

    def add_edge(self, v: int, w: int) -> None:
        self.graph[v].add(w)
        
    def is_cyclic(self) -> bool:
        marker = set() 
        recursion_stack = set()

        def is_cyclic_until(v):
            marker.add(v)
            recursion_stack.add(v)

            for w in self.graph[v]:
                if w not in marker:
                    return is_cyclic_until(w)
                elif w in recursion_stack:
                    return True

            recursion_stack.remove(v)

            return False

        for v in range(self.V):
            if v not in marker and is_cyclic_until(v): 
                return True

        return False

In [12]:
g = DirectedGraph(4)

g.add_edge(0, 1)
g.add_edge(0, 2)
g.add_edge(1, 2)
g.add_edge(2, 0)
g.add_edge(2, 3)
g.add_edge(3, 3)

if g.is_cyclic(): 
    print("Graph has a cycle")
else: 
    print("Graph has no cycle")

Graph has a cycle


In [13]:
class UnionFind:
    def __init__(self, V: int):
        self._data = [i for i in range(V)]
        self._size = [1] * V

    def get_root(self, v: int) -> int:
        while v != self._data[v]:
            self._data[v] = self._data[self._data[v]]
            v = self._data[v]

        return v

    def is_connected(self, v: int, w: int) -> bool:
        return self.get_root(v) == self.get_root(w)

    def union(self, v: int, w: int) -> None:
        v_root, w_root = self.get_root(v), self.get_root(w)

        if v_root == w_root:
            return

        if self._size[v_root] < self._size[w_root]:
            smaller_root, larger_root = v_root, w_root
        else:
            smaller_root, larger_root = w_root, v_root

        self._data[smaller_root] = self._data[larger_root]
        self._size[larger_root] += self._size[smaller_root]

In [14]:
@functools.total_ordering
@dataclass(frozen=True)
class Edge:
    v: int
    w: int
    weight: int

    @property
    def either(self):
        return self.v

    def other(self, v: int) -> int:
        return self.w if v == self.v else self.v

    def __lt__(self, other: 'Edge') -> bool:
        if type(other) is not Edge:
            raise TypeError()

        return self.weight < other.weight

In [15]:
class EdgeWeightedGraph:
    V: int
    _adj: Tuple[Set[Edge]]

    def __init__(self, V: int):
        self.V = V
        self.adj = tuple(set() for _ in range(V))

    def add_edge(self, e: Edge) -> None:
        self.adj[e.v].add(e)
        self.adj[e.w].add(e)

    def all_edges(self) -> Set[Edge]:
        return set(itertools.chain.from_iterable(self.adj))

In [16]:
class KrusalMST:
    def __init__(self, G: EdgeWeightedGraph):
        self.G = G
        self.mst = []

        self._execute()

    def _execute(self):
        pq = [edge for edge in self.G.all_edges()]

        heapq.heapify(pq)

        uf = UnionFind(self.G.V)

        while pq and len(self.mst) < self.G.V - 1:
            edge = heapq.heappop(pq)

            if not uf.is_connected(edge.v, edge.w):
                self.mst.append(edge)
                uf.union(edge.v, edge.w)

        if len(self.mst) != self.G.V - 1:
            self.mst = []

In [17]:
G = EdgeWeightedGraph(8)

G.add_edge(Edge(0, 7, 0.16))
G.add_edge(Edge(2, 3, 0.17))
G.add_edge(Edge(1, 7, 0.19))
G.add_edge(Edge(0, 2, 0.26))
G.add_edge(Edge(5, 7, 0.28))
G.add_edge(Edge(1, 3, 0.29))
G.add_edge(Edge(1, 5, 0.32))
G.add_edge(Edge(2, 7, 0.34))
G.add_edge(Edge(4, 5, 0.35))
G.add_edge(Edge(1, 2, 0.36))
G.add_edge(Edge(4, 7, 0.37))
G.add_edge(Edge(0, 4, 0.38))
G.add_edge(Edge(6, 2, 0.40))
G.add_edge(Edge(3, 6, 0.52))
G.add_edge(Edge(6, 0, 0.58))
G.add_edge(Edge(6, 4, 0.93))

G.adj

({Edge(v=0, w=7, weight=0.16),
  Edge(v=0, w=2, weight=0.26),
  Edge(v=0, w=4, weight=0.38),
  Edge(v=6, w=0, weight=0.58)},
 {Edge(v=1, w=7, weight=0.19),
  Edge(v=1, w=3, weight=0.29),
  Edge(v=1, w=5, weight=0.32),
  Edge(v=1, w=2, weight=0.36)},
 {Edge(v=2, w=3, weight=0.17),
  Edge(v=0, w=2, weight=0.26),
  Edge(v=2, w=7, weight=0.34),
  Edge(v=1, w=2, weight=0.36),
  Edge(v=6, w=2, weight=0.4)},
 {Edge(v=2, w=3, weight=0.17),
  Edge(v=1, w=3, weight=0.29),
  Edge(v=3, w=6, weight=0.52)},
 {Edge(v=4, w=5, weight=0.35),
  Edge(v=4, w=7, weight=0.37),
  Edge(v=0, w=4, weight=0.38),
  Edge(v=6, w=4, weight=0.93)},
 {Edge(v=5, w=7, weight=0.28),
  Edge(v=1, w=5, weight=0.32),
  Edge(v=4, w=5, weight=0.35)},
 {Edge(v=6, w=2, weight=0.4),
  Edge(v=3, w=6, weight=0.52),
  Edge(v=6, w=0, weight=0.58),
  Edge(v=6, w=4, weight=0.93)},
 {Edge(v=0, w=7, weight=0.16),
  Edge(v=1, w=7, weight=0.19),
  Edge(v=5, w=7, weight=0.28),
  Edge(v=2, w=7, weight=0.34),
  Edge(v=4, w=7, weight=0.37)})

In [18]:
%timeit k_mst = KrusalMST(G); k_mst.mst

64.4 µs ± 5.22 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [19]:
class PrimMST:
    def __init__(self, G: EdgeWeightedGraph):
        self.G = G
        self.mst = []

        self._pq = []
        self._marker = set()

        self._execute()

    def _dfs(self, v: int) -> None:
        self._marker.add(v)

        for edge in self.G.adj[v]:
            if edge.other(v) not in self._marker:
                heapq.heappush(self._pq, edge)

    def _execute(self):
        self._dfs(0)

        while pq and len(self.mst) < self.G.V - 1:
            edge = heapq.heappop(self._pq)
            v, w = edge.v, edge.w

            if v in self._marker and w in self._marker:
                continue

            if v not in self._marker:
                self._dfs(v)

            if w not in self._marker:
                self._dfs(w)

            self.mst.append(edge)

        if len(self.mst) != self.G.V - 1:
            self.mst = []

In [20]:
%timeit p_mst = PrimMST(G); p_mst.mst

47.3 µs ± 4.63 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [21]:
@dataclass
class FlowEdge:
    v: int
    w: int
    capacity: float
    flow: float = 0.0

    def other(self, vertex: int) -> int:
        if vertex == self.v:
            return self.w
        elif vertex == self.w:
            return self.v
        else:
            raise ValueError

    def residual_capacity_to(self, vertex: int) -> float:
        if vertex == self.v:
            return self.flow
        elif vertex == self.w:
            return self.capacity - self.flow
        else:
            raise ValueError

    def add_residual_flow_to(self, vertex: int, delta: float) -> None:
        if vertex == self.v:
            self.flow -= delta
        elif vertex == self.w:
            self.flow += delta
        else:
            raise ValueError

In [22]:
class FlowNetwork:
    def __init__(self, V):
        self.V = V
        self.adj = tuple([] for _ in range(V))

    def add_edge(self, edge: FlowEdge) -> None:
        self.adj[edge.v].append(edge)
        self.adj[edge.w].append(edge)

In [51]:
class FordFulkerson:
    def __init__(self, G: FlowNetwork, start: int, end: int):
        self.G = G
        self.value = 0.0

        self._start = start
        self._end = end

        self._execute()

    def in_cut(self, vertex: int) -> bool:
        return vertex in self.marker

    def _has_augmenting_path(self) -> bool:
        self.edge_to = [None] * G.V
        self.marker = {self._start}

        queue = deque([self._start])

        while queue:
            v = queue.popleft()

            for edge in self.G.adj[v]:
                w = edge.other(v)

                if edge.residual_capacity_to(w) > 0 and w not in self.marker:
                    self.edge_to[w] = edge
                    self.marker.add(w)
                    queue.append(w)

        return self._end in self.marker

    def _execute(self):
        while self._has_augmenting_path():
            bottle = float('inf')

            v = self._end
            while v != self._start:
                bottle = min(bottle, self.edge_to[v].residual_capacity_to(v))
                v = self.edge_to[v].other(v)

            v = self._end
            while v != self._start:
                self.edge_to[v].add_residual_flow_to(v, bottle)
                v = self.edge_to[v].other(v)

            self.value += bottle

In [35]:
G = FlowNetwork(6)

G.add_edge(FlowEdge(0, 1, 2.0))
G.add_edge(FlowEdge(0, 2, 3.0))
G.add_edge(FlowEdge(1, 3, 3.0))
G.add_edge(FlowEdge(1, 4, 1.0))
G.add_edge(FlowEdge(2, 3, 1.0))
G.add_edge(FlowEdge(2, 4, 1.0))
G.add_edge(FlowEdge(3, 5, 2.0))
G.add_edge(FlowEdge(4, 5, 3.0))

G.adj

([FlowEdge(v=0, w=1, capacity=2.0, flow=0.0),
  FlowEdge(v=0, w=2, capacity=3.0, flow=0.0)],
 [FlowEdge(v=0, w=1, capacity=2.0, flow=0.0),
  FlowEdge(v=1, w=3, capacity=3.0, flow=0.0),
  FlowEdge(v=1, w=4, capacity=1.0, flow=0.0)],
 [FlowEdge(v=0, w=2, capacity=3.0, flow=0.0),
  FlowEdge(v=2, w=3, capacity=1.0, flow=0.0),
  FlowEdge(v=2, w=4, capacity=1.0, flow=0.0)],
 [FlowEdge(v=1, w=3, capacity=3.0, flow=0.0),
  FlowEdge(v=2, w=3, capacity=1.0, flow=0.0),
  FlowEdge(v=3, w=5, capacity=2.0, flow=0.0)],
 [FlowEdge(v=1, w=4, capacity=1.0, flow=0.0),
  FlowEdge(v=2, w=4, capacity=1.0, flow=0.0),
  FlowEdge(v=4, w=5, capacity=3.0, flow=0.0)],
 [FlowEdge(v=3, w=5, capacity=2.0, flow=0.0),
  FlowEdge(v=4, w=5, capacity=3.0, flow=0.0)])

In [36]:
ff = FordFulkerson(G, 0, 5)

print(ff.value)
print('----------------')

4.0
----------------
[None, None, FlowEdge(v=0, w=2, capacity=3.0, flow=2.0), None, None, None]
{0, 2}


## Bipartite problem

N students apply for N jobs

Each get several offers

Is there a way to match all student to jobs

```
1. Alice
    Adobe
    Amazon
    Google

2. Bob
    Adobe
    Amazon

3. Carol
    Adobe
    Facebook
    Google

4. Dave
    Amazon
    Yahoo

5. Eliza
    Amazon
    Yahoo

6. Adobe
    Alice
    Bob
    Carol

7. Amazon
    Alice
    Bob
    Dave
    Eliza

8. Facebook
    Carol

9. Google
    Alice
    Carol

10. Yahoo
    Dave
    Eliza
```

Expect perfect match:

```
Alice —— Google
Bob —— Adobe
Carol —— Facebook
Dave —— Yahoo
Eliza —— Amazon
```

In [68]:
G = FlowNetwork(12)

for i in range(1, 6):
    G.add_edge(FlowEdge(0, i, 1.0))

"""
1. Alice
    Adobe
    Amazon
    Google
"""
G.add_edge(FlowEdge(1, 6, 1.0))
G.add_edge(FlowEdge(1, 7, 1.0))
G.add_edge(FlowEdge(1, 9, 1.0))

"""
2. Bob
    Adobe
    Amazon
"""
G.add_edge(FlowEdge(2, 6, 1.0))
G.add_edge(FlowEdge(2, 7, 1.0))

"""

3. Carol
    Adobe
    Facebook
    Google
"""
G.add_edge(FlowEdge(3, 6, 1.0))
G.add_edge(FlowEdge(3, 8, 1.0))
G.add_edge(FlowEdge(3, 9, 1.0))

"""
4. Dave
    Amazon
    Yahoo
"""
G.add_edge(FlowEdge(4, 7, 1.0))
G.add_edge(FlowEdge(4, 10, 1.0))

"""
5. Eliza
    Amazon
    Yahoo
"""
G.add_edge(FlowEdge(5, 7, 1.0))
G.add_edge(FlowEdge(5, 10, 1.0))

"""
6. Adobe
    Alice
    Bob
    Carol

7. Amazon
    Alice
    Bob
    Dave
    Eliza

8. Facebook
    Carol

9. Google
    Alice
    Carol

10. Yahoo
    Dave
    Eliza
"""
for i in range(6, 11):
    G.add_edge(FlowEdge(i, 11, 1.0))

G.adj

ff = FordFulkerson(G, 0, 11)

print(ff.value)
print('----------------')
print([i for i in range(12) if ff.in_cut(i)])

DATA_MAPPING = {
    1: 'Alice',
    2: 'Bob',
    3: 'Carol',
    4: 'Dave',
    5: 'Eliza',
    6: 'Adobe',
    7: 'Amazon',
    8: 'Facebook',
    9: 'Google',
    10: 'Yahoo'
}

for i in range(1, 6):
    for edge in G.adj[i]:
        if edge.flow == 1.0 and edge.v != 0 and edge.w != 11:
            print(f'{DATA_MAPPING[edge.v]} --- {DATA_MAPPING[edge.w]}')

5.0
----------------
[0]
Alice --- Google
Bob --- Adobe
Carol --- Facebook
Dave --- Yahoo
Eliza --- Amazon
