# Implementación de algoritmo de detección de Whitaker y Hayes

Basado en el trabajo https://doi.org/10.1016/j.chemolab.2018.06.009.

Carga de librerías externas, configuración de los parámetros a ser explorados durante la medición y nombre del archivo de donde se cargarán los datos espectrales.

In [None]:
import json
import pickle
import numpy as np
from random import sample
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix

archivo_carga_espectros = '../resultados.ndjson'
params = [
    { 'umbral': 4 },
    { 'umbral': 5 },
    { 'umbral': 6 },
    { 'umbral': 7 },
    { 'umbral': 8 },
    { 'umbral': 9 },
    { 'umbral': 10 },
    { 'umbral': 11 },
    { 'umbral': 12 },
    { 'umbral': 13 },
    { 'umbral': 14 },
    { 'umbral': 15 },
    { 'umbral': 16 },
]

Funciones de carga de espectros

In [None]:
def deserializar_espectro(espectro):
    espectro['vector_base'] = np.array(espectro['vector_base'])
    espectro['espectro_base_muestra'] = np.array(espectro['espectro_base_muestra'])
    espectro['espectro_base_background'] = np.array(espectro['espectro_base_background'])
    espectro['baseline_muestra'] = np.array(espectro['baseline_muestra'])
    espectro['baseline_background'] = np.array(espectro['baseline_background'])
    espectro['muestra_con_baseline'] = np.array(espectro['muestra_con_baseline'])
    espectro['background_con_baseline'] = np.array(espectro['background_con_baseline'])
    espectro['muestra_combinado_base'] = np.array(espectro['muestra_combinado_base'])
    espectro['espectro_ruido_combinado'] = np.array(espectro['espectro_ruido_combinado'])
    espectro['espectro_ruido_background'] = np.array(espectro['espectro_ruido_background'])
    espectro['spikes_muestra'] = np.array(espectro['spikes_muestra'])
    espectro['spikes_background'] = np.array(espectro['spikes_background'])
    espectro['flag_spikes_muestra'] = np.array(espectro['flag_spikes_muestra'])
    espectro['flag_spikes_background'] = np.array(espectro['flag_spikes_background'])
    espectro['muestra_base_con_spikes'] = np.array(espectro['muestra_base_con_spikes'])
    espectro['y_muestra'] = np.array(espectro['y_muestra'])
    espectro['y_background'] = np.array(espectro['y_background'])

    return espectro

def cargar_espectros():
    with open(archivo_carga_espectros, 'r') as fp:
        data = []
        for line in fp:
            data.append(deserializar_espectro(json.loads(line)))
        fp.close()
        return data

espectros = cargar_espectros()

Función para calcular métricas clasificación de spikes.

In [None]:
def calcular_metricas_clasificacion(valores_reales, predicciones):
    _predicciones = np.array(predicciones).flatten()
    _valores_reales = np.array(valores_reales).flatten()

    return precision_recall_fscore_support(_valores_reales, _predicciones), confusion_matrix(_valores_reales, _predicciones)

Implementación del algoritmo Whitaker-Hayes para detección de spikes, así como funciones auxiliares para recogida de datos y visualización de resultados.

In [None]:
def algoritmo_whitaker_hayes(input, umbral = 6):
    # Primero calculamos la serie diferenciada
    input = input
    serie_diferenciada = np.diff(input)

    # Luego calculamos la mediana y el MAD de la serie diferenciada
    mediana_diferenciada = np.median(serie_diferenciada)
    mad_diferenciada = np.median(np.array(list(map(lambda x: abs(x - mediana_diferenciada), serie_diferenciada))))

    # En este caso el input es un vector nulo y se previene una división por cero
    if mad_diferenciada == 0:
        return np.full(len(input), False), np.zeros(len(input) - 1), np.zeros(len(input) - 1)

    # Calculamos el valor zeta modificado
    zeta_modificado = np.array(list(map(lambda x: 0.6745 * (x - mediana_diferenciada) / mad_diferenciada, serie_diferenciada)))

    # Son marcados como spikes los valores cuyo puntaje z modificado asociado cae por encima o por debajo del umbral
    resultado = np.abs(zeta_modificado) > umbral

    # Este algoritmo no puede detectar spikes en el primer valor de la serie,
    # los autores indican que, puesto que los spikes son sustituidos con promedios móviles
    # lo más seguro es marcar como spikes el primer y último elemento de la serie
    # Sin embargo, nuestro interés es sólo la detección, por lo que más bien nos interesa
    # preservar la precisión del algoritmo marcando como False el primer elemento, puesto
    # que esta es la elección más probable
    resultado = np.insert(resultado, 0, False)

    return resultado, serie_diferenciada, zeta_modificado

def obtener_resultado_parametro_whitaker_hayes(espectros, etiquetas, umbral = 6):
    predicciones_whitaker_hayes = [algoritmo_whitaker_hayes(espectro, umbral)[0] for espectro in espectros]

    return predicciones_whitaker_hayes, calcular_metricas_clasificacion(etiquetas, predicciones_whitaker_hayes)

def visualizar_resultado_whitaker_hayes(vector_base, input, etiquetas, umbral = 6):
    resultados, serie_diferenciada, zeta_modificado = algoritmo_whitaker_hayes(input, umbral)

    fig = plt.figure(figsize=[45, 45], constrained_layout=True)

    fig.suptitle("Muestra de resultados al aplicar el algoritmo Whitaker-Hayes", fontsize=24, fontweight='bold')

    subfigs = fig.subfigures(nrows=4, ncols=1)

    for row, subfig in enumerate(subfigs):
        (ax) = subfig.subplots(nrows=1, ncols=1)

        if row == 0:
            ax.set_title("Serie original", fontsize=18)
            ax.plot(vector_base, input)
        elif row == 1:
            ax.set_title("Serie diferenciada", fontsize=18)
            serie_diferenciada = np.insert(serie_diferenciada, 0, 0)
            ax.plot(vector_base, serie_diferenciada)
        elif row == 2:
            ax.set_title("Zeta modificado y umbral", fontsize=18)
            zeta_modificado = np.insert(zeta_modificado, 0, 0)
            ax.plot(vector_base, zeta_modificado)
            plt.axhline(y = umbral, color = 'r', linestyle = '-')
            plt.axhline(y = -umbral, color = 'r', linestyle = '-')
        elif row == 3:
            colores = ['blue' if resultado and etiquetas[i] else 'red' if resultado else 'white' for i, resultado in enumerate(resultados)]
            ax.set_title("Serie con spikes detectados como puntos rojos", fontsize=18)
            ax.plot(vector_base, input)
            ax.scatter(vector_base, input, c=colores)

Separación de las curvas a emplear en la recogida de datos, así como sus respectivas etiquetas que marcan la posición real de los spikes añadidos, del conjunto de datos espectrales cargado.

In [None]:
espectros_base_con_spikes_con_ruido = [espectro['muestra_base_con_spikes'] for espectro in espectros]
espectros_base_con_spikes_sin_ruido = [espectro['espectro_base_muestra'] + espectro['spikes_muestra'] for espectro in espectros]
etiquetas = [espectro['flag_spikes_muestra'] for espectro in espectros]

Recogida de datos y guardado de los resultados en un archivo.

In [None]:
resultados_whitaker_hayes = []

for param in params:
    predicciones_whitaker_hayes_ruido, metricas_whitaker_hayes_ruido = obtener_resultado_parametro_whitaker_hayes(espectros_base_con_spikes_con_ruido, etiquetas, param['umbral'])
    predicciones_whitaker_hayes_sin_ruido, metricas_whitaker_hayes_sin_ruido = obtener_resultado_parametro_whitaker_hayes(espectros_base_con_spikes_sin_ruido, etiquetas, param['umbral'])

    for i in range(len(predicciones_whitaker_hayes_ruido)):
        resultados_whitaker_hayes.append({
            'etiquetas': etiquetas[i],
            'umbral': param['umbral'],
            'predicciones_ruido': predicciones_whitaker_hayes_ruido[i],
            'predicciones_sin_ruido': predicciones_whitaker_hayes_sin_ruido[i],
            'espectro_ruido': espectros_base_con_spikes_con_ruido[i],
            'espectro_sin_ruido': espectros_base_con_spikes_sin_ruido[i]
        })

with open('resultados_whitaker_hayes', 'wb') as fp:
    pickle.dump(resultados_whitaker_hayes, fp)

Toma de una muestra aleatoria para control de calidad

In [None]:
indice_muestra = sample(list(range(len(espectros))), 1)[0]

Visualización del resultado de la aplicación del algoritmo Whitaker-Hayes a un espectro aleatorio al que no le fue añadido ruido aleatorio

In [None]:
visualizar_resultado_whitaker_hayes(espectros[indice_muestra]['vector_base'], espectros_base_con_spikes_sin_ruido[indice_muestra], etiquetas[indice_muestra])

Visualización del resultado de la aplicación del algoritmo Whitaker-Hayes a un espectro aleatorio al que le fue añadido ruido aleatorio

In [None]:
visualizar_resultado_whitaker_hayes(espectros[indice_muestra]['vector_base'], espectros_base_con_spikes_con_ruido[indice_muestra], etiquetas[indice_muestra])