Merged
3 changes: 3 additions & 0 deletions .coveragerc
@@ -4,6 +4,9 @@ omit =
*/venv/*
*/.venv/*
*/__init__.py
*/src/data/accel/accelerometer.py
*/src/data/accel/aligner.py
*/src/examples/*

[report]
exclude_lines =
1 change: 1 addition & 0 deletions .gitignore
@@ -175,4 +175,5 @@ cython_debug/

# Ignore config files with credentials
config/*.json
!config/test.json
!config/*.json.template
10 changes: 9 additions & 1 deletion README.md
@@ -24,11 +24,19 @@ poetry build # builds cp-sens package that can be published
poetry run experiment_1 # run one experiment with real data
```

Running `poetry build` creates a `.whl` file in the `dist/` directory, e.g., `dist/cp_sens-0.1.0-py3-none-any.whl`.

## Testing

Write tests in the _tests_ directory. Be sure to name any new test files `test_*.py`. To run the tests, with coverage:

```bash
pytest
pytest -m unit # run unit tests
pytest -m integration # run integration tests
pytest # run all the tests
```

## Use

Please see [examples](src/examples/README.md) for typical usage of the package.
3 changes: 2 additions & 1 deletion config/test.json.template → config/test.json
@@ -1,6 +1,6 @@
{
"MQTT": {
"host": "test.mosquitto.org",
"host": "mqtt.eclipseprojects.io",
"port": 1883,
"userId": "",
"password": "",
@@ -9,3 +9,4 @@
"TopicsToSubscribe": ["topicAA", "topicBB", "topicCC"]
}
}
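
For orientation, a minimal sketch (not part of this diff) of how these settings could be consumed with paho-mqtt 2.x, which the PR pins below in pyproject.toml. The key names follow the template above; the connection flow itself is an assumption:

```python
import json

import paho.mqtt.client as mqtt

# Load broker settings from the config file shown above
with open("config/test.json", encoding="utf-8") as f:
    cfg = json.load(f)["MQTT"]

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # required since paho-mqtt 2.0
if cfg["userId"]:
    client.username_pw_set(cfg["userId"], cfg["password"])
client.connect(cfg["host"], cfg["port"])
for topic in cfg["TopicsToSubscribe"]:
    client.subscribe(topic)
client.loop_start()  # network loop runs in a background thread
```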

1,208 changes: 178 additions & 1,030 deletions poetry.lock

Large diffs are not rendered by default.

19 changes: 11 additions & 8 deletions pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
name = "example-shm"
version = "0.1.0"
description = ""
version = "0.2.0"
description = "Demonstration for structural health monitoring of engineering structures"
authors = [
"Mohamed Abdulkarim <202206332@post.au.dk>",
"Prasad Talasila <prasadtalasila@gmail.com>"
@@ -11,18 +11,21 @@ license = "INTO-CPS Association"
packages = [{include = "*", from="src"}]

[tool.poetry.dependencies]
python = ">=3.10, <3.13"
python = ">=3.12, <3.13"
paho-mqtt = "^2.1.0"
matplotlib = "^3.10.0"
numpy = "^2.2.4"

[tool.poetry.group.dev.dependencies]
pylint = "^3.3.3"
pytest = "^8.3.4"
pytest-cov = "^6.0.0"
pylint = "^3.3.6"
pytest = "^8.3.5"
pytest-cov = "^6.1.1"
pytest-mock = "^3.14.0"
pytest-env = "^1.1.5"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
experiment_1 = "src.experiment_1:main"
experiment_1 = "src.examples.experiment_1:main"
experiment_2 = "src.examples.experiment_2:main"
9 changes: 8 additions & 1 deletion pytest.ini
@@ -3,7 +3,7 @@ minversion = 8.3
pythonpath = src src/data src/methods tests
testpaths =
tests
addopts = --cov=src --cov-report=term-missing --cov-report=html
addopts = --cov=src --ignore=src/examples --cov-report=term-missing --cov-report=html
python_files = test_*.py

log_cli=true
@@ -13,3 +13,10 @@ log_date_format = %Y-%m-%d %H:%M:%S

#timeout slow tests
timeout=5

env =
MQTT_CONNECT_DELAY=0.5

markers =
unit: marks unit tests
integration: marks integration tests
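
Together, the new `markers` and `env` entries let a test be tagged for selective runs and rely on `MQTT_CONNECT_DELAY` being set before the session starts. A hypothetical test module (file and test names invented for illustration):

```python
import os

import pytest


@pytest.mark.unit
def test_connect_delay_is_injected():
    # pytest-env sets MQTT_CONNECT_DELAY from the env section of pytest.ini
    assert float(os.environ["MQTT_CONNECT_DELAY"]) == 0.5


@pytest.mark.integration
def test_broker_roundtrip():
    pytest.skip("illustrative placeholder for a test against a live broker")
```

Select with `pytest -m unit` or `pytest -m integration`, matching the README commands above.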
75 changes: 61 additions & 14 deletions src/data/accel/accelerometer.py
@@ -1,22 +1,69 @@
# pylint: disable=W0107
import abc
from typing import Tuple, Optional, List
import numpy as np

US_MULTIPLIER = 1000000 # factor to convert time to microseconds
class IAccelerometer(abc.ABC):
@abc.abstractmethod
def read(self, requested_samples: int) -> Tuple[int, np.ndarray]:
"""
Read and return the requested number of accelerometer samples from the buffer.

Parameters:
requested_samples (int): The number of samples to retrieve.

Returns:
Tuple[int, np.ndarray]:
- status: 1 if the requested number of samples was returned,
0 if fewer samples were available.
- data: A NumPy array containing the retrieved samples.
"""
pass


@abc.abstractmethod
def get_batch_size(self) -> Optional[int]:
"""
Get the number of samples in the first data batch.

Returns:
int or None: The size of the first batch, or None if no data is available.
"""
pass


@abc.abstractmethod
def get_sorted_keys(self) -> List[int]:
"""
Get the sorted list of keys (sample offsets since the start of data acquisition) available in the buffer.

Returns:
List[int]: Sorted keys representing batches of data.
"""
pass


@abc.abstractmethod
def get_samples_for_key(self, key: int) -> Optional[List[float]]:
"""
Get the list of samples for a given key.

Parameters:
key (int): The identifier for the desired data batch.

Returns:
List[float] or None: A copy of the sample list if the key exists, otherwise None.
"""
pass


class IAccelerometer(abc.ABC):
@abc.abstractmethod
def read(self, requested_samples: int) -> (int, np.ndarray): # type: ignore
"""
This method provides a single accelerometer reading.
A sample reading is:
sample = {
'timestamp': 0,
'accel_readings': {
'x': 0,
'y': 0,
'z': 0
}
}
"""

@abc.abstractmethod
def clear_used_data(self, start_key: int, samples_to_remove: int) -> None:
"""
Remove old and used data from the buffer.

Parameters:
start_key (int): The first key from which data will be retained.
samples_to_remove (int): The number of samples to remove starting from `start_key`.
"""
pass
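
To make the contract concrete, a hedged usage sketch (not from this repo): a consumer that polls the buffer metadata until enough batches have arrived, then reads once. The `drain` helper and its defaults are invented for illustration.

```python
import time


def drain(accel: "IAccelerometer", requested: int = 64, poll_s: float = 0.1):
    """Block until roughly `requested` samples are buffered, then read once."""
    while True:
        batch_size = accel.get_batch_size()
        keys = accel.get_sorted_keys()
        # len(keys) * batch_size only approximates the buffered total, since
        # the oldest batch may already be partially consumed
        if batch_size and len(keys) * batch_size >= requested:
            status, samples = accel.read(requested)
            if status == 1:  # full request satisfied
                return samples
        time.sleep(poll_s)  # wait for more MQTT batches to arrive
```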
36 changes: 36 additions & 0 deletions src/data/accel/aligner.py
@@ -0,0 +1,36 @@
# pylint: disable=W0107
import abc
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np

class IAligner(abc.ABC):
@abc.abstractmethod
def find_continuous_key_groups(self) -> Tuple[Optional[int], Optional[List[List[int]]]]:
"""
Finds common continuous key groups across all channels.

Returns:
Tuple:
- batch_size (Optional[int]): Size of each data batch if found, else None.
- key_groups (Optional[List[List[int]]]):
Groups of continuous sample keys common to all channels.
"""
pass


@abc.abstractmethod
def extract(self, requested_samples: int) -> Tuple[np.ndarray, Optional[datetime]]:
"""
Extracts aligned accelerometer samples from all channels.

Parameters:
requested_samples (int): The number of aligned samples to extract.

Returns:
Tuple:
- np.ndarray: A 2D NumPy array of shape (num_channels, num_samples).
- Optional[datetime]: UTC timestamp
when the aligned samples were extracted, or None on failure.
"""
pass
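
The "continuous key groups" wording above follows from the keying scheme: with a fixed batch size, consecutive batches have keys that differ by exactly that size, so sorted keys split into runs wherever the gap differs. A minimal single-channel sketch of that grouping idea (illustrative, not the repo's implementation):

```python
from typing import List


def continuous_groups(keys: List[int], batch_size: int) -> List[List[int]]:
    """Split sorted sample keys into runs with no missing batch in between."""
    groups: List[List[int]] = []
    for key in sorted(keys):
        if groups and key == groups[-1][-1] + batch_size:
            groups[-1].append(key)   # key continues the current run
        else:
            groups.append([key])     # gap detected: start a new run
    return groups


# With batch_size=32: [0, 32, 64, 128, 160] -> [[0, 32, 64], [128, 160]]
```

An aligner would presumably intersect such groups across channels before extracting.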
@@ -1,20 +1,20 @@
import threading
import struct
from collections import deque
from typing import Tuple, Any, Optional, List
import numpy as np

import paho.mqtt.client as mqtt
# Project Imports
from data.accel.accelerometer import IAccelerometer
from data.accel.constants import MAX_MAP_SIZE
from data.accel.metadata_constants import DESCRIPTOR_LENGTH_BYTES

class Accelerometer(IAccelerometer):
def __init__(
self,
mqtt_client,
mqtt_client: mqtt.Client,
topic: str = "cpsens/d8-3a-dd-f5-92-48/cpsns_Simulator/1/acc/raw/data",
map_size: int = MAX_MAP_SIZE ):


"""
Initializes the Accelerometer instance with a pre-configured MQTT client.

@@ -33,20 +33,21 @@ def __init__(
# Setting up MQTT callback
self.mqtt_client.on_message = self._on_message

def _on_message(self, _, __, msg):
# pylint: disable=unused-argument
def _on_message(self, client: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
"""Handles incoming MQTT messages."""
print(f"Received message on topic {msg.topic}")

def safe_process(): # This ensures that an exception does not crash the entire thread
try:
self._process_message(msg)
self.process_message(msg)
except Exception as e:
print(f"Error processing message: {e}")

threading.Thread(target=safe_process, daemon=True).start()


def _process_message(self, msg):
def process_message(self, msg: mqtt.MQTTMessage) -> None:
"""
Processes incoming MQTT messages, extracts accelerometer data,
and stores it in a dictionary of FIFO queues.
@@ -58,9 +59,7 @@ def _process_message(self, msg):
try:
raw_payload = msg.payload

# Extract metadata
# We know that the first 2 bytes tells the length of the descriptor
descriptor_length = struct.unpack("<H", raw_payload[:2])[0]
descriptor_length = struct.unpack("<H", raw_payload[:DESCRIPTOR_LENGTH_BYTES])[0]
(descriptor_length, _, __, ___,
samples_from_daq_start,) = struct.unpack("<HHQQQ", raw_payload[:descriptor_length])

@@ -70,21 +69,90 @@
accel_values = struct.unpack(f"<{num_samples}f", data_payload)

# Store each data batch (e.g., 32 samples in one message)
# in the map where usnig samples_from_daq_start as its the key
# in the map samples_from_daq_start is used as the key for each batch
with self._lock:
self.data_map[samples_from_daq_start] = deque(accel_values)
if samples_from_daq_start not in self.data_map:
self.data_map[samples_from_daq_start] = deque(accel_values)

total_samples = sum(len(dq) for dq in self.data_map.values())
# Check if the total samples in the map exceeds the max,
# then remove the oldest data batch
while sum(len(deque) for deque in self.data_map.values()) > self._map_size:
while total_samples > self._map_size:
oldest_key = min(self.data_map.keys()) # Find the oldest batch
del self.data_map[oldest_key] # Remove oldest batch
oldest_deque = self.data_map[oldest_key]
oldest_deque.popleft() # Delete samples from the oldest deque
if not oldest_deque: # Remove the key/deque from the map if it's empty
del self.data_map[oldest_key]
total_samples = sum(len(dq) for dq in self.data_map.values())
print(f" Channel: {self.topic} Key: {samples_from_daq_start}, Samples: {num_samples}")

except Exception as e:
print(f"Error processing message: {e}")


def read(self, requested_samples: int) -> (int, np.ndarray):
def get_batch_size(self) -> Optional[int]:
"""
Returns the number of samples in the first available data batch.
Useful for determining alignment batch size.
"""
with self._lock:
if not self.data_map:
return None
first_key = next(iter(self.data_map))
return len(self.data_map[first_key])


def get_sorted_keys(self) -> List[int]:
"""
Returns the sorted list of sample keys currently available.
"""
with self._lock:
return sorted(self.data_map.keys())


def get_samples_for_key(self, key: int) -> Optional[List[float]]:
"""
Returns a copy of the sample list for a given key,
or None if the key is not present.
"""
with self._lock:
if key in self.data_map:
return list(self.data_map[key])
return None


def clear_used_data(self, start_key: int, samples_to_remove: int) -> None:
"""
Deletes all keys older than `start_key` and consumes `samples_to_remove`
samples starting from `start_key`, across subsequent keys in order.
"""
with self._lock:
# Delete older keys
keys_to_delete = [k for k in self.data_map if k < start_key]
for k in keys_to_delete:
del self.data_map[k]

# Remove samples from start_key onwards until all used samples are removed
keys = sorted(k for k in self.data_map if k >= start_key)
remaining_to_remove = samples_to_remove

for key in keys:
if remaining_to_remove <= 0:
break
dq = self.data_map[key]
num_available = len(dq)
num_to_remove = min(remaining_to_remove, num_available)

for _ in range(num_to_remove):
dq.popleft()

if not dq:
del self.data_map[key]

remaining_to_remove -= num_to_remove


def read(self, requested_samples: int) -> Tuple[int, np.ndarray]:
"""
Reads the oldest accelerometer data from the FIFO buffer and removes only the read samples.

@@ -127,5 +195,5 @@ def read(self, requested_samples: int) -> (int, np.ndarray):
return status, samples


def acquire_lock(self)->(threading.Lock):
def acquire_lock(self)-> threading.Lock:
return self._lock
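
For reference, a self-contained sketch (not part of the PR) of the payload layout that `process_message` above expects: a little-endian `<HHQQQ` descriptor of 28 bytes, whose first field is its own length and whose last field is the `samples_from_daq_start` key, followed by float32 samples. The middle fields are unpacked but unused in this diff, so they are zeroed placeholders here.

```python
import struct

samples = [0.1, -0.2, 0.3, 0.05]
# Descriptor: length (H), three fields unused by this diff (H, Q, Q), key (Q)
descriptor = struct.pack("<HHQQQ", 28, 0, 0, 0, 1024)  # 1024: samples_from_daq_start
payload = descriptor + struct.pack(f"<{len(samples)}f", *samples)

# Parsing mirrors process_message
descriptor_length = struct.unpack("<H", payload[:2])[0]              # -> 28
*_, samples_from_daq_start = struct.unpack("<HHQQQ", payload[:descriptor_length])
data_payload = payload[descriptor_length:]
values = struct.unpack(f"<{len(data_payload) // 4}f", data_payload)  # 4 bytes per float32
print(samples_from_daq_start, [round(v, 2) for v in values])
```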