In [1]:
import json
from os import path
import numpy as np
import pandas as pd
from pydantic import BaseModel

# HELPER FUNCTION

In [2]:
def loadData():
    try: 
        fileDir = {
            "AMPANG_LRT":"../data/lrtAmpangCoordinates.json",
            "KELANA_JAYA_LRT":"../data/lrtKelanaJayaCoordinates.json",
            "SRI_PETALING_LRT":"../data/lrtSriPetalingCoordinates.json",
            "KAJANG_MRT":"../data/mrtKajangCoordinates.json",
            "PUTRAJAYA_MRT":"../data/mrtPutrajayaCoordinates.json",
        }
        keys = list(fileDir.keys())
        values = list(fileDir.values())

        data = {}
        for key in keys:
            with open(fileDir[key]) as json_file:
                data[key] = json.load(json_file)
        return data
    except Exception as error:
        print(str(error))

In [3]:
data = loadData()

In [4]:
keys = list(data.keys())
values = list(data.values())
for key in keys:
    for value in values:
        if(value.get(key)):print(type(value),list(value.keys()))
        stations = value.get(key)
#         print("stations : " ,stations)
    print("after values loop")

<class 'dict'> ['AMPANG_LRT']
after values loop
<class 'dict'> ['KELANA_JAYA_LRT']
after values loop
<class 'dict'> ['SRI_PETALING_LRT']
after values loop
<class 'dict'> ['KAJANG_MRT']
after values loop
<class 'dict'> ['PUTRAJAYA_MRT']
after values loop


In [5]:
class Coordinates(BaseModel):
    lon:float
    lat:float
 


In [6]:
def HaversineDist(_from:Coordinates,_to:Coordinates):
    try:
        _from = Coordinates(**_from)
        _to = Coordinates(**_to)
        lat1 = _from.lat
        lon1 = _from.lon
        lat2 = _to.lat
        lon2 = _to.lon

        EarthRadius = 6371*1000 # meter
        PI = 22/7
        lat1Radians = lat1 * PI/180
        lat2Radians = lat2 * PI/180
        latDist = (lat2-lat1) * PI/180
        lonDist = (lon2-lon1) * PI/180

        a = np.sin(latDist/2) * np.sin(latDist/2) + np.cos(lat1Radians) * np.cos(lat2Radians) * np.sin(lonDist/2) * np.sin(lonDist/2);
        c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a));

        dist = EarthRadius * c #in metres
        return {"res":dist}

    except Exception as error:
        return {"err": str(error)}

# DATA STRUCTURE

## Stack

In [7]:
class Stack:
    def __init__(self, array:list=[]):
        self.array = array
        
    def push(self,doc):
        self.array.append(doc)
        
    def pop(self):
        return self.array.pop()
    
    def reverse(self):
        self.array.reverse()
    
    def show(self):
        print(self.array)
        
    def isEmpty(self):
        return len(self.array)==0

#Test
s = Stack()
for i in range(5):
    s.push(i)
    s.show()
s.push(5)
s.reverse()
s.show()
s.reverse()

while(not s.isEmpty()):
    s.pop()
    s.show()     

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[5, 4, 3, 2, 1, 0]
[0, 1, 2, 3, 4]
[0, 1, 2, 3]
[0, 1, 2]
[0, 1]
[0]
[]


## Queue

In [8]:
class Queue:
    def __init__(self, array:list =[]):
        self.array = array
        
    def enqueue(self,doc):
        self.array.append(doc)
        
    def dequeue(self):
        return self.array.pop(0)
    
    def reverse(self):
        self.array.reverse()
    
    def show(self):
        print(self.array)
        
    def isEmpty(self):
        return len(self.array)==0
    
#Test
q = Queue()
for i in range(5):
    q.enqueue(i)
    q.show()
    
q.enqueue(5)
q.reverse()
q.show()
q.reverse()

while(not q.isEmpty()):
    q.dequeue()
    q.show()   

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[5, 4, 3, 2, 1, 0]
[1, 2, 3, 4, 5]
[2, 3, 4, 5]
[3, 4, 5]
[4, 5]
[5]
[]


## Graph

In [9]:
class Graph:
    def __init__(self):
        self.edges = {}
        self.vertices = {}
        
    def addVertices(self,doc:dict):
        try:
            if(not doc.get("id")): raise Exception("data.id is required")
            
            #parsing and checking
            doc["id"] = str(doc["id"])
            found = self.vertices.get(doc["id"]) 
            if(found):raise Exception("duplicated Insert")
            
            # Create
            self.vertices[doc["id"]] = doc
            
        except Exception as error:
             print(str(error))
                
    def addEdges(self,fromID:str,toID:str,EdgeProps):
        try:
            #Validation and parsing
            if(not fromID): raise Exception("fromID is required")
            if(not toID): raise Exception("toID is required")
            fromID = str(fromID)
            toID = str(toID)

            if(not self.vertices.get(fromID)) : raise Exception(f"vertices with fromID({fromID}) not exist")
            if(not self.vertices.get(toID)): raise Exception(f"vertices with toID({toID}) not exist")

            #Initialization
            if(not self.edges.get(fromID)): self.edges[fromID] = {};
            if(not self.edges.get(fromID).get(toID)): self.edges[fromID][toID] = {};

            #Create
            if(EdgeProps): self.edges[fromID][toID]  = EdgeProps;

        except Exception as error:
             print(str(error))
        
    def load(self,path:str):
        try:
            with open(path) as json_file:
                parsedData = json.load(json_file)
                if(not parsedData.get("edges")): raise Exception("there is no edges in the file")
                if(not parsedData.get("vertices")): raise Exception("there is no vertices in the file")
        
            self.edges = parsedData.get("edges")
            self.vertices = parsedData.get("vertices")
            print("loaded edges and vertices")
        except Exception as error:
            print(str(error))
            
    def save(self,path:str):
        try:
            json_object = json.dumps({
                "edges": self.edges,
                "vertices": self.vertices
            }, indent=4)
            
            with open(path, "w") as outfile:
                outfile.write(json_object)
        except Exception as error:
            print(str(error))

# HEURISTIC SEARCH

## GRAPH ALGORITHM

In [10]:
class GraphAlgorithm(Graph):
    def __init__(self):
        super().__init__()
        
    def breadthFirstSearch(self,fromID:str,toID:str):
        try:
            #Validation and parsing
            if(not fromID): raise Exception("fromID is required")
            if(not toID): raise Exception("toID is required")
            fromID = str(fromID)
            toID = str(toID)
    
            if(not self.edges.get(fromID)) : raise Exception(f"there is no edges start with fromID : {fromID}")
            fromIDs = list(self.edges.keys())
            found = [from_id for from_id in fromIDs if self.edges.get(from_id).get(toID)]
            if(len(found) == 0): raise Exception(f"there is no edges end with toID : {toID}")

            # early return
            if(fromID==toID): return {"res":{"explored":[fromID], "path":[toID]}}

            #Initialization
            queue = Queue()
            queue.enqueue(fromID)
            visited = []
            backTraceEdge = {}
            

            #iteration : explore path
            while(not queue.isEmpty() ):
                currentNodeID = queue.dequeue()

                if(currentNodeID not in visited): visited.append(currentNodeID)
                
                #early pruning
                if(currentNodeID == toID): break
                if(not self.edges.get(currentNodeID)): continue
                nextNodeIDs = list(self.edges.get(currentNodeID).keys()) 

                for nextNodeID in nextNodeIDs:
                    if(nextNodeID not in visited ): backTraceEdge[nextNodeID] = currentNodeID
                    if(nextNodeID not in visited):  queue.enqueue(nextNodeID)

            #iteration : back Tracing
            backTracePath = Queue([])
            backTracePath.enqueue(toID)
            currentID = toID
            reached = False
            
            while(not reached):
                previousID = backTraceEdge.get(currentID); 
                if(previousID not in backTracePath.array): backTracePath.enqueue(previousID);
                if(previousID==fromID): reached = True
                if(previousID==fromID): break #stopping condition
                currentID = previousID
                    
            backTracePath.reverse()
            return {"res":{"explored":visited, "path":backTracePath.array}}
        
        except Exception as error:
            return {"err":str(error), "errInfo":["class GraphAlgorithm","method breadthFirstSearch"]}
        
    def depthFirstSearch(self,fromID:str,toID:str):
        try:
            #Validation and parsing
            if(not fromID): raise Exception("fromID is required")
            if(not toID): raise Exception("toID is required")
            fromID = str(fromID)
            toID = str(toID)
    
            if(not self.edges.get(fromID)) : raise Exception(f"there is no edges start with fromID : {fromID}")
            fromIDs = list(self.edges.keys())
            found = [from_id for from_id in fromIDs if self.edges.get(from_id).get(toID)]
            if(len(found) == 0): raise Exception(f"there is no edges end with toID : {toID}")

            # early return
            if(fromID==toID): return {"res":{"explored":[fromID], "path":[toID]}};

            #Initialization
            stack = Stack()
            stack.push(fromID)
            visited = []
            backTraceEdge = {}
            

            #iteration : explore path
            while(not stack.isEmpty() ):
                currentNodeID = stack.pop()

                if(currentNodeID not in visited): visited.append(currentNodeID)

                if(not self.edges.get(currentNodeID)): continue
                nextNodeIDs = list(self.edges.get(currentNodeID).keys()) 

                for nextNodeID in nextNodeIDs:
                    if(nextNodeID not in visited): backTraceEdge[nextNodeID] = currentNodeID
                    if(nextNodeID not in visited):  stack.push(nextNodeID)
                
                #early pruning
                if(currentNodeID == toID): break


            #iteration : back Tracing
            backTracePath = Queue([])
            backTracePath.enqueue(toID)
            currentID = toID
            reached = False
            while(not reached):
                previousID = backTraceEdge[currentID]; 
                backTracePath.enqueue(previousID)
                if(previousID==fromID): reached = True
                if(previousID==fromID): break; #stopping condition
                currentID = previousID
            backTracePath.reverse()
            return {"res":{"explored":visited, "path":backTracePath.array}}
        
        except Exception as error:
            return {"err":str(error), "errInfo":["class GraphAlgorithm","method depthFirstSearch"]}
        
    def getRandomVerticesID(self,size=5):
        try:
            randomIDs = []
            sampleSpace = list(self.vertices.keys())
            sampleSize = len(sampleSpace)
            
            while(len(randomIDs) != size):
                randPosition = int(np.floor(np.random.rand()*sampleSize))
                randID = sampleSpace[randPosition]
                found = [d for d in randomIDs if d == randID]
                if len(found)==0 : randomIDs.append(randID)
            return {"res": randomIDs}
        except Exception as error:
            return {"err":str(error), "errInfo":["class GraphAlgorithm","method getRandomVerticesID"]}

# TRAIN NETWORK CLASS

In [11]:
class TrainNetwork(GraphAlgorithm):
    def __init__(self):
        super().__init__()

    def dataLoadingAndPreprocessing(self):
        try:
            # initialization
            Vertices = []
            Edges = []
            
            #load data
            data = loadData()
            keys = list(data.keys())
            values = list(data.values())
            
            #process data
            for key in keys:
                for value in values:
                    if value.get(key) :
                        stations = value.get(key)
                        for i in range(len(stations)):
                            Vertices.append({ 
                                "id": stations[i]["id"],
                                "stationName": stations[i]["stationName"], 
                                "coordinate": stations[i]["coordinate"] 
                            })

                            #Station Edges
                            if(i<len(stations)-1):
                                fromID = stations[i]["id"]
                                toID = stations[i+1]["id"]

                                disRes = HaversineDist(stations[i]["coordinate"],stations[i+1]["coordinate"])
                                if(disRes.get("err")): raise Exception(f'haversine dist error : {str(disRes["err"])}' )
                                distance = disRes["res"]

                                #Bidirectional
                                Edges.append({ "fromID":fromID, "toID":toID, "distance":distance});
                                Edges.append({ "toID": fromID, "fromID": toID, "distance":distance});

                            # Interchange Station Edges
                            if(stations[i].get("interchangeStation")):
                                for itc in stations[i]["interchangeStation"]:
                                    distRes = HaversineDist(stations[i]["coordinate"],itc["coordinate"])
                                    if(distRes.get("err")): raise Exception("exec2 "+str(distRes.get("err")))
                                    dist = distRes["res"]

                                    Edges.append({"fromID": stations[i]["id"], "toID": itc["id"], "distance":dist})
                                 
            return {"Vertices":Vertices,"Edges":Edges}
 
                    
        except Exception as error:
            print("dataLoadingAndPreprocessing error: ",error)
            return {"err":str(error)}
            
    def verticesAndEdgesPopulation(self):
        try:
            loadRes = self.dataLoadingAndPreprocessing()
            if(loadRes.get("err")): raise Exception(loadRes.get("err"))
            Vertices = loadRes["Vertices"]
            Edges = loadRes["Edges"]
                
            for v in Vertices:
                self.addVertices(v)

            for e in Edges:
                fromID = e["fromID"] 
                toID = e["toID"]
                distance = e["distance"]
                self.addEdges(fromID, toID, {"distance":distance})

        except Exception as error:
            print(" error : ", str(error))

In [12]:
trainNetwork = TrainNetwork()
# trainNetwork.verticesAndEdgesPopulation()
# trainNetwork.save("trainNetwork.save.json")
trainNetwork.load("trainNetwork.save.json")
randRes = trainNetwork.getRandomVerticesID(2)

if(randRes.get("err")): print(randRes)
randIDs = randRes.get("res")
searchRes = trainNetwork.breadthFirstSearch(randIDs[0],randIDs[1])
if(searchRes.get("err")) : print(searchRes)
print(randIDs[0],"-->",randIDs[1])
trainPath = searchRes["res"]["path"]
print("trainPath : \n",trainPath)

loaded edges and vertices
SP4 --> KG04
trainPath : 
 ['SP4', 'AG4', 'AG3', 'PY17', 'PY16', 'PY15', 'PY14', 'PY13', 'PY12', 'PY11', 'PY10', 'PY09', 'PY08', 'PY07', 'PY06', 'PY05', 'PY04', 'PY03', 'PY01', 'KG04']


# Data visualization 
### DataFrame

In [13]:
pathData = [trainNetwork.vertices[stationID] for stationID in trainPath]
df = pd.DataFrame(pathData)
df

Unnamed: 0,id,stationName,coordinate
0,SP4,PWTC,"{'lat': '3.1665923', 'lon': '101.6936275'}"
1,AG4,PWTC,"{'lat': '3.1665923', 'lon': '101.6936275'}"
2,AG3,Titiwangsa,"{'lat': '3.1735082', 'lon': '101.6953433'}"
3,PY17,Titiwangsa,"{'lat': '3.17437', 'lon': '101.69586'}"
4,PY16,Sentul Barat,"{'lat': '3.17906', 'lon': '101.68519'}"
5,PY15,Jalan Ipoh,"{'lat': '3.18933', 'lon': '101.68117'}"
6,PY14,Kentonmen,"{'lat': '3.19549', 'lon': '101.67963'}"
7,PY13,Kampung Batu,"{'lat': '3.20482', 'lon': '101.6756'}"
8,PY12,Sri Delima,"{'lat': '3.20699', 'lon': '101.6658'}"
9,PY11,Jinjang,"{'lat': '3.2095', 'lon': '101.6558'}"


# Map Visualization

In [14]:
from folium import folium , Marker, PolyLine, CircleMarker

locations = [
    {
        "location":[
            float(path_data.get("coordinate").get("lat")),
            float(path_data.get("coordinate").get("lon"))
        ],
        "popup":path_data.get("stationName")
    }
    for path_data in pathData
]

lat = 0
lon = 0
points = []
for loc in locations:
    lat += loc.get("location")[0]
    lon += loc.get("location")[1]
    points.append(loc.get("location"))
centrold = [lat/len(locations),lon/len(locations)]

Map =  folium.Map(location=centrold, zoom_start = 12)
PolyLine(points,color="red", weight=3.5, opacity=1).add_to(Map)

for loc in locations:
    #Marker(location=loc.get("location"), popup=loc.get("popup")).add_to(Map)
     CircleMarker(
        location=loc.get("location"),
        popup=loc.get("popup"),
        radius=3,
        color='red',
        fill=True,
        fill_color='red'
   ).add_to(Map)
    
Map