### Initialization

In [8]:
# built-in
import time
from datetime import datetime
import json
import re

# third-party
import bitalino
import serial
import scientisst
import numpy as np

In [9]:
# Initiate variables

timeout = 120
batch_size = 100
sampling_rate = 10

arduino_pattern = re.compile(r'\d+\n') # pattern to make sure the message received from the serial port is in the format "{value}\n"

bitalino_args = {"mac_address": "98:D3:61:FD:6E:27", "sampling_rate": sampling_rate, "channels": [0, 1, 2, 3, 4, 5], "batch_size": batch_size}
scientisst_args = {"mac_address": "COM3", "sampling_rate": sampling_rate, "channels": [1, 2, 3, 4, 5, 6], "batch_size": batch_size}
arduino_args = {"port": "COM7", "sampling_rate": sampling_rate, "batch_size": batch_size}

### Define auxiliary functions

In [10]:
# Define SerialPortUnavailable exception
from serial.tools import list_ports

class SerialPortUnavailable():

    def __init__(self):
        print("Chosen port is not available. Choose one of the following ports:")
        for port in list_ports.comports():
            print(f"    * {port.name}")


SerialPortUnavailable()

Chosen port is not available. Choose one of the following ports:
    * COM4
    * COM3
    * COM5
    * COM6
    * COM7


<__main__.SerialPortUnavailable at 0x1c8fd16fa00>

In [11]:
# Define class that hold all setup informations about the acquisition

class Acquisition:

    def __init__(self, start_time, file):
        
        self.sampling_rate = sampling_rate
        self.keep_alive = True
        self.devices_started = False
        self.bitalino_args = bitalino_args
        self.scientisst_args = scientisst_args
        self.arduino_args = arduino_args
        self.start_time = start_time
        self.file = file


In [12]:
# Define function that creates file header 

def get_header(acquisition):

    scientisst_channels = ""
    for channel in acquisition.scientisst_args["channels"]:
        scientisst_channels += f"\tscientisst_raw_AI{channel}"
    
    bitalino_channels = ""
    for channel in acquisition.bitalino_args["channels"]:
        bitalino_channels += f"\tbitalino_raw_A{channel}"

    return f"#scientisst_nseq{scientisst_channels}bitalino_nseq{bitalino_channels}\tarduino_raw"

### Define functions for device connection

In [13]:
def connect_bitalino(mac_address):

    init_connect_time = time.time()
    print(f'Searching for BITalino... {mac_address}')

    while True:

        if (time.time() - init_connect_time) > timeout:
            print(f"    Timeout for BITalino device connection")
            raise Exception

        try:
            device = bitalino.BITalino(mac_address)
            if device.macAddress:
                print("Connected!")
                break

        except Exception as e:
            print(e)
            continue

    return device

In [8]:
def connect_scientisst1(mac_address):
    init_connect_time = time.time()
    print(f'Searching for ScientISST... {mac_address}')

    while True:

        if (time.time() - init_connect_time) > timeout:
            print(f"    Timeout for ScientISST device connection")
            raise Exception

        try:
            device = scientisst.ScientISST(mac_address)
            if device.address:
                break

        except Exception as e:
            print(e)
            continue

    return device

In [6]:
def connect_scientisst(mac_address):
    print("here")
    pass
    device = scientisst.ScientISST(mac_address)


In [22]:
def connect_arduino(port):
    init_connect_time = time.time()
    print(f'Searching for Arduino... {port}')

    while True:

        if (time.time() - init_connect_time) > timeout:
            print(f"    Timeout for Arduino device connection")
            raise Exception

        try:
            device = serial.Serial(port, baudrate=115200, timeout=0)
            print("Connected!")
            break

        except Exception as e:
            print(e)
            continue

    return device    

### Define functions to start and read devices

In [23]:
def start_devices(bitalino_device, scientisst_device, acquisition):

    bitalino_device.start(SamplingRate=acquisition.bitalino_args["sampling_rate"], analogChannels=acquisition.bitalino_args["channels"])
    scientisst_device.start(sample_rate=acquisition.scientisst_args["sampling_rate"], channels=acquisition.scientisst_args["channels"])  
    arduino_device = connect_arduino(arduino_args["port"])
    
    return arduino_device

In [24]:
def read_batch(bitalino_device, scientisst_device, arduino_device, acquisition):
    
    try: 
        
        scientisst_data = scientisst_device.read(matrix=True)
        scientisst_data = np.take(scientisst_data,[0]+[*range(-2, -len(acquisition.scientisst_args["channels"])*2-2, -2)], axis=1)
        bitalino_data = bitalino_device.read(scientisst_data.shape[0])
        bitalino_data = np.take(bitalino_data,[0]+[*range(-1, -len(acquisition.bitalino_args["channels"])-1, -1)], axis=1)

        arduino_data = []
        arduino_message = ""
        match = None

        while len(arduino_data) < scientisst_data.shape[0]:
            arduino_bytes = arduino_device.readline().decode() # read message from arduino 

            if arduino_bytes != arduino_message:
                arduino_message = arduino_message + arduino_bytes # in case the full line comes in different messages, concatenate it
                match = arduino_pattern.search(arduino_message) # make sure the message received from the serial port is in the format "{value}\n"

            if match is not None: 
                arduino_data += [[int(match.group().strip())]]
                arduino_message = ""
                
        #print(f"bitalino data:\n{bitalino_data}")
        #print(f"\n\nscientisst data:\n{scientisst_data}")
        #print(f"\n\narduino data:\n{np.array(arduino_data)}")
    
        data = np.hstack((scientisst_data, np.array(arduino_data)))
        print(data)

        np.savetxt(acquisition.file, data, fmt='\t'.join(['%d']*data.shape[1]))
        
    except Exception as e:
        print(e)


### Main script

In [None]:
try: 
    
    now = datetime.now()
    file = open(f"D:\\Projects\\respiration-sensor\\respiration-sensor\\resources\\sample_{now.strftime('%d-%m-%Y_%H-%M-%S')}.txt","w")
    acquisition = Acquisition(time.time(), file)

    file.write(get_header(acquisition))

    # Connect devices

    scientisst_device = connect_scientisst(scientisst_args["mac_address"])
    bitalino_device = connect_bitalino(bitalino_args["mac_address"])
    arduino_device = None # This device does not require connection prior to acquisition start

except Exception as e:
    print(e)
    print(f"Script terminated before acquisition was started")
    acquisition.keep_alive = False

try: 

    while acquisition.keep_alive:
        
        if not acquisition.devices_started: 
            # If the devices have not yet started acquiring or they are paused, start acquisition
            try:
                arduino_device = start_devices(bitalino_device, scientisst_device, acquisition)
                acquisition.devices_started = True

            except Exception as e:
                print(e)
                pass
        
        else:
            read_batch(bitalino_device, scientisst_device, arduino_device, acquisition)

except KeyboardInterrupt:
    bitalino_device.stop()
    scientisst_device.stop()
    arduino_device.close()
    file.close()
    print(f"\n\nAcquisition terminated")


here
Connecting to COM6...


In [7]:
scientisst_args = {"mac_address": "COM3", "sampling_rate": sampling_rate, "channels": [1, 2, 3, 4, 5, 6], "batch_size": batch_size}

scientisst_device = connect_scientisst(scientisst_args["mac_address"])


here
Connecting to COM3...
ScientISST version: 1.0
ScientISST Board Vref: 1114
ScientISST Board ADC Attenuation Mode: 0
Connected!
