In [31]:
import os

from dotenv import load_dotenv
from geojson import FeatureCollection
from mapboxgl.utils import create_color_stops
from mapboxgl.viz import LinestringViz

from src.geometry.geometry import shorten_line
from src.helpers.helpers import format_matrix
from src.map.map import satellite_style
from src.osrm.interface import OSRMInterface
from src.routing_problem.connection import LaneletConnection
from src.routing_problem.creator.creator import create_routing_problem
from src.routing_problem.routing_problem import RoutingProblem

In [2]:
% load_ext autoreload
% autoreload 2

In [3]:
# Pull variables from a local .env file into the process environment, then
# read the Mapbox access token from there (None when it is not set).
load_dotenv()
token = os.environ.get('MAPBOX_ACCESS_TOKEN')

In [4]:
# Build the routing problem from the OSM extract and wrap its features as GeoJSON.
routing_problem: RoutingProblem = create_routing_problem("data/stuttgart.osm")
geojson = FeatureCollection(features=routing_problem.to_json())

# Draw every feature in a single colour on top of the satellite base map.
viz = LinestringViz(
    data=geojson,
    style=satellite_style,
    center=(9.092, 48.731),
    zoom=18,
    color_default="#3ad21b",
    line_width_default='2',
)
viz.show()



In [29]:
from src.routing_problem.maneuver.maneuver_type import ManeuverType
from src.routing_problem.maneuver.modifier import ManeuverModifier

# Load routing problem
# Re-created here so that re-running this cell always starts from untrimmed lanelets.
routing_problem: RoutingProblem = create_routing_problem("data/stuttgart.osm")

# Shorten lanelets
# Each rule is (length threshold, cut at the beginning, cut at the end).
# Rules are checked from the largest threshold down and only the first rule
# whose threshold is exceeded is applied; lanelets with a length of 5 or less
# are left untouched. Values are in whatever unit get_length() and
# shorten_line() use -- presumably metres, TODO confirm.
LANELET_SHORTENING_RULES = [
    (50, 12, 20),
    (30, 5, 5),
    (10, 3, 3),
    (5, 1, 1),
]

for lanelet in routing_problem.lanelets:
    # Measure once, before any trimming, instead of once per threshold test.
    lanelet_length = lanelet.get_length()
    for min_length, cut_start, cut_end in LANELET_SHORTENING_RULES:
        if lanelet_length > min_length:
            lanelet.nodes = shorten_line(lanelet.nodes, cut_start, cut_beginning=True)
            lanelet.nodes = shorten_line(lanelet.nodes, cut_end, cut_beginning=False)
            break

# Merge the outgoing maneuvers of all segments into one lookup table; it is
# indexed below with (previous segment id, segment id).
maneuvers = {}
for segment in routing_problem.segments:
    maneuvers.update(segment.next_maneuvers)

# Create connections. First for left and right turns, then for forward.
left_modifiers = (ManeuverModifier.Left, ManeuverModifier.SlightLeft,
                  ManeuverModifier.SharpLeft, ManeuverModifier.UTurn)
right_modifiers = (ManeuverModifier.Right, ManeuverModifier.SlightRight,
                   ManeuverModifier.SharpRight)

connections = []
for segment in routing_problem.segments:
    for previous_segment in segment.previous_segments:
        maneuver = maneuvers[(previous_segment.id, segment.id)]
        left = maneuver.modifier in left_modifiers
        right = maneuver.modifier in right_modifiers
        merge = maneuver.type == ManeuverType.Merge

        if (left and not merge) or (right and merge):
            # Left: pair lanelets index by index, starting at the first
            # lanelet of both segments.
            lanelet_pairs = zip(previous_segment.lanelets, segment.lanelets)
        elif (right and not merge) or (left and merge):
            # Right: pair lanelets starting at the last lanelet of both
            # segments and walking backwards.
            lanelet_pairs = zip(reversed(previous_segment.lanelets),
                                reversed(segment.lanelets))
        else:
            # Neither a left nor a right maneuver: nothing to connect in this pass.
            continue

        # Pairing stops as soon as either segment runs out of lanelets.
        for lanelet_from, lanelet_to in lanelet_pairs:
            connections.append(LaneletConnection(lanelet_from, lanelet_to, maneuver))

# Now connect straights
# This pass prefers lanelets that are still "free", i.e. whose
# has_incoming_connection / has_outgoing_connection flag is not set
# (presumably set when a LaneletConnection is created -- TODO confirm).
for segment in routing_problem.segments:
    for previous_segment in segment.previous_segments:
        maneuver = maneuvers[(previous_segment.id, segment.id)]
        if maneuver.modifier != ManeuverModifier.Straight:
            continue

        lanelets_to = len(segment.lanelets)
        lanelets_from = len(previous_segment.lanelets)
        if lanelets_to == 0 or lanelets_from == 0:
            # Nothing to pair up; this also keeps the clamped indexing below
            # from ever touching an empty sequence.
            continue

        # Snapshot the free lanelets before any connection of this pair is created.
        free_lanelets_to = [lanelet for lanelet in segment.lanelets
                            if not lanelet.has_incoming_connection]
        free_lanelets_from = [lanelet for lanelet in previous_segment.lanelets
                              if not lanelet.has_outgoing_connection]
        free_lanelets_to_count = len(free_lanelets_to)
        free_lanelets_from_count = len(free_lanelets_from)

        if lanelets_to == lanelets_from:
            # Same lane count: connect one to one, regardless of existing connections.
            lanelet_pairs = list(zip(previous_segment.lanelets, segment.lanelets))
        elif free_lanelets_to_count == lanelets_from:
            # Every source lanelet gets its own free target lanelet.
            lanelet_pairs = list(zip(previous_segment.lanelets, free_lanelets_to))
        elif lanelets_to == free_lanelets_from_count:
            # Every target lanelet gets its own free source lanelet.
            lanelet_pairs = list(zip(free_lanelets_from, segment.lanelets))
        elif lanelets_to > free_lanelets_from_count:
            # More targets than free sources: the last source fans out to the
            # remaining targets. If no source lanelet is free at all, fall back
            # to all source lanelets instead of indexing an empty list
            # (which raised IndexError before).
            source_lanelets = free_lanelets_from or previous_segment.lanelets
            last_source = len(source_lanelets) - 1
            lanelet_pairs = [(source_lanelets[min(i, last_source)], segment.lanelets[i])
                             for i in range(lanelets_to)]
        else:
            # Fewer targets than free sources: surplus sources all lead into
            # the last target lanelet.
            last_target = lanelets_to - 1
            lanelet_pairs = [(free_lanelets_from[i], segment.lanelets[min(i, last_target)])
                             for i in range(free_lanelets_from_count)]

        for lanelet_from, lanelet_to in lanelet_pairs:
            connections.append(LaneletConnection(lanelet_from, lanelet_to, maneuver))

# Serialise the road network first, then every connection, and combine both
# feature lists into one GeoJSON collection.
rp_json = routing_problem.to_json()
connections_json = [conn.to_json() for conn in connections]
geojson = FeatureCollection(features=rp_json + connections_json)

# Colour features by their "type" property using two stops (0 and 1);
# presumably 0 marks lanelets and 1 marks connections -- TODO confirm.
color_breaks = [0, 1]
color_stops = create_color_stops(color_breaks, colors=["#3ad21b", "#ff0505"])

# Show everything on the satellite map.
viz = LinestringViz(
    data=geojson,
    style=satellite_style,
    center=(9.092, 48.731),
    zoom=18,
    color_property="type",
    color_stops=color_stops,
    line_width_default='2',
)
viz.show()



In [None]:
# Every segment id is used both as a source and as a destination, so the
# requested table is square; both names refer to the same list.
sources = destinations = [int(segment.id) for segment in routing_problem.segments]

# Ask OSRM for the table and keep the element at index 1 of the result,
# which is treated as the durations matrix.
table_result = OSRMInterface.request_table(sources, destinations, "")
durations_matrix = table_result[1]
matrix = format_matrix(sources, destinations, durations_matrix)
