Skip to content

Commit

Permalink
Merge pull request #72 from Wolkabout/improvement/readings-persistence
Browse files Browse the repository at this point in the history
Persisting readings directly to allow for bulk messages
  • Loading branch information
nanavuletic committed Jun 5, 2023
2 parents 86e33f8 + 5557dc6 commit bd7d4e8
Show file tree
Hide file tree
Showing 14 changed files with 518 additions and 13 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Expand Up @@ -11,10 +11,10 @@ repos:
hooks:
- id: reorder-python-imports
- repo: https://github.com/ambv/black
rev: 21.7b0
rev: 22.3.0
hooks:
- id: black
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
Expand Down
36 changes: 33 additions & 3 deletions README.md
Expand Up @@ -16,6 +16,7 @@


```

[![Tests and Coverage](https://github.com/Wolkabout/WolkConnect-Python/actions/workflows/tests-and-coverage.yml/badge.svg?branch=development)](https://github.com/Wolkabout/WolkConnect-Python/actions/workflows/tests-and-coverage.yml) [![PyPI version](https://badge.fury.io/py/wolk-connect.svg)](https://badge.fury.io/py/wolk-connect) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/wolk-connect) ![GitHub](https://img.shields.io/github/license/wolkabout/WolkConnect-Python) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black) [![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/) [![Documentation Status](https://readthedocs.org/projects/wolkconnect-python/badge/?version=latest)](https://wolkconnect-python.readthedocs.io/en/latest/?badge=latest)
----
WolkAbout Python Connector library for connecting devices to WolkAbout IoT platform instance.
Expand All @@ -29,20 +30,23 @@ WolkAbout Python Connector library for connecting devices to WolkAbout IoT platf
There are two ways to install this package

### Installing with pip

```console
python3 -m pip install wolk-connect
```

### Installing from source

Clone this repository from the command line using:

```console
git clone https://github.com/Wolkabout/WolkConnect-Python.git
```

Install dependencies by invoking `python3 -m pip install -r requirements.txt`

Install the package by running:

```console
python3 setup.py install
```
Expand All @@ -51,8 +55,10 @@ python3 setup.py install

### Establishing connection with WolkAbout IoT platform

Create a device on WolkAbout IoT Platform by using the *Simple example* device type that is available on the platform. ``Note that device type can be created by importing `simple_example.json` file as new Device Type.``
This device type fits [main.py](https://github.com/Wolkabout/WolkConnect-Python/blob/master/examples/simple/main.py) and demonstrates the periodic sending of a temperature feed reading.
Create a device on WolkAbout IoT Platform by using the *Simple example* device type that is available on the
platform. ``Note that device type can be created by importing `simple_example.json` file as new Device Type.``
This device type fits [main.py](https://github.com/Wolkabout/WolkConnect-Python/blob/master/examples/simple/main.py) and
demonstrates the periodic sending of a temperature feed reading.

```python
import wolk
Expand Down Expand Up @@ -84,6 +90,7 @@ This is useful for maintaining data history when readings are not published imme
If `timestamp` is not provided, the library will assign a timestamp before placing the reading into storage.

#### Adding feed values with timestamp

```python
# Add a signel feed reading to the message queue with the timestamp
wolk_device.add_feed_value(("T", 12.34), 1658315834000)
Expand All @@ -92,13 +99,35 @@ wolk_device.add_feed_value(("T", 12.34), 1658315834000)
wolk_device.add_feed_value([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
```

### Readings persistence and limit

Readings with method `add_feed_value` are added into local persistence. When adding messages be mindful of the
message size that will be published. The default MQTT message size is 260MB, and since readings are of different sizes
(based on the users use-case), check that the limit of readings in persistence will be under the MQTT limit for your broker.
The default readings limit is set to 500000. You can change it with `set_custom_readings_persistence_limit`, if your readings
are bigger, you can decrease the size, or if you have smaller readings, you can increase the size.

### Data publish strategy

Stored feed values are pushed to WolkAbout IoT platform on demand by calling:

```python
wolk_device.publish()
```

### Adding feed values 'separated'

When adding feed values, the values themselves are persisted, which means when publishing all values will be placed
in a single message and published as a single message.

If you would like to ensure different behavior, where you can add feed values that will be sent as a separate message
from any other feed values, use the alternative method:

```python
# Method arguments are exactly the same as for the `add_feed_value`
wolk_device.add_feed_value_separated([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
```

### Disconnecting from the platform

```python
Expand All @@ -107,4 +136,5 @@ wolk_device.disconnect()

## Additional functionality

WolkConnect-Python library has integrated additional features which can perform full WolkAbout IoT platform potential. Explore the [examples](https://github.com/Wolkabout/WolkConnect-Python/tree/master/examples/) for more information.
WolkConnect-Python library has integrated additional features which can perform full WolkAbout IoT platform potential.
Explore the [examples](https://github.com/Wolkabout/WolkConnect-Python/tree/master/examples/) for more information.
14 changes: 14 additions & 0 deletions examples/full_feature_set/README.md
Expand Up @@ -112,6 +112,7 @@ This is useful for maintaining data history when readings are not published imme
If `timestamp` is not provided, the library will assign a timestamp before placing the reading into storage.

#### Adding feed values with timestamp

```python
# Add a signel feed reading to the message queue with the timestamp
wolk_device.add_feed_value(("T", 12.34), 1658315834000)
Expand All @@ -127,6 +128,19 @@ Stored feed values are pushed to WolkAbout IoT platform on demand by calling:
wolk_device.publish()
```

### Adding feed values 'separated'

When adding feed values, the values themselves are persisted, which means when publishing all values will be placed
in a single message and published as a single message.

If you would like to ensure different behavior, where you can add feed values that will be sent as a separate message
from any other feed values, use the alternative method:

```python
# Method arguments are exactly the same as for the `add_feed_value`
wolk_device.add_feed_value_separated([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
```

### Disconnecting from the platform

```python
Expand Down
8 changes: 7 additions & 1 deletion examples/full_feature_set/main.py
Expand Up @@ -35,6 +35,7 @@
print(f"Failed to import WolkConnect: '{e}'")
raise e


# NOTE: Enable debug logging by uncommenting the following line
# Optionally, as a second argument pass a file name
# wolk.logging_config("debug")
Expand Down Expand Up @@ -131,7 +132,12 @@ def incoming_feed_value_handler(
# Pass device and optionally connection details
# Enable file management and firmware update via their respective methods
wolk_device = (
wolk.WolkConnect(device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE")
wolk.WolkConnect(
device,
host="insert_host",
port=80,
ca_cert="PATH/TO/YOUR/CA.CRT/FILE",
)
.with_file_management(
file_directory="files",
preferred_package_size=1000, # NOTE: size in kilobytes
Expand Down
19 changes: 14 additions & 5 deletions examples/simple/main.py
Expand Up @@ -31,6 +31,7 @@
print(f"Failed to import WolkConnect: '{e}'")
raise e


# NOTE: Enable debug logging by uncommenting the following line
# Optionally, as a second argument pass a file name

Expand All @@ -45,23 +46,31 @@ def main() -> None:

# Create a WolkConnect object and pass your device
# NOTE: Change Platform instance with host:str, port:int, ca_cert:str
wolk_device = wolk.WolkConnect(device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE")
wolk_device = wolk.WolkConnect(
device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE"
)

# Establish a connection to the WolkAbout IoT Platform
wolk_device.connect()

publish_period_seconds = 60
publish_period_seconds = 5
i = 0

while True:
try:
# Generate a random value
temperature = random.randint(-20, 80)

# Add a feed reading to the message queue
wolk_device.add_feed_value(("T", temperature))
temperature = ("T", temperature)
wolk_device.add_feed_value(temperature)
print(f"Adding feed value data {temperature}.")
i += 1

print(f'Publishing "T": {temperature}')
wolk_device.publish()
if i % 10 == 0:
time.sleep(publish_period_seconds)
print("Publishing the feed values.")
wolk_device.publish()
time.sleep(publish_period_seconds)
except KeyboardInterrupt:
print("\tReceived KeyboardInterrupt. Exiting script")
Expand Down
67 changes: 67 additions & 0 deletions test/test_in_memory_readings_persistence.py
@@ -0,0 +1,67 @@
"""Tests for InMemoryReadingsPersistence."""
# Copyright 2023 WolkAbout Technology s.r.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from time import time

from wolk.in_memory_readings_persistence import InMemoryReadingsPersistence


class TestInMemoryReadingsPersistence(unittest.TestCase):
"""Tests for InMemoryReadingsPersistence class."""

def test_adding_empty_list(self):
"""Test adding a single reading to the persistence."""
self.persistence = InMemoryReadingsPersistence()
self.assertFalse(self.persistence.store_reading([]))
self.assertEqual(len(self.persistence.obtain_readings()), 0)

def test_adding_single_int_reading(self):
"""Test adding a single int reading to the persistence."""
self.persistence = InMemoryReadingsPersistence()
self.assertTrue(self.persistence.store_reading(("T", 123)))
self.assertEqual(len(self.persistence.obtain_readings()), 1)

def test_adding_multiple_int_readings(self):
"""Test adding a list of int readings to the persistence."""
self.persistence = InMemoryReadingsPersistence()
self.assertTrue(
self.persistence.store_reading(
[("A", 123), ("B", 456), ("C", 789)]
)
)
self.assertEqual(len(self.persistence.obtain_readings()), 1)

def test_add_multiple_readings_at_different_times_and_clean(self):
"""Test adding a lot of different readings at different times."""
self.persistence = InMemoryReadingsPersistence()
self.assertTrue(
self.persistence.store_reading(
[("A", 1), ("B", 2)], round(time() * 1000) + 123511
)
)
self.assertTrue(
self.persistence.store_reading(
[("B", 3), ("C", 4)], round(time() * 1000) + 871265
)
)
self.assertTrue(
self.persistence.store_reading(
[("C", 5), ("A", 6)], round(time() * 1000) + 839681
)
)
self.assertEqual(len(self.persistence.obtain_readings()), 3)

self.assertTrue(self.persistence.clear_readings())
self.assertEqual(len(self.persistence.obtain_readings()), 0)
11 changes: 10 additions & 1 deletion test/test_wolk_connect.py
Expand Up @@ -1446,12 +1446,21 @@ def test_request_timestamp_when_not_none(self):
def test_add_feed_value(self):
"""Test add feed value."""
self.wolk_device.logger.setLevel(logging.CRITICAL)
self.wolk_device.readings_persistence.store_reading = MagicMock()

self.wolk_device.add_feed_value(("foo", "bar"))

self.wolk_device.readings_persistence.store_reading.assert_called_once()

def test_add_feed_value_separated(self):
"""Test add feed value separated."""
self.wolk_device.logger.setLevel(logging.CRITICAL)
self.wolk_device.message_queue.put = MagicMock()
self.wolk_device.message_factory.make_from_feed_value = MagicMock(
return_value=True
)

self.wolk_device.add_feed_value("foo", "bar")
self.wolk_device.add_feed_value_separated(("foo", "bar"))

self.wolk_device.message_queue.put.assert_called_once()

Expand Down
45 changes: 45 additions & 0 deletions test/test_wolkabout_protocol_message_factory.py
Expand Up @@ -86,6 +86,51 @@ def test_feed_values(self):

self.assertEqual(expected_message, serialized_message)

def test_feed_values_collected(self):
"""Test valid message for collected data over time."""
reference_temperature = "T"
value_temperature = 123
reference_location = "LOC"
value_location1 = "45.45,19.19"
value_location2 = "42.42,18.18"
reference_information = "I"
value_information = "Hello!"

timestamp_first = round(time.time()) * 1000
timestamp_second = timestamp_first + 123456789

expected_topic = self.factory.common_topic + WAPMF.FEED_VALUES
expected_payload = json.dumps(
[
{
"timestamp": timestamp_first,
f"{reference_temperature}": value_temperature,
f"{reference_location}": value_location1,
},
{
"timestamp": timestamp_second,
f"{reference_location}": value_location2,
f"{reference_information}": value_information,
},
]
)
expected_message = Message(expected_topic, expected_payload)

serialized_message = self.factory.make_from_feed_values_collected(
{
timestamp_first: {
reference_temperature: value_temperature,
reference_location: value_location1,
},
timestamp_second: {
reference_location: value_location2,
reference_information: value_information,
},
}
)

self.assertEqual(expected_message, serialized_message)

def test_feed_value_throws_on_invalid_data(self):
"""Test valid message for two string feeds."""
self.assertRaises(
Expand Down
2 changes: 1 addition & 1 deletion wolk/__init__.py
Expand Up @@ -8,7 +8,7 @@
For more information about module features visit:
https://github.com/Wolkabout/WolkConnect-Python/tree/master/examples/full_feature_set
"""
__version__ = "5.0.1"
__version__ = "5.1.0"
from .interface.connectivity_service import ConnectivityService
from .interface.firmware_handler import FirmwareHandler
from .interface.message_deserializer import MessageDeserializer
Expand Down

0 comments on commit bd7d4e8

Please sign in to comment.