# Directed Chinese Postman Problem

<b>Goal:</b> Implement an algorithm to solve the directed Chinese postman problem and apply it to two examples.

## (a) Implementation

The algorithm is loosely described in Section 3.3.3 of the lecture notes.

In [6]:
import matplotlib.pyplot as plt
import networkx as nx

%matplotlib inline

In [7]:
def chinese_postman_tour(G, U):
    '''Computes a Chinese postman tour in a directed graph.
    
    Args:
        G: A strongly connected weighted netwrokx digraph.
        U: A weakly connected set of arcs of G.
        
    Returns:
        A list of arcs in a Chinese postman tour in G that contains every arc
        of U at least once.
    '''
    # Construct graph induced by U
    G_U = nx.MultiDiGraph()
    G_U.add_edges_from(U)
    
    # Define alpha
    alpha = {node: G_U.in_degree(node) - G_U.out_degree(node) for node in G_U}
    
    # Define auxiliary bipartite graph
    H = nx.Graph()
    V_1 = [node for node in G_U if alpha[node] > 0]
    V_2 = [node for node in G_U if alpha[node] < 0]    
    for v1 in V_1:
        for v2 in V_2:
            dist = nx.shortest_path_length(G, v1, v2, 'weight')
            H.add_weighted_edges_from((f'{v1}.{i}', f'{v2}.{j}', dist) for i in range(alpha[v1]) for j in range(-alpha[v2]))
    
    # Find min weight perfect matching
    M = nx.bipartite.minimum_weight_full_matching(H)
    
    # Create backreferences for vertices in H
    orig_node = {}
    orig_node.update({f'{v1}.{i}': v1 for v1 in V_1 for i in range(alpha[v1])})
    orig_node.update({f'{v2}.{i}': v2 for v2 in V_2 for i in range(-alpha[v2])})
    
    # Add shortest paths given by matchings to graph induced by U
    for v1 in V_1:
        for i in range(alpha[v1]):
            path = nx.shortest_path(G, v1, orig_node[M[f'{v1}.{i}']], 'weight')
            path_edges = zip(path, path[1:])
            G_U.add_edges_from(path_edges)
    
    return list(nx.eulerian_circuit(G_U))

In [8]:
# Test on example from script
G = nx.DiGraph()
G.add_weighted_edges_from([
    (1, 2, 3),
    (1, 4, 1),
    (2, 3, 1),
    (3, 6, 2),
    (4, 2, 1),
    (4, 9, 2),
    (5, 2, 2),
    (5, 4, 1),
    (6, 5, 3),
    (6, 7, 5),
    (7, 8, 2),
    (8, 6, 1),
    (8, 9, 2),
    (9, 1, 1)
])
U = [(2, 3), (3, 6), (4, 2), (4, 9), (5, 2), (6, 5), (6, 7), (7, 8)]
tour = chinese_postman_tour(G, U)
print(f"Tour: {tour}")
print(f"Total tour weight: {sum(G.edges[e]['weight'] for e in tour)}")

Tour: [(2, 3), (3, 6), (6, 5), (5, 2), (2, 3), (3, 6), (6, 7), (7, 8), (8, 9), (9, 1), (1, 4), (4, 9), (9, 1), (1, 4), (4, 2)]
Total tour weight: 27


---

## (b) Skiing in Zermatt

In [9]:
# Define the graph
Zermatt = nx.DiGraph()
Zermatt.add_weighted_edges_from([
    ('Zermatt', 'Riffelalp', 19),
    ('Zermatt', 'Winkelmatten', 10),
    ('Riffelalp', 'Zermatt', 19),
    ('Riffelalp', 'Furi', 12),
    ('Winkelmatten', 'Zermatt', 10),
    ('Winkelmatten', 'Furi', 12),
    ('Winkelmatten', 'Schwartzsee', 15),
    ('Furi', 'Steg', 20),
    ('Furi', 'Winkelmatten', 10),
    ('Furgg', 'Furi', 5),
    ('Furgg', 'Steg', 11),
    ('Furgg', 'Fun Park', 12),
    ('Schwartzsee', 'Furgg', 5),
    ('Schwartzsee', 'Winkelmatten', 24),
    ('Steg', 'Klein Matterhorn', 20),
    ('Steg', 'Furggsattel', 18),
    ('Steg', 'Furgg', 7),
    ('Fun Park', 'Steg', 10),
    ('Klein Matterhorn', 'Furggsattel', 22),
    ('Furggsattel', 'Steg', 15),
    ('Furggsattel', 'Fun Park', 9),
    ('Furggsattel', 'Schwartzsee', 16)
])
ski_runs = [
    ('Riffelalp', 'Furi'),
    ('Winkelmatten', 'Zermatt'),
    ('Furi', 'Winkelmatten'),
    ('Furgg', 'Furi'),
    ('Schwartzsee', 'Furgg'),
    ('Schwartzsee', 'Winkelmatten'),
    ('Steg', 'Furgg'),
    ('Fun Park', 'Steg'),
    ('Klein Matterhorn', 'Furggsattel'),
    ('Furggsattel', 'Steg'),
    ('Furggsattel', 'Fun Park'),
    ('Furggsattel', 'Schwartzsee')
]

# Find the tour
tour = chinese_postman_tour(Zermatt, ski_runs)

# Sort such that it starts at Zermatt
idx = [i for i, v in enumerate(tour) if v[0] == 'Zermatt'][0]
sorted_tour = tour[idx:] + tour[0:idx]

# Print results
print(f"Tour from Zermatt:\n{sorted_tour}")
print(f"Total duration: {sum(Zermatt.edges[e]['weight'] for e in tour)}min")

Tour from Zermatt:
[('Zermatt', 'Riffelalp'), ('Riffelalp', 'Furi'), ('Furi', 'Steg'), ('Steg', 'Klein Matterhorn'), ('Klein Matterhorn', 'Furggsattel'), ('Furggsattel', 'Steg'), ('Steg', 'Furggsattel'), ('Furggsattel', 'Fun Park'), ('Fun Park', 'Steg'), ('Steg', 'Furgg'), ('Furgg', 'Steg'), ('Steg', 'Furggsattel'), ('Furggsattel', 'Schwartzsee'), ('Schwartzsee', 'Winkelmatten'), ('Winkelmatten', 'Schwartzsee'), ('Schwartzsee', 'Furgg'), ('Furgg', 'Furi'), ('Furi', 'Winkelmatten'), ('Winkelmatten', 'Zermatt')]
Total duration: 266min


---

## (c) Software testing

In [10]:
# Define the graph
software = nx.DiGraph()
software.add_weighted_edges_from([
    (0, 1, 1),
    (0, 2, 1),
    (0, 5, 1),
    (0, 6, 1),
    (0, 8, 1),
    (1, 5, 1),
    (2, 8, 1),
    (3, 6, 1),
    (4, 0, 1),
    (5, 3, 1),
    (5, 4, 1),
    (6, 1, 1),
    (6, 7, 1),
    (7, 0, 1),
    (8, 7, 1)
])

# variables holding the best tour so far
min_walk = None
min_walk_len = None

# large weight for extra edge
large_weight = len(software.edges)+1

# Note that adding edges to the software graph might create parallel edges.
# We could use a multigraph, but we can also replace the extra edge by a path
# of length two via an auxiliary vertex.
aux_vertex = 'auxiliary vertex'
software.add_node(aux_vertex)

# find best walk
for end_node in range(9):
    # add extra edges (see comment above)
    software.add_edge(end_node, aux_vertex, weight=large_weight)
    software.add_edge(aux_vertex, 0, weight=large_weight)
    
    # compute tour and walk
    tour = chinese_postman_tour(software, list(software.edges))
    idx = [i for i, e in enumerate(tour) if e[0] == end_node and e[1] == aux_vertex][0]
    walk = tour[idx+2:] + tour[0:idx]
    walk_len = sum(software.edges[e]['weight'] for e in walk)
    
    # replace shortest walk found so far if new one is better
    if min_walk_len is None or walk_len < min_walk_len:
        min_walk = walk
        min_walk_len = walk_len
    
    # reset graph for next iteration
    software.remove_edges_from([(end_node, aux_vertex), (aux_vertex, 0)])

print(f"Shortest testing sequence: {min_walk}")
print(f"Length: {min_walk_len}")

Shortest testing sequence: [(0, 2), (2, 8), (8, 7), (7, 0), (0, 8), (8, 7), (7, 0), (0, 6), (6, 1), (1, 5), (5, 3), (3, 6), (6, 7), (7, 0), (0, 5), (5, 4), (4, 0), (0, 1)]
Length: 18
