In [3]:
import os, sys
# Put the parent of the current working directory on the import path (once),
# so packages that live one level above the notebook can be imported.
root_path = os.path.abspath(os.pardir)
if sys.path.count(root_path) == 0:
    sys.path.append(root_path)

In [1]:
import time
import sys
import csv

#load bme
import board
from busio import I2C
import adafruit_bme680 as adabme

#load eNose
import re
import serial
import serial.tools.list_ports


#load servo
import RPi.GPIO as GPIO
import time
from tube_router.servo_config import get_control_GPIO, get_channels

ModuleNotFoundError: No module named 'board'

In [None]:
class CSVWriter:
    """Write combined eNose / BME680 samples to a CSV file, one sample per row.

    Column layout (header row and every data row have the same width):
    ``Time, Channel0 .. Channel<channels-1>, Temperature, Gas, Humidity,
    Pressure, Altitude, Label``.

    Each sensor value is written into its own column, so the file can be read
    back with any standard CSV reader.
    """

    # Names of the BME680 columns, in the order the values are expected
    # inside ``bmeSample``.
    BME_COLUMNS = ('Temperature', 'Gas', 'Humidity', 'Pressure', 'Altitude')

    def __init__(self, filename, channels=64):
        """Create/truncate ``filename`` and write the header row.

        Args:
            filename: path of the CSV file to write.
            channels: number of eNose channel columns (default 64).
        """
        self.channels = channels
        # newline='' hands line-ending control to the csv module, as its
        # documentation requires (prevents blank lines on Windows).
        self.csvfile = open(filename, 'w', newline='')
        self.filewriter = csv.writer(self.csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)

        header = ['Time']
        header.extend('Channel%d' % index for index in range(channels))
        header.extend(self.BME_COLUMNS)
        header.append('Label')
        self.filewriter.writerow(header)

    @staticmethod
    def _fit(values, width):
        """Return ``values`` as a list of exactly ``width`` cells.

        ``None`` (no reading available) becomes ``width`` empty cells, short
        readings are padded with empty cells, and surplus values are dropped
        so that every row stays aligned with the header.
        """
        if values is None:
            return [''] * width
        cells = list(values)[:width]
        cells.extend([''] * (width - len(cells)))
        return cells

    def writeSample(self, time, eNoseSample, bmeSample, label):
        """Append one sample row.

        Args:
            time: timestamp stored in the ``Time`` column.
            eNoseSample: sequence of channel values, or ``None`` if no frame
                was received.
            bmeSample: sequence of BME680 values in ``BME_COLUMNS`` order, or
                ``None``.
            label: label stored in the ``Label`` column.
        """
        row = [time]
        row.extend(self._fit(eNoseSample, self.channels))
        row.extend(self._fit(bmeSample, len(self.BME_COLUMNS)))
        row.append(label)
        self.filewriter.writerow(row)

    def close(self):
        """Flush and close the underlying file; safe to call more than once."""
        if not self.csvfile.closed:
            self.csvfile.close()

    def __del__(self):
        print('Closing csv')
        try:
            self.csvfile.close()
        except Exception:
            # The file may never have been opened if __init__ failed.
            pass

In [None]:
class ServoConnecor:
    """Select a sample position by driving a servo with PWM on a GPIO pin.

    The servo is controlled through ``RPi.GPIO`` using BCM pin numbering and
    a 50 Hz PWM signal. ``self.Channel`` maps each sample position
    (zero-based index) to the PWM duty cycle that moves the servo there.
    """

    def __init__(self, servoPIN=17):
        """Configure the pin, start PWM and move to the first position.

        Args:
            servoPIN: BCM number of the GPIO pin the servo is attached to.
        """
        # GPIO is the imported RPi.GPIO module, not an attribute of the instance.
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(servoPIN, GPIO.OUT)

        self.p = GPIO.PWM(servoPIN, 50)
        # Duty cycle per sample position.
        self.Channel = [
            3.8,   # channel 1
            4.8,   # channel 2
            5.8,   # channel 3
            6.75,  # channel 4
            7.8,   # channel 5
            8.8,   # channel 6
            10.0,  # channel 7
        ]

        self.p.start(self.Channel[0])

    def setSample(self, channel):
        """Move the servo to position ``channel`` (zero-based index into ``Channel``).

        Raises:
            IndexError: if ``channel`` is outside the configured positions.
        """
        self.p.ChangeDutyCycle(self.Channel[channel])

    def __del__(self):
        # Best-effort cleanup: __init__ may have failed before self.p existed,
        # and a destructor must not raise.
        try:
            self.p.stop()
        except Exception:
            pass
        try:
            GPIO.cleanup()
        except Exception:
            pass

In [None]:
class BMEConnector:
    """Read environmental data from a BME680 sensor over I2C."""

    def __init__(self, sea_level_pressure=953.25):
        """Open the I2C bus and attach the BME680 driver.

        Args:
            sea_level_pressure: value assigned to the driver's
                ``sea_level_pressure`` attribute (default 953.25, the value
                this notebook has always used).
        """
        self.i2c = I2C(board.SCL, board.SDA)
        # The driver must be given the bus stored on the instance.
        self.bme680 = adabme.Adafruit_BME680_I2C(self.i2c, debug=False)
        self.bme680.sea_level_pressure = sea_level_pressure

    def detect(self):
        """Take one reading.

        Returns:
            list: ``[temperature, gas, humidity, pressure, altitude]`` as
            reported by the driver.
        """
        sensor = self.bme680
        return [
            sensor.temperature,
            sensor.gas,
            sensor.humidity,
            sensor.pressure,
            sensor.altitude,
        ]

    def __del__(self):
        # Only announces the teardown; the I2C bus is not closed explicitly.
        print('closing bme680 connection')

In [None]:
class eNoseConnector:
    """Serial connection to an eNose.

    Frames are read synchronously, one per ``detect()`` call. A frame is a
    single line of the form ``count=<n>,var1=<value>,var2=<value>,...``.

    Attributes:
        channels: channel count announced by the last valid frame
            (initially the ``channels`` constructor argument).
        sensorValues: values of the last valid frame (initially all 0.0).
        ser: the underlying ``serial.Serial`` object.
    """

    # line looks like: count=___,var1=____._,var2=____._,....
    _FRAME_RE = re.compile(b'^count=([0-9]+),(var.+)$')

    def __init__(self, port: str = None, baudrate: int = 115200, channels: int = 64):
        """Open the serial connection.

        Args:
            port: serial device name; when ``None`` the port is looked up
                with ``find_port()``.
            baudrate: serial baud rate.
            channels: initial number of channels.
        """
        self.sensorValues = [0.0] * channels
        self.channels = channels

        if port is None:
            # May still be None if no eNose is attached; pyserial then creates
            # the object without opening a port and detect() reports the error.
            port = self.find_port()

        self.ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=10)

    def __del__(self):
        print('Closing eNose connection')
        try:
            self.ser.close()
        except Exception:
            # The port may never have been created if __init__ failed.
            pass

    def detect(self):
        """Read one line from the eNose and parse it.

        Returns:
            list of float with the frame's sensor values, or ``None`` when the
            read timed out, the line is not a valid frame, or an error
            occurred while reading/parsing (the error is printed).
        """
        try:
            # Drop the line terminator so it cannot end up in the last value.
            line = self.ser.readline().strip()
            match = self._FRAME_RE.match(line)
            if match is None:
                return None
            # Parse everything first so a malformed frame leaves the previous
            # state (channels / sensorValues) untouched.
            count = int(match.group(1))
            values = [float(item.split(b'=')[1]) for item in match.group(2).split(b',')]
        except Exception as exc:
            # Exception (not a bare except) so Ctrl-C / SystemExit still stop
            # a running measurement.
            print('Exception raised: %r' % (exc,))
            return None

        self.channels = count
        self.sensorValues = values
        print("received data for %i sensors (actually %i)" % (self.channels, len(self.sensorValues)))
        return self.sensorValues

    @staticmethod
    def find_port():
        """Return the device name of the first port whose description contains
        ``CP2102``, or ``None`` if there is no such port."""
        port = None
        for candidate in serial.tools.list_ports.comports():
            print('Checking port %s / %s' % (candidate[0], candidate[1]))
            if "CP2102" in candidate[1]:
                port = candidate
                break

        if port is None:
            print('Could not find a connected eNose')
            return None

        print('Using the eNose connected on:')
        print(port[0] + ' / ' + port[1])
        return port[0]


In [None]:
class TestEquimentRunner:
    """Run a complete measurement: constructing the object performs the run.

    Within one round the servo visits the positions in the order
    0, 1, 0, 2, 0, 3, ... — position 0 is measured before every other
    position (presumably the reference sample; confirm with the setup).
    Positions whose label is empty are skipped. While a position is
    selected, eNose and BME680 readings are written to the CSV file together
    with the position's label.

    Args:
        labels: one label per sample position; ``labels[i]`` belongs to servo
            position ``i``. An empty label marks an unused position.
        numLoops: number of rounds to perform.
        timeLoop: time spent measuring a single position, in seconds.
        filename: path of the CSV file to write.
    """

    # Pause between two consecutive sensor reads, in seconds.
    SAMPLE_INTERVAL = 0.2

    def __init__(self, labels, numLoops, timeLoop, filename):
        #initialize eNose
        eNose = eNoseConnector()
        #initialize bme680
        bme = BMEConnector()
        #initialize csv file
        sampleWriter = CSVWriter(filename)
        #initialize servo
        servo = ServoConnecor()

        #start detection
        loopsDone = 0
        while loopsDone < numLoops:
            print('round: ', loopsDone + 1)
            self._runRound(labels, timeLoop, eNose, bme, sampleWriter, servo)
            loopsDone += 1

    def _runRound(self, labels, timeLoop, eNose, bme, sampleWriter, servo):
        """Visit the positions once in the order 0, 1, 0, 2, ... and measure each labelled one."""
        currentPos = 0
        nextPos = 1
        # '<' instead of '!=' so an empty label list ends the round immediately.
        while nextPos < len(labels):
            label = labels[currentPos]
            if label:
                self._measure(currentPos, label, timeLoop, eNose, bme, sampleWriter, servo)

            # Advance even when the position was skipped; otherwise an empty
            # label would keep the loop on the same position forever.
            if currentPos == 0:
                currentPos = nextPos
            else:
                currentPos = 0
                nextPos += 1

            if label:
                print('current sample ' + label + ' measured' + ', next sample will be: ' + labels[currentPos])

    def _measure(self, position, label, timeLoop, eNose, bme, sampleWriter, servo):
        """Select ``position`` and log sensor readings for ``timeLoop`` seconds."""
        servo.setSample(position)
        t_end = time.time() + timeLoop
        while time.time() < t_end:
            eNoseSample = eNose.detect()
            bmeSample = bme.detect()
            sampleWriter.writeSample(time.time(), eNoseSample, bmeSample, label)
            time.sleep(self.SAMPLE_INTERVAL)
        
        

In [None]:
# One label per servo position; run a single round, measuring each position for 1 s.
labels = ['null'] + ['test%d' % number for number in range(2, 8)]
TestEquimentRunner(labels, 1, 1, 'test1.csv')