# Cálculo de la SpO2 a partir de los datos crudos del sensor

Me voy a basar en el spo2_algorithm que implementa Pablo en el firmware

### Paso 1: Definir la tabla uch_spo2_table en Python

En C está declarada como un array uint8_t, en Python simplemente la definimos como una lista de enteros:

In [12]:
uch_spo2_table = [
    95, 95, 95, 96, 96, 96, 97, 97, 97, 97, 97, 98, 98, 98, 98, 98, 99, 99, 99, 99,
    99, 99, 99, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
    100, 100, 100, 100, 99, 99, 99, 99, 99, 99, 99, 99, 98, 98, 98, 98, 98, 98, 97, 97,
    97, 97, 96, 96, 96, 96, 95, 95, 95, 94, 94, 94, 93, 93, 93, 92, 92, 92, 91, 91,
    90, 90, 89, 89, 89, 88, 88, 87, 87, 86, 86, 85, 85, 84, 84, 83, 82, 82, 81, 81,
    80, 80, 79, 78, 78, 77, 76, 76, 75, 74, 74, 73, 72, 72, 71, 70, 69, 69, 68, 67,
    66, 66, 65, 64, 63, 62, 62, 61, 60, 59, 58, 57, 56, 56, 55, 54, 53, 52, 51, 50,
    49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 31, 30, 29,
    28, 27, 26, 25, 23, 22, 21, 20, 19, 17, 16, 15, 14, 12, 11, 10, 9, 7, 6, 5,
    3, 2, 1
]

In [4]:
import numpy as np

def estimate_spo2_only(pun_ir_buffer, pun_red_buffer, an_ir_valley_locs, uch_spo2_table):
    n_exact_ir_valley_locs_count = len(an_ir_valley_locs)
    
    an_x = np.array(pun_ir_buffer, dtype=np.int32)
    an_y = np.array(pun_red_buffer, dtype=np.int32)

    an_ratio = np.zeros(5, dtype=np.int32)
    n_i_ratio_count = 0

    for k in range(n_exact_ir_valley_locs_count - 1):
        if an_ir_valley_locs[k + 1] - an_ir_valley_locs[k] > 3:
            n_y_dc_max = -2**31
            n_x_dc_max = -2**31
            n_y_dc_max_idx = 0
            n_x_dc_max_idx = 0

            for i in range(an_ir_valley_locs[k], an_ir_valley_locs[k + 1]):
                if an_x[i] > n_x_dc_max:
                    n_x_dc_max = an_x[i]
                    n_x_dc_max_idx = i
                if an_y[i] > n_y_dc_max:
                    n_y_dc_max = an_y[i]
                    n_y_dc_max_idx = i

            # Cálculo AC y DC con corrección lineal
            try:
                n_y_ac = (an_y[an_ir_valley_locs[k + 1]] - an_y[an_ir_valley_locs[k]]) * (n_y_dc_max_idx - an_ir_valley_locs[k])
                n_y_ac = an_y[an_ir_valley_locs[k]] + n_y_ac // (an_ir_valley_locs[k + 1] - an_ir_valley_locs[k])
                n_y_ac = an_y[n_y_dc_max_idx] - n_y_ac

                n_x_ac = (an_x[an_ir_valley_locs[k + 1]] - an_x[an_ir_valley_locs[k]]) * (n_x_dc_max_idx - an_ir_valley_locs[k])
                n_x_ac = an_x[an_ir_valley_locs[k]] + n_x_ac // (an_ir_valley_locs[k + 1] - an_ir_valley_locs[k])
                n_x_ac = an_x[n_y_dc_max_idx] - n_x_ac

                n_nume = (n_y_ac * n_x_dc_max) >> 7
                n_denom = (n_x_ac * n_y_dc_max) >> 7

                if n_denom > 0 and n_nume != 0 and n_i_ratio_count < 5:
                    an_ratio[n_i_ratio_count] = (n_nume * 100) // n_denom
                    n_i_ratio_count += 1
            except ZeroDivisionError:
                continue

    if n_i_ratio_count > 0:
        an_ratio = an_ratio[:n_i_ratio_count]
        an_ratio.sort()
        n_middle_idx = n_i_ratio_count // 2

        if n_middle_idx > 0:
            n_ratio_average = (an_ratio[n_middle_idx - 1] + an_ratio[n_middle_idx]) // 2
        else:
            n_ratio_average = an_ratio[n_middle_idx]

        if 2 < n_ratio_average < 184:
            spo2 = uch_spo2_table[n_ratio_average]
            spo2_valid = True
        else:
            spo2 = -999
            spo2_valid = False
    else:
        spo2 = -999
        spo2_valid = False

    return spo2, spo2_valid


In [9]:
import pandas as pd
import numpy as np
from scipy.signal import find_peaks

# 1. Leer CSV
ruta_csv = r"C:\Users\Elena\Desktop\GitHub\TFG-Elena-Ruiz\Datos\Datos crudos\save_log2\raw_data_95_77_2.csv"
df = pd.read_csv(ruta_csv, delimiter =';')

# 2. Restar la componente ambiental
df['IR_clean'] = df['IR'] - df['AMB_IR']
df['RED_clean'] = df['RED'] - df['AMB_RED']

# 3. Eliminar primeros 5 segundos (5000 ms)
df = df[df['Tiempo (ms)'] >= df['Tiempo (ms)'].min() + 5000].reset_index(drop=True)

# 4. Detectar valles de IR (invirtiendo la señal para detectar como picos)
ir_signal = df['IR_clean'].values
inverted_ir = -ir_signal
peaks, _ = find_peaks(inverted_ir, distance=30)  # Ajusta distance según sampling rate (~100Hz?)

# 5. Obtener buffers IR y RED
pun_ir_buffer = df['IR_clean'].astype(int).values
pun_red_buffer = df['RED_clean'].astype(int).values
an_ir_valley_locs = peaks.tolist()

# 6. Cargar tabla uch_spo2_table 
uch_spo2_table = [
    95, 95, 95, 96, 96, 96, 97, 97, 97, 97, 97, 98, 98, 98, 98, 98, 99, 99, 99, 99,
    99, 99, 99, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
    100, 100, 100, 100, 99, 99, 99, 99, 99, 99, 99, 99, 98, 98, 98, 98, 98, 98, 97, 97,
    97, 97, 96, 96, 96, 96, 95, 95, 95, 94, 94, 94, 93, 93, 93, 92, 92, 92, 91, 91,
    90, 90, 89, 89, 89, 88, 88, 87, 87, 86, 86, 85, 85, 84, 84, 83, 82, 82, 81, 81,
    80, 80, 79, 78, 78, 77, 76, 76, 75, 74, 74, 73, 72, 72, 71, 70, 69, 69, 68, 67,
    66, 66, 65, 64, 63, 62, 62, 61, 60, 59, 58, 57, 56, 56, 55, 54, 53, 52, 51, 50,
    49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 31, 30, 29,
    28, 27, 26, 25, 23, 22, 21, 20, 19, 17, 16, 15, 14, 12, 11, 10, 9, 7, 6, 5,
    3, 2, 1
]

# 7. Usar la función estimate_spo2_only que te pasé antes
spo2, spo2_valid = estimate_spo2_from_dataframe(df, uch_spo2_table)

# 8. Mostrar resultado
if spo2_valid:
    print(f"SpO₂ estimada: {spo2}%")
else:
    print("No se pudo estimar SpO₂ de forma válida.")

TypeError: ufunc 'left_shift' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

In [10]:
import numpy as np
from scipy.signal import find_peaks

def estimate_spo2_from_dataframe(df, spo2_table):
    # 1. Restar componente ambiental
    df['IR_clean'] = df['IR'] - df['AMB_IR']
    df['RED_clean'] = df['RED'] - df['AMB_RED']

    # 2. Eliminar los primeros 5 segundos
    df = df[df['Tiempo (ms)'] >= df['Tiempo (ms)'].min() + 5000].reset_index(drop=True)

    # 3. Extraer señales limpias como arrays enteros
    ir_signal = df['IR_clean'].astype(int).values
    red_signal = df['RED_clean'].astype(int).values

    # 4. Detectar valles en la señal IR (invertida)
    inverted_ir = -ir_signal
    peaks, _ = find_peaks(inverted_ir, distance=50)  # Ajusta si necesario
    an_ir_valley_locs = peaks.tolist()

    # 5. Ejecutar el algoritmo de SpO2 adaptado
    n_max_num = len(an_ir_valley_locs)
    if n_max_num < 2:
        return None, False

    i_ratio_count = 0
    ratios = []

    for k in range(n_max_num - 1):
        if an_ir_valley_locs[k+1] - an_ir_valley_locs[k] > 3:
            # Segmento entre dos valles consecutivos
            ir_segment = ir_signal[an_ir_valley_locs[k]:an_ir_valley_locs[k+1]]
            red_segment = red_signal[an_ir_valley_locs[k]:an_ir_valley_locs[k+1]]

            if len(ir_segment) == 0 or len(red_segment) == 0:
                continue

            ir_ac = np.max(ir_segment) - np.min(ir_segment)
            red_ac = np.max(red_segment) - np.min(red_segment)

            ir_dc = np.mean(ir_segment)
            red_dc = np.mean(red_segment)

            if ir_ac != 0 and red_ac != 0:
                num = (red_ac * ir_dc)
                den = (ir_ac * red_dc)
                if den != 0:
                    ratio = (num << 7) // den
                    ratios.append(ratio)
                    i_ratio_count += 1

    if i_ratio_count == 0:
        return None, False

    # Mediana para mayor robustez
    ratios.sort()
    median_ratio = ratios[i_ratio_count // 2]
    if median_ratio > 184:
        spo2 = spo2_table[184]
    else:
        spo2 = spo2_table[median_ratio]

    return spo2, True


In [11]:
spo2, spo2_valid = estimate_spo2_from_dataframe(df, uch_spo2_table)

if spo2_valid:
    print(f"SpO₂ estimada: {spo2}%")
else:
    print("No se pudo estimar SpO₂ de forma válida.")


TypeError: ufunc 'left_shift' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''