Skip to content

Conversation

@8ohamed
Copy link
Collaborator

@8ohamed 8ohamed commented Apr 9, 2025

New PR for Aligner, because the previous one was closed after renaming it.

@8ohamed
Copy link
Collaborator Author

8ohamed commented Apr 10, 2025

Hi @prasadtalasila I have updated the way we delete samples. If you could approve and merge this PR, I will then create PR for the sysID.
Thanks.

Copy link
Contributor

@prasadtalasila prasadtalasila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@8ohamed This review is incomplete. I am providing the partial review in order to help you proceed further.

@prasadtalasila
Copy link
Contributor

@8ohamed can you please list the missing functionality in the proposed aligner against the functionality provided by the reference aligner given in issue #10

@8ohamed
Copy link
Collaborator Author

8ohamed commented Apr 13, 2025

The reference code subscribes to metadata topic too, it extracts the datatype and the number of samples in each message.
The proposed code assumes that the datatype is float, and uses the size of batch size of the first message, so it does not use metadata.
The reference code ignores a data topic, if it hasn't subscribed to its "metadata" topic.

It also publishes a metadata for the aligned data, this metadata includes "UTCAtFirstSample" and "Fs"

Copy link
Contributor

@prasadtalasila prasadtalasila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@8ohamed thanks for the updates to the PR. Please see the new comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not update these settings. It is best to refactor the code instead.

msg, acc=acc: acc.process_message(msg))


def find_continuous_key_groups(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add type hints here and in other places

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data_map is an internal property of Accelerometer and it should not be used outside. Please write continuous_key_groups(self) for the Accelerometer and reuse those here.

Copy link
Collaborator Author

@8ohamed 8ohamed Apr 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prasadtalasila
In order to avoid other classes accessing the data_map, I added new methods to Accelerometer

  1. get_sorted_keys, so each accelerometer returns a list of its keys so the aligner can use it
  2. get_samples_for_key so the aligner can use it to extract data, instead of directly using the data_map
  3. clear_used_data it deletes used data and everything that is older, aligner class calls it after extracting the data
    I have pushed the changes, I will now look at the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses the actual mqtt server. Please create another test file (tests/unit/data/accel/hbk/test_aligner.py) by using the same code except that the mqtt server is mocked using pytest-mock. Such tests are called unit tests and are very useful.
The same technique can be used to create unit tests for Accelerometer class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing tests in tests/data can be moved to test/integration/data.

@prasadtalasila
Copy link
Contributor

@8ohamed I forgot to mention the following comments. Please look into them.

  1. Write tests to cover all the lines of code in mqtt.py, accelerometer.py, aligner.py
  2. Refactor the functions to limit their length to a maximum of 20 lines.
  3. Test for the following conditions
    In the aligner.py, 48 samples exist at the top, 32 incomplete samples follows next and finally followed by 128. The user requests 64 samples upon which the aligner discards the 48 complete samples and 32 incomplete samples. The aligner returns the first 64 of the remaining 128 samples.

@8ohamed
Copy link
Collaborator Author

8ohamed commented Apr 14, 2025

Hi @prasadtalasila Thanks for the review, I have now resolved the issues with accessing data_map. I have also made unit tests now we have 100% coverage for both mqtt.py and aligner.py and for accelerometer.py we have 96%

@prasadtalasila
Copy link
Contributor

@8ohamed The tests are failing. Please see the log below.

pytest
=================================================================== test session starts ===================================================================
platform win32 -- Python 3.12.2, pytest-8.3.5, pluggy-1.5.0
rootdir: C:\Users\au598657\git\example-shm
configfile: pytest.ini
testpaths: tests
plugins: anyio-4.8.0, cov-6.1.1
collected 41 items

tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_in_steps PASSED                                                      [  2%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_full_fifo PASSED                                                     [  4%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_partial_fifo PASSED                                                  [  7%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_insufficient_samples PASSED                                          [  9%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_appending_more_samples_than_max PASSED                                    [ 12%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_reordering_late_sample PASSED                                             [ 14%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_continuous_block_required FAILED                                                      [ 17%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_extract_removes_used_and_older_data FAILED                                            [ 19%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_single_channel_extract_and_cleanup FAILED                                             [ 21%]
tests/integration/data/sources/test_mqtt.py::test_load_config_success PASSED                                                                         [ 24%]
tests/integration/data/sources/test_mqtt.py::test_load_config_file_not_found PASSED                                                                  [ 26%]
tests/integration/data/sources/test_mqtt.py::test_load_config_invalid_json PASSED                                                                    [ 29%]
tests/integration/data/sources/test_mqtt.py::test_load_config_unexpected_exception PASSED                                                            [ 31%]
tests/integration/data/sources/test_mqtt.py::test_setup_mqtt_client_invalid_index PASSED                                                             [ 34%]
tests/integration/data/sources/test_mqtt.py::test_on_connect_callback_success PASSED                                                                 [ 36%]
tests/integration/data/sources/test_mqtt.py::test_on_connect_callback_failure PASSED                                                                 [ 39%]
tests/integration/data/sources/test_mqtt.py::test_on_subscribe_callback PASSED                                                                       [ 41%]
tests/integration/data/sources/test_mqtt.py::test_on_message_callback PASSED                                                                         [ 43%]
tests/integration/data/sources/test_mqtt.py::test_on_publish_callback PASSED                                                                         [ 46%]
tests/integration/data/sources/test_mqtt.py::test_setup_mqtt_client PASSED                                                                           [ 48%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_stores_data PASSED                                                        [ 51%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_batch_size PASSED                                                                     [ 53%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_removes_samples_and_keys PASSED                                           [ 56%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_fewer_than_available PASSED                                                          [ 58%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_more_than_available_returns_partial PASSED                                           [ 60%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_samples_for_nonexistent_key_returns_none PASSED                                       [ 63%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_batch_size_returns_none_when_empty PASSED                                             [ 65%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_on_message_invokes_process_in_thread ERROR                                                [ 68%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_handles_short_payload PASSED                                              [ 70%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_across_many_keys PASSED                                                   [ 73%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_across_batches PASSED                                                                [ 75%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_acquire_lock_returns_threading_lock PASSED                                                [ 78%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_triggers_eviction_when_map_size_exceeded PASSED                           [ 80%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_with_zero_samples_to_remove_does_nothing PASSED                           [ 82%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups PASSED                                                               [ 85%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_with_enough_samples FAILED                                                              [ 87%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_too_few_samples_returns_empty PASSED                                                    [ 90%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_no_channels PASSED                                                   [ 92%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_handles_last_group PASSED                                            [ 95%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_returns_none_when_batch_size_none PASSED                             [ 97%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_skips_initial_and_gaps_until_valid_block FAILED                                         [100%]

========================================================================= ERRORS ==========================================================================
_______________________________________________ ERROR at setup of test_on_message_invokes_process_in_thread _______________________________________________
file C:\Users\au598657\git\example-shm\tests\unit\data\accel\hbk\test_accelerometer_unit.py, line 121
  def test_on_message_invokes_process_in_thread(test_accelerometer, mocker):
E       fixture 'mocker' not found
>       available fixtures: anyio_backend, anyio_backend_name, anyio_backend_options, cache, capfd, capfdbinary, caplog, capsys, capsysbinary, cov, doctest_namespace, mock_mqtt_client, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, test_accelerometer, tmp_path, tmp_path_factory, tmpdir, tmpdir_factory
>       use 'pytest --fixtures [testpath]' for help on them.

C:\Users\au598657\git\example-shm\tests\unit\data\accel\hbk\test_accelerometer_unit.py:121
======================================================================== FAILURES =========================================================================
_________________________________________________________ test_aligner_continuous_block_required __________________________________________________________

mqtt_setup = (<paho.mqtt.client.Client object at 0x000001CC1486FFB0>, ['topicAA', 'topicBB', 'topicCC'])

    def test_aligner_continuous_block_required(mqtt_setup):
        client, topics = mqtt_setup
        aligner = Aligner(client, topics=topics, map_size=512)

        # CH1 and CH2: send full keys
        for key in [0, 32, 64, 96, 128, 160, 192, 224]:
            publish_samples(client, topics[0], np.arange(32), start_key=key)
            publish_samples(client, topics[1], np.arange(32), start_key=key)

        # CH3: skip key 64 to break continuity
        for key in [0, 32, 96, 128, 160, 192, 224]:
            publish_samples(client, topics[2], np.arange(32), start_key=key)

        time.sleep(1.5)  # Let messages arrive

        aligned = aligner.extract(128)  # Should use keys 96, 128, 160, 192 (128 samples)

>       assert aligned.shape == (3, 128), f"Expected 128 aligned rows, got {aligned.shape}"
E       AttributeError: 'tuple' object has no attribute 'shape'

tests\integration\data\accel\hbk\test_aligner.py:96: AttributeError
------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------
JSON configuration loaded successfully.
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
on_connect: Connected with response code Success
Subscribing to topic: topicAA
on_subscribe: Subscription ID 1 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
on_subscribe: Subscription ID 2 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
on_subscribe: Subscription ID 3 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicAA  Key: 0, Samples: 32
on_publish: Message 4 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 0, Batch size: 32
on_subscribe: Subscription ID 5 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicBB  Key: 0, Samples: 32
on_publish: Message 6 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 0, Batch size: 32
 Channel: topicAA  Key: 32, Samples: 32
on_publish: Message 7 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 32, Batch size: 32
 Channel: topicBB  Key: 32, Samples: 32
on_publish: Message 8 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 32, Batch size: 32
 Channel: topicAA  Key: 64, Samples: 32
on_publish: Message 9 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 64, Batch size: 32
 Channel: topicBB  Key: 64, Samples: 32
on_publish: Message 10 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 64, Batch size: 32
 Channel: topicAA  Key: 96, Samples: 32
on_publish: Message 11 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 96, Batch size: 32
 Channel: topicBB  Key: 96, Samples: 32
on_publish: Message 12 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 96, Batch size: 32
 Channel: topicAA  Key: 128, Samples: 32
on_publish: Message 13 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 128, Batch size: 32
 Channel: topicBB  Key: 128, Samples: 32
on_publish: Message 14 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 128, Batch size: 32
 Channel: topicAA  Key: 160, Samples: 32
on_publish: Message 15 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 160, Batch size: 32
 Channel: topicBB  Key: 160, Samples: 32
on_publish: Message 16 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 160, Batch size: 32
 Channel: topicAA  Key: 192, Samples: 32
on_publish: Message 17 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 192, Batch size: 32
 Channel: topicBB  Key: 192, Samples: 32
on_publish: Message 18 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 192, Batch size: 32
 Channel: topicAA  Key: 224, Samples: 32
on_publish: Message 19 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 224, Batch size: 32
 Channel: topicBB  Key: 224, Samples: 32
on_publish: Message 20 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 224, Batch size: 32
 Channel: topicCC  Key: 0, Samples: 32
on_publish: Message 21 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 0, Batch size: 32
 Channel: topicCC  Key: 32, Samples: 32
on_publish: Message 22 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 32, Batch size: 32
 Channel: topicCC  Key: 96, Samples: 32
on_publish: Message 23 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 96, Batch size: 32
 Channel: topicCC  Key: 128, Samples: 32
on_publish: Message 24 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 128, Batch size: 32
 Channel: topicCC  Key: 160, Samples: 32
on_publish: Message 25 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 160, Batch size: 32
 Channel: topicCC  Key: 192, Samples: 32
on_publish: Message 26 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 192, Batch size: 32
 Channel: topicCC  Key: 224, Samples: 32
on_publish: Message 27 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 224, Batch size: 32
Keys [[0, 32], [96, 128, 160, 192, 224]]
Aligned shape: (3, 128)
____________________________________________________ test_aligner_extract_removes_used_and_older_data _____________________________________________________

mqtt_setup = (<paho.mqtt.client.Client object at 0x000001CC1486FC50>, ['topicAA', 'topicBB', 'topicCC'])

    def test_aligner_extract_removes_used_and_older_data(mqtt_setup):
        client, topics = mqtt_setup
        aligner = Aligner(client, topics=topics, map_size=512)

        # Send 5 batches (160 samples) to all 3 channels
        for key in [0, 32, 64, 96, 128]:
            for topic in topics:
                publish_samples(client, topic, np.arange(32), start_key=key)

        time.sleep(1)

        # Extract 96 samples → should consume keys 0, 32, 64 (96 samples)
        aligned = aligner.extract(96)

>       assert aligned.shape == (3, 96), "Expected 96 aligned samples"
E       AttributeError: 'tuple' object has no attribute 'shape'

tests\integration\data\accel\hbk\test_aligner.py:117: AttributeError
------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------
JSON configuration loaded successfully.
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
on_connect: Connected with response code Success
Subscribing to topic: topicAA
on_subscribe: Subscription ID 1 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
on_subscribe: Subscription ID 2 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
on_subscribe: Subscription ID 3 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicAA  Key: 0, Samples: 32
on_publish: Message 4 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 0, Batch size: 32
on_subscribe: Subscription ID 5 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicBB  Key: 0, Samples: 32
on_publish: Message 6 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 0, Batch size: 32
 Channel: topicCC  Key: 0, Samples: 32
on_publish: Message 7 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 0, Batch size: 32
 Channel: topicAA  Key: 32, Samples: 32
on_publish: Message 8 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 32, Batch size: 32
 Channel: topicBB  Key: 32, Samples: 32
on_publish: Message 9 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 32, Batch size: 32
 Channel: topicCC  Key: 32, Samples: 32
on_publish: Message 10 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 32, Batch size: 32
 Channel: topicAA  Key: 64, Samples: 32
on_publish: Message 11 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 64, Batch size: 32
 Channel: topicBB  Key: 64, Samples: 32
on_publish: Message 12 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 64, Batch size: 32
 Channel: topicCC  Key: 64, Samples: 32
on_publish: Message 13 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 64, Batch size: 32
 Channel: topicAA  Key: 96, Samples: 32
on_publish: Message 14 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 96, Batch size: 32
 Channel: topicBB  Key: 96, Samples: 32
on_publish: Message 15 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 96, Batch size: 32
 Channel: topicCC  Key: 96, Samples: 32
on_publish: Message 16 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 96, Batch size: 32
 Channel: topicAA  Key: 128, Samples: 32
on_publish: Message 17 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 128, Batch size: 32
 Channel: topicBB  Key: 128, Samples: 32
on_publish: Message 18 published.
[PUBLISH DEBUG] Topic: topicBB, Start key: 128, Batch size: 32
 Channel: topicCC  Key: 128, Samples: 32
on_publish: Message 19 published.
[PUBLISH DEBUG] Topic: topicCC, Start key: 128, Batch size: 32
Keys [[0, 32, 64, 96, 128]]
Aligned shape: (3, 96)
_____________________________________________________ test_aligner_single_channel_extract_and_cleanup _____________________________________________________

mqtt_setup = (<paho.mqtt.client.Client object at 0x000001CC1491A6F0>, ['topicAA', 'topicBB', 'topicCC'])

    def test_aligner_single_channel_extract_and_cleanup(mqtt_setup):
        client, topics = mqtt_setup
        one_topic = [topics[0]]  # Use only the first topic

        aligner = Aligner(client, topics=one_topic, map_size=512)

        # Publish 128 samples to the single channel (4 batches of 32)
        for key in [0, 32, 64, 96]:
            publish_samples(client, topics[0], np.arange(32) + key, start_key=key)

        time.sleep(1)

        # Extract 64 samples from the single channel
        extracted = aligner.extract(64)

>       assert extracted.shape == (1, 64), f"Expected shape (64, 1), got {extracted.shape}"
E       AttributeError: 'tuple' object has no attribute 'shape'

tests\integration\data\accel\hbk\test_aligner.py:144: AttributeError
------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------
JSON configuration loaded successfully.
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
on_connect: Connected with response code Success
Subscribing to topic: topicAA
on_subscribe: Subscription ID 1 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicAA  Key: 0, Samples: 32
on_publish: Message 2 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 0, Batch size: 32
on_subscribe: Subscription ID 3 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
 Channel: topicAA  Key: 32, Samples: 32
on_publish: Message 4 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 32, Batch size: 32
 Channel: topicAA  Key: 64, Samples: 32
on_publish: Message 5 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 64, Batch size: 32
 Channel: topicAA  Key: 96, Samples: 32
on_publish: Message 6 published.
[PUBLISH DEBUG] Topic: topicAA, Start key: 96, Batch size: 32
Keys [[0, 32, 64, 96]]
Aligned shape: (1, 64)
____________________________________________________________ test_extract_with_enough_samples _____________________________________________________________

aligner_with_mock_channels = (<data.accel.hbk.aligner.Aligner object at 0x000001CC14972930>, [<MagicMock id='1976030565280'>, <MagicMock id='1976030567344'>, <MagicMock id='1976029731184'>])

    def test_extract_with_enough_samples(aligner_with_mock_channels):
        aligner, accelerometers = aligner_with_mock_channels

        result = aligner.extract(8)

>       assert result.shape == (3, 8)
E       AttributeError: 'tuple' object has no attribute 'shape'

tests\unit\data\accel\hbk\test_aligner_unit.py:42: AttributeError
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
Keys [[0, 4, 8, 12, 16]]
Aligned shape: (3, 8)
__________________________________________________ test_extract_skips_initial_and_gaps_until_valid_block __________________________________________________

aligner_with_mock_channels = (<data.accel.hbk.aligner.Aligner object at 0x000001CC14A149E0>, [<MagicMock id='1976030923856'>, <MagicMock id='1976030991888'>, <MagicMock id='1976031011008'>])

    def test_extract_skips_initial_and_gaps_until_valid_block(aligner_with_mock_channels):
        aligner, accelerometers = aligner_with_mock_channels

        # Total keys in the system
        partial_keys = [0, 16, 32]  # Only 48 samples — should be ignored
        missing_keys = [48, 64]     # Break continuity
        valid_keys = [80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240]
        all_keys = partial_keys + valid_keys

        batch_size = 16
        required_samples = 128
        required_keys = required_samples // batch_size  # 8

        for acc in accelerometers:
            acc.get_batch_size.return_value = batch_size
            acc.get_sorted_keys.return_value = all_keys
            acc.clear_used_data = MagicMock()
            acc.get_samples_for_key.side_effect = lambda key: [float(key + i) for i in range(batch_size)]

        # Extract 128 samples
        result = aligner.extract(required_samples)

        # Assert correct shape
>       assert result.shape == (3, 128), f"Expected (3, 128), got {result.shape}"
E       AttributeError: 'tuple' object has no attribute 'shape'

tests\unit\data\accel\hbk\test_aligner_unit.py:113: AttributeError
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
Keys [[0, 16, 32], [80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240]]
Aligned shape: (3, 128)
==================================================================== warnings summary =====================================================================
.venv\Lib\site-packages\_pytest\config\__init__.py:1441
  C:\Users\au598657\git\example-shm\.venv\Lib\site-packages\_pytest\config\__init__.py:1441: PytestConfigWarning: Unknown config option: timeout

    self._warn_or_fail_if_strict(f"Unknown config option: {key}\n")

tests/integration/data/accel/hbk/test_aligner.py::test_aligner_continuous_block_required
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_extract_removes_used_and_older_data
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_single_channel_extract_and_cleanup
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_with_enough_samples
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_skips_initial_and_gaps_until_valid_block
  C:\Users\au598657\git\example-shm\src\data\accel\hbk\aligner.py:114: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
    utc_time = datetime.utcnow()

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
===================================================================== tests coverage ======================================================================
_____________________________________________________ coverage: platform win32, python 3.12.2-final-0 _____________________________________________________

Name                                   Stmts   Miss  Cover   Missing
--------------------------------------------------------------------
src\data\accel\accelerometer.py            7      1    86%   22
src\data\accel\constants.py                4      0   100%
src\data\accel\hbk\accelerometer.py      101      2    98%   48-49
src\data\accel\hbk\aligner.py             74      2    97%   105, 121
src\data\accel\metadata_constants.py       1      0   100%
src\data\sources\mqtt.py                  48      0   100%
--------------------------------------------------------------------
TOTAL                                    235      5    98%
Coverage HTML written to dir htmlcov
================================================================= short test summary info =================================================================
FAILED tests/integration/data/accel/hbk/test_aligner.py::test_aligner_continuous_block_required - AttributeError: 'tuple' object has no attribute 'shape'
FAILED tests/integration/data/accel/hbk/test_aligner.py::test_aligner_extract_removes_used_and_older_data - AttributeError: 'tuple' object has no attribute 'shape'
FAILED tests/integration/data/accel/hbk/test_aligner.py::test_aligner_single_channel_extract_and_cleanup - AttributeError: 'tuple' object has no attribute 'shape'
FAILED tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_with_enough_samples - AttributeError: 'tuple' object has no attribute 'shape'
FAILED tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_skips_initial_and_gaps_until_valid_block - AttributeError: 'tuple' object has no attribute 'shape'
ERROR tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_on_message_invokes_process_in_thread

@8ohamed
Copy link
Collaborator Author

8ohamed commented Apr 14, 2025

@prasadtalasila I had forgotten to update the output of extract() in the test. It's fixed now.

Copy link
Contributor

@prasadtalasila prasadtalasila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@8ohamed Thanks for the quick updates. Please see the following comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives the following error.

Received message on topic cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data
 Channel: cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data  Key: 282170143, Samples: 16
Fatal Python error: _enter_buffered_busy: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x00007ffb20bf3fe8)

Current thread 0x000071f8 (most recent call first):
  <no Python frame>

Extension modules: numpy._core._multiarray_umath, _wmi, numpy.linalg._umath_linalg (total: 3)

@prasadtalasila
Copy link
Contributor

The pytest run experiment_1 is failing with the following message.


Data requsted [ 5.99679584e-03  5.31226397e-03  5.59592713e-03  5.55710588e-03
  5.95429447e-03  5.53189032e-03  5.75515302e-03  5.72047243e-03
  5.73546626e-03  5.48965065e-03  5.60505362e-03  5.63557027e-03
  6.36669621e-03  5.83295943e-03  5.99874649e-03  5.82477124e-03
  5.66888275e-03  5.02047362e-03  5.35012875e-03  5.64552471e-03
  5.29122213e-03  4.97291517e-03  5.37165022e-03  5.84702939e-03
  5.49756130e-03  5.34680113e-03  5.37787145e-03  5.91300754e-03
  5.34048211e-03  5.29261632e-03  5.54494653e-03  6.36724429e-03
  5.45703853e-03  5.50364051e-03  5.85703552e-03  6.33669272e-03
  5.22914110e-03  5.30774565e-03  5.55936480e-03  5.95297944e-03
  5.08784642e-03  4.91982233e-03  5.62838791e-03  6.34953938e-03
  5.56255924e-03  5.06869191e-03  5.43903746e-03  5.44857187e-03
  4.89530154e-03  4.35755588e-03  4.86283284e-03  5.12561016e-03
  4.71059280e-03  4.02515475e-03  4.22035577e-03  4.39535128e-03
  4.51173075e-03  4.22712928e-03  4.65322100e-03  5.18373400e-03
  4.48718993e-03  3.54815950e-03  4.44268342e-03  4.36670473e-03
  4.36450262e-03  4.23830422e-03  4.96666599e-03  4.85844072e-03
  4.32782806e-03  3.65711609e-03  4.10234323e-03  4.20487532e-03
  3.29474080e-03  2.15840572e-03  3.00711673e-03  3.62963136e-03
  2.60622031e-03  2.45753955e-03  4.01072158e-03  4.03226819e-03
  3.46947298e-03  3.21069639e-03  3.80464806e-03  3.98724666e-03
  3.58535862e-03  2.73496239e-03  3.01837036e-03  2.90152943e-03
  2.51125242e-03  2.29119486e-03  2.48313905e-03  2.93448986e-03
  2.19121180e-03  2.20294623e-03  2.76000937e-03  2.81675952e-03
  2.25038338e-03  2.50570942e-03  3.10368137e-03  2.58532073e-03
  2.08710157e-03  1.99279631e-03  2.49487860e-03  2.68347980e-03
  2.26117903e-03  1.83037017e-03  1.88036170e-03  1.82122586e-03
  1.10945536e-03  1.28645368e-03  1.76192168e-03  1.66943087e-03
  9.78194759e-04  8.15392472e-04  9.08618502e-04  9.00052837e-04
  2.35824395e-04  5.56406158e-04  7.03828293e-04  4.53962188e-04
 -2.26996810e-04 -2.08407859e-04  1.48292194e-04 -3.16684833e-04
 -2.36781270e-04 -6.18194405e-04 -2.82180466e-04 -6.96672243e-04
 -5.32046019e-04 -9.68011736e-04 -1.36866525e-03 -1.24841195e-03
 -1.80554553e-03 -1.75866566e-03 -1.38920930e-03 -1.68648351e-03
 -1.76260050e-03 -1.87797402e-03 -1.37302803e-03 -1.74790306e-03
 -1.80640724e-03 -2.18798965e-03 -1.67350913e-03 -1.55370461e-03
 -1.22657930e-03 -1.67388632e-03 -1.29780767e-03 -1.85193273e-03
 -1.98204047e-03 -1.52593374e-03 -1.12566666e-03 -1.46845274e-03
 -1.89048413e-03 -1.41096546e-03 -1.33734976e-03 -8.47921765e-04
 -1.10439130e-03 -8.27564159e-04 -5.48671931e-04 -2.84969428e-04
 -9.34308860e-04 -7.20562937e-04 -6.83302933e-04 -8.14107887e-04
 -1.12573139e-03 -3.40687431e-04 -3.42040817e-04 -6.46915869e-04
 -8.41493835e-04 -2.70219298e-05  1.64126759e-05 -3.40888393e-04
 -4.81156516e-04 -2.32626669e-04  1.43318011e-05  3.78305631e-05
 -1.17812342e-04  3.73007584e-04  6.33967342e-04  2.97731487e-04
 -3.05262110e-05  1.85728539e-04  4.12563473e-04  1.20237390e-04
 -3.82243888e-04 -3.07146314e-04  9.82810379e-05 -5.74650592e-04
 -6.37621793e-04  2.08464946e-04  2.43590403e-04 -2.26297430e-04]
Received message on topic cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data
 Channel: cpsens/d8-3a-dd-37-d2-7e/3160-A-042_sn_999998/1/acc/raw/data  Key: 282170143, Samples: 16
Fatal Python error: _enter_buffered_busy: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x00007ffb20bf3fe8)

Current thread 0x000071f8 (most recent call first):
  <no Python frame>

Extension modules: numpy._core._multiarray_umath, _wmi, numpy.linalg._umath_linalg (total: 3)

The pytest is still failing with the following message.


tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_in_steps PASSED                                                      [  2%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_full_fifo FAILED                                                     [  4%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_partial_fifo PASSED                                                  [  7%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_read_insufficient_samples PASSED                                          [  9%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_appending_more_samples_than_max PASSED                                    [ 12%]
tests/integration/data/accel/hbk/test_accelerometer.py::test_accelerometer_reordering_late_sample PASSED                                             [ 14%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_continuous_block_required PASSED                                                      [ 17%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_extract_removes_used_and_older_data PASSED                                            [ 19%]
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_single_channel_extract_and_cleanup PASSED                                             [ 21%]
tests/integration/data/sources/test_mqtt.py::test_load_config_success PASSED                                                                         [ 24%]
tests/integration/data/sources/test_mqtt.py::test_load_config_file_not_found PASSED                                                                  [ 26%]
tests/integration/data/sources/test_mqtt.py::test_load_config_invalid_json PASSED                                                                    [ 29%]
tests/integration/data/sources/test_mqtt.py::test_load_config_unexpected_exception PASSED                                                            [ 31%]
tests/integration/data/sources/test_mqtt.py::test_setup_mqtt_client_invalid_index PASSED                                                             [ 34%]
tests/integration/data/sources/test_mqtt.py::test_on_connect_callback_success PASSED                                                                 [ 36%]
tests/integration/data/sources/test_mqtt.py::test_on_connect_callback_failure PASSED                                                                 [ 39%]
tests/integration/data/sources/test_mqtt.py::test_on_subscribe_callback PASSED                                                                       [ 41%]
tests/integration/data/sources/test_mqtt.py::test_on_message_callback PASSED                                                                         [ 43%]
tests/integration/data/sources/test_mqtt.py::test_on_publish_callback PASSED                                                                         [ 46%]
tests/integration/data/sources/test_mqtt.py::test_setup_mqtt_client PASSED                                                                           [ 48%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_stores_data PASSED                                                        [ 51%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_batch_size PASSED                                                                     [ 53%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_removes_samples_and_keys PASSED                                           [ 56%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_fewer_than_available PASSED                                                          [ 58%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_more_than_available_returns_partial PASSED                                           [ 60%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_samples_for_nonexistent_key_returns_none PASSED                                       [ 63%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_get_batch_size_returns_none_when_empty PASSED                                             [ 65%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_on_message_invokes_process_in_thread PASSED                                               [ 68%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_handles_short_payload PASSED                                              [ 70%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_across_many_keys PASSED                                                   [ 73%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_read_across_batches PASSED                                                                [ 75%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_acquire_lock_returns_threading_lock PASSED                                                [ 78%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_process_message_triggers_eviction_when_map_size_exceeded PASSED                           [ 80%]
tests/unit/data/accel/hbk/test_accelerometer_unit.py::test_clear_used_data_with_zero_samples_to_remove_does_nothing PASSED                           [ 82%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups PASSED                                                               [ 85%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_with_enough_samples PASSED                                                              [ 87%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_too_few_samples_returns_empty PASSED                                                    [ 90%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_no_channels PASSED                                                   [ 92%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_handles_last_group PASSED                                            [ 95%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_find_continuous_key_groups_returns_none_when_batch_size_none PASSED                             [ 97%]
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_skips_initial_and_gaps_until_valid_block PASSED                                         [100%]

======================================================================== FAILURES =========================================================================
____________________________________________________________ test_accelerometer_read_full_fifo ____________________________________________________________

client_and_topic = (<paho.mqtt.client.Client object at 0x0000019D5F55C890>, 'topicAA')
accelerometer_instance = <data.accel.hbk.accelerometer.Accelerometer object at 0x0000019D50E829C0>

    def test_accelerometer_read_full_fifo(client_and_topic, accelerometer_instance):
        """
        Test that the accelerometer correctly stores and retrieves the full FIFO capacity.

        Steps:
        1. Publish exactly 96 samples.
        2. Read all 96 samples.
        3. Verify:
           - The status is 1 (all requested samples retrieved).
           - The shape is (96,).
           - The data is in sequential order from 0 to 95.
        """
        client, topic = client_and_topic

        publish_binary_samples(client, topic, 0, 96)
        total_samples = 0

        while total_samples < 60:
            with accelerometer_instance._lock:
                total_samples = sum(len(deque) for deque in accelerometer_instance.data_map.values())

        status, data = accelerometer_instance.read(96)

>       assert status == 1, f"Expected status 1, but got {status}"
E       AssertionError: Expected status 1, but got 0
E       assert 0 == 1

tests\integration\data\accel\hbk\test_accelerometer.py:125: AssertionError
------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------
JSON configuration loaded successfully.
------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------
on_connect: Connected with response code Success
Subscribing to topic: topicAA
on_publish: Message 1 published.
on_subscribe: Subscription ID 2 with QoS levels [ReasonCode(Suback, 'Granted QoS 1')]
Received message on topic topicAA
 Channel: topicAA  Key: 32, Samples: 32
on_publish: Message 3 published.
Received message on topic topicAA
 Channel: topicAA  Key: 64, Samples: 32
on_publish: Message 4 published.
==================================================================== warnings summary =====================================================================
.venv\Lib\site-packages\_pytest\config\__init__.py:1441
  C:\Users\au598657\git\example-shm\.venv\Lib\site-packages\_pytest\config\__init__.py:1441: PytestConfigWarning: Unknown config option: timeout

    self._warn_or_fail_if_strict(f"Unknown config option: {key}\n")

tests/integration/data/accel/hbk/test_aligner.py::test_aligner_continuous_block_required
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_extract_removes_used_and_older_data
tests/integration/data/accel/hbk/test_aligner.py::test_aligner_single_channel_extract_and_cleanup
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_with_enough_samples
tests/unit/data/accel/hbk/test_aligner_unit.py::test_extract_skips_initial_and_gaps_until_valid_block
  C:\Users\au598657\git\example-shm\src\data\accel\hbk\aligner.py:113: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
    utc_time = datetime.utcnow()

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html

@8ohamed
Copy link
Collaborator Author

8ohamed commented Apr 14, 2025

@prasadtalasila I have wrote the interface, and fixed the test they are all passing now.
For the error in the experiment I tried to look it up, and as I understood it was shutting down while a background thread is trying to print (the on_message) so we need time.sleep which gives it time to make the last prints and exit safely.

@prasadtalasila prasadtalasila merged commit cddeb3d into main Apr 15, 2025
@prasadtalasila prasadtalasila deleted the aligner branch May 4, 2025 18:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants