In [12]:
import csv

with open('family_data.csv', mode='r', newline='', encoding='utf-8') as file:
    reader = csv.reader(file)
    headers = next(reader)
    family_data = [list(map(int, line)) for line in reader]

In [13]:
# Some constant values
num_families = 5000
num_days = 100
min_occupancy = 125
max_occupancy = 300

## `is_valid` function:

### **Description**
The `is_valid` function checks whether a given solution satisfies the occupancy constraints for the workshop tours. Specifically, it ensures that the number of visitors per day is between the minimum (`min_occupancy`) and maximum (`max_occupancy`) allowed values.

### **Parameters**
- `solution`: A list where each element represents the day assigned to a family.  
  - Example: `[5, 10, 15, ...]` means family 1 is assigned to day 5, family 2 to day 10, etc.

### **Returns**
- `True` if the solution is valid (i.e., all days have between `min_occupancy` and `max_occupancy` visitors).
- `False` if the solution violates the constraints.

### **Steps**
1. **Initialize Daily Occupancy**:
   - Create a list `daily_occupancy` of size `num_days` (100) to track the number of visitors per day.

2. **Calculate Occupancy**:
   - For each family in the solution:
     - Retrieve the assigned day and the number of people in the family.
     - Update the `daily_occupancy` list to reflect the number of visitors for that day.

3. **Check Constraints**:
   - Use a list comprehension to check if all days have between `min_occupancy` (125) and `max_occupancy` (300) visitors.
   - Return `True` if all days satisfy the constraints; otherwise, return `False`.

In [14]:
def is_valid(solution):
    daily_occupancy = [0] * num_days
    for family_id, day in enumerate(solution):
        daily_occupancy[day - 1] += family_data[family_id][-1]

    return all(min_occupancy <= occupancy <= max_occupancy for occupancy in daily_occupancy)

## `initialize_population` function:

### **Description**
The `initialize_population` function generates an initial population of valid solutions for the workshop scheduling problem. Each solution is a list of days assigned to families, ensuring that the number of visitors per day is between `min_occupancy` (125) and `max_occupancy` (300).

To handle the complexity of the problem and ensure valid solutions, the function uses a **greedy algorithm** to prioritize families with more people and assigns them to days first. This approach helps satisfy the constraints more efficiently.

### **Parameters**
- `population_size`: The number of solutions to generate for the initial population.

### **Returns**
- A list of valid solutions (each solution is a list of assigned days for all families).

---

## **Key steps**

### **1. Sorting families by size**
- Families are sorted in **descending order** by the number of people (`n_people`). This ensures that larger families are assigned first, reducing the likelihood of violating the `max_occupancy` constraint.

### **2. Assigning days to families**
- For each family, the function tries to assign a day from their preferred choices (`choices`).
- If no preferred day is available (i.e., assigning the family to that day would exceed `max_occupancy`), the function selects any valid day.

### **3. Ensuring minimum occupancy**
- After assigning all families, the function checks if all days have at least `min_occupancy` visitors.
- If not, it attempts to **redistribute families** from overpopulated days to underpopulated days while maintaining the constraints.


In [15]:
import random

def initialize_population(population_size):
    population = []
    for _ in range(population_size):
        solution = []
        daily_occupancy = [0] * num_days
        
        sorted_families = sorted(family_data, key=lambda x: x[-1], reverse=True)
        
        for family in sorted_families:
            choices = family[1:-1]
            n_people = family[-1]
            
            valid_choices = [day for day in choices if daily_occupancy[day - 1] + n_people <= max_occupancy]
            
            if valid_choices:
                assigned_day = random.choice(valid_choices)
            else:
                valid_values = [day for day in range(1, num_days + 1) if daily_occupancy[day - 1] + n_people <= max_occupancy]
                assigned_day = random.choice(valid_values)
            
            solution.append(assigned_day)
            daily_occupancy[assigned_day - 1] += n_people
        
        if all(occupancy >= min_occupancy for occupancy in daily_occupancy):
            population.append(solution)
        else:
            for day in range(num_days):
                if daily_occupancy[day] < min_occupancy:
                    for family_id, assigned_day in enumerate(solution):
                        family = family_data[family_id]
                        n_people = family[-1]
                        
                        if (daily_occupancy[day] + n_people <= max_occupancy and
                            daily_occupancy[assigned_day - 1] - n_people >= min_occupancy):
                            solution[family_id] = day + 1
                            daily_occupancy[assigned_day - 1] -= n_people
                            daily_occupancy[day] += n_people
                            break
        
        population.append(solution)
    
    return population

## `evaluate_solution` function:

### **Description**
The `evaluate_solution` function calculates the total cost of a given solution. The total cost is the sum of two components:
1. **Preference cost**: The cost of consolation gifts for families based on how far their assigned day is from their preferred choices.
2. **Accounting penalty**: Additional costs incurred due to daily occupancy fluctuations, as defined by Santa's accounting department.

### **Parameters**
- `solution`: A list where each element represents the day assigned to a family.

### **Formula**

**Total cost** = **Preference cost** + **Accounting penalty**



## Helper function: `calculate_preference_cost`

### **Description**
The `calculate_preference_cost` function computes the cost of consolation gifts for all families based on their assigned days. The cost depends on how far the assigned day is from their preferred choices.

### **Parameters**
- `solution`: A list where each element represents the day assigned to a family.

### **Steps**
1. For each family:
   - Determine which choice (if any) corresponds to the assigned day.
   - Calculate the cost of gifts based on the choice index and the number of people in the family.
2. Sum the costs for all families.

### **Cost table**
| Choice Index | Cost Formula                          |
|--------------|---------------------------------------|
| 0            | \$0                                   |
| 1            | \$50                                  |
| 2            | \$50 + \$9 × `n_people`               |
| 3            | \$100 + \$9 × `n_people`              |
| 4            | \$200 + \$9 × `n_people`              |
| 5            | \$200 + \$18 × `n_people`             |
| 6            | \$300 + \$18 × `n_people`             |
| 7            | \$300 + \$36 × `n_people`             |
| 8            | \$400 + \$36 × `n_people`             |
| 9            | \$500 + \$235 × `n_people`            |
| Otherwise    | \$500 + \$434 × `n_people`            |



## Helper function: `calculate_accounting_penalty`

### **Description**
The `calculate_accounting_penalty` function computes the additional costs incurred due to daily occupancy fluctuations. These costs are based on an empirical formula provided by Santa's accounting department.

### **Formula**
The accounting penalty is calculated using the following formula:

![Accounting Penalty Formula](formula.png)
  
  where:
  - N_d: Number of people on day d.
  - N_{d+1}: Number of people on the next day.

### **Steps**
1. Calculate the daily occupancy for each day.
2. Apply the formula to compute the penalty for each day.
3. Sum the penalties for all days.

In [16]:
def calculate_preference_cost(solution):
    total_cost = 0
    for family_id, assigned_day in enumerate(solution):
        family = family_data[family_id]
        choices = family[1:-1]
        n_people = family[-1]
        
        # Determining which choice corresponds to the assigned day
        choice_index = choices.index(assigned_day) if assigned_day in choices else len(choices)
        
        # Calculating the cost of gifts
        if choice_index == 0:
            cost = 0
        elif choice_index == 1:
            cost = 50
        elif choice_index == 2:
            cost = 50 + 9 * n_people
        elif choice_index == 3:
            cost = 100 + 9 * n_people
        elif choice_index == 4:
            cost = 200 + 9 * n_people
        elif choice_index == 5:
            cost = 200 + 18 * n_people
        elif choice_index == 6:
            cost = 300 + 18 * n_people
        elif choice_index == 7:
            cost = 300 + 36 * n_people
        elif choice_index == 8:
            cost = 400 + 36 * n_people
        elif choice_index == 9:
            cost = 500 + 235 * n_people 
        else:
            cost = 500 + 434 * n_people
            
        total_cost += cost
        
    return total_cost

def calculate_accounting_penalty(solution):
    daily_occupancy = [0] * num_days
    for family_id, assigned_day in enumerate(solution):
        n_people = family_data[family_id][-1]
        daily_occupancy[assigned_day - 1] += n_people
    
    penalty = 0
    for d in range(num_days):
        Nd = daily_occupancy[d]
        Nd_next = daily_occupancy[d + 1] if d < num_days - 1 else Nd
        penalty += ((Nd - 125) / 400) * Nd**(0.5 + abs(Nd - Nd_next) / 50)
    return penalty

def calculate_affinity(solution):
    return calculate_preference_cost(solution) + calculate_accounting_penalty(solution)

## `mutate` function:
The `mutate` function performs a mutation on the current solution by changing the assigned days for a randomly selected subset of families. If the mutation results in a solution that violates constraints, the function attempts to repair it using a local search.

### **Parameters**
- `solution`: The current solution (a list of days assigned to each family).
- `max_attempts`: The maximum number of attempts to find a valid mutation (default: 100).

### **Steps**
1. **Select Families to Mutate**:
   - Randomly select a small number of families (e.g., 1–5% of all families) to mutate.
2. **Apply Mutation**:
   - For each selected family, assign a new day randomly chosen from their preferences.
3. **Check Validity**:
   - If the new solution is valid (satisfies all constraints), return it.
4. **Local Search**:
   - If the new solution is invalid, use the `local_search` function to repair it.
5. **Return Solution**:
   - If a valid solution is found within `max_attempts`, return it. Otherwise, return the original solution.

### **Example**
```python
new_solution = mutate(current_solution)

In [17]:
def mutate(solution, max_attempts=100):
    for _ in range(max_attempts):
        num_mutations = max(1, int(len(family_data) * 0.05))
        families_to_mutate = random.sample(range(len(family_data)), num_mutations)
        
        new_solution = solution.copy()
        
        for family_id in families_to_mutate:
            family = family_data[family_id]
            choices = family[1:-1]
            
            new_day = random.choice(choices)
            new_solution[family_id] = new_day
        
        if is_valid(new_solution):
            return new_solution
        
        new_solution = local_search(new_solution)
        if is_valid(new_solution):
            return new_solution
    
    return solution

## `local_search` function:
The `local_search` function attempts to repair an invalid solution by moving families between days to satisfy the constraints (minimum 125 and maximum 300 people per day).

### **Parameters**
- `solution`: The current solution (a list of days assigned to each family).

### **Steps**
1. **Calculate daily occupancy**:
   - Compute the number of people assigned to each day.
2. **Fix underpopulated days**:
   - For days with fewer than 125 people, try to move families from overpopulated days.
3. **Fix overpopulated days**:
   - For days with more than 300 people, try to move families to underpopulated days.
4. **Return repaired solution**:
   - Return the solution after attempting to fix all violations.

In [18]:
def local_search(solution):
    daily_occupancy = [0] * num_days
    for family_id, day in enumerate(solution):
        daily_occupancy[day - 1] += family_data[family_id][-1]
    
    for day in range(100):
        while daily_occupancy[day] < min_occupancy or daily_occupancy[day] > max_occupancy:
            if daily_occupancy[day] < min_occupancy:
                # Finding families that can be moved on this day
                for family_id, assigned_day in enumerate(solution):
                    family = family_data[family_id]
                    n_people = family[-1]
                    
                    # We check whether it is possible to move the family on this day
                    if (daily_occupancy[day] + n_people <= max_occupancy and
                        daily_occupancy[assigned_day - 1] - n_people >= min_occupancy):
                        # Moving the family
                        solution[family_id] = day + 1
                        daily_occupancy[assigned_day - 1] -= n_people
                        daily_occupancy[day] += n_people
                        break
                else:
                    break
            
            elif daily_occupancy[day] > max_occupancy:
                for family_id, assigned_day in enumerate(solution):
                    if assigned_day == day + 1:
                        family = family_data[family_id]
                        n_people = family[-1]
                        
                        valid_days = [d for d in range(1, num_days + 1) if daily_occupancy[d - 1] + n_people <= max_occupancy]
                        if valid_days:
                            new_day = random.choice(valid_days)
                            solution[family_id] = new_day
                            daily_occupancy[day] -= n_people
                            daily_occupancy[new_day - 1] += n_people
                            break
                else:
                    break
    
    return solution

## `select_population` function:

### **Description**
The `select_population` function performs **selection** in the artificial immune system (AIS) algorithm. It combines the current population and the new population (generated through mutation or other operations), evaluates their fitness, and selects the best solutions to form the next generation.

### **Parameters**
- `population`: The current population of solutions.
- `new_population`: The new population of solutions generated through mutation or other operations.

### **Returns**
- A new population containing the best solutions from the combined population.

---

### **Steps**

### **1. Combine populations**
- The current population and the new population are combined into a single list called `combined_population`.

### **2. Evaluate solutions**
- The fitness of each solution in the combined population is evaluated using the `evaluate_solution` function. This function calculates the total cost of a solution, which includes:
  - **Preference Cost**: The cost of consolation gifts for families based on their assigned days.
  - **Accounting Penalty**: Additional costs due to daily occupancy fluctuations.

### **3. Sort solutions**
- The combined population is sorted by their fitness scores in ascending order (lower cost is better).

### **4. Select best solutions**
- The top `len(population)` solutions are selected to form the next generation. This ensures that the population size remains constant.

In [19]:
def select_population(population, new_population):
    combined_population = population + new_population
    combined_scores = [calculate_affinity(sol) for sol in combined_population]
    
    sorted_population = [sol for _, sol in sorted(zip(combined_scores, combined_population), key=lambda x: x[0])]
    
    selected_population = sorted_population[:len(population)]
    return selected_population

## Simulated Annealing

Simulated Annealing is a probabilistic optimization algorithm inspired by the annealing process in metallurgy. It is used to find a global optimum in a large search space by exploring both good and bad solutions, with the probability of accepting worse solutions decreasing over time.

### **Why Add SA to AIS?**
- **AIS Limitation**: The Artificial Immune System (AIS) algorithm is good at exploring the search space but may get stuck in local optima.
- **SA Strength**: SA can escape local optima by accepting worse solutions with a decreasing probability, allowing it to explore more of the search space.
- **Hybrid Benefit**: By combining AIS with SA, we leverage the exploration capabilities of AIS and the exploitation capabilities of SA, leading to better solutions.

### **How SA Works**
1. **Initialization**: Start with an initial solution and set an initial temperature.
2. **Neighbor Generation**: Generate a neighboring solution by making small changes (e.g., mutation).
3. **Acceptance Criteria**:
   - If the neighbor solution is better, accept it.
   - If the neighbor solution is worse, accept it with a probability based on the temperature and the cost difference.
4. **Cooling Schedule**: Gradually reduce the temperature to decrease the probability of accepting worse solutions.
5. **Termination**: Stop after a fixed number of iterations or when the temperature reaches a minimum value.

### **Key Parameters**
- **Initial Temperature**: Controls the initial probability of accepting worse solutions.
- **Cooling Rate**: Determines how quickly the temperature decreases.
- **Maximum Iterations**: The number of iterations SA runs to refine the solution.

### **Integration with AIS**
- After each iteration of AIS, the best solution is refined using SA.
- This hybrid approach ensures that the best solutions found by AIS are further improved, leading to lower total costs.

### **Expected Outcome**
- The hybrid AIS-SA algorithm should produce **better solutions** (lower total cost) compared to using AIS alone.
- The algorithm will be more **robust** and less likely to get stuck in local optima.

In [None]:
import random
import math

def simulated_annealing(solution, initial_temp, cooling_rate, max_iter):
    current_solution = solution.copy()
    current_cost = calculate_affinity(current_solution)
    best_solution = current_solution.copy()
    best_cost = current_cost
    
    temperature = initial_temp
    
    for iteration in range(max_iter):
        # Generate a neighbor solution by mutating the current solution
        neighbor_solution = mutate(current_solution)
        neighbor_cost = calculate_affinity(neighbor_solution)
        
        # Calculate the cost difference
        cost_diff = neighbor_cost - current_cost
        
        # Accept the neighbor solution if it's better or with a certain probability
        if cost_diff < 0 or random.random() < math.exp(-cost_diff / temperature):
            current_solution = neighbor_solution
            current_cost = neighbor_cost
            
            # Update the best solution if needed
            if current_cost < best_cost:
                best_solution = current_solution
                best_cost = current_cost
        
        # Cool down the temperature
        temperature *= cooling_rate
    
    return best_solution

### Artificial Immune System Algorithm

This cell implements an **Artificial Immune System (AIS)** algorithm, which is inspired by the biological immune system. The algorithm evolves a population of solutions over a number of iterations to find the best solution.

#### Key Steps:
1. **Initialization**: 
   - A population of solutions is initialized using `initialize_population(pop_size)`.

2. **Iteration Loop**:
   - **Affinity Calculation**: The fitness (affinity) of each solution is calculated using `calculate_affinity(sol)`.
   - **Best Solution Selection**: The solution with the lowest cost (best affinity) is identified.
   - **Cost History**: The cost of the best solution is recorded in `cost_history`.
   - **Refinement**: The best solution is refined using **Simulated Annealing** to potentially improve it.
   - **Mutation**: A new population is generated by mutating the best solution.
   - **Selection**: The next generation is selected from the current and new populations.

3. **Output**:
   - The algorithm returns the best solution found after `max_iter` iterations.

#### Parameters:
- `pop_size`: Size of the population.
- `max_iter`: Maximum number of iterations.

#### Helper Functions:
- `initialize_population(pop_size)`: Initializes the population.
- `calculate_affinity(sol)`: Calculates the fitness of a solution.
- `simulated_annealing(solution, initial_temp, cooling_rate, max_iter)`: Refines a solution using simulated annealing.
- `mutate(solution)`: Mutates a solution to generate new candidates.
- `select_population(old_pop, new_pop)`: Selects the next generation from the old and new populations.

In [21]:
def artificial_immune_system(pop_size, max_iter):
    # Initialize population
    population = initialize_population(pop_size)
    
    # List to store the cost history
    cost_history = []
    
    for iteration in range(max_iter):
        # Evaluate the population
        affinities = [calculate_affinity(sol) for sol in population]
        
        # Find the best solution in the current population
        best_solution = population[affinities.index(min(affinities))]
        best_cost = min(affinities)
        
        # Record the cost of the best solution
        cost_history.append(best_cost)
        
        # Print progress
        print(f"Iteration {iteration}, Best Score: {best_cost}")
        
        # Refine the best solution using Simulated Annealing
        refined_solution = simulated_annealing(
            best_solution,
            initial_temp=1000,  # Initial temperature
            cooling_rate=0.95,  # Cooling rate
            max_iter=100        # Maximum iterations for SA
        )
        refined_cost = calculate_affinity(refined_solution)
        
        # Replace the best solution with the refined solution if it's better
        if refined_cost < best_cost:
            best_solution = refined_solution
            best_cost = refined_cost
        
        # Generate new population through mutation
        new_population = [mutate(best_solution.copy()) for _ in range(pop_size)]
        
        # Select the next generation
        population = select_population(population, new_population)
    
    return best_solution

In [22]:
solution = artificial_immune_system(200, 100)
print("Best solution:", calculate_affinity(solution))

Iteration 0, Best Score: 16505283683.867071
Iteration 1, Best Score: 10439132.463434733
Iteration 2, Best Score: 9985744.209525818
Iteration 3, Best Score: 9985744.209525818
Iteration 4, Best Score: 9506694.848293543
Iteration 5, Best Score: 8363483.041865852
Iteration 6, Best Score: 8099170.014456908
Iteration 7, Best Score: 7984895.586233219
Iteration 8, Best Score: 7690358.359663541
Iteration 9, Best Score: 7556190.556051171
Iteration 10, Best Score: 6961911.462867076
Iteration 11, Best Score: 6502875.078892246
Iteration 12, Best Score: 6293029.944156809
Iteration 13, Best Score: 6132900.390059887
Iteration 14, Best Score: 6132900.390059887
Iteration 15, Best Score: 6132900.390059887
Iteration 16, Best Score: 6132900.390059887
Iteration 17, Best Score: 5986692.877441706
Iteration 18, Best Score: 5986692.877441706
Iteration 19, Best Score: 5973262.685130471
Iteration 20, Best Score: 5973262.685130471
Iteration 21, Best Score: 5930113.623214658
Iteration 22, Best Score: 5616648.341341

In [None]:
import pandas as pd

family_ids = [id for id in range(0, num_families)]

# Create DataFrame
data = {'family_id': family_ids, 'assigned_day': solution}
df = pd.DataFrame(data)

# Writing data to CSV
df.to_csv('solution.csv', index=False)