In [None]:
import heapq
import os
import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from memory_profiler import profile
from PIL import Image  # For reading PNG files


@dataclass
class Node:
    """Search-tree node for one grid cell.

    x, y:    cell coordinates (column, row).
    g_cost:  cost accumulated from the start cell (infinity until reached).
    h_cost:  heuristic estimate of the remaining cost to the goal.
    parent:  node this one was reached from, or None for the start node.
    """
    x: int
    y: int
    g_cost: float = float('inf')
    h_cost: float = 0
    parent: Optional['Node'] = None

    @property
    def f_cost(self) -> float:
        """Total estimated cost through this node (g_cost + h_cost)."""
        return self.g_cost + self.h_cost

    def __lt__(self, other: 'Node') -> bool:
        """Order nodes by f_cost so they can be stored directly in a heap."""
        return self.f_cost < other.f_cost

    def __repr__(self) -> str:
        # The dataclass-generated repr would print the parent recursively,
        # i.e. the whole path back to the start, and hits the interpreter's
        # recursion limit on long paths. Show only the parent's coordinates.
        parent_xy = None if self.parent is None else (self.parent.x, self.parent.y)
        return (
            f"Node(x={self.x!r}, y={self.y!r}, g_cost={self.g_cost!r}, "
            f"h_cost={self.h_cost!r}, parent={parent_xy!r})"
        )


class GridMap:
    """Occupancy grid with bounds/obstacle checks and 4-connected neighbours.

    Cells are addressed as (x, y) = (column, row); the backing array is
    indexed as grid[y, x].
    """

    def __init__(self, grid: np.ndarray):
        """
        grid: a 2D numpy array with values:
          0 -> free cell
          1 -> obstacle

        Raises ValueError if grid is not two-dimensional.
        """
        grid = np.asarray(grid)
        if grid.ndim != 2:
            raise ValueError(
                f"grid must be 2-dimensional, got {grid.ndim} dimension(s)."
            )
        self.grid = grid
        self.height, self.width = grid.shape

    @classmethod
    def from_file(cls, filepath: str, threshold: int = 128,
                  dark_is_obstacle: bool = True) -> 'GridMap':
        """
        Load grid from a PNG file.

        filepath:         path to a .png image; it is converted to grayscale.
        threshold:        gray level separating dark from light pixels.
        dark_is_obstacle: if True (default), pixels below the threshold become
                          obstacles (1) and the rest free cells (0); if False
                          the polarity is inverted.

        Raises ValueError if filepath does not end in '.png'.
        """
        if not str(filepath).lower().endswith('.png'):
            raise ValueError("Only .png files are supported by from_file().")

        # Open PNG as grayscale; the context manager releases the file handle
        # as soon as the pixel data has been copied into the array.
        with Image.open(filepath) as img:
            arr = np.array(img.convert('L'))

        obstacle_mask = (arr < threshold) if dark_is_obstacle else (arr >= threshold)
        return cls(obstacle_mask.astype(int))

    def is_valid_position(self, x: int, y: int) -> bool:
        """Check if position is within grid bounds and not an obstacle (0 -> free)."""
        return (
            0 <= x < self.width
            and 0 <= y < self.height
            and self.grid[y, x] == 0
        )

    def get_neighbors(self, node: 'Node') -> List[tuple]:
        """
        Get the valid 4-connected neighbours of node as (x, y) tuples.

        Candidates are tried in the order y+1, x+1, y-1, x-1; only positions
        accepted by is_valid_position() are returned.
        """
        directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
        neighbors = []
        for dx, dy in directions:
            new_x, new_y = node.x + dx, node.y + dy
            if self.is_valid_position(new_x, new_y):
                neighbors.append((new_x, new_y))
        return neighbors


class Bresenham:
    """Namespace for Bresenham line rasterisation."""

    @staticmethod
    def get_line(start: tuple, end: tuple) -> List[tuple]:
        """
        Return the grid cells on Bresenham's line from start to end.

        Both endpoints are (x, y) tuples and are included in the result,
        which is ordered from start to end. The walk advances one cell per
        step along the axis with the larger extent (y when the extents tie)
        and moves along the other axis whenever the error term goes negative.
        """
        (x_from, y_from), (x_to, y_to) = start, end
        span_x = abs(x_to - x_from)
        span_y = abs(y_to - y_from)
        step_x = 1 if x_from < x_to else -1
        step_y = 1 if y_from < y_to else -1

        # Express both cases as a walk along a "major" axis so one loop
        # serves shallow and steep lines alike.
        x_is_major = span_x > span_y
        if x_is_major:
            major, minor, major_stop = x_from, y_from, x_to
            major_span, minor_span = span_x, span_y
            major_step, minor_step = step_x, step_y
        else:
            major, minor, major_stop = y_from, x_from, y_to
            major_span, minor_span = span_y, span_x
            major_step, minor_step = step_y, step_x

        balance = major_span / 2.0
        cells = []
        while major != major_stop:
            cells.append((major, minor) if x_is_major else (minor, major))
            balance -= minor_span
            if balance < 0:
                minor += minor_step
                balance += major_span
            major += major_step

        # The loop stops just before the end cell, so add it explicitly.
        cells.append((x_to, y_to))
        return cells


class PathFinder:
    """Grid search algorithms that share one map and one result format.

    Every public search takes ``start`` and ``end`` as (x, y) tuples plus an
    optional ``ils_region``: a set of (x, y) cells the search may step into.
    ``None`` means unrestricted; an empty set means no cell may be entered.
    Only cells reached through get_neighbors() are filtered, so the start
    cell itself is never checked against the region or the obstacle map.

    Each search returns ``(path, metrics)``. ``path`` lists the cells from
    start to end (empty if the end was not reached). ``metrics`` contains
    'time' (elapsed seconds), 'visited_nodes' (expanded cells for the
    heap-based searches, discovered cells for BFS/DFS) and 'path_length'
    (number of cells in the path, 0 if none was found).
    """

    def __init__(self, grid_map: GridMap):
        self.grid_map = grid_map

    @staticmethod
    def manhattan_distance(p1: tuple, p2: tuple) -> float:
        """Calculate Manhattan distance between two points."""
        return abs(p1[0] - p2[0]) + abs(p1[1] - p2[1])

    @staticmethod
    def _metrics(start_time: float, visited_nodes: int, path: List[tuple]) -> Dict:
        """Build the metrics dict for a search started at start_time."""
        return {
            'time': time.perf_counter() - start_time,
            'visited_nodes': visited_nodes,
            'path_length': len(path)
        }

    @staticmethod
    def _path_to(node: 'Node') -> List[tuple]:
        """Follow parent links back from node and return the path start-first."""
        path = []
        while node:
            path.append((node.x, node.y))
            node = node.parent
        path.reverse()
        return path

    @staticmethod
    def _path_from_parents(parents: Dict, last: tuple) -> List[tuple]:
        """Rebuild the start-first path from a cell -> predecessor mapping."""
        path = []
        while last is not None:
            path.append(last)
            last = parents[last]
        path.reverse()
        return path

    @profile
    def a_star(self, start: tuple, end: tuple,
               ils_region: Optional[Set[tuple]] = None) -> Tuple[List[tuple], Dict]:
        """A* pathfinding algorithm (unit step cost, Manhattan heuristic)."""
        start_time = time.perf_counter()
        start_node = Node(start[0], start[1], g_cost=0,
                          h_cost=self.manhattan_distance(start, end))
        nodes = {start: start_node}
        closed_set = set()

        # Heap entries are (f_cost when pushed, push order, node). Storing a
        # snapshot of the priority keeps the heap valid when a node's g_cost
        # is lowered later: the node is simply pushed again and the older,
        # worse entry is discarded when popped. The push order breaks ties
        # so Node objects are never compared.
        open_heap = [(start_node.f_cost, 0, start_node)]
        pushed = 1

        while open_heap:
            _, _, current = heapq.heappop(open_heap)
            current_pos = (current.x, current.y)
            if current_pos in closed_set:
                continue  # stale entry of an already expanded cell

            if current_pos == end:
                path = self._path_to(current)
                return path, self._metrics(start_time, len(closed_set), path)

            closed_set.add(current_pos)

            for next_pos in self.grid_map.get_neighbors(current):
                if ils_region is not None and next_pos not in ils_region:
                    continue
                if next_pos in closed_set:
                    continue

                next_node = nodes.get(next_pos)
                if next_node is None:
                    next_node = nodes[next_pos] = Node(next_pos[0], next_pos[1])

                tentative_g = current.g_cost + 1
                if tentative_g < next_node.g_cost:
                    next_node.parent = current
                    next_node.g_cost = tentative_g
                    next_node.h_cost = self.manhattan_distance(next_pos, end)
                    heapq.heappush(open_heap, (next_node.f_cost, pushed, next_node))
                    pushed += 1

        return [], self._metrics(start_time, len(closed_set), [])

    @profile
    def dijkstra(self, start: tuple, end: tuple,
                 ils_region: Optional[Set[tuple]] = None) -> Tuple[List[tuple], Dict]:
        """Dijkstra's algorithm implementation (unit step cost)."""
        start_time = time.perf_counter()
        start_node = Node(start[0], start[1], g_cost=0)
        nodes = {start: start_node}
        closed_set = set()

        # Same (priority snapshot, push order, node) scheme as a_star(), with
        # the accumulated cost alone as the priority.
        open_heap = [(start_node.g_cost, 0, start_node)]
        pushed = 1

        while open_heap:
            _, _, current = heapq.heappop(open_heap)
            current_pos = (current.x, current.y)
            if current_pos in closed_set:
                continue  # stale entry of an already expanded cell

            if current_pos == end:
                path = self._path_to(current)
                return path, self._metrics(start_time, len(closed_set), path)

            closed_set.add(current_pos)

            for next_pos in self.grid_map.get_neighbors(current):
                if ils_region is not None and next_pos not in ils_region:
                    continue
                if next_pos in closed_set:
                    continue

                next_node = nodes.get(next_pos)
                if next_node is None:
                    next_node = nodes[next_pos] = Node(next_pos[0], next_pos[1])

                tentative_g = current.g_cost + 1
                if tentative_g < next_node.g_cost:
                    next_node.parent = current
                    next_node.g_cost = tentative_g
                    heapq.heappush(open_heap, (tentative_g, pushed, next_node))
                    pushed += 1

        return [], self._metrics(start_time, len(closed_set), [])

    @profile
    def bfs(self, start: tuple, end: tuple,
            ils_region: Optional[Set[tuple]] = None) -> Tuple[List[tuple], Dict]:
        """Breadth-First Search implementation."""
        start_time = time.perf_counter()
        # Maps every discovered cell to the cell it was discovered from, so
        # the path is rebuilt once at the end instead of being copied for
        # every queued cell.
        parents = {start: None}
        queue = deque([start])

        while queue:
            current = queue.popleft()

            if current == end:
                path = self._path_from_parents(parents, current)
                return path, self._metrics(start_time, len(parents), path)

            node = Node(current[0], current[1])
            for next_pos in self.grid_map.get_neighbors(node):
                if ils_region is not None and next_pos not in ils_region:
                    continue
                if next_pos not in parents:
                    parents[next_pos] = current
                    queue.append(next_pos)

        return [], self._metrics(start_time, len(parents), [])

    @profile
    def dfs(self, start: tuple, end: tuple,
            ils_region: Optional[Set[tuple]] = None) -> Tuple[List[tuple], Dict]:
        """Depth-First Search implementation."""
        start_time = time.perf_counter()
        # Cells are marked as discovered when pushed, so each one is pushed
        # at most once and keeps the predecessor that first found it.
        parents = {start: None}
        stack = [start]

        while stack:
            current = stack.pop()

            if current == end:
                path = self._path_from_parents(parents, current)
                return path, self._metrics(start_time, len(parents), path)

            node = Node(current[0], current[1])
            for next_pos in self.grid_map.get_neighbors(node):
                if ils_region is not None and next_pos not in ils_region:
                    continue
                if next_pos not in parents:
                    parents[next_pos] = current
                    stack.append(next_pos)

        return [], self._metrics(start_time, len(parents), [])

    @profile
    def best_first_search(self, start: tuple, end: tuple,
                          ils_region: Optional[Set[tuple]] = None) -> Tuple[List[tuple], Dict]:
        """Greedy Best-First Search: always expand the cell nearest the goal."""
        start_time = time.perf_counter()
        start_node = Node(start[0], start[1],
                          h_cost=self.manhattan_distance(start, end))
        discovered = {start}
        closed_set = set()

        # Best-First uses the heuristic only, so the heap is keyed on h_cost
        # explicitly. Relying on Node ordering would compare f_cost, which is
        # infinite for every node here because g_cost is never assigned.
        open_heap = [(start_node.h_cost, 0, start_node)]
        pushed = 1

        while open_heap:
            _, _, current = heapq.heappop(open_heap)
            current_pos = (current.x, current.y)

            if current_pos == end:
                path = self._path_to(current)
                return path, self._metrics(start_time, len(closed_set), path)

            closed_set.add(current_pos)

            for next_pos in self.grid_map.get_neighbors(current):
                if ils_region is not None and next_pos not in ils_region:
                    continue
                if next_pos in discovered:
                    continue  # each cell is queued once, on first discovery

                discovered.add(next_pos)
                h_cost = self.manhattan_distance(next_pos, end)
                next_node = Node(next_pos[0], next_pos[1],
                                 h_cost=h_cost, parent=current)
                heapq.heappush(open_heap, (h_cost, pushed, next_node))
                pushed += 1

        return [], self._metrics(start_time, len(closed_set), [])


class Visualizer:
    """Plotting and tabular reporting for Standard vs. ILS search results.

    All methods are static. Figures and CSV files are written to the
    'analysis_plots' directory, which is created on demand.
    """

    @staticmethod
    def _split_approach(results_df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with an 'approach' column and bare algorithm names."""
        df_comp = results_df.copy()
        df_comp['approach'] = df_comp['algorithm'].apply(
            lambda x: 'ILS' if 'ILS' in x else 'Standard'
        )
        df_comp['algorithm'] = (
            df_comp['algorithm']
            .str.replace(' (Standard)', '', regex=False)
            .str.replace(' (ILS)', '', regex=False)
        )
        return df_comp

    @staticmethod
    def _filename_part(text: str) -> str:
        """Make text safe to embed in a file name (e.g. 'A*' -> 'A_star')."""
        text = str(text).replace('*', '_star')
        return ''.join(ch if ch.isalnum() or ch in '-_.' else '_' for ch in text)

    @staticmethod
    def _save_metric_panels(metrics, draw_panel, title_template, filename,
                            panel_height=5, rotate_xticks=True, add_legend=True):
        """
        Draw one stacked subplot per metric, save the figure and release it.

        draw_panel(metric) must draw onto the current axes; title_template is
        formatted with the human-readable metric name.
        """
        fig = plt.figure(figsize=(14, panel_height * len(metrics)))
        for i, metric in enumerate(metrics, 1):
            plt.subplot(len(metrics), 1, i)
            draw_panel(metric)
            plt.title(title_template.format(metric.replace("_", " ").title()))
            if rotate_xticks:
                plt.xticks(rotation=45)
            if add_legend:
                plt.legend(loc='best')
        plt.tight_layout()
        fig.savefig(os.path.join("analysis_plots", filename), dpi=150)
        plt.show()
        # Close explicitly so figures do not accumulate across calls.
        plt.close(fig)

    @staticmethod
    def plot_side_by_side(
        grid: np.ndarray,
        path_standard: List[tuple],
        path_ils: List[tuple],
        ils_region: Set[tuple],
        algo_name: str,
        map_name: str,
        ax_titles=("Standard", "Incremental Line Search")
    ):
        """
        Plot a side-by-side comparison of Standard vs. ILS
        on a single figure for a given algorithm and map.

        The figure is saved as
        analysis_plots/<map stem>_<algorithm>_comparison.png.
        """
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        # Left: Standard
        Visualizer.plot_grid_with_path(
            grid=grid,
            path=path_standard,
            ils_region=None,
            title=f"{algo_name} - {ax_titles[0]}",
            ax=axes[0]
        )

        # Right: ILS
        Visualizer.plot_grid_with_path(
            grid=grid,
            path=path_ils,
            ils_region=ils_region,
            title=f"{algo_name} - {ax_titles[1]}",
            ax=axes[1]
        )

        fig.suptitle(f"{algo_name} Comparison on {map_name}", fontsize=14)
        fig.tight_layout()

        # Save in 'analysis_plots' directory. The map's own extension is
        # dropped and the algorithm name sanitised: names such as 'A*' are
        # not valid file names on every platform.
        os.makedirs("analysis_plots", exist_ok=True)
        map_stem = os.path.splitext(os.path.basename(str(map_name)))[0]
        filename = (
            f"{Visualizer._filename_part(map_stem)}_"
            f"{Visualizer._filename_part(algo_name)}_comparison.png"
        )
        fig.savefig(os.path.join("analysis_plots", filename), dpi=150)
        plt.show()
        # One figure is created per algorithm and map; release it.
        plt.close(fig)

    @staticmethod
    def plot_grid_with_path(
        grid: np.ndarray,
        path: List[tuple],
        ils_region: Set[tuple] = None,
        title: str = "",
        ax: "plt.Axes" = None
    ):
        """
        Plot grid with highlighted path and optional ILS region.

        path and ils_region hold (x, y) cells; x is drawn on the horizontal
        axis and y on the vertical axis, matching imshow's column/row layout.
        A new figure is created when ax is None.
        """
        if ax is None:
            _, ax = plt.subplots()

        ax.imshow(grid, cmap='binary')

        if ils_region:
            # Region cells are (x, y), so unpack in that order.
            region_x, region_y = zip(*ils_region)
            ax.scatter(region_x, region_y, color='yellow', alpha=0.3, s=30,
                       label='ILS Region')

        if path:
            path_x, path_y = zip(*path)
            ax.plot(path_x, path_y, 'r-', linewidth=2, label='Path')

        ax.set_title(title)
        ax.grid(True)
        # Only add a legend when something labelled was drawn.
        handles, _ = ax.get_legend_handles_labels()
        if handles:
            ax.legend()

    @staticmethod
    def plot_comparison_metrics(results_df: pd.DataFrame) -> None:
        """
        Create multiple visualization types for time, visited_nodes, path_length
        across each algorithm and approach (Standard vs. ILS).

        results_df needs the columns 'algorithm' (suffixed with ' (Standard)'
        or ' (ILS)'), 'map', 'time', 'visited_nodes' and 'path_length'.
        """
        os.makedirs("analysis_plots", exist_ok=True)

        metrics = ['time', 'visited_nodes', 'path_length']
        df_comp = Visualizer._split_approach(results_df)

        # 1) Bar plots for each metric
        Visualizer._save_metric_panels(
            metrics,
            lambda metric: sns.barplot(data=df_comp, x='algorithm', y=metric,
                                       hue='approach', dodge=True),
            '{} Comparison',
            "barplots_comparison.png",
        )

        # 2) Swarm plots for each metric (to see distribution)
        Visualizer._save_metric_panels(
            metrics,
            lambda metric: sns.swarmplot(data=df_comp, x='algorithm', y=metric,
                                         hue='approach', dodge=True),
            'Swarm Plot: {}',
            "swarmplots_comparison.png",
        )

        # 3) Line plot for each metric (group by approach, each map as x-axis)
        Visualizer._save_metric_panels(
            metrics,
            lambda metric: sns.lineplot(data=df_comp, x='map', y=metric,
                                        hue='approach', style='algorithm',
                                        markers=True),
            'Line Plot: {} by Map',
            "lineplots_comparison.png",
        )

        # 4) Violin plot
        Visualizer._save_metric_panels(
            metrics,
            lambda metric: sns.violinplot(data=df_comp, x='algorithm', y=metric,
                                          hue='approach', split=True),
            'Violin Plot: {}',
            "violinplots_comparison.png",
        )

        # 5) Histogram of each metric. seaborn draws the hue legend itself
        #    for histplot; plt.legend() finds no labelled artists here and
        #    would replace that legend with an empty one, so it is skipped.
        Visualizer._save_metric_panels(
            metrics,
            lambda metric: sns.histplot(data=df_comp, x=metric, hue='approach',
                                        kde=True, multiple='stack'),
            'Histogram of {}',
            "histograms_metrics.png",
            panel_height=4,
            rotate_xticks=False,
            add_legend=False,
        )

        # 6) Pairwise relationships among time, visited_nodes, path_length
        pairplot_data = df_comp[["time", "visited_nodes", "path_length", "approach"]]
        g = sns.pairplot(pairplot_data, hue='approach', corner=True)
        pairplot_path = os.path.join("analysis_plots", "pairwise_relationships.png")
        g.savefig(pairplot_path, dpi=150)
        plt.show()
        plt.close(g.fig)

        # 7) Correlation heatmap among the 3 main metrics
        corr_data = df_comp[["time", "visited_nodes", "path_length"]]
        corr_matrix = corr_data.corr()
        fig = plt.figure(figsize=(5, 4))
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f")
        plt.title('Correlation among Time, Visited Nodes, and Path Length')
        plt.tight_layout()
        corr_path = os.path.join("analysis_plots", "correlation_heatmap.png")
        fig.savefig(corr_path, dpi=150)
        plt.show()
        plt.close(fig)

    @staticmethod
    def display_comparison_table(results_df: pd.DataFrame):
        """
        Display a table that compares each standard algorithm with its ILS counterpart
        side by side AND includes a percentage improvement column.
        Also show an aggregated improvement summary across all maps per algorithm.

        Both tables are printed and saved as CSV files in 'analysis_plots'.
        """
        # 1) Parse approach and unify the base algorithm name
        df_comp = Visualizer._split_approach(results_df)

        # 2) Pivot to get side-by-side columns: time_Standard, time_ILS, etc.
        pivoted = df_comp.pivot_table(
            index=['map', 'algorithm'],
            columns='approach',
            values=['time', 'visited_nodes', 'path_length']
        )
        pivoted.columns = ['_'.join(col).strip() for col in pivoted.columns.values]
        pivoted = pivoted.reset_index()

        # 3) Percentage improvement:
        #    improvement% = ((Standard - ILS) / Standard) * 100
        # NOTE(review): an ILS run that finds no path reports path_length 0
        # and therefore shows up as a 100% path-length improvement here.
        def pct_improvement(std_val, ils_val):
            if std_val == 0:  # avoid division by zero
                return 0
            return ((std_val - ils_val) / std_val) * 100

        for metric in ('time', 'visited_nodes', 'path_length'):
            std_col, ils_col = f'{metric}_Standard', f'{metric}_ILS'
            # Bind the column names as defaults so each lambda keeps its own.
            pivoted[f'{metric}_pct_improvement'] = pivoted.apply(
                lambda row, s=std_col, i=ils_col: pct_improvement(
                    row.get(s, 0), row.get(i, 0)
                ),
                axis=1
            )

        print("\n== Metrics Comparison Table (Standard vs. Incremental Line Search) ==\n")
        print(pivoted.to_string(index=False))

        # 4) Summarize average percentage improvement across all maps by algorithm
        improvement_summary = pivoted.groupby('algorithm', as_index=False).agg({
            'time_pct_improvement': 'mean',
            'visited_nodes_pct_improvement': 'mean',
            'path_length_pct_improvement': 'mean'
        })

        print("\n== Average Percentage Improvement Summary (Standard vs. ILS) ==\n")
        print(improvement_summary.to_string(index=False))

        # 5) Save the pivoted table and improvement summary to CSV
        os.makedirs("analysis_plots", exist_ok=True)
        pivoted.to_csv(os.path.join("analysis_plots", "comparison_table.csv"), index=False)
        improvement_summary.to_csv(os.path.join("analysis_plots", "improvement_summary.csv"), index=False)


def create_ils_region(
    grid_map: GridMap,
    start: tuple,
    end: tuple,
    region_width_percentage: float = 0.05  # smaller width
) -> Set[tuple]:
    """
    Create an 'Incremental Line Search' region around Bresenham's line
    from start to end. region_width_percentage determines the width
    relative to the smallest dimension of the grid.

    The region is the union of square windows of half-width region_width
    centred on every line cell, keeping only cells accepted by
    grid_map.is_valid_position(). It is returned as a set of (x, y) tuples.

    The half-width is at least 1: with a width of 0 the region would be the
    bare line, whose diagonal steps cannot be followed with the 4-connected
    moves offered by GridMap.get_neighbors().
    """
    region_width = max(
        1,
        int(min(grid_map.width, grid_map.height) * region_width_percentage)
    )
    offsets = range(-region_width, region_width + 1)

    ils_region = set()
    for x, y in Bresenham.get_line(start, end):
        for dx in offsets:
            for dy in offsets:
                cell = (x + dx, y + dy)
                # Windows of neighbouring line cells overlap heavily; skip
                # cells that were already accepted.
                if cell not in ils_region and grid_map.is_valid_position(*cell):
                    ils_region.add(cell)

    return ils_region


def main():
    """
    Benchmark every search algorithm on each PNG map in 'grid_maps'.

    Each map is searched from the top-left to the bottom-right cell, once
    unrestricted (Standard) and once restricted to the ILS corridor. Per-run
    metrics go to 'pathfinding_results.csv'; plots and comparison tables go
    to the 'analysis_plots' folder. If 'grid_maps' does not exist it is
    created and the function returns so maps can be added.
    """
    grid_folder = 'grid_maps'
    results = []

    if not os.path.exists(grid_folder):
        os.makedirs(grid_folder)
        print(f"Created '{grid_folder}' directory. Please add .png map files and run again.")
        return

    # os.listdir() returns entries in arbitrary order; sort so runs and the
    # rows of the result files are reproducible.
    for filename in sorted(os.listdir(grid_folder)):
        if not filename.lower().endswith('.png'):
            continue

        print(f"\nProcessing map: {filename}")
        filepath = os.path.join(grid_folder, filename)

        try:
            grid_map = GridMap.from_file(filepath)
            pathfinder = PathFinder(grid_map)

            # Define start and end points
            start = (0, 0)
            end = (grid_map.width - 1, grid_map.height - 1)
            for label, cell in (("start", start), ("end", end)):
                if not grid_map.is_valid_position(*cell):
                    print(f"Warning: {label} cell {cell} of {filename} is not a free cell; "
                          "searches may not find a path.")

            # Corridor-like region for Incremental Line Search
            region_width_percentage = 0.05  # Adjust as needed
            ils_region = create_ils_region(grid_map, start, end, region_width_percentage)

            # Additional info stored with every result row for the CSV
            run_info = {
                'map': filename,
                'start_x': start[0],
                'start_y': start[1],
                'end_x': end[0],
                'end_y': end[1],
                'corridor_width': region_width_percentage,
            }

            # List of algorithms
            algorithms = [
                ('A*', pathfinder.a_star),
                ('Dijkstra', pathfinder.dijkstra),
                ('BFS', pathfinder.bfs),
                ('DFS', pathfinder.dfs),
                ('Best-First', pathfinder.best_first_search)
            ]

            for algo_name, algo_func in algorithms:
                paths = {}
                # A region of None runs the unrestricted (Standard) search.
                for approach, region in (('Standard', None), ('ILS', ils_region)):
                    print(f"Running {algo_name} ({approach})...")
                    path, metrics = algo_func(start, end, region)
                    metrics.update({
                        'algorithm': f"{algo_name} ({approach})",
                        **run_info,
                    })
                    results.append(metrics)
                    paths[approach] = path

                # Side by side comparison for Standard vs. ILS
                Visualizer.plot_side_by_side(
                    grid_map.grid,
                    paths['Standard'],
                    paths['ILS'],
                    ils_region,
                    algo_name,
                    filename
                )

        except Exception as e:
            # Best effort per map: report the failure and move on to the next
            # file. Rows collected before the failure are kept.
            print(f"Error processing {filename}: {str(e)}")

    if not results:
        print("No results generated. Please check your PNG maps.")
        return

    # Create DataFrame with all results
    results_df = pd.DataFrame(results)

    # Save all raw results to a CSV
    # This now includes start_x, start_y, end_x, end_y, and corridor_width.
    results_df.to_csv('pathfinding_results.csv', index=False)

    # 1) Create extended plots and store them
    Visualizer.plot_comparison_metrics(results_df)

    # 2) Display & save a comparison table with % improvements
    Visualizer.display_comparison_table(results_df)

    print("\nAll results have been saved to 'pathfinding_results.csv', and detailed plots/tables are in 'analysis_plots' folder.")

# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
