## Particle Swarm Optimization Implementation

Developed by Daniel Frutos Rodriguez for _"A Comparison of Optimization Techniques for Solving the Team Formation Problem"_.

Frutos-Rodriguez D., Barrios-Fleitas Y., and Lalla E. 2025. _A Comparison of Optimization Techniques for Solving the Team Formation Problem._ In _Proceedings Placeholder_. ACM, New York, NY, USA, 6 pages. [https://doi.org/10.1145/nnnnnnn.nnnnnnn](https://doi.org/10.1145/nnnnnnn.nnnnnnn)

#### Libraries

In [1]:
import pandas as pd # For datasets
import utils.fitness_functions as ff
import utils.monoobjective_exhaustive_solver as es
import utils.restriction_checker as rc
import models.team_assignment as ta
import random # For randomness
import os # For output
from contextlib import redirect_stdout # For output
import datetime # For output
from copy import deepcopy # For velocity function
from collections import defaultdict

#### Constants

In [2]:
# Data
# NOTE: the dataset is read from disk as soon as this cell/module is executed.

PROJECTS = ['Shotmaniacs', 'actFact', 'Honours Programme', 'Voice', 'Topicus', 'Earnit', 'Inter-actief'] # Projects that teams can be assigned to.
DATA = 'data/dataset_small.csv' # Path of the dataset CSV file.
DATASET = pd.read_csv(DATA) # Dataset loaded as a dataframe.

# Safeguards

MAX_RANDOM_SWAP_ATTEMPTS = 10 # Maximum number of random single swaps tried when a particle ends up with no valid velocity.

# Tuning Parameters
# In this discrete PSO the coefficients act as per-swap keep probabilities (see Particle.update_velocity).

W = 0.5 # Inertia: probability of keeping each swap of the previous velocity as a candidate.

C1 = 0.25 # Cognitive coefficient: probability of keeping each candidate swap towards the particle's own best position.

C2 = 0.75 # Social coefficient: probability of keeping each candidate swap towards the global best position.

# Experimental controls

NUMBER_OF_ITERATIONS = 100 # Iteration bound of the PSO loop in execute().
SWARM_SIZE = 10 # Number of particles in the swarm.
COMPUTE_OPTIMAL_SOLUTION = False # (Only for small synthetic datasets), computes optimal solution through exhaustive solver.
EFFICIENCY_GOAL = None # (Only for small synthetic datasets), stops the algorithm early once the efficiency
                     # (best score / score of best possible solution, expressed as a percentage) reaches this value.
                     # None disables early stopping; efficiency is only computed when COMPUTE_OPTIMAL_SOLUTION is True.

# Output constants

RUN_TIME = datetime.datetime.now().strftime("run_%Y%m%d_%H%M%S") # Default run-folder name for save_arrangement_batch; execute() creates its own timestamp.
BASE_DIR = "output/pso" # Base directory for outputs.

#### Output Configuration

In [3]:
def save_arrangement_batch(arrangements, iteration, run_time=RUN_TIME):
    """Evaluate every arrangement and write one report file per arrangement.

    Files are written to ``BASE_DIR/<run_time>/generation_<iteration>/`` as
    ``assignment_<k>.txt`` with ``k`` starting at 1. Each arrangement is
    evaluated while stdout is redirected into its file, so whatever the
    evaluation prints ends up in the report, followed by the final score line.

    Args:
        arrangements: Iterable of team arrangements to evaluate.
        iteration: Generation number, used in the folder name.
        run_time: Name of the run folder under ``BASE_DIR``.

    Returns:
        List with the score of each arrangement, in input order.
    """
    # Make sure the base folder and the folder for this generation exist.
    os.makedirs(BASE_DIR, exist_ok=True)
    generation_dir = os.path.join(BASE_DIR, f"{run_time}", f"generation_{iteration}")
    os.makedirs(generation_dir, exist_ok=True)

    collected = []
    index = 0

    for arrangement in arrangements:
        index += 1
        target = os.path.join(generation_dir, f"assignment_{index}.txt")
        # Everything printed inside this block goes to the report file.
        with open(target, "w") as handle, redirect_stdout(handle):
            value = ff.evaluate_all_teams(arrangement)
            print(f"\nFinal score for arrangement {index}: {value:.4f}")
            collected.append(value)

    return collected

#### Initial Population Generation

In [4]:
def create_random_teams(df, min_size=5, max_size=6, project_pool=PROJECTS):
    """Build one random team arrangement from the student dataframe.

    Students are shuffled and drawn at random into teams. ``total % min_size``
    teams get ``max_size`` members and the remaining students are grouped into
    teams of ``min_size`` (this split assumes ``max_size == min_size + 1``).
    While a team is being filled, a student is only eligible if

    * the team holds fewer than 4 students whose 'Program' contains 'TCS', or
      the student's 'Program' does not contain 'TCS'; and
    * the student is 'Dutch', or the team holds fewer than 3 students of the
      student's 'Nationality'.

    Projects are distributed as evenly as possible over the teams and then
    shuffled.

    Args:
        df: DataFrame with one row per student; the 'Program' and
            'Nationality' columns are read here.
        min_size: Size of the smaller teams.
        max_size: Size of the larger teams.
        project_pool: Projects that teams can be assigned to.

    Returns:
        A list of ``ta.TeamAssignment`` objects, or ``None`` when a team could
        not be completed with eligible students or the finished arrangement is
        rejected by ``rc.is_valid_arrangement``. Callers are expected to retry.

    Raises:
        ValueError: If the number of students cannot be split into teams of
            ``min_size`` and ``max_size`` (retrying could never succeed).
    """
    max_tcs_per_team = 4
    max_same_foreign_nationality = 3

    students = df.to_dict(orient='records')
    random.shuffle(students)

    total_students = len(students)
    remainder = total_students % min_size
    num_six_person_teams = remainder
    num_five_person_teams = (total_students - (max_size * num_six_person_teams)) // min_size
    if num_five_person_teams < 0:
        # Previously this surfaced later as an IndexError from random.choice([]).
        raise ValueError(
            f"Cannot split {total_students} students into teams of "
            f"{min_size} and {max_size} members."
        )
    total_teams = num_five_person_teams + num_six_person_teams

    def assign_projects_to_teams(num_teams, projects):
        """Spread num_teams over projects as evenly as possible, in random order."""
        base, extra = divmod(num_teams, len(projects))
        project_assignments = []
        for idx, project in enumerate(projects):
            count = base + (1 if idx < extra else 0)
            project_assignments.extend([project] * count)
        random.shuffle(project_assignments)
        return project_assignments

    def draw_team(size):
        """Draw `size` eligible students from the pool; None if it runs dry."""
        team = []
        tcs_count = 0
        nationality_counts = {}

        while len(team) < size:
            valid_candidates = [
                s for s in students
                if ('TCS' not in s['Program'] or tcs_count < max_tcs_per_team)
                and (
                    s['Nationality'] == 'Dutch'
                    or nationality_counts.get(s['Nationality'], 0) < max_same_foreign_nationality
                )
            ]
            if not valid_candidates:
                return None

            selected = random.choice(valid_candidates)
            team.append(selected)
            students.remove(selected)

            if 'TCS' in selected['Program']:
                tcs_count += 1
            if selected['Nationality'] != 'Dutch':
                nationality = selected['Nationality']
                nationality_counts[nationality] = nationality_counts.get(nationality, 0) + 1

        return team

    project_assignments = assign_projects_to_teams(total_teams, project_pool)
    project_counters = {proj: 0 for proj in project_pool}

    teams = []
    # Larger teams are filled first, then the smaller ones.
    team_sizes = [max_size] * num_six_person_teams + [min_size] * num_five_person_teams

    for size in team_sizes:
        team = draw_team(size)
        if team is None:
            # Dead end for this random draw; signal the caller to retry.
            return None

        project = project_assignments[len(teams)]
        project_counters[project] += 1
        team_id = f"{project} {project_counters[project]}"
        teams.append(ta.TeamAssignment(team_id, team, project, fitness=0.0))

    # Particle.update_velocity unpacks the checker's result as `valid, _`, i.e.
    # a 2-item pair. Testing such a pair for truthiness is always True, so the
    # validity flag has to be extracted explicitly.
    result = rc.is_valid_arrangement(teams, total_students, project_pool)
    is_valid = result[0] if isinstance(result, tuple) else bool(result)
    return teams if is_valid else None

#### Particle Class

In [5]:
class Particle:
    """One PSO particle: a team arrangement plus a swap-based velocity.

    The position is a list of team objects (each exposing ``members``, a list
    of student dicts with an 'ID' key). The velocity is a list of swaps that
    ``update_position`` applies to the position.
    """

    def __init__(self, dataset, projects, min_size=5, max_size=6):
        """Create a particle at a random valid position.

        Args:
            dataset: Student dataframe handed to ``create_random_teams``.
            projects: Projects that teams can be assigned to.
            min_size: Size of the smaller teams.
            max_size: Size of the larger teams.
        """
        self.dataset = dataset
        self.projects = projects
        self.min_size = min_size
        self.max_size = max_size
        self.position = self.random_position()
        # List of swaps: (origin team index, destination team index,
        # ID of the student leaving origin, ID of the student leaving destination).
        self.velocity = []
        self.best_position = deepcopy(self.position)
        self.best_fitness = -1

    def random_position(self):
        """Return a random arrangement, retrying until one is produced.

        The particle's own team sizes and project pool are forwarded so that
        non-default settings are honoured.
        """
        arrangement = None
        while arrangement is None:
            arrangement = create_random_teams(
                self.dataset,
                min_size=self.min_size,
                max_size=self.max_size,
                project_pool=self.projects,
            )
        return arrangement

    def update_velocity(self, pbest_pos, gbest_pos, w, c1, c2, attempted_arrangements):
        """Build a new velocity (list of swaps) for this particle.

        Candidate swaps come from three sources: swaps of the previous velocity
        (each kept with probability ``w``), swaps towards ``pbest_pos`` (each
        kept with probability ``c1``) and swaps towards ``gbest_pos`` (each
        kept with probability ``c2``). Candidates are accepted one by one as
        long as the arrangement obtained by applying all accepted swaps plus
        the candidate passes ``rc.is_valid_arrangement``. If nothing is
        accepted, up to ``MAX_RANDOM_SWAP_ATTEMPTS`` random single swaps are
        tried instead.

        Args:
            pbest_pos: Best arrangement found by this particle.
            gbest_pos: Best arrangement found by the swarm.
            w: Keep probability for swaps of the previous velocity.
            c1: Keep probability for swaps towards ``pbest_pos``.
            c2: Keep probability for swaps towards ``gbest_pos``.
            attempted_arrangements: Single-element list used as a shared
                counter; element 0 is incremented once per validation attempt.
        """

        def generate_swaps(source, target):
            """List swaps moving students towards their team index in target."""
            # Team index of every student in the target, built once instead of
            # scanning all teams for every student. The first team wins if a
            # student were to appear more than once.
            target_team_of = {}
            for idx, team in enumerate(target):
                for member in team.members:
                    target_team_of.setdefault(member['ID'], idx)

            swaps = []
            for team_a_idx, team_a in enumerate(source):
                for member_a in team_a.members:
                    team_b_idx = target_team_of.get(member_a['ID'])
                    if team_b_idx is None or team_b_idx == team_a_idx:
                        continue
                    # Any current member of the destination team is a
                    # possible swap partner.
                    for member_b in source[team_b_idx].members:
                        swaps.append((team_a_idx, team_b_idx, member_a['ID'], member_b['ID']))
            return swaps

        def simulate_and_validate(teams, swaps):
            """Apply swaps to a copy of teams and report whether it is valid."""
            simulated = deepcopy(teams)
            student_lookup = {m['ID']: m for team in simulated for m in team.members}

            for a, b, id_a, id_b in swaps:
                # Named team_a/team_b so the imported `ta` module is not shadowed.
                team_a, team_b = simulated[a], simulated[b]
                team_a.members = [m for m in team_a.members if m['ID'] != id_a]
                team_b.members = [m for m in team_b.members if m['ID'] != id_b]
                team_a.members.append(student_lookup[id_b])
                team_b.members.append(student_lookup[id_a])

            valid, _ = rc.is_valid_arrangement(simulated, len(student_lookup), self.projects)
            return valid

        # --- Build candidate swaps ---
        new_velocity = [swap for swap in self.velocity if random.random() < w]
        cognitive_swaps = generate_swaps(self.position, pbest_pos)
        social_swaps = generate_swaps(self.position, gbest_pos)

        cognitive_part = [s for s in cognitive_swaps if random.random() < c1]
        social_part = [s for s in social_swaps if random.random() < c2]

        combined_swaps = new_velocity + cognitive_part + social_part

        # --- Try building a valid velocity ---
        valid_velocity = []
        for swap in combined_swaps:
            attempted_arrangements[0] += 1
            trial = valid_velocity + [swap]
            if simulate_and_validate(self.position, trial):
                valid_velocity.append(swap)

        # --- Fallback: try random single swaps ---
        # A swap needs two distinct teams; random.sample would raise otherwise.
        if not valid_velocity and len(self.position) >= 2:
            for _ in range(MAX_RANDOM_SWAP_ATTEMPTS):
                attempted_arrangements[0] += 1
                a, b = random.sample(range(len(self.position)), 2)
                if not self.position[a].members or not self.position[b].members:
                    continue
                sa = random.choice(self.position[a].members)
                sb = random.choice(self.position[b].members)
                candidate = (a, b, sa['ID'], sb['ID'])
                if simulate_and_validate(self.position, [candidate]):
                    valid_velocity.append(candidate)
                    break

        self.velocity = valid_velocity

    def fitness(self):
        """Return the score of the current position."""
        return ff.evaluate_all_teams(self.position)

    def update_position(self):
        """Apply the velocity's swaps, in order, to a copy of the position.

        A swap whose students are no longer in the named teams (for example
        because an earlier swap already moved one of them) is skipped as a
        whole, so no student is ever dropped from the arrangement.
        """
        new_position = deepcopy(self.position)

        def index_of(members, student_id):
            for i, member in enumerate(members):
                if member['ID'] == student_id:
                    return i
            return None

        for a, b, id_a, id_b in self.velocity:
            members_a = new_position[a].members
            members_b = new_position[b].members

            idx_a = index_of(members_a, id_a)
            if idx_a is None:
                continue
            student_a = members_a.pop(idx_a)

            idx_b = index_of(members_b, id_b)
            if idx_b is None:
                # Partner is missing: undo the removal instead of losing the student.
                members_a.insert(idx_a, student_a)
                continue
            student_b = members_b.pop(idx_b)

            members_a.append(student_b)
            members_b.append(student_a)

        self.position = new_position

    def update_velocity_and_position(self, gbest_pos, attempted_arrangements):
        """Recompute the velocity with the module-level W, C1, C2 and move."""
        self.update_velocity(self.best_position, gbest_pos, W, C1, C2, attempted_arrangements)
        self.update_position()

#### PSO Execution

In [6]:
def execute():
    """Run the PSO loop and return the score history and best arrangement.

    Each pass evaluates every particle, updates personal and global bests,
    moves every particle, appends a line to
    ``BASE_DIR/<run_time>/performance.txt`` and saves the positions of all
    particles via ``save_arrangement_batch``. The loop stops when the
    iteration bound is exceeded or, if ``EFFICIENCY_GOAL`` is set and
    ``COMPUTE_OPTIMAL_SOLUTION`` is enabled, once the efficiency (in percent)
    reaches the goal.

    Returns:
        A ``(df, global_best_position)`` tuple. ``df`` has one row per pass
        (index starting at 1) with the columns 'score' (global best fitness so
        far) and 'number of computations' (cumulative validation attempts).
        ``global_best_position`` is a copy of the best arrangement evaluated.
    """
    df = pd.DataFrame({
        "score": pd.Series(dtype="float"),
        "number of computations": pd.Series(dtype="float")
    })

    swarm = [Particle(DATASET, PROJECTS) for _ in range(SWARM_SIZE)]

    # Computation of the best possible arrangement for performance measurement.
    # Only feasible for small synthetic datasets.
    if COMPUTE_OPTIMAL_SOLUTION:
        _, optimal_score, arrangements_computed = es.find_best_arrangement(DATASET, DATA)

    global_best_position = None
    global_best_fitness = -1
    iters = 0
    efficiency = -1
    # Single-element list so particles can increment the shared counter in place.
    attempted_arrangements = [0]

    run_time = datetime.datetime.now().strftime("run_%Y%m%d_%H%M%S")
    performance_log_path = os.path.join(BASE_DIR, f"{run_time}", "performance.txt")
    os.makedirs(os.path.dirname(performance_log_path), exist_ok=True)

    with open(performance_log_path, "w") as f:
        f.write(f"Performance log for run: {run_time}\n\n")

    # NOTE(review): `<=` makes the loop run NUMBER_OF_ITERATIONS + 1 passes
    # (iterations 0..NUMBER_OF_ITERATIONS). Kept as is so existing results stay
    # reproducible; confirm whether the extra pass is intended.
    while iters <= NUMBER_OF_ITERATIONS and (EFFICIENCY_GOAL is None or efficiency < EFFICIENCY_GOAL):
        for particle in swarm:
            fitness_val = particle.fitness()
            if fitness_val > particle.best_fitness:
                particle.best_fitness = fitness_val
                particle.best_position = deepcopy(particle.position)
            if fitness_val > global_best_fitness:
                global_best_fitness = fitness_val
                global_best_position = deepcopy(particle.position)

        for particle in swarm:
            particle.update_velocity_and_position(global_best_position, attempted_arrangements)

        # Efficiency of the best score so far relative to the optimum, in percent.
        if COMPUTE_OPTIMAL_SOLUTION:
            efficiency = round((global_best_fitness / optimal_score) * 100, 2)

        # Log the best score so far. The counter value itself is written, not
        # the one-element list that wraps it.
        with open(performance_log_path, "a") as f:
            f.write(
                f"Iteration {iters}: Best score = {global_best_fitness:.4f}"
                + (f" | Efficiency = {efficiency:.2f}%" if COMPUTE_OPTIMAL_SOLUTION else "")
                + f" | Total Computations: {attempted_arrangements[0]}"
                + (f" of {arrangements_computed:.0f} Possible Combinations" if COMPUTE_OPTIMAL_SOLUTION else "")
                + "\n"
            )

        # Save the (already moved) position of every particle for this iteration.
        save_arrangement_batch([p.position for p in swarm], iters, run_time)

        iters += 1

        df.loc[iters] = [global_best_fitness, attempted_arrangements[0]]

    return df, global_best_position