# Import and Initialize

In [1]:
import sys
sys.path.append(r'F:\gitlab\mxdbg\src')
from mxESP32Debugger.debugger import Dbg as MXDBG
import time
import ctypes
from tqdm import trange
from loguru import logger

dev = MXDBG()

[32m2025-05-08 13:10:14.351[0m | [1mINFO    [0m | [36mmxESP32Debugger.debugger[0m:[36mconnect[0m:[36m419[0m - [1mUsing serial port to connect. Port: COM4[0m
[32m2025-05-08 13:10:14.352[0m | [1mINFO    [0m | [36mmxESP32Debugger.debugger[0m:[36mconnect[0m:[36m424[0m - [1mUsing ESP32-S3R8.[0m
[32m2025-05-08 13:10:14.352[0m | [1mINFO    [0m | [36mmxESP32Debugger.debugger[0m:[36mconnect[0m:[36m425[0m - [1mEmbedded software version: v1.0[0m
[32m2025-05-08 13:10:14.353[0m | [1mINFO    [0m | [36mmxESP32Debugger.debugger[0m:[36mconnect[0m:[36m426[0m - [1mLibrary version: v241210.0.rc3.[0m


# I2C Initialize

Hardware I2C：
- HW_I2C_0: sdio 10 / sclk 11
- HW_I2C_1: sdio 41 / sclk 42

Soft I2C:
- SW_I2C_0: sdio 39 / sclk 40
- SW_I2C_1: sdio 37 / sclk 38
- SW_I2C_2: sdio 35 / sclk 36
- SW_I2C_3: sdio 33 / sclk 34
- SW_I2C_4: sdio 1 / sclk 2
- SW_I2C_5: sdio 4 / sclk 5
- SW_I2C_6: sdio 7 / sclk 8
- SW_I2C_7: sdio 17 / sclk 18 # which may be conflict with PWMs' pin definition


![The board and the ports](board_and_ports.png)

In [9]:
soft_i2c_groups_dict = {
    '0': {'sda': 39, 'scl': 40},
    '1': {'sda': 37, 'scl': 38},
    '2': {'sda': 35, 'scl': 36},
    '3': {'sda': 33, 'scl': 34},
    '4': {'sda': 1, 'scl': 2},    
    '5': {'sda': 4, 'scl': 5},
    '6': {'sda': 7, 'scl': 8},
}

for group, pins in soft_i2c_groups_dict.items():
    dev.soft_i2c_config(port=int(group), sda_pin=pins['sda'], scl_pin=pins['scl'], freq=100_000, pullup_enable=False)
    
ret, data = dev.i2c_config(sda_pin=10, scl_pin=11, freq=400_000, sda_pullup=False, scl_pullup=False, port=0)
print(ret, data)
ret, data = dev.i2c_config(sda_pin=41, scl_pin=42, freq=100_000, sda_pullup=True, scl_pullup=True, port=1)
print(ret, data)

True None
True None


# I2C find slave

In [3]:
# for port in range(7):
#     ret, data = dev.soft_i2c_find_slave(port=port)
#     logger.info(f"Soft I2C port {port} find slave: {ret}, {data}")

# I2C Write and Read

In [10]:
# software i2c test
for port in range(len(soft_i2c_groups_dict)):
    ret, data = dev.soft_i2c_write_read(slave_id=0x78, write_list=[0x00], read_length=2, port=port)
    if ret is False:
        logger.error("Return {} in SW I2C port {}".format(ret, port))
        continue
    logger.info("SW I2C Port {} data: 0x{:04X}".format(port, data[0] << 8 | data[1]))

# hardware i2c test (port 0)
ret, data = dev.i2c_write_read(slave_id=0x78, write_list=[0x00], read_length=2, port=0)
if ret != False:
    logger.info("HW I2C Port 0 data: 0x{:04X}".format(data[0] << 8 | data[1]))
else:
    logger.error("Return {} in HW I2C port {}".format(ret, 0))

# hardware i2c test (port 1)
ret, data = dev.i2c_write_read(slave_id=0x38, write_list=[0x00], read_length=2, port=1)
if ret != False:
    logger.info("HW I2C Port 1 data: 0x{:04X}".format(data[0] << 8 | data[1]))
else:
    logger.error("Return {} in HW I2C port {}".format(ret, 1))
    
# hardware i2c test (port 1, different slave id)
ret, data = dev.i2c_write_read(slave_id=0x5D, write_list=[0x00], read_length=2, port=1)
if ret != False:
    logger.info("HW I2C Port 1 data: 0x{:04X}".format(data[0] << 8 | data[1]))
else:
    logger.error("Return {} in HW I2C port {}".format(ret, 1))

[32m2025-05-08 13:12:20.713[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 0 data: 0x0000[0m
[32m2025-05-08 13:12:20.725[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 1 data: 0x0000[0m
[32m2025-05-08 13:12:20.736[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 2 data: 0x0000[0m
[32m2025-05-08 13:12:20.748[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 3 data: 0x0000[0m
[32m2025-05-08 13:12:20.759[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 4 data: 0x0000[0m
[32m2025-05-08 13:12:20.770[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 5 data: 0x0000[0m
[32m2025-05-08 13:12:20.782[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1mSW I2C Port 6 data: 0x0000[0m
[32m2025-05-08 13:12:21.782[0m |

In [5]:
dev.i2c_find_slave(port=1)

(True, ['0x38', '0x5D'])

# Coco

In [None]:

def coco_get_value(i2c_port_num:int):
    
    """
    @brief Get COCO value from COCO sensor
    @return: COCO value, temperature value
    """
    
    coco_addr = 0x78
    
    if i2c_port_num not in range(8):
        raise ValueError("Invalid I2C port. Choose from 'soft_i2c_0' to 'soft_i2c_6' or 'i2c_0'.")
    
    if i2c_port_num < 7:
        I2C_WRITE_READ_API = dev.soft_i2c_write_read
        port = i2c_port_num
    else:
        I2C_WRITE_READ_API = dev.i2c_write_read
        port = 0
    
    def check_status(status):
        
        '''
        @brief Check COCO status
        @param status: COCO status byte
        '''
        power_indication = True if status & 0x40 else False
        busy_indication = True if status & 0x20 else False
        cmd_enable = True if status & 0x08 else False
        flag_memory_integrity = True if status & 0x04 else False

        # logger.debug(f'power_indication: {power_indication}')
        # logger.debug(f'busy_indication: {busy_indication}')
        # print(f'cmd_enable: {cmd_enable}')
        # print(f'flag_memory_integrity: {flag_memory_integrity}')

        return power_indication, busy_indication, cmd_enable, flag_memory_integrity

    ret, data = I2C_WRITE_READ_API(slave_id=coco_addr, 
                                   write_list=[0xA3, 0xC7, 0x00],
                                   read_length=0,
                                   port=port)
    assert ret == True, "Failed to write to COCO"
    # print(ret, data)

    while True:
        ret, data = I2C_WRITE_READ_API(slave_id=coco_addr,
                                       write_list=[],
                                       read_length=1,
                                       port=port)
        time.sleep(0.001)

        _, busy_indication, _, _ = check_status(data[0])

        if busy_indication == False:
            break

    ret, data = I2C_WRITE_READ_API(slave_id=coco_addr, 
                                   write_list=[], 
                                   read_length=6,
                                   port=port)
    assert ret == True, "Failed to read from COCO"

    bridge_data = data[1] << 16 | data[2] << 8 | data[3]
    if bridge_data & 0x800000:
        bridge_data |= 0xFF000000
        bridge_data = ctypes.c_int32(bridge_data).value
    # logger.info(f'bridge_data: {bridge_data}')

    temperature = data[4] << 8 | data[5]
    # logger.info(f'temperature: {temperature}')

    return bridge_data, temperature

In [None]:
for port in range(8):
    print(coco_get_value(i2c_port_num=port))

![](./Circuit.png)
$$
R_1 = R_3 = R_4 = 27.000K\Omega \\
R_2 = 20.000K\Omega
$$

另，$R_5$ 和 $R_6$ 合并为 $R_x$。

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv('resistance.csv')
x = df['R_x(Ω)'].values
y = df['Raw'].values

coefficients = np.polyfit(x, y, 2)
a,b,c = coefficients
logger.info(f"拟合公式：y = {a:.4f}x² + {b:.4f}x + {c:.4f}")

x_fit = np.linspace(min(x), max(x), 500)
y_fit = np.polyval(coefficients, x_fit)

y_fit_actual = np.polyval(coefficients, x)
residuals = y - y_fit_actual
residuals_squared = residuals ** 2
ss = np.sum(residuals_squared)
mse = np.mean(residuals_squared)
rmse = np.sqrt(mse)
logger.info(f"RMSE: {rmse:.4f}")
logger.info(f"R²: {1 - (ss / np.sum((y - np.mean(y)) ** 2)):.4f}")

plt.figure(figsize=(10, 6))
plt.scatter(x, y, label='Data', color='blue')
plt.plot(x_fit, y_fit, 
         label=f'Fitted Curve (y = {a:.4f}x² + {b:.4f}x + {c:.4f})',
         color='red')
plt.xlabel('R_x(Ω)')
plt.ylabel('Raw')
plt.title('Curve Fitting')
plt.legend()
plt.grid()
plt.show()


# AHT10

In [None]:

ret, _ = dev.i2c_write_read(slave_id=0x38, write_list=[0xE1], read_length=0, port=1)
if ret == False:
    logger.error("Return {} in HW I2C port {}".format(ret, 1))

time.sleep(1)

ret, _ = dev.i2c_write_read(slave_id=0x38, write_list=[0xBA], read_length=0, port=1)
if ret == False:
    logger.error("Return {} in HW I2C port {}".format(ret, 1))
    
time.sleep(1)

for i in range(10):
    
    ret, _ = dev.i2c_write_read(slave_id=0x38, write_list=[0xAC, 0x33, 0x00], read_length=0, port=1)
    
    time.sleep(200/1000) # You have to wait 200ms for the sensor to be ready
    if ret == False:
        logger.error("Return {} in HW I2C port {}".format(ret, 1))
    
    ret, data = dev.i2c_write_read(slave_id=0x38, write_list=[], read_length=6, port=1)
    if ret != False:
        # humadity
        humidity_raw = 0
        humidity_raw = (humidity_raw | data[1]) << 8
        humidity_raw = (humidity_raw | data[2]) << 8
        humidity_raw = (humidity_raw | data[3])
        humidity_raw = humidity_raw >> 4
        
        # temperature
        temp_raw = 0
        temp_raw = (temp_raw | data[3]) << 8
        temp_raw = (temp_raw | data[4]) << 8
        temp_raw = (temp_raw | data[5])
        temp_raw = temp_raw & 0x0FFFFF;
        
        humidity = humidity_raw * 100  / 1024 / 1024    
        temperature = (temp_raw * 200 * 10 / 1024 / 1024 - 500) / 10
        logger.info("{:.2f}%RH, {:.2f}℃".format(humidity, temperature))
        
    else:
        logger.error("Return {} in HW I2C port {}".format(ret, 1))
        
    time.sleep(1)

# SFA30

In [6]:
def cal_crc(data:list[int]):
    """
    @brief Calculate CRC8 checksum for a list of data bytes.
    @param data: List of data bytes to calculate CRC for.
    @return: Calculated CRC8 checksum.
    """
    crc = 0xFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = (crc << 1) ^ 0x31
            else:
                crc <<= 1
            crc &= 0xFF
    return crc

In [7]:
def cal_formula(val:int, average:float, smoothing_factor:float=0.0167) -> float:
    return smoothing_factor * val + (1.0 - smoothing_factor) * average

In [11]:
start_time = time.time()

formaldehyde_raw = []
temperature_raw = []
humidity_raw = []

# 1. test crc
if cal_crc([0xBE, 0xEF]) != 0x92:
    logger.error("CRC error: {} != {}".format(cal_crc(data), 0x92))
    raise ValueError("CRC error")

# 2. Wait for sensor to be ready
for i in trange(10, desc="waiting for sensor to be ready"):
    time.sleep(1)
    
# 3. Send measure command to sensor
ret, _ = dev.i2c_write_read(slave_id=0x5D, write_list=[0x00, 0x06], read_length=0, port=1)
if ret == False:
    logger.error("Return {} in HW I2C port {}".format(ret, 1))
    raise ValueError("Failed to start measurement")

time.sleep(500/1000)

# 4. Read data from sensor
for i in range(1000):
    
    first_60sec = True if ((time.time() - start_time) < 60) else False
    
    ret, _ = dev.i2c_write_read(slave_id=0x5D, write_list=[0x03, 0x27], read_length=0, port=1)
    if ret == False:
        logger.error("Return {} in HW I2C port {}".format(ret, 1))
        
    ret, data = dev.i2c_write_read(slave_id=0x5D, write_list=[], read_length=9, port=1)
    # if data != None:
    #     logger.debug("Data: {}".format("".join(["0x{:02X}, ".format(i) for i in data])))
    if ret != False:
        
        # 验证 CRC
        for i in range(9):
            if (i != 0) and ((i+1) % 3 == 0):
                data_crc = [data[i - 2], data[i - 1]]
                crc = cal_crc(data_crc)
                if crc != data[i]:
                    logger.error("CRC error: {} != {}".format(crc, data[i]))
                    break
    
        # 数据导出
        temperature = (data[6] << 8) | data[7]
        formaldhyde = (data[1] << 8) | data[2]
        humidity    = (data[3] << 8) | data[4]
        
        if first_60sec:
            temperature_raw.append(temperature)
            formaldehyde_raw.append(formaldhyde)
            humidity_raw.append(humidity)
            
        else:
            temperature_raw.append(cal_formula(temperature, np.mean(temperature_raw)))
            formaldehyde_raw.append(cal_formula(formaldhyde, np.mean(formaldehyde_raw)))
            humidity_raw.append(cal_formula(humidity, np.mean(humidity_raw)))
            
        logger.info("Temperature: {:.2f}°C, Formaldehyde: {:.2f}ppb, Humidity: {:.2f}%".format(
            temperature_raw[-1] / 200, formaldehyde_raw[-1] / 5, humidity_raw[-1] / 100))
    
    else:
        logger.error("Return {} in HW I2C port {}".format(ret, 1))
        
    time.sleep(1400/1000)

waiting for sensor to be ready: 100%|██████████| 10/10 [00:10<00:00,  1.00s/it]
[32m2025-05-08 13:12:37.228[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m62[0m - [1mTemperature: 26.34°C, Formaldehyde: 315.00ppb, Humidity: 53.57%[0m
[32m2025-05-08 13:12:38.651[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m62[0m - [1mTemperature: 26.18°C, Formaldehyde: 362.80ppb, Humidity: 53.78%[0m
[32m2025-05-08 13:12:40.075[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m62[0m - [1mTemperature: 26.16°C, Formaldehyde: 362.80ppb, Humidity: 53.77%[0m
[32m2025-05-08 13:12:41.499[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m62[0m - [1mTemperature: 26.14°C, Formaldehyde: 362.80ppb, Humidity: 53.79%[0m
[32m2025-05-08 13:12:42.922[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m62[0m - [1mTemperature: 26.11°C, Formaldehyde: 362.80ppb, Humidity: 53.80%[0m
[32m2025-05-08 13:12:44.345[0m | [1mI

NameError: name 'np' is not defined