In [2]:
%matplotlib inline
import googlemaps
from datetime import datetime
import requests
import folium
from folium.vector_layers import PolyLine
import polyline
from geopy import distance
import csv

# Replace with your Google Map API key here
gmaps = googlemaps.Client(key="YOUR_API")

OVERPASS = "http://overpass-api.de/api/interpreter"

In [8]:
MAX_SEGMENT_LENGTH = 100
FOCUS_POINT_THRESHOLD = 40
ZOOM_LEVEL = 16
CSV_FILE_LOC = "./output.csv"


map = folium.Map(prefer_canvas=True, tiles="cartodbpositron")
t = folium.TileLayer("cartodbpositron").add_to(map)

class Node:
    def __init__(self, lat, lon, _type):
        self.loc = [lat, lon]
        self.type = _type
    
    def closeTo(self, loc, direction):
        if(distance.distance(loc, self.loc).m < FOCUS_POINT_THRESHOLD):
            if(loc[0] - self.loc[0] >0) == direction[0] and (loc[1] - self.loc[1] >0) == direction[1]:
                return True
        return False 

    def __repr__(self):
        return self.__str__()
    def __str__(self):
        return "Location: " + ' '.join([str(value) for value in self.loc])

class Way:
    def __init__(self,start_pos, end_pos, distance, duration, polyline):
        self.p = polyline
        self.way_locs = []
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.distance = distance
        self.duration = duration
        self.tags = ""
        self.wayid = 0
        self.name = None
        self.lane = None
        self.maxspeed = None
        self.type_of_road = None
        self.distance = []
        self.slope = []
        self.focus_points=[]
        self.elevation = []
        self.current =[]
        self.free = []
        self.sum_weight = 0
        self.decode()
        self.getElevation()
        self.getRealTimeTraffic()
        self.getWayAttr()
        # if(self.sum_weight != 0):
        self.duration = self.duration / (len(self.way_locs) -1)

    def getElevation(self):
        for p in self.way_locs:
            self.elevation.append([i["elevation"] for i in gmaps.elevation(p)])


    def getWayAttr(self):
        if(self.wayid == 0): self.reverseGeoCoding()
        print(self.wayid)
        query_sentence = """[out:json];
            way(%s);
            (
            ._;
            >;
            );
            out tags;"""
        response = requests.get(OVERPASS, params={'data': query_sentence % self.wayid})
        # print("way", response.status_code)
        counter  = 1
        while(counter < 5 and response.status_code != 200):
            print("Query Road Attribute Timeout, Retrying ", counter)
            counter +=1
            response = requests.get(OVERPASS, params={'data': query_sentence % self.wayid})
        if(response.status_code != 200):
            print("OVERPASS QUERY ERROR: Status Code ", response.status_code)
            return
        response = response.json()
        self.tags = response["elements"][-1]["tags"]
        if("name" in self.tags):
            self.name = self.tags["name"]
        if "maxspeed" in self.tags:
            self.maxspeed = self.tags["maxspeed"]
        if "highway" in self.tags:
            self.type_of_road = self.tags["highway"]
        if "lane" in self.tags:
            self.lane = self.tags["lanes"]
        self.getFocusPoint()
        

    def getFocusPoint(self):
        if(self.name is None):
                return
        query = """[out:json];way["name"="%s"]""" %self.name
        if(self.lane != None):
            query = query +"""["lanes"="%s"]""" %self.lane
        query = query + """->.sameway;foreach.sameway->.this_way{node(w.this_way)->.this_ways_nodes;node.this_ways_nodes(if:is_tag("highway")||is_tag("railway"));way.linked_ways["highway"]->.linked_ways;out;}"""
        response = requests.get(OVERPASS, params={'data': query})
        counter  = 1
        while(counter < 5 and response.status_code != 200):
            print("Query Focus Point Timeout, Retrying ", counter)
            counter +=1
            response = requests.get(OVERPASS, params={'data': query})
        if(response.status_code != 200):
            print("OVERPASS QUERY ERROR: Status Code ", response.status_code)
            return
        # print(response.status_code)
        response = response.json()
        for element in response["elements"]:
            t = None
            if ("highway" in element["tags"]):
                t = element["tags"]["highway"]
            if ("crossing" in element["tags"]):
                t = "crossing"
            if("railway" in element["tags"]):
                t = element["tags"]["railway"]

            self.focus_points.append(Node(element["lat"], element["lon"], t))
        return

    def reverseGeoCoding(self):
        test = "https://nominatim.geocoding.ai/reverse?lat=%s&lon=%s&zoom="+str(ZOOM_LEVEL)+"&format=jsonv2"
        # Zoom 16 to filter building
        # might sitll locate to wrong place 
        # (instead of way we are interested, try use another location or make the zoom be lower)
        l = int(len(self.way_locs)/2)
        response = requests.get(test%(self.way_locs[l][0], self.way_locs[l][1]))
        response = response.json()
        # print(response)
        self.wayid = response["osm_id"]

    def getRealTimeTraffic(self):
        # Replace with your TomTom API key here
        key = "YOUR_API"
        for loc in self.way_locs:
            query = """https://api.tomtom.com/traffic/services/4/flowSegmentData/relative0/16/json?point=%s%%2C%s&unit=KMPH&openLr=false&key="""
            query = query%(loc[0], loc[1]) + key
            response = requests.get(query)
            response = response.json()
            # print(response["flowSegmentData"]["currentSpeed"])
            # print(response["flowSegmentData"]["freeFlowSpeed"])
            self.current.append(float(response["flowSegmentData"]["currentSpeed"]))
            self.free.append(float(response["flowSegmentData"]["freeFlowSpeed"]))
            self.sum_weight = self.sum_weight + (self.current[-1] / self.free[-1])

    def decode(self):
        tmp = polyline.decode(self.p)
        sz = len(tmp) -1;
        idx = 0
        while idx < sz :
            distance_geopy = distance.distance(tmp[idx], tmp[idx+1]).m
            # print(distance_geopy)
            while(distance_geopy > MAX_SEGMENT_LENGTH): 
                tmp.insert(idx+1, ((tmp[idx][0]+tmp[idx+1][0])/2, (tmp[idx][1]+tmp[idx+1][1])/2))
                distance_geopy = distance.distance(tmp[idx], tmp[idx+1]).m
            sz = len(tmp)-1
            idx += 1

        self.distance = [0]    
        for i in range(len(tmp) - 1):
            d = distance.distance(tmp[i], tmp[i+1]).m
            self.distance.append(d)

        self.way_locs = tmp

    def __str__(self):
        data = (' '.join([str(value) for value in self.start_pos]), 
        ' '.join([str(value) for value in self.end_pos]),
        self.distance,
        self.duration)
        format_string = "Start loc: %s \nEnd Location: %s \nDistance: %s \nDuration: %s"

        return (format_string % data)

    def export(self):
        # _filter = "traffic_signals"
        with open(CSV_FILE_LOC, 'a', encoding='UTF8', newline='') as f:
            writer = csv.writer(f)
            idx = 0
            for idx in range(1, len(self.way_locs)):
                weight = (self.current[idx]/self.free[idx] - 0.5)
                duration = None
                # weight should not equal to zero
                if weight < 0: 
                    duration = self.duration * (1 + weight)
                else:
                    duration = self.duration * weight;
                isTrafficSignal = False
                isCrossing = False
                isBusStop = False
                isOther = False
                for node in self.focus_points:
                    # if(node.type == _filter):
                    #     continue
                    if(node.closeTo(self.way_locs[idx], 
                    [self.way_locs[idx][0] - self.way_locs[idx-1][0] > 0, self.way_locs[idx][1] - self.way_locs[idx-1][1] > 0])):
                        if node.type == "traffic_signals":
                            isTrafficSignal = True
                        elif node.type == "crossing":
                            isCrossing = True
                        elif node.type == "bus_stop":
                            isBusStop = True
                        else:
                            isOther = True
                        if isBusStop and isCrossing and isTrafficSignal and isOther:
                            break
                writer.writerow([
                self.way_locs[idx][0],
                self.way_locs[idx][1],
                self.elevation[idx][0],
                self.distance[idx],
                float(self.elevation[idx][0] -self.elevation[idx-1][0])/ self.distance[idx],
                isTrafficSignal,
                isCrossing,
                isBusStop,
                isOther,
                self.name,
                self.maxspeed,
                self.lane,
                self.type_of_road,
                self.current[idx]/self.free[idx],
                duration
                ])
            

    def plot_map(self):
        folium.Marker(location = self.start_pos, popup=self.name).add_to(map)
        folium.Marker(location = self.end_pos).add_to(map)
        idx = 0
        for intermidate_point in self.way_locs:
            grade = 0
            if idx > 0: 
                grade = (self.elevation[idx][0] - self.elevation[idx-1][0]) / (self.distance[idx])
            tag = "Distance to previous point: "+ str(self.distance[idx]) + \
            "\nElevation: " + str(self.elevation[idx]) + \
            "\nGrade: "  + str(grade) + \
            "\nTraffic indicator: " + str(self.current[idx]/self.free[idx])
            folium.CircleMarker(
            location=intermidate_point, radius=2, fill=False, color='red', opacity=0.2, popup= tag).add_to(map)
            idx += 1
        p = PolyLine(locations=polyline.decode(self.p), popup=self.tags, opacity=0.7).add_to(map)

        for focus_node in self.focus_points:
            folium.CircleMarker(
                location= focus_node.loc,
                color = 'green', radius=5,popup=focus_node.type
            ).add_to(map)

        return map
            




In [6]:

header = ['lat', 'lng', 'elevation', 'distance_to_previous_point', 'gradient', 
'traffic_signals', 'crossing', 'bus_stops','other', 'road_name',
'speedlimit', 'lane', 'type_of_road', 'realtime_traffic', 'duration']
now = datetime.now()
print(now)
with open(CSV_FILE_LOC, 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(header)
    writer.writerow([None, None, None, None, None, None,None, None, None, None, None, None, None,now])
directions_result = gmaps.directions(
    "Goteborg, SE",
    "Bor√•s, SE",
    mode="driving",
    departure_time=now,
    alternatives= True
)

# print(directions_result[0]["overview_polyline"]['points'])
ne_lat = float(directions_result[0]["bounds"]["northeast"]["lat"])
ne_lng = float(directions_result[0]["bounds"]["northeast"]["lng"])
sw_lat = float(directions_result[0]["bounds"]["southwest"]["lat"])
sw_lng = float(directions_result[0]["bounds"]["southwest"]["lng"])

map.fit_bounds([[ne_lat, ne_lng], [sw_lat, sw_lng]])



2022-05-23 14:33:00.442026


In [10]:
ways = []
print(len(directions_result[0]["legs"][0]["steps"]))
for i, leg in enumerate(directions_result[0]["legs"][0]["steps"]):
    ways.append(
        Way(([leg["start_location"]["lat"], leg["start_location"]["lng"]]),
        ([leg["end_location"]["lat"], leg["end_location"]["lng"]]),
        leg["distance"]["value"],
        leg["duration"]["value"],
        leg["polyline"]["points"]))
    ways[i].export()
    ways[i].plot_map()

map


20
150285001
Query Road Attribute Timeout, Retrying  1
347540891
Query Road Attribute Timeout, Retrying  1
150284902
Query Road Attribute Timeout, Retrying  1
Query Road Attribute Timeout, Retrying  2
95093109
Query Road Attribute Timeout, Retrying  1
30049012
23408361
1022912231


ZeroDivisionError: float division by zero