# Genetic Algorithm for Sequential Ordering Problem


In [None]:
import random
import time
import numpy as np
from collections import defaultdict
import os
from pathlib import Path
import multiprocessing
import pandas as pd

Processing 5 instances in parallel...


## The Sequential Ordering Problem

The SOP class handles parsing problem instances from files and provides methods to validate solutions and calculate their costs.


In [None]:
class SOP:
    """A Sequential Ordering Problem instance loaded from a text file.

    The file is expected to contain a ``DIMENSION`` header line and an
    ``EDGE_WEIGHT_SECTION`` line followed by a full ``dimension x dimension``
    integer matrix (one row per line).  An entry of ``-1`` at row ``i``,
    column ``j`` (``i != j``) means node ``j`` must be visited before node
    ``i``; every other entry is the cost of the arc ``i -> j``.  Nodes are
    numbered from 1.

    Attributes set by ``__init__``:
        file_path:   the path passed by the caller.
        file_name:   final component of ``file_path``.
        nodes:       ``[1, ..., dimension]``.
        costs:       ``{(i, j): cost}`` for every matrix entry that is not -1.
        precedence:  list of ``(u, v)`` pairs meaning "u before v".
        size:        number of nodes.
        before:      ``before[v]`` is the set of nodes that must precede ``v``
                     (transitively closed).
        after:       ``after[u]`` is the set of nodes that must follow ``u``
                     (transitively closed).
    """

    def __init__(self, file_path):
        """Load the instance at ``file_path`` and build its precedence graph."""
        self.file_path = file_path
        self.file_name = Path(file_path).name
        self.nodes, self.costs, self.precedence = self.parse_file(file_path)
        self.size = len(self.nodes)
        # Predecessor / successor sets make precedence checks a set lookup.
        self.before = defaultdict(set)
        self.after = defaultdict(set)
        self.build_precedence_graph()

    def parse_file(self, file_path):
        """Parse an SOP file into nodes, arc costs and precedence pairs.

        Args:
            file_path: path of the instance file.

        Returns:
            A tuple ``(nodes, costs, precedence)`` where ``nodes`` is
            ``[1..dimension]``, ``costs`` maps ``(i, j)`` to a plain ``int``
            for every entry that is not ``-1``, and ``precedence`` lists
            ``(j, i)`` for every off-diagonal ``-1`` at row ``i``, column
            ``j`` (j must come before i).

        Raises:
            ValueError: if a required header is missing, the matrix has fewer
                than ``dimension`` rows, a row has fewer than ``dimension``
                entries, or an entry is not an integer.
        """
        with open(file_path, 'r') as f:
            lines = f.readlines()

        dimension = next(
            (int(line.strip().split(':')[1])
             for line in lines if line.startswith('DIMENSION')),
            None,
        )
        if dimension is None:
            raise ValueError(f"{file_path}: no DIMENSION line found")

        section_index = next(
            (i for i, line in enumerate(lines)
             if line.startswith('EDGE_WEIGHT_SECTION')),
            None,
        )
        if section_index is None:
            raise ValueError(f"{file_path}: no EDGE_WEIGHT_SECTION line found")
        matrix_start = section_index + 1

        # Some files repeat the dimension on its own line before the matrix.
        if matrix_start < len(lines) and lines[matrix_start].strip().isdigit():
            matrix_start += 1

        rows = lines[matrix_start:matrix_start + dimension]
        if len(rows) < dimension:
            raise ValueError(
                f"{file_path}: expected {dimension} matrix rows, "
                f"found {len(rows)}"
            )

        nodes = list(range(1, dimension + 1))
        costs = {}
        precedence = []
        # Single pass over the matrix using plain Python ints: element-wise
        # NumPy indexing is slower and would leak NumPy scalars into costs.
        for i, line in enumerate(rows, start=1):
            row = [int(token) for token in line.split()]
            if len(row) < dimension:
                raise ValueError(
                    f"{file_path}: matrix row {i} has {len(row)} entries, "
                    f"expected {dimension}"
                )
            for j, weight in enumerate(row[:dimension], start=1):
                if weight != -1:
                    costs[(i, j)] = weight
                elif i != j:
                    precedence.append((j, i))  # j must come before i

        return nodes, costs, precedence

    def build_precedence_graph(self):
        """Fill ``before``/``after`` from ``precedence`` and close them transitively."""
        for u, v in self.precedence:
            self.before[v].add(u)
            self.after[u].add(v)

        # Transitive closure - if a→b and b→c then a→c.  Repeat until a full
        # sweep over all nodes adds nothing to any set.
        changed = True
        while changed:
            changed = False
            for node in self.nodes:
                for graph in (self.before, self.after):
                    known = graph[node]
                    previous_size = len(known)
                    # Iterate over a snapshot: ``known`` grows inside the loop.
                    for other in list(known):
                        known.update(graph[other])
                    if len(known) > previous_size:
                        changed = True

    def is_valid(self, path):
        """Return True if ``path`` respects precedence and uses only existing arcs.

        A path is rejected when a required predecessor of a node appears in
        the path at the same or a later position than the node, or when two
        adjacent nodes have no entry in ``costs``.  Predecessors that are not
        in ``path`` at all are ignored.
        """
        position = {node: i for i, node in enumerate(path)}

        for i, node in enumerate(path):
            for before_node in self.before[node]:
                if before_node in position and position[before_node] >= i:
                    return False

        return all(arc in self.costs for arc in zip(path, path[1:]))

    def total_cost(self, path):
        """Return the summed arc cost of ``path``; missing arcs count as infinity."""
        return sum(self.costs.get(arc, float('inf'))
                   for arc in zip(path, path[1:]))

## Genetic Algorithm Implementation

Our genetic algorithm implementation includes methods for creating an initial population, selection, crossover, mutation, and evolution.


In [None]:
class GeneticSolver:
    """Genetic algorithm that searches for a low-cost path on an SOP instance.

    The solver only relies on the following members of ``sop``: ``nodes``,
    ``size``, ``costs``, ``before``, ``after``, ``file_name``,
    ``is_valid(path)`` and ``total_cost(path)``.

    Args:
        sop: the problem instance.
        pop_size: number of paths kept in the population.
        generations: number of calls to ``evolve`` made by ``solve``.
        mutation_rate: probability that ``mutate`` attempts a swap.
        elite_size: number of best paths copied unchanged into the next
            generation.
    """

    def __init__(self, sop, pop_size=50, generations=100,
                 mutation_rate=0.15, elite_size=5):
        self.sop = sop
        self.pop_size = pop_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.elite_size = elite_size
        self.population = []
        self.best_solution = None
        self.best_cost = float('inf')
        self.generation_stats = []  # One dict per call to evolve()

    def create_initial_population(self, max_failures=1000):
        """Fill ``population`` with ``pop_size`` randomly generated paths.

        The start node is the first node without predecessors and the end
        node is the first *other* node without successors, so an instance
        with no precedence constraints does not get the same node at both
        ends of every path.

        Args:
            max_failures: number of consecutive ``generate_valid_path``
                failures tolerated before giving up.

        Raises:
            RuntimeError: if ``max_failures`` consecutive attempts produce no
                path (previously this looped forever).
        """
        self.population = []

        start_candidates = [n for n in self.sop.nodes if not self.sop.before[n]]
        start_node = start_candidates[0] if start_candidates else 1

        end_candidates = [n for n in self.sop.nodes
                          if not self.sop.after[n] and n != start_node]
        end_node = end_candidates[0] if end_candidates else self.sop.size

        failures = 0
        while len(self.population) < self.pop_size:
            path = self.generate_valid_path(start_node, end_node)
            if not path:
                failures += 1
                if failures >= max_failures:
                    raise RuntimeError(
                        f"could not build a feasible path from {start_node} "
                        f"to {end_node} after {failures} consecutive attempts"
                    )
                continue

            failures = 0
            self.population.append(path)
            cost = self.sop.total_cost(path)
            if cost < self.best_cost:
                self.best_cost = cost
                self.best_solution = path.copy()

    def generate_valid_path(self, start_node, end_node):
        """Build one random path from ``start_node`` to ``end_node``.

        Nodes are appended one at a time.  A node is eligible when all of its
        predecessors are already placed and an arc from the current node to it
        exists; among eligible nodes, cheaper arcs are proportionally more
        likely (weight ``1 / (cost + 1)``).  Running out of eligible nodes, or
        reaching the end with no arc into ``end_node``, restarts the
        construction.

        Returns:
            The path as a list, or ``None`` after 100 failed constructions.
        """
        arc_costs = self.sop.costs
        predecessors = self.sop.before
        interior = set(self.sop.nodes) - {start_node, end_node}
        max_attempts = 100

        for _ in range(max_attempts):
            path = [start_node]
            placed = {start_node}  # Same content as path, O(1) subset checks
            remaining = set(interior)
            current_node = start_node

            while remaining:
                candidates = [n for n in remaining
                              if (current_node, n) in arc_costs
                              and predecessors[n] <= placed]
                if not candidates:
                    break  # Dead end: restart from scratch

                # random.choices accepts relative weights; no normalisation
                # is needed.
                weights = [1 / (arc_costs[(current_node, n)] + 1)
                           for n in candidates]
                next_node = random.choices(candidates, weights=weights, k=1)[0]
                path.append(next_node)
                placed.add(next_node)
                remaining.remove(next_node)
                current_node = next_node
            else:
                # Every interior node is placed; close the path.
                if end_node == start_node:
                    return path
                if (current_node, end_node) in arc_costs:
                    path.append(end_node)
                    return path
                # No arc into the end node: treat as a dead end and retry.

        return None

    def select_parents(self):
        """Pick two parents, each the cheapest of a random tournament.

        Each tournament draws up to three distinct population members (fewer
        when the population is smaller) and only those members are evaluated.

        Raises:
            ValueError: if the population is empty.
        """
        population = self.population
        if not population:
            raise ValueError("cannot select parents from an empty population")
        tournament_size = min(3, len(population))

        def tournament():
            contenders = random.sample(population, tournament_size)
            # Lowest cost is the same winner as highest 1 / (1 + cost).
            return min(contenders, key=self.sop.total_cost)

        return tournament(), tournament()

    def ordered_crossover(self, parent1, parent2):
        """Combine two parents with ordered crossover, then repair if invalid.

        A random contiguous segment of ``parent1`` stays in place; the other
        positions are filled, left to right, with the genes of ``parent2``
        that are not in the segment, in ``parent2`` order.  If the result
        fails ``sop.is_valid`` it is passed through ``repair``.
        """
        size = len(parent1)
        if size < 2:
            # Nothing to recombine; random.sample below would raise.
            return list(parent1)

        start, end = sorted(random.sample(range(size), 2))
        segment = parent1[start:end + 1]
        kept = set(segment)
        fill = [gene for gene in parent2 if gene not in kept]

        child = fill[:start] + segment + fill[start:]

        if not self.sop.is_valid(child):
            child = self.repair(child)

        return child

    def repair(self, path):
        """Reorder ``path`` so every node follows its required predecessors.

        Repeatedly emits, among the nodes whose predecessors have all been
        emitted, the one that appears earliest in ``path``.  If no node is
        free (a cycle in the constraints), an arbitrary remaining node is
        emitted to make progress.  Only precedence is repaired; missing arcs
        between adjacent nodes are not addressed.
        """
        # First index of each node, so ordering does not need path.index().
        first_position = {}
        for index, node in enumerate(path):
            first_position.setdefault(node, index)

        valid_path = []
        candidates = set(path)
        while candidates:
            ready = [n for n in candidates
                     if self.sop.before[n].isdisjoint(candidates)]

            if not ready:
                # Circular dependency detected - break it arbitrarily
                ready = [next(iter(candidates))]

            next_node = min(ready, key=first_position.__getitem__)
            valid_path.append(next_node)
            candidates.remove(next_node)

        return valid_path

    def mutate(self, path):
        """With probability ``mutation_rate``, swap two nodes if the result is valid.

        Up to 20 random swaps are tried on a copy of ``path``; the first one
        that passes ``sop.is_valid`` is returned.  Otherwise ``path`` itself
        is returned unchanged.
        """
        if random.random() > self.mutation_rate:
            return path
        if len(path) < 2:
            return path  # No pair to swap

        mutated = path.copy()

        max_attempts = 20
        for _ in range(max_attempts):
            i, j = random.sample(range(len(path)), 2)

            mutated[i], mutated[j] = mutated[j], mutated[i]
            if self.sop.is_valid(mutated):
                return mutated

            # Undo the swap before trying another pair.
            mutated[i], mutated[j] = mutated[j], mutated[i]

        return path  # Return original if no valid mutation found

    def evolve(self):
        """Replace the population with its next generation and record stats.

        The ``elite_size`` cheapest paths survive unchanged; the rest are
        produced by tournament selection, ordered crossover and mutation.
        Appends a dict with keys ``generation``, ``best_cost``, ``avg_cost``
        and ``min_cost`` to ``generation_stats``.

        Raises:
            ValueError: if the population is empty.
        """
        if not self.population:
            raise ValueError(
                "population is empty; call create_initial_population() first"
            )

        ranked = sorted(self.population, key=self.sop.total_cost)

        # Elitism - keep best solutions
        new_population = ranked[:self.elite_size]

        while len(new_population) < self.pop_size:
            parent1, parent2 = self.select_parents()
            child = self.ordered_crossover(parent1, parent2)
            child = self.mutate(child)
            new_population.append(child)

        self.population = new_population

        # Evaluate the new generation once and reuse the costs below.
        costs = [self.sop.total_cost(p) for p in new_population]
        best_index = min(range(len(costs)), key=costs.__getitem__)
        current_cost = costs[best_index]

        if current_cost < self.best_cost:
            self.best_cost = current_cost
            self.best_solution = new_population[best_index].copy()

        self.generation_stats.append({
            'generation': len(self.generation_stats),
            'best_cost': self.best_cost,
            'avg_cost': np.mean(costs),
            'min_cost': current_cost
        })

    def solve(self, verbose=True):
        """Run the full algorithm and return a result dictionary.

        Args:
            verbose: when true, print progress every 10 generations and a
                final summary.

        Returns:
            A dict with keys ``instance``, ``solution``, ``cost``, ``time``
            (elapsed seconds), ``generations`` and ``stats``.
        """
        start_time = time.time()

        self.create_initial_population()

        for gen in range(self.generations):
            self.evolve()

            if verbose and gen % 10 == 0:
                print(f"Generation {gen}/{self.generations}, "
                      f"Best cost: {self.best_cost}")

        end_time = time.time()

        if verbose:
            print(f"\nSolution for {self.sop.file_name}:")
            print(f"Path: {self.best_solution}")
            print(f"Cost: {self.best_cost}")
            print(f"Time: {end_time - start_time:.2f} seconds")

        return {
            'instance': self.sop.file_name,
            'solution': self.best_solution,
            'cost': self.best_cost,
            'time': end_time - start_time,
            'generations': self.generations,
            'stats': self.generation_stats
        }