# Projektaufgabe - Roundtrip-Path

Implementieren Sie einen Roundtrip-Path Planer

1. Gegeben sind  
    1. Startposition  
    2. Mehrere Endpositionen
    3. Das Interface des Roundtrip-Path-Planners soll sich *nicht* von den anderen Bahnplanern unterscheiden.
    4. Der zu verwendete Bahnplanungsalgorithmus soll wählbar sein und geeignet übertragen werden können.
    5. Ausgabe: Ein kollisionsfreier Pfad, der von der gegebenen Startposition alle Endpositionen genau einmal erreicht und wieder zur Startposition zurückführt. Dabei soll der Gesamtpfad möglichst kurz sein. Koordinieren Sie den Pfad geeignet (z.B. farblich), so dass sich erkennen lässt, was Start-Punkt, kollisionsfreier Zwischenpfad und Zielpunkte sind. \
    *Anmerkung: NetworkX hat prinzipiell die Möglichkeit auf einem gegebenen Graph dieses Problem zu lösen, sogar auf verschiedene Arten und Weisen. Sie können dies nutzen*

In [None]:
import networkx as nx
from IPPerfMonitor import IPPerfMonitor

import math
from typing import List, Any
import numpy as np
from numpy.typing import NDArray

import matplotlib.pyplot as plt
import traceback
from IPBenchmark import Benchmark
from shapely.geometry import Point, Polygon, LineString
from typing import Tuple, Type, Callable
from networkx.classes.reportviews import NodeView


import IPTestSuite as ts
from IPEnvironment import CollisionChecker
from IPVISBasicPRM import basicPRMVisualize
from IPVISLazyPRM import lazyPRMVisualize
from IPVISVisibilityPRM import visibilityPRMVisualize
from IPVISLazyPRM import lazyPRMVisualize
from IPVISVisibilityPRM import visibilityPRMVisualize
from IPVisibilityPRM import VisPRM
from IPBasicPRM import BasicPRM
from IPLazyPRM import LazyPRM

from IPEnvironmentKin import animateSolution
import IPLazyPRM
import IPVISLazyPRM
from IPPlanarManipulator import PlanarJoint, PlanarRobot
from IPEnvironmentKin import planarRobotVisualize
from IPEnvironmentKin import KinChainCollisionChecker

In [None]:
class Roundtrip_path_planner:
    """
    A class to plan a roundtrip path that visits all start and goal nodes.
    """


    def __init__(self, path_planner):
        assert hasattr(path_planner, "planPath"), "path_planner must have a method called 'planPath'"
        self.graph = nx.Graph() # graph to store all paths between start and goal nodes
        self.tsg_graph = nx.Graph() # graph to store the Traveling Salesman Graph (TSG) containing only start and goal nodes. Each edge contains the actual path between the nodes and the respective summed weight.
        self.path_planner = path_planner
        self._collisionChecker = path_planner._collisionChecker if hasattr(path_planner, "_collisionChecker") else None


    def _compute_path_length(self, graph: nx.Graph, path: List[Any]) -> float:
        """
        Computes the total length of a path in the graph.
        Args:
            graph (nx.Graph): The graph containing the path.
            path (List[Any]): A list of nodes representing the path.
        returns:
            float: The total distance of the path. Returns 0.0 if the path is empty or has less than 2 nodes.
        """
        if not path or len(path) < 2:
            return 0.0
        total_distance = sum(
            math.dist(graph.nodes[u]['pos'], graph.nodes[v]['pos'])
            for u, v in zip(path[:-1], path[1:])
        )
        return total_distance


    

    @IPPerfMonitor
    def planPath(self, startList: List[List[Any]], goalList: List[List[Any]], config) -> List[Any]:
        """
        Plans a roundtrip path that visits all start and goal nodes.
        Args:
            startList (array): start position in planning space. E.g. [[1,2]]
            goalList (array) : goal position in planning space. E.g. [[3,4]]
            config (dict): dictionary with the needed information about the configuration options

        Returns:
            List[List[Any]]: A list representing the roundtrip path visiting all goals and returning to the start.
        """
        start_goal_coordinates = startList + goalList # e.g. [[1,2], [3,4]]
        # add start and target nodes to the graph
        for i, coordinates in enumerate(start_goal_coordinates):
            self.tsg_graph.add_node(i, pos=coordinates)
        # get added target nodes
        target_nodes = self.tsg_graph.nodes
        # Copy the TSG graph to the main graph
        self.graph = self.tsg_graph.copy()
        
        
        # for each target nodes pair compute the path between them
        for i, start_node in enumerate(target_nodes):
            for j, goal_node in enumerate(target_nodes):
                if j <= i:
                    continue
                # plan path from start_node to goal_node
                path: List[Any] = self.path_planner.planPath([self.tsg_graph.nodes[start_node]['pos']], [self.tsg_graph.nodes[goal_node]['pos']], config)
                # Relabel nodes so composition does not merge different nodes (different pos)
                mapping = {node: (start_node if node == "start" else goal_node if node == "goal" else f'{start_node}_{goal_node}_{node}') for node in self.path_planner.graph.nodes}
                self.path_planner.graph = nx.relabel_nodes(self.path_planner.graph, mapping, copy=False)
                path = [mapping[node] for node in path]
                # Merge self.graph and planner.graph
                self.graph = nx.compose(self.path_planner.graph, self.graph)
                # Update TSG graph
                self.tsg_graph.add_edge(start_node, 
                                        goal_node, 
                                        distance=self._compute_path_length(self.graph, path), 
                                        path=path)
                

        
        # Use the Traveling Salesman Problem approximation to find a roundtrip path
        try:
            tsg_solution = np.asarray(nx.algorithms.approximation.traveling_salesman_problem(
                self.tsg_graph,
                weight='distance',
                nodes=target_nodes,
                cycle=True
            ))
        except:
            return []


        # Reconstruct the path from the TSG solution
        roundtrip_path = [0]
        for i in range(len(tsg_solution) - 1):
            path = self.tsg_graph[tsg_solution[i]][tsg_solution[i + 1]]['path']
            if tsg_solution[i] > tsg_solution[i + 1]:
                path = list(reversed(path))
            roundtrip_path.extend(path[1:])
        
        return roundtrip_path


2. Evaluieren Sie ihr Konzept mit BasicPRM, LazyPRM, VisibilityPRM anhand von mindestens 6 Benchmarkumgebungen (3 * 2-DoF Punktroboter, 3 * Planarroboter). Die Szenen der Benchmarkaufgaben für die 2-DoF Punktroboter sollen unterschiedlich aussehen und unterschiedlich schwer sein. Die Szene der Benchmarkaufgaben für die Planarroboter können gleich sein und lediglich die Anzahl der Freiheitsgrade und Start, Ziel und Zwischenpunkte verändert werden.
    1. Betrachten Sie z.B. Anzahl der Kollisionsberechnungen, Planungszeit, Roadmapgröße, Länge Lösungspfad und Stellen Sie die Ergebnisse graphisch dar und diskutieren Sie diese.
    2. Für die 2-DoF Punkt- und Planarroboter stellen Sie sowohl die Vorgehensweise des Roundtrip-Planers (welche Bahnen werden wann geplant, was ist das finale Ergebnis) als auch das Endergebnis als Animation (s. IP-10-0-PlanarManipulator.ipnyb) dar.

In [None]:
# Factory for planners
plannerFactory: dict[str, List[Any]] = dict() # plannerFactory[plannerName] = [plannerClass, configDict, visualizerFunction]

plannerFactory["basePRM"] = [
    BasicPRM, 
    {
        "radius": 3, 
        "numNodes": 300,
        "useKDTree": True
    }, 
    basicPRMVisualize]

plannerFactory["lazyPRM"] = [
    LazyPRM,
    {
        "initialRoadmapSize": 40, 
        "updateRoadmapSize": 20, 
        "kNearest": 5,
        "maxIterations": 40
    },
    lazyPRMVisualize]

plannerFactory["visibilityPRM"] = [
    VisPRM,
    {
        "ntry": 40,
        "reset_graph": True
    },
    visibilityPRMVisualize]

In [None]:
class ResultCollection (object):
    
    def __init__(self, plannerFactoryName, roadMapPlanner, planner, benchmark: Benchmark, solution, perfDataFrame):
        self.plannerFactoryName = plannerFactoryName    # e.g. "basePRM", "lazyPRM", "visibilityPRM"
        self.roadMapPlanner = roadMapPlanner            # the roundtrip path planner instance used to compute the roundtrip path
        self.planner = planner                          # the actual planner instance used to compute the path
        self.benchmark = benchmark                      # the benchmark used to compute the path
        self.solution = solution                        # the computed path as a list of nodes
        self.perfDataFrame = perfDataFrame              # performance data collected during the planning process

In [None]:
# TODO: Debugging needed
def RoadMapPlannerVisualize(
        roundtrip_planner,
        benchmark: Benchmark, 
        solution: List[Any],
        planner_visualizer: Callable) -> None:
    """
    Visualizes the roundtrip path planner and the solution path.
    Args:
        roundtrip_planner: The roundtrip path planner instance.
        benchmark: The benchmark used.
        solution (List[Any]): The computed path as a list of nodes.
        planner_visualizer (Callable): The visualizer function for the path planner.
    """
    #try:
    fig_local = plt.figure(figsize=(10,10))
    ax = fig_local.add_subplot(1,1,1)
    title = roundtrip_planner.path_planner.__class__.__name__
    title += " " + benchmark.name
    if solution == []:
        title += " (No path found!)"
    title += "\n Assumed complexity level " + str(benchmark.level)
    ax.set_title(title)
    planner_visualizer(roundtrip_planner.path_planner, [], ax=ax, nodeSize=50)

    pos = nx.get_node_attributes(roundtrip_planner.graph, 'pos')
    target_positions = benchmark.startList + benchmark.goalList

    # TODO: color solution path of roadmap_planner, as this is not done by the visualizer function of the path planner
    if solution != []:
        # draw nodes based on solution path
        Gsp = nx.subgraph(roundtrip_planner.graph, solution)
        # draw edges based on solution path
        nx.draw_networkx_edges(Gsp, pos, alpha=0.8, edge_color='g', width=10)

    # Color start and goal nodes, as they are not named "start" and "end" in the path planners solution
    target_nodes = range(len(target_positions)) # assuming that the first nodes are start and goal nodes
    nx.draw_networkx_nodes(roundtrip_planner.graph, 
                            pos,
                            nodelist=target_nodes,
                            node_size=300,
                            node_color='#00dd00', 
                            ax = ax)
    
    # Map all target nodes (start + goals) to label "G"
    labels = {node: "G" for node in target_nodes}
    labels[0] = "S"  # Label the first node as "S" for start
    nx.draw_networkx_labels(roundtrip_planner.graph, pos, labels=labels, ax=ax)
            

    #except Exception as e:
    #    print("ERROR: ",benchmark.name, e)
    #    traceback.print_exc()




In [None]:
def run_benchmarks(roadmapPlannerClass, plannerFactory) -> List[ResultCollection]:
    """
    Runs all planners on the benchmarks and collects the results.
    Args:
        roadmapPlannerClass: The class of the roundtrip path planner to be used.
        plannerFactory (dict): A dictionary containing the planner factory, where each key is a planner name and the value is a list containing the planner class, configuration dictionary, and visualizer function.
    Returns:
        List[ResultCollection]: A list of ResultCollection objects containing the results of the path planning.
    """

    resultList: List[ResultCollection] = list()
    testList: List[Any] = ts.benchList

    for key, producer in list(plannerFactory.items()):
        print(key, producer)
        for benchmark in testList:
            plannerClass, config, visualizerFunction = producer
            print ("Planning: " + key + " - " + benchmark.name)
            planner = plannerClass(benchmark.collisionChecker)
            roadmapPlanner = roadmapPlannerClass(planner)
            IPPerfMonitor.clearData()
            try:
                resultList.append(ResultCollection(key,
                                               roadmapPlanner,
                                               planner, 
                                               benchmark,
                                               roadmapPlanner.planPath(
                                                   benchmark.startList,
                                                   benchmark.goalList,
                                                   config),
                    IPPerfMonitor.dataFrame()
                    ),
                )
            except Exception as e:
                print ("PLANNING ERROR ! PLANNING ERROR ! PLANNING ERROR ", e)
                pass
    return resultList


def plot_results(resultList: List[ResultCollection]) -> None:
    """
    Plots the results of the path planning.
    Args:
        resultList (List[ResultCollection]): A list of ResultCollection objects containing the results of the path planning.    
    """
    for result in resultList:
        try:
            RoadMapPlannerVisualize(
            roundtrip_planner= result.roadMapPlanner,
            benchmark= result.benchmark, 
            solution= result.solution,
            planner_visualizer= plannerFactory[result.plannerFactoryName][2])  # visualizerFunction
        except Exception as e:
            print ("Error", e)
            pass


In [None]:
# Run the benchmarks and collect the results
resultList = run_benchmarks(Roundtrip_path_planner, plannerFactory)

In [None]:
# Plot the results
plot_results(resultList)

In [None]:
# Animations for the 2-DoF Robot benchmark solutions
# TODO: Add titles
# TODO: Fix Errors
def animatePlanarRobotResults(resultList):
    for result in resultList:
        checker = result.benchmark.collisionChecker
        if isinstance(checker, KinChainCollisionChecker):
            animateSolution(result.planner, 
                        checker, 
                        result.solution, 
                        plannerFactory[result.plannerFactoryName][2])

In [None]:
animatePlanarRobotResults(resultList)

3. In einem weiteren Schritt entwickeln Sie eine spezielle Variante auf Basis des VisibilityPRM. Wie Sie wissen, ist dieser für Multi-Query Anfragen vorteilhaft nutzbar. Nutzen Sie dies für das Roundtrip-Konzept. Hier müssen Sie eventuell die Art wie Sie den Visibility-PRM aufrufen geschickt verändern. Führen Sie ebenfalls die Evaluation an den Benchmarkumgebungen durch und vergleichen Sie die Ergebnisse mit den Versionen aus 2.) 

In [None]:
class Multi_query_roundtrip_path_planner:

    def __init__(self, path_planner):
        assert hasattr(path_planner, "planPath"), "path_planner must have a method called 'planPath'"
        self.graph = nx.Graph() # graph to store all paths between start and goal nodes
        self.path_planner = path_planner
        self._collisionChecker = path_planner._collisionChecker if hasattr(path_planner, "_collisionChecker") else None


    def _compute_path_length(self, graph: nx.Graph, path: List[Any]) -> float:
        """
        Computes the total length of a path in the graph.
        Args:
            graph (nx.Graph): The graph containing the path.
            path (List[Any]): A list of nodes representing the path.
        returns:
            float: The total distance of the path. Returns 0.0 if the path is empty or has less than 2 nodes.
        """
        if not path or len(path) < 2:
            return 0.0
        total_distance = sum(
            math.dist(graph.nodes[u]['pos'], graph.nodes[v]['pos'])
            for u, v in zip(path[:-1], path[1:])
        )
        return total_distance


    

    @IPPerfMonitor
    def planPath(self, startList: List[List[Any]], goalList: List[List[Any]], config) -> List[Any]:
        """
        Plans a roundtrip path that visits all start and goal nodes.
        Args:
            startList (array): start position in planning space. E.g. [[1,2]]
            goalList (array) : goal position in planning space. E.g. [[3,4]]
            config (dict): dictionary with the needed information about the configuration options

        Returns:
            List[List[Any]]: A list representing the roundtrip path visiting all goals and returning to the start.
        """
        start_goal_coordinates = startList + goalList # e.g. [[1,2], [3,4]]
        # add start and target nodes to the graph
        for i, coordinates in enumerate(start_goal_coordinates):
            self.graph.add_node(i, pos=coordinates)
        # get added target nodes
        target_nodes: nx.classes.reportviews.NodeView = self.graph.nodes
        
        
        # Compute path from start to each other target node.
        start_node = 0
        for goal_node in range(1, len(target_nodes)):
            # plan path from start_node to goal_node
            path: List[Any] = self.path_planner.planPath([self.graph.nodes[start_node]['pos']], [self.graph.nodes[goal_node]['pos']], config)
            # Relabel nodes so composition does not merge different nodes (different pos)
            mapping = {node: (start_node if node == "start" else goal_node if node == "goal" else f'{start_node}_{goal_node}_{node}') for node in self.path_planner.graph.nodes}
            self.path_planner.graph = nx.relabel_nodes(self.path_planner.graph, mapping, copy=False)
            path = [mapping[node] for node in path]
            # Merge self.graph and planner.graph
            self.graph = nx.compose(self.path_planner.graph, self.graph)
            if not path:
                return []

        
        # Use the Traveling Salesman Problem approximation to find a roundtrip path
        # Compered to the single queue approach, we apply the TSP solver to the graph itself.
        tsg_solution = np.asarray(nx.algorithms.approximation.traveling_salesman_problem(
            self.graph,
            weight='distance',
            nodes=target_nodes,
            cycle=True
        ))

        if len(tsg_solution) < 2:
            return []
        
        return list(tsg_solution)


In [None]:
# Factory for multi-query planners
multiqueuePlannerFactory: dict[str, List[Any]] = dict() # plannerFactory[plannerName] = [plannerClass, configDict, visualizerFunction]

multiqueuePlannerFactory["visibilityPRM"] = [
    VisPRM,
    {
        "ntry": 40,
        "reset_graph": False
    },
    visibilityPRMVisualize]

In [None]:
# Run the benchmarks and collect the results
multi_query_result_list = run_benchmarks(Multi_query_roundtrip_path_planner, multiqueuePlannerFactory)

In [None]:
# Plotting the results
plot_results(multi_query_result_list)

In [None]:
animatePlanarRobotResults(multi_query_result_list)

In [None]:
#TODO: Compare results from 3. and 2.

Erkläutern Sie bitte zudem im Endbericht (mind. 1 Seite):
1. Wie funktioniert der Roundtrip-Planer, den Sie verwenden im Detail und warum haben Sie diesen gewählt.
2. Wie können Sie die Bewegungsbahnen optimieren/Glätten? Erläutern SIe kurz eine mögliche Vorgehensweise.

Anmerkung: Bitte checken Sie das Notebook "IP-X-0-Benchmarking-concept.ipnyb" und "IP-X-1-Automated_PlanerTest.ipnyb" für Profiling und Statistiken.

TODO: Endbericht

TODOs
- Erstellen von benchmarks, die tatsächlich mehrere Goals haben. Auch schauen, dass die von der erwarteten Schwierigkeit für einen roudtrip planner passen.
- Beide Ansätze vergleichen. Statistiken und so.
- Bei Aufgabe 3. ist der finale Weg nicht komplett grün markiert. Es fehlet immer das erste und letzte Fragment.
- Manche Plots des naiven planers sehen komisch aus... Was lief da schief?
- Für 2 DoF-Roboter ist ein simpler benchmark jetzt in IPTestSuite eingebunden. Dafür auch welche in verschiedenen Leveln mit mehreren Goals hinzufügen.
- Die 2 DoF-Roboter ergebnisse sind bisher nur im Configurationsraum visualisiert. Besser: Auch irgendwo einmal den Arbeitsraum als Video Zeigen.
- Kompletten Bericht schreiben (Hier muss viel Arbeit reingesteckt werden, weil das am Ende hauptsächlich angeschaut wird)
- naiver plotter reseted nach jedem subpath seinen graph. Muss das sein? Evtl anpassen.