Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ poetry install # installs all required python packages
pylint src tests --rcfile=.pylintrc # runs linting checks

poetry build # builds cp-sens package that can be published on pip
poetry run experiment_1 # run one experiment with real data
poetry run python .\src\examples\example.py oma-and-plot # run an experiment with real data (Needs "production.json" Config)
```

The `poetry build` will create a `.whl` file in the `dist/` directory, e.g., `dist/cp_sens-0.1.0-py3-none-any.whl`.
Expand Down
21 changes: 10 additions & 11 deletions config/production.json.template
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
{
"MQTT": {
"host": "dtl-server-2.st.lab.au.dk",
"port": 8090,
"userId": "NEEDED",
"password": "NEEDED",
"ClientID": "NOT_NEEDED",
"host": "test.mosquitto.org",
"port": 1883,
"userId": "",
"password": "",
"ClientID": "ReplaySubscriber",
"QoS": 1,
"TopicsToSubscribe": [
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data",
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/metadata",
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/2/acc/raw/data",
"cpsens/+/+/1/acc/raw/data",
"cpsens/+/+/2/acc/raw/data"
"cpsens/recorded/1/data",
"cpsens/recorded/1/metadata",
"cpsens/recorded/2/data",
"cpsens/recorded/+/data"
]
},
},

"sysID": {
"host": "",
Expand Down
28 changes: 28 additions & 0 deletions config/replay.json.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"MQTT": {
"host": "test.mosquitto.org",
"port": 1883,
"userId": "",
"password": "",
"ClientID": "ReplaySubscriber",
"QoS": 1,
"TopicsToSubscribe": [
"cpsens/recorded/1/data",
"cpsens/recorded/1/metadata",
"cpsens/recorded/2/data"
]
},

"sysID": {
"host": "test.mosquitto.org",
"port": 1883,
"userId": "",
"password": "",
"ClientID": "sub.232.sds.213s",
"QoS": 1,
"TopicsToSubscribe": ["cpsens/d8-3a-dd-f5-92-48/cpsns_Simulator/1_2/oma_results"]
}
}



44,791 changes: 44,791 additions & 0 deletions record/mqtt_recordings/data1.jsonl

Large diffs are not rendered by default.

44,731 changes: 44,731 additions & 0 deletions record/mqtt_recordings/data2.jsonl

Large diffs are not rendered by default.

624 changes: 624 additions & 0 deletions record/mqtt_recordings/metadata.jsonl

Large diffs are not rendered by default.

71 changes: 71 additions & 0 deletions record/record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os
import time
import json
import threading
from datetime import datetime
import paho.mqtt.client as mqtt

# MQTT Configuration
MQTT_CONFIG = {
"host": "test.mosquitto.org",
"port": 1883,
"userId": "xxxx",
"password": "xxxx",
"ClientID": "xxxx",
"QoS": 1,
"TopicsToSubscribe": {
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data": "record/mqtt_recordings/data1.jsonl",
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/metadata": "record/mqtt_recordings/metadata.jsonl",
"cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/2/acc/raw/data": "record/mqtt_recordings/data2.jsonl"
}
}

DURATION_SECONDS = 3000

# Ensure output directory exists
os.makedirs("mqtt_recordings", exist_ok=True)

# Thread-safe file locks
file_locks = {topic: threading.Lock() for topic in MQTT_CONFIG["TopicsToSubscribe"]}


def on_connect(client, userdata, flags, rc, properties):
print("Connected with result code", rc)
for topic in MQTT_CONFIG["TopicsToSubscribe"]:
client.subscribe(topic, qos=MQTT_CONFIG["QoS"])
print(f"Subscribed to {topic}")


def on_message(client, userdata, msg):
topic = msg.topic
if topic in MQTT_CONFIG["TopicsToSubscribe"]:
timestamp = datetime.utcnow().isoformat()
record = {
"timestamp": timestamp,
"payload": list(msg.payload) # Byte data as list of ints
}
file_path = MQTT_CONFIG["TopicsToSubscribe"][topic]
with file_locks[topic]:
with open(file_path, "a", encoding="utf-8") as f:
f.write(json.dumps(record) + "\n")


def record_mqtt():
client = mqtt.Client(client_id=MQTT_CONFIG["ClientID"], protocol=mqtt.MQTTv311, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(MQTT_CONFIG["userId"], MQTT_CONFIG["password"])
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_CONFIG["host"], MQTT_CONFIG["port"], keepalive=60)
client.loop_start()

print(f"Recording for {DURATION_SECONDS} seconds...")
time.sleep(DURATION_SECONDS)

client.loop_stop()
client.disconnect()
print("Recording complete.")


if __name__ == "__main__":
record_mqtt()
98 changes: 98 additions & 0 deletions record/replay_mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import os
import json
import time
from datetime import datetime
from paho.mqtt.client import Client as MQTTClient, CallbackAPIVersion, MQTTv5 # type: ignore

RECORDINGS_DIR = "record/mqtt_recordings"

TOPIC_MAPPING = {
"data1.jsonl": "cpsens/recorded/1/data",
"metadata.jsonl": "cpsens/recorded/1/metadata",
"data2.jsonl": "cpsens/recorded/2/data"
}

PUBLISH_BROKER = {
"host": "test.mosquitto.org",
"port": 1883,
"username": "",
"password": "",
"client_id": "ReplayPublisherTest"
}

def setup_publish_client(config: dict) -> MQTTClient:
client = MQTTClient(
client_id=config["client_id"],
protocol=MQTTv5,
callback_api_version=CallbackAPIVersion.VERSION2
)
if config["username"]:
client.username_pw_set(config["username"], config["password"])
client.connect(config["host"], config["port"], keepalive=60)
return client

def replay_mqtt_messages():
publish_client = setup_publish_client(PUBLISH_BROKER)
publish_client.loop_start()

files = {}
for fname in TOPIC_MAPPING:
path = os.path.join(RECORDINGS_DIR, fname)
if not os.path.exists(path):
print(f"[SKIP] File not found: {path}")
continue
files[fname] = open(path, "r", encoding="utf-8")

iterators = {fname: iter(files[fname]) for fname in files}

prev_timestamps = {fname: None for fname in files}
done = set()


while len(done) < len(files):
for fname, fiter in iterators.items():
if fname in done:
continue

try:
line = next(fiter)
record = json.loads(line.strip())
payload = record["payload"]
if isinstance(payload, list):
payload_bytes = bytes(payload)
elif isinstance(payload, str):
payload_bytes = bytes.fromhex(payload)
else:
raise ValueError("Invalid payload format")

qos = record.get("qos", 1)
timestamp_str = record.get("timestamp")
if timestamp_str:
current_timestamp = datetime.fromisoformat(timestamp_str)
prev = prev_timestamps[fname]
if prev:
delay = (current_timestamp - prev).total_seconds()
if delay > 0:
time.sleep(delay)
prev_timestamps[fname] = current_timestamp

topic = TOPIC_MAPPING[fname]
publish_client.publish(topic, payload=payload_bytes, qos=qos)
print(f"[{fname}] → {topic} (len={len(payload_bytes)})")

except StopIteration:
done.add(fname)
print(f"[DONE] {fname} finished")
except Exception as e:
print(f"[ERROR] in {fname}: {e}")

for f in files.values():
f.close()

time.sleep(1)
publish_client.loop_stop()
publish_client.disconnect()
print("[DONEEEE].")

if __name__ == "__main__":
replay_mqtt_messages()
2 changes: 1 addition & 1 deletion src/data/accel/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
MAX_MAP_SIZE = 160100 # The maximum number of samples saved in FIFO
MAX_MAP_SIZE = 5560100 # The maximum number of samples saved in FIFO

TIMEOUT = 2 # Max wait time until enough samples are collected for the test_Accelerometer

Expand Down
6 changes: 3 additions & 3 deletions src/data/accel/hbk/accelerometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Accelerometer(IAccelerometer):
def __init__(
self,
mqtt_client: mqtt.Client,
topic: str = "cpsens/d8-3a-dd-f5-92-48/cpsns_Simulator/1/acc/raw/data",
topic: str,
map_size: int = MAX_MAP_SIZE ):
"""
Initializes the Accelerometer instance with a pre-configured MQTT client.
Expand All @@ -38,7 +38,7 @@ def __init__(
# pylint: disable=unused-argument
def _on_message(self, client: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
"""Handles incoming MQTT messages."""
#print(f"Received message on topic {msg.topic}")
print(f"Received message on topic {msg.topic}")

def safe_process(): # This ensures that an exception does not crash the entire thread
try:
Expand Down Expand Up @@ -86,7 +86,7 @@ def process_message(self, msg: mqtt.MQTTMessage) -> None:
if not oldest_deque: # Remove the key/deque from the map if it's empty
del self.data_map[oldest_key]
total_samples = sum(len(dq) for dq in self.data_map.values())
#print(f" Channel: {self.topic} Key: {samples_from_daq_start}, Samples: {num_samples}")
print(f" Channel: {self.topic} Key: {samples_from_daq_start}, Samples: {num_samples}")

except Exception as e:
print(f"Error processing message: {e}")
Expand Down
6 changes: 4 additions & 2 deletions src/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ python .\src\examples\example.py oma-and-plot
python .\src\examples\example.py oma-and-publish
python .\src\examples\example.py mode-tracking-with-local-sysid
python .\src\examples\example.py mode-tracking-with-remote-sysid
python .\src\examples\example.py model-update
python .\src\examples\example.py model-update-local-sysid
python .\src\examples\example.py model-update-remote-sysid

```

To run the examples with specified config, use
Expand Down Expand Up @@ -121,5 +123,5 @@ This machine subscribes to pyOMA results, performs mode tracking and updates the
Run mode tracking and model update

```bash
poetry run python src/examples/example.py mode-tracking-with-remote-sysid
poetry run python src/examples/example.py model-update-remote-sysid
```
15 changes: 12 additions & 3 deletions src/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
run_mode_tracking_with_local_sysid,
run_mode_tracking_with_remote_sysid,
)
from examples.updating_parameters import run_model_update
from examples.updating_parameters import (
run_model_update_local_sysid,
run_model_update_remote_sysid
)


@click.group()
Expand Down Expand Up @@ -61,7 +64,13 @@ def mode_tracking_with_remote_sysid(ctx):

@cli.command()
@click.pass_context
def model_update(ctx):
run_model_update(ctx.obj["CONFIG"])
def model_update_local_sysid(ctx):
run_model_update_local_sysid(ctx.obj["CONFIG"])

@cli.command()
@click.pass_context
def model_update_remote_sysid(ctx):
run_model_update_remote_sysid(ctx.obj["CONFIG"])

if __name__ == "__main__":
cli(obj={})
36 changes: 35 additions & 1 deletion src/examples/updating_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from methods import model_update_module as MT
# pylint: disable=R0914, C0103

def run_model_update(config_path):
def run_model_update_local_sysid(config_path):
number_of_minutes = 5
config = load_config(config_path)
mqtt_config = config["MQTT"]
Expand Down Expand Up @@ -53,3 +53,37 @@ def run_model_update(config_path):

else:
print("Model update failed.")



def run_model_update_remote_sysid(config_path):
config = load_config(config_path)
cleaned_values, _, _ = (
MT.subscribe_and_get_cleaned_values(config_path)
)

# Run model update
update_result = MT.run_model_update(cleaned_values)

if update_result is not None:
optimized_parameters = update_result['optimized_parameters']
omegaN_rad = update_result['omegaN_rad']
omegaN_Hz = update_result['omegaN_Hz']
mode_shapes = update_result['mode_shapes']
damping_matrix = update_result['damping_matrix']
pars_model = update_result['pars_updated']
system_up = update_result['System_updated']

print("\nOptimized parameters (k, m):", optimized_parameters)
print("\nNatural frequencies (rad/s):", omegaN_rad)
print("\nNatural frequencies (Hz):", omegaN_Hz)
print("\nMode shapes (normalized):\n", mode_shapes)
print("\nDamping matrix:\n", damping_matrix)
print("\nUpdated model parameters (dictionary):", pars_model)
print("\nUpdated system:")
print("\nMass matrix M:", system_up["M"])
print("\nStiffness matrix K:\n", system_up["K"])
print("\nDamping matrix C:\n", system_up["C"])

else:
print("Model update failed.")
10 changes: 9 additions & 1 deletion src/methods/model_update_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,15 @@ def subscribe_and_get_cleaned_values(config_path: str,
mqtt_client.connect(config["sysID"]["host"], config["sysID"]["port"], keepalive=60)
mqtt_client.loop_start()
print("Waiting for OMA data...")
result_ready.wait() # Wait until message arrives
try:
while not result_ready.wait(timeout=0.1):
pass
except KeyboardInterrupt:
print("Cancel")
mqtt_client.loop_stop()
mqtt_client.disconnect()
raise SystemExit

mqtt_client.loop_stop()
mqtt_client.disconnect()

Expand Down