# ENS160 Sensor Data Reading using Python

This notebook demonstrates how to read data from the ENS160 sensor using the I2C interface on a Raspberry Pi. The following steps are performed:

1. Open the I2C bus.
2. Check if the ENS160 sensor is present.
3. Read the `DEVICE_STATUS` register to check the sensor status.
4. Read the `OPMODE` register to determine the current operating mode.
5. If new data is available, read the AQI, TVOC, and ECO2 values.

## Mathematical Formulas

We can include mathematical formulas using LaTeX syntax. For example, the formula for the area of a circle is given by:

$$
A = \pi r^2
$$

where:
- \( A \) is the area of the circle.
- \( \pi \) is the mathematical constant Pi.
- \( r \) is the radius of the circle.

Inline formulas can be written like this: \( E = mc^2 \), which represents the famous equation by Albert Einstein.

## Code Explanation

The following code reads the `DEVICE_STATUS` register and checks if new data is available. If new data is available, it reads the AQI, TVOC, and ECO2 values and prints them in a comma-separated format.

In [1]:
print("Hello, World, I'm Emilio!")

Hello, World, I'm Emilio!


## Connection and detection of ENS160 sensor in I2C bus

In [2]:
import smbus2
import time

# I2C bus number
I2C_BUS = 1

# ENS160 I2C address
ENS160_ADDRESS = 0x53

# ENS160 register to check
REGISTER_ADDRESS = 0x00

# Expected value in the register
EXPECTED_VALUE = 0x0160

def read_register(bus, address, register):
    try:
        # Read 2 bytes from the register
        data = bus.read_word_data(address, register)
        return data
    except Exception as e:
        print(f"Error reading register: {e}")
        return None

def main():
    # Open I2C bus
    bus = smbus2.SMBus(I2C_BUS)

    # Check if the device is present
    try:
        bus.read_byte(ENS160_ADDRESS)
        print(f"Device 0x{ENS160_ADDRESS:02X} is present on I2C bus {I2C_BUS}.")
    except Exception as e:
        print(f"Device 0x{ENS160_ADDRESS:02X} not found on I2C bus {I2C_BUS}.")
        return

    # Read the register value
    value = read_register(bus, ENS160_ADDRESS, REGISTER_ADDRESS)
    if value is not None:
        print(f"Register 0x{REGISTER_ADDRESS:02X} contains value 0x{value:04X}.")
        # Check if the value matches the expected value
        if value == EXPECTED_VALUE:
            print("Device is an ENS160 sensor.")
        else:
            print("Device is not an ENS160 sensor.")
    else:
        print("Failed to read the register value.")

    # Close the I2C bus
    bus.close()

if __name__ == "__main__":
    main()

Device 0x53 is present on I2C bus 1.
Register 0x00 contains value 0x0160.
Device is an ENS160 sensor.


## Checks that ENS160 is running and shows current Operation Mode

In [3]:
# Constants for the DEVICE_STATUS register and bit positions
DEVICE_STATUS_REGISTER = 0x20  # Corrected register address
STATUS_BITS_MASK = 0b00001100  # Mask for bits 3 and 2
STATUS_SHIFT = 2               # Number of bits to shift right to get the status

# Function to read the DEVICE_STATUS register and interpret the status
def read_device_status(bus, address):
    try:
        # Read the DEVICE_STATUS register (1 byte)
        status_register_value = bus.read_byte_data(address, DEVICE_STATUS_REGISTER)
        
        # Extract bits 3 and 2
        status_bits = (status_register_value & STATUS_BITS_MASK) >> STATUS_SHIFT
        
        # Interpret the status
        if status_bits == 0:
            status = "Normal operation"
        elif status_bits == 1:
            status = "Warm-up phase"
        elif status_bits == 2:
            status = "Initial startup phase"
        elif status_bits == 3:
            status = "Invalid output"
        else:
            status = "Unknown status"
        
        print(f"Device status: {status}")
    except Exception as e:
        print(f"Error reading DEVICE_STATUS register: {e}")

# Open I2C bus
bus = smbus2.SMBus(I2C_BUS)

# Read and report the device status
read_device_status(bus, ENS160_ADDRESS)

# Close the I2C bus
bus.close()

Device status: Normal operation


In [4]:
# Constants for the OPMODE register and operation modes
OPMODE_REGISTER = 0x10  # Register address for OPMODE
STANDARD_MODE = 0x02    # Standard Gas Sensing Mode

# Function to read the OPMODE register and report the current operation mode
def read_opmode(bus, address):
    try:
        # Read the OPMODE register (1 byte)
        opmode_value = bus.read_byte_data(address, OPMODE_REGISTER)
        
        # Interpret the operation mode
        if opmode_value == 0x00:
            mode = "DEEP SLEEP mode (low-power standby)"
        elif opmode_value == 0x01:
            mode = "IDLE mode (low power)"
        elif opmode_value == 0x02:
            mode = "STANDARD Gas Sensing Mode"
        elif opmode_value == 0xF0:
            mode = "RESET"
        else:
            mode = "Unknown mode"
        
        print(f"Current Operation Mode: {mode} (0x{opmode_value:02X})")
        return opmode_value
    except Exception as e:
        print(f"Error reading OPMODE register: {e}")
        return None

# Function to write the STANDARD mode to the OPMODE register
def write_standard_mode(bus, address):
    try:
        # Write STANDARD mode (0x02) to the OPMODE register
        bus.write_byte_data(address, OPMODE_REGISTER, STANDARD_MODE)
        print("Written STANDARD Gas Sensing Mode (0x02) to OPMODE register.")
    except Exception as e:
        print(f"Error writing to OPMODE register: {e}")

# Open I2C bus
bus = smbus2.SMBus(I2C_BUS)

# Read and report the current operation mode
current_mode = read_opmode(bus, ENS160_ADDRESS)

# If the current mode is not STANDARD, write STANDARD mode and read again to confirm
if current_mode is not None and current_mode != STANDARD_MODE:
    write_standard_mode(bus, ENS160_ADDRESS)
    # Read and report the operation mode again to confirm the change
    read_opmode(bus, ENS160_ADDRESS)

# Close the I2C bus
bus.close()

Current Operation Mode: STANDARD Gas Sensing Mode (0x02)


## Checks if new data is available to read and shows AQI, TVOC, ECO2 available values

In [5]:
# Constants for the DEVICE_STATUS register and bit positions
DEVICE_STATUS_REGISTER = 0x20  # Register address for DEVICE_STATUS
NEWDAT_BIT_MASK = 0b00000010   # Mask for bit 1 (NEWDAT)

# Constants for the data registers
DATA_AQI_REGISTER = 0x21       # Register address for DATA_AQI
DATA_TVOC_REGISTER = 0x22      # Register address for DATA_TVOC
DATA_ECO2_REGISTER = 0x24      # Register address for DATA_ECO2

# Function to read the DEVICE_STATUS register and check the NEWDAT bit
def check_new_data(bus, address):
    try:
        # Read the DEVICE_STATUS register (1 byte)
        status_register_value = bus.read_byte_data(address, DEVICE_STATUS_REGISTER)
        
        # Check if the NEWDAT bit (bit 1) is set
        newdat = (status_register_value & NEWDAT_BIT_MASK) >> 1
        
        if newdat == 1:
            print("New data is available in the DATA_x registers.")
            return True
        else:
            print("No new data available.")
            return False
    except Exception as e:
        print(f"Error reading DEVICE_STATUS register: {e}")
        return False

# Function to read AQI, TVOC, and ECO2 values
def read_sensor_data(bus, address):
    try:
        # Read AQI value
        aqi_value = bus.read_byte_data(address, DATA_AQI_REGISTER)
        print(f"AQI: {aqi_value & 0b00000111}")  # Extract AQI_UBA (bits 0:2)
        
        # Read TVOC value (2 bytes)
        tvoc_lsb = bus.read_byte_data(address, DATA_TVOC_REGISTER)
        tvoc_msb = bus.read_byte_data(address, DATA_TVOC_REGISTER + 1)
        tvoc_value = (tvoc_msb << 8) | tvoc_lsb
        print(f"TVOC: {tvoc_value} ppb")
        
        # Read ECO2 value (2 bytes)
        eco2_lsb = bus.read_byte_data(address, DATA_ECO2_REGISTER)
        eco2_msb = bus.read_byte_data(address, DATA_ECO2_REGISTER + 1)
        eco2_value = (eco2_msb << 8) | eco2_lsb
        print(f"eCO2: {eco2_value} ppm")
    except Exception as e:
        print(f"Error reading sensor data: {e}")

# Open I2C bus
bus = smbus2.SMBus(I2C_BUS)

# Check if new data is available and read sensor data if it is
if check_new_data(bus, ENS160_ADDRESS):
    read_sensor_data(bus, ENS160_ADDRESS)

# Close the I2C bus
bus.close()

New data is available in the DATA_x registers.
AQI: 3
TVOC: 376 ppb
eCO2: 827 ppm


## Performs a number of read operations and output the values to the exit file sensor_out.csv
Check output file and wait until the operation finishes

In [6]:
import smbus2
import time
import csv
from datetime import datetime

# Constants for the DEVICE_STATUS register and bit positions
DEVICE_STATUS_REGISTER = 0x20  # Register address for DEVICE_STATUS
NEWDAT_BIT_MASK = 0b00000010   # Mask for bit 1 (NEWDAT)

# Constants for the data registers
DATA_AQI_REGISTER = 0x21       # Register address for DATA_AQI
DATA_TVOC_REGISTER = 0x22      # Register address for DATA_TVOC
DATA_ECO2_REGISTER = 0x24      # Register address for DATA_ECO2

# Function to read the DEVICE_STATUS register and check the NEWDAT bit
def check_new_data(bus, address):
    try:
        # Read the DEVICE_STATUS register (1 byte)
        status_register_value = bus.read_byte_data(address, DEVICE_STATUS_REGISTER)
        
        # Check if the NEWDAT bit (bit 1) is set
        newdat = (status_register_value & NEWDAT_BIT_MASK) >> 1
        
        if newdat == 1:
            return True
        else:
            return False
    except Exception as e:
        print(f"Error reading DEVICE_STATUS register: {e}")
        return False

# Function to read AQI, TVOC, and ECO2 values
def read_sensor_data(bus, address):
    try:
        # Read AQI value
        aqi_value = bus.read_byte_data(address, DATA_AQI_REGISTER) & 0b00000111  # Extract AQI_UBA (bits 0:2)
        
        # Read TVOC value (2 bytes)
        tvoc_lsb = bus.read_byte_data(address, DATA_TVOC_REGISTER)
        tvoc_msb = bus.read_byte_data(address, DATA_TVOC_REGISTER + 1)
        tvoc_value = (tvoc_msb << 8) | tvoc_lsb
        
        # Read ECO2 value (2 bytes)
        eco2_lsb = bus.read_byte_data(address, DATA_ECO2_REGISTER)
        eco2_msb = bus.read_byte_data(address, DATA_ECO2_REGISTER + 1)
        eco2_value = (eco2_msb << 8) | eco2_lsb
        
        return aqi_value, tvoc_value, eco2_value
    except Exception as e:
        print(f"Error reading sensor data: {e}")
        return None, None, None

# Open I2C bus
bus = smbus2.SMBus(I2C_BUS)

# Open CSV file for writing (append mode)
with open('sensor_out.csv', mode='a', newline='') as file:
    writer = csv.writer(file)
    
    # Write header if file is empty
    if file.tell() == 0:
        writer.writerow(["DATE - TIME", "AQI", "TVOC ppb", "eCO2 ppm"])

    # Run for 10 iterations
    print("File opened with headers")  # print headers
    for _ in range(2000):
        if check_new_data(bus, ENS160_ADDRESS):
            aqi, tvoc, eco2 = read_sensor_data(bus, ENS160_ADDRESS)
            if aqi is not None and tvoc is not None and eco2 is not None:
                current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                writer.writerow([current_time, aqi, tvoc, eco2])
                file.flush()  # Ensure data is written to the file immediately
        else:
            print("No new data available.")
    
        time.sleep(30)

print("Data output finished. File closed.") 
# Close the I2C bus
bus.close()

File opened with headers


KeyboardInterrupt: 

# Communications: test MQTT Autodiscovery with a dummy sensor
The following is for testing autodiscovery with MQTT and a fake simple sensor sending a random value:

In [30]:
import random
import paho.mqtt.client as mqtt
import time
from datetime import datetime
import json  # Import the JSON module

# MQTT settings
MQTT_BROKER = "192.168.0.189"
MQTT_PORT = 1883
MQTT_TOPIC = "homeassistant/sensor/ens160"
MQTT_DISCOVERY_TOPIC = "homeassistant/sensor/ens160/config"
MQTT_USERNAME = "usermqtt"
MQTT_PASSWORD = "usermqtt"

# Function to handle connection outcome
def on_connect(client, userdata, flags, rc, properties=None):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # MQTT auto-discovery payload
        discovery_payload = {
            "name": "ENS160 Sensor",
            "state_topic": MQTT_TOPIC,
            "unit_of_measurement": "F",  # Replace with the appropriate unit (e.g., "ppm", "ppb")
            "value_template": "{{ value_json.value }}",
            "device_class": "temperature",  # Optional: Specify the type of sensor (e.g., "temperature", "humidity")
            "unique_id": "ens160_sensor",  # Unique ID for the sensor
            "availability_topic": f"{MQTT_TOPIC}/availability",  # Optional: Availability topic
            "device": {
                "identifiers": ["ens160_sensor_device"],  # Unique identifier for the device
                "name": "ENS160 Sensor Device",
                "manufacturer": "Your Manufacturer",
                "model": "ENS160"
            }
        }
        # Publish MQTT auto-discovery message
        #client.publish(MQTT_DISCOVERY_TOPIC, str(discovery_payload), retain=True)
        #print(f"Published auto-discovery message: {discovery_payload}")
        client.publish(MQTT_DISCOVERY_TOPIC, json.dumps(discovery_payload), retain=True)
        print(f"Published auto-discovery message: {json.dumps(discovery_payload)}")
    else:
        print(f"Failed to connect, return code {rc}")

# MQTT client setup with version 5.0
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect

# Connect to the MQTT broker
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60, bind_address="", bind_port=0, clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, properties=None)

# Start the MQTT client loop
client.loop_start()

# Function to generate random sensor data
def generate_random_sensor_data():
    aqi = random.randint(0, 5)
    tvoc = random.randint(0, 1000)
    eco2 = random.randint(400, 5000)
    return aqi, tvoc, eco2

# Publish random sensor data to MQTT
for _ in range(4):
    aqi, tvoc, eco2 = generate_random_sensor_data()
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    payload = {
        "time": current_time,
        "aqi": aqi,
        "tvoc": tvoc,
        "eco2": eco2
    }
    
    #client.publish(MQTT_TOPIC, str(payload))
    client.publish(MQTT_TOPIC, json.dumps(payload))
    print(f"Published: {payload}")
    
    time.sleep(3)

# Stop the MQTT client loop and disconnect
client.loop_stop()
client.disconnect()

  client = mqtt.Client(protocol=mqtt.MQTTv5)


Connected to MQTT Broker!
Published auto-discovery message: {"name": "ENS160 Sensor", "state_topic": "homeassistant/sensor/ens160", "unit_of_measurement": "F", "value_template": "{{ value_json.value }}", "device_class": "temperature", "unique_id": "ens160_sensor", "availability_topic": "homeassistant/sensor/ens160/availability", "device": {"identifiers": ["ens160_sensor_device"], "name": "ENS160 Sensor Device", "manufacturer": "Your Manufacturer", "model": "ENS160"}}
Published: {'time': '2025-04-06 18:29:08', 'aqi': 3, 'tvoc': 866, 'eco2': 3106}
Published: {'time': '2025-04-06 18:29:11', 'aqi': 1, 'tvoc': 26, 'eco2': 1530}
Published: {'time': '2025-04-06 18:29:14', 'aqi': 4, 'tvoc': 283, 'eco2': 1449}
Published: {'time': '2025-04-06 18:29:17', 'aqi': 1, 'tvoc': 839, 'eco2': 2575}


<MQTTErrorCode.MQTT_ERR_SUCCESS: 0>

# Example of sensor random emulation published over MQTT
This is a full example sending the three sensor values and a timestamp, using MQTT:

In [29]:
import random
import paho.mqtt.client as mqtt
import time
from datetime import datetime
import json  # Import the JSON module

# MQTT settings
MQTT_BROKER = "192.168.0.189"
MQTT_PORT = 1883
MQTT_TOPIC = "homeassistant/sensor/ens160"
MQTT_USERNAME = "usermqtt"
MQTT_PASSWORD = "usermqtt"

# MQTT client setup with version 5.0
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect

# Connect to the MQTT broker
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60, bind_address="", bind_port=0, clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, properties=None)

# Function to generate random sensor data
def generate_random_sensor_data():
    aqi = random.randint(0, 5)
    tvoc = random.randint(0, 1000)
    eco2 = random.randint(400, 5000)
    return aqi, tvoc, eco2

# Publish random sensor data to MQTT
for _ in range(4):
    aqi, tvoc, eco2 = generate_random_sensor_data()
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    payload = {
        "time": current_time,
        "aqi": aqi,
        "tvoc": tvoc,
        "eco2": eco2
    }
    
    #client.publish(MQTT_TOPIC, str(payload))
    client.publish(MQTT_TOPIC, json.dumps(payload))
    print(f"Published: {payload}")
    
    time.sleep(3)

client.disconnect()

  client = mqtt.Client(protocol=mqtt.MQTTv5)


Published: {'time': '2025-04-06 18:22:00', 'aqi': 3, 'tvoc': 223, 'eco2': 4885}
Published: {'time': '2025-04-06 18:22:03', 'aqi': 0, 'tvoc': 606, 'eco2': 759}
Published: {'time': '2025-04-06 18:22:06', 'aqi': 5, 'tvoc': 151, 'eco2': 2147}
Published: {'time': '2025-04-06 18:22:10', 'aqi': 5, 'tvoc': 190, 'eco2': 662}


<MQTTErrorCode.MQTT_ERR_SUCCESS: 0>