In [1]:
import random
import numpy as np
from collections import defaultdict, deque

class GraphColoringProblem:
    def __init__(self, adjacency_matrix, pre_assigned_colors=None, k_beam_size=10, max_iterations=100):
        """
        Initialize the graph coloring problem.
        
        Parameters:
        - adjacency_matrix: 2D array representing graph connections
        - pre_assigned_colors: Dictionary mapping vertex indices to fixed colors
        - k_beam_size: Number of states to maintain in the beam
        - max_iterations: Maximum number of iterations to run
        """
        self.adjacency_matrix = adjacency_matrix
        self.num_vertices = len(adjacency_matrix)
        self.pre_assigned_colors = pre_assigned_colors or {}
        self.k_beam_size = k_beam_size
        self.max_iterations = max_iterations
        
        # Determine vertices affected by distance constraint (here we'll select high degree vertices)
        self.distance_constrained_vertices = self._select_distance_constrained_vertices()
        
        # Calculate vertex degrees for prioritizing high-degree vertices
        self.vertex_degrees = [sum(row) for row in adjacency_matrix]

    def _select_distance_constrained_vertices(self):
        """Select vertices with highest degrees for distance constraint"""
        degrees = [(i, sum(self.adjacency_matrix[i])) for i in range(self.num_vertices)]
        # Select top 20% of vertices by degree
        top_vertices = sorted(degrees, key=lambda x: x[1], reverse=True)[:max(1, self.num_vertices // 5)]
        return [v[0] for v in top_vertices]
    
    def _get_vertices_within_two_hops(self, vertex):
        """Find all vertices that are within two hops from the given vertex"""
        visited = set()
        queue = deque([(vertex, 0)])  # (vertex, distance)
        visited.add(vertex)
        two_hop_neighbors = set()
        
        while queue:
            current, distance = queue.popleft()
            
            if distance <= 2:
                if distance > 0:  # Don't include the starting vertex
                    two_hop_neighbors.add(current)
                
                if distance < 2:  # Only explore further if we're not yet at 2 hops
                    for neighbor, connected in enumerate(self.adjacency_matrix[current]):
                        if connected and neighbor not in visited:
                            visited.add(neighbor)
                            queue.append((neighbor, distance + 1))
        
        return two_hop_neighbors

    def generate_initial_state(self, max_colors):
        """Generate a random valid initial coloring"""
        colors = [0] * self.num_vertices
        
        # Assign pre-assigned colors first
        for vertex, color in self.pre_assigned_colors.items():
            colors[vertex] = color
        
        # Color remaining vertices randomly while respecting constraints
        for vertex in range(self.num_vertices):
            if vertex not in self.pre_assigned_colors:
                # Get unavailable colors from neighbors
                unavailable_colors = set()
                
                # Check direct neighbors (standard constraint)
                for neighbor, connected in enumerate(self.adjacency_matrix[vertex]):
                    if connected and colors[neighbor] != 0:
                        unavailable_colors.add(colors[neighbor])
                
                # Check two-hop neighbors if vertex is distance constrained
                if vertex in self.distance_constrained_vertices:
                    two_hop_neighbors = self._get_vertices_within_two_hops(vertex)
                    for neighbor in two_hop_neighbors:
                        if colors[neighbor] != 0:
                            unavailable_colors.add(colors[neighbor])
                
                # Assign smallest available color
                color = 1
                while color in unavailable_colors and color <= max_colors:
                    color += 1
                colors[vertex] = color
        
        return colors
    
    def generate_successors(self, state, max_colors):
        """Generate successor states by changing one vertex's color"""
        successors = []
        
        # Sort vertices by degree (high to low) for prioritizing high-degree vertices
        vertices = sorted(range(self.num_vertices), key=lambda v: self.vertex_degrees[v], reverse=True)
        
        for vertex in vertices:
            # Skip pre-assigned colors
            if vertex in self.pre_assigned_colors:
                continue
            
            current_color = state[vertex]
            
            # Try each possible color
            for color in range(1, max_colors + 1):
                if color != current_color:
                    new_state = state.copy()
                    new_state[vertex] = color
                    
                    if self.is_valid_state(new_state, vertex):
                        successors.append(new_state)
        
        return successors
    
    def is_valid_state(self, state, changed_vertex):
        """Check if the state is valid after changing the color of changed_vertex"""
        color = state[changed_vertex]
        
        # Check direct neighbors
        for neighbor, connected in enumerate(self.adjacency_matrix[changed_vertex]):
            if connected and state[neighbor] == color:
                return False
        
        # Check two-hop neighbors if vertex is distance constrained
        if changed_vertex in self.distance_constrained_vertices:
            two_hop_neighbors = self._get_vertices_within_two_hops(changed_vertex)
            for neighbor in two_hop_neighbors:
                if state[neighbor] == color:
                    return False
        
        return True
    
    def calculate_heuristic(self, state):
        """Calculate the heuristic value (lower is better)"""
        # Count number of conflicts
        conflicts = 0
        
        # Check standard constraint violations
        for i in range(self.num_vertices):
            for j in range(i + 1, self.num_vertices):
                if self.adjacency_matrix[i][j] and state[i] == state[j]:
                    conflicts += 1
        
        # Check distance constraint violations
        for vertex in self.distance_constrained_vertices:
            two_hop_neighbors = self._get_vertices_within_two_hops(vertex)
            for neighbor in two_hop_neighbors:
                if state[vertex] == state[neighbor]:
                    conflicts += 1
        
        # Count number of colors used
        num_colors_used = len(set(state))
        
        # Calculate color distribution imbalance
        color_counts = defaultdict(int)
        for color in state:
            color_counts[color] += 1
        
        avg_vertices_per_color = self.num_vertices / num_colors_used
        imbalance = sum(abs(count - avg_vertices_per_color) for count in color_counts.values())
        
        # Combined heuristic: conflicts have highest weight, then imbalance, then number of colors
        return 100 * conflicts + imbalance + 10 * num_colors_used
    
    def local_beam_search(self, max_colors):
        """Perform local beam search to find optimal coloring"""
        # Generate k initial states
        current_states = [self.generate_initial_state(max_colors) for _ in range(self.k_beam_size)]
        
        for iteration in range(self.max_iterations):
            # Generate all successors
            all_successors = []
            for state in current_states:
                successors = self.generate_successors(state, max_colors)
                all_successors.extend(successors)
            
            if not all_successors:
                break
            
            # Evaluate successors
            successor_scores = [(successor, self.calculate_heuristic(successor)) for successor in all_successors]
            
            # Select k best successors
            successor_scores.sort(key=lambda x: x[1])
            current_states = [state for state, _ in successor_scores[:self.k_beam_size]]
            
            # Check if we've found a solution with no conflicts
            best_state, best_score = successor_scores[0]
            if best_score < 100:  # No conflicts
                return best_state, best_score, iteration + 1
        
        # Return the best state found so far
        best_state = min(current_states, key=self.calculate_heuristic)
        return best_state, self.calculate_heuristic(best_state), self.max_iterations
    
    def read_graph_from_file(filepath):
        """Read graph from file"""
        with open(filepath, 'r') as f:
            lines = f.readlines()
            
        # Parse the number of vertices
        num_vertices = int(lines[0].strip())
        
        # Initialize adjacency matrix
        adjacency_matrix = [[0 for _ in range(num_vertices)] for _ in range(num_vertices)]
        
        # Parse edges
        for i in range(1, len(lines)):
            line = lines[i].strip()
            if not line or line.startswith('#'):
                continue
                
            parts = line.split()
            if len(parts) >= 2:
                try:
                    v1, v2 = int(parts[0]), int(parts[1])
                    adjacency_matrix[v1][v2] = 1
                    adjacency_matrix[v2][v1] = 1  # For undirected graph
                except ValueError:
                    continue
        
        return adjacency_matrix

# Example usage
if __name__ == "__main__":
    # Example adjacency matrix for a small graph
    adj_matrix = [
        [0, 1, 1, 0, 0],
        [1, 0, 1, 1, 0],
        [1, 1, 0, 1, 1],
        [0, 1, 1, 0, 1],
        [0, 0, 1, 1, 0]
    ]
    
    # Pre-assigned colors (vertex -> color)
    pre_assigned = {0: 1}  # Vertex 0 has color 1
    
    problem = GraphColoringProblem(adj_matrix, pre_assigned, k_beam_size=10)
    solution, score, iterations = problem.local_beam_search(max_colors=4)
    
    print(f"Solution found after {iterations} iterations:")
    print(f"Colors: {solution}")
    print(f"Score: {score}")
    print(f"Number of colors used: {len(set(solution))}")
    
    # You can also read from file:
    # adj_matrix = GraphColoringProblem.read_graph_from_file("hypercube_dataset.txt")
    # problem = GraphColoringProblem(adj_matrix, k_beam_size=20)
    # solution, score, iterations = problem.local_beam_search(max_colors=10)

Solution found after 1 iterations:
Colors: [1, 2, 4, 1, 2]
Score: 31.333333333333332
Number of colors used: 3


In [3]:
import random
import numpy as np
import pandas as pd
from collections import defaultdict

class Product:
    def __init__(self, id, name, weight, category, perishable=False, high_demand=False, 
                 hazardous=False, promotional=False, high_value=False, bulky=False):
        self.id = id
        self.name = name
        self.weight = weight
        self.category = category
        self.perishable = perishable
        self.high_demand = high_demand
        self.hazardous = hazardous
        self.promotional = promotional
        self.high_value = high_value
        self.bulky = bulky
        
    def __repr__(self):
        return f"{self.name} ({self.weight}kg)"

class Shelf:
    def __init__(self, id, name, capacity, is_refrigerated=False, is_eye_level=False, 
                 is_lower=False, is_hazardous_zone=False, is_secure=False, is_high_visibility=False):
        self.id = id
        self.name = name
        self.capacity = capacity
        self.is_refrigerated = is_refrigerated
        self.is_eye_level = is_eye_level
        self.is_lower = is_lower
        self.is_hazardous_zone = is_hazardous_zone
        self.is_secure = is_secure
        self.is_high_visibility = is_high_visibility
        
    def __repr__(self):
        return f"{self.name} ({self.capacity}kg)"

class ShelfOptimizationGA:
    def __init__(self, products, shelves, population_size=100, max_generations=500, 
                 mutation_rate=0.1, crossover_rate=0.8):
        self.products = products
        self.shelves = shelves
        self.population_size = population_size
        self.max_generations = max_generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        
        # Dictionary to store complementary products
        self.complementary_pairs = self._define_complementary_pairs()
        
    def _define_complementary_pairs(self):
        """Define pairs of products that complement each other"""
        # This is a simple example; in reality, this would be more complex
        pairs = []
        
        # Find products by category
        pasta_products = [p for p in self.products if "Pasta" in p.name]
        pasta_sauce_products = [p for p in self.products if "Sauce" in p.name]
        
        # Create complementary pairs
        for pasta in pasta_products:
            for sauce in pasta_sauce_products:
                pairs.append((pasta.id, sauce.id))
                
        return pairs
    
    def generate_individual(self):
        """Generate a random shelf assignment"""
        assignment = {}
        for product in self.products:
            # Randomly assign each product to a shelf
            valid_shelves = []
            
            for shelf in self.shelves:
                # Check basic constraints
                if product.perishable and not shelf.is_refrigerated:
                    continue
                if product.hazardous and not shelf.is_hazardous_zone:
                    continue
                
                valid_shelves.append(shelf.id)
            
            if valid_shelves:
                assignment[product.id] = random.choice(valid_shelves)
            else:
                # If no valid shelf found, assign randomly (will be penalized in fitness function)
                assignment[product.id] = random.choice([s.id for s in self.shelves])
                
        return assignment
    
    def generate_initial_population(self):
        """Generate initial population of shelf assignments"""
        return [self.generate_individual() for _ in range(self.population_size)]
    
    def calculate_fitness(self, individual):
        """Calculate fitness of an individual (lower is better)"""
        # Start with perfect fitness (0 penalties)
        fitness = 0
        
        # Check shelf capacity constraint
        shelf_weights = defaultdict(float)
        for product_id, shelf_id in individual.items():
            product = next(p for p in self.products if p.id == product_id)
            shelf_weights[shelf_id] += product.weight
        
        for shelf_id, weight in shelf_weights.items():
            shelf = next(s for s in self.shelves if s.id == shelf_id)
            if weight > shelf.capacity:
                # Penalize exceeding shelf capacity
                fitness += 50 * (weight - shelf.capacity)
        
        # Check accessibility of high-demand products
        for product in self.products:
            if product.high_demand:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_eye_level:
                    # Penalize high-demand products not on eye-level shelves
                    fitness += 20
        
        # Check perishable items
        for product in self.products:
            if product.perishable:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_refrigerated:
                    # Heavily penalize perishable items not in refrigeration
                    fitness += 100
        
        # Check hazardous items
        for product in self.products:
            if product.hazardous:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_hazardous_zone:
                    # Heavily penalize hazardous items not in hazardous zone
                    fitness += 100
        
        # Check product compatibility and cross-selling
        for product1_id, product2_id in self.complementary_pairs:
            shelf1_id = individual[product1_id]
            shelf2_id = individual[product2_id]
            if shelf1_id != shelf2_id:
                # Penalize complementary products not on the same shelf
                fitness += 10
        
        # Check restocking efficiency
        for product in self.products:
            if product.bulky:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_lower:
                    # Penalize bulky items not on lower shelves
                    fitness += 15
        
        # Check refrigeration efficiency
        refrigerated_shelves = [s.id for s in self.shelves if s.is_refrigerated]
        perishable_products = [p for p in self.products if p.perishable]
        
        for shelf_id in refrigerated_shelves:
            # Count products on this shelf
            products_on_shelf = [p for p in perishable_products if individual[p.id] == shelf_id]
            non_perishable_on_shelf = [product_id for product_id, s_id in individual.items() 
                                       if s_id == shelf_id and not next(p for p in self.products if p.id == product_id).perishable]
            
            # Penalize refrigerated shelves not fully utilized by perishable items
            if non_perishable_on_shelf:
                fitness += 10 * len(non_perishable_on_shelf)
        
        # Check promotional items visibility
        for product in self.products:
            if product.promotional:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_high_visibility:
                    # Penalize promotional items not in high-visibility areas
                    fitness += 25
        
        # Check theft prevention
        for product in self.products:
            if product.high_value:
                shelf_id = individual[product.id]
                shelf = next(s for s in self.shelves if s.id == shelf_id)
                if not shelf.is_secure:
                    # Penalize high-value items not in secure areas
                    fitness += 40
        
        return fitness
    
    def select_parents(self, population, fitnesses):
        """Select parents using tournament selection"""
        tournament_size = 3
        selected_parents = []
        
        for _ in range(2):  # Select 2 parents
            tournament_indices = random.sample(range(len(population)), tournament_size)
            tournament_fitness = [fitnesses[i] for i in tournament_indices]
            winner_idx = tournament_indices[tournament_fitness.index(min(tournament_fitness))]
            selected_parents.append(population[winner_idx])
            
        return selected_parents
    
    def crossover(self, parent1, parent2):
        """Perform crossover between two parents"""
        if random.random() > self.crossover_rate:
            return parent1.copy(), parent2.copy()
            
        # Single-point crossover
        crossover_point = random.randint(1, len(self.products) - 1)
        
        child1 = {}
        child2 = {}
        
        product_ids = list(parent1.keys())
        
        for i, product_id in enumerate(product_ids):
            if i < crossover_point:
                child1[product_id] = parent1[product_id]
                child2[product_id] = parent2[product_id]
            else:
                child1[product_id] = parent2[product_id]
                child2[product_id] = parent1[product_id]
                
        return child1, child2
    
    def mutate(self, individual):
        """Randomly mutate an individual"""
        mutated = individual.copy()
        
        for product_id in mutated:
            if random.random() < self.mutation_rate:
                # Find valid shelves for this product
                product = next(p for p in self.products if p.id == product_id)
                valid_shelves = []
                
                for shelf in self.shelves:
                    # Check basic constraints
                    if product.perishable and not shelf.is_refrigerated:
                        continue
                    if product.hazardous and not shelf.is_hazardous_zone:
                        continue
                    
                    valid_shelves.append(shelf.id)
                
                if valid_shelves:
                    mutated[product_id] = random.choice(valid_shelves)
                    
        return mutated
    
    def run(self):
        """Run the genetic algorithm"""
        population = self.generate_initial_population()
        
        best_fitness = float('inf')
        best_individual = None
        
        for generation in range(self.max_generations):
            # Calculate fitness for each individual
            fitnesses = [self.calculate_fitness(ind) for ind in population]
            
            # Find the best individual
            gen_best_idx = fitnesses.index(min(fitnesses))
            gen_best_fitness = fitnesses[gen_best_idx]
            gen_best_individual = population[gen_best_idx]
            
            if gen_best_fitness < best_fitness:
                best_fitness = gen_best_fitness
                best_individual = gen_best_individual.copy()
                print(f"Generation {generation}: New best fitness = {best_fitness}")
                
                # If we found a perfect solution, stop
                if best_fitness == 0:
                    break
            
            # Create a new population
            new_population = []
            
            # Keep the best individual (elitism)
            new_population.append(gen_best_individual)
            
            # Fill the rest of the population
            while len(new_population) < self.population_size:
                # Select parents
                parent1, parent2 = self.select_parents(population, fitnesses)
                
                # Perform crossover
                child1, child2 = self.crossover(parent1, parent2)
                
                # Perform mutation
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                
                # Add children to new population
                new_population.append(child1)
                if len(new_population) < self.population_size:
                    new_population.append(child2)
            
            population = new_population
            
            if generation % 50 == 0:
                print(f"Generation {generation}: Best fitness = {best_fitness}")
        
        return best_individual, best_fitness
    
    def save_results_to_excel(self, solution, filename="shelf_optimization_results.xlsx"):
        """Save the optimization results to an Excel file"""
        # Create a DataFrame with the results
        results = []
        
        for product_id, shelf_id in solution.items():
            product = next(p for p in self.products if p.id == product_id)
            shelf = next(s for s in self.shelves if s.id == shelf_id)
            
            results.append({
                "Product ID": product_id,
                "Product Name": product.name,
                "Weight (kg)": product.weight,
                "Category": product.category,
                "Perishable": "Yes" if product.perishable else "No",
                "High Demand": "Yes" if product.high_demand else "No",
                "Hazardous": "Yes" if product.hazardous else "No",
                "Promotional": "Yes" if product.promotional else "No",
                "High Value": "Yes" if product.high_value else "No",
                "Bulky": "Yes" if product.bulky else "No",
                "Assigned Shelf": shelf.name,
                "Shelf Capacity (kg)": shelf.capacity
            })
        
        df = pd.DataFrame(results)
        
        # Calculate shelf weight utilization
        shelf_utilization = defaultdict(float)
        for product_id, shelf_id in solution.items():
            product = next(p for p in self.products if p.id == product_id)
            shelf_utilization[shelf_id] += product.weight
        
        shelf_stats = []
        for shelf in self.shelves:
            shelf_stats.append({
                "Shelf ID": shelf.id,
                "Shelf Name": shelf.name,
                "Capacity (kg)": shelf.capacity,
                "Used Capacity (kg)": shelf_utilization[shelf.id],
                "Utilization (%)": (shelf_utilization[shelf.id] / shelf.capacity) * 100 if shelf.capacity > 0 else 0
            })
        
        df_shelves = pd.DataFrame(shelf_stats)
        
        # Save to Excel
        with pd.ExcelWriter(filename) as writer:
            df.to_excel(writer, sheet_name="Product Assignments", index=False)
            df_shelves.to_excel(writer, sheet_name="Shelf Utilization", index=False)
            
        print(f"Results saved to {filename}")

# Example usage
if __name__ == "__main__":
    # Define shelves
    shelves = [
        Shelf("S1", "Checkout Display", 8, is_high_visibility=True),
        Shelf("S2", "Lower Shelf", 25, is_lower=True),
        Shelf("S4", "Eye-Level Shelf", 15, is_eye_level=True, is_high_visibility=True),
        Shelf("S5", "General Aisle Shelf", 20),
        Shelf("R1", "Refrigerator Zone", 20, is_refrigerated=True),
        Shelf("H1", "Hazardous Item Zone", 10, is_hazardous_zone=True)
    ]
    
    # Define products
    products = [
        Product("P1", "Milk", 5, "Dairy", perishable=True, high_demand=True),
        Product("P2", "Rice Bag", 10, "Grains", bulky=True),
        Product("P3", "Frozen Nuggets", 5, "Frozen", perishable=True),
        Product("P4", "Cereal", 3, "Breakfast", high_demand=True),
        Product("P5", "Pasta", 2, "Grains"),
        Product("P6", "Pasta Sauce", 3, "Condiments"),
        Product("P7", "Detergent", 4, "Cleaning", hazardous=True),
        Product("P8", "Glass Cleaner", 5, "Cleaning", hazardous=True)
    ]
    
    # Initialize and run the GA
    optimizer = ShelfOptimizationGA(products, shelves, population_size=50, max_generations=200)
    solution, fitness = optimizer.run()
    
    print("\nOptimized Shelf Allocation:")
    for product_id, shelf_id in solution.items():
        product = next(p for p in products if p.id == product_id)
        shelf = next(s for s in shelves if s.id == shelf_id)
        print(f"{product.name} -> {shelf.name}")
    
    print(f"\nFinal Fitness Score: {fitness}")
    
    # Check shelf capacities
    shelf_weights = defaultdict(float)
    for product_id, shelf_id in solution.items():
        product = next(p for p in products if p.id == product_id)
        shelf_weights[shelf_id] += product.weight
    
    print("\nShelf Weight Usage:")
    for shelf in shelves:
        weight = shelf_weights[shelf.id]
        print(f"{shelf.name}: {weight}kg / {shelf.capacity}kg")
    
    # Save results to Excel (requires pandas)
    optimizer.save_results_to_excel(solution, "shelf_optimization_results.xlsx")

Generation 0: New best fitness = 45
Generation 0: Best fitness = 45
Generation 1: New best fitness = 40
Generation 2: New best fitness = 30
Generation 4: New best fitness = 20
Generation 50: Best fitness = 20
Generation 100: Best fitness = 20
Generation 150: Best fitness = 20

Optimized Shelf Allocation:
Milk -> Refrigerator Zone
Rice Bag -> Lower Shelf
Frozen Nuggets -> Refrigerator Zone
Cereal -> Eye-Level Shelf
Pasta -> General Aisle Shelf
Pasta Sauce -> General Aisle Shelf
Detergent -> Hazardous Item Zone
Glass Cleaner -> Hazardous Item Zone

Final Fitness Score: 20

Shelf Weight Usage:
Checkout Display: 0.0kg / 8kg
Lower Shelf: 10.0kg / 25kg
Eye-Level Shelf: 3.0kg / 15kg
General Aisle Shelf: 5.0kg / 20kg
Refrigerator Zone: 10.0kg / 20kg
Hazardous Item Zone: 9.0kg / 10kg
Results saved to shelf_optimization_results.xlsx
