In [1]:


import pandas as pd

from base.training import mse_weighted
from common.model_utils import load_model_tflite

# Reference the imported name so it is not flagged/removed as unused. The import itself is presumably
# needed for its side effects (the cell prints "Using mse_weighted") — TODO confirm.
assert mse_weighted


Using mse_weighted


In [2]:
from typing import List


def preprocess_df(df: pd.DataFrame, features: List[str], periodicity: List[str]) -> pd.DataFrame:
    """Return ``df`` restricted to, and ordered by, ``features``.

    Columns listed in ``features`` but missing from ``df`` are added and filled with NaN; every other
    column is dropped. ``periodicity`` is accepted for interface compatibility but is not used.
    """
    return df.reindex(copy=False, columns=features)


import os

# Metadata (input/output feature lists) of the base model; every dataset is reduced to its input features.
BASE_MODEL_METADATA = load_model_tflite("models/zamg_vienna_2019_2019_simple_dense").metadata

# Data replayed by the simulated sensor.
sim_data: pd.DataFrame = pd.read_pickle("data/simulation_data.pickle")
sim_data = preprocess_df(sim_data.copy(), BASE_MODEL_METADATA.input_features, [])
# sort_index() returns a new frame; it must be assigned back or the sort is silently discarded.
sim_data = sim_data.sort_index()
os.makedirs("simulations_data", exist_ok=True)
sim_data.to_csv("simulations_data/sim_data.csv")

sim_data

Unnamed: 0_level_0,TL,P,RF,SO
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-01-01 00:00:00,5.0,1012.4,60.0,0.0
2020-01-01 00:10:00,5.0,1012.5,60.0,0.0
2020-01-01 00:20:00,4.9,1012.5,61.0,0.0
2020-01-01 00:30:00,4.9,1012.5,61.0,0.0
2020-01-01 00:40:00,4.9,1012.6,60.0,0.0
...,...,...,...,...
2021-12-31 23:10:00,15.3,999.2,59.0,0.0
2021-12-31 23:20:00,15.2,999.3,59.0,0.0
2021-12-31 23:30:00,15.2,999.3,59.0,0.0
2021-12-31 23:40:00,15.0,999.7,59.0,0.0


In [3]:
from matplotlib.patches import Patch
import os
from tracemalloc import Snapshot

import matplotlib.dates as mdates
import pandas as pd
from matplotlib import axes, pyplot as plt
from matplotlib.ticker import MultipleLocator

from sensor.sensor_knowledge import SensorKnowledge


def _scatter_on_predictions(ax, predictions: pd.DataFrame, event_index, **scatter_kwargs):
    """Scatter one marker per event at the prediction row nearest in time to the event timestamp."""
    if len(predictions) == 0:
        # get_indexer returns -1 against an empty index and iloc would raise; plot an empty
        # series instead so the legend entry is still registered.
        points = predictions
    else:
        # Positions come from the *predictions* index, so they line up with predictions.iloc.
        points = predictions.iloc[predictions.index.get_indexer(event_index, method='nearest')]
    ax.scatter(points.index, points['TL'], s=12, zorder=8, **scatter_kwargs)


def _shade_configurations(ax, measurements: pd.DataFrame, configuration_updates: pd.DataFrame,
                          last_config, configuration_colors: dict):
    """Shade the time axis by active configuration; return (legend patches, legend labels), one per config."""
    initial_config = last_config
    if initial_config is None and len(configuration_updates) > 0:
        initial_config = configuration_updates.iloc[0]
    if initial_config is None or len(measurements) == 0:
        # No configuration is known to be active, or there is no time range to shade.
        return [], []

    start, end = measurements.index.min(), measurements.index.max()
    configs = configuration_updates.copy()
    # Configuration in effect at the window start; an update at exactly `start` takes precedence.
    if start not in configs.index:
        configs.loc[start] = initial_config
    # Close the last interval at the window end.
    configs.loc[end] = configuration_updates.iloc[-1] if len(configuration_updates) > 0 else initial_config
    configs = configs.sort_index()

    patches, labels, seen = [], [], set()
    for (timestamp, configuration), (next_timestamp, _) in zip(configs.iterrows(), configs.iloc[1:].iterrows()):
        config_id = configuration['configuration_id']
        color = configuration_colors[config_id]
        if config_id not in seen:
            seen.add(config_id)
            patches.append(Patch(edgecolor='black', facecolor=color, label=config_id))
            labels.append(f"Active Model: {config_id}")
        # Leading underscore keeps the per-interval spans out of the automatic legend.
        ax.axvspan(timestamp, next_timestamp, color=color, alpha=0.15, label=f"_{config_id}")
    return patches, labels


def plot_simulation(path: str,
                    sim_id: str,
                    predictions: pd.DataFrame,
                    measurements: pd.DataFrame,
                    violations: pd.DataFrame,
                    horizon_updates: pd.DataFrame,
                    configuration_updates: pd.DataFrame,
                    last_config: pd.Series,
                    threshold: float,
                    configuration_colors: dict
                    ):
    """Render one simulation window and save it as ``<path>/<sim_id>.png``.

    The figure shows measured and predicted ``TL`` values, marks violations (red ``x``) and
    horizon updates (green dots) on the prediction curve, shades a ±``threshold`` band around
    the measurements, and colours the background by the active configuration.

    Args:
        path: Existing directory the PNG is written to.
        sim_id: File-name stem of the PNG.
        predictions: Time-indexed predictions with a ``TL`` column.
        measurements: Time-indexed measurements with a ``TL`` column.
        violations: Time-indexed violation events (only the index is used).
        horizon_updates: Time-indexed horizon-update events (only the index is used).
        configuration_updates: Configuration changes within the window, time-indexed, with a
            ``configuration_id`` column.
        last_config: Configuration active before the window, or ``None`` to fall back to the
            first row of ``configuration_updates``. If neither exists, no shading is drawn.
        threshold: Half-width of the shaded tolerance band (°C).
        configuration_colors: Mapping from configuration id to a matplotlib colour.
    """
    fig, ax = plt.subplots(figsize=(200, 10))
    ax: axes.Axes

    # Measurements, predictions
    ax.plot(predictions.index, predictions['TL'], label='Predictions', zorder=6, color='darkorange')
    ax.plot(measurements.index, measurements['TL'], label='Measurements', zorder=5, color='black')

    # Violations and horizon updates, both drawn on the prediction curve
    _scatter_on_predictions(ax, predictions, violations.index, label='Violation', color='red', marker='x')
    _scatter_on_predictions(ax, predictions, horizon_updates.index, label='Horizon Update',
                            color='limegreen', marker='8')

    # Threshold band around the measurements
    upper = measurements['TL'] + threshold
    lower = measurements['TL'] - threshold
    ax.plot(measurements.index, upper, label='_Measurements (upper threshold)', color='gray', linestyle='dashed')
    ax.plot(measurements.index, lower, label='_Measurements (lower threshold)', color='gray', linestyle='dashed')
    ax.fill_between(measurements.index, lower, upper, color='gray', alpha=0.2,
                    label=f'Threshold (±{threshold}°C)')

    # Configuration updates
    patches, new_labels = _shade_configurations(ax, measurements, configuration_updates, last_config,
                                                configuration_colors)

    ax.xaxis.set_major_locator(mdates.DayLocator(interval=1))
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax.yaxis.set_major_locator(MultipleLocator(5))
    ax.tick_params(axis='x', labelrotation=90)
    ax.margins(x=0.01, y=0.01)

    ax.grid(True)
    ax.set_title('Simulation')
    ax.set_ylabel('Temperature (°C)')
    handles, labels = ax.get_legend_handles_labels()
    ax.legend(handles=handles + patches, labels=labels + new_labels, loc='best')
    file_path = os.path.join(path, f"{sim_id}.png")
    fig.savefig(file_path, format='png', bbox_inches='tight')
    # Close this figure explicitly: windows are plotted in a loop and open figures accumulate.
    plt.close(fig)


def get_model_colors(configs):
    """Assign a palette colour to each distinct ``configuration_id`` in ``configs``.

    Ids receive colours in order of first appearance. After the ten palette entries are used up,
    assignment wraps around to the start of the palette.
    """
    palette = [
        '#FF9999', '#66B2FF', '#FFD700', '#4C0099', '#FFA500', '#82B366', '#FF69B4', '#67AB9F', '#994C00', '#666600'
    ]
    assigned = {}
    for _, row in configs.iterrows():
        cid = row['configuration_id']
        if cid in assigned:
            continue
        # Number of ids seen so far, wrapped to the palette size, is the next colour slot.
        assigned[cid] = palette[len(assigned) % len(palette)]
    return assigned


def _json_default(value):
    """``json.dump`` fallback that unwraps numpy scalars (e.g. ``numpy.int64`` column sums)."""
    item = getattr(value, "item", None)
    if callable(item):
        return item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def _plot_in_windows(sim_dir: str, measurements: pd.DataFrame, predictions: pd.DataFrame,
                     violations: pd.DataFrame, horizon_updates: pd.DataFrame,
                     configuration_updates: pd.DataFrame, threshold: float) -> None:
    """Plot the run in consecutive 3-month windows, one PNG per window named ``YYYY_MM_DD.png``."""
    # Colours are assigned in the original (first-appearance) order of the updates.
    model_colors = get_model_colors(configuration_updates)
    # Label slicing with .loc[start:end] requires a sorted index.
    updates = configuration_updates.sort_index()
    # Configuration assumed active at the start of the first window: the earliest update, if any.
    last_config = updates.iloc[0] if len(updates) > 0 else None
    window_size = pd.DateOffset(months=3)

    start_date: pd.Timestamp = measurements.index.min()
    max_date = measurements.index.max()
    # With no measurements both bounds are NaT and the comparison is False, so nothing is plotted.
    while start_date <= max_date:
        end_date = start_date + window_size
        window_configs = updates.loc[start_date:end_date]
        sim_id = f"{start_date.year}_{start_date.month:02d}_{start_date.day:02d}"

        plot_simulation(
            sim_dir, sim_id,
            predictions[start_date:end_date],
            measurements[start_date:end_date],
            violations[start_date:end_date],
            horizon_updates[start_date:end_date],
            window_configs,
            last_config,
            threshold,
            model_colors
        )
        if len(window_configs) > 0:
            last_config = window_configs.iloc[-1]
        start_date += window_size


def save_results(
        bs_models_dir: str,
        sim_dir: str,
        knowledge: SensorKnowledge,
        profiler_data: pd.DataFrame,
        sim_data: pd.DataFrame,
        threshold: float,
        memory_snapshot: Snapshot,
        cpu_time: float
) -> None:
    """Persist everything produced by one simulation run into ``sim_dir``.

    Writes:
    - a copy of ``bs_models_dir``;
    - CSVs of violations, measurements, predictions, configuration updates, horizon updates,
      analysis and model deployments;
    - raw, summed and averaged profiler data, plus the tracemalloc snapshot statistics;
    - ``results.json`` with headline counters;
    - one PNG per 3-month window.

    Args:
        bs_models_dir: Base-station model directory to archive alongside the results.
        sim_dir: Output directory (created if missing).
        knowledge: Sensor knowledge returned by the run; its ``predictor.data`` supplies the logs.
        profiler_data: Base-station profiler records with ``func``, ``timestamp``, ``details``,
            ``received_data_B`` and ``transmitted_data_B`` columns.
        sim_data: Simulated sensor data; measurements before its first timestamp are not saved.
        threshold: Tolerance band (°C) shown in the plots.
        memory_snapshot: tracemalloc snapshot taken at the end of the run.
        cpu_time: CPU seconds spent in the sensor run.
    """
    # Imported here so the function does not depend on later notebook cells having been run.
    import json
    import shutil

    os.makedirs(sim_dir, exist_ok=True)

    def write_csv(frame: pd.DataFrame, name: str) -> None:
        frame.to_csv(os.path.join(sim_dir, name))

    # Archive the base-station models. Using the basename means an absolute bs_models_dir is not copied into itself.
    models_name = os.path.basename(os.path.normpath(bs_models_dir))
    shutil.copytree(bs_models_dir, os.path.join(sim_dir, models_name), dirs_exist_ok=True)

    data = knowledge.predictor.data

    violations = data.get_violations()
    write_csv(violations, "violations.csv")

    measurements = data.get_measurements()
    # Drop measurements preceding the simulated period (presumably the initial dataset handed to SensorManager).
    measurements = measurements[measurements.index >= sim_data.index.min()]
    write_csv(measurements, "measurements.csv")

    predictions = data.get_predictions()
    write_csv(predictions, "predictions.csv")

    configuration_updates = data.get_configuration_updates()
    configuration_updates.index = pd.to_datetime(configuration_updates.index)
    write_csv(configuration_updates, "config_updates.csv")

    horizon_updates = data.get_horizon_updates()
    write_csv(horizon_updates, "horizon_updates.csv")

    analysis = data.get_analysis()
    write_csv(analysis, "analysis.csv")

    # Per-function call counts; base-station interactions are looked up by their full profiler name.
    calls_per_func = profiler_data['func'].value_counts()

    def calls(func_name: str) -> int:
        return int(calls_per_func.get(func_name, 0))

    write_csv(profiler_data, "profiling.csv")
    stats = profiler_data.drop(columns=["timestamp", "details"])
    model_deployments = data.get_model_deployments()
    write_csv(model_deployments, "model_deployments.csv")
    stats_by_func = stats.groupby("func")
    stats_sum = stats_by_func.sum()
    stats_sum['calls'] = stats_by_func.size()
    write_csv(stats_sum, "profiling_sum.csv")
    write_csv(stats_by_func.mean(), "profiling_avg.csv")

    memory_data = [
        {"traceback": stat.traceback, "size_megabytes": stat.size / (1024 * 1024), "count": stat.count}
        for stat in memory_snapshot.statistics("filename")
    ]
    write_csv(pd.DataFrame(memory_data), "memory_snapshot.csv")
    # NOTE: this is the total size of traced blocks still allocated when the snapshot was taken,
    # not a true peak. The "peak_memory_allocated_MB" key is kept for compatibility with existing results.
    peak_memory = sum(row["size_megabytes"] for row in memory_data)
    total_allocations = sum(row["count"] for row in memory_data)

    aggregate = stats.drop(columns='func').sum()

    results = {
        "measurements": len(measurements),
        "violations": len(violations),
        "violations_sent_to_BS": calls('base_station_blueprint.post_violation'),
        "analysis": len(analysis),
        "configuration_updates": len(configuration_updates),
        "horizon_updates": len(horizon_updates),
        "horizon_updates_sent_to_BS": calls('base_station_blueprint.post_update'),
        # The profiler records synchronisation as 'base_station_blueprint.synchronize'; a bare
        # 'sync' never occurs, so comparing against it always yielded 0.
        "sync_requests_to_BS": calls('base_station_blueprint.synchronize'),
        "model_deployments": len(model_deployments),
        "total_cpu_time_s": round(cpu_time, 3),
        "total_memory_allocations": total_allocations,
        "peak_memory_allocated_MB": round(peak_memory, 3),
        "total_data_received_B": aggregate["received_data_B"],
        "total_data_sent_B": aggregate["transmitted_data_B"],
        "total_data_exchanged_B": aggregate["received_data_B"] + aggregate["transmitted_data_B"]
    }

    results_file_path = os.path.join(sim_dir, "results.json")
    with open(results_file_path, "w") as fp:
        # Column sums are numpy scalars; numpy.int64 is not natively JSON-serialisable.
        json.dump(results, fp, default=_json_default)

    _plot_in_windows(sim_dir, measurements, predictions, violations, horizon_updates,
                     configuration_updates, threshold)


In [4]:

from common.predictor_adaptation_goal import ViolationRateAdaptationGoal
from base.base_station_executor import SequentialBaseStationExecutor
from base.base_station_analyzer import PredictorGoalsBaseStationAnalyzer
from base.base_station_monitor import ViolationsBaseStationMonitor
from sensor.sensor_executor import SequentialSensorExecutor
from sensor.sensor_planner import PortfolioSensorPlanner
from sensor.sensor_analyzer import PortfolioSensorAnalyzer
from sensor.sensor_monitor import MultivariateSensorMonitor
from sensor.multivariate_sensor_mock import SimulationSensor
from simulation_parameters import portfolios, violation_rates, strategies
from typing import Tuple
from common.data_reduction_strategy import DownsamplingStrategy
import json
import datetime

from sensor.sensor_manager import SensorManager
from sensor.sensor_knowledge import SensorKnowledge

# Tolerance (°C) used both for the sensor's violation goal and for the plotted threshold band.
THRESHOLD = 1.0

# One scenario per (portfolio, violation-rate sensitivity, learning strategy) combination.
sim_scenarios = [
    {
        "simulation_id": f"{pid}_{s}_{v}v{h}h",
        "portfolio": p["models"],
        "base_model_id": p["models"][0],
        "initial_df": p["initial_df"],
        "training_df": p["training_df"],
        "sensitivity_v": v,
        "sensitivity_h": h,
        "sn_goals": [
            ViolationRateAdaptationGoal(
                goal_id="violation_rate",
                metric_id="TL",
                threshold=THRESHOLD,
                time_window=datetime.timedelta(hours=h),
                max_violations=v
            )
        ],
        # Strategy factory, later called as factory(model_trainer, model_manager).
        "learning_strategy": g[0],
        # Strategy name (key in `strategies`). Kept separately because the factory above can
        # never be compared with a name such as "static".
        "learning_strategy_id": s,
        "bs_goals": [g[1]],
        "legacy_mode": p["legacy_mode"],
        "data_reduction_strategy": DownsamplingStrategy()
    }
    for (pid, p) in portfolios.items() for (v, h) in violation_rates for (s, g) in strategies.items()
]


def already_simulated(simulation_id: str) -> bool:
    """Tell whether ``simulations/<simulation_id>/results.json`` exists, printing a skip notice if so."""
    results_path = os.path.join("simulations", simulation_id, "results.json")
    if not os.path.isfile(results_path):
        return False
    print(f"Scenario {simulation_id} already simulated. Skipping")
    return True


def _uses_static_strategy(simulation: dict) -> bool:
    """Return whether the scenario's learning strategy is the one named "static".

    ``simulation['learning_strategy']`` normally holds the strategy factory, which never equals a
    name string. Prefer an explicit ``learning_strategy_id`` entry. Otherwise, recover the name from
    the ``"{portfolio}_{strategy}_{v}v{h}h"`` simulation id.
    """
    if "learning_strategy_id" in simulation:
        return simulation["learning_strategy_id"] == "static"
    strategy = simulation.get("learning_strategy")
    if isinstance(strategy, str):
        return strategy == "static"
    suffix = f"_static_{simulation['sensitivity_v']}v{simulation['sensitivity_h']}h"
    return simulation["simulation_id"].endswith(suffix)


def valid(simulation: dict) -> bool:
    """Return whether a scenario's parameter combination should be simulated.

    Prints the reason and returns False for:
    - ``sensitivity_v == 1`` with ``sensitivity_h != 1``;
    - a single-model portfolio with ``sensitivity_v > 1``;
    - a multi-model portfolio with any learning strategy other than "static".
    """
    v = simulation['sensitivity_v']
    h = simulation['sensitivity_h']
    p = simulation['portfolio']

    if v == 1 and h != 1:
        print(f"Scenario {simulation['simulation_id']} has invalid 'sensitivity' configuration. Skipping")
        return False

    if len(p) == 1 and v > 1:
        print(
            f"Scenario {simulation['simulation_id']} has single-model portfolio and invalid 'sensitivity' configuration. Skipping")
        return False

    if len(p) > 1 and not _uses_static_strategy(simulation):
        print(
            f"Scenario {simulation['simulation_id']} has multi-model portfolio and 'retrain_*' strategy. Skipping")
        return False

    return True


# Keep only valid scenarios without existing results (already_simulated is consulted only for valid ones).
sim_scenarios = list(filter(lambda sim: valid(sim) and not already_simulated(sim["simulation_id"]), sim_scenarios))

# Index the remaining scenarios by id for the driver loop.
simulations = dict((sim["simulation_id"], sim) for sim in sim_scenarios)

simulations_size = len(simulations)
print(f"Generated {simulations_size} simulation scenarios")

Scenario vienna_2019_2019_simple_dense_static_1v1h already simulated. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_short_1v1h already simulated. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_long_1v1h already simulated. Skipping
Scenario vienna_2019_2019_simple_dense_static_2v1h has single-model portfolio and invalid 'sensitivity' configuration. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_short_2v1h has single-model portfolio and invalid 'sensitivity' configuration. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_long_2v1h has single-model portfolio and invalid 'sensitivity' configuration. Skipping
Scenario vienna_2019_2019_simple_dense_static_2v2h has single-model portfolio and invalid 'sensitivity' configuration. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_short_2v2h has single-model portfolio and invalid 'sensitivity' configuration. Skipping
Scenario vienna_2019_2019_simple_dense_retrain_long_2v2h has single-model portfolio and in

In [5]:
list(simulations)

['linz_2010_2019_simple_lstm_retrain_long_1v1h']

In [6]:

from base.base_station_planner import PortfolioBaseStationPlanner
from keras.src.utils.io_utils import disable_interactive_logging
from sensor.base_station_gateway import HttpBaseStationGateway
from base.model_trainer import ModelTrainer
from base.base_station_knowledge import BaseStationKnowledge
from base.base_station_manager import BaseStationManager
from common.predictor_adaptation_goal import PredictorAdaptationGoal
from base.portfolio_adaptation_goal import PortfolioAdaptationGoal
from common.data_reduction_strategy import DataReductionStrategy
import time
from tracemalloc import Snapshot
import tracemalloc

import logging
from base.learning_strategy import LearningStrategy
from sensor.sensor_executor import SensorExecutor
from sensor.sensor_planner import SensorPlanner
from sensor.sensor_analyzer import SensorAnalyzer
from typing import Type
from sensor.sensor_monitor import SensorMonitor
import shutil
from simulation.mapek.simulation_base_station import SimulationBaseStation
from base.model_manager import ModelManager
import tensorflow as tf

# In-memory results keyed by simulation id. NOTE(review): no visible code populates it (its only use is commented out).
RESULTS = {}
# Working directory for the base station's models; emptied and refilled from `models/` before each run.
BS_MODELS_DIR = "bs_models"
# Working directory passed to SensorManager for the sensor node's models; emptied at the start of `simulate`.
SN_MODEL_DIR = "sn_models"


def clean_folder(folder: str):
    """Ensure ``folder`` exists and is empty.

    Sub-directories are removed recursively. Files and symlinks (including symlinks to
    directories, whose targets are left untouched) are unlinked. A missing ``folder`` is
    created rather than raising ``FileNotFoundError``.
    """
    os.makedirs(folder, exist_ok=True)
    # Materialise the listing first so entries are not deleted while the directory is being iterated.
    with os.scandir(folder) as it:
        entries = list(it)
    for entry in entries:
        # follow_symlinks=False: shutil.rmtree refuses to operate on a symlink, so a linked
        # directory must be unlinked like a file.
        if entry.is_dir(follow_symlinks=False):
            shutil.rmtree(entry.path)
        else:
            os.remove(entry.path)


def simulate(
        log_path: str,
        model_manager: ModelManager,
        training_data: pd.DataFrame,
        initial_data: pd.DataFrame,
        monitor_type: Type[SensorMonitor],
        analyzer_type: Type[SensorAnalyzer],
        planner_type: Type[SensorPlanner],
        executor_type: Type[SensorExecutor],
        data_reduction_strategy: DataReductionStrategy,
        legacy_mode: bool,
) -> Tuple[SensorKnowledge, pd.DataFrame, Snapshot, float]:
    """Run one simulation: start a base station, then run a sensor node replaying ``sim_data`` against it.

    NOTE(review): besides its parameters this reads the module-level names ``sim_data``,
    ``learning_strategy``, ``sim_bs_goals``, ``sim_sn_goals``, ``SN_MODEL_DIR`` and
    ``BASE_MODEL_METADATA``. The driver loop sets the first four before each call.

    Args:
        log_path: Log location passed to ``SimulationBaseStation``.
        model_manager: Base-station model manager.
        training_data: Data available to the base station for (re)training.
        initial_data: Measurements the sensor starts with (copied, not mutated).
        monitor_type, analyzer_type, planner_type, executor_type: Sensor MAPE-K component classes.
        data_reduction_strategy: Strategy shared by base station and sensor.
        legacy_mode: Forwarded to ``BaseStationKnowledge``.

    Returns:
        ``(sensor knowledge, base-station profiler data, tracemalloc snapshot, CPU seconds)``.
        Memory is traced from just before the SensorManager is built until the run ends.
        CPU time (``time.process_time``) covers only ``sensor_manager.run()``.
    """
    # Sensor configuration
    sensor = SimulationSensor(sim_data)

    # Base Station Configuration
    knowledge = BaseStationKnowledge(
        "base_station", model_manager, training_data, learning_strategy, data_reduction_strategy, sim_bs_goals,
        sim_sn_goals, legacy_mode
    )
    executor = SequentialBaseStationExecutor(knowledge)
    planner = PortfolioBaseStationPlanner(knowledge, executor)
    analyzer = PredictorGoalsBaseStationAnalyzer(knowledge, planner)
    monitor = ViolationsBaseStationMonitor(knowledge, analyzer)
    base_station_manager = BaseStationManager("default_cluster", knowledge, monitor)

    base_station = SimulationBaseStation(log_path, base_station_manager, model_manager)
    base_station.start()
    # try/finally: stop the base station and tracing even if the sensor run raises, so a failed
    # scenario does not leave them active for the next one.
    try:
        clean_folder(SN_MODEL_DIR)

        input_features = BASE_MODEL_METADATA.input_features
        output_features = BASE_MODEL_METADATA.output_features

        tracemalloc.start()
        try:
            measurements = initial_data.copy()
            sensor_manager = SensorManager(
                SN_MODEL_DIR, input_features, output_features, sensor, measurements,
                HttpBaseStationGateway(base_station.address),
                monitor_type, analyzer_type, planner_type, executor_type, data_reduction_strategy
            )

            process_time_start = time.process_time()
            sensor_manager.run()
            cpu_time: float = time.process_time() - process_time_start
            memory_snapshot: Snapshot = tracemalloc.take_snapshot()
        finally:
            tracemalloc.stop()
    finally:
        base_station.stop()

    return sensor_manager.knowledge, base_station.profiler.data, memory_snapshot, cpu_time


for i, (simulation_id, parameters) in enumerate(simulations.items(), start=1):
    sim_dir = os.path.join("simulations", simulation_id)
    print(f"{datetime.datetime.now().isoformat()} - [{i}/{simulations_size}] Running simulation {simulation_id}...")

    # Every run starts from an empty output directory.
    os.makedirs(sim_dir, exist_ok=True)
    clean_folder(sim_dir)

    # Datasets are reduced to the base model's input features and conformed to an hourly grid.
    # sort_index() returns a new frame, so it is chained rather than called for a side effect.
    initial_df = parameters["initial_df"]
    initial_data: pd.DataFrame = (
        preprocess_df(pd.read_pickle(os.path.join("data", initial_df)), BASE_MODEL_METADATA.input_features, [])
        .sort_index()
        .asfreq('H')
    )
    print(
        f"  {datetime.datetime.now().isoformat()} - [{i}/{simulations_size}] Loaded Initial Dataset {initial_df} ({initial_data.index.min()} - {initial_data.index.max()})"
    )

    training_df = parameters["training_df"]
    training_data: pd.DataFrame = (
        preprocess_df(pd.read_pickle(os.path.join("data", training_df)), BASE_MODEL_METADATA.input_features, [])
        .sort_index()
        .asfreq('H')
    )
    print(
        f"  {datetime.datetime.now().isoformat()} - [{i}/{simulations_size}] Loaded Training Dataset {training_df} ({training_data.index.min()} - {training_data.index.max()})"
    )

    base_model_id = parameters["base_model_id"]
    # NOTE: simulate() reads sim_sn_goals, sim_bs_goals and learning_strategy as module globals.
    sim_sn_goals: list[PredictorAdaptationGoal] = parameters["sn_goals"]
    sim_bs_goals: list[PortfolioAdaptationGoal] = parameters["bs_goals"]

    # Refill the base-station model directory with this scenario's portfolio.
    clean_folder(BS_MODELS_DIR)
    sim_portfolio: list[str] = parameters["portfolio"]
    for model_id in sim_portfolio:
        shutil.copytree(os.path.join("models", model_id), os.path.join(BS_MODELS_DIR, model_id), dirs_exist_ok=True)

    model_trainer = ModelTrainer()
    model_manager = ModelManager(BS_MODELS_DIR, base_model_id)

    data_reduction_strategy: DataReductionStrategy = parameters["data_reduction_strategy"]
    learning_strategy: LearningStrategy = parameters["learning_strategy"](model_trainer, model_manager)

    sim_parameters = {
        "sim_id": simulation_id,
        "initial_df": initial_df,
        "training_df": training_df,
        "portfolio": parameters["portfolio"],
        "base_model_id": parameters["base_model_id"],
        "sn_goals": [goal.to_dict() for goal in sim_sn_goals],
        "bs_goals": [goal.to_dict() for goal in sim_bs_goals],
        "learning_strategy": learning_strategy.__class__.__name__,
    }
    parameters_file_path = os.path.join(sim_dir, "parameters.json")
    with open(parameters_file_path, "w") as fp:
        json.dump(sim_parameters, fp)

    executor = SequentialSensorExecutor
    planner = PortfolioSensorPlanner
    analyzer = PortfolioSensorAnalyzer
    monitor = MultivariateSensorMonitor

    # Route only errors to the notebook, and INFO and above to a per-run log file.
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.ERROR)
    file_handler = logging.FileHandler(filename=os.path.join(sim_dir, "logfile.log"), mode='w')
    file_handler.setLevel(logging.INFO)

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    if logger.hasHandlers():
        logger.handlers.clear()
    logger.addHandler(stream_handler)
    logger.addHandler(file_handler)

    tf_logger = tf.get_logger()
    tf_logger.setLevel(logging.WARN)
    tf_logger.handlers = []
    tf_logger.propagate = False
    disable_interactive_logging()

    legacy_mode = parameters["legacy_mode"]

    # Join with a separator so the logs live inside this run's directory (previously "<sim_dir>log").
    log_dir = os.path.join(sim_dir, "log")
    try:
        knowledge, profiler_data, memory_snapshot, cpu_time = simulate(
            log_path=log_dir,
            model_manager=model_manager,
            training_data=training_data,
            initial_data=initial_data,
            monitor_type=monitor, analyzer_type=analyzer, planner_type=planner, executor_type=executor,
            data_reduction_strategy=data_reduction_strategy,
            legacy_mode=legacy_mode
        )
    finally:
        print(
            f"    {datetime.datetime.now().isoformat()} - [{i}/{simulations_size}] Simulation {simulation_id} finished.")
        # Close the per-run file handler so the next run can open its own log file.
        for handler in logger.handlers[:]:
            handler.close()
            logger.removeHandler(handler)

    print(f"    {datetime.datetime.now().isoformat()} - [{i}/{simulations_size}] Saving results of {simulation_id}.")
    save_results(BS_MODELS_DIR, sim_dir, knowledge, profiler_data, sim_data, THRESHOLD, memory_snapshot, cpu_time)


2025-03-08T20:30:21.273298 - [1/1] Running simulation linz_2010_2019_simple_lstm_retrain_long_1v1h...
  2025-03-08T20:30:21.273298 - [1/1] Loaded Initial Dataset linz_2019_2019.pickle (2019-01-01 00:00:00 - 2019-12-31 23:00:00)
  2025-03-08T20:30:21.273298 - [1/1] Loaded Training Dataset linz_2010_2019.pickle (2010-01-01 01:00:00 - 2019-12-31 23:00:00)
    2025-03-08T22:00:01.758547 - [1/1] Simulation linz_2010_2019_simple_lstm_retrain_long_1v1h finished.
    2025-03-08T22:00:01.762062 - [1/1] Saving results of linz_2010_2019_simple_lstm_retrain_long_1v1h.
