# Intelligent Systems - Assignment 1: Graph Pathfinding with Genetic Algorithms

## Task 1: Shortest Path Between Two Nodes

This notebook implements a genetic algorithm to find the shortest path between two nodes in a directed graph.

### Authors: [Your Name]
### Date: November 2025

## 1. Setup and Imports

First, let's import all necessary libraries and set up the environment.

In [None]:
# Install required packages if not already installed
import subprocess
import sys

def install_package(package, import_name=None):
    """
    Install *package* with pip unless it can already be imported.

    Args:
        package: Distribution name handed to ``pip install``.
        import_name: Module name used for the import check. Defaults to
            ``package``; pass it when the two names differ
            (e.g. ``install_package('scikit-learn', 'sklearn')``).

    Raises:
        subprocess.CalledProcessError: If the pip command exits with an error.
    """
    import importlib

    try:
        importlib.import_module(import_name or package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        # The interpreter is already running (e.g. a notebook kernel), so the
        # import system's finder caches must be refreshed before the freshly
        # installed package can be imported.
        importlib.invalidate_caches()

# Make sure every third-party dependency of this notebook is available
for required_package in ('pygad', 'networkx', 'matplotlib', 'numpy'):
    install_package(required_package)

print("All packages installed successfully!")

In [None]:
# Import necessary libraries
import pygad
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import random
import os
from typing import List, Tuple, Dict, Optional
import time
import pandas as pd
from collections import defaultdict

# Fix the seeds of both random number generators so runs are reproducible
for _seed_fn in (random.seed, np.random.seed):
    _seed_fn(42)

print("Libraries imported successfully!")
# Report the versions of the main libraries in use
for _label, _module in (("PyGAD", pygad), ("NetworkX", nx), ("NumPy", np)):
    print(f"{_label} version: {_module.__version__}")

## 2. Graph Representation and File I/O

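This section defines `GraphManager`, which reads directed graphs from `.txt` files and represents them with NetworkX. The expected file format is: the first line holds the number of nodes, and every following line describes one edge as `node_a node_b distance`.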

In [None]:
class GraphManager:
    """Class to manage graph operations including file I/O and graph representation."""

    def __init__(self):
        self.graph = None
        self.num_nodes = 0
        self.node_list = []

    def read_graph_from_file(self, filename: str) -> nx.DiGraph:
        """
        Read a graph from a text file in the specified format:
        First line: number of nodes
        Following lines: node_a node_b distance
        """
        self.graph = nx.DiGraph()

        try:
            with open(filename, 'r') as file:
                lines = file.readlines()

                # Read number of nodes
                self.num_nodes = int(lines[0].strip())

                # Add nodes to graph
                for i in range(1, self.num_nodes + 1):
                    self.graph.add_node(i)

                # Read edges
                for line in lines[1:]:
                    if line.strip():  # Skip empty lines
                        parts = line.strip().split()
                        if len(parts) >= 3:
                            node_a, node_b, distance = int(parts[0]), int(parts[1]), float(parts[2])
                            self.graph.add_edge(node_a, node_b, weight=distance)

                self.node_list = list(self.graph.nodes())
                print(f"Graph loaded successfully: {self.num_nodes} nodes, {self.graph.number_of_edges()} edges")

        except FileNotFoundError:
            print(f"Error: File {filename} not found.")
            return None
        except Exception as e:
            print(f"Error reading graph: {e}")
            return None

        return self.graph

    def save_graph_to_file(self, graph: nx.DiGraph, filename: str):
        """
        Save a graph to a text file in the specified format.
        """
        with open(filename, 'w') as file:
            # Write number of nodes
            file.write(f"{graph.number_of_nodes()}\n")

            # Write edges
            for edge in graph.edges(data=True):
                node_a, node_b, data = edge
                weight = data.get('weight', 1)
                file.write(f"{node_a} {node_b} {weight}\n")

        print(f"Graph saved to {filename}")

    def visualize_graph(self, highlight_path: List[int] = None, title: str = "Graph Visualization"):
        """
        Visualize the graph with optional path highlighting.
        """
        if self.graph is None:
            print("No graph loaded.")
            return

        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(self.graph, seed=42)

        # Draw all edges in light gray
        nx.draw_networkx_edges(self.graph, pos, edge_color='lightgray', arrows=True, arrowsize=20)

        # Highlight path if provided
        if highlight_path and len(highlight_path) > 1:
            path_edges = [(highlight_path[i], highlight_path[i+1]) for i in range(len(highlight_path)-1)]
            valid_path_edges = [(u, v) for u, v in path_edges if self.graph.has_edge(u, v)]
            nx.draw_networkx_edges(self.graph, pos, edgelist=valid_path_edges,
                                 edge_color='red', width=3, arrows=True, arrowsize=20)

        # Draw nodes
        nx.draw_networkx_nodes(self.graph, pos, node_color='lightblue',
                             node_size=500, alpha=0.9)

        # Draw labels
        nx.draw_networkx_labels(self.graph, pos, font_size=12, font_weight='bold')

        # Draw edge labels (weights)
        edge_labels = nx.get_edge_attributes(self.graph, 'weight')
        nx.draw_networkx_edge_labels(self.graph, pos, edge_labels, font_size=10)

        plt.title(title, fontsize=16, fontweight='bold')
        plt.axis('off')
        plt.tight_layout()
        plt.show()

    def get_shortest_path_dijkstra(self, start: int, end: int) -> Tuple[List[int], float]:
        """
        Get the shortest path using Dijkstra's algorithm for comparison.
        """
        try:
            path = nx.shortest_path(self.graph, start, end, weight='weight')
            distance = nx.shortest_path_length(self.graph, start, end, weight='weight')
            return path, distance
        except nx.NetworkXNoPath:
            return [], float('inf')
        except Exception as e:
            print(f"Error finding shortest path: {e}")
            return [], float('inf')

# Create the shared GraphManager instance; the graph-generation cell below
# uses it to save the test graphs to disk
graph_manager = GraphManager()
print("GraphManager class created successfully!")

## 3. Test Graph Generation

Let's create test graphs of different sizes to test our genetic algorithm.

In [None]:
def generate_test_graph(num_nodes: int, edge_probability: float = 0.3,
                       min_weight: int = 1, max_weight: int = 10,
                       *, strongly_connected: bool = False) -> nx.DiGraph:
    """
    Generate a random directed, weighted graph for testing.

    Every ordered pair of distinct nodes gets an edge with probability
    ``edge_probability``. A directed backbone path through all nodes, in
    random order, is then added. The backbone only guarantees *weak*
    connectivity: a directed route between two arbitrary nodes may still
    not exist.

    Args:
        num_nodes: Number of nodes; they are labelled 1..num_nodes.
        edge_probability: Probability of creating each possible edge.
        min_weight: Smallest edge weight (inclusive).
        max_weight: Largest edge weight (inclusive).
        strongly_connected: When True, the backbone is closed into a cycle
            so every node can reach every other node. Default False keeps
            the original behaviour and random sequence.

    Returns:
        The generated graph.
    """
    graph = nx.DiGraph()
    node_ids = range(1, num_nodes + 1)

    # Add nodes
    for node in node_ids:
        graph.add_node(node)

    # Add random edges (no random number is drawn for i == j)
    for source in node_ids:
        for target in node_ids:
            if source != target and random.random() < edge_probability:
                weight = random.randint(min_weight, max_weight)
                graph.add_edge(source, target, weight=weight)

    # Backbone: a directed path visiting all nodes in random order
    order = list(node_ids)
    random.shuffle(order)
    backbone = list(zip(order, order[1:]))
    if strongly_connected and len(order) > 1:
        # Closing the path into a cycle makes every node reachable from any other
        backbone.append((order[-1], order[0]))

    for source, target in backbone:
        if not graph.has_edge(source, target):
            weight = random.randint(min_weight, max_weight)
            graph.add_edge(source, target, weight=weight)

    return graph

# Make sure the output directory for the generated graphs exists
os.makedirs('test_graphs', exist_ok=True)

# (num_nodes, edge_probability, min_weight, max_weight) for each graph size
graph_specs = {
    'small': (5, 0.4, 1, 10),
    'medium': (20, 0.2, 1, 15),
    'large': (50, 0.1, 1, 20),
}

# Generate the graphs in declaration order (small, medium, large)
test_graphs = {size: generate_test_graph(*spec) for size, spec in graph_specs.items()}

# Write each graph to disk and report its size
for size, graph in test_graphs.items():
    filename = f'test_graphs/{size}_graph.txt'
    graph_manager.save_graph_to_file(graph, filename)
    print(f"{size.capitalize()} graph: {graph.number_of_nodes()} nodes, {graph.number_of_edges()} edges")

print("\nTest graphs generated and saved successfully!")

## 4. Genetic Algorithm Implementation

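Now let's implement the genetic algorithm for finding the shortest path. Each candidate path is encoded as a fixed-length chromosome of node ids, where the value `-1` marks an unused position.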

In [None]:
class PathFindingGA:
    """
    Genetic Algorithm implementation for shortest path finding.
    """

    def __init__(self, graph: nx.DiGraph, start_node: int, end_node: int, max_path_length: int = None):
        self.graph = graph
        self.start_node = start_node
        self.end_node = end_node
        self.nodes = list(graph.nodes())
        self.num_nodes = len(self.nodes)

        # Set maximum path length (chromosome length)
        if max_path_length is None:
            self.max_path_length = min(self.num_nodes * 2, 20)  # Reasonable upper bound
        else:
            self.max_path_length = max_path_length

        # Gene space: all nodes plus a special value (-1) for unused positions
        self.gene_space = self.nodes + [-1]

        # Best solution tracking
        self.best_solution = None
        self.best_fitness = float('-inf')
        self.fitness_history = []

    def decode_chromosome(self, chromosome: List[int]) -> List[int]:
        """
        Decode chromosome to a valid path.
        Remove -1 values and ensure path starts with start_node and ends with end_node.
        """
        # Remove -1 values (unused genes)
        path = [gene for gene in chromosome if gene != -1]

        # Ensure path starts with start_node
        if not path or path[0] != self.start_node:
            path = [self.start_node] + [node for node in path if node != self.start_node]

        # Ensure path ends with end_node
        if not path or path[-1] != self.end_node:
            if self.end_node in path:
                # Remove end_node from middle and add to end
                path = [node for node in path if node != self.end_node] + [self.end_node]
            else:
                path.append(self.end_node)

        return path

    def calculate_path_distance(self, path: List[int]) -> float:
        """
        Calculate the total distance of a path.
        Return infinity if path is invalid.
        """
        if len(path) < 2:
            return float('inf')

        total_distance = 0
        for i in range(len(path) - 1):
            if self.graph.has_edge(path[i], path[i + 1]):
                total_distance += self.graph[path[i]][path[i + 1]]['weight']
            else:
                return float('inf')  # Invalid path

        return total_distance

    def fitness_function(self, ga_instance, solution, solution_idx):
        """
        Fitness function for the genetic algorithm.
        Higher fitness = better solution (shorter path).
        """
        path = self.decode_chromosome(solution)

        # Check if path starts and ends correctly
        if len(path) < 2 or path[0] != self.start_node or path[-1] != self.end_node:
            return -1000  # Heavy penalty for invalid start/end

        distance = self.calculate_path_distance(path)

        if distance == float('inf'):
            return -1000  # Heavy penalty for invalid path

        # Fitness is inverse of distance (shorter path = higher fitness)
        # Add small penalty for longer paths to encourage shorter solutions
        length_penalty = len(path) * 0.1
        fitness = 1000 / (distance + length_penalty + 1)

        # Track best solution
        if fitness > self.best_fitness:
            self.best_fitness = fitness
            self.best_solution = path.copy()

        return fitness

    def generate_initial_population(self, population_size: int) -> List[List[int]]:
        """
        Generate initial population with some heuristic guidance.
        """
        population = []

        for _ in range(population_size):
            chromosome = []

            # Start with start_node
            chromosome.append(self.start_node)

            # Add random intermediate nodes
            num_intermediate = random.randint(0, self.max_path_length - 2)
            for _ in range(num_intermediate):
                chromosome.append(random.choice(self.nodes))

            # End with end_node
            chromosome.append(self.end_node)

            # Pad with -1 to reach max_path_length
            while len(chromosome) < self.max_path_length:
                chromosome.append(-1)

            # Truncate if too long
            chromosome = chromosome[:self.max_path_length]

            population.append(chromosome)

        return population

    def custom_mutation(self, offspring, ga_instance):
        """
        Custom mutation function that maintains path structure.
        """
        for chromosome_idx in range(offspring.shape[0]):
            if random.random() < ga_instance.mutation_probability:
                # Choose mutation type
                mutation_type = random.choice(['replace', 'insert', 'remove'])

                if mutation_type == 'replace':
                    # Replace a random gene (except first and last if they are start/end)
                    gene_idx = random.randint(1, len(offspring[chromosome_idx]) - 2)
                    offspring[chromosome_idx][gene_idx] = random.choice(self.gene_space)

                elif mutation_type == 'insert':
                    # Insert a new node
                    gene_idx = random.randint(1, len(offspring[chromosome_idx]) - 1)
                    if offspring[chromosome_idx][gene_idx] == -1:
                        offspring[chromosome_idx][gene_idx] = random.choice(self.nodes)

                elif mutation_type == 'remove':
                    # Remove a node (set to -1)
                    gene_idx = random.randint(1, len(offspring[chromosome_idx]) - 2)
                    if offspring[chromosome_idx][gene_idx] != self.start_node and offspring[chromosome_idx][gene_idx] != self.end_node:
                        offspring[chromosome_idx][gene_idx] = -1

        return offspring

    def custom_crossover(self, parents, offspring_size, ga_instance):
        """
        Custom crossover function for path chromosomes.
        """
        offspring = []

        for _ in range(offspring_size[0]):
            # Select two parents
            parent1_idx = random.randint(0, parents.shape[0] - 1)
            parent2_idx = random.randint(0, parents.shape[0] - 1)

            parent1 = parents[parent1_idx]
            parent2 = parents[parent2_idx]

            # Create offspring by combining parts of both parents
            crossover_point = random.randint(1, len(parent1) - 1)

            child = np.concatenate([parent1[:crossover_point], parent2[crossover_point:]])

            # Ensure start and end nodes are correct
            child[0] = self.start_node
            # Find a good position for end_node
            end_pos = len(child) - 1
            while end_pos > 0 and child[end_pos] == -1:
                end_pos -= 1
            if end_pos < len(child) - 1:
                child[end_pos + 1] = self.end_node
            else:
                child[end_pos] = self.end_node

            offspring.append(child)

        return np.array(offspring)

    def on_generation(self, ga_instance):
        """
        Callback function called after each generation.
        """
        best_fitness = ga_instance.best_solution()[1]
        self.fitness_history.append(best_fitness)

        if ga_instance.generations_completed % 10 == 0:
            print(f"Generation {ga_instance.generations_completed}: Best fitness = {best_fitness:.4f}")
            if self.best_solution:
                distance = self.calculate_path_distance(self.best_solution)
                print(f"Best path: {self.best_solution}, Distance: {distance}")

# Cell marker: reached only if the PathFindingGA class definition above ran without error
print("PathFindingGA class created successfully!")