In [1]:
# Collect Telemetry data
import netifaces
import uuid
import psutil

def get_ip_address(interface_name='eth0:1'):
    try:
        addresses = netifaces.ifaddresses(interface_name)
        ip_address = addresses[netifaces.AF_INET][0]['addr']
        return ip_address
    except Exception as e:
        return f"Error retrieving IP: {e}"


def get_mac_address():
	mac = uuid.getnode()
	mac_address = ':'.join(('%012X' % mac)[i:i + 2] for i in range(0, 12, 2))
	return mac_address


def collect_telemetry_data():
	return {
		"cpu_usage": psutil.cpu_percent(interval=1),
		"ip_address": get_ip_address(),
		"mac_address": get_mac_address(),
		"processes_count": len(psutil.pids()),
		"swap_memory_usage": psutil.swap_memory().percent,
		"ram_usage": psutil.virtual_memory().percent,
		"avg_load": psutil.getloadavg(),
		"boot_time": psutil.boot_time(),
		"battery_level": psutil.sensors_battery().percent if psutil.sensors_battery() else "No Battery"
	}

In [2]:
#Collect Healtcare data

from pynq.overlays.base import BaseOverlay
from pynq.lib import Button, LED
import time
import random

base = BaseOverlay("base.bit")

alarm_active = False
led_num = 0

def button_idle():
    global alarm_active
    
    if base.buttons[0].read():
        alarm_active = True
        base.leds[led_num].on()
        
    if base.buttons[1].read():
        reset_alarm()
        alarm_active = False
        base.leds[led_num].off()
        
def collect_healthcare_data():
	return {
        "heart_rate": random.randint(80, 140),
        "spo2": random.randint(92, 98),
		"alarm": alarm_active
	}

In [3]:
#Collect Ambiental data: ligth
from pynq.lib.arduino.arduino_analog import Arduino_Analog

pin_analog = Arduino_Analog(base.ARDUINO, [0])#Analog Pin A0

def get_ligth_data():
    return float(pin_analog.read())

In [4]:
#Collect Ambiental data: gas

RL = 10.0 # Load resistance (RL) in kilo-ohms; typically 10 kΩ
sensors = {
    "MQ2": {"pin": 1, "R0": 10.0, "a": 1, "b": -1},#Pin A1
    "MQ9": {"pin": 2, "R0": 10.0, "a": 1, "b": -1},#Pin A2
    "MQ135": {"pin": 3, "R0": 10.0, "a": 1, "b": -1}#Pin A3
}

mq_analog = Arduino_Analog(base.ARDUINO, [sensor["pin"] for sensor in sensors.values()])

def adc_to_ppm(adc_value, r0, a, b):
    maxVoltage = 3.33
    voltage = adc_value * (maxVoltage / 1023.0)  # Convert ADC value to voltage
    Rs = RL * (maxVoltage - voltage) / voltage
    ratio = Rs / r0

    # Estimate gas concentration using the sensor's characteristic curve
    ppm = a * (ratio ** b)
    #print(f"Voltage (V) = {adc_value} * ({maxVoltage} / 1023) = {voltage:.4f} V")
    #print(f"Sensor Resistance (Rs) = {RL} * (({maxVoltage} - {voltage:.4f}) / {voltage:.4f}) = {Rs:.4f} kΩ")
    #print(f"Ratio (Rs/R0) = {Rs:.4f} / {r0} = {ratio:.4f}")
    #print(f"Gas Concentration (ppm) = {a} * ({ratio:.4f} ^ {b}) = {ppm:.2f} ppm")
    
    return ppm 

def read_gas_sensors():
    result = {}
    #for (sensor_name, sensor_config), adc_value in zip(sensors.items(), mq_analog.read(out_format='raw')):
    for (sensor_name, sensor_config), adc_value in zip(sensors.items(), mq_analog.read()):
        #print(f"ADC_value: {adc_value}")
        if adc_value == 0:
            print(f"Error: {sensor_name} returned 0")
            result[sensor_name] = float('inf')
            continue
            
        result[sensor_name] = adc_to_ppm(adc_value, sensor_config["R0"], sensor_config["a"], sensor_config["b"])
    
    return result

In [5]:
#Collect Ambiental data

def collect_ambiental_data():
    result = read_gas_sensors()
    return {
        "temperature": random.randint(10, 30),
        "humidity": random.randint(60, 80),
        "light_level": get_ligth_data(),
        'gas_smoke': float(result['MQ2']),
        'CO_CH4_GPL': float(result['MQ9']),
        'NH3_NitrogenOxides': float(result['MQ135'])
    }

In [6]:
# MQTT pub
from tb_device_mqtt import TBDeviceMqttClient, TBPublishInfo
import time
THINGSBOARD_HOST = '192.168.2.2'
ACCESS_TOKEN = 'pg4iiosxgbkonzwpxpfb'

last_time = time.time()
TELEMETRY_INTERVAL = 5  # Seconds

def mqtt_pub_idle(client: TBDeviceMqttClient):
    global last_time
    current_time = time.time()
    if current_time - last_time >= TELEMETRY_INTERVAL:
        data = collect_telemetry_data()
        data.update(collect_healthcare_data())
        data.update(collect_ambiental_data())
        
        result = client.send_telemetry(data)

        success = result.get() == TBPublishInfo.TB_ERR_SUCCESS
        if not success:
            print("Failed to send data.")
            
        last_time = current_time

In [None]:
# Idle

client = TBDeviceMqttClient(THINGSBOARD_HOST, username=ACCESS_TOKEN)
client.connect()
  
try:
    while True:
        button_idle()
        mqtt_pub_idle(client)
        time.sleep(0.01)
except KeyboardInterrupt:
    print("Exiting...")
finally:
    client.disconnect()

In [None]:
# Attemp DHT11 sensor

# -----------------------------------------------
# DHT11 Sensor Communication Timing Issue
# -----------------------------------------------
# The DHT11 sensor uses a single-wire bi-directional protocol, 
# which requires precise timing at the microsecond level.
#
# Problem: Too slow...
# -----------------------------------------------
# Timing Measurements:
# - Time to write(0), write(1), release(): 19.396 ms
# - Time to set pin as read: 2.199 ms
# - Time to read() = 0: 0.490 ms
# - Time to wait for response: 0.011 ms
#
# Solution:
# -----------------------------------------------
# - Implement RTL code for direct sensor communication.
# - Consider using the Microblaze soft processor for handling timing-critical operations.
# -----------------------------------------------

from pynq import GPIO, Overlay
import pynq
import time

TIMEOUT_DURATION = 1000  #1s in ms

def send_start_signal(dht11_pin):
    dht11_pin.write(0)  # Pull down for 18 ms
    time.sleep(0.018)
    dht11_pin.write(1)  # Pull up for 40 µs
    dht11_pin.release()
    #print("Start signal sent.")
    #time.sleep(0.00004)
    

def read_byte(dht11_pin):
    value = 0

    for i in range(8):
        while dht11_pin.read() == 0:# Wait for the pin to go HIGH
            pass  

        time.sleep(0.00003)

        if dht11_pin.read() == 1:
            value |= (1 << (7 - i))  # Set the bit if the pin is HIGH

        while dht11_pin.read() == 1: # Wait for the pin to go LOW
            pass  

    return value

def read_data(pin_number):
    data = []
    
    tmp = GPIO(pin_number, 'out')
    timeout_start1 = time.time() * 1000
    send_start_signal(tmp)
    timeout_start2 = time.time() * 1000
    
    dht11_pin = GPIO(pin_number, 'in')
    timeout_start3 = time.time() * 1000
    
    res = dht11_pin.read()#Test first read after pin setting
    timeout_start4 = time.time() * 1000
    
    #print("Waiting for sensor response...")
    timeout_start = time.time() * 1000
    while dht11_pin.read() == 1:
        if (time.time() * 1000 - timeout_start) > TIMEOUT_DURATION:
            print("Timeout: 253")
            return 253, None;

    print(f"Time to: write(0), write(1), release(): {timeout_start2 - timeout_start1} ms")
    print(f"Time to: set pin read: {timeout_start3 - timeout_start2} ms")
    print(f"Time to: read() = {res}: {timeout_start4 - timeout_start3} ms")
    print(f"Time to wait response: {timeout_start-timeout_start4} ms")
    #print(f"Sensor response detected in {time.time() * 1000 - timeout_start} ms. Reading data...")
    
    data = []
    if (dht11_pin.read() == 0):
        timeout_start = time.time() * 1000
        #time.sleep(0.00008)
        #here
        while dht11_pin.read() == 0:
            if (time.time() * 1000 - timeout_start) > TIMEOUT_DURATION:
                print(f"Timeout: {time.time() * 1000 - timeout_start}")
                return 253, None;

        if (dht11_pin.read() == 1):
            time.sleep(0.00008)
            for i in range(5):
                data.append(readByte());
                if data[i] == 253:
                    dht11_pin.release()
                    print("Timeout: 253")
                    return 253, None;

            dht11_pin.release()
            if (data[4] == ((data[0] + data[1] + data[2] + data[3]) & 0xFF)):
                return 0, data;
            else:
                prin("Checksum: 254")
                return 254, None;
        
        else:
            print("Second if")
    dht11_pin.release()
    print("Timeout2: 253")
    return 253, None
    
# Function to parse the received data
def parse_data(data):
    humidity_int = int(''.join(map(str, data[0:8])), 2)
    humidity_dec = int(''.join(map(str, data[8:16])), 2)
    temp_int = int(''.join(map(str, data[16:24])), 2)
    temp_dec = int(''.join(map(str, data[24:32])), 2)
    checksum = int(''.join(map(str, data[32:40])), 2)

    if (humidity_int + humidity_dec + temp_int + temp_dec) & 0xFF == checksum:
        temperature = temp_int + (temp_dec / 10.0)
        humidity = humidity_int + (humidity_dec / 10.0)
        print(f"Temperature: {temperature}°C, Humidity: {humidity}%")
    else:
        print("Checksum mismatch. Data may be corrupted.")


status, data = read_data(GPIO.get_gpio_pin(26))
if status == 0:
    parse_data(data)
else:
    s = "Error, "
    if status == 253:
        s += "Timeout"
    elif status == 254:
        s += "Checksum"
    else:
        s += "Unknown"
    print(s)