# Getting Started with BleakHeart

This notebook is a quick introduction to the BleakHeart package and its supported workflows. 

Since recent versions of the IPython kernel come with ```asyncio``` integration, you do not need to start or manage your own event loop; you can simply go ahead and ```await``` Bleak and BleakHeart coroutines in any cell. This makes it easier to explore the basic BleakHeart functionality. Stand-alone examples are provided in the ```examples``` directory and referenced here; much of their relatively higher complexity does not depend on BleakHeart, but rather comes from the need to manage keyboard input/output in an asynchronous setting under different operating systems. Therefore, I recommend that you start here and only go through the stand-alone examples after reviewing the basic workflows.

In [None]:
## If you do not have bleakheart installed in your environment, you may want to uncomment the lines below 
## and run this cell in order to install it. This will also install Bleak.
#import sys
#!{sys.executable} -m pip install bleakheart

In [None]:
from bleak import BleakScanner, BleakClient
import asyncio as aio
import bleakheart as bh

## Connecting to the sensor

The following code connects to the Polar H10. Make sure the Bluetooth adapter on your computer is powered on and that the sensor is making good contact with your skin. See the [bleak documentation](https://bleak.readthedocs.io/en/latest/) for more details.

In [None]:
## Find your sensor and connect to it
# edit this line if you have another compatible sensor
device= await BleakScanner.find_device_by_filter(lambda dev, adv: dev.name and "polar" in dev.name.lower())
if device is None:
    print("Polar device not found")
else:
    # only try to connect if the scan actually found a device
    client= BleakClient(device)
    await client.connect()
    if not client.is_connected:
        print("Connection failed")
    else:
        print(f"Connected to {device.name}")

## Checking the battery level

You can easily check the battery level of the device by using the ```BatteryLevel``` class. This class should work with all devices that implement the BLE Battery Service Specification. As the ```read()``` method is a coroutine, it needs to be awaited.

In [None]:
batterylevel= await bh.BatteryLevel(client).read()
batterylevel

In [None]:
# Run this cell to view the documentation for the BatteryLevel class
help(bh.BatteryLevel)

## Heart rate monitoring

The ```HeartRate``` class allows reading the average heart rate and RR intervals as transmitted by the sensor. Frames are returned in the following format:
```
('HR', tstamp, (avghr, rrlist), energy)
```
where 'HR' is a constant string, tstamp is the (client) time stamp in ns, avghr is the average heart rate as detected by the sensor, rrlist is a list of RR values in the frame (in ms, supported by the H10), and energy is the energy expenditure (in kJ, not supported by the H10).
 
If the constructor parameter ```unpack``` is True, RR intervals are split into separate tuples and data is formatted as follows:
```
('HR', t_est, (hr, rr), energy)
```
where t_est is the estimated time stamp of the individual heartbeat, and hr can be either the average heart rate returned by the sensor or the instant heart rate as computed from the specific RR interval, according to constructor parameter ```instant_rate```.
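
To make the frame layout concrete, here is a minimal sketch that takes a hand-made frame of the first form apart and derives an instantaneous heart rate from each RR interval as 60000/RR (RR in ms). The sample frame and the `instant_rates` helper are illustrative assumptions and not part of BleakHeart; the library may compute its values differently.

```python
def instant_rates(frame):
    """Return one instantaneous heart rate (bpm) per RR interval in a frame.

    Expects the non-unpacked layout ('HR', tstamp, (avghr, rrlist), energy).
    Hypothetical helper, for illustration only.
    """
    tag, tstamp, (avghr, rrlist), energy = frame
    if tag != 'HR':
        raise ValueError(f"not a heart rate frame: {tag!r}")
    # 60000 ms per minute divided by the beat-to-beat interval in ms
    return [60_000.0 / rr for rr in rrlist]

# made-up frame: average HR 66 bpm, two RR intervals, no energy field
sample_frame = ('HR', 1_700_000_000_000_000_000, (66, [1000, 800]), None)
rates = instant_rates(sample_frame)
print(rates)
```

An RR interval of 1000 ms corresponds to 60 bpm and one of 800 ms to 75 bpm.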


### Simplest workflow: callback

In the simplest workflow, a callback is passed the decoded frames one at a time as they are received. Here we use the IPython ```display``` function as the callback in order to visualise the frames:

In [None]:
heartrate=bh.HeartRate(client, callback=display, instant_rate=True, unpack=True)

The following code will acquire your heart rate for 5 seconds, and print the frames. Try changing ```instant_rate``` and then ```unpack``` to ```False``` in the cell above and see what happens (note that instant_rate==True is only supported when unpack==True).

In [None]:
# start notifications; bleakheart will start sending data to
# the callback
await heartrate.start_notify()
await aio.sleep(5)
await heartrate.stop_notify()

For a stand-alone example of this workflow, see ```heartrate_callback.py```
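
Any callable that accepts a single frame can take the place of ```display```. The sketch below is a hypothetical `RRCollector` class (not part of BleakHeart) that stores the payload of unpacked frames. It is fed hand-made frames here so that it runs without a sensor; with a device connected, an instance could presumably be passed as the ```callback``` argument instead.

```python
class RRCollector:
    """Callable that stores the (hr, rr) payload of unpacked HR frames."""

    def __init__(self):
        self.beats = []

    def __call__(self, frame):
        # unpacked layout: ('HR', t_est, (hr, rr), energy)
        tag, t_est, (hr, rr), energy = frame
        if tag == 'HR':
            self.beats.append((hr, rr))

    def mean_rr(self):
        """Mean RR interval in ms over all collected beats."""
        return sum(rr for _, rr in self.beats) / len(self.beats)

collector = RRCollector()
# made-up frames standing in for what the sensor would send
for frame in [('HR', 0, (60.0, 1000), None),
              ('HR', 1_000_000_000, (75.0, 800), None)]:
    collector(frame)
print(collector.mean_rr())
```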

### Producer-consumer model: asynchronous queue

Another possible workflow involves an asynchronous queue to which the BLE client routine (the *producer*) writes frames, which are then available for your application to use:

In [None]:
# Hold heart rate frames
hrqueue=aio.Queue()

The producer task starts notifications on the server, waits for a few seconds and then stops them. A special token is then pushed to the queue to signal to the consumer task (see below) that no more data will arrive and it can exit.

In [None]:
async def producer_task():
    heartrate=bh.HeartRate(client, queue=hrqueue, instant_rate=True, unpack=True)
    await heartrate.start_notify()
    await aio.sleep(5)
    await heartrate.stop_notify()
    # inform the consumer task that no more data will arrive
    await hrqueue.put('END')

Awaiting the producer task fills up the queue with data:

In [None]:
await producer_task()
hrqueue

This is the consumer task, which simply prints each frame:

In [None]:
async def consumer_task():
    while True:
        frame=await hrqueue.get()
        # mark the frame as processed - useful if any tasks are "joining" the queue
        hrqueue.task_done()
        if frame=='END':
            break
        print(frame)

Awaiting the consumer task causes the frames to be pulled from the queue and printed out:

In [None]:
await consumer_task()

Of course, in your application you will generally want to run the producer and consumer tasks concurrently; using ```gather``` is a convenient way to do just that.

In [None]:
# Purge any residual items from the queue to ensure a clean run
while not hrqueue.empty():
    hrqueue.get_nowait()
    hrqueue.task_done()
    
# Run producer and consumer tasks. Both tasks return None
await aio.gather(producer_task(), consumer_task())

For a complete stand-alone example of this workflow, see ```heartrate_queue.py```.
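
The same producer-consumer pattern can be tried without a sensor. In this sketch a hypothetical `fake_producer` stands in for the BLE side and pushes made-up frames followed by the 'END' token, while the consumer collects frames instead of printing them. Only the standard ```asyncio``` library is used.

```python
import asyncio

END = 'END'  # same sentinel string as in the cells above

async def fake_producer(queue, n_frames=3):
    """Stand-in for the BLE producer: pushes made-up frames, then the sentinel."""
    for i in range(n_frames):
        await asyncio.sleep(0.01)  # simulate waiting for a notification
        await queue.put(('HR', i, (60.0, 1000), None))
    await queue.put(END)

async def consumer(queue):
    """Collect frames until the sentinel is seen."""
    received = []
    while True:
        frame = await queue.get()
        queue.task_done()
        if frame == END:
            break
        received.append(frame)
    return received

async def main():
    queue = asyncio.Queue()
    # gather returns the results in the order the awaitables were passed
    _, received = await asyncio.gather(fake_producer(queue), consumer(queue))
    return received

# In a notebook cell, write `received = await main()` instead,
# because an event loop is already running there.
received = asyncio.run(main())
print(len(received))
```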

In [None]:
# Run this cell to view the documentation for the HeartRate class
help(bh.HeartRate)

## The Polar Measurement Data API

Polar sensors implement the proprietary Polar Measurement Data API. On the Polar H10, this allows accessing ECG and Accelerometer data. On other devices, photoplethysmography, peak-to-peak intervals, gyroscope and magnetometer data may be available. 

### Listing the measurements supported by your device

The ```available_measurements``` asynchronous method returns a list of available measurements on your device. Currently, BleakHeart only decodes ECG and the type of Accelerometer frames used by the Polar H10; other frame types are returned as raw bytearrays.

In [None]:
measurements = await bh.PolarMeasurementData(client).available_measurements()
measurements

### Acquiring ECG data

The ```available_settings``` async method returns a dictionary of possible settings for a specific measurement. As we see, the only settings available for the ECG on the Polar H10 are a sampling rate of 130Hz and 14-bit precision (BleakHeart converts these 14-bit floats to standard floats for you).

In [None]:
settings=await bh.PolarMeasurementData(client).available_settings('ECG')
print("Request for available ECG settings returned the following:")
for k,v in settings.items():
    print(f"{k}:\t{v}")

```PolarMeasurementData``` supports both the callback workflow and the consumer-producer model, as seen above. In this example, we will push the frames to an async queue, but we will then use some synchronous code to plot the data.

In [None]:
ecgqueue=aio.Queue()
# ECG frames to be pushed to the queue
pmd=bh.PolarMeasurementData(client, ecg_queue=ecgqueue)

Acquisition of a measurement is started and stopped by calling the ```start_streaming``` and ```stop_streaming``` async methods and specifying the requested measurement. The code below will acquire at least 10 ECG frames (i.e., about 5 seconds of electrocardiogram). Note that since it takes some time for the sensor to start streaming the ECG, simply sleeping for a short fixed interval such as 5 seconds would offer little control over the actual amount of data received.

In [None]:
(err_code, err_msg, _) = await pmd.start_streaming('ECG')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
else:
    # acquire at least 10 frames (about 5 seconds); only wait if
    # streaming actually started, otherwise the loop would never end
    print("Streaming .", end="")
    while ecgqueue.qsize()<10:
        await aio.sleep(1)
        print(".", end="")
    print()
    await pmd.stop_streaming('ECG')

The queue should now contain at least 10 frames (but may contain more):

In [None]:
ecgqueue.qsize()
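
A polling loop like the one above keeps waiting if frames never arrive, for example when the strap loses skin contact. One possible safeguard is to add a deadline. The `wait_for_frames` helper below is a hypothetical sketch, not a BleakHeart function, and the frames are simulated so that it runs without a sensor.

```python
import asyncio

async def wait_for_frames(queue, n_frames, timeout_s, poll_s=0.05):
    """Poll until the queue holds n_frames items or timeout_s has elapsed.

    Returns True if enough frames arrived, False on timeout.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while queue.qsize() < n_frames:
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_s)
    return True

async def demo():
    queue = asyncio.Queue()

    async def fake_stream():
        # pushes five made-up frames, standing in for the sensor
        for i in range(5):
            await asyncio.sleep(0.01)
            queue.put_nowait(('ECG', i, [0]))

    task = asyncio.create_task(fake_stream())
    got_enough = await wait_for_frames(queue, 5, timeout_s=2.0)
    # asking for more frames than will ever arrive ends in a timeout
    timed_out = not await wait_for_frames(queue, 100, timeout_s=0.2)
    await task
    return got_enough, timed_out

# In a notebook cell, use `await demo()` instead of asyncio.run
got_enough, timed_out = asyncio.run(demo())
print(got_enough, timed_out)
```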

ECG frames are encoded as tuples composed of the 'ECG' tag, the sensor timestamp (in ns) and a list of samples in microVolt. The time stamp refers to the *last* sample of the list.

In [None]:
print(ecgqueue.get_nowait())
ecgqueue.task_done()
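
Because the time stamp belongs to the last sample, the times of the earlier samples have to be reconstructed. The sketch below does this under the assumption that samples are evenly spaced at the nominal 130 Hz rate. The `sample_times_ns` helper and the frame values are made up for illustration and are not part of BleakHeart.

```python
def sample_times_ns(frame, fs_hz=130):
    """Estimate a time stamp (ns) for every sample in a frame.

    Assumes evenly spaced samples at the nominal rate fs_hz, with the
    frame time stamp referring to the last sample.
    """
    tag, t_last, samples = frame
    n = len(samples)
    period_ns = 1e9 / fs_hz
    # round the offset, then subtract, so that the (possibly very large)
    # integer time stamp is never converted to a float
    return [t_last - round((n - 1 - i) * period_ns) for i in range(n)]

frame = ('ECG', 1_000_000_000, [12, -7, 30])  # made-up values
times = sample_times_ns(frame)
print(times)
```

The last entry equals the frame time stamp, and consecutive entries are about 7.7 ms apart (1/130 s).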

#### Plotting ECG data

This section makes use of Matplotlib, which is not a required dependency of BleakHeart. If you do not have Matplotlib available in the current environment, you can install it by uncommenting the appropriate lines in the cell below, according to your package manager.

In [None]:
## Uncomment these lines to install matplotlib in the current Jupyter kernel using pip
#import sys
#!{sys.executable} -m pip install matplotlib

## Uncomment these lines to install matplotlib in the current Jupyter kernel using conda
#import sys
#!conda install --yes --prefix {sys.prefix} matplotlib

import matplotlib.pyplot as plt
%matplotlib inline

We consume the queue using the synchronous method ```get_nowait``` and plot the data using Matplotlib:

In [None]:
fig, ax= plt.subplots()
ax.set_title("ECG")
ax.set_xlabel('s')
ax.set_ylabel('mV')
signal=[]
while not ecgqueue.empty():
    frame=ecgqueue.get_nowait()
    samples=frame[-1] # a list of samples
    signal.extend(samples)
    ecgqueue.task_done()
# convert to mV
signal_mV=[x*1e-3 for x in signal]
# time in seconds; sampling frequency is 130Hz
t=[x/130.0 for x in range(len(signal_mV))]
ax.plot(t, signal_mV)

A complete stand-alone example of the producer-consumer workflow for ECG data can be found in ```ecg_queue.py```.

### Acquiring accelerometer data

The Polar H10 supports a range of different sampling frequencies and dynamic range choices for Accelerometer data: 

In [None]:
settings=await bh.PolarMeasurementData(client).available_settings('ACC')
print("Request for available ACC settings returned the following:")
for k,v in settings.items():
    print(f"{k}:\t{v}")

The default BleakHeart settings for Accelerometer data acquisition are as follows: 
* Sampling rate: 200 Hz
* Resolution (precision): 16 bit
* Range: -2g to 2g

In [None]:
# default settings for the selected measurement
bh.PolarMeasurementData.default_settings['ACC']

Using these defaults, again we declare an async queue onto which Accelerometer frames will be pushed:

In [None]:
accqueue=aio.Queue()
# ACC frames will be pushed to accqueue
pmd=bh.PolarMeasurementData(client, acc_queue=accqueue)

Streaming is started and stopped in the same way as for ECG data. At a sampling rate of 200Hz, 20 frames will cover about 5 seconds. 

In [None]:
(err_code, err_msg, _) = await pmd.start_streaming('ACC')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
else:
    # acquire at least 20 frames (about 5 seconds); only wait if
    # streaming actually started, otherwise the loop would never end
    print("Streaming .", end="")
    while accqueue.qsize()<20:
        await aio.sleep(1)
        print(".", end="")
    print()
    await pmd.stop_streaming('ACC')

Let's check the length of the queue (note that this may vary):

In [None]:
accqueue.qsize()

Accelerometer frames are decoded as tuples with the 'ACC' tag, followed by the sensor timestamp (in ns) of the *last* sample, followed by a list of samples. Each sample is a (x, y, z) tuple with the acceleration along the three axes. Acceleration is returned in units of 1/1000th of the gravity acceleration g.

In [None]:
display(accqueue.get_nowait())
accqueue.task_done()
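
As a small worked example of the units, the sketch below converts samples from 1/1000 g to g and computes the magnitude of the acceleration vector. A sensor at rest should read close to 1 g. The `magnitude_g` helper and the sample values are made up for illustration.

```python
import math

def magnitude_g(sample):
    """Magnitude, in g, of an (x, y, z) sample given in 1/1000 g."""
    x, y, z = sample
    return math.sqrt(x * x + y * y + z * z) / 1000.0

# made-up frame in the layout described above: tag, time stamp, samples
frame = ('ACC', 0, [(0, 0, 1000), (600, 800, 0), (0, 0, 0)])
magnitudes = [magnitude_g(s) for s in frame[-1]]
print(magnitudes)
```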

#### Plotting accelerometer data

Again using Matplotlib, we can consume the queue and plot the data:

In [None]:
import numpy as np

fig, ax= plt.subplots()
ax.set_title("Acceleration")
ax.set_xlabel('s')
ax.set_ylabel('g')
acc=[]
while not accqueue.empty():
    frame=accqueue.get_nowait()
    samples=frame[-1] # a list of triples
    acc.extend(samples)
    accqueue.task_done()
# time in s - sampling frequency is 200Hz
t=[x/200.0 for x in range(len(acc))]
# scale the data to units of g and plot it
ax.plot(t, np.array(acc)*1e-3, label=['x axis', 'y axis', 'z axis'])
plt.legend()

For a stand-alone example of the callback workflow for Accelerometer data, see ```accel_callback.py```.

### Acquiring multiple measurements simultaneously

The Polar Measurement Data API and BleakHeart support streaming more than one measurement at the same time. Note that you should only create one instance of ```PolarMeasurementData``` for each device, and use that to control the streaming of all measurements. Here is an example of how to stream both ECG and Accelerometer data, using two separate async queues: 

In [None]:
accqueue=aio.Queue()
ecgqueue=aio.Queue()
pmd=bh.PolarMeasurementData(client, acc_queue=accqueue, ecg_queue=ecgqueue)
print("Starting streaming...")
(err_code, err_msg, _) = await pmd.start_streaming('ACC')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
(err_code, err_msg, _) = await pmd.start_streaming('ECG')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
print("Acquiring for 8 seconds...")
await aio.sleep(8)
print("Done.")
await pmd.stop_streaming('ACC')
await pmd.stop_streaming('ECG')

Note that the number of frames stored on each queue will in general not be the same, and they will generally be interleaved in unpredictable ways during transmission:

In [None]:
print(accqueue.qsize())
print(ecgqueue.qsize())

It is possible to specify the same queue for both measurements, which is useful for instance if the frames have to be processed in the order in which they are received. Alternatively, you can specify a callback, which will be passed any measurements for which you have not specified a queue. In both cases, different measurement types are easily told apart based on their tag:

In [None]:
def handler(frame):
    """ A placeholder callback that prints the type of frame """
    display(f"{frame[0]} frame received")

# only create one instance of this class for each device
pmd=bh.PolarMeasurementData(client, callback= handler)
print("Starting streaming...")
# notifications can be started at any time and in any order...
(err_code, err_msg, _) = await pmd.start_streaming('ACC')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
(err_code, err_msg, _) = await pmd.start_streaming('ECG')
if err_code!=0:
    print(f"PMD returned an error: {err_msg}")
display("Acquiring ECG and ACC for 4 seconds...")
await aio.sleep(4)
# ... and they can be stopped in any order
await pmd.stop_streaming('ACC')
display("Acquiring ECG alone for a further 2 seconds...")
await aio.sleep(2) # only ECG packets during this time
await pmd.stop_streaming('ECG')
print("Done.")
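
When one callback or queue receives several measurement types, the tag in the first position of each frame can be used to route it. The sketch below is one possible way to do this with a dictionary of handlers. `make_dispatcher` is a hypothetical helper and the frames are made up, so the code runs without a sensor.

```python
from collections import Counter

def make_dispatcher(handlers):
    """Build a callback that routes each frame to a handler chosen by its tag.

    The returned function reports False for tags without a handler.
    """
    def dispatch(frame):
        handler = handlers.get(frame[0])
        if handler is None:
            return False
        handler(frame)
        return True
    return dispatch

# count how many samples of each type were received
counts = Counter()
dispatch = make_dispatcher({
    'ECG': lambda f: counts.update(ECG=len(f[-1])),
    'ACC': lambda f: counts.update(ACC=len(f[-1])),
})

# made-up interleaved frames
frames = [('ECG', 0, [1, 2, 3]), ('ACC', 0, [(0, 0, 1000)]), ('ECG', 1, [4, 5])]
for frame in frames:
    dispatch(frame)
print(dict(counts))
```

With a device connected, a function like `dispatch` could presumably be passed as the ```callback``` argument in place of `handler` above.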

Run the cell below to view the documentation for the PolarMeasurementData class.

In [None]:
help(bh.PolarMeasurementData)