## Reading sensor data from PlantBuddy.

### The necessary imports

In [12]:
import asyncio
from bleak import BleakClient, BleakError, BleakScanner

### Connect to the PlantBuddy

In [93]:
import struct
from collections import namedtuple
from typing import Callable

# Create a PlantBuddy class that can be used to connect/disconnect from it and fetch data.
class PlantBuddy():
    # MAC Address of the PlantBuddy
    mac_addr = "F4:94:15:33:BC:8A"
    # UUID's of service and sensordata characteristic
    uuid_service = "f4c2abcd-6e5f-48a2-b9b2-a4f762791d85"
    uuid_char_sensordata = "f4c20001-6e5f-48a2-b9b2-a4f762791d85"
    # Local variables
    ble_client = None
    callback = None

    def __init__(self):
        pass

    async def connect(self, timeout: float = 60.0):
        device = await BleakScanner.find_device_by_address(self.mac_addr, timeout=timeout)
        if not device:
            raise BleakError(f"A device with address {self.ble_address} could not be found.")
        # Save the client locally
        self.ble_client = BleakClient(device)
        await self.ble_client.connect(timeout=timeout)

    async def disconnect(self):
         await self.ble_client.disconnect()

    def install_callback(self, callback: Callable[[bytearray], None]):
        self.callback = callback

    async def start_listening(self):
        if self.callback is None:
            raise BleakError(f"No callback was specified!")
        await self.ble_client.start_notify(self.uuid_char_sensordata, self._notification_handler)

    async def stop_listening(self):
        await self.ble_client.stop_notify(self.uuid_char_sensordata)

    def _notification_handler(self, sender, data):
        Packet = namedtuple('Packet', 'timestamp soil_humidity luminous_flux air_humidity air_temperature')
        raw_packet = Packet._make(struct.unpack("<IIIHH", data))
        phys_packet = raw_packet._replace(air_temperature=raw_packet.air_temperature/100, air_humidity=raw_packet.air_humidity/100)

        self.callback(phys_packet)


In [94]:
def new_data(sensor_data):
    # Data needs to be a dictionary containing all my items
    print(f"Received: {sensor_data}")

pb = PlantBuddy()
await pb.connect()
pb.install_callback(new_data)
await pb.start_listening()
await asyncio.sleep(10.0)


Received: Packet(timestamp=31, soil_humidity=908679, luminous_flux=16, air_humidity=38.41, air_temperature=23.8)
Received: Packet(timestamp=32, soil_humidity=908669, luminous_flux=16, air_humidity=38.41, air_temperature=23.84)
Received: Packet(timestamp=33, soil_humidity=908679, luminous_flux=16, air_humidity=38.37, air_temperature=23.83)
Received: Packet(timestamp=34, soil_humidity=908699, luminous_flux=16, air_humidity=38.31, air_temperature=23.83)
Received: Packet(timestamp=35, soil_humidity=908709, luminous_flux=16, air_humidity=38.29, air_temperature=23.84)
Received: Packet(timestamp=36, soil_humidity=908699, luminous_flux=16, air_humidity=38.19, air_temperature=23.81)
Received: Packet(timestamp=37, soil_humidity=908709, luminous_flux=16, air_humidity=38.13, air_temperature=23.84)
Received: Packet(timestamp=38, soil_humidity=908699, luminous_flux=16, air_humidity=38.06, air_temperature=23.87)
Received: Packet(timestamp=39, soil_humidity=908709, luminous_flux=15, air_humidity=38.03

12, bytearray(b'\x14\xde\r\x00\x10\x00\x00\x00\x89\x0eJ\t')
12, bytearray(b'\xd7\xdd\r\x00\x10\x00\x00\x00\x8e\x0eG\t')
12, bytearray(b'\x14\xde\r\x00\x10\x00\x00\x00\x94\x0eI\t')


In [86]:
from collections import namedtuple
Packet = namedtuple('Packet', 'soil_humidity luminous_flux air_humidity air_temperature')

x = bytearray(b'\xf5\xdd\r\x00\x10\x00\x00\x00\x86\x0eG\t')
packet = Packet._make(struct.unpack("<IIHH", x))
packet = packet._replace(air_temperature=packet.air_temperature/100, air_humidity=packet.air_humidity/100)
print(f"Received: {packet}")

Received: Packet(soil_humidity=908789, luminous_flux=16, air_humidity=37.18, air_temperature=23.75)
