1- import os
2- import time
3- import numpy as np
4- import re
5- import uuid
1+ # First, install the dependencies via:
2+ # $ pip3 install requests
3+
4+ import json
5+ import time , hmac , hashlib
66import requests
7- import hmac
7+ import re , uuid
88import math
9- import hashlib
10- import json
11- import tkinter as tk
12- from tkinter import messagebox
13- import matplotlib .pyplot as plt
14- import logging
15-
16- logging .basicConfig (
17- filename = "sensor_app.log" ,
18- level = logging .DEBUG ,
19- format = "%(asctime)s - %(levelname)s - %(message)s" ,
20- )
21- logging .getLogger ().addHandler (logging .StreamHandler ()) # To also print to console
22-
23-
24- IS_MACOS = os .uname ().sysname == "Darwin"
25- HMAC_KEY = "54598668df4fb5ed5d55247b38dd55a3"
26- API_KEY = "ei_6be3855c55f764ecec5834853c4fd051c684ef4e1cd26e6a074dfeede5636c64"
27- emptySignature = "" .join (["0" ] * 64 )
28- device_name = ":" .join (re .findall (".." , "%012x" % uuid .getnode ()))
29- INTERVAL_MS = 16
30- freq = 1000 / INTERVAL_MS
31-
32- BUFFER_SIZE = 5 * int (round (freq , 0 ))
33-
34-
35- class MockSerial :
36- class Serial :
37- def __init__ (self , * args , ** kwargs ):
38- pass
39-
40- def write (self , data ):
41- pass
42-
43- def close (self ):
44- pass
45-
46-
47- class Sensor :
48- def __init__ (self , mock ):
49- if mock :
50- self .setup_mock ()
51- else :
52- self .setup_real ()
53-
54- def setup_mock (self ):
55- print ("Running on macOS: Mocking hardware-specific libraries." )
56-
57- class MockI2C :
58- pass
59-
60- class MockADS :
61- P0 = 0
62- P1 = 1
63-
64- class ADS1115 :
65- data_rate = 860
66-
67- def __init__ (self , * args , ** kwargs ):
68- pass
69-
70- class MockAnalogIn :
71- def __init__ (self , * args , ** kwargs ):
72- self .t = 0 # time variable
73- self .fs = 1000 / DataHandler .INTERVAL_MS # sampling frequency
74-
75- @property
76- def value (self ):
77- A = 1024 # arbitrary amplitude for the sine wave
78- f = 1 # 1 Hz for 60 BPM
79- val = A * math .sin (2 * math .pi * f * self .t / self .fs )
80- self .t += 1
81- if self .t >= self .fs :
82- self .t = 0 # reset the timer after one second
83- return val
84-
85- self .i2c = MockI2C ()
86- self .ADS = MockADS
87- self .AnalogIn = MockAnalogIn
88- self .serial = MockSerial .Serial ()
89-
90- def setup_real (self ):
91- import busio
92- import board
93- import adafruit_ads1x15 .ads1115 as ADS
94- from adafruit_ads1x15 .analog_in import AnalogIn
95- import serial
96-
97- self .i2c = busio .I2C (board .SCL , board .SDA )
98- self .ADS = ADS
99- self .AnalogIn = AnalogIn
100- self .serial = serial .Serial (self .detect_serial_port (), 115200 )
101-
102- def try_handshake (self , port ):
103- """Try to handshake with a device on a given serial port."""
104- try :
105- with serial .Serial (port , 115200 , timeout = 1 ) as ser :
106- ser .write (b"IDENTIFY\n " ) # Asking the device to identify itself
107- response = ser .readline ().decode ("utf-8" ).strip ()
108- return (
109- response == "ECG_PPG_SENSOR"
110- ) # Assuming this is the response you expect
111- except :
112- return False
113-
114- def detect_serial_port (self ):
115- """Detect the correct serial port by trying a handshake."""
116- potential_ports = [
117- "/dev/ttyAMA0" ,
118- "/dev/ttyUSB0" ,
119- "/dev/ttyUSB1" ,
120- "/dev/ttyACM0" ,
121- ]
122- for port in potential_ports :
123- if os .path .exists (port ) and self .try_handshake (port ):
124- return port
125- logging .error ("No suitable serial port found." )
126- raise Exception ("No suitable serial port found." )
127-
128-
129- class DataHandler :
130- emptySignature = "" .join (["0" ] * 64 )
131- device_name = ":" .join (re .findall (".." , "%012x" % uuid .getnode ()))
132- INTERVAL_MS = 16
1339
134- def __init__ (self , AnalogIn , ads = None , ecg_pin = None , ppg_pin = None ):
135- if ads and ecg_pin and ppg_pin :
136- self .ecg_channel = AnalogIn (ads , ecg_pin )
137- self .ppg_channel = AnalogIn (ads , ppg_pin )
138- else :
139- self .ecg_channel = None
140- self .ppg_channel = None
10+ # Your API & HMAC keys can be found here (go to your project > Dashboard > Keys to find this)
11+ HMAC_KEY = "fed53116f20684c067774ebf9e7bcbdc"
12+ API_KEY = "ei_fd83..."
14113
142- def plot_data (self , data ):
143- # Use classic style for ECG-like appearance
144- plt .style .use ("classic" )
14+ # empty signature (all zeros). HS256 gives 32 byte signature, and we encode in hex, so we need 64 characters here
15+ emptySignature = '' .join (['0' ] * 64 )
14516
146- ecg_values = [ x [ 0 ] for x in data ]
147- ppg_values = [ x [ 1 ] for x in data ]
17+ # use MAC address of network interface as deviceId
18+ device_name = ":" . join ( re . findall ( '..' , '%012x' % uuid . getnode ()))
14819
149- fig , axs = plt .subplots (2 , 1 , figsize = (10 , 6 ))
150-
151- # Mock BPM and HRV values for demonstration
152- mock_bpm = 60
153- mock_hrv = 50
154-
155- # Display BPM and HRV above the graph
156- fig .suptitle (f"BPM: { mock_bpm } HRV: { mock_hrv } ms" , fontsize = 14 )
157-
158- # Plot ECG data
159- axs [0 ].plot (ecg_values , label = "ECG" , color = "lime" )
160- axs [0 ].legend ()
161- axs [0 ].set_title ("ECG Data" )
162- axs [0 ].grid (True , which = "both" , linestyle = "--" , linewidth = 0.5 )
163- axs [0 ].set_facecolor ("white" )
164-
165- # Plot PPG data
166- axs [1 ].plot (ppg_values , color = "red" , label = "PPG" )
167- axs [1 ].legend ()
168- axs [1 ].set_title ("PPG Data" )
169- axs [1 ].grid (True , which = "both" , linestyle = "--" , linewidth = 0.5 )
170- axs [1 ].set_facecolor ("white" )
171-
172- # Display the plot
173- plt .tight_layout ()
174- plt .show ()
175-
176- def send_to_edge_impulse (self , values_list ):
177- if self .INTERVAL_MS <= 0 :
178- raise Exception ("Interval in milliseconds cannot be equal or lower than 0." )
179-
180- data = {
181- "protected" : {"ver" : "v1" , "alg" : "HS256" , "iat" : time .time ()},
182- "signature" : self .emptySignature ,
183- "payload" : {
184- "device_name" : self .device_name ,
185- "device_type" : "LINUX_TEST" ,
186- "interval_ms" : self .INTERVAL_MS ,
187- "sensors" : [
188- {"name" : "ECG" , "units" : "mV" },
189- {"name" : "PPG" , "units" : "arb. units" },
190- ],
191- "values" : values_list ,
192- },
193- }
194-
195- encoded = json .dumps (data )
196- signature = hmac .new (
197- bytes (HMAC_KEY , "utf-8" ),
198- msg = encoded .encode ("utf-8" ),
199- digestmod = hashlib .sha256 ,
200- ).hexdigest ()
201- data ["signature" ] = signature
202- encoded = json .dumps (data )
203-
204- res = requests .post (
205- url = "https://ingestion.edgeimpulse.com/api/training/data" ,
206- data = encoded ,
207- headers = {
208- "Content-Type" : "application/json" ,
209- "x-file-name" : "idle" ,
210- "x-api-key" : API_KEY ,
211- },
212- )
213-
214- if res .status_code == 200 :
215- print ("Uploaded file to Edge Impulse" , res .status_code , res .content )
216- else :
217- print ("Failed to upload file to Edge Impulse" , res .status_code , res .content )
218-
219-
220- def is_raspberry_pi ():
221- """Return True if we are running on a Raspberry Pi."""
222- try :
223- with open ("/proc/cpuinfo" , "r" ) as f :
224- for line in f :
225- if line .startswith ("Hardware" ):
226- if "BCM" in line :
227- return True
228- except :
229- pass
230- return False
231-
232-
233- if __name__ == "__main__" :
234-
235- def run_sensor (mock ):
236- if mock :
237- sensor = Sensor (mock = True )
238- handler = DataHandler (
239- sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1
240- )
241- else :
242- sensor = Sensor (mock = False )
243- handler = DataHandler (
244- sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1
245- )
246-
247- return sensor , handler
248-
249- def on_quit ():
250- root .quit ()
251- root .destroy ()
252-
253- # Create main window
254- root = tk .Tk ()
255- root .title ("Sensor Data" )
256- root .geometry ("300x200" )
257- root .configure (bg = "black" )
258-
259- # Add buttons
260- btn_mock = tk .Button (
261- root ,
262- text = "Use Mock Data" ,
263- command = lambda : run_sensor (True ),
264- bg = "white" ,
265- fg = "black" ,
266- )
267- btn_real = tk .Button (
268- root ,
269- text = "Use Real Data" ,
270- command = lambda : run_sensor (False ),
271- bg = "white" ,
272- fg = "black" ,
273- )
274- btn_quit = tk .Button (root , text = "Quit" , command = on_quit , bg = "red" , fg = "white" )
275-
276- btn_mock .pack (pady = 20 )
277- btn_real .pack (pady = 20 )
278- btn_quit .pack (pady = 20 )
279-
280- root .mainloop ()
281- print ("Collecting data..." )
282- if is_raspberry_pi ():
283- sensor , handler = run_sensor (False )
284- else :
285- sensor , handler = run_sensor (True )
286-
287- if (
288- IS_MACOS or True
289- ): # This ensures mock data is always chosen regardless of the platform
290- handler = DataHandler (sensor .AnalogIn , sensor .ADS , sensor .ADS .P0 , sensor .ADS .P1 )
291- else :
292- handler = DataHandler (None )
293- values_list = [
294- [math .sin (i * 0.1 ) * 10 , math .cos (i * 0.1 ) * 10 ]
295- for i in range (2 * int (1000 / DataHandler .INTERVAL_MS ))
296- ]
297-
298- last_serial_write_time = time .time ()
299- collected_data = []
300-
301- logging .info ("Starting the data collection loop..." )
302-
303- try :
304- while True :
305- if handler .ecg_channel and handler .ppg_channel :
306- ecg_val = handler .ecg_channel .value
307- ppg_val = handler .ppg_channel .value
308- logging .debug (f"ECG: { ecg_val } , PPG: { ppg_val } " ) # Log the collected values
309-
310- collected_data .append ([ecg_val , ppg_val ])
20+ # here we have new data every 16 ms
21+ INTERVAL_MS = 16
31122
312- if len (collected_data ) >= BUFFER_SIZE :
313- logging .info (f"Buffer size reached { BUFFER_SIZE } . Sending data..." )
314- handler .send_to_edge_impulse (collected_data )
315- handler .plot_data (collected_data ) # This will plot the data.
316- collected_data = []
317- else :
318- logging .warning (
319- "ECG and PPG channels not available. Waiting for 1 second..."
320- )
321- time .sleep (1 )
322- except Exception as e :
323- logging .error (f"Error occurred in the loop: { e } " )
324- except KeyboardInterrupt :
325- logging .info ("Terminated by user" )
326- sensor .serial .close ()
23+ if INTERVAL_MS <= 0 :
24+ raise Exception ("Interval in miliseconds cannot be equal or lower than 0." )
25+
26+ # here we'll collect 2 seconds of data at a frequency defined by interval_ms
27+ freq = 1000 / INTERVAL_MS
28+ values_list = []
29+ for i in range (2 * int (round (freq ,0 ))):
30+ values_list .append ([math .sin (i * 0.1 ) * 10 ,
31+ math .cos (i * 0.1 ) * 10 ,
32+ (math .sin (i * 0.1 ) + math .cos (i * 0.1 )) * 10 ])
33+
34+ data = {
35+ "protected" : {
36+ "ver" : "v1" ,
37+ "alg" : "HS256" ,
38+ "iat" : time .time () # epoch time, seconds since 1970
39+ },
40+ "signature" : emptySignature ,
41+ "payload" : {
42+ "device_name" : device_name ,
43+ "device_type" : "LINUX_TEST" ,
44+ "interval_ms" : INTERVAL_MS ,
45+ "sensors" : [
46+ { "name" : "accX" , "units" : "m/s2" },
47+ { "name" : "accY" , "units" : "m/s2" },
48+ { "name" : "accZ" , "units" : "m/s2" }
49+ ],
50+ "values" : values_list
51+ }
52+ }
53+
54+
55+
56+ # encode in JSON
57+ encoded = json .dumps (data )
58+
59+ # sign message
60+ signature = hmac .new (bytes (HMAC_KEY , 'utf-8' ), msg = encoded .encode ('utf-8' ), digestmod = hashlib .sha256 ).hexdigest ()
61+
62+ # set the signature again in the message, and encode again
63+ data ['signature' ] = signature
64+ encoded = json .dumps (data )
65+
66+ # and upload the file
67+ res = requests .post (url = 'https://ingestion.edgeimpulse.com/api/training/data' ,
68+ data = encoded ,
69+ headers = {
70+ 'Content-Type' : 'application/json' ,
71+ 'x-file-name' : 'idle01' ,
72+ 'x-api-key' : API_KEY
73+ })
74+ if (res .status_code == 200 ):
75+ print ('Uploaded file to Edge Impulse' , res .status_code , res .content )
76+ else :
77+ print ('Failed to upload file to Edge Impulse' , res .status_code , res .content )
0 commit comments