In [1]:
## Import necessary libraries

import mne
import numpy as np
import matplotlib.pyplot as plt
# import torch
from scipy.io import loadmat
from scipy.special import sph_harm
from numpy.linalg import solve
import json

In [2]:
import numpy as np, matplotlib.pyplot as plt, os
from matplotlib import gridspec
from scipy import ndimage
from ripser import ripser
from persim import plot_diagrams
import gudhi as gd
from sklearn.datasets import load_digits
try:
    from gtda.time_series import TakensEmbedding, SlidingWindow
    from gtda.homology import VietorisRipsPersistence
    from gtda.plotting import plot_diagram
    HAVE_GIOTTO = True
except Exception:
    HAVE_GIOTTO = False

In [3]:
good = [2, 3, 4, 6, 7, 9, 11, 12, 13, 14, 15, 16, 18, 19, 20, 21, 22, 24, 25, 26, 27, 28, 30, 31, 33, 34, 36, 38, 40, 41, 42, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 56, 57, 58, 59, 60, 65]
good = np.array(good) - 1
subjects = ["01", "02", "04", "09", "15", "37", "79"]
auds = [f"{i:02d}" for i in range(1, 24)]

bad = set(range(65)) - set(good)

san_disk = 'D:/Universidad/2025_2/TDA/data'

In [4]:
import numpy as np
from scipy.signal import hilbert, butter, filtfilt, find_peaks
from scipy.interpolate import interp1d
import librosa

# ---------------------------------------------------------
# 1. Hilbert envelope
# ---------------------------------------------------------
def envelope_hilbert(y, sr, smooth_ms=10):
    """Hilbert transform envelope, optionally smoothed with moving average."""
    analytic = hilbert(y)
    env = np.abs(analytic)
    win = int(smooth_ms * sr / 1000)
    if win > 1:
        env = np.convolve(env, np.ones(win)/win, mode='same')
    return env


# ---------------------------------------------------------
# 2. Rectify + Lowpass filter envelope
# ---------------------------------------------------------
def envelope_lowpass(y, sr, cutoff_hz=20, order=4):
    """Envelope via full-wave rectification and Butterworth lowpass."""
    rect = np.abs(y)
    nyq = 0.5 * sr
    b, a = butter(order, cutoff_hz / nyq, btype='low')
    env = filtfilt(b, a, rect)
    return env


# ---------------------------------------------------------
# 3. RMS (root-mean-square) envelope
# ---------------------------------------------------------
def envelope_rms(y, sr, frame_ms=128, hop_ms=None):
    """RMS energy envelope over frames."""
    frame = int(frame_ms * sr / 1000)
    hop = int(hop_ms * sr / 1000) if hop_ms else frame // 2
    rms = librosa.feature.rms(y=y, frame_length=frame, hop_length=hop)[0]
    # Upsample to original signal length
    env = np.repeat(rms, hop)
    return env[:len(y)]


# ---------------------------------------------------------
# 4. Peak interpolation envelope
# ---------------------------------------------------------
def envelope_peaks(y, sr, peak_thresh=0.01, min_dist_ms=20):
    """Envelope by finding peaks and interpolating between them."""
    rect = np.abs(y)
    min_dist = int(min_dist_ms * sr / 1000)
    peaks, _ = find_peaks(rect, height=np.max(rect)*peak_thresh, distance=min_dist)

    if len(peaks) < 2:  # fallback: return rectified signal
        return rect

    xs = np.concatenate(([0], peaks, [len(y)-1]))
    ys = np.concatenate(([rect[0]], rect[peaks], [rect[-1]]))
    f = interp1d(xs, ys, kind='linear')
    env = f(np.arange(len(y)))
    return env


# ---------------------------------------------------------
# 5. Exponential smoothing envelope (attack/release)
# ---------------------------------------------------------
def envelope_exponential(y, sr, attack_ms=1, release_ms=200):
    """Envelope follower with exponential attack/release smoothing."""
    x = np.abs(y)
    a_a = np.exp(-1.0 / (sr * attack_ms / 1000.0))
    a_r = np.exp(-1.0 / (sr * release_ms / 1000.0))
    env = np.zeros_like(x)
    for n in range(1, len(x)):
        coeff = a_a if x[n] > env[n-1] else a_r
        env[n] = coeff * env[n-1] + (1 - coeff) * x[n]
    return env

In [5]:
def cubical_pd_from_image(img):
    cc = gd.CubicalComplex(dimensions=img.shape, top_dimensional_cells=img.flatten())
    cc.compute_persistence()
    D0 = np.array(cc.persistence_intervals_in_dimension(0))
    D1 = np.array(cc.persistence_intervals_in_dimension(1))
    return [D0, D1]

def gudhi_rips_diagrams(X, maxdim=1, max_edge_length=None):
    max_edge = np.inf if max_edge_length is None else max_edge_length
    rc = gd.RipsComplex(points=X, max_edge_length=max_edge)
    st = rc.create_simplex_tree(max_dimension=maxdim+1)
    st.compute_persistence() # [(dim, [birth, death])]
    dgms = []
    # como gudhi tiene los intervalos (dim, [birth, death]) lo homogenizamos con el formato de ripser
    # una lista de listas de intervalos
    for dim in range(maxdim+1):
        d = st.persistence_intervals_in_dimension(dim)
        # Gudhi usa 'inf' para muertes infinitas; lo dejamos así para compatibilidad con persim
        dgms.append(np.array(d, dtype=float))
    return dgms

def takens_numpy(x, m=3, tau=10):
    """Takens embedding simple para una serie 1D -> matriz (N-(m-1)tau, m)."""
    N = len(x) - (m-1)*tau
    if N <= 0:
        raise ValueError("Serie muy corta para estos parámetros (m, tau).")
    return np.vstack([x[i:i+N] for i in range(0, m*tau, tau)]).T

In [9]:
for s in range(2):
    if s == 0:
        speed = 'fast'
    else:
        speed = 'slow'
    for m in range(1,7):
        bb = subjects[m]
        for n in range(len(auds)):
            ut = auds[n]
            # Load audio file
            try:
                data = loadmat(f'data/sound_sep/{speed}/bb{bb}_ut{ut}.mat')
            except FileNotFoundError:
                continue

            eeg = data["subeeg"]
            aud = data["y"]
            aud = aud[:,0]
            Fs = data["Fs"]
            fs = 64
            aud_resample = aud[::int(Fs/250)]
            aud_resample = aud_resample[::int(250/fs)]
            eeg_resample = eeg[::int(250/fs), good]

            if len(aud_resample) > len(eeg_resample):
                aud_resample = aud_resample[:len(eeg_resample)]
            elif len(eeg_resample) > len(aud_resample):
                eeg_resample = eeg_resample[:len(aud_resample)]

            aud_resample = (aud_resample - np.mean(aud_resample, axis=0)) / np.std(aud_resample, axis=0)
            eeg_resample = (eeg_resample - np.mean(eeg_resample, axis=0)) / np.std(eeg_resample, axis=0)
            
            for c in range(47):
                channel_data = eeg_resample[:, c]
                
                d = 3  # A common starting value
                tau = 1  # A common starting value

                t = range(len(channel_data))

                for emb in [30, 50, 64]:
                    for tid, tau in enumerate(["", "_tau10"]):

                        emb_arr = takens_numpy(channel_data, m=emb, tau=9*tid + 1)

                        with open(f'{san_disk}/eegs/method_1/{speed}/bb{bb}_ut{ut}_ch{c+1}_emb{emb}{tau}.json', 'w') as f:
                            json.dump(emb_arr.tolist(), f)


            # aud_dict = {
            #     'audio': aud_resample.tolist(),
            #     'hilbert': envelope_hilbert(aud_resample, fs).tolist(),
            #     'lowpass': envelope_lowpass(aud_resample, fs).tolist(),
            #     'rms': envelope_rms(aud_resample, fs).tolist(),
            #     'peaks': envelope_peaks(aud_resample, fs).tolist(),
            #     'exponential': envelope_exponential(aud_resample, fs).tolist()
            # }

            # for key, value in aud_dict.items():                

            #     d = 3  # A common starting value
            #     tau = 10  # A common starting value

            #     t = range(len(value))

            #     emb30 = takens_numpy(value, m=30, tau=tau)
            #     emb50 = takens_numpy(value, m=50, tau=tau)

            #     with open(f'{san_disk}/audios/method_1/{speed}/ut{ut}_emb30_tau10_{key}.json', 'w') as f:
            #         json.dump(emb30.tolist(), f)

            #     with open(f'{san_disk}/audios/method_1/{speed}/ut{ut}_emb50_tau10_{key}.json', 'w') as f:
            #         json.dump(emb50.tolist(), f)


            # with open(f'{san_disk}/audios/envelopes/{speed}/ut{ut}.json', 'w') as f:
            #     json.dump(aud_dict, f)

  aud_resample = aud[::int(Fs/250)]
