# Creating randomized schedule data in a CSV file in 2 variants

In [None]:
import random
import csv

# 1 variant
#  data is represented as example
# 1,1,1,1,1,0,0,1,0,3,2,1,1,1,0,1,1,1,1,0,0
# for 3 days and 3 employe
# first part (1,1,1) stands for emp0 preference for each day
# (3,2,1) stands for company requirments for each day
# (1,0,0) last part for 3 day assigned is employe  1

days = 3
employees = 3
sample = 100
max_tries = 1000
FileName="grafik_100_3x3.csv"

def generate_schedule_from_preferences(preferences, firm_requirements, num_days=days, max_tries=max_tries):
    num_emps = len(preferences)
    schedule = [[0 for _ in range(num_emps)] for _ in range(num_days)]
    for e in range(num_emps):
        available_days = [d for d in range(num_days) if preferences[e][d] == 1]
        if len(available_days) < firm_requirements[e]:
            return None
        chosen = random.sample(available_days, firm_requirements[e])
        for d in chosen:
            schedule[d][e] = 1
    return schedule

def is_schedule_valid(schedule, firm_requirements):
    if schedule is None:
        return False
    num_days = len(schedule)
    num_emps = len(schedule[0])
    col_sums = [sum(schedule[d][e] for d in range(num_days)) for e in range(num_emps)]
    return col_sums == firm_requirements

def generate_dataset_csv(filename=FileName, num_samples=sample, num_emps=employees, num_days=days, max_tries=max_tries):
    with open(filename, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        header = [f"emp{e}_pref{d}" for e in range(num_emps) for d in range(num_days)] + \
                 [f"req_worker{e}" for e in range(num_emps)] + \
                 [f"day{d}_emp{e}" for d in range(num_days) for e in range(num_emps)]
        writer.writerow(header)
        
        samples_generated = 0
        attempts = 0
        while samples_generated < num_samples and attempts < max_tries * num_samples:
            preferences = [[random.randint(0, 1) for _ in range(num_days)] for _ in range(num_emps)]
            firm_requirements = [random.randint(1, num_days) for _ in range(num_emps)]
            valid = True
            for e in range(num_emps):
                if sum(preferences[e]) < firm_requirements[e]:
                    valid = False
                    break
            if not valid:
                attempts += 1
                continue
            
            schedule = generate_schedule_from_preferences(preferences, firm_requirements, num_days, max_tries)
            if schedule is not None and is_schedule_valid(schedule, firm_requirements):
                row = []
                for e in range(num_emps):
                    row.extend(preferences[e])
                row.extend(firm_requirements)
                for d in range(num_days):
                    row.extend(schedule[d])
                writer.writerow(row)
                samples_generated += 1
            attempts += 1
        
        print(f"Generated {samples_generated} samples after {attempts} attempts.")
        print(f"File {filename} has been saved.")

generate_dataset_csv()

Generated 100 samples after 726 attempts.
File grafik_100_3x3.csv has been saved.


In [None]:
import random
import csv

# 2 variant
#  data is represented as example
# 1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,0,1,1,0,1
# for 3 days and 3 employe
# first part (1,1,1) stands for 1 day preferences of each employee
# (3,1,3) stands for company requirments for each day
# (1,0,1) last part for 3 day assigned is employe  1 and 3

days = 3
employees = 3
sample = 100
max_tries = 1000
FileName = "grafik_100_3x3.csv"

def generate_preferences_for_employees(num_emps=employees, num_days=days):
    return [[random.randint(0, 1) for _ in range(num_days)] for _ in range(num_emps)]

def generate_schedule_from_preferences(preferences, firm_requirements, num_days=days, max_tries=max_tries):
    num_emps = len(preferences)
    schedule = [[0 for _ in range(num_emps)] for _ in range(num_days)]
    
    # Dla każdego pracownika
    for e in range(num_emps):
        available_days = [d for d in range(num_days) if preferences[e][d] == 1]  # dni, w które pracownik chce pracować
        if len(available_days) < firm_requirements[e]:
            return None  # Jeśli nie ma wystarczającej liczby dni, zwróć None
        chosen = random.sample(available_days, firm_requirements[e])  # Wybierz dni, w które pracownik będzie pracować
        for d in chosen:
            schedule[d][e] = 1  # Pracownik zostaje przypisany do dnia
    
    return schedule

def is_schedule_valid(schedule, firm_requirements):
    if schedule is None:
        return False
    num_days = len(schedule)
    num_emps = len(schedule[0])
    col_sums = [sum(schedule[d][e] for d in range(num_days)) for e in range(num_emps)]  # Ilość dni pracy każdego pracownika
    return col_sums == firm_requirements  # Sprawdza, czy każdy pracownik ma przypisaną odpowiednią liczbę dni

def generate_dataset_csv(filename=FileName, num_samples=sample, num_emps=employees, num_days=days, max_tries=max_tries):
    with open(filename, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        
        # Nagłówki pliku CSV - dla każdego dnia preferencje każdego pracownika
        header = []
        for d in range(num_days):
            for e in range(num_emps):
                header.append(f"day{d}_pref_emp{e}")
        header.extend([f"req_worker{e}" for e in range(num_emps)])  # Dodaj wymagania dla każdego pracownika
        for d in range(num_days):
            for e in range(num_emps):
                header.append(f"day{d}_emp{e}")  # Dodaj grafiki dla każdego dnia
        writer.writerow(header)
        
        samples_generated = 0
        attempts = 0
        while samples_generated < num_samples and attempts < max_tries * num_samples:
            preferences = generate_preferences_for_employees(num_emps, num_days)  # Generowanie preferencji dla pracowników
            firm_requirements = [random.randint(1, num_days) for _ in range(num_emps)]  # Wymagania dla każdego pracownika (ile dni musi pracować)
            
            # Sprawdzenie, czy preferencje są wystarczające dla wymagań firmy
            valid = True
            for e in range(num_emps):
                if sum(preferences[e]) < firm_requirements[e]:
                    valid = False
                    break
            if not valid:
                attempts += 1
                continue
            
            # Generowanie grafiku
            schedule = generate_schedule_from_preferences(preferences, firm_requirements, num_days, max_tries)
            if schedule is not None and is_schedule_valid(schedule, firm_requirements):
                # Tworzenie wiersza danych do CSV
                row = []
                
                # Dodaj preferencje dla każdego dnia i pracownika
                for d in range(num_days):
                    for e in range(num_emps):
                        row.append(preferences[e][d])
                
                # Dodaj wymagania dla każdego pracownika
                row.extend(firm_requirements)
                
                # Dodaj grafik dla każdego dnia
                for d in range(num_days):
                    for e in range(num_emps):
                        row.append(schedule[d][e])
                
                # Zapisz wiersz w pliku
                writer.writerow(row)
                samples_generated += 1
            attempts += 1
        
        print(f"Generated {samples_generated} samples after {attempts} attempts.")
        print(f"File {filename} has been saved.")

# Uruchomienie generowania danych
generate_dataset_csv()

Generated 100 samples after 688 attempts.
File grafik_100_3x3.csv has been saved.


# Neural Network 

In [32]:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow.keras.backend as K
import tensorflow as tf

# Wczytanie danych z CSV
data = pd.read_csv("grafik_2000_3x3.csv")

# Kolumny wejściowe (preferencje i wymagania)
X_cols = [
    "emp0_pref0", "emp0_pref1", "emp0_pref2",
    "emp1_pref0", "emp1_pref1", "emp1_pref2",
    "emp2_pref0", "emp2_pref1", "emp2_pref2",
    "req_worker0", "req_worker1", "req_worker2"
]

Y_cols = [
    "day0_emp0", "day0_emp1", "day0_emp2",
    "day1_emp0", "day1_emp1", "day1_emp2",
    "day2_emp0", "day2_emp1", "day2_emp2"
]

# Przygotowanie danych
X = data[X_cols].values.astype(np.float32)  
Y = data[Y_cols].values.astype(np.float32)  

# Podział na dane wejściowe
X_pref = X[:, :9]   # preferencje
X_req = X[:, 9:]    # wymagania

# Definicja modelu
input_main = Input(shape=(9,), name='preferences')
input_req = Input(shape=(3,), name='requirements')

x = Concatenate()([input_main, input_req])
x = Dense(32, activation='relu')(x)
x = Dense(64, activation='relu')(x)
output = Dense(9, activation='sigmoid')(x)

def custom_loss_with_requirements(req_tensor):
    def loss(y_true, y_pred):
        # Upewnij się, że y_true i y_pred są typu float32
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        
        bce = K.mean(K.binary_crossentropy(y_true, y_pred), axis=-1)

        y_pred_reshaped = tf.reshape(y_pred, (-1, 3, 3))  # (batch, 3 dni, 3 pracowników)
        assigned = tf.reduce_sum(y_pred_reshaped, axis=2)  # shape: (batch, 3)

        penalty = tf.reduce_mean(tf.abs(assigned - req_tensor), axis=1)

        return bce + penalty * 0.5
    return loss


class CustomModel(Model):
    def train_step(self, data):
        (x, req_tensor), y_true = data
        with tf.GradientTape() as tape:
            y_pred = self([x, req_tensor], training=True)

            # Obliczanie straty
            y_pred_reshaped = tf.reshape(y_pred, (-1, 3, 3))
            assigned = tf.reduce_sum(y_pred_reshaped, axis=2)

            penalty = tf.reduce_mean(tf.abs(assigned - req_tensor), axis=1)

            # Standardowa binary crossentropy
            bce = tf.reduce_mean(tf.keras.losses.binary_crossentropy(y_true, y_pred), axis=-1)

            total_loss = tf.reduce_mean(bce + 0.5 * penalty)

        # Obliczanie gradientów
        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        # Aktualizacja metryk
        self.compiled_metrics.update_state(y_true, y_pred)

        # Zwrócenie metryk
        return {m.name: m.result() for m in self.metrics}

model = CustomModel(inputs=[input_main, input_req], outputs=output)

# Kompilacja modelu - dostarczanie minimalnej straty
model.compile(optimizer=Adam(), loss=tf.keras.losses.BinaryCrossentropy(), metrics=['accuracy'])

# Callback do wczesnego zatrzymania
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

# Trenowanie modelu
model.fit(
    [X_pref, X_req],  # wejścia
    Y,                # wyjścia
    validation_split=0.2,
    epochs=50,
    batch_size=32,
    callbacks=[early_stop]
)

# Przykładowa prognoza
test_pref = np.array([[0, 1, 1, 1, 1, 1, 0, 0, 1]]).astype(np.float32)  # Konwersja na float32
test_req = np.array([[2, 3, 1]]).astype(np.float32)  # Konwersja na float32

# Upewnij się, że przekazujesz dane wejściowe w postaci słownika
prediction = model.predict({'preferences': test_pref, 'requirements': test_req})

y_pred_binary = (prediction > 0.4).astype(int)

print("Binarna prognoza (0 = nie pracuje, 1 = pracuje):")
print(y_pred_binary[0].reshape(3, 3))

print(test_req)

# Prognoza ciągła
print("Predykcja ciągła (wartości zmiennoprzecinkowe):")
print(prediction[0].reshape(3, 3))


Epoch 1/50


```
for metric in self.metrics:
    metric.update_state(y, y_pred)
```

  return self._compiled_metrics_update_state(


[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 9ms/step - accuracy: 0.3943 - loss: 0.4706 - val_accuracy: 0.0650 - val_loss: 0.6403
Epoch 2/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.0890 - loss: 0.4712 - val_accuracy: 0.1825 - val_loss: 0.5715
Epoch 3/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.1889 - loss: 0.4920 - val_accuracy: 0.2175 - val_loss: 0.5060
Epoch 4/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.2052 - loss: 0.4913 - val_accuracy: 0.2750 - val_loss: 0.4717
Epoch 5/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.2492 - loss: 0.4955 - val_accuracy: 0.2600 - val_loss: 0.4437
Epoch 6/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step - accuracy: 0.2696 - loss: 0.4875 - val_accuracy: 0.2850 - val_loss: 0.4252
Epoch 7/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━

In [35]:
test_pref = np.array([[0, 1, 1, 1, 1, 1, 0, 0, 1]]).astype(np.float32)  # Konwersja na float32
test_req = np.array([[3, 3, 3]]).astype(np.float32)  # Konwersja na float32

# Upewnij się, że przekazujesz dane wejściowe w postaci słownika
y_pred_binary = (prediction > 0.4).astype(int)

print("Binarna prognoza (0 = nie pracuje, 1 = pracuje):")
print(y_pred_binary[0].reshape(3, 3))

print(test_req)

# Prognoza ciągła
print("Predykcja ciągła (wartości zmiennoprzecinkowe):")
print(prediction[0].reshape(3, 3))

Binarna prognoza (0 = nie pracuje, 1 = pracuje):
[[1 1 1]
 [1 1 1]
 [1 1 1]]
[[3. 3. 3.]]
Predykcja ciągła (wartości zmiennoprzecinkowe):
[[0.71747017 0.9807725  0.93656945]
 [0.9891666  0.97968745 0.7928449 ]
 [0.97058743 0.77228796 0.9962871 ]]


# not working yet

In [19]:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import accuracy_score

# Wczytanie danych z CSV
data = pd.read_csv("grafik_2000_3x3.csv")

# Kolumny wejściowe (preferencje i wymagania)
X_cols = [
    "emp0_pref0", "emp0_pref1", "emp0_pref2",
    "emp1_pref0", "emp1_pref1", "emp1_pref2",
    "emp2_pref0", "emp2_pref1", "emp2_pref2",
    "req_worker0", "req_worker1", "req_worker2"
]

Y_cols = [
    "day0_emp0", "day0_emp1", "day0_emp2",
    "day1_emp0", "day1_emp1", "day1_emp2",
    "day2_emp0", "day2_emp1", "day2_emp2"
]

# Przygotowanie danych
X = data[X_cols].values
Y = data[Y_cols].values

# Definicja modelu
model = Sequential()
model.add(Dense(32, activation='relu', input_shape=(12,)))
model.add(Dense(64, activation='relu'))
model.add(Dense(9, activation='sigmoid'))  # 9 wyjść: 3 dni × 3 pracowników

model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

early_stop = EarlyStopping(
    monitor='val_loss',     # możesz też użyć 'val_accuracy' itp.
    patience=5,             # ile epok poczeka zanim zatrzyma
    restore_best_weights=True # przywróć najlepszy model po zatrzymaniu
)
# Trening
model.fit(X, Y, epochs=50, batch_size=32, callbacks=[early_stop] ,validation_split=0.2)

# Przykładowa prognoza
test_input = np.array([[0,1,1,1,1,1,0,0,1,2,3,1]])  # 12 wartości
prediction = model.predict(test_input)
y_pred_binary = (prediction > 0.5).astype(int)

print("Binarna prognoza (0 = nie pracuje, 1 = pracuje):")
print(y_pred_binary[0].reshape(3, 3))

print("Przewidywany harmonogram (3 dni × 3 pracowników):")
print(test_input)
print(prediction[0].reshape(3, 3))
# wiersz oznacza dzien kolumna przewidywania danego pracownika


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 17ms/step - accuracy: 0.1778 - loss: 0.6884 - val_accuracy: 0.2700 - val_loss: 0.6302
Epoch 2/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 8ms/step - accuracy: 0.2907 - loss: 0.6121 - val_accuracy: 0.2100 - val_loss: 0.5414
Epoch 3/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.2392 - loss: 0.5240 - val_accuracy: 0.2550 - val_loss: 0.4492
Epoch 4/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 0.2068 - loss: 0.4401 - val_accuracy: 0.2525 - val_loss: 0.3879
Epoch 5/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - accuracy: 0.2079 - loss: 0.3873 - val_accuracy: 0.2075 - val_loss: 0.3536
Epoch 6/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 0.2037 - loss: 0.3502 - val_accuracy: 0.1725 - val_loss: 0.3259
Epoch 7/50
[1m50/50[0m [32m━━━━━━━━━

In [14]:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Wczytanie danych z CSV
data = pd.read_csv("grafik_2000_4x4.csv")

# Kolumny wejściowe (preferencje i wymagania)
X_cols = [
    "emp0_pref0", "emp0_pref1", "emp0_pref2", "emp0_pref3",
    "emp1_pref0", "emp1_pref1", "emp1_pref2", "emp1_pref3",
    "emp2_pref0", "emp2_pref1", "emp2_pref2", "emp2_pref3",
    "emp3_pref0", "emp3_pref1", "emp3_pref2", "emp3_pref3",
    "req_worker0", "req_worker1", "req_worker2", "req_worker3"
]

Y_cols = [
    "day0_emp0", "day0_emp1", "day0_emp2", "day0_emp3",
    "day1_emp0", "day1_emp1", "day1_emp2", "day1_emp3",
    "day2_emp0", "day2_emp1", "day2_emp2", "day2_emp3",
    "day3_emp0", "day3_emp1", "day3_emp2", "day3_emp3"
]

# Przygotowanie danych
X = data[X_cols].values
Y = data[Y_cols].values

# Definicja modelu
model = Sequential()
model.add(Dense(128, activation='relu', input_shape=(20,)))
model.add(Dense(256, activation='relu'))
model.add(Dense(128, activation='relu'))
model.add(Dense(16, activation='sigmoid'))

# Early stopping
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=['accuracy'])

# Trening modelu
model.fit(X, Y, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stop])

# Przykładowa prognoza
# test_input: 16 preferencji (4 pracowników × 4 preferencje) + 4 wymagania
test_input = np.array([[
    1, 0, 1, 1,   # emp0
    1, 1, 0, 0,   # emp1
    0, 1, 1, 1,   # emp2
    1, 0, 0, 1,   # emp3
    2, 3, 1, 2    # wymagania na 4 dni
]])

prediction = model.predict(test_input)

print("Przewidywany harmonogram (4 dni × 4 pracowników):")
print(test_input)
print(prediction[0].reshape(4, 4))  # 4 dni × 4 pracowników


Epoch 1/50


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 16ms/step - accuracy: 0.1268 - loss: 0.6504 - val_accuracy: 0.1900 - val_loss: 0.4997
Epoch 2/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.1981 - loss: 0.4504 - val_accuracy: 0.2300 - val_loss: 0.3642
Epoch 3/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.2003 - loss: 0.3511 - val_accuracy: 0.1225 - val_loss: 0.3353
Epoch 4/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.1684 - loss: 0.3162 - val_accuracy: 0.1675 - val_loss: 0.3148
Epoch 5/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.1711 - loss: 0.2999 - val_accuracy: 0.1850 - val_loss: 0.3076
Epoch 6/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.1927 - loss: 0.2889 - val_accuracy: 0.1575 - val_loss: 0.3124
Epoch 7/50
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━

# genetic algorithms

In [3]:
import random
import string
import time
from datetime import datetime
from pathlib import Path
import numpy as np

class AG:
    def __init__(self):
        self.chromosome_length = 9
        self.population_size = 100
        self.number_of_epochs = 200
        self.number_of_parents = 2
        self.number_of_candidates = 8
        self.mutation_probability = 0.1

    def algorithm(self):
        log_file = Path(f"log-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.txt")
        string_builder = []

        population = self.initialize_population(self.population_size)
        children = self.initialize_population(self.population_size)

        for epoch in range(self.number_of_epochs):
            fitness = self.get_fitness(population)
            population = [x for _, x in sorted(zip(fitness, population), key=lambda pair: pair[0])]
            fitness.sort()

            print(f"{epoch}  {min(fitness):.2f}  {np.mean(fitness):.2f}")
            string_builder.append(f"\n{epoch}  {min(fitness):.2f}  {np.mean(fitness):.2f}")

            result = self.write_result(population[0])
            if epoch == self.number_of_epochs - 1:
                print(result)
            string_builder.append("\n" + result + "\n")

            for p in range(int(0.1 * self.population_size), self.population_size):
                parents, parent_ids = self.select_parents(fitness, population)
                children[p] = self.generate_child_aex(parents)

                for k in range(self.number_of_parents):
                    string_builder.append(" ".join(str(x) for x in parents[k]) + f"   {fitness[parent_ids[k]]}")
                string_builder.append(" ".join(str(x) for x in children[p]) + f"   {self.get_fitness_single(children[p])}\n")

            log_file.write_text("\n".join(string_builder), encoding='utf-8')

            for p in range(int(0.1 * self.population_size), self.population_size):
                population[p] = children[p][:]

            for p in range(self.population_size):
                if random.random() < self.mutation_probability:
                    population[p] = self.mutation_swap(population[p])

    def mutation_swap(self, pop):
        x1 = random.randint(0, self.chromosome_length - 1)
        x2 = random.randint(0, self.chromosome_length - 1)
        pop[x1], pop[x2] = pop[x2], pop[x1]
        return pop

    def initialize_population(self, size):
        base = list(range(ord('A'), ord('A') + 26)) + [91, 92, 93, 94]
        return [random.sample(base, len(base)) for _ in range(size)]

    def generate_child_aex(self, parents):
        parent_length = len(parents[0])
        current_vertex = parents[0][0]
        child = [current_vertex]
        available = set(parents[0])
        available.remove(current_vertex)
        counter = 1
        parent_index = 0

        while counter < parent_length:
            next_vertex = -1
            selected = parents[parent_index]
            index = selected.index(current_vertex)
            if index < parent_length - 1 and selected[index + 1] not in child:
                next_vertex = selected[index + 1]
                parent_index = (parent_index + 1) % len(parents)

            if next_vertex == -1:
                next_vertex = random.choice(list(available))
            child.append(next_vertex)
            available.remove(next_vertex)
            current_vertex = next_vertex
            counter += 1

        return child

    def get_fitness(self, population):
        return [self.calculate_fitness(individual) for individual in population]

    def get_fitness_single(self, individual):
        return self.calculate_fitness(individual)

    def calculate_fitness(self, chromosome):
        fitness = 0
        for i, gene in enumerate(chromosome):
            if gene < ord('A'):
                continue
            fitness += self._frequency[gene - ord('A')] * self._weights[i]
        return fitness

    def select_parents(self, fitness, population):
        selected_ids = []
        for _ in range(self.number_of_parents):
            candidates = random.sample(range(len(population)), self.number_of_candidates)
            best = min(candidates, key=lambda i: fitness[i])
            selected_ids.append(best)
        return [population[i][:] for i in selected_ids], selected_ids

    def write_result(self, chromosome):
        result = []
        for i in range(3):
            row = []
            for j in range(10):
                val = chromosome[10 * i + j]
                if val == 91:
                    row.append(".")
                elif val == 92:
                    row.append(",")
                elif val == 93:
                    row.append(";")
                elif val == 94:
                    row.append("/")
                else:
                    row.append(chr(val))
            result.append(" ".join(row))
        return "\n".join(result)


if __name__ == "__main__":
    ag = AG()
    ag.algorithm()


0  225.84  272.95
1  223.12  267.32
2  222.40  261.88
3  198.08  262.82
4  198.08  263.29
5  198.08  258.96
6  198.08  257.52
7  198.08  245.70
8  198.08  239.17
9  198.08  235.95
10  198.08  239.81
11  195.10  218.23
12  191.89  211.16
13  191.89  213.15
14  191.89  237.59
15  191.89  232.88
16  188.58  217.79
17  187.68  205.35
18  186.88  226.44
19  186.88  241.14
20  186.88  236.91
21  186.88  239.69
22  182.84  218.72
23  182.84  210.04
24  182.84  223.81
25  182.84  228.76
26  182.84  235.37
27  182.84  230.26
28  181.14  219.61
29  181.14  201.98
30  181.14  194.75
31  180.21  218.45
32  180.21  224.19
33  180.21  221.47
34  180.21  231.32
35  180.21  232.55
36  180.21  224.21
37  178.19  202.26
38  178.19  199.23
39  177.75  205.72
40  177.75  210.04
41  176.85  220.87
42  176.85  217.38
43  176.85  221.86
44  176.85  197.97
45  176.85  191.82
46  175.05  184.08
47  175.05  199.27
48  175.05  204.93
49  175.05  198.44
50  175.05  197.55
51  175.05  177.11
52  175.05  177.57
53 

In [None]:
import csv
import random
import argparse
from datetime import datetime
from pathlib import Path
import numpy as np

class GeneticScheduler:
    def __init__(self, csv_path: str,
                 population_size: int = 100,
                 generations: int = 200,
                 crossover_rate: float = 0.8,
                 mutation_rate: float = 0.1,
                 tournament_size: int = 3):
        self.csv_path = csv_path
        self.population_size = population_size
        self.generations = generations
        self.crossover_rate = crossover_rate
        self.mutation_rate = mutation_rate
        self.tournament_size = tournament_size

        # Loaded from CSV
        self.preferences = []     # flat list: emp0_day0, emp0_day1, ..., empN_dayM
        self.requirements = []    # per day
        self.num_employees = 0
        self.num_days = 0
        self.chromosome_length = 0

        self._load_data()

    def _load_data(self):
        """
        Expects CSV with headers:
        emp0_pref0, emp0_pref1, ..., empN_prefM,
        req_worker0, ..., req_workerM,
        day0_emp0, ..., dayM_empN
        """
        with open(self.csv_path, newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            data = next(reader)

        # Determine dimensions from keys
        pref_keys = [k for k in data.keys() if k.startswith('emp') and '_pref' in k]
        req_keys = [k for k in data.keys() if k.startswith('req_worker')]
        # sort keys to ensure order
        pref_keys.sort(key=lambda x: (int(x.split('_pref')[0][3:]), int(x.split('_pref')[1])))
        req_keys.sort(key=lambda x: int(x.replace('req_worker', '')))

        # flatten preferences
        self.preferences = [int(data[k]) for k in pref_keys]
        self.requirements = [int(data[k]) for k in req_keys]

        # infer dimensions
        self.num_days = len(self.requirements)
        self.num_employees = len(self.preferences) // self.num_days
        self.chromosome_length = self.num_days * self.num_employees

    def _initialize_population(self):
        # each gene: 0 (off) or 1 (on)
        return [ [random.randint(0,1) for _ in range(self.chromosome_length)]
                 for _ in range(self.population_size) ]

    def _fitness(self, chromosome: list) -> float:
        score = 0
        # Preference score: match preferred assignment
        for emp in range(self.num_employees):
            for day in range(self.num_days):
                gene = chromosome[day*self.num_employees + emp]
                pref = self.preferences[emp*self.num_days + day]
                # if matches preference (1=works,0=off)
                if gene == pref:
                    score += 1
        # Requirement penalty
        for day in range(self.num_days):
            assigned = sum(chromosome[day*self.num_employees + emp]
                           for emp in range(self.num_employees))
            required = self.requirements[day]
            # penalty for shortage or surplus
            if assigned < required:
                score -= 10 * (required - assigned)
            elif assigned > required:
                score -= 5 * (assigned - required)
        return score

    def _select_parent(self, population, fitnesses):
        # tournament selection
        candidates = random.sample(list(enumerate(population)), self.tournament_size)
        # select best fitness
        best = max(candidates, key=lambda x: fitnesses[x[0]])
        return best[1]

    def _crossover(self, parent1: list, parent2: list) -> list:
        if random.random() > self.crossover_rate:
            return parent1.copy()
        point = random.randint(1, self.chromosome_length - 1)
        child = parent1[:point] + parent2[point:]
        return child

    def _mutate(self, chromosome: list):
        for i in range(self.chromosome_length):
            if random.random() < self.mutation_rate:
                chromosome[i] = 1 - chromosome[i]
        return chromosome

    def run(self):
        population = self._initialize_population()
        best_solution = None
        best_score = float('-inf')

        for gen in range(self.generations):
            fitnesses = [self._fitness(ch) for ch in population]
            # track best
            idx = int(np.argmax(fitnesses))
            if fitnesses[idx] > best_score:
                best_score = fitnesses[idx]
                best_solution = population[idx].copy()

            # report
            avg_fit = sum(fitnesses) / len(fitnesses)
            print(f"Gen {gen:3d}: best={best_score:.2f}, avg={avg_fit:.2f}")

            # new population
            new_pop = []
            while len(new_pop) < self.population_size:
                p1 = self._select_parent(population, fitnesses)
                p2 = self._select_parent(population, fitnesses)
                child = self._crossover(p1, p2)
                child = self._mutate(child)
                new_pop.append(child)
            population = new_pop

        print("\nBest schedule fitness:", best_score)
        print(self.format_solution(best_solution))
        return best_solution, best_score

    def format_solution(self, chromosome: list) -> str:
        # Pretty-print grid of days × employees
        lines = []
        for day in range(self.num_days):
            row = chromosome[day*self.num_employees:(day+1)*self.num_employees]
            lines.append(' '.join(str(x) for x in row))
        return '\n'.join(lines)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Genetic Scheduler for work plan optimization")
    parser.add_argument('csvfile', help="Input CSV file with preferences and requirements")
    parser.add_argument('-p', '--population', type=int, default=100)
    parser.add_argument('-g', '--generations', type=int, default=200)
    parser.add_argument('-c', '--crossover', type=float, default=0.8)
    parser.add_argument('-m', '--mutation', type=float, default=0.1)
    parser.add_argument('-t', '--tournament', type=int, default=3)
    args = parser.parse_args()

    scheduler = GeneticScheduler(
        csv_path=args.csvfile,
        population_size=args.population,
        generations=args.generations,
        crossover_rate=args.crossover,
        mutation_rate=args.mutation,
        tournament_size=args.tournament
    )
    scheduler.run()
