In [457]:
import pandas as pd
import numpy as np
import random
from datetime import datetime,timedelta
import matplotlib.pyplot as plt
from tqdm import tqdm
import gudhi as gd


In [458]:
print("Loading the dataset...")
df = pd.read_csv(r"C:\Users\Admin\Downloads\full_schedule.csv")
print(f"Original dataset loaded with {len(df)} records.")

# Log the intention to sample
print("Attempting to sample 1000 records with replacement...")
try:
    df_sampled = df.sample(n=10000, replace=True)  # Allows sampling the same row more than once
    print(f"Sampled dataset created with {len(df_sampled)} records.")
except ValueError as e:
    print(f"An error occurred: {e}")

Loading the dataset...
Original dataset loaded with 756 records.
Attempting to sample 1000 records with replacement...
Sampled dataset created with 10000 records.


In [459]:
ga_type = input("Enter GA type (generic (1) or proposed (2)): ").strip()
if ga_type not in ['1', '2']:
    raise ValueError("Invalid GA type. Please enter '1' for generic or '2' for proposed.")


# GENERIC GENETIC ALGO


In [460]:
def split_timeslot(timeslot):
    parts = timeslot.split(' ')
    if len(parts) >= 2:
        return parts[0], ' '.join(parts[1:])
    return timeslot, ''  # Adjust this according to the expected format

df_sampled['day'], df_sampled['time'] = zip(*df_sampled['timeslot'].apply(split_timeslot))
df_sampled['start_time'], df_sampled['end_time'] = zip(*df_sampled['time'].apply(lambda x: x.split('-') if x else ('', '')))

In [461]:
class Schedule:
    # Class variable to track all occupied timeslots
    occupied_timeslots = set()

    def __init__(self, assignments, teacher_type, ga_type, max_hours=None, unavailable_days=None, location_to_index_map=None):
        self.assignments = assignments
        self.teacher_type = teacher_type
        self.ga_type = ga_type
        self.max_hours = max_hours
        self.unavailable_days = unavailable_days or []
        self.location_to_index_map = location_to_index_map  # Add this line

    def update_occupied_timeslots(self):
        for a in self.assignments:
            Schedule.occupied_timeslots.add((a['faculty'], a['timeslot']))

    def is_timeslot_available(self, faculty, timeslot):
        return (faculty, timeslot) not in Schedule.occupied_timeslots

    def __str__(self):
        schedule_details = "Schedule:\n"
        for assignment in self.assignments:
            schedule_details += f"Course: {assignment['course']}, Faculty: {assignment['faculty']}, Timeslot: {assignment['timeslot']}, Classroom: {assignment['classroom']}\n"
        return schedule_details

    def fitness(self):
        if self.ga_type == '1':
            return self.generic_fitness_evaluation()
        elif self.ga_type == '2':
            return self.tda_based_fitness_evaluation()
        else:
            print(f"Unexpected GA type encountered in fitness evaluation: {self.ga_type}")
            return 0  # Or handle as appropriate


    def generic_fitness_evaluation(self):
        # Initialize variables
        total_hours = 0
        faculty_course_count = {}
        for a in self.assignments:
            duration = self.calculate_duration(a['start_time'], a['end_time'])
            total_hours += duration
            faculty_course_count[a['faculty']] = faculty_course_count.get(a['faculty'], 0) + 1

        # Data type check for debugging
        if not isinstance(self.max_hours, (float, int)):
            print(f"max_hours is not a number: {self.max_hours}")
            return 0  # Or handle the error as appropriate

        # Check constraints
        if (self.teacher_type == 'FT' and total_hours > self.max_hours) or \
        (self.teacher_type == 'PT' and (total_hours > 12 or any(a['day'] in self.unavailable_days for a in self.assignments))) or \
        any(count > 4 for count in faculty_course_count.values()):
            return 0

        return len(set(a['timeslot'] for a in self.assignments))


    def tda_based_fitness_evaluation(self):
        point_cloud = self.to_point_cloud(self.location_to_index_map)
        persistence_diagrams = compute_persistence_diagrams(point_cloud)
        return compute_tda_fitness(persistence_diagrams)


    def to_point_cloud(self, location_to_index_map):
        point_cloud = []
        for assignment in self.assignments:
            time_float = self.convert_time_to_float(assignment['start_time'])
            classroom_index = self.convert_location_to_index(assignment['classroom'], location_to_index_map)
            point_cloud.append((time_float, classroom_index))
        return point_cloud


    @staticmethod
    def calculate_duration(start, end):
        def parse_time(time_str):
            time_str = time_str.strip()
            if ' PM' in time_str or ' AM' in time_str:
                time_str = time_str[:time_str.rfind(' ')]
            for fmt in ('%I:%M %p', '%H:%M'):
                try:
                    return datetime.strptime(time_str, fmt)
                except ValueError:
                    continue
            raise ValueError(f"Time data '{time_str}' does not match expected format")

        start_time, end_time = parse_time(start), parse_time(end)
        if end_time < start_time:
            end_time += timedelta(days=1)
        return (end_time - start_time).total_seconds() / 3600.0

    @staticmethod
    def convert_time_to_float(time_str):
        # Map each day to a number (e.g., Monday=0, Tuesday=1, etc.)
        day_to_num = {'Monday': 0, 'Tuesday': 1, 'Wednesday': 2, 'Thursday': 3, 'Friday': 4, 'Saturday': 5, 'Sunday': 6}

        # Split the time string into day and time of day (if day is provided)
        parts = time_str.strip().split(maxsplit=1)
        day_part = parts[0] if len(parts) == 2 else "Monday"  # Default to Monday if day is not provided
        time_of_day = parts[-1]

        day_num = day_to_num.get(day_part, 0)  # Default to 0 if day is not found

        # Convert time to total hours
        time_format = "%H:%M"
        time_delta = datetime.strptime(time_of_day, time_format) - datetime.strptime("00:00", time_format)
        total_hours = 24 * day_num + time_delta.total_seconds() / 3600

        return total_hours



    @staticmethod
    def convert_location_to_index(location_str, location_to_index_map):
        # Assuming location_to_index_map is a pre-defined dictionary mapping locations to unique integers
        return location_to_index_map.get(location_str, -1)  # Return -1 or some default value if location is not found


# TDA-related functions
def compute_persistence_diagrams(point_cloud):
    rips_complex = gd.RipsComplex(points=point_cloud, max_edge_length=42)
    simplex_tree = rips_complex.create_simplex_tree(max_dimension=2)
    persistence = simplex_tree.persistence()
    return simplex_tree.persistence_intervals_in_dimension(1)

def compute_tda_fitness(persistence_diagrams):
    return sum(persistence[1] - persistence[0] for persistence in persistence_diagrams if persistence[1] != float('inf'))


In [462]:
# Initialize population
def initialize_population(size, df, teacher_type, max_hours, unavailable_days, location_to_index_map):
    population = []
    for _ in range(size):
        assignments = df.sample(n=10).to_dict('records')
        population.append(Schedule(assignments, teacher_type, '1', max_hours, unavailable_days, location_to_index_map))
    return population




In [463]:
def select_parents(population):
    # Sort the population by fitness in descending order and assign ranks
    sorted_population = sorted(population, key=lambda x: x.fitness(), reverse=True)
    
    # Calculate selection probabilities based on ranks
    total_ranks = sum(range(1, len(population) + 1))  # Sum of ranks
    selection_probabilities = [rank / total_ranks for rank in range(len(population), 0, -1)]

    # Select parents
    parents = []
    for _ in range(len(population)):
        chosen_parent = random.choices(sorted_population, weights=selection_probabilities, k=1)[0]
        parents.append(chosen_parent)
        print(f"Parent selected with fitness: {chosen_parent.fitness()}")

    return parents


In [464]:
# crossover
def enhanced_crossover(parent1, parent2, teacher_type, max_hours, unavailable_days):
    half = len(parent1.assignments) // 2
    new_assignments = parent1.assignments[:half] + parent2.assignments[half:]
    child = Schedule(new_assignments, teacher_type, max_hours, unavailable_days)
    print(f"Crossover created child with fitness: {child.fitness()}")
    return child


In [465]:
# Mutation
def mutate(schedule):
    mutation_rate = 0.1
    if random.random() < mutation_rate:
        mutated_assignment = df_sampled.sample(1).to_dict('records')[0]
        schedule.assignments[random.randint(0, len(schedule.assignments) - 1)] = mutated_assignment
        print(f"Mutation occurred with new assignment: {mutated_assignment}")
    return schedule

In [466]:
# Elitism
def elitism(population, top_k=1):
    return sorted(population, key=lambda s: s.fitness(), reverse=True)[:top_k]

# Proposed GA

In [467]:
def logistic_map(r, x):
    """Logistic map function."""
    return r * x * (1 - x)

def chaotic_initial_population(size, df, teacher_type, max_hours, unavailable_days,location_to_index_map):
    """Generate initial population with chaotic mapping based on the logistic map."""
    r = 3.99  # Chaotic regime for logistic map
    x = 0.5  # Arbitrary start value in (0, 1)
    population = []

    for _ in range(size):
        # Use the logistic map to create a chaotic sequence
        x = logistic_map(r, x)
        sample_size = max(1, int(x * len(df)))  # Determine number of rows to sample
        sampled_df = df.sample(n=sample_size)  # Sample without replacement
        assignments = sampled_df.to_dict('records')
        
        # Create a Schedule instance with the sampled assignments
        # Assuming Schedule is a class that needs to be defined or already exists in your code
        schedule = Schedule(assignments, teacher_type, '2', max_hours, unavailable_days,location_to_index_map)  # Pass '2' for proposed GA
        population.append(schedule)

    return population

In [468]:
# Implement TDA analysis using Gudhi
def compute_persistence_diagrams(point_cloud):
    rips_complex = gd.RipsComplex(points=point_cloud, max_edge_length=42)
    simplex_tree = rips_complex.create_simplex_tree(max_dimension=2)
    persistence = simplex_tree.persistence()
    persistence_diagrams = simplex_tree.persistence_intervals_in_dimension(1)
    return persistence_diagrams

In [469]:
def compute_tda_fitness(persistence_diagrams):
    # Compute TDA-based fitness score
    tda_fitness = sum(persistence[1] - persistence[0] for persistence in persistence_diagrams if persistence[1] != float('inf'))
    return tda_fitness

In [470]:
def check_termination_condition(population, generation, max_generations, target_fitness, convergence_threshold):
    """
    Check if the termination condition for the genetic algorithm is met.

    :param population: The current population of schedules.
    :param generation: The current generation number.
    :param max_generations: The maximum number of generations to run.
    :param target_fitness: The target fitness score to achieve.
    :param convergence_threshold: The threshold for population convergence.

    :return: True if the termination condition is met, False otherwise.
    """

    # Check if the maximum number of generations has been reached
    if generation >= max_generations:
        return True

    # Check if any individual in the population has reached the target fitness
    if any(schedule.fitness() >= target_fitness for schedule in population):
        return True

    # Check for population convergence
    fitness_values = [schedule.fitness() for schedule in population]
    if max(fitness_values) - min(fitness_values) < convergence_threshold:
        return True

    # If none of the above conditions are met, continue the algorithm
    return False


In [471]:
def check_convergence_with_enhanced_criteria(population, fitness_threshold, diversity_threshold, improvement_threshold, num_generations_for_improvement_check):
    # Check if any individual exceeds the fitness threshold
    if any(individual.fitness() >= fitness_threshold for individual in population):
        return True

    # Calculate diversity (could be based on genetic diversity or variance in fitness)
    fitness_values = [individual.fitness() for individual in population]
    diversity = calculate_diversity(fitness_values)  # Implement this function

    # Check if diversity is too low
    if diversity < diversity_threshold:
        return True

    # Check improvement rate (implement a way to track fitness over generations)
    if len(fitness_values) > num_generations_for_improvement_check:
        recent_fitness_values = fitness_values[-num_generations_for_improvement_check:]
        improvement = max(recent_fitness_values) - min(recent_fitness_values)
        if improvement < improvement_threshold:
            return True

    return False

def calculate_diversity(fitness_values):
    # Implement a method to calculate diversity.
    # For example, using standard deviation as a measure of diversity
    return np.std(fitness_values)

In [472]:
def de_crossover_and_mutation(parent1, parent2, population, F=0.8, CR=0.5):
    # Select two random individuals from the population for differential mutation
    individual1, individual2 = random.sample(population, 2)

    # DE Trial Vector Creation
    trial_vector = {}
    for key in parent1.assignments:
        base_value = parent1.assignments[key]
        trial_vector[key] = base_value + F * (individual1.assignments[key] - individual2.assignments[key])

    # DE Crossover Operation
    child_assignments = {}
    for key in parent1.assignments:
        if random.random() < CR:
            child_assignments[key] = trial_vector[key]
        else:
            child_assignments[key] = parent1.assignments[key]

    # Create child with new assignments
    child = Schedule(child_assignments, parent1.teacher_type, parent1.ga_type, parent1.max_hours, parent1.unavailable_days)

    # Mutation (optional, depending on your design)
    child = mutate(child)  # Assuming 'mutate' is a predefined function

    return child


In [473]:
def dynamic_parameter_adjustment_with_tda(schedule, population, feature_persistence_threshold, location_to_index_map):
    # Convert the population to a point cloud for TDA
    point_cloud = []
    for individual in population:
        individual_cloud = individual.to_point_cloud(location_to_index_map)
        point_cloud.extend(individual_cloud)  # Flatten the structure by extending the main point cloud

    # Perform TDA on the point cloud
    persistence_diagrams = compute_persistence_diagrams(point_cloud)

    # Analyze the persistence diagrams
    analysis_result = analyze_persistence_diagrams(persistence_diagrams, feature_persistence_threshold)

    # Adjust parameters based on TDA analysis
    if analysis_result['indicates_convergence']:
        # Increase mutation rate to maintain diversity
        schedule.mutation_rate = min(schedule.mutation_rate + 0.05, 1.0)
    elif analysis_result['indicates_diversity']:
        # Decrease mutation rate
        schedule.mutation_rate = max(schedule.mutation_rate - 0.05, 0.01)

    return schedule



def analyze_persistence_diagrams(persistence_diagrams, feature_persistence_threshold):
    # Define thresholds for convergence and diversity
    CONVERGENCE_THRESHOLD = 10  # Example value indicating a high level of population complexity
    DIVERSITY_THRESHOLD = 5     # Example value indicating a low level of population complexity

    # Count significant features in persistence diagrams
    num_significant_features = len([d for d in persistence_diagrams if d[1] - d[0] > feature_persistence_threshold])

    # Determine population structure based on significant features
    analysis_result = {
        'indicates_convergence': num_significant_features > CONVERGENCE_THRESHOLD,
        'indicates_diversity': num_significant_features < DIVERSITY_THRESHOLD,
    }

    return analysis_result


In [474]:
def select_best_solution(population):
    # Select the individual with the highest fitness score in the population
    best_individual = max(population, key=lambda individual: individual.fitness())
    return best_individual


In [475]:
def ga_picked_passer(ga_type, population_size, max_generations, df, teacher_type, max_hours=None, unavailable_days=None, feature_persistence_threshold=0.1):
    if ga_type == '1':
        return generic_GA(population_size, max_generations, df, teacher_type, max_hours, unavailable_days)
    elif ga_type == '2':
        return proposed_GA(population_size, max_generations, df, teacher_type, max_hours, unavailable_days, feature_persistence_threshold)
    else:
        raise ValueError("Invalid GA type specified. Choose 'generic' or 'proposed'.")

In [476]:
def plot_fitness_scores(fitness_scores, title):
    plt.plot(fitness_scores)
    plt.xlabel('Generation')
    plt.ylabel('Best Fitness Score')
    plt.title(title)
    plt.show()

In [477]:
target_fitness = 95  # This means we are aiming for a solution with at least a fitness of 95
convergence_threshold = 0.5  # This means we consider the algorithm to have converged if the improvement is less than 0.5

def generic_GA(population_size, max_generations, df, teacher_type, max_hours=None, unavailable_days=None):
    location_to_index_map = {}
    # Define the mapping from classroom locations to indices
    location_to_index_map = {classroom: index for index, classroom in enumerate(df['classroom'].unique(), start=1)}

    # Initialize population
    population = initialize_population(population_size, df, teacher_type, max_hours, unavailable_days, location_to_index_map)

    fitness_scores = []  # List to store the best fitness score in each generation

    # Main GA loop
    for generation in range(max_generations):
        # Evaluate fitness
        for individual in population:
            individual.fitness()  # Calls the generic_fitness_evaluation method

        # Log the best fitness score in this generation
        generation_best_fitness = max(individual.fitness() for individual in population)
        fitness_scores.append(generation_best_fitness)

        # Check termination condition
        if check_termination_condition(population, generation, max_generations, target_fitness, convergence_threshold):
            print(f"Termination condition met at generation {generation}.")
            break

        # Select parents
        parents = select_parents(population)

        # Generate offspring through crossover, handling odd number of parents
        children = []
        for i in range(0, len(parents), 2):
            # Check if the next parent exists
            if i + 1 < len(parents):
                child = enhanced_crossover(parents[i], parents[i+1], teacher_type, max_hours, unavailable_days)
                children.append(child)
            else:
                # Handle the case where the number of parents is odd
                # Example: Pair the last parent with the first parent
                child = enhanced_crossover(parents[i], parents[0], teacher_type, max_hours, unavailable_days)
                children.append(child)
        # Mutate the offspring
        for child in children:
            mutate(child)

        # Apply elitism to form the next generation
        elite = elitism(population)
        population = children + elite

        # Logging for tracking
        print(f"Generation {generation}: Best fitness score: {generation_best_fitness}")

    # Find the best solution from the final population
    best_solution = max(population, key=lambda ind: ind.fitness())

    # Plot the evolution of fitness scores
    plt.plot(fitness_scores)
    plt.xlabel('Generation')
    plt.ylabel('Best Fitness Score')
    plt.title('Generic Genetic Algorithm Progress')
    plt.show()

    return best_solution


In [478]:
def proposed_GA(population_size, max_generations, df, teacher_type, max_hours=None, unavailable_days=None, feature_persistence_threshold=0.1):
    print("Initializing proposed GA...")
    
    # Log the initial parameters
    print(f"Parameters: Population size: {population_size}, Max generations: {max_generations}, Teacher type: {teacher_type}, Max hours: {max_hours}, Unavailable days: {unavailable_days}")

    # Define the mapping from classroom locations to indices
    location_to_index_map = {classroom: index for index, classroom in enumerate(df['classroom'].unique(), start=1)}
    print(f"Classroom location to index mapping: {location_to_index_map}")

    # Initialize population with chaotic mapping
    print("Initializing population with chaotic mapping...")
    population = chaotic_initial_population(population_size, df, teacher_type, max_hours, unavailable_days, location_to_index_map)
    print(f"Initial population size: {len(population)}")

    # Define thresholds
    print("Defining fitness, convergence, and diversity thresholds...")
    
    fitness_scores = []  # List to store the best fitness score in each generation

    # Main GA loop
    for generation in range(max_generations):
        print(f"\nStarting generation {generation}...")

        # Evaluate fitness with TDA guidance for each individual
        for individual in population:
            individual.fitness()  # Calls the TDA-based fitness evaluation

        # Log the best fitness score in this generation
        generation_best_fitness = max(individual.fitness() for individual in population)
        fitness_scores.append(generation_best_fitness)
        print(f"Generation {generation}: Best fitness score: {generation_best_fitness}")

        # Check termination condition
        if check_termination_condition(population, generation, max_generations, target_fitness, convergence_threshold):
            print(f"Termination condition met at generation {generation}.")
            break

        # Select parents
        print("Selecting parents...")
        parents = select_parents(population)
        print(f"Number of parents selected: {len(parents)}")

        # Apply DE-Inspired crossover and mutation or dynamic parameter adjustment based on convergence
        print("Applying crossover and mutation...")
        if not check_convergence_with_enhanced_criteria(population, fitness_threshold, diversity_threshold, improvement_threshold, num_generations_for_improvement_check):
            children = [de_crossover_and_mutation(parents[i], parents[i+1], population) for i in range(0, len(parents), 2)]
        else:
            children = [dynamic_parameter_adjustment_with_tda(child, population, feature_persistence_threshold, location_to_index_map) for child in children]
        print(f"Number of children generated: {len(children)}")

        # Combine elite individuals from the previous generation with offspring
        elite = elitism(population)
        print(f"Number of elite individuals: {len(elite)}")
        population = children + elite

    # Find the best solution from the final population
    best_solution = select_best_solution(population)
    print("\nBest solution found:")
    print(best_solution)

    # Plot the evolution of fitness scores
    plt.plot(fitness_scores)
    plt.xlabel('Generation')
    plt.ylabel('Best Fitness Score')
    plt.title('Proposed Genetic Algorithm Progress')
    plt.show()

    return best_solution


In [479]:
teacher_type = input("Enter teacher type (FT for Full-Time, PT for Part-Time): ").strip()
max_hours = None
unavailable_days = []

if teacher_type == 'FT':
    max_hours = float(input("Enter maximum hours you can teach per week: "))
elif teacher_type == 'PT':
    unavailable_days = input("Enter unavailable days (comma-separated, e.g., Monday,Wednesday): ").split(',')


In [480]:
def run_genetic_algorithm(ga_type, population_size, max_generations, df, teacher_type, max_hours=None, unavailable_days=None, feature_persistence_threshold=0.1):
    # Convert ga_type to string to ensure consistency in comparison
    ga_type = str(ga_type)
    
    if ga_type == '1':  # If generic GA is selected
        return generic_GA(population_size, max_generations, df, teacher_type, max_hours, unavailable_days)
    elif ga_type == '2':  # If proposed GA is selected
        return proposed_GA(population_size, max_generations, df, teacher_type, max_hours, unavailable_days, feature_persistence_threshold)
    else:
        raise ValueError(f"Encountered invalid GA type: {ga_type}")

# Define the parameters for the genetic algorithm
population_size = 1000  # Set the population size as per your requirement
max_generations = 100    # Set the maximum number of generations


# Run the chosen genetic algorithm to find the best schedule
best_schedule_found = run_genetic_algorithm(ga_type, population_size, max_generations, df_sampled, teacher_type, max_hours, unavailable_days)

# Check if a schedule was found and print details
if best_schedule_found:
    print("Best schedule fitness:", best_schedule_found.fitness())
    print("Best Schedule Details:\n", best_schedule_found)
else:
    print("No suitable schedule was found.")


Initializing proposed GA...
Parameters: Population size: 1000, Max generations: 100, Teacher type: FT, Max hours: 40.0, Unavailable days: []
Classroom location to index mapping: {'GV 302': 1, 'Com Lab 2': 2, 'Com Lab 4': 3, 'Com Lab 3': 4, 'GCA 301': 5, 'GV 304': 6, 'GCA 305': 7, 'Com Lab 1': 8, 'GV 305': 9, 'GCA 304': 10, 'GCA 303': 11, 'GV 301': 12, 'GV 307': 13, 'GV 303': 14, 'GCA 302': 15, 'GV 306': 16}
Initializing population with chaotic mapping...
