In [3]:
from scipy.stats import spearmanr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
import import_ipynb
from scipy.interpolate import PchipInterpolator
import seaborn as sns

In [None]:
def arm_detector(data, num_ventanas=25, b=0.6):
    n = len(data)
    x = np.arange(n)
    
    # Calcular el tamaño de cada ventana
    window_size = n // num_ventanas

    # Lista para almacenar los porcentajes de puntos dentro de las regiones
    porcentajes = []

    # Ajustar y graficar el ajuste lineal por cada ventana con las líneas paralelas
    for i in range(0, n, window_size):
        end = i + window_size
        if end > n:
            end = n
        
        x_window = x[i:end].reshape(-1, 1)
        y_window = data[i:end]
        
        # Crear y ajustar el modelo de regresión lineal
        modelo = LinearRegression()
        modelo.fit(x_window, y_window)
        
        # Obtener la pendiente y la intersección de la recta ajustada
        pendiente = modelo.coef_[0]
        interseccion = modelo.intercept_
        
        # Predecir valores ajustados
        y_pred = modelo.predict(x_window)
                
        # Calcular las rectas paralelas desplazadas por 'b'
        y_paralela_superior = pendiente * x_window + (interseccion + b)
        y_paralela_inferior = pendiente * x_window + (interseccion - b)
                
        # Calcular el porcentaje de datos dentro de la región delimitada por las líneas paralelas
        dentro_region = np.sum((y_window >= y_paralela_inferior.flatten()) & (y_window <= y_paralela_superior.flatten()))
        fuera_region = len(y_window) - dentro_region
        
        # Calcular el porcentaje de datos dentro de la región respecto a los datos fuera de la región
        porcentaje = (dentro_region * 100)/(dentro_region + fuera_region)
        porcentajes.append(porcentaje)

    def indice_mayor_90(arr):
        n = len(arr)
    
        # Revisar cada índice para encontrar el primero que cumpla la condición
        for i in range(n):
            if np.all(arr[i:] > 80) and len(arr[i:]) > 4:
                return i
        # Si no hay tal índice, regresar el último índice
        return n - 1
    
    indice = indice_mayor_90(np.array(porcentajes))*window_size
    return indice

def J_univariante(X, tau, corte):
    # Define una función para calcular la distancia euclidiana entre dos puntos.
    def distancia(p1, p2):
        return np.linalg.norm(np.array(p2) - np.array(p1))

    # Crea dos subconjuntos de datos con un desfase de `tau`.
    x1 = X[tau:]
    y1 = X[:-tau]

    # Calcula las fases (ángulos) del espectro de Fourier de los subconjuntos.
    ff1 = np.angle(np.fft.rfft(x1))
    ff2 = np.angle(np.fft.rfft(y1))

    # Si se aplica un corte, se detectan puntos de transición en ambas señales.
    if corte:
        transition_point1 = arm_detector(ff1)  # Detecta la transición en `ff1`.
        transition_point2 = arm_detector(ff2)  # Detecta la transición en `ff2`.

        # Si los puntos de transición difieren, se usa el menor de los dos.
        if transition_point1 != transition_point2:
            transition_point1 = transition_point2 = min(transition_point1, transition_point2)

        # Recorta los datos hasta el punto de transición detectado.
        ff2 = ff2[:transition_point2]
        ff1 = ff1[:transition_point1]

    # Inicializa una lista para almacenar los vectores desplazados.
    vectores = []
    for i in range(len(ff1) - 1):
        # Define los puntos actuales y siguientes en el espacio de fases.
        p1 = [ff1[i], ff2[i]]
        p2 = [ff1[i + 1], ff2[i + 1]]

        # Genera todas las combinaciones posibles de desplazamientos considerando periodicidad.
        cuadrante = [
            [p2[0] - p1[0], p2[1] - p1[1]],
            [p2[0] - p1[0], p2[1] + 2 * np.pi - p1[1]],
            [p2[0] + 2 * np.pi - p1[0], p2[1] + 2 * np.pi - p1[1]],
            [p2[0] + 2 * np.pi - p1[0], p2[1] - p1[1]],
            [p2[0] + 2 * np.pi - p1[0], p2[1] - 2 * np.pi - p1[1]],
            [p2[0] - p1[0], p2[1] - 2 * np.pi - p1[1]],
            [p2[0] - 2 * np.pi - p1[0], p2[1] - 2 * np.pi - p1[1]],
            [p2[0] - 2 * np.pi - p1[0], p2[1] - p1[1]],
            [p2[0] - 2 * np.pi - p1[0], p2[1] + 2 * np.pi - p1[1]],
        ]

        # Calcula la distancia de `p1` a cada posible `p2` desplazado.
        distancia1 = [distancia(p1, c) for c in cuadrante]

        # Selecciona el `p2` desplazado más cercano a `p1`.
        p2 = cuadrante[np.argmin(distancia1)]

        # Almacena el vector resultante entre `p1` y el `p2` más cercano.
        vectores.append([p2[0] - p1[0], p2[1] - p1[1]])

    # Inicializa una lista para almacenar los ángulos entre vectores consecutivos.
    angulos = []
    for i in range(len(vectores) - 1):
        v1 = vectores[i]
        v2 = vectores[i + 1]

        # Normaliza los vectores para calcular ángulos.
        v1_norm = v1 / np.linalg.norm(v1) if np.linalg.norm(v1) != 0 else v1
        v2_norm = v2 / np.linalg.norm(v2) if np.linalg.norm(v2) != 0 else v2

        # Calcula el ángulo entre los vectores usando el producto punto.
        angulo = np.arccos(np.clip(np.dot(v1_norm, v2_norm), -1.0, 1.0))

        # Calcula el producto cruzado para determinar la dirección del ángulo.
        cruz = v1[0] * v2[1] - v1[1] * v2[0]
        if cruz > 0:
            angulo = np.pi - angulo
        elif cruz == 0 and angulo < 0:
            angulo = np.pi
        elif cruz < 0:
            angulo += np.pi

        # Almacena el ángulo calculado.
        angulos.append(angulo)

    # Calcula la media de las exponenciales de los ángulos para estimar la uniformidad.
    e = [np.exp(ang * 1j) for ang in angulos]
    e1 = np.sum(e) / len(angulos)

    # Calcula `J`.
    J = 1.0 - np.abs(e1.real)
    return J


In [2]:
def J_bivariante(X,Y,corte):
    def distancia(p1, p2):
        return np.linalg.norm(np.array(p2)-np.array(p1))
    ff1 = np.angle(np.fft.rfft(X))
    ff2 = np.angle(np.fft.rfft(Y))
    if corte:
        print(len(ff1), len(ff2))
        transition_point1 = arm_detector(ff1)
        transition_point2 = arm_detector(ff2)
        if transition_point1 != transition_point2:
            transition_point1 = transition_point2 = min(transition_point1, transition_point2)
        ff2 = ff2[:transition_point2]
        ff1 = ff1[:transition_point1]
        print(len(ff1), len(ff2))
    vectores = []
    for i in range(len(ff1)-1):
        p1 = [ff1[i], ff2[i]]
        p2 = [ff1[i+1], ff2[i+1]]
        cuadrante = [[p2[0]-p1[0], p2[1]-p1[1]], [p2[0]-p1[0], p2[1]+2*np.pi-p1[1]],
            [p2[0]+2*np.pi-p1[0],p2[1]+2*np.pi-p1[1]],[p2[0]+2*np.pi-p1[0],p2[1]-p1[1]],
            [p2[0]+2*np.pi-p1[0],p2[1]-2*np.pi-p1[1]],[p2[0]-p1[0],p2[1]-2*np.pi-p1[1]],
            [p2[0]-2*np.pi-p1[0],p2[1]-2*np.pi-p1[1]],[p2[0]-2*np.pi-p1[0],p2[1]-p1[1]],
            [p2[0]-2*np.pi-p1[0],p2[1]+2*np.pi-p1[1]]]
        distancia1 = [distancia(p1,c) for c in cuadrante]
        p2=cuadrante[np.argmin(distancia1)]
        vectores.append([p2[0]-p1[0],p2[1]-p1[1]])
    angulos=[]
    for i in range(len(vectores)-1):
        v1=vectores[i]
        v2=vectores[i+1]
        v1_norm=v1/np.linalg.norm(v1)
        v2_norm=v2/np.linalg.norm(v2)
        angulo=np.arccos(np.clip(np.dot(v1_norm,v2_norm),-1.0,1.0))
        cruz=v1[0]*v2[1]-v1[1]*v2[0]
        if cruz>0:
            angulo=np.pi-angulo
        if cruz==0 and angulo==0:
            angulo=angulo
        if cruz==0 and angulo<0:
            angulo=np.pi
        if cruz<0:
            angulo=angulo+np.pi
        angulos.append(angulo)
    e=[]
    for k in range(len(angulos)):
        e.append(np.exp(angulos[k]*1j))
    e1=np.sum(e)/len(angulos)
    J=1.-np.abs(e1.real)
    return J

In [None]:
def interpolador(subject, method, size):
    """
    Interpola datos de entrada utilizando un método especificado y genera un conjunto más denso de puntos.

    Parámetros:
    -----------
    - subject: array-like
        Datos originales a interpolar (puede ser un array o una serie de pandas).
    - method: str
        Método de interpolación a usar: 'lineal' para interpolación lineal o 'herm' para interpolación cúbica de Hermite.
    - size: int
        Factor para aumentar la cantidad de puntos entre los datos originales.

    Retorna:
    --------
    - x_new: ndarray
        Nuevos puntos en el eje x, equidistantes entre los puntos originales.
    - data_interp: ndarray
        Datos interpolados correspondientes a los puntos en `x_new`.
    """
    
    # Si 'subject' es un DataFrame, convierte los datos a un array (línea comentada para contexto).
    # data = np.array([int(line.strip()) for line in subject.to_numpy()])
    
    data = subject  # Usa directamente el array de entrada como los datos a interpolar.
    
    # Crea un arreglo con índices originales (eje x).
    x = np.arange(len(data))
    
    # Genera un nuevo eje x con puntos equidistantes, incrementando la resolución por el factor `size`.
    x_new = np.linspace(0, len(data) - 1, size * (len(data) - 1) + len(data))
    
    # Realiza la interpolación según el método especificado.
    if method == 'lineal':
        # Interpolación lineal.
        data_interp = np.interp(x_new, x, data)
    elif method == 'herm':
        # Interpolación cúbica de Hermite usando el interpolador PCHIP.
        interpolator = PchipInterpolator(x, data)
        data_interp = interpolator(x_new)
    
    # Devuelve los nuevos puntos x y los datos interpolados.
    return x_new, data_interp

In [None]:
def interpolador_estocastico(s_0_discreto, n_steps):
    """
    Interpola un conjunto de datos discretos utilizando un proceso de puente de Brown (Brownian Bridge).
    Este proceso se utiliza para generar trayectorias estocásticas suavizadas entre puntos discretos.

    Parámetros:
    -----------
    - s_0_discreto: array-like
        Conjunto de datos discretos que se desea interpolar.
    - n_steps: int
        Número de pasos (intervalos) que se desea para cada trayecto de interpolación.

    Retorna:
    --------
    - t_list: ndarray
        Puntos de tiempo generados a lo largo de todo el proceso de interpolación.
    - X_list: ndarray
        Valores interpolados correspondientes a los puntos de tiempo.
    """

    # Inicializa las listas para almacenar los resultados.
    t_list, X_list = [], []

    # Define una función para crear el puente de Brown entre dos puntos.
    def brownian_bridge(t0, tT, x0, xT, n_steps=100):
        # Genera un conjunto de tiempos equidistantes entre t0 y tT.
        t = np.linspace(t0, tT, n_steps)
        
        # Genera un movimiento browniano con media cero.
        W = np.random.normal(0, np.sqrt(t[1] - t[0]), size=n_steps-1)
        W = np.insert(np.cumsum(W), 0, 0)  # Inserta W(0) = 0 y realiza la suma acumulativa
        
        # Interpola el puente de Brown, ajustando para que comience en x0 y termine en xT.
        X = x0 + (t - t0) / (tT - t0) * (xT - x0) + W - (t - t0) / (tT - t0) * W[-1]
        
        # Devuelve los tiempos y los valores del proceso.
        return t, X

    # Itera sobre los puntos discretos en s_0_discreto para aplicar el puente de Brown entre puntos consecutivos.
    for i in range(len(s_0_discreto) - 1):
        t0 = i  # El tiempo inicial es el índice actual.
        tT = i + 1  # El tiempo final es el siguiente índice.
        x0 = s_0_discreto[i]  # El valor inicial es el valor en s_0_discreto[i].
        xT = s_0_discreto[i + 1]  # El valor final es el valor en s_0_discreto[i+1].
        
        # Llama a la función brownian_bridge para interpolar entre t0 y tT.
        t, X = brownian_bridge(t0, tT, x0, xT, n_steps + 2)
        
        # Si es el primer intervalo, agrega los valores completos.
        if i == 0:
            t_list = np.concatenate((t_list, t[:]))
            X_list = np.concatenate((X_list, X[:]))
        else:
            # Si no es el primer intervalo, evita duplicar el primer punto de cada intervalo.
            t_list = np.concatenate((t_list, t[1:]))
            X_list = np.concatenate((X_list, X[1:]))

    # Devuelve los puntos de tiempo y los valores interpolados generados.
    return t_list, X_list
