In [1]:
import json

# get the station data
with open('data/clean/stationsNew.json', encoding='utf-8') as file:
    file_contents = file.read()

# get the stations of lines
with open('data/clean/LineStations.json', encoding='utf-8') as file:
    line_contents = file.read()

# get the heuristic data
with open('data/clean/nodes_heuristic.json', encoding='utf-8') as file:
    heuristic_contents = file.read()

heuristics = json.loads(heuristic_contents)
data = json.loads(file_contents)
lines = json.loads(line_contents)
airportLine = {'56': "AsiaWorld-Expo", '47': "Airport",
               '46': "Tsing Yi", '45': "Kowloon", '44': "Hong Kong"}
# store the stations key and value
stations = dict()
for i in data:
    stations[str(i['id'])] = i['name']

# make graph
# ['station1', 'station2', 'cost', 'current_line']
graph = []
for i in data:
    for j in i['connections']:
        graph.append([str(i['id']), str(j['id']), j['cost'], j['fromline']])

# estimate the cost of waiting for the next train in different lines
# the cost of data had been calculated with the cost of waiting for the next train, so we need to minus 2 minutes
expectWaitTime = {
    "Airport Express": 0,
    "Disneyland Resort Line": 0,
    "East Rail Line": 2,
    "Island Line": 1,
    "Kwun Tong Line": 1,
    "Tuen Ma Line": 2,
    "Tung Chung Line": 3,
    "Tseung Kwan O Line": 1,
    "South Island Line": 2,
    "Tsuen Wan Line": 1
}

# store the lines of stations
node_lines = dict()
for i in data:
    node_lines[str(i['id'])] = i['line']

# store the time of changing line in a station
change_lines_time = dict()
for i in data:
    try:
        change_lines_time[str(i['id'])] = i['walkingTime']
    except KeyError:
        pass

# clean the graph and get the nodes
temp = []
temp1 = []
totalcost = 0
for i in graph:
    temp.append(i[0])
    temp1.append(i[1])
nodes = set(temp).union(set(temp1))


In [5]:
# for checking if the line is changed
def checkChangeLine(curLine, nextNode, fromLine):
    if len(curLine) == 0:
        return True
    if curLine[-1] == fromLine:
        return False
    if fromLine == "Walking":
        return True
    if curLine[-1] not in node_lines[nextNode]:
        return True
    else:
        return False

# for finding the walking time when changing line
def findWalkingTime(curNode, toLine):
    try:
        if toLine == "Walking" or toLine == "":
            return 0

        for i in change_lines_time[curNode]:
            l = dict(i)
            if l["to"] == toLine:
                return l['time']
    except KeyError:
        return 0

# for finding the walking time when changing line
def findWaitingTime(toLine):
    try:
        if toLine == "Walking":
            return 0
        
        # print("waiting: ", expectWaitTime[toLine])
        return expectWaitTime[toLine]
    except KeyError:
        return 0

# using A* search algorithm for finding the shortest path

def A_star(graph, costs, open, closed, cur_node, heuristic, totalcosts, line, total_walkTime, stations, smallcost):
    if cur_node in open:
        open.remove(cur_node)

    closed.add(cur_node)
    for i in graph:
        walktime = 0
        waittime = 0
        if (i[0] == cur_node and costs[i[0]][0]+i[2]+heuristic[i[0]] < (costs[i[1]][0]+costs[i[1]][1]) and i[1] not in closed):
            open.add(i[1])
            if(len(line[i[0]]) == 0):
                waittime = findWaitingTime(i[3])
                line[i[0]].append(i[3])
            next_line = line[i[0]].copy()
            if (checkChangeLine(line[i[0]], i[1], i[3])):
                waittime += findWaitingTime(i[3])
                walktime = findWalkingTime(i[0], i[3])
                if i[3] == line[i[0]][-1]:
                    pass
                else:
                    next_line.append(i[3])
            line[i[1]] = next_line
            # print("worktime", walktime)
            # add the walking time when changing line
            if walktime is None:
                # store the cost
                costs[i[1]][0] = costs[i[0]][0]+i[2]
            else:
                # store the total walking time
                if i[3] == "Walking":
                    total_walkTime[i[1]] = total_walkTime[i[0]] + i[2]
                else:
                    total_walkTime[i[1]] = total_walkTime[i[0]] + walktime

                # store the cost with walking time
                costs[i[1]][0] = costs[i[0]][0] + i[2] + walktime + waittime

            # store the path
            if stations[i[1]] == path[i[0]].split(' -> ')[-1]:
                path[i[1]] = path[i[0]]
            else:
                path[i[1]] = path[i[0]] + ' -> ' + stations[i[1]]
            # store the heurisitc of current node
            costs[i[1]][1] = heuristic[i[0]]

            # store the total cost
            totalcosts[i[1]] = costs[i[1]][0]

            # smallest cost
            smallcost[i[1]] = sum(costs[i[1]])
            # print(path[i[1]], totalcosts[i[1]])

    # find the node with the lowest cost
    smallcost[cur_node] = 999999
    small = min(smallcost, key=smallcost.get)
    # print("smallest node", stations[small])
    if small not in closed:
        A_star(graph, costs, open, closed, small, heuristic,
            totalcosts, line, total_walkTime, stations, smallcost)

costs = dict()  # for storing the cost of each node for the program process
path = dict()  # for storing the path
totalcosts = dict()  # for storing the total cost of the path
line = dict()  # for storing the subway line of the path
total_walkTime = dict()  # for storing the total walking time of the path
smallcost = dict()  # for storing the sum of the cost and heuristic of the node
for i in nodes:
    # costs array contain the distance and the heurisitc of current node
    costs[i] = [999999,0]
    path[i] = ' '
    totalcosts[i] = 999999
    line[i] = []
    total_walkTime[i] = 0
    smallcost[i] = 999999
open = set()
closed = set()

# initial the data
start_node_str = input("Enter the Start Station: ")
start_node = list(stations.keys())[list(
    stations.values()).index(start_node_str)]
open.add(start_node)
path[start_node] = start_node_str
costs[start_node] = [0,0]
totalcosts[start_node] = 0
total_walkTime[start_node] = 0
smallcost[start_node] = 0

goal_node_str = input("Enter the Goal Station: ")
if goal_node_str in lines["Airport Express"] and start_node_str in lines["Airport Express"]:
    goal_node = list(airportLine.keys())[list(airportLine.values()).index(goal_node_str)]
else:
    goal_node = list(stations.keys())[list(stations.values()).index(goal_node_str)]

h = heuristics[goal_node]
# program start
A_star(graph, costs, open, closed, start_node,
    h,totalcosts, line, total_walkTime, stations, smallcost)

# print the result
print("Suggested Route:")
print(path[goal_node])
changeTimes = len(line[goal_node])-1
for i in line[goal_node]:
    if i == "Walking":
        changeTimes -= 1
    if changeTimes < 0:
        changeTimes = 0
print("Interchange times: ", changeTimes)
print("Suggested Line: ", line[goal_node])
print("Least cost is: ", totalcosts[goal_node])
print("Estimate walking time is: ", total_walkTime[goal_node])

Suggested Route:
Tsuen Wan -> Tai Wo Hau -> Kwai Hing -> Kwai Fong -> Lai King -> Mei Foo -> Lai Chi Kok -> Cheung Sha Wan -> Sham Shui Po -> Prince Edward -> Mong Kok -> Yau Ma Tei -> Jordan -> Tsim Sha Tsui -> Admiralty -> Exhibition Centre
Interchange times:  1
Suggested Line:  ['Tsuen Wan Line', 'East Rail Line']
Least cost is:  37
Estimate walking time is:  3
