# **ROBOTALK 2025**

## STRAIGHT LINES ARE BORING, LET'S USE CLOTHOIDS

-------------------------------------------------

Οι απαιτούμενες βιβλιοθήκες για το συγκεκριμένο workshop είναι οι εξής:

In [None]:
!pip3 install networkx
!pip3 install matplotlib
!pip3 install ipywidgets
!pip3 install pyclothoids

Αφού κατεβάσαμε τις απαιτούμενες βιβλιοθήκες θα τις κάνουμε $\text{import}$:

In [None]:
import networkx as nx
from collections import deque
import matplotlib.pyplot as plt
from google.colab import output
import ipywidgets as widgets
from IPython.display import display, clear_output
from matplotlib import rcParams
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np
from pyclothoids import Clothoid, SolveG2
import math
from types import DynamicClassAttribute
import cv2
from google.colab.patches import cv2_imshow

In [None]:
# Global variables to share data between cells
G = None
positions = None
start_node = None
target_nodes = None
fpath = None

## The Track

-------------------------------------------------

Στη συνέχεια, θα φορτώσουμε τον γράφο μας και θα εξαγάγουμε τις αναγκαίες πληροφορίες του με τη βοήθεια της βιβλιοθήκης $networkx$:

In [None]:
def load_and_display_graph(graphml_file):
    """
    Load and display a graph from a GraphML file
    """
    # Load the graph
    global G, positions
    G = nx.read_graphml(graphml_file)

    # Get node positions from graph data
    positions = {
        node: (float(G.nodes[node]['x']), float(G.nodes[node]['y']))
        for node in G.nodes()
    }
    return G, positions


Αν τώρα θέλουμε να εμφανίσουμε το γράφο μπορούμε απλά να τρέξουμε:

In [None]:
G, positions = _____________ # We want to load the graphml file to be ready for display

# Create the plot
plt.figure(figsize=(12, 8))

# Draw the graph
nx.draw(G,
        pos = positions,
        with_labels = True,  # Show node labels
        node_color = 'lightblue',
        node_size = 500,
        font_size = 10,
        edge_color = 'gray')

plt.title("Track Graph Visualization", fontsize = 18)
plt.axis('equal')  # Equal aspect ratio
plt.grid(True)
plt.show()

Η συγκεκριμένη πίστα στην πραγματική ζωή είναι η παρακάτω:

In [None]:
image = cv2.imread('lab_track.jpg')  # Replace with your image file
cv2_imshow(image)

## Path Finding
-------------------------------------------------

Αφού φορτώσαμε επιτυχώς το γράφο, είμαστε έτοιμοι να εφαρμόσουμε τον $BFS$ αλγοριθμό μας ώστε να βρούμε το βέλτιστο μονοπάτι το οποίο ξεκινάει από την αφετηρία μας και περνάει από όλους τους $target$ κόμβους που έχουμε ορίσει.\
***Προσοχή:*** Επειδή ο γράφος μας είναι κατευθυνόμενος, υπάρχουν συνδυασμοί κόμβων που δεν μπορούμε να τοποθετήσουμε ως $target$ $nodes$. Σε αυτή την περίπτωση η διαδρομή μας πάλι θα σχεδιαστεί, απλά θα "πετάξει" κάποιους από τους προορισμούς μας εκτός εμφανίζοντας το αντίστοιχο μήνυμα.\
Ο BFS που γνωρίζουμε ωστόσο δεν επαρκεί για την εύρεση διαδρομής μέσω πολλαπλών στόχων.\
Αυτό που θα κάνουμε είναι μια παραλλαγή του:
* Σε κάθε βήμα, ο κύριος βρόχος τρέχει $BFS$ για κάθε εναπομείναντα στόχο, επιλέγοντας πάντα αυτόν με τη συντομότερη διαδρομή από τον τρέχοντα κόμβο.

(Προφανώς αυτή η προσέγγιση δεν είναι η βέλτιστη για το συγκεκριμένο πρόβλημα. Για έναν μικρό στατικό γράφο θα μπορούσαμε να υπολογίσουμε τις βέλτιστες διαδρομές από κάθε κόμβο προς κάθε άλλο με τον αλγόριθμο $Floyd-Warshall$, $Ο(n^3)$ και να τις αποθηκεύσουμε σε έναν μνημονικό πίνακα. Αλλιώς, μια υλοποίηση πιο κοντά στην παρούσα είναι η αντικατάσταση του $BFS$ με τον αλγόριθμο $Dijkstra$ με πολυπλοκότητα $Ο(m*n^2)$), όπου m ο αριθμός των κόμβων-στόχων.

In [None]:
def find_path_through_nodes(G, start_node, target_nodes):
    """
    Find a path that visits all target nodes using BFS
    """
    def bfs_to_target(current_node, target):
    """Find shortest path between two nodes using BFS"""

        # TODO: Initialize the queue with the starting node and its path
        # (The queue should contain tuples of (node, path_to_node))
        queue = ______([(current_node, [current_node])])

        # TODO: Initialize the set of visited nodes
        # (Start with the current_node to avoid revisiting it)
        visited = ______([current_node])

        while queue:
            # TODO: Get the next node and its path from the queue
            # (Use the appropriate method to remove from the left side of the queue)
            node, path = queue.______()

            # Check neighbors
            for next_node in G.neighbors(str(node)):
                # TODO: Convert the node ID from string to integer for easier handling
                next_node = ______(next_node)

                # TODO: Check if we've already visited this node
                if next_node ___ __ ______:
                    # TODO: Create a new path by adding this node to the current path
                    new_path = path + [______]

                    # TODO: Check if we've reached the target node
                    if next_node == ______:
                        return new_path

                    # TODO: Mark this node as visited
                    ______.add(next_node)

                    # TODO: Add the new node and its path to the queue
                    queue.______((____, new_path))

        # TODO: Return the appropriate value if no path is found
        return ______

    # Convert nodes to integers for easier handling
    target_nodes = [int(n) for n in target_nodes]
    start_node = int(start_node)

    # Initialize the final path with the start node
    final_path = [start_node]
    current_node = start_node

    # Find path to each target node in sequence
    # TODO: Create a copy of target_nodes to track remaining targets
    # (Hint: We need a new list we can modify without affecting the original)
    remaining_targets = ______.copy()

    while remaining_targets:
        # Find the closest remaining target
        best_path = None
        best_target = None

        for target in remaining_targets:
            path = bfs_to_target(current_node, target)
            # TODO: Check if this path is better than our current best path
            # (We need to check if the path exists and if it's shorter than the best path so far)
            if path and (best_path is None or len(path) < ______):
                best_path = path
                best_target = target

        # If we cannot reach any remaining target, we leave the node out of the path
        if best_path is None:
            print(f"Cannot reach remaining targets: {remaining_targets}")
            break

        # Add the path (excluding the start node which is already included)
        final_path.extend(best_path[1:])
        current_node = best_target # Change the starting node as we moved on
        remaining_targets.remove(best_target)

    global fpath
    fpath = final_path

    return final_path

Ο παρακάτω κώδικας εμφανίζει το τελικό μονοπάτι που θα ακολουθήσουμε. Για την οπτικοποίηση του υπάρχουν δύο κουμπία:
* Πατώντας το $\text{next node}$, εμφανίζουμε βήμα βήμα το δρόμο που θα ακολουθήσουμε.
* Ενώ το δεξί κουμπί μας δείχνει απευθείας το τελικό μονοπάτι.

In [None]:
def plot_interactive(nodes_to_pass_through):
    # Set figure size
    rcParams['figure.figsize'] = (12, 8)

    # Convert node IDs to strings since NetworkX uses string identifiers
    nodes_to_pass_through_str = [str(node) for node in nodes_to_pass_through]

    # Define equivalent nodes and overlapping edges
    equivalent_nodes_map = {"3": ["50", "79"], "50": ["3", "79"], "79": ["3", "50"]}
    overlapping_edges = {
        "78": ["79", "50"],
        "80": ["50", "3"],
        "4": ["79", "3"]
    }

    # Extend the path to include equivalent nodes immediately after the original node
    extended_path = []
    for node in nodes_to_pass_through_str:
        extended_path.append(node)
        if node in equivalent_nodes_map:
            extended_path.extend(equivalent_nodes_map[node])  # Add equivalent nodes

    # Use the global graph and positions
    global G, positions
    node_positions = positions
    # Start with the first node highlighted
    current_path_index = 0
    current_path = [extended_path[0]]
    current_edges = []

    def draw_graph(highlighted_nodes, highlighted_edges):
        # Clear previous plots
        plt.close('all')

        # Create new figure with compact layout
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111)
        ax.axis("equal")

        # Set node colors
        node_colors = ['green' if node in highlighted_nodes else 'skyblue' for node in G.nodes()]

        # Set edge colors
        edge_colors = []
        for edge in G.edges():
            if edge in highlighted_edges or tuple(reversed(edge)) in highlighted_edges:
                edge_colors.append('green')
            else:
                edge_colors.append('black')

        # Draw graph
        nx.draw(G, pos=node_positions, ax=ax, with_labels=True, node_color=node_colors,
                node_size=500, edge_color=edge_colors, width=2, linewidths=1, font_size=10)

        plt.title(f"Path Progress: {len(current_path)}/{len(extended_path)} nodes", fontsize=14, pad=10)

        plt.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05)

        plt.show()

        display_widgets()

    def display_widgets():
        # Create centered container for buttons with minimal margin
        center_box = widgets.Box([button_container], layout=widgets.Layout(
            display='flex',
            flex_flow='row',
            align_items='center',
            justify_content='center',
            width='100%',
            margin='0px'
        ))
        display(center_box)

    def next_step(b):
        nonlocal current_path, current_edges, current_path_index

        if current_path_index < len(extended_path) - 1:
            current_path_index += 1
            next_node = extended_path[current_path_index]

            # Add the edge from the last node to the next node (if applicable)
            if len(current_path) > 0:
                last_node = current_path[-1]
                current_edges.append((last_node, next_node))

                # Add edges for equivalent nodes and their overlaps simultaneously
                if next_node in equivalent_nodes_map:
                    for equivalent_node in equivalent_nodes_map[next_node]:
                        if equivalent_node not in current_path:
                            current_path.append(equivalent_node)
                        current_edges.append((last_node, equivalent_node))

                        # Handle overlapping edges
                        if equivalent_node in overlapping_edges:
                            for overlap_source in overlapping_edges[equivalent_node]:
                                if overlap_source in current_path:
                                    current_edges.append((overlap_source, equivalent_node))

            # Now add the next node to the path if not already added
            if next_node not in current_path:
                current_path.append(next_node)

            # Redraw with the updated path and edges
            clear_output(wait=True)
            draw_graph(current_path, current_edges)

    def show_all(b):
        nonlocal current_path, current_edges, current_path_index

        # Highlight the entire path and all overlapping edges
        current_path = extended_path.copy()
        current_path_index = len(extended_path) - 1
        current_edges = []

        for i in range(len(current_path) - 1):
            current_edges.append((current_path[i], current_path[i + 1]))

        # Add overlapping edges for all equivalent nodes
        for target, sources in overlapping_edges.items():
            for source in sources:
                if source in current_path and target in current_path:
                    current_edges.append((source, target))

        # Redraw with the full path
        clear_output(wait=True)
        draw_graph(current_path, current_edges)

    # Create buttons with compact styling
    next_button = widgets.Button(
        description="Next Node",
        layout=widgets.Layout(width='180px', height='36px'),
        button_style='primary'
    )
    next_button.on_click(next_step)

    show_all_button = widgets.Button(
        description="Show Full Path",
        layout=widgets.Layout(width='180px', height='36px'),
        button_style='success'
    )
    show_all_button.on_click(show_all)

    # Center buttons with minimal spacing
    button_container = widgets.HBox([next_button, show_all_button],
                                    layout=widgets.Layout(
                                        display='flex',
                                        flex_flow='row',
                                        align_items='center',
                                        justify_content='center',
                                        width='100%',
                                        gap='10px'  # Reduced gap between buttons
                                    ))

    # Apply CSS for compact layout
    display(widgets.HTML("""
    <style>
    .output_subarea {
        max-width: 100%;
        flex-direction: column;
        align-items: center;
        margin: 0 auto;
    }
    .widget-hbox {
        margin-top: -15px !important;  /* Negative margin to pull buttons closer to plot */
    }
    .jupyter-widgets {
        margin-top: 0 !important;
    }
    .output_wrapper {
        display: flex;
        flex-direction: column;
        align-items: center;
    }
    .output_area {
        margin-bottom: 0 !important;
    }
    </style>
    """))

    # Initial plot
    draw_graph(current_path, current_edges)

Ήρθε η ώρα να δούμε τα αποτελέσματα.

In [None]:
def starting_commands():
    global start_node, target_nodes
    print("Welcome to the path finding tool!")
    print("To start, please enter the ID of the starting node:")
    start_node = input()
    print("Please enter the IDs of the target nodes separated by commas:")
    target_nodes = input().split(",")
    return start_node, target_nodes


# TODO: Call the function to get input data from starting commands
# (Hint: We need to initialize our input variables)
input_data = ______()

# Get start node and target nodes from input data
start_node = input_data[0]  # Get this from the widget output
target_nodes = input_data[1]  # Get this from the widget output

# TODO: Check if we have valid start and target nodes before finding the path
# (Hint: We need to validate our inputs before proceeding)
if ______ and target_nodes:
    path = find_path_through_nodes(G, start_node, target_nodes)

    if path:
        print(f"Found path: {path}")
        plot_interactive(path)
    else:
        print("No valid path found")

## Clothoids
-------------------------------------------------

In [None]:
class VehicleParameters:
    def __init__(self):
        self.wheelbase = 0.26  # 260 mm
        self.max_wheel_angle = math.radians(90)  # 35 degrees in radians

def calculate_max_curvature(vehicle_params):
    # TODO: Calculate the max curvature the car can achieve
    # (Hint: Max_curvature = 1 / min_radius)
    return ___________________________________________


def evaluate_clothoid_constraints(clothoid_list, vehicle_params):
    max_curvature = calculate_max_curvature(vehicle_params)
    for clothoid in clothoid_list:
        if(max(clothoid.KappaStart,clothoid.KappaEnd) > max_curvature):
          return False

    return True

def generate_clothoid_segment(x0, y0, theta0, k0, x1, y1, theta1, k1, vehicle_params):
    try_params = [
        (1, 0), (2, 0), (3, 10),
        (5, 15), (10, 20), (0.5, 1)
    ]
    for Dmax, dmax in try_params:
        segment = SolveG2(x0, y0, theta0, k0, x1, y1, theta1, k1, Dmax, dmax)
        is_valid = evaluate_clothoid_constraints(segment, vehicle_params)
        # TODO: Check if the segment created is feasible
        if ______:
            return segment

    return None

def generate_clothoid_path(path_positions, vehicle_params):
    if len(path_positions) < 2:
        return []

    angles = []
    for i in range(len(path_positions)-1):
      dx = path_positions[i+1][0]-path_positions[i][0]
      dy = path_positions[i+1][1]-path_positions[i][1]
      angles.append(math.atan2(dy, dx))

    angles.append(angles[-1])

    for i in range(len(path_positions)-1):
      dx = path_positions[i+1][0]-path_positions[i][0]
      dy = path_positions[i+1][1]-path_positions[i][1]
      # TODO: Compute the angle in radians between the x-axis and the vector (dx, dy)
      angles.append(math._________)

    clothoid_segments = []
    # Generate clothoid segments for each pair of path positions
    for i in range(len(path_positions) - 1):
        pos1 = path_positions[i]
        theta1 = angles[i]
        pos2 = path_positions[i+1]
        theta2 = angles[i+1]

        # TODO: Generate a clothoid segment connecting the starting and the target positions
        segment = generate_clothoid_segment(_____, _____, theta1, 0, _____, _____, theta2, 0, vehicle_params)
        if segment is None:
            print(f"Failed to generate segment from {[pos1]} to {pos2}")
            return None

        # Add the segment to the list
        clothoid_segments.extend(segment)

    return clothoid_segments

def visualize_clothoid_path(clothoid_segments, path_positions, path_nodes):

    plt.figure(figsize=(12, 8))
    x_coords = [pos[0] for pos in path_positions]
    y_coords = [pos[1] for pos in path_positions]
    plt.plot(x_coords, y_coords, 'bo', markersize=10, label='Path Nodes')

    for i, (x, y) in enumerate(zip(x_coords, y_coords)):
        plt.annotate(
            str(path_nodes[i]),
            (x, y),
            textcoords="offset points",
            xytext=(0, 10),  # moves the text 10 points upward
            ha='center',
            fontsize=10,
            fontfamily='serif',
            fontstyle='italic'
        )

    if clothoid_segments:
        num_segments = len(clothoid_segments)

        colors = cm.rainbow(np.linspace(0, 1, num_segments))

        # Create segment labels with node information
        segment_labels = []

        for i in range(len(path_nodes)-1):
            segment_labels.append(f'{path_nodes[i]} → {path_nodes[i+1]}')
            segment_labels.append(f'{path_nodes[i]} → {path_nodes[i+1]}')
            segment_labels.append(f'{path_nodes[i]} → {path_nodes[i+1]}')


        for i, clothoid in enumerate(clothoid_segments):

            X, Y = clothoid.SampleXY(100)
            if (i%3!=0):
                plt.plot(X, Y, color=colors[i], linewidth=2)
            else:
                plt.plot(X, Y, color=colors[i], linewidth=2, label=f'Segment {segment_labels[i]}')


    plt.axis('equal')
    plt.grid(True)
    plt.legend(ncol=2, loc='upper left', bbox_to_anchor=(1, 1))

    plt.title('Clothoid Path')
    plt.tight_layout()
    plt.show()

In [None]:
# global variable path
if fpath:
    # Convert the node IDs to positions
    path_positions = [positions[str(node)] for node in path]

    # Initialize vehicle parameters
    vehicle_params = VehicleParameters()
    clothoid_segments = generate_clothoid_path(path_positions, vehicle_params)

    if clothoid_segments:
        # TODO: Visualize the final result
        ____________________________________________________
    else:
        print("Failed to generate clothoid path")

$$\text{Ευχαριστούμε πολύ!!}$$