# A* Pathfinding Algorithm for Street Networks
## Google Colab Compatible Version

This notebook implements the A* pathfinding algorithm for finding optimal paths in street networks. It uses the Viana do Castelo street network data and allows optimization based on:
- Distance (meters)
- Duration (minutes) 
- Unlevel percentage

The notebook includes:
1. Data loading and preprocessing
2. Interactive street selection
3. A* algorithm implementation
4. Path visualization with maps
5. Performance metrics

## 1. Install Required Libraries and Setup

In [None]:
# Install required packages for Google Colab
!pip install matplotlib networkx numpy pandas

# Import libraries
import csv
import heapq
import math
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import numpy as np
from datetime import datetime
from google.colab import files
import io

## 2. Upload Data File

Upload the `viana_streets_network_named.csv` file to Colab.

In [None]:
# Ask the user to pick the street-network CSV in the Colab upload dialog
print("Please upload the 'viana_streets_network_named.csv' file:")
uploaded = files.upload()

# The first uploaded entry is the one used by the rest of the notebook
csv_filename = list(uploaded)[0]
print(f"Uploaded file: {csv_filename}")

## 3. A* Algorithm Implementation

In [None]:
class AStarStreets:
    """A* shortest-path search over a street graph.

    Args:
        graph: Mapping of node name to a list of edge dicts. Each edge dict
            is read for the keys 'destination', 'distance_meters',
            'duration_minutes' and 'unlevel_percent'.
        coordinates: Mapping of node name to {'lon': ..., 'lat': ...}.
            NOTE(review): assumed to be decimal degrees -- confirm against
            the data source.
        s_start: Name of the start node.
        s_goal: Name of the goal node.
        weight_type: 'distance', 'duration' or 'unlevel'. Any other value is
            treated like 'unlevel'.

    After ``run()`` the instance exposes ``path`` (list of node names),
    ``visited_nodes`` (set of expanded nodes) and ``costs`` (dict of route
    totals, empty when the path has fewer than two nodes).
    """

    # Round figure for metres per degree of arc, used only by the heuristic.
    METERS_PER_DEGREE = 111000

    # Edge-dict key that holds the cost for each supported weight type.
    _COST_KEYS = {
        'distance': 'distance_meters',
        'duration': 'duration_minutes',
        'unlevel': 'unlevel_percent',
    }

    def __init__(self, graph, coordinates, s_start, s_goal, weight_type='distance'):
        self.graph = graph
        self.coordinates = coordinates
        self.s_start = s_start
        self.s_goal = s_goal
        self.weight_type = weight_type  # 'distance', 'duration', or 'unlevel'
        self.OPEN = []
        self.CLOSED = set()
        self.PARENT = {}
        self.g = {}
        self.h = {}
        self.path = []
        self.visited_nodes = set()
        self.costs = {}

    def run(self):
        """Search for a path and return it as a list of node names.

        Returns an empty list (after printing a message) when the goal
        cannot be reached.
        """
        if not self.find_path():
            print(f"No path found from {self.s_start} to {self.s_goal}.")
            return []

        self.calculate_costs()
        return self.path

    def find_path(self):
        """Run A* from ``s_start`` to ``s_goal``.

        Returns:
            True when the goal was reached (``self.path`` is then filled in),
            False when the open list ran dry first.
        """
        # Start from a clean slate so that calling run() twice on the same
        # instance does not reuse the previous search's bookkeeping.
        self.OPEN = []
        self.CLOSED = set()
        self.PARENT = {}
        self.g = {}
        self.h = {}
        self.path = []
        self.visited_nodes = set()

        # Unknown weight types fall back to 'unlevel', as they always did.
        cost_key = self._COST_KEYS.get(self.weight_type, 'unlevel_percent')

        self.g[self.s_start] = 0
        self.h[self.s_start] = self.calculate_heuristic(self.s_start)
        heapq.heappush(self.OPEN, (self.g[self.s_start] + self.h[self.s_start], self.s_start))

        # NOTE(review): optimality relies on non-negative edge costs; if
        # 'unlevel_percent' can be negative the result is only a heuristic.
        while self.OPEN:
            _, current_node = heapq.heappop(self.OPEN)

            # A node may sit in the heap several times (one entry per
            # improvement of its g value); only the first pop is meaningful.
            if current_node in self.CLOSED:
                continue

            if current_node == self.s_goal:
                self.reconstruct_path()
                return True

            self.CLOSED.add(current_node)
            self.visited_nodes.add(current_node)

            for neighbor_data in self.graph.get(current_node, []):
                neighbor = neighbor_data['destination']
                if neighbor in self.CLOSED:
                    continue

                new_g = self.g[current_node] + neighbor_data[cost_key]
                if neighbor not in self.g or new_g < self.g[neighbor]:
                    self.g[neighbor] = new_g
                    self.PARENT[neighbor] = current_node
                    self.h[neighbor] = self.calculate_heuristic(neighbor)
                    heapq.heappush(self.OPEN, (new_g + self.h[neighbor], neighbor))

        return False

    def reconstruct_path(self):
        """Walk the PARENT links back from the goal and store the path."""
        current = self.s_goal
        path = []
        while current is not None:
            path.append(current)
            # The start node has no PARENT entry, which ends the walk.
            current = self.PARENT.get(current)
        path.reverse()
        self.path = path

    def calculate_heuristic(self, node):
        """Estimate the remaining cost from ``node`` to the goal.

        A straight-line estimate in metres is only comparable with edge
        costs that are themselves metres, so it is used for the 'distance'
        weight type alone. For every other weight type the estimate is 0,
        which makes the search behave like Dijkstra's algorithm and keeps
        the returned path optimal for that criterion.

        Returns 0 as well when either node has no known coordinates.
        """
        if self.weight_type != 'distance':
            return 0
        if node not in self.coordinates or self.s_goal not in self.coordinates:
            return 0

        node_coords = self.coordinates[node]
        goal_coords = self.coordinates[self.s_goal]

        # Equirectangular approximation: a degree of longitude shrinks with
        # cos(latitude), so scale dx before combining it with dy. Treating
        # both axes alike would overestimate east-west distances.
        mean_lat = math.radians((node_coords['lat'] + goal_coords['lat']) / 2)
        dx = (node_coords['lon'] - goal_coords['lon']) * math.cos(mean_lat)
        dy = node_coords['lat'] - goal_coords['lat']

        return math.hypot(dx, dy) * self.METERS_PER_DEGREE

    def calculate_costs(self):
        """Sum the edge attributes along ``self.path``, store and print them.

        The totals are kept in ``self.costs`` under the keys
        'total_distance', 'total_duration', 'total_unlevel' and
        'average_unlevel'. Nothing is printed or stored for paths with fewer
        than two nodes.
        """
        self.costs = {}
        if len(self.path) < 2:
            return

        total_distance = 0
        total_duration = 0
        total_unlevel = 0
        edge_count = 0

        for current, next_node in zip(self.path, self.path[1:]):
            # Use the first edge that leads to the next node on the path.
            for neighbor_data in self.graph.get(current, []):
                if neighbor_data['destination'] == next_node:
                    total_distance += neighbor_data['distance_meters']
                    total_duration += neighbor_data['duration_minutes']
                    total_unlevel += neighbor_data['unlevel_percent']
                    edge_count += 1
                    break

        # The slope values belong to edges, so average over edges, not nodes.
        average_unlevel = total_unlevel / edge_count if edge_count else 0.0

        self.costs = {
            'total_distance': total_distance,
            'total_duration': total_duration,
            'total_unlevel': total_unlevel,
            'average_unlevel': average_unlevel,
        }

        print(f"\nPath found with {len(self.path)} nodes")
        print(f"Nodes explored: {len(self.visited_nodes)}")
        print(f"Total Distance: {total_distance:.2f} meters")
        print(f"Total Duration: {total_duration:.2f} minutes")
        print(f"Average Unlevel: {average_unlevel:.2f}%")

## 4. Data Loading Functions

In [None]:
def read_street_csv(file_path):
    """Read the street network CSV and build the graph structures.

    The file must have the columns 'origin', 'destination',
    'distance_meters', 'duration_minutes', 'unlevel_percent',
    'intersect_lon' and 'intersect_lat'.

    Args:
        file_path: Path of the CSV file (UTF-8, with or without a BOM).

    Returns:
        A ``(graph, coordinates)`` tuple. ``graph`` maps each street name to
        a list of edge dicts (one per direction of every connection);
        ``coordinates`` maps each street name to ``{'lon': ..., 'lat': ...}``.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If a numeric column of a kept row cannot be parsed.
    """
    graph = {}
    coordinates = {}
    seen_connections = set()

    # 'utf-8-sig' drops a leading byte-order mark (common in spreadsheet
    # exports) that would otherwise become part of the first header name;
    # newline='' lets the csv module handle line endings itself.
    with open(file_path, mode='r', encoding='utf-8-sig', newline='') as file:
        for row in csv.DictReader(file):
            origin = row['origin']
            destination = row['destination']

            # A connection is undirected: keep only its first occurrence,
            # whichever way round the two streets are listed.
            connection_key = tuple(sorted((origin, destination)))
            if connection_key in seen_connections:
                continue
            seen_connections.add(connection_key)

            # Parse numbers only for rows that are actually kept.
            distance = float(row['distance_meters'])
            duration = float(row['duration_minutes'])
            unlevel = float(row['unlevel_percent'])
            lon = float(row['intersect_lon'])
            lat = float(row['intersect_lat'])

            # Add the edge in both directions.
            for source, target in ((origin, destination), (destination, origin)):
                graph.setdefault(source, []).append({
                    'destination': target,
                    'distance_meters': distance,
                    'duration_minutes': duration,
                    'unlevel_percent': unlevel,
                })

            # NOTE(review): both streets receive the coordinates of this
            # intersection, so each street ends up with the position of the
            # last kept intersection it appears in -- confirm this is the
            # intended representative point.
            coordinates[origin] = {'lon': lon, 'lat': lat}
            coordinates[destination] = {'lon': lon, 'lat': lat}

    return graph, coordinates

## 5. Load and Display Data

In [None]:
# Build the graph and the coordinate table from the uploaded CSV
print("Loading street network data...")
graph, coordinates = read_street_csv(csv_filename)

# Each connection appears once per direction in the adjacency lists
edge_entries = sum(map(len, graph.values()))
print(f"Loaded {len(graph)} streets with {edge_entries//2} connections")

# Preview the alphabetically sorted street names
streets = sorted(graph)
print("\nFirst 20 available streets:")
for i, street in enumerate(streets[:20]):
    print(f"{i}: {street}")

if len(streets) > 20:
    print(f"... and {len(streets) - 20} more streets")

## 6. Interactive Street Selection and Algorithm Execution

In [None]:
# Function to get user input for start and end streets
def _find_street(streets, query):
    """Resolve one line of user input to a street.

    Returns a ``(street, message)`` pair: ``street`` is the resolved name
    (``message`` is then None), or None with ``message`` explaining why the
    input could not be resolved.
    """
    if not query:
        # An empty string is a substring of every name; ask again instead.
        return None, "Please enter a street name or index."

    # Try to parse as index first
    try:
        index = int(query)
    except ValueError:
        pass
    else:
        if 0 <= index < len(streets):
            return streets[index], None
        return None, f"Index must be between 0 and {len(streets)-1}"

    needle = query.lower()

    # An exact (case-insensitive) name wins, so a street whose name is
    # contained in a longer street name can still be chosen by name.
    exact = [s for s in streets if s.lower() == needle]
    if len(exact) == 1:
        return exact[0], None

    matches = [(i, s) for i, s in enumerate(streets) if needle in s.lower()]
    if len(matches) == 1:
        return matches[0][1], None
    if matches:
        lines = [f"Multiple streets match '{query}':"]
        lines.extend(f"  {i}: {s}" for i, s in matches)
        lines.append("Please be more specific or use the index.")
        return None, "\n".join(lines)
    return None, f"No street found matching '{query}'. Please try again."


def _prompt_for_street(streets, prompt):
    """Ask repeatedly until a street is resolved; return None on cancel."""
    while True:
        try:
            answer = input(prompt).strip()
        except (KeyboardInterrupt, EOFError):
            # EOFError means no more input can ever arrive; retrying would
            # loop forever, so treat it like an explicit cancel.
            print("\nOperation cancelled.")
            return None

        street, message = _find_street(streets, answer)
        if street is not None:
            return street
        print(message)


def get_street_input(streets):
    """Interactively ask for a start street, an end street and a criterion.

    Streets may be given by list index or by (partial, case-insensitive)
    name.

    Args:
        streets: List of street names to choose from.

    Returns:
        ``(start_street, end_street, criteria)`` where ``criteria`` is 0
        (distance), 1 (duration) or 2 (unlevel). When the user cancels, no
        more input is available, or ``streets`` is empty, the result is
        ``(None, None, None)``.
    """
    cancelled = (None, None, None)

    if not streets:
        print("No streets available. Please load the street network first.")
        return cancelled

    print("Available streets:")
    for i, street in enumerate(streets):
        print(f"{i}: {street}")

    print("\n" + "="*50)

    start_street = _prompt_for_street(streets, "Enter the START street (name or index): ")
    if start_street is None:
        return cancelled

    end_street = _prompt_for_street(streets, "Enter the END street (name or index): ")
    if end_street is None:
        return cancelled

    # Get optimization criteria
    while True:
        print("\nOptimization criteria:")
        print("0: Distance (meters)")
        print("1: Duration (minutes)")
        print("2: Unlevel percentage")

        try:
            criteria = int(input("Enter optimization criteria (0, 1, or 2): ").strip())
        except ValueError:
            print("Please enter a valid number (0, 1, or 2).")
            continue
        except (KeyboardInterrupt, EOFError):
            print("\nOperation cancelled.")
            return cancelled

        if criteria in (0, 1, 2):
            break
        print("Please enter 0, 1, or 2.")

    return start_street, end_street, criteria

print("Ready to get user input for streets...")

In [None]:
# Ask for the route endpoints and the optimisation criterion
print("Please select the start and end streets for pathfinding:")
result = get_street_input(streets)

if result[0] is not None and result[1] is not None:
    start_street, end_street, criteria = result

    # The criterion number indexes into this list
    weight_types = ['distance', 'duration', 'unlevel']
    weight_type = weight_types[criteria]

    print("\nSelected route:")
    print(f"Start: {start_street}")
    print(f"End: {end_street}")
    print(f"Optimization: {weight_type}")

    print("\nFinding optimal path using A* algorithm...")

    # Time the search, including the cost summary that run() prints
    start_time = datetime.now()
    astar = AStarStreets(graph, coordinates, start_street, end_street, weight_type)
    path = astar.run()
    end_time = datetime.now()

    if not path:
        print("No path found.")
    else:
        print(f"\nOptimal path found in {end_time - start_time}:")
        for i, street in enumerate(path):
            print(f"{i+1}: {street}")
else:
    print("Operation cancelled. Please run this cell again to select streets.")

## 7. Path Visualization

In [None]:
def visualize_path_with_exploration(graph, coordinates, path=None, visited_nodes=None, algorithm_name="A*"):
    """Plot the street network, optionally with explored nodes and a path.

    Args:
        graph: Mapping of street name to a list of edge dicts; only the
            'destination' key of each edge is read here.
        coordinates: Mapping of street name to {'lon': ..., 'lat': ...}.
        path: Optional list of street names to draw as the route. Paths
            with fewer than two entries are not drawn.
        visited_nodes: Optional iterable of street names to mark as explored.
        algorithm_name: Label used in the title and the legend.

    Streets without an entry in ``coordinates`` are silently left out of
    every layer, including the start and end markers.
    """
    plt.figure(figsize=(14, 10))

    # Extract coordinates for plotting
    lons = [coord['lon'] for coord in coordinates.values()]
    lats = [coord['lat'] for coord in coordinates.values()]

    # Plot all intersections
    plt.scatter(lons, lats, c='lightblue', s=20, alpha=0.6, label='Intersections')

    # Draw edges. Connections are stored in both directions, so remember
    # which unordered pairs were already drawn and plot each line once.
    drawn_edges = set()
    for street, neighbors in graph.items():
        if street not in coordinates:
            continue
        start_coord = coordinates[street]
        for neighbor_data in neighbors:
            neighbor = neighbor_data['destination']
            if neighbor not in coordinates:
                continue
            edge_key = tuple(sorted((street, neighbor)))
            if edge_key in drawn_edges:
                continue
            drawn_edges.add(edge_key)
            end_coord = coordinates[neighbor]
            plt.plot([start_coord['lon'], end_coord['lon']],
                     [start_coord['lat'], end_coord['lat']],
                     'lightgray', linewidth=0.5, alpha=0.6)

    # Highlight visited nodes (exploration)
    if visited_nodes:
        visited_coords = [coordinates[street] for street in visited_nodes if street in coordinates]
        plt.scatter([c['lon'] for c in visited_coords],
                    [c['lat'] for c in visited_coords],
                    c='orange', s=40, alpha=0.7, label='Explored Nodes', zorder=4)

    # Highlight path if provided
    if path and len(path) > 1:
        path_coords = [coordinates[street] for street in path if street in coordinates]
        path_lons = [c['lon'] for c in path_coords]
        path_lats = [c['lat'] for c in path_coords]

        # Draw path
        plt.plot(path_lons, path_lats, 'red', linewidth=3, alpha=0.8, label=f'{algorithm_name} Path')
        plt.scatter(path_lons, path_lats, c='red', s=50, alpha=0.9, zorder=5)

        # Mark start and end, with the same missing-coordinate tolerance
        # as the layers above instead of raising KeyError.
        start_coord = coordinates.get(path[0])
        if start_coord is not None:
            plt.scatter(start_coord['lon'], start_coord['lat'], c='green', s=100,
                        marker='o', label='Start', zorder=6)
        end_coord = coordinates.get(path[-1])
        if end_coord is not None:
            plt.scatter(end_coord['lon'], end_coord['lat'], c='purple', s=100,
                        marker='s', label='End', zorder=6)

    plt.title(f'Viana do Castelo Street Network - {algorithm_name} Algorithm', fontsize=14, weight='bold')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

# Draw the route when one exists; otherwise fall back to the bare network
if locals().get('path'):
    print(f"\nVisualizing the A* optimal path from {start_street} to {end_street}...")
    print(f"Path includes {len(path)} streets with {len(astar.visited_nodes)} nodes explored")
    visualize_path_with_exploration(
        graph,
        coordinates,
        path,
        astar.visited_nodes,
        "A*",
    )
elif 'graph' in locals():
    print("\nNo path found to visualize. Showing the street network...")
    visualize_path_with_exploration(graph, coordinates)
else:
    print("Please run the previous cells to load data and find a path first.")

## 8. Interactive Street Selection Widget (Optional)

Uncomment and run this cell for interactive street selection:

In [None]:
# # Interactive widgets for street selection
# from ipywidgets import interact, widgets
# 
# def run_astar_interactive(start_idx, end_idx, criteria):
#     start_street = streets[start_idx]
#     end_street = streets[end_idx]
#     weight_types = ['distance', 'duration', 'unlevel']
#     weight_type = weight_types[criteria]
#     
#     print(f"Finding path from '{start_street}' to '{end_street}' optimizing for {weight_type}...")
#     
#     astar = AStarStreets(graph, coordinates, start_street, end_street, weight_type)
#     path = astar.run()
#     
#     if path:
#         visualize_path_with_exploration(graph, coordinates, path, astar.visited_nodes)
#     else:
#         print("No path found.")
# 
# # Create interactive widgets
# interact(run_astar_interactive,
#          start_idx=widgets.IntSlider(min=0, max=len(streets)-1, step=1, value=0, description='Start:'),
#          end_idx=widgets.IntSlider(min=0, max=len(streets)-1, step=1, value=5, description='End:'),
#          criteria=widgets.Dropdown(options=[(n, i) for i, n in enumerate(['Distance', 'Duration', 'Unlevel'])], 
#                                   value=0, description='Optimize:'))

## 9. Performance Analysis

In [None]:
# Compare A* with different optimization criteria
def compare_optimization_criteria(start_street, end_street):
    """Run A* once per weight type on the same route and tabulate the runs.

    Uses the module-level ``graph`` and ``coordinates``. Weight types for
    which no path is found are left out of the table.

    Returns:
        A list of dicts with the keys 'optimization', 'path_length',
        'nodes_explored' and 'execution_time' (seconds).
    """
    results = []

    for weight_type in ('distance', 'duration', 'unlevel'):
        print(f"\nTesting A* with {weight_type} optimization...")

        started = datetime.now()
        solver = AStarStreets(graph, coordinates, start_street, end_street, weight_type)
        route = solver.run()
        elapsed = datetime.now() - started

        if not route:
            continue

        results.append({
            'optimization': weight_type,
            'path_length': len(route),
            'nodes_explored': len(solver.visited_nodes),
            'execution_time': elapsed.total_seconds()
        })

    if results:
        rule = "="*60
        table = pd.DataFrame(results)
        print("\n" + rule)
        print("A* COMPARISON RESULTS")
        print(rule)
        print(table.to_string(index=False))

    return results

# Compare the criteria on the route picked in the selection cell, if any
if locals().keys() >= {'start_street', 'end_street'}:
    print(f"\nRunning comparison for route: {start_street} → {end_street}")
    comparison_results = compare_optimization_criteria(start_street, end_street)
else:
    print("Please run the previous cell to select streets first, then run this cell for comparison.")

In [None]:
# Test multiple random paths for performance analysis
import random

def performance_test(num_tests=5):
    """Run A* with every weight type on random pairs of distinct streets.

    Uses the module-level ``streets``, ``graph`` and ``coordinates``.

    Args:
        num_tests: Number of random street pairs to try.

    Returns:
        A list with one dict per successful search (keys 'test', 'start',
        'end', 'weight_type', 'path_length', 'nodes_explored' and
        'execution_time' in seconds). Searches that find no path are
        omitted. The list is empty when fewer than two streets are loaded.
    """
    results = []

    if len(streets) < 2:
        print("At least two streets are needed to run performance tests.")
        return results

    print(f"Running {num_tests} random performance tests...")

    for test_number in range(1, num_tests + 1):
        # Draw two different streets in one go, so that every iteration is
        # a real test instead of being skipped when start equals end.
        start_street, end_street = random.sample(streets, 2)

        print(f"\nTest {test_number}: {start_street} -> {end_street}")

        for weight_type in ['distance', 'duration', 'unlevel']:
            start_time = datetime.now()
            astar_test = AStarStreets(graph, coordinates, start_street, end_street, weight_type)
            path_test = astar_test.run()
            end_time = datetime.now()

            if path_test:
                results.append({
                    'test': test_number,
                    'start': start_street,
                    'end': end_street,
                    'weight_type': weight_type,
                    'path_length': len(path_test),
                    'nodes_explored': len(astar_test.visited_nodes),
                    'execution_time': (end_time - start_time).total_seconds()
                })

    # Create performance summary
    if results:
        df = pd.DataFrame(results)
        print("\n" + "="*60)
        print("A* PERFORMANCE SUMMARY")
        print("="*60)
        summary = df.groupby('weight_type')[['path_length', 'nodes_explored', 'execution_time']].agg(['mean', 'std'])
        print(summary.round(4))

        print("\n" + "="*60)
        print("DETAILED RESULTS")
        print("="*60)
        print(df.to_string(index=False))

    return results

def run_performance_test():
    """Ask for confirmation, then run the random performance test.

    Returns:
        The list returned by ``performance_test(5)`` when the user answers
        'y' or 'yes'; an empty list when the user declines or when the
        street graph has not been loaded yet.
    """
    # 'graph' is a module-level name. Inside a function locals() only holds
    # this function's own variables, so the check has to use globals().
    if 'graph' not in globals():
        print("Please load the graph data first by running the previous cells.")
        return []

    response = input("Run performance test on 5 random street pairs? (y/n): ").strip().lower()
    if response in ('y', 'yes'):
        return performance_test(5)

    print("Performance test cancelled.")
    return []

# Offer the optional benchmark over random routes
print(
    "Optional: Run performance analysis on random street pairs\n"
    "This will test A* with different optimization criteria on random routes"
)
perf_results = run_performance_test()