1- # First, install the dependencies via:
2- # $ pip3 install requests
3-
4- import json
5- import time , hmac , hashlib
1+ import os
2+ import time
3+ import numpy as np
4+ import re
5+ import uuid
66import requests
7- import re , uuid
7+ import hmac
88import math
9+ import hashlib
10+ import json
11+ import tkinter as tk
12+ from tkinter import messagebox
13+ import matplotlib .pyplot as plt
14+ import logging
915
10- # Your API & HMAC keys can be found here (go to your project > Dashboard > Keys to find this)
11- HMAC_KEY = "fed53116f20684c067774ebf9e7bcbdc"
12- API_KEY = "ei_fd83..."
13-
14- # empty signature (all zeros). HS256 gives 32 byte signature, and we encode in hex, so we need 64 characters here
15- emptySignature = '' . join ([ '0' ] * 64 )
16+ logging . basicConfig (
17+ filename = "sensor_app.log" ,
18+ level = logging . DEBUG ,
19+ format = "%(asctime)s - %(levelname)s - %(message)s" ,
20+ )
21+ logging . getLogger (). addHandler ( logging . StreamHandler ()) # To also print to console
1622
17- # use MAC address of network interface as deviceId
18- device_name = ":" .join (re .findall ('..' , '%012x' % uuid .getnode ()))
1923
20- # here we have new data every 16 ms
24+ IS_MACOS = os .uname ().sysname == "Darwin"
25+ HMAC_KEY = "54598668df4fb5ed5d55247b38dd55a3"
26+ API_KEY = "ei_6be3855c55f764ecec5834853c4fd051c684ef4e1cd26e6a074dfeede5636c64"
27+ emptySignature = "" .join (["0" ] * 64 )
28+ device_name = ":" .join (re .findall (".." , "%012x" % uuid .getnode ()))
2129INTERVAL_MS = 16
30+ freq = 1000 / INTERVAL_MS
31+
32+ BUFFER_SIZE = 5 * int (round (freq , 0 ))
33+
34+
35+ class MockSerial :
36+ class Serial :
37+ def __init__ (self , * args , ** kwargs ):
38+ pass
39+
40+ def write (self , data ):
41+ pass
42+
43+ def close (self ):
44+ pass
45+
46+
47+ class Sensor :
48+ def __init__ (self , mock ):
49+ if mock :
50+ self .setup_mock ()
51+ else :
52+ self .setup_real ()
53+
54+ def setup_mock (self ):
55+ print ("Running on macOS: Mocking hardware-specific libraries." )
56+
57+ class MockI2C :
58+ pass
59+
60+ class MockADS :
61+ P0 = 0
62+ P1 = 1
63+
64+ class ADS1115 :
65+ data_rate = 860
66+
67+ def __init__ (self , * args , ** kwargs ):
68+ pass
69+
70+ class MockAnalogIn :
71+ def __init__ (self , * args , ** kwargs ):
72+ self .t = 0 # time variable
73+ self .fs = 1000 / DataHandler .INTERVAL_MS # sampling frequency
74+
75+ @property
76+ def value (self ):
77+ A = 1024 # arbitrary amplitude for the sine wave
78+ f = 1 # 1 Hz for 60 BPM
79+ val = A * math .sin (2 * math .pi * f * self .t / self .fs )
80+ self .t += 1
81+ if self .t >= self .fs :
82+ self .t = 0 # reset the timer after one second
83+ return val
84+
85+ self .i2c = MockI2C ()
86+ self .ADS = MockADS
87+ self .AnalogIn = MockAnalogIn
88+ self .serial = MockSerial .Serial ()
89+
90+ def setup_real (self ):
91+ import busio
92+ import board
93+ import adafruit_ads1x15 .ads1115 as ADS
94+ from adafruit_ads1x15 .analog_in import AnalogIn
95+ import serial
96+
97+ self .i2c = busio .I2C (board .SCL , board .SDA )
98+ self .ADS = ADS
99+ self .AnalogIn = AnalogIn
100+ self .serial = serial .Serial (self .detect_serial_port (), 115200 )
101+
102+ def try_handshake (self , port ):
103+ """Try to handshake with a device on a given serial port."""
104+ try :
105+ with serial .Serial (port , 115200 , timeout = 1 ) as ser :
106+ ser .write (b"IDENTIFY\n " ) # Asking the device to identify itself
107+ response = ser .readline ().decode ("utf-8" ).strip ()
108+ return (
109+ response == "ECG_PPG_SENSOR"
110+ ) # Assuming this is the response you expect
111+ except :
112+ return False
113+
114+ def detect_serial_port (self ):
115+ """Detect the correct serial port by trying a handshake."""
116+ potential_ports = [
117+ "/dev/ttyAMA0" ,
118+ "/dev/ttyUSB0" ,
119+ "/dev/ttyUSB1" ,
120+ "/dev/ttyACM0" ,
121+ ]
122+ for port in potential_ports :
123+ if os .path .exists (port ) and self .try_handshake (port ):
124+ return port
125+ logging .error ("No suitable serial port found." )
126+ raise Exception ("No suitable serial port found." )
127+
128+
129+ class DataHandler :
130+ emptySignature = "" .join (["0" ] * 64 )
131+ device_name = ":" .join (re .findall (".." , "%012x" % uuid .getnode ()))
132+ INTERVAL_MS = 16
133+
134+ def __init__ (self , AnalogIn , ads = None , ecg_pin = None , ppg_pin = None ):
135+ if ads and ecg_pin and ppg_pin :
136+ self .ecg_channel = AnalogIn (ads , ecg_pin )
137+ self .ppg_channel = AnalogIn (ads , ppg_pin )
138+ else :
139+ self .ecg_channel = None
140+ self .ppg_channel = None
141+
142+ def plot_data (self , data ):
143+ # Use classic style for ECG-like appearance
144+ plt .style .use ("classic" )
145+
146+ ecg_values = [x [0 ] for x in data ]
147+ ppg_values = [x [1 ] for x in data ]
148+
149+ fig , axs = plt .subplots (2 , 1 , figsize = (10 , 6 ))
150+
151+ # Mock BPM and HRV values for demonstration
152+ mock_bpm = 60
153+ mock_hrv = 50
154+
155+ # Display BPM and HRV above the graph
156+ fig .suptitle (f"BPM: { mock_bpm } HRV: { mock_hrv } ms" , fontsize = 14 )
157+
158+ # Plot ECG data
159+ axs [0 ].plot (ecg_values , label = "ECG" , color = "lime" )
160+ axs [0 ].legend ()
161+ axs [0 ].set_title ("ECG Data" )
162+ axs [0 ].grid (True , which = "both" , linestyle = "--" , linewidth = 0.5 )
163+ axs [0 ].set_facecolor ("white" )
164+
165+ # Plot PPG data
166+ axs [1 ].plot (ppg_values , color = "red" , label = "PPG" )
167+ axs [1 ].legend ()
168+ axs [1 ].set_title ("PPG Data" )
169+ axs [1 ].grid (True , which = "both" , linestyle = "--" , linewidth = 0.5 )
170+ axs [1 ].set_facecolor ("white" )
171+
172+ # Display the plot
173+ plt .tight_layout ()
174+ plt .show ()
175+
176+ def send_to_edge_impulse (self , values_list ):
177+ if self .INTERVAL_MS <= 0 :
178+ raise Exception ("Interval in milliseconds cannot be equal or lower than 0." )
179+
180+ data = {
181+ "protected" : {"ver" : "v1" , "alg" : "HS256" , "iat" : time .time ()},
182+ "signature" : self .emptySignature ,
183+ "payload" : {
184+ "device_name" : self .device_name ,
185+ "device_type" : "LINUX_TEST" ,
186+ "interval_ms" : self .INTERVAL_MS ,
187+ "sensors" : [
188+ {"name" : "ECG" , "units" : "mV" },
189+ {"name" : "PPG" , "units" : "arb. units" },
190+ ],
191+ "values" : values_list ,
192+ },
193+ }
194+
195+ encoded = json .dumps (data )
196+ signature = hmac .new (
197+ bytes (HMAC_KEY , "utf-8" ),
198+ msg = encoded .encode ("utf-8" ),
199+ digestmod = hashlib .sha256 ,
200+ ).hexdigest ()
201+ data ["signature" ] = signature
202+ encoded = json .dumps (data )
203+
204+ res = requests .post (
205+ url = "https://ingestion.edgeimpulse.com/api/training/data" ,
206+ data = encoded ,
207+ headers = {
208+ "Content-Type" : "application/json" ,
209+ "x-file-name" : "idle" ,
210+ "x-api-key" : API_KEY ,
211+ },
212+ )
213+
214+ if res .status_code == 200 :
215+ print ("Uploaded file to Edge Impulse" , res .status_code , res .content )
216+ else :
217+ print ("Failed to upload file to Edge Impulse" , res .status_code , res .content )
218+
219+
220+ def is_raspberry_pi ():
221+ """Return True if we are running on a Raspberry Pi."""
222+ try :
223+ with open ("/proc/cpuinfo" , "r" ) as f :
224+ for line in f :
225+ if line .startswith ("Hardware" ):
226+ if "BCM" in line :
227+ return True
228+ except :
229+ pass
230+ return False
231+
232+
233+ if __name__ == "__main__" :
234+
235+ def run_sensor (mock ):
236+ if mock :
237+ sensor = Sensor (mock = True )
238+ handler = DataHandler (
239+ sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1
240+ )
241+ else :
242+ sensor = Sensor (mock = False )
243+ handler = DataHandler (
244+ sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1
245+ )
246+
247+ return sensor , handler
248+
249+ def on_quit ():
250+ root .quit ()
251+ root .destroy ()
252+
253+ # Create main window
254+ root = tk .Tk ()
255+ root .title ("Sensor Data" )
256+ root .geometry ("300x200" )
257+ root .configure (bg = "black" )
258+
259+ # Add buttons
260+ btn_mock = tk .Button (
261+ root ,
262+ text = "Use Mock Data" ,
263+ command = lambda : run_sensor (True ),
264+ bg = "white" ,
265+ fg = "black" ,
266+ )
267+ btn_real = tk .Button (
268+ root ,
269+ text = "Use Real Data" ,
270+ command = lambda : run_sensor (False ),
271+ bg = "white" ,
272+ fg = "black" ,
273+ )
274+ btn_quit = tk .Button (root , text = "Quit" , command = on_quit , bg = "red" , fg = "white" )
275+
276+ btn_mock .pack (pady = 20 )
277+ btn_real .pack (pady = 20 )
278+ btn_quit .pack (pady = 20 )
279+
280+ root .mainloop ()
281+ print ("Collecting data..." )
282+ if is_raspberry_pi ():
283+ sensor , handler = run_sensor (False )
284+ else :
285+ sensor , handler = run_sensor (True )
286+
287+ if (
288+ IS_MACOS or True
289+ ): # This ensures mock data is always chosen regardless of the platform
290+ handler = DataHandler (sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1 )
291+ else :
292+ handler = DataHandler (None )
293+ values_list = [
294+ [math .sin (i * 0.1 ) * 10 , math .cos (i * 0.1 ) * 10 ]
295+ for i in range (2 * int (1000 / DataHandler .INTERVAL_MS ))
296+ ]
297+
298+ last_serial_write_time = time .time ()
299+ collected_data = []
300+
301+ logging .info ("Starting the data collection loop..." )
302+
303+ try :
304+ while True :
305+ if handler .ecg_channel and handler .ppg_channel :
306+ ecg_val = handler .ecg_channel .value
307+ ppg_val = handler .ppg_channel .value
308+ logging .debug (f"ECG: { ecg_val } , PPG: { ppg_val } " ) # Log the collected values
309+
310+ collected_data .append ([ecg_val , ppg_val ])
22311
23- if INTERVAL_MS <= 0 :
24- raise Exception ("Interval in miliseconds cannot be equal or lower than 0." )
25-
26- # here we'll collect 2 seconds of data at a frequency defined by interval_ms
27- freq = 1000 / INTERVAL_MS
28- values_list = []
29- for i in range (2 * int (round (freq ,0 ))):
30- values_list .append ([math .sin (i * 0.1 ) * 10 ,
31- math .cos (i * 0.1 ) * 10 ,
32- (math .sin (i * 0.1 ) + math .cos (i * 0.1 )) * 10 ])
33-
34- data = {
35- "protected" : {
36- "ver" : "v1" ,
37- "alg" : "HS256" ,
38- "iat" : time .time () # epoch time, seconds since 1970
39- },
40- "signature" : emptySignature ,
41- "payload" : {
42- "device_name" : device_name ,
43- "device_type" : "LINUX_TEST" ,
44- "interval_ms" : INTERVAL_MS ,
45- "sensors" : [
46- { "name" : "accX" , "units" : "m/s2" },
47- { "name" : "accY" , "units" : "m/s2" },
48- { "name" : "accZ" , "units" : "m/s2" }
49- ],
50- "values" : values_list
51- }
52- }
53-
54-
55-
56- # encode in JSON
57- encoded = json .dumps (data )
58-
59- # sign message
60- signature = hmac .new (bytes (HMAC_KEY , 'utf-8' ), msg = encoded .encode ('utf-8' ), digestmod = hashlib .sha256 ).hexdigest ()
61-
62- # set the signature again in the message, and encode again
63- data ['signature' ] = signature
64- encoded = json .dumps (data )
65-
66- # and upload the file
67- res = requests .post (url = 'https://ingestion.edgeimpulse.com/api/training/data' ,
68- data = encoded ,
69- headers = {
70- 'Content-Type' : 'application/json' ,
71- 'x-file-name' : 'idle01' ,
72- 'x-api-key' : API_KEY
73- })
74- if (res .status_code == 200 ):
75- print ('Uploaded file to Edge Impulse' , res .status_code , res .content )
76- else :
77- print ('Failed to upload file to Edge Impulse' , res .status_code , res .content )
312+ if len (collected_data ) >= BUFFER_SIZE :
313+ logging .info (f"Buffer size reached { BUFFER_SIZE } . Sending data..." )
314+ handler .send_to_edge_impulse (collected_data )
315+ handler .plot_data (collected_data ) # This will plot the data.
316+ collected_data = []
317+ else :
318+ logging .warning (
319+ "ECG and PPG channels not available. Waiting for 1 second..."
320+ )
321+ time .sleep (1 )
322+ except Exception as e :
323+ logging .error (f"Error occurred in the loop: { e } " )
324+ except KeyboardInterrupt :
325+ logging .info ("Terminated by user" )
326+ sensor .serial .close ()
0 commit comments