In [22]:
import time
import pandas as pd
import numpy as np
import random
from typing import List, Dict, Tuple, Optional
from copy import deepcopy
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import re

In [23]:
def parse_datenbank_data(file_path):
    """
    Parse the datenbank.txt format and return personnel and project DataFrames
    """
    # Try different encodings to handle special characters
    encodings = ['latin-1', 'iso-8859-1', 'cp1252', 'utf-8']
    content = None
    
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
            print(f"Successfully read file with {encoding} encoding")
            break
        except UnicodeDecodeError:
            continue
    
    if content is None:
        raise ValueError("Could not read file with any of the attempted encodings")
    
    # Print the first few lines to debug
    print("First 10 lines of file:")
    lines = content.split('\n')
    for i, line in enumerate(lines[:10]):
        print(f"{i}: {line}")
    
    # Split into personnel and project sections
    sections = content.split('Projektdaten')
    if len(sections) < 2:
        raise ValueError("Could not find 'Projektdaten' section in file")
    
    personnel_section = sections[0].strip()
    project_section = sections[1].strip()
    
    # Parse personnel data
    personnel_lines = personnel_section.split('\n')[2:]  # Skip header lines
    personnel_data = []
    
    for line in personnel_lines:
        if line.strip():
            # Parse line: ID Name Competencies 2025 2026 2027
            parts = line.split()
            if len(parts) >= 7:  # Make sure we have enough parts
                person_id = parts[0]
                name = parts[1] + ' ' + parts[2]  # First and last name
                competencies = parts[3].split(',')
                availability_2025 = float(parts[4].replace(',', '.'))
                availability_2026 = float(parts[5].replace(',', '.'))
                availability_2027 = float(parts[6].replace(',', '.'))
                
                personnel_data.append({
                    'id': person_id,
                    'name': name,
                    'competencies': competencies,
                    'availability_2025': availability_2025,
                    'availability_2026': availability_2026,
                    'availability_2027': availability_2027,
                    'total_availability': availability_2025 + availability_2026 + availability_2027
                })
    
    # Parse project data - CORRECTED: Skip 6 lines instead of 4
    project_lines = project_section.split('\n')[6:]  # Skip header lines including "AP von bis PM"
    project_data = []
    
    def fix_invalid_date(date_str):
        """Fix common invalid dates"""
        day, month, year = date_str.split('.')
        day, month, year = int(day), int(month), int(year)
        
        # Fix invalid dates
        if month == 4 and day == 31:  # April 31st doesn't exist
            day = 30
        elif month == 2 and day > 28:  # February issues
            if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0):  # Leap year
                if day > 29:
                    day = 29
            else:
                if day > 28:
                    day = 28
        elif month in [4, 6, 9, 11] and day == 31:  # Months with 30 days
            day = 30
        
        return f"{day:02d}.{month:02d}.{year:02d}"
    
    for line in project_lines:
        if line.strip() and not line.startswith('---'):
            # Parse line: AP von bis PM
            parts = line.split()
            if len(parts) >= 4:
                # Handle the AP format more carefully
                ap_part = parts[0]
                if '(' in ap_part and ')' in ap_part:
                    ap_code = ap_part.split('(')[0]  # Extract just the letter
                    ap_name = ap_part.split('(')[1].rstrip(')')  # Extract name in parentheses
                else:
                    # If no parentheses, use the whole part as code
                    ap_code = ap_part
                    ap_name = ap_part
                
                start_date = parts[1]
                end_date = parts[2]
                person_months = float(parts[3].replace(',', '.'))
                
                # Fix invalid dates
                start_date_fixed = fix_invalid_date(start_date)
                end_date_fixed = fix_invalid_date(end_date)
                
                try:
                    # Convert dates to datetime objects
                    start_dt = datetime.strptime(start_date_fixed, '%d.%m.%y')
                    end_dt = datetime.strptime(end_date_fixed, '%d.%m.%y')
                    
                    # Calculate duration in months
                    duration_months = (end_dt.year - start_dt.year) * 12 + (end_dt.month - start_dt.month) + 1
                    
                    project_data.append({
                        'ap_code': ap_code,
                        'ap_name': ap_name,
                        'full_name': f"{ap_code}({ap_name})",
                        'start_date': start_date_fixed,
                        'end_date': end_date_fixed,
                        'start_dt': start_dt,
                        'end_dt': end_dt,
                        'person_months': person_months,
                        'duration_months': duration_months,
                        'monthly_effort': person_months / duration_months
                    })
                    
                    # Print if we fixed a date
                    if start_date != start_date_fixed or end_date != end_date_fixed:
                        print(f"Fixed invalid date in {ap_code}: {start_date}->{start_date_fixed}, {end_date}->{end_date_fixed}")
                        
                except ValueError as e:
                    print(f"Error parsing dates for {ap_code}: {start_date_fixed}, {end_date_fixed}")
                    print(f"Error: {e}")
                    continue
    
    personnel_df = pd.DataFrame(personnel_data)
    project_df = pd.DataFrame(project_data)
    
    return personnel_df, project_df

In [24]:
def calculate_gaps_between_tasks(results_df):
    """
    Calculate gaps between tasks for each person
    Returns a dictionary with person_id as key and total gap days as value
    """
    gaps_per_person = {}
    
    # Group tasks by person
    for person_id in results_df['person_id'].unique():
        if person_id == 'Unassigned':
            continue
            
        person_tasks = results_df[results_df['person_id'] == person_id].copy()
        person_tasks['start_dt'] = pd.to_datetime(person_tasks['start_date'], format='%d.%m.%y')
        person_tasks['end_dt'] = pd.to_datetime(person_tasks['end_date'], format='%d.%m.%y')
        
        # Sort by start date
        person_tasks = person_tasks.sort_values('start_dt')
        
        total_gaps = 0
        
        # Calculate gaps between consecutive tasks
        for i in range(len(person_tasks) - 1):
            current_task_end = person_tasks.iloc[i]['end_dt']
            next_task_start = person_tasks.iloc[i + 1]['start_dt']
            
            # If there's a gap (next task starts after current task ends)
            if next_task_start > current_task_end:
                gap_days = (next_task_start - current_task_end).days
                total_gaps += gap_days
        
        gaps_per_person[person_id] = total_gaps
    
    return gaps_per_person

In [25]:
def calculate_fitness_new_format(assignment, project_df, personnel_df):
    """
    Calculate fitness score for a given assignment using the new data format
    Higher score = better assignment
    """
    total_score = 0
    constraint_penalty = 0
    
    # Track monthly availability for each person
    monthly_availability = {}
    
    for task_idx, person_idx in enumerate(assignment):
        if person_idx >= len(personnel_df):
            continue
            
        task = project_df.iloc[task_idx]
        person = personnel_df.iloc[person_idx]
        
        # Check skill match
        if task['ap_code'] not in person['competencies']:
            constraint_penalty += 1000  # Heavy penalty for skill mismatch
            continue
        
        # Check availability across the task duration
        task_start_year = task['start_dt'].year
        task_end_year = task['end_dt'].year
        
        # Calculate required effort per month
        required_monthly_effort = task['monthly_effort']
        
        # Check if person has enough availability for each year
        person_id = person['id']
        if person_id not in monthly_availability:
            monthly_availability[person_id] = {
                2025: person['availability_2025'],
                2026: person['availability_2026'],
                2027: person['availability_2027']
            }
        
        # Check availability for each year the task spans
        available_effort = 0
        for year in range(task_start_year, task_end_year + 1):
            if year in monthly_availability[person_id]:
                available_effort += monthly_availability[person_id][year]
        
        if available_effort >= task['person_months']:
            # Good assignment - calculate positive score
            skill_bonus = 2.0  # Bonus for skill match
            availability_score = available_effort * skill_bonus
            total_score += availability_score
            
            # Update availability (simple allocation)
            remaining_effort = task['person_months']
            for year in range(task_start_year, task_end_year + 1):
                if year in monthly_availability[person_id] and remaining_effort > 0:
                    allocated = min(monthly_availability[person_id][year], remaining_effort)
                    monthly_availability[person_id][year] -= allocated
                    remaining_effort -= allocated
        else:
            constraint_penalty += 200  # Penalty for insufficient availability
    
    # Calculate workload coverage bonus
    assigned_workload = sum(
        row['person_months']
        for i, (_, row) in enumerate(project_df.iterrows())
        if assignment[i] < len(personnel_df)
        and row['ap_code'] in personnel_df.iloc[assignment[i]]['competencies']
    )
    total_workload = project_df['person_months'].sum()
    coverage_bonus = (assigned_workload / total_workload) * 1000

    return total_score + coverage_bonus - constraint_penalty

In [26]:
def create_initial_population(population_size, num_tasks, num_people):
    """
    Create initial population of random assignments
    """
    population = []
    for _ in range(population_size):
        # Create random assignment: each task gets assigned to a random person
        assignment = [random.randint(0, num_people - 1) for _ in range(num_tasks)]
        population.append(assignment)
    return population

def tournament_selection(population, fitness_scores, tournament_size=3):
    """
    Select parent using tournament selection
    """
    tournament_indices = random.sample(range(len(population)), tournament_size)
    tournament_fitness = [fitness_scores[i] for i in tournament_indices]
    winner_idx = tournament_indices[tournament_fitness.index(max(tournament_fitness))]
    return population[winner_idx]

def crossover(parent1, parent2, crossover_rate=0.8):
    """
    Perform crossover between two parents
    """
    if random.random() > crossover_rate:
        return parent1, parent2
    
    # Single-point crossover
    crossover_point = random.randint(1, len(parent1) - 1)
    child1 = parent1[:crossover_point] + parent2[crossover_point:]
    child2 = parent2[:crossover_point] + parent1[crossover_point:]
    
    return child1, child2

def mutate(assignment, mutation_rate=0.1, num_people=None):
    """
    Perform mutation on an assignment
    """
    if num_people is None:
        num_people = max(assignment) + 1
    
    mutated = assignment.copy()
    for i in range(len(mutated)):
        if random.random() < mutation_rate:
            mutated[i] = random.randint(0, num_people - 1)
    
    return mutated

In [27]:
def genetic_algorithm_matching_new_format(project_df, personnel_df, population_size=50, generations=100, 
                                         mutation_rate=0.1, crossover_rate=0.8, tournament_size=3):
    """
    Genetic Algorithm for optimal assignment using new data format
    """
    print("\n=== Genetic Algorithm (New Format) ===")
    start_time = time.time()
    
    num_tasks = len(project_df)
    num_people = len(personnel_df)
    
    # Create initial population
    population = create_initial_population(population_size, num_tasks, num_people)
    
    best_fitness = float('-inf')
    best_assignment = None
    
    print(f"Population size: {population_size}")
    print(f"Generations: {generations}")
    print(f"Tasks: {num_tasks}, People: {num_people}")
    
    # Evolution loop
    for generation in range(generations):
        # Calculate fitness for all individuals
        fitness_scores = []
        for assignment in population:
            fitness = calculate_fitness_new_format(assignment, project_df, personnel_df)
            fitness_scores.append(fitness)
            
            # Track best solution
            if fitness > best_fitness:
                best_fitness = fitness
                best_assignment = assignment.copy()
        
        # Create new population
        new_population = []
        
        # Elitism: keep best individual
        best_idx = fitness_scores.index(max(fitness_scores))
        new_population.append(population[best_idx])
        
        # Generate rest of population
        while len(new_population) < population_size:
            # Selection
            parent1 = tournament_selection(population, fitness_scores, tournament_size)
            parent2 = tournament_selection(population, fitness_scores, tournament_size)
            
            # Crossover
            child1, child2 = crossover(parent1, parent2, crossover_rate)
            
            # Mutation
            child1 = mutate(child1, mutation_rate, num_people)
            child2 = mutate(child2, mutation_rate, num_people)
            
            new_population.extend([child1, child2])
        
        # Trim to population size
        population = new_population[:population_size]
        
        # Progress reporting
        if generation % 20 == 0 or generation == generations - 1:
            avg_fitness = np.mean(fitness_scores)
            worst_fitness = min(fitness_scores)
            print(f"Generation {generation}: Best={best_fitness:.1f}, Avg={avg_fitness:.1f}")
    
    # Convert best assignment to results format
    matching_results = []
    for task_idx, person_idx in enumerate(best_assignment):
        if person_idx < len(personnel_df):
            task = project_df.iloc[task_idx]
            person = personnel_df.iloc[person_idx]
            
            matching_results.append({
                "task_id": task['ap_code'],
                "task_name": task['full_name'],
                "start_date": task['start_date'],
                "end_date": task['end_date'],
                "person_months": task['person_months'],
                "person_id": person['id'],
                "name": person['name'],
                "competencies": ','.join(person['competencies']),
                "assigned_competency": task['ap_code'],
                "duration_months": task['duration_months'],
                "monthly_effort": task['monthly_effort']
            })
    
    execution_time = time.time() - start_time
    
    print(f"Execution time: {execution_time:.3f} seconds")
    print(f"Best fitness: {best_fitness:.1f}")
    print(f"Found assignments: {len(matching_results)}")
    
    return matching_results, execution_time

In [28]:
def genetic_matching_with_elite_new_format(project_df, personnel_df, population_size=100, generations=200):
    """
    Enhanced genetic algorithm with elite preservation and adaptive parameters
    """
    print("\n=== Enhanced Genetic Algorithm (New Format) ===")
    start_time = time.time()
    
    # Run multiple times with different parameters and keep best result
    best_result = None
    best_fitness = float('-inf')
    
    # Parameter combinations to try
    param_combinations = [
        {"population_size": 50, "generations": 100, "mutation_rate": 0.1},
        {"population_size": 100, "generations": 150, "mutation_rate": 0.05},
        {"population_size": 75, "generations": 200, "mutation_rate": 0.15},
    ]
    
    for i, params in enumerate(param_combinations):
        print(f"\nTrying parameter set {i+1}/{len(param_combinations)}")
        results, _ = genetic_algorithm_matching_new_format(
            project_df, personnel_df, **params
        )
        
        # Calculate fitness of this result
        assignment = []
        for result in results:
            task_idx = project_df[project_df['ap_code'] == result['task_id']].index[0]
            person_idx = personnel_df[personnel_df['id'] == result['person_id']].index[0]
            assignment.append(person_idx)
        
        # Pad assignment if needed
        while len(assignment) < len(project_df):
            assignment.append(random.randint(0, len(personnel_df) - 1))
        
        fitness = calculate_fitness_new_format(assignment, project_df, personnel_df)
        
        if fitness > best_fitness:
            best_fitness = fitness
            best_result = results
    
    execution_time = time.time() - start_time
    return best_result, execution_time

In [29]:
print("Testing Genetic Algorithm with new datenbank.txt format...")

# Parse the datenbank.txt file
personnel_df, project_df = parse_datenbank_data("../data/datenbank.txt")

print(f"Loaded {len(personnel_df)} personnel records")
print(f"Loaded {len(project_df)} project tasks")
print("\nPersonnel data:")
print(personnel_df[['name', 'competencies', 'total_availability']].to_string(index=False))
print("\nProject data:")
print(project_df[['full_name', 'start_date', 'end_date', 'person_months']].to_string(index=False))

Testing Genetic Algorithm with new datenbank.txt format...
Successfully read file with latin-1 encoding
First 10 lines of file:
0:                                     Verfügbarkeit
1:                     Kompetenz       2025    2026    2027
2: 1 Becker, Anna      A,B,H,I,J,N,O   2,00    1,00    2,00
3: 2 Wagner, Lukas     C,H,J           0,50    2,00    1,00
4: 3 Keller, Jonas     D,E,H,I,K       0,75    2,00    3,00
5: 4 Braun, Maria      D,F,G,J,L,M,O   2,00    4,00    3,00
6: 5 Schmid, David     G,K,L           0,25    2,00    2,00
7: 6 Hoffmann, Laura   G,L,M,O         0,50    4,00    1,50
8: 7 Vogel, Peter      E,I,M           3,00    0,25    0,50
9: 
Fixed invalid date in F: 01.06.26->01.06.26, 31.04.27->30.04.27
Fixed invalid date in I: 01.12.26->01.12.26, 31.06.27->30.06.27
Loaded 7 personnel records
Loaded 14 project tasks

Personnel data:
           name          competencies  total_availability
   Becker, Anna [A, B, H, I, J, N, O]                5.00
  Wagner, Lukas        

In [30]:
# Run algorithm
results, time_taken = genetic_matching_with_elite_new_format(project_df, personnel_df)

print(f"\nFinal Results:")
print(f"Total assignments: {len(results)}")
print(f"Total person-months: {sum(item['person_months'] for item in results):.1f}")
print(f"Total workload: {sum(item['person_months'] for item in results) * 160:.1f} hours (assuming 160h/month)")


=== Enhanced Genetic Algorithm (New Format) ===

Trying parameter set 1/3

=== Genetic Algorithm (New Format) ===
Population size: 50
Generations: 100
Tasks: 14, People: 7
Generation 0: Best=-7015.1, Avg=-9910.7
Generation 20: Best=473.5, Avg=-1233.4
Generation 40: Best=473.5, Avg=-968.8
Generation 60: Best=473.5, Avg=-1242.1
Generation 80: Best=473.5, Avg=-1021.2
Generation 99: Best=473.5, Avg=-953.3
Execution time: 5.903 seconds
Best fitness: 473.5
Found assignments: 14

Trying parameter set 2/3

=== Genetic Algorithm (New Format) ===
Population size: 100
Generations: 150
Tasks: 14, People: 7
Generation 0: Best=-4788.1, Avg=-9323.7
Generation 20: Best=281.0, Avg=-334.6
Generation 40: Best=283.0, Avg=-236.1
Generation 60: Best=283.0, Avg=-270.8
Generation 80: Best=283.0, Avg=-262.2
Generation 100: Best=283.0, Avg=-358.9
Generation 120: Best=283.0, Avg=-378.0
Generation 140: Best=283.0, Avg=-348.6
Generation 149: Best=283.0, Avg=-236.5
Execution time: 17.022 seconds
Best fitness: 283.

In [31]:
# Calculate gaps and add to results
gaps_per_person = calculate_gaps_between_tasks(results_df)

In [32]:
# Add gap information to each result
for i, row in results_df.iterrows():
    person_id = row['person_id']
    if person_id in gaps_per_person:
        results_df.at[i, 'total_gaps_days'] = gaps_per_person[person_id]
    else:
        results_df.at[i, 'total_gaps_days'] = 0

In [33]:
# Display results in a nice table
results_df = pd.DataFrame(results)
print("\nAssignment Results:")
print(results_df[['task_name', 'assigned_competency', 'name', 'person_months', 'start_date', 'end_date']].to_string(index=False))


Assignment Results:
         task_name assigned_competency            name  person_months start_date end_date
       B(enchmark)                   B    Becker, Anna           0.50   01.11.25 31.12.25
           C(I/CD)                   C   Wagner, Lukas           0.50   01.12.25 31.01.26
    D(emonstrator)                   D   Keller, Jonas           2.00   01.12.25 31.01.26
     E(ntwicklung)                   E    Vogel, Peter           3.50   01.10.25 31.01.26
     F(einplanung)                   F    Braun, Maria           1.00   01.06.26 30.04.27
G(renzwertanalyse)                   G    Braun, Maria           5.00   01.11.25 31.08.26
     H(ilfesystem)                   H   Wagner, Lukas           5.00   01.07.26 31.08.27
 I(mplementierung)                   I    Becker, Anna           3.00   01.12.26 30.06.27
      J(ustierung)                   J    Braun, Maria           4.00   01.11.26 31.08.27
K(onzeptionierung)                   K   Keller, Jonas           3.00   01.01.2

In [34]:
# Save results to CSV
results_df.to_csv("matching_ergebnis_new_format.csv", index=False)
print("Results saved to matching_ergebnis_new_format.csv")

Results saved to matching_ergebnis_new_format.csv


In [35]:
# Print gap summary
print("\nGap Analysis:")
print("="*50)
for person_id, gap_days in gaps_per_person.items():
    person_name = results_df[results_df['person_id'] == person_id]['name'].iloc[0]
    print(f"{person_name}: {gap_days} days of gaps")
print(f"Total gaps across all employees: {sum(gaps_per_person.values())} days")


Gap Analysis:
Becker, Anna: 0 days of gaps
Wagner, Lukas: 0 days of gaps
Keller, Jonas: 151 days of gaps
Vogel, Peter: 0 days of gaps
Braun, Maria: 0 days of gaps
Hoffmann, Laura: 0 days of gaps
Total gaps across all employees: 151 days
