In [1]:
from typing import Set, Union, List, Generator, Iterator, Iterable, Tuple
from collections import namedtuple
import hashlib

Node = namedtuple('nodes', ['id'])
Edge = namedtuple('edge', ['id', 'src', 'dst', 'length'])

### Load data into nodes and edges

In [2]:
nodes = []
edges = []

with open('../data/shortest_route/citymapper-coding-test-graph.dat', 'r') as f:
    data = f.readlines()
    
# Number of vertices
N = int(data[0].strip("\n"))
for i in range(1, N+1):
    nodes.append(Node(int(data[i].strip("\n"))))

# Number of edges
E = int(data[N+1])
for i in range(N+2, E+1):
    src, dst, length = data[i].strip("\n").split(" ")
    e_id = hashlib.sha256("{}_{}".format(src,dst).encode()).hexdigest()
    edges.append(Edge(e_id, int(src), int(dst), int(length)))

### Graph class

In [3]:
class CityMapperGraph(object):
    
    def __init__(self, 
                 nodes: Union[Iterator[Node]]=None, 
                 edges: Union[Iterator[Edge]]=None):
        self.nodes = set(nodes) if nodes else set()
        self.edges = set(edges) if edges else set()
        self.adjacency = {}
        self.inverted_edges = {}
        self._populate_indexes()
        
    def make_edge_id(self, src: Union[str, int, Node], dst: Union[str, int, Node]) -> str:
        if isinstance(src, Node):
            src = src.id
        if isinstance(dst, Node):
            dst = dst.id
        return hashlib.sha256("{}_{}".format(str(src),str(dst)).encode()).hexdigest()
        
    def _populate_indexes(self):
        for e in self.edges:
            # Populate adjacency list for source -> destination
            if e.src in self.adjacency:
                self.adjacency[e.src].update([e.dst])
            else:
                self.adjacency[e.src] = set([e.dst])
            # Populate adjacency list for destination -> source
            if e.dst in self.adjacency:
                self.adjacency[e.dst].update([e.src])
            else:
                self.adjacency[e.dst] = set([e.src])
            if e.id not in self.inverted_edges:
                self.inverted_edges[e.id] = [e.src, e.dst, e.length]
                
    def add_nodes(self, nodes: Union[Node,
                                     str, 
                                     int, 
                                     Iterable[Union[Node, str, int]]]
                 ):
        if isinstance(nodes, (str, int)):
            node = Node(int(nodes))
            self.nodes.update([node])
        elif isinstance(nodes, Iterable):
            for node in nodes:
                if isinstance(node, (int, str)):
                    node = Node(int(node))
                self.nodes.update([node])
                
    def add_edges(self, edges: Union[Edge, Iterable[Edge]]):
        if isinstance(edge, Edge):
            self.add_nodes([e.src, e.dst])
            self.edges.update([e])
                
    def find_nodes(self, nodes: Union[str, 
                                     int, 
                                     Iterable[Union[Node, str, int]]]
                  ) -> Generator[bool, None, int]:
        count = 0
        if isinstance(nodes, (str, int)):
            if Node(int(nodes)) in self.nodes:
                count +=1
                yield True
        elif isinstance(nodes, Iterable):
            for node in nodes:
                if isinstance(node, (str, int)):
                    node = Node(Node)
                if node in self.nodes:
                    count +=1
                    yield True
        return self

    def find_edges(self, edges: Union[str,
                                     Tuple[Union[str, Node], Union[str, Node]],
                                     Iterable[Union[Edge, str]]]
                 ) -> Generator[bool, None, int]:
        count = 0
        if isinstance(edges, str):
            if edges in self.inverted_edges:
                count += 1
                yield True
        elif isinstance(edges, Edge):
            if edges.id in self.inverted_edges:
                count += 1
                yield True
        elif isinstance(edges, Tuple):
            edge_id = self.make_edge_id(edges[0], edges[1])
            if edge_id in self.inverted_edges:
                count += 1
                yield True            
        elif isinstance(edges, Iterable):
            for edge in edges:
                if isinstance(edge, Edge):
                    edge = edge.id
                if isinstance(edge, Tuple):
                    edge = self.make_edge_id(edge[0], edge[1])
                if edge in self.inverted_edges:
                    count += 1
                    yield True
        return count

In [23]:

list(graph.find_nodes(['1084140204'])), # Passing an array of string
list(graph.find_nodes([1084140204])), # Passing an array of integer
list(graph.find_nodes(nodeTrue)), # Passing in a Node[True]

[n in graph.nodes for n in [Node(1084140204)]]

[True]

### Unit Tests

In [5]:
graph = CityMapperGraph()
nodeTrue = Node(316319897)
nodeFalse = Node(0)
edgeTrue = Edge("1c0597617b180d1bbe34e89bdd5370d923428868554c7190274535614b806d33", 316319897, 316319936, 121)
edgeFalse = Edge("false", 0, 1, 0)
srcNodeTrue, dstNodeTrue = edgeTrue.src, edgeTrue.dst

graph.add_nodes(1084140204)
graph.add_nodes([nodeTrue, srcNodeTrue, dstNodeTrue])

#########
# Nodes #
#########
pos_results = [
    list(graph.find_nodes('1084140204')), # Passing a string
    list(graph.find_nodes(['1084140204'])), # Passing an array of string
    list(graph.find_nodes(1084140204)), # Passing an integer
    list(graph.find_nodes([1084140204])), # Passing an array of integer
    list(graph.find_nodes(nodeTrue)), # Passing in a Node
    list(graph.find_nodes([nodeTrue])) # Passing in an array of Node
]

neg_results = [
    list(graph.find_nodes('0')), # Passing a string
    list(graph.find_nodes(['0'])), # Passing an array of string
    list(graph.find_nodes(0)), # Passing an integer
    list(graph.find_nodes([0])), # Passing an array of integer
    list(graph.find_nodes(nodeFalse)), # Passing in a Node
    list(graph.find_nodes([nodeFalse])) # Passing in an array of Node
]

N = len(pos_results)
M = len(neg_results)

assert all([len(pos_results[i]) == 1 for i in range(N)])
assert all([pos_results[i] for i in range(N)])
assert all([len(neg_results[i]) == 0 for i in range(M)])
assert all([not neg_results[i] for i in range(M)])

#########
# Edges #
#########

# pos_results = [
#     list(graph.find_edges(edgeTrue.id)), # Passing a string
#     list(graph.find_edges(edgeTrue)), # Passing a string
#     list(graph.find_edges([edgeTrue.id])), # Passing a string
#     list(graph.find_edges([edgeTrue])), # Passing a string
#     list(graph.find_edges((srcNodeTrue, dstNodeTrue))), 
#     list(graph.find_edges([(srcNodeTrue, dstNodeTrue)]))
# ]

# neg_results = [
#     list(graph.find_edges(edgeFalse.id)), # Passing a string
#     list(graph.find_edges(edgeFalse)), # Passing a string
#     list(graph.find_edges([edgeFalse.id])), # Passing a string
#     list(graph.find_edges([edgeFalse])), # Passing a string
#     list(graph.find_edges((nodeTrue, nodeFalse))), 
#     list(graph.find_edges([(nodeTrue, nodeFalse)])),
#     list(graph.find_edges((nodeFalse, nodeTrue))), 
#     list(graph.find_edges([(nodeFalse, nodeTrue)]))    
# ]

# N = len(pos_results)
# M = len(neg_results)

# assert all([len(pos_results[i]) == 1 for i in range(N)])
# assert all([pos_results[i] for i in range(N)])
# assert all([len(neg_results[i]) == 0 for i in range(M)])
# assert all([not neg_results[i] for i in range(M)])

AssertionError: 