Skip to content

Commit

Permalink
Connected everything into wolk_connect
Browse files Browse the repository at this point in the history
  • Loading branch information
nanavuletic committed May 26, 2023
1 parent 6850148 commit 1b3173d
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 20 deletions.
9 changes: 5 additions & 4 deletions README.md
Expand Up @@ -73,23 +73,24 @@ wolk_device.connect()
### Adding feed values

```python
wolk_device.add_feed_value(("T", 26.93))
wolk_device.add_feed_value_sealed(("T", 26.93))

# or multiple feed value readings
wolk_device.add_feed_value([("T", 27.11), ("H", 54.34), ("P", 1002.3)])
wolk_device.add_feed_value_sealed([("T", 27.11), ("H", 54.34), ("P", 1002.3)])
```

Optionally pass a `timestamp` as `round(time.time()) * 1000`.
This is useful for maintaining data history when readings are not published immediately after adding them to storage.
If `timestamp` is not provided, the library will assign a timestamp before placing the reading into storage.

#### Adding feed values with timestamp

```python
# Add a signel feed reading to the message queue with the timestamp
wolk_device.add_feed_value(("T", 12.34), 1658315834000)
wolk_device.add_feed_value_sealed(("T", 12.34), 1658315834000)

# Add a multi feed reading to the message queue with the timestamp
wolk_device.add_feed_value([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
wolk_device.add_feed_value_sealed([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
```

### Data publish strategy
Expand Down
9 changes: 5 additions & 4 deletions examples/full_feature_set/README.md
Expand Up @@ -101,23 +101,24 @@ wolk_device = (
### Adding feed values

```python
wolk_device.add_feed_value(("T", 26.93))
wolk_device.add_feed_value_sealed(("T", 26.93))

# or multiple feed value readings
wolk_device.add_feed_value([("T", 27.11), ("H", 54.34), ("P", 1002.3)])
wolk_device.add_feed_value_sealed([("T", 27.11), ("H", 54.34), ("P", 1002.3)])
```

Optionally pass a `timestamp` as `round(time.time()) * 1000`.
This is useful for maintaining data history when readings are not published immediately after adding them to storage.
If `timestamp` is not provided, the library will assign a timestamp before placing the reading into storage.

#### Adding feed values with timestamp

```python
# Add a signel feed reading to the message queue with the timestamp
wolk_device.add_feed_value(("T", 12.34), 1658315834000)
wolk_device.add_feed_value_sealed(("T", 12.34), 1658315834000)

# Add a multi feed reading to the message queue with the timestamp
wolk_device.add_feed_value([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
wolk_device.add_feed_value_sealed([("T", 12.34), ("H", 56.78), ("P", 1022.00)], 1658315834000)
```

### Data publish strategy
Expand Down
13 changes: 9 additions & 4 deletions examples/full_feature_set/main.py
Expand Up @@ -131,7 +131,12 @@ def incoming_feed_value_handler(
# Pass device and optionally connection details
# Enable file management and firmware update via their respective methods
wolk_device = (
wolk.WolkConnect(device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE")
wolk.WolkConnect(
device,
host="insert_host",
port=80,
ca_cert="PATH/TO/YOUR/CA.CRT/FILE",
)
.with_file_management(
file_directory="files",
preferred_package_size=1000, # NOTE: size in kilobytes
Expand Down Expand Up @@ -172,7 +177,7 @@ def incoming_feed_value_handler(
while True:
try:
# Add feed values to outbound message queue
wolk_device.add_feed_value(
wolk_device.add_feed_value_sealed(
[
(switch_feed.reference, switch_feed.value),
(heart_beat.reference, heart_beat.value),
Expand All @@ -182,12 +187,12 @@ def incoming_feed_value_handler(
# Generate a random value
temperature = random.randint(-20, 80)
# Add a feed reading to the message queue
wolk_device.add_feed_value(("T", temperature))
wolk_device.add_feed_value_sealed(("T", temperature))

# Generate random value for the newly registered feed
new_feed = random.randint(0, 100)
# Add feed value reading of the new feed to message queue
wolk_device.add_feed_value(("NF", new_feed))
wolk_device.add_feed_value_sealed(("NF", new_feed))

# Publish all queued messages
wolk_device.publish()
Expand Down
2 changes: 1 addition & 1 deletion examples/pull/main.py
Expand Up @@ -116,7 +116,7 @@ def incoming_feed_value_handler(
time.sleep(heart_beat.value / 10)

# Add feed values to outbound message queue
wolk_device.add_feed_value(
wolk_device.add_feed_value_sealed(
[
(switch_feed.reference, switch_feed.value),
(heart_beat.reference, heart_beat.value),
Expand Down
2 changes: 1 addition & 1 deletion examples/register_feed_and_attribute/main.py
Expand Up @@ -75,7 +75,7 @@ def main() -> None:
# Generate random value for the newly registered feed
new_feed = random.randint(0, 100)
# Add feed value reading of the new feed to message queue
wolk_device.add_feed_value(("NF", new_feed))
wolk_device.add_feed_value_sealed(("NF", new_feed))
print(f'Publishing "NF": {new_feed}')
# Publish queued messages
wolk_device.publish()
Expand Down
19 changes: 14 additions & 5 deletions examples/simple/main.py
Expand Up @@ -31,6 +31,7 @@
print(f"Failed to import WolkConnect: '{e}'")
raise e


# NOTE: Enable debug logging by uncommenting the following line
# Optionally, as a second argument pass a file name

Expand All @@ -45,23 +46,31 @@ def main() -> None:

# Create a WolkConnect object and pass your device
# NOTE: Change Platform instance with host:str, port:int, ca_cert:str
wolk_device = wolk.WolkConnect(device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE")
wolk_device = wolk.WolkConnect(
device, host="insert_host", port=80, ca_cert="PATH/TO/YOUR/CA.CRT/FILE"
)

# Establish a connection to the WolkAbout IoT Platform
wolk_device.connect()

publish_period_seconds = 60
publish_period_seconds = 5
i = 0

while True:
try:
# Generate a random value
temperature = random.randint(-20, 80)

# Add a feed reading to the message queue
wolk_device.add_feed_value(("T", temperature))
temperature = ("T", temperature)
wolk_device.add_feed_value(temperature)
print(f"Adding feed value data {temperature}.")
i += 1

print(f'Publishing "T": {temperature}')
wolk_device.publish()
if i % 10 == 0:
time.sleep(publish_period_seconds)
print("Publishing the feed values.")
wolk_device.publish()
time.sleep(publish_period_seconds)
except KeyboardInterrupt:
print("\tReceived KeyboardInterrupt. Exiting script")
Expand Down
2 changes: 1 addition & 1 deletion test/test_wolk_connect.py
Expand Up @@ -1451,7 +1451,7 @@ def test_add_feed_value(self):
return_value=True
)

self.wolk_device.add_feed_value("foo", "bar")
self.wolk_device.add_feed_value_sealed("foo", "bar")

self.wolk_device.message_queue.put.assert_called_once()

Expand Down
74 changes: 74 additions & 0 deletions wolk/wolk_connect.py
Expand Up @@ -22,13 +22,15 @@
from typing import Union

from wolk import logger_factory
from wolk.in_memory_readings_persistence import InMemoryReadingsPersistence
from wolk.interface.connectivity_service import ConnectivityService
from wolk.interface.file_management import FileManagement
from wolk.interface.firmware_handler import FirmwareHandler
from wolk.interface.firmware_update import FirmwareUpdate
from wolk.interface.message_deserializer import MessageDeserializer
from wolk.interface.message_factory import MessageFactory
from wolk.interface.message_queue import MessageQueue
from wolk.interface.readings_persistence import ReadingsPersistence
from wolk.message_deque import MessageDeque
from wolk.model.data_delivery import DataDelivery
from wolk.model.data_type import DataType
Expand Down Expand Up @@ -77,6 +79,8 @@ class WolkConnect:
:vartype message_factory: MessageFactory
:ivar message_queue: Store data before sending
:vartype message_queue: MessageQueue
:ivar readings_persistence: Store readings before sending
:vartype readings_persistence: ReadingsPersistence
"""

def __init__(
Expand Down Expand Up @@ -117,6 +121,9 @@ def __init__(
self.device
)
self.parameters: Dict[str, Union[int, bool, float, str]] = {}
self.readings_persistence: ReadingsPersistence = (
InMemoryReadingsPersistence()
)

wolk_ca_cert = os.path.join(os.path.dirname(__file__), "ca.crt")

Expand Down Expand Up @@ -253,6 +260,23 @@ def with_custom_message_queue(self, message_queue: MessageQueue): # type: ignor

return self

def with_custom_readings_persistence(self, readings_persistence: ReadingsPersistence): # type: ignore
"""
Use custom means of storing readings.
:param readings_persistence: Custom readings persistence
:type readings_persistence: ReadingsPersistence
"""
self.logger.debug(f"Readings persistence: {readings_persistence}")
if not isinstance(readings_persistence, ReadingsPersistence):
raise ValueError(
"Provided readings persistence does not implement ReadingsPersistence"
)

self.readings_persistence = readings_persistence

return self

def with_custom_protocol( # type: ignore
self,
message_factory: MessageFactory,
Expand Down Expand Up @@ -398,6 +422,40 @@ def add_feed_value(
f"Adding feed value: reading: {reading}, timestamp = {timestamp}"
)

self.readings_persistence.store_reading(reading, timestamp)

def add_feed_value_sealed(
self,
reading: Union[Reading, List[Reading]],
timestamp: Optional[int] = None,
) -> None:
"""
Place a feed value reading into storage.
A reading is identified by a unique feed reference string and
the current value of the feed.
This reading can either be passed as a tuple of (reference, value)
for a single feed or as a list of previously mentioned tuples
to pass multiple feed readings at once.
A Unix epoch timestamp in milliseconds as int can be provided to
denote when the reading occurred. By default, the current system
provided time will be assigned to a reading.
The sealed variant will ensure that these reading values get sent
as a separate message, independent of any other feed values that
have been added to the object.
:param reading: Feed value reading
:type reading: Union[Reading, List[Reading]]
:param timestamp: Unix timestamp. Defaults to system time.
:type timestamp: Optional[int]
"""
self.logger.debug(
f"Adding feed value sealed: reading: {reading}, timestamp = {timestamp}"
)

message = self.message_factory.make_from_feed_value(reading, timestamp)
# NOTE: if device is PUSH, do we try to publish instantly?
self.message_queue.put(message)
Expand All @@ -408,6 +466,22 @@ def publish(self) -> None:
if not self.connectivity_service.is_connected():
self.logger.warning("Not connected, unable to publish messages")
return

saved_readings = len(self.readings_persistence.obtain_readings())
if saved_readings > 0:
readings_message = (
self.message_factory.make_from_feed_values_collected(
self.readings_persistence.obtain_readings()
)
)
if readings_message is not None:
if self.connectivity_service.publish(readings_message):
self.readings_persistence.clear_readings()
else:
self.logger.warning(
f"Failed to publish message: {readings_message}"
)

while True:
message = self.message_queue.peek()
if message is None:
Expand Down

0 comments on commit 1b3173d

Please sign in to comment.