# ControlledSim

Controlled simulation runner:
- loads PARAMS from `parameters.py`
- runs FMU via `fmi_gym`
- publishes observations to MQTT
- optionally applies actions received over MQTT (closed-loop mode, when `action_names` is non-empty)
- also publishes RL-style payloads for Influx bridges


In [None]:
import os
import json
import time
import traceback
import numpy as np
import pandas as pd
import paho.mqtt.client as mqtt
import datetime

from fmi_mlc.fmi_gym import fmi_gym

# --- Load parameters ---------------------------------------------------------
from parameters import parameter as PARAMS

# Short summary of the parameters that matter most for this run.
print('‚úÖ Controlled Sim ‚Äî Parameters loaded')
for label, key in (
    ('fmu_path', 'fmu_path'),
    ('step', 'fmu_step_size'),
    ('observations', 'observation_names'),
    ('actions', 'action_names'),
):
    print(f'   {label}:', PARAMS.get(key))

# Full dump of every parameter; the action bounds are summarised by their
# shape attribute (None when the value has no `shape`) instead of being
# printed in full.
for k, v in PARAMS.items():
    shown = f"shape={getattr(v, 'shape', None)}" if k in ("action_min", "action_max") else f"{v}"
    print(f"  {k}: {shown}")

# Names of the controlled inputs and observed outputs (empty lists when absent).
ACTION_NAMES = list(PARAMS.get("action_names", []))
OBS_NAMES = list(PARAMS.get("observation_names", []))

# Action bounds as float64 arrays; an empty array when no bound is configured.
ACTION_MIN = np.array(PARAMS.get("action_min", []), dtype=np.float64)
ACTION_MAX = np.array(PARAMS.get("action_max", []), dtype=np.float64)

print("\nAction dimension:", len(ACTION_NAMES))
print("Observation dimension:", len(OBS_NAMES))

In [None]:
# --- MQTT configuration (broker address and topic layout) -------------------
# Every setting can be overridden through an environment variable.
MQTT_BROKER_HOST = os.getenv('MQTT_BROKER_HOST', 'mosquitto')
MQTT_BROKER_PORT = int(os.getenv('MQTT_BROKER_PORT', '1883'))

# Root of the topic tree used by the platform subscriber
TOPIC_BASE = os.getenv('MQTT_TOPIC_BASE', 'simulation')

# One sub-tree for observations and one for actions; individual variables
# are published below these prefixes.
TOPIC_OBS_PREFIX = TOPIC_BASE + '/observations'
TOPIC_ACT_PREFIX = TOPIC_BASE + '/actions'

print('‚úÖ MQTT config')
for label, value in (
    ('Host', MQTT_BROKER_HOST),
    ('Port', MQTT_BROKER_PORT),
    ('Observation prefix', TOPIC_OBS_PREFIX),
    ('Action prefix', TOPIC_ACT_PREFIX),
):
    print(f'   {label}:', value)

In [None]:
# --- Time conversion helper ----------------------------------------------

# Calendar year the simulation is mapped onto (override with SIM_YEAR).
YEAR = int(os.environ.get('SIM_YEAR', '2002'))

def convert_fmu_timestamp(ts_fmu: float, year: int = 2002) -> int:
    """
    Map an FMU-relative time onto a Unix timestamp.

    Args:
        ts_fmu: Seconds elapsed since January 1st, 00:00:00 of the
            simulation year.
        year: Calendar year used as the origin (interpreted in UTC).

    Returns:
        Whole seconds since the Unix epoch; the fractional part is
        truncated by ``int``.
    """
    year_start = datetime.datetime(year, 1, 1, tzinfo=datetime.timezone.utc)
    epoch_offset = year_start.timestamp()
    return int(epoch_offset + float(ts_fmu))

def extract_obs(row: dict, names) -> dict:
    """
    Pick the requested observation values out of a data row.

    Args:
        row: Mapping of variable name to value.
        names: Iterable of variable names to extract.

    Returns:
        A dict with one entry per requested name, in the order given;
        names absent from ``row`` map to ``None``.
    """
    return {name: row.get(name, None) for name in names}

In [None]:
# --- MQTT client ----------------------------------------------------------

# Most recent action received from the controller, or None while no action is
# pending. Written by the on_action MQTT callback and consumed (reset to None)
# by the main simulation loop.
action_buffer = None

def on_connect(client, userdata, flags, rc):
    """
    Callback executed when the broker answers the connection request.

    Args:
        client: MQTT client instance that triggered the callback.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.

    On success the connection status is printed and the client subscribes
    to the topic where the RL controller publishes actions. A non-zero
    result code is reported as a failure and no subscription is attempted.
    """
    if rc != 0:
        # The broker refused the connection: do not claim success or subscribe.
        print(f"Controlled Sim - Connection refused by broker (rc={rc}); not subscribing")
        return

    print(f"‚úÖ Controlled Sim ‚Äî Connected (rc={rc})")
    client.subscribe(TOPIC_ACT_PREFIX, qos=1)
    print(f"‚úÖ Controlled Sim ‚Äî Subscribed to topic: {TOPIC_ACT_PREFIX}")

def on_disconnect(client, userdata, rc):
    """Report that the MQTT client disconnected, including the result code."""
    notice = f'‚ÑπÔ∏è Controlled Sim ‚Äî Disconnected (rc={rc})'
    print(notice)


def on_action(client, userdata, msg):
    """
    Callback executed whenever a message arrives on the subscribed topic.

    The JSON-decoded payload can be:
        - a list: mapped positionally onto PARAMS['action_names']
          (extra trailing elements are ignored)
        - a dict {name: value}: stored as-is
        - anything else: ignored with a warning

    Payloads that are not valid JSON, and lists with fewer elements than
    there are action names, are rejected with a warning instead of raising
    inside the MQTT callback. ``action_buffer`` is left untouched whenever
    a payload is rejected.

    Args:
        client: MQTT client instance that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` is read.
    """
    global action_buffer

    try:
        payload = json.loads(msg.payload)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print("Controlled Sim - Ignoring undecodable action payload:", exc)
        return

    # Action already sent as a dict: use it directly.
    if isinstance(payload, dict):
        action_buffer = payload
        return

    # Action sent as a list: pair each value with its action name by position.
    if isinstance(payload, list):
        names = list(PARAMS['action_names'])
        if len(payload) < len(names):
            print(
                f"Controlled Sim - Ignoring action list with {len(payload)} "
                f"value(s); expected {len(names)}"
            )
            return
        action_buffer = dict(zip(names, payload))
        return

    # Unexpected format: print a warning and ignore the message.
    print("‚ö†Ô∏è Controlled Sim ‚Äî Unexpected payload:", payload)

# Create the MQTT client, register the callbacks defined above, connect to the
# broker and start the client's network loop.
# NOTE(review): mqtt.Client() is called without a callback API version, which
# matches the paho-mqtt 1.x constructor - confirm the pinned paho-mqtt version.
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.on_message = on_action
mqtt_client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, keepalive=60)
mqtt_client.loop_start()


# "\C" is not a valid escape sequence; a leading newline was intended.
print("\nControlled Sim running.")
# Without any action names the simulation runs without waiting for a controller.
if not ACTION_NAMES:
    print("üîé Open-loop mode")
else:
    print("üéÆ Closed-loop mode")

In [None]:
def publish_series(prefix: str, data: dict, ts: int):
    """
    Publish every variable of ``data`` on its own MQTT topic.

    Each variable goes to ``<prefix>/<name>`` as a JSON object with the keys
    ``value`` (a float, or null when the value is None) and ``timestamp``
    (an int), using QoS 1.

    Args:
        prefix: Topic prefix placed in front of each variable name.
        data: Mapping of variable name to value.
        ts: Timestamp attached to every published message.
    """
    for name, value in data.items():
        numeric = None if value is None else float(value)
        message = json.dumps({"value": numeric, "timestamp": int(ts)})
        topic = f"{prefix}/{name}"
        mqtt_client.publish(topic, message, qos=1)

In [None]:
# --- Build the FMU environment and bring it to its initial state ----------
env = fmi_gym(PARAMS)
_ = env.reset()

# Latest row of the environment's data table, as a plain dict
# (empty when the environment exposes no data yet).
if hasattr(env, 'data') and len(env.data):
    last = env.data.iloc[-1].to_dict()
else:
    last = {}

# FMU time in seconds (0.0 when the row has no 'time' entry), then the
# matching Unix timestamp for the configured YEAR.
ts_fmu = last.get('time', 0.0)
ts = convert_fmu_timestamp(ts_fmu, YEAR)

# Send the initial observations, one topic per variable.
obs = extract_obs(last, OBS_NAMES)
publish_series(TOPIC_OBS_PREFIX, obs, ts)
print(f"‚Üí Published initial observation at ts={ts}: {obs}")

In [None]:
# --- Main loop (controlled simulation) --------------------------------------------------
# Upper bound on the number of simulation steps (override with MAX_STEPS).
max_steps = int(os.environ.get('MAX_STEPS', '1000000'))

step = 0
done = False

try:
    while not done and step < max_steps:
        if ACTION_NAMES:
            # Closed loop: wait until the MQTT callback has stored an action.
            while action_buffer is None:
                time.sleep(0.001)

            # Consume the action from the buffer
            act_dict = action_buffer
            action_buffer = None

            # A dict payload is stored by on_action without validation, so it
            # may lack some action names. Skip such an action and wait for the
            # next one instead of letting a KeyError end the whole episode.
            missing = [n for n in ACTION_NAMES if n not in act_dict]
            if missing:
                print("Controlled Sim - Ignoring action without values for:", missing)
                continue

            # Publish actions per-variable, stamped with the time of the
            # latest published observation.
            publish_series(TOPIC_ACT_PREFIX, act_dict, ts)

            # Order the values as listed in ACTION_NAMES.
            action = np.array([act_dict[n] for n in ACTION_NAMES], dtype=np.float32)

        else:
            # Open loop: step with an empty action vector.
            action = np.array([], dtype=np.float64)

        # Step the environment forward
        obs_vec, reward, done, info = env.step(action)

        # Extract latest FMU data row as a dict
        last = env.data.iloc[-1].to_dict()

        # FMU time in seconds; keep the previous value if the row has no 'time'
        ts_fmu = last.get('time', ts_fmu)

        # Convert FMU time to real Unix timestamp using the global YEAR
        ts = convert_fmu_timestamp(ts_fmu, YEAR)

        # Publish observations per-variable
        obs = extract_obs(last, OBS_NAMES)
        publish_series(TOPIC_OBS_PREFIX, obs, ts)

        step += 1

    print('‚úÖ Controlled Sim ‚Äî Episode finished.')
    print('   steps:', step)

except KeyboardInterrupt:
    print('‚õî Controlled Sim ‚Äî Interrupted by user.')
except Exception as e:
    print('‚ùå Controlled Sim ‚Äî Exception:', e)
    traceback.print_exc()
finally:
    # Best-effort shutdown of the MQTT client: a failure here must not mask
    # the outcome of the episode, but it is reported rather than hidden.
    try:
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
    except Exception as cleanup_error:
        print('Controlled Sim - MQTT cleanup failed:', cleanup_error)