In [None]:
# general
import numpy as np
import json
import plotly.graph_objects as go
import plotly.express as px
from time import perf_counter

In [None]:
# experimental swarm modules
from swarm.ant_colony import AntColony
from swarm.ant_pheromones import BinsPheromones
from swarm.precedence import PrecedenceManager
from swarm.evolution_history import EvolutionHistory

from psplib.launcher import load_psplib_data, Launcher
from psplib.launcher import get_dict_of_optimized_data_structures

## Prepare data

In [None]:
# PSPLIB instance (JSON) used for the interactive experiment in the cells below.
file_name = "psplib/j120_json/j12016_6.json"
# loading data for the test
wg, contractors, work_estimator = load_psplib_data(file_name)
# helper structures derived from the work graph and contractors; consumed by Launcher below
ods = get_dict_of_optimized_data_structures(wg, contractors)

# number of nodes in the work graph; later used to size the pheromone table
# and the baseline priority vectors
n_tasks = len(wg.nodes)
# to get fitness metrics
launcher = Launcher(wg, contractors, work_estimator, ods)
# to convert to valid order
precedence_manager = PrecedenceManager.from_workgraph(wg)

In [None]:
# for now, define fitness here
def fitness_function(priorities_array):
    """Score one priority vector.

    The notebook-level ``precedence_manager`` turns the priorities into an
    activity list, and the notebook-level ``launcher`` evaluates that list.
    The launcher's result (the makespan) is returned unchanged.
    """
    return launcher(precedence_manager(priorities_array))

## Create colony

In [None]:
# Fixed seed so that repeated runs of the notebook give reproducible results.
colony_rng = np.random.default_rng(seed=0)

# Pheromone storage handed to the colony.
colony_pheromones = BinsPheromones(
    n_tasks=n_tasks,
    n_bins=5,
    start_amount=10,
    evaporation_rate=0.20,
    random=colony_rng,
)

# The colony itself; progress is recorded in a fresh EvolutionHistory.
colony = AntColony(
    fitness_function=fitness_function,
    pheromones=colony_pheromones,
    history=EvolutionHistory(),
)

## Start the evolution

In [None]:
# basic evolution
# can be interrupted, progress will be saved in history
# colony.simple_evolution(n_generations=150, n_per_generation=50)

In [None]:
# "expanding" evolution
# Five epochs; each one runs simple_evolution (50 generations, n_per_generation=25),
# then calls pheromones.stretch(2) and raises evaporation_rate by 0.05.
# NOTE: the loop mutates `colony` in place, so re-running this cell continues
# from the current state (including the already-raised evaporation_rate)
# instead of starting over; re-create the colony first for a clean run.
for epoch in range(5):
    colony.simple_evolution(n_generations=50, n_per_generation=25)
    # NOTE(review): presumably refines the bin resolution — confirm in BinsPheromones.stretch
    colony.pheromones.stretch(2)
    colony.pheromones.evaporation_rate += 0.05
    # progress marker, prints e.g. "epoch=0"
    print(f"{epoch=}")

## Plot

In [None]:
# Summary of the run, as reported by the colony's history object.
stats = colony.history.get_stats()

# A super-simple baseline: the better of the natural task order and its
# reverse, evaluated with the same fitness function (no optimisation).
trivial_orders = (
    np.arange(n_tasks),
    np.arange(n_tasks)[::-1],
    # ... what else?
)
baseline = min(fitness_function(order) for order in trivial_orders)

# Constant series with one value per generation, so the reference levels
# have the same length as the per-generation curves when plotted.
per_generation_ones = np.ones(stats["n_generations"])
baseline_line = baseline * per_generation_ones
best_result_line = stats["best_overall"] * per_generation_ones

In [None]:
# Evolution history: per-generation curves, record markers, and the two
# constant reference levels (baseline and best result found).
fig = go.Figure(data=[
    go.Scatter(name="Gen Best", y=stats["generation_best"], line_color="navy"),
    go.Scatter(name="Gen Mean", y=stats["generation_mean"], line_color="green"),
    go.Scatter(name="Record", y=stats["record_break"], mode="markers", line_color="navy"),

    go.Scatter(name="Baseline", y=baseline_line, line_color="lightgrey"),
    go.Scatter(name="Best Found", y=best_result_line, line_color="navy"),
])

# Size, theme, axis titles and legend placement in a single layout update.
fig.update_layout(
    height=500,
    width=1000,
    template="plotly_white",
    xaxis_title="Generation",
    yaxis_title="Metric",
    legend={
        "yanchor": "middle", "y": 0.40,
        "xanchor": "center", "x": 0.90,
    },
)

fig.show()
# fig.write_html("logs/ant_evolution.html")

In [None]:
# Heatmap of the colony's pheromone table (transposed) after the evolution.
pheromone_color_scale = ["white", "blue", "darkblue", "black"]
fig = px.imshow(
    img=colony.pheromones.bins_pheromones.T,
    template="plotly_dark",
    color_continuous_scale=pheromone_color_scale,
)
fig.show()

## Testing integration

In [None]:
from sampo.pipeline import SchedulingPipeline
from sampo.schemas.time_estimator import DefaultWorkEstimator
from sampo.scheduler.base import Scheduler
from sampo.scheduler.resource.identity import IdentityResourceOptimizer

In [None]:
class AntSheduler(Scheduler):
    """Experimental scheduler that searches for a task order with an ant colony.

    For every scheduling request a fresh ``AntColony`` (backed by
    ``BinsPheromones``) is built for the given work graph, evolved for a fixed
    number of generations, and the best priority vector found is converted
    into a schedule through ``Launcher.get_schedule``.

    The colony hyperparameters are constructor arguments; their defaults
    reproduce the values that were previously hard-coded.
    """

    # Display name, also passed to the base class as ``scheduler_type``.
    _DISPLAY_NAME = "AntScheduler[Experimental]"

    def __init__(
        self,
        resource_optimizer,
        work_estimator,
        n_generations=10,
        n_per_generation=25,
        n_bins=5,
        start_amount=10,
        evaporation_rate=0.20,
        seed=0,
    ):
        """Store the colony settings and initialise the base scheduler.

        Args:
            resource_optimizer: forwarded to ``Scheduler.__init__``.
            work_estimator: forwarded to ``Scheduler.__init__``; also used to
                build the ``Launcher`` in ``schedule_with_cache``.
            n_generations: ``n_generations`` passed to ``simple_evolution``.
            n_per_generation: ``n_per_generation`` passed to ``simple_evolution``.
            n_bins: ``n_bins`` passed to ``BinsPheromones``.
            start_amount: ``start_amount`` passed to ``BinsPheromones``.
            evaporation_rate: ``evaporation_rate`` passed to ``BinsPheromones``.
            seed: seed of the NumPy generator created for every scheduling
                call, so that repeated calls give reproducible results.
        """
        super().__init__(
            scheduler_type=self._DISPLAY_NAME,
            resource_optimizer=resource_optimizer,
            work_estimator=work_estimator
        )
        # Underscore-prefixed to stay clear of attributes set by the base class.
        self._n_generations = n_generations
        self._n_per_generation = n_per_generation
        self._n_bins = n_bins
        self._start_amount = start_amount
        self._evaporation_rate = evaporation_rate
        self._seed = seed

    def __str__(self):
        """Return the scheduler's display name."""
        return self._DISPLAY_NAME

    def schedule_with_cache(self, wg, contractors, *args, **kwargs):
        """Run a short ant-colony search on ``wg`` and return the best schedule.

        Args:
            wg: work graph to schedule; ``wg.nodes`` determines the number of tasks.
            contractors: contractors passed to the launcher.
            *args, **kwargs: accepted for interface compatibility and ignored.

        Returns:
            A single-element list holding the result of
            ``launcher.get_schedule`` for the best solution found.
            NOTE(review): confirm this shape matches what the base
            ``Scheduler`` contract expects from ``schedule_with_cache``.
        """
        ods = get_dict_of_optimized_data_structures(wg, contractors)

        n_tasks = len(wg.nodes)
        launcher = Launcher(wg, contractors, self.work_estimator, ods)
        precedence_manager = PrecedenceManager.from_workgraph(wg)

        def fitness_function(priorities_array):
            # priorities -> activity list -> launcher result (the makespan)
            activity_list = precedence_manager(priorities_array)
            return launcher(activity_list)

        colony = AntColony(
            fitness_function=fitness_function,
            pheromones=BinsPheromones(
                n_tasks=n_tasks,
                n_bins=self._n_bins,
                start_amount=self._start_amount,
                evaporation_rate=self._evaporation_rate,
                # fresh seeded generator per call -> reproducible results
                random=np.random.default_rng(seed=self._seed)
            ),
            history=EvolutionHistory()
        )
        colony.simple_evolution(
            n_generations=self._n_generations,
            n_per_generation=self._n_per_generation
        )

        # Convert the best priority vector back to an activity list and
        # ask the launcher for the corresponding schedule.
        best_solution = colony.history.best_solution
        best_activity_list = precedence_manager(best_solution)
        best_schedule = launcher.get_schedule(best_activity_list)
        return [best_schedule]


# Correctly spelled alias; the original (misspelled) class name is kept so
# existing references keep working.
AntScheduler = AntSheduler

In [None]:
# Load a second PSPLIB instance for the integration test.
# NOTE: this rebinds wg / contractors / work_estimator. The objects built in
# the data-preparation cell (ods, n_tasks, launcher, precedence_manager) are
# not rebuilt here and still refer to the first instance.
wg, contractors, work_estimator = load_psplib_data("psplib/j120_json/j1206_1.json")

In [None]:
# Scheduler instance under test, built with the work estimator loaded for the
# PSPLIB instance and the identity resource optimizer.
ant_sheduler = AntSheduler(
    work_estimator=work_estimator,
    resource_optimizer=IdentityResourceOptimizer()
)

In [None]:
# Run the scheduling pipeline with the ant scheduler on the loaded work graph
# and contractors; keep the first element of what finish() returns.
schedule = (
    SchedulingPipeline
    .create()
    .wg(wg)
    .contractors(contractors)
    .schedule(ant_sheduler)
    .finish()
)[0]

## Historical data

In [None]:
# df = pd.read_csv("data/electroline_full_connections.csv")
# work_estimator = DefaultWorkEstimator()
# ant_sheduler = AntSheduler(
#     work_estimator=work_estimator,
#     resource_optimizer=IdentityResourceOptimizer()
# )
# schedule = (
#     SchedulingPipeline
#     .create()
#     .wg(wg=df, all_connections=True, change_connections_info=False)
#     # contractors should be set by .schedule
#     .schedule(ant_sheduler)
#     .finish()
# )[0]