Importar librerías

In [None]:
import os
import h5py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
import torch.optim as optim

Generación de bloques de datos

In [None]:
# Process one batch of events and persist it to disk
def process_batch(batch_start, batch_end, earthquake_list, hdf5_events, path_arrays):
    """Stack the waveforms of one slice of events and save them as .npy files.

    Args:
        batch_start: Index of the first event of the batch (inclusive).
        batch_end: Index one past the last event of the batch (exclusive).
        earthquake_list: Sequence of event identifiers; each one is looked up
            under the key ``'data/<event>'``.
        hdf5_events: Mapping-like object exposing ``.get(key)`` (an open
            ``h5py.File`` in ``main``). Each entry must be convertible to a
            ``(6000, 3)`` float32 array.
        path_arrays: Directory where the two output files are written.

    Raises:
        KeyError: If an event has no ``'data/<event>'`` entry. The original
            code silently stored a NaN-filled waveform in that case.

    Side effects:
        Writes ``array_noise_<start>_<end-1>.npy`` (waveforms) and
        ``labels_noise_<start>_<end-1>.npy`` (event identifiers as strings)
        into ``path_arrays`` and prints progress messages.
    """
    print(f'Procesando lote: {batch_start} - {batch_end - 1}')

    batch_events = earthquake_list[batch_start:batch_end]
    # Size the buffer from the actual slice so that no uninitialised rows from
    # np.empty are saved when batch_end runs past the end of the list.
    X_batch = np.empty((len(batch_events), 6000, 3), dtype=np.float32)
    Y_batch = []

    for i, event in enumerate(batch_events):
        key = 'data/' + str(event)
        dataset = hdf5_events.get(key)
        if dataset is None:
            raise KeyError(f'{key} not found in the events file')
        # Convert the dataset directly instead of wrapping it in a 1-element list.
        X_batch[i] = np.asarray(dataset, dtype=np.float32)
        Y_batch.append(event)

    np.save(path_arrays + f'/array_noise_{batch_start}_{batch_end - 1}.npy', X_batch)
    # np.str_ is the same dtype as the alias np.unicode_, which NumPy 2.0 removed.
    np.save(path_arrays + f'/labels_noise_{batch_start}_{batch_end - 1}.npy', np.array(Y_batch, dtype=np.str_))
    print(f'Se guardó correctamente labels_noise_{batch_start}_{batch_end - 1} en {path_arrays}')

def main():
    """Split the event list into batches of N events and export each with process_batch.

    NOTE(review): `earthquake_list` is read below but is never assigned in this
    function or anywhere else in the visible notebook, so this raises NameError
    unless the name already exists as a global. The placeholder comment further
    down marks where it was meant to be loaded.
    """
    N = 50000  # number of events per exported batch
    N_append = 500  # assigned but never read in this function
    path_arrays = "C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado"
    comienzo = 0  # assigned but never read in this function
    events_hdf5_path = "C:/Users/Vic_l/Downloads/merge.hdf5"  # Replace with the correct path
    
    # Read the event list (earthquake_list) and any other required data here

    with h5py.File(events_hdf5_path, 'r') as hdf5_events:
        # Ceiling division: the last batch may hold fewer than N events.
        num_batches = (len(earthquake_list) + N - 1) // N

        for i in range(num_batches):
            batch_start = i * N
            # Clamp the end index so the final batch stops at the end of the list.
            batch_end = min((i + 1) * N, len(earthquake_list))
            process_batch(batch_start, batch_end, earthquake_list, hdf5_events, path_arrays)

    print("Procesamiento finalizado.")

if __name__ == "__main__":
    main()

Lectura y carga de bloques para clasificación

In [None]:
# Load the event metadata CSV and display it. Later cells read its
# 'trace_name', 'p_arrival_sample' and 's_arrival_sample' columns.
df_event = pd.read_csv(r"C:\Users\Vic_l\Documents\Maestría en Ciencias de la Computación\Tesis\dataset_mezclado\merge_per.csv")
df_event

In [None]:
# Build the classification set from saved blocks: the first file listed is
# labelled 1 and every following file is labelled 0.
earthquake_signals = ['array_earthquake_0_49999', 'array_noise_0_49999']
x = None
y = None
for i, block_name in enumerate(earthquake_signals):
  earthquake = np.load(f'C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado/{block_name}.npy')
  # One label per waveform in the block just loaded.
  block_labels = np.ones(len(earthquake)) if i == 0 else np.zeros(len(earthquake))
  x = earthquake if i == 0 else np.concatenate((x, earthquake))
  y = block_labels if i == 0 else np.concatenate((y, block_labels))
  print(i)

In [None]:
# Convert to tensors; swapping axes 1 and 2 puts the size-3 axis before the
# sample axis, which is the layout the model's first convolution consumes.
x = torch.tensor(x, dtype=torch.float32).transpose(1, 2)
y = torch.tensor(y).long()

# Two chained 90/10 splits with a fixed seed: first hold out the test set,
# then carve the validation set out of the remaining training data.
split_opts = dict(test_size=0.1, random_state=1)
x_train, x_test, y_train, y_test = train_test_split(x, y, **split_opts)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, **split_opts)

In [None]:
# Display the shape of the training tensor produced by the split above.
x_train.shape

In [None]:
batch = 128
# Inputs and targets get separate, unshuffled loaders; the training cells zip
# them together, so both must iterate in the same order.
loader_opts = dict(batch_size=batch, shuffle=False)
x_train, y_train = DataLoader(x_train, **loader_opts), DataLoader(y_train, **loader_opts)
x_val, y_val = DataLoader(x_val, **loader_opts), DataLoader(y_val, **loader_opts)
x_test, y_test = DataLoader(x_test, **loader_opts), DataLoader(y_test, **loader_opts)

Modelo de clasificación

In [None]:
class LayerNorm2d(nn.Module):
    """Per-channel normalisation over the two trailing axes of a 4-D tensor.

    Each (sample, channel) slice is standardised with its own mean and
    (default, unbiased) variance over axes 2 and 3, then scaled by a learnable
    per-channel `gamma` and shifted by a learnable per-channel `beta`.
    """

    def __init__(self, dim):
        super().__init__()
        param_shape = (1, dim, 1, 1)  # broadcasts over batch and both trailing axes
        self.gamma = nn.Parameter(torch.ones(*param_shape))
        self.beta = nn.Parameter(torch.zeros(*param_shape))

    def forward(self, x):
        reduce_axes = (2, 3)
        centred = x - x.mean(dim=reduce_axes, keepdim=True)
        spread = x.var(dim=reduce_axes, keepdim=True)
        # 1e-8 keeps the square root away from zero for constant slices.
        normalised = centred / torch.sqrt(spread + 1e-8)
        return normalised * self.gamma + self.beta

# Downsampling stage: strided convolution along axis 2 followed by normalisation
class Downsample_Module(nn.Module):
    """Embed and downsample a (batch, channels, length, 1) tensor.

    A `(win_size, 1)` convolution with stride `(dsr, 1)` and padding
    `(win_size // 2, 0)` maps `in_chans` to `embed_dim` channels; the result is
    passed through `LayerNorm2d`.
    """

    def __init__(self, in_chans=3, win_size=7, dsr=2, embed_dim=64):
        super().__init__()
        kernel = (win_size, 1)
        step = (dsr, 1)
        pad = (win_size // 2, 0)
        self.emb = nn.Conv2d(in_chans, embed_dim, kernel_size=kernel, stride=step, padding=pad)
        self.bn = LayerNorm2d(embed_dim)

    def forward(self, x):
        return self.bn(self.emb(x))

# Attention stage: convolutional gate multiplied onto the input
class Attention_Module(nn.Module):
    """Gate the input with a map built from three stacked convolutions.

    Two depthwise convolutions along axis 2 (kernel 9, then kernel 11, both
    length-preserving) are followed by a 1x1 convolution; the result multiplies
    the input element-wise.
    """

    def __init__(self, dim):
        super().__init__()
        unit = (1, 1)
        # groups=dim makes each of these convolutions depthwise.
        self.conv1 = nn.Conv2d(dim, dim, kernel_size=(9, 1), stride=unit, padding=(4, 0), groups=dim)
        self.conv_sequential = nn.Conv2d(dim, dim, kernel_size=(11, 1), stride=unit, padding=(5, 0), groups=dim)
        self.conv2 = nn.Conv2d(dim, dim, kernel_size=unit, stride=unit)

    def forward(self, x):
        original = x.clone()
        gate = self.conv1(x)
        gate = self.conv_sequential(gate)
        gate = self.conv2(gate)
        return original * gate

class SAN_Block(nn.Module):
    """Residual block: 1x1 projection, norm, GELU, attention, 1x1 projection, norm.

    `cff_hid` and `cff_out` are accepted but not used by this implementation.
    """

    def __init__(self, dim, cff_hid=None, cff_out=None):
        super().__init__()
        pointwise = (1, 1)
        self.proj_1 = nn.Conv2d(dim, dim, kernel_size=pointwise)
        self.bn1 = LayerNorm2d(dim)
        self.act1 = nn.GELU(approximate='tanh')

        self.attn = Attention_Module(dim)

        self.proj_2 = nn.Conv2d(dim, dim, kernel_size=pointwise)
        self.bn2 = LayerNorm2d(dim)

    def forward(self, x):
        shortcut = x.clone()
        hidden = self.proj_1(x)
        hidden = self.bn1(hidden)
        hidden = self.act1(hidden)
        hidden = self.attn(hidden)
        hidden = self.proj_2(hidden)
        hidden = self.bn2(hidden)
        # Residual connection around the whole block.
        return shortcut + hidden

class SAN(nn.Module):
    """Binary classifier: convolutional encoder, then LSTM, then MLP with sigmoid.

    Args:
        stage_blocks: Number of `SAN_Block`s in each encoder stage; its length
            sets the number of stages.
        downsampling_dims: Output channels of each encoder stage.
        down_strides: Stride along the sample axis of each stage.
        upsampling_dims: Accepted for signature compatibility; not used by the
            classifier.
        in_len: Number of samples per input trace. Used to size the LSTM input
            to the flattened encoder output. With the default 6000 and
            `downsampling_dims[-1] == 48` this is 6000, the value that was
            previously hard-coded.

    The default list arguments are only read, never mutated.
    """

    def __init__(self, stage_blocks=[2, 2, 2, 2], downsampling_dims=[8, 16, 32, 64], down_strides=[4, 3, 2, 2],
                 upsampling_dims=[32, 16, 8, 1], in_len=6000):
        super(SAN, self).__init__()
        self.num_stages = len(stage_blocks)
        feat_len = in_len
        for i in range(self.num_stages):
            down = Downsample_Module(
                in_chans=3 if i == 0 else downsampling_dims[i - 1],
                win_size=7,
                dsr=down_strides[i],
                embed_dim=downsampling_dims[i]
            )
            blocks = nn.ModuleList(
                [SAN_Block(downsampling_dims[i], cff_hid=downsampling_dims[i] * 4) for _ in range(stage_blocks[i])]
            )
            norm = LayerNorm2d(downsampling_dims[i])
            setattr(self, f"down_{i + 1}", down)
            setattr(self, f"blocks_{i + 1}", blocks)
            setattr(self, f"norm_{i + 1}", norm)
            # Output length of a kernel-7, padding-3 convolution with this stride.
            feat_len = (feat_len - 1) // down_strides[i] + 1

        # Flattened size of the last stage's feature map (channels * length).
        lstm_in = downsampling_dims[self.num_stages - 1] * feat_len
        self.lstm = nn.LSTM(lstm_in, hidden_size=100, num_layers=3, batch_first=True)
        self.mlp = nn.Sequential(
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, 1)
        )
        self.sigmoid = nn.Sigmoid()

    def forward_features(self, x):
        """Run every encoder stage and return the list of per-stage feature maps."""
        feats = []
        for i in range(self.num_stages):
            win_embed = getattr(self, f"down_{i + 1}")
            blocks = getattr(self, f"blocks_{i + 1}")
            norm = getattr(self, f"norm_{i + 1}")
            x = win_embed(x)
            for block in blocks:
                x = block(x)
            x = norm(x)
            feats.append(x)
        return feats

    def forward(self, x):
        """Return one probability per sample for a (batch, 3, in_len) input."""
        # Add a trailing singleton axis so the 2-D convolutions can be used.
        x = x.unsqueeze(-1)
        feats = self.forward_features(x)
        # Flatten the deepest feature map to (batch, channels * length).
        flat = feats[-1].flatten(start_dim=1)

        # nn.LSTM treats a 2-D input as ONE unbatched sequence, so feeding
        # (batch, features) directly made every prediction depend on the other
        # samples in the batch. Present each sample as its own length-1
        # sequence instead: (batch, 1, features).
        lstm_output, _ = self.lstm(flat.unsqueeze(1))
        last_step = lstm_output[:, -1, :]

        # MLP head followed by a sigmoid.
        mlp_output = self.sigmoid(self.mlp(last_step))
        return mlp_output.view(-1)

Entrenamiento del modelo de clasificación

In [None]:
import time

# Classifier instance used for this training run.
model = SAN(stage_blocks = [4,4,4,4], downsampling_dims = [12, 24, 36, 48], upsampling_dims = [36, 24, 12, 1])

# Loss function and optimizer.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 20
train_error = []
val_error = []

history = {"train": {"loss": [], "acc": []}, "val": {"loss": [], "acc": []}}
for epoch in range(0, epochs):
    start_time = time.time()

    # ---- training pass ----
    train_loss = 0.0
    n_batches = 0
    correct = 0
    total = 0
    TP_TRAIN = 0
    TN_TRAIN = 0
    FP_TRAIN = 0
    FN_TRAIN = 0
    model.train()
    for inputs, labels in zip(x_train, y_train):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()

        # .item() stores a plain float; accumulating the tensor itself kept the
        # autograd graph of every batch alive for the whole epoch.
        train_loss += loss.item()
        n_batches += 1

        # Threshold the probabilities at 0.5; metrics need no gradients.
        predicted = torch.round(outputs.detach())
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Confusion-matrix counts (positive class = label 1), vectorised.
        pred_pos = predicted == 1
        pred_neg = predicted == 0
        true_pos = labels == 1
        true_neg = labels == 0
        TP_TRAIN += (pred_pos & true_pos).sum().item()
        FP_TRAIN += (pred_pos & true_neg).sum().item()
        FN_TRAIN += (pred_neg & true_pos).sum().item()
        TN_TRAIN += (pred_neg & true_neg).sum().item()

    # max(..., 1) avoids a division by zero if the loader yields no batches.
    epoch_train_loss = train_loss / max(n_batches, 1)
    epoch_train_correct = correct
    epoch_train_total = total
    # The 0.001 terms keep the ratios defined when a denominator would be zero.
    precision_train = TP_TRAIN/(TP_TRAIN+FP_TRAIN+0.001)
    recall_train = TP_TRAIN/(TP_TRAIN+FN_TRAIN+0.001)
    f1_train = TP_TRAIN/(TP_TRAIN+0.5*(FP_TRAIN + FN_TRAIN+0.001))

    # ---- validation pass (the *_TEST counters hold validation counts) ----
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        n_batches = 0
        correct = 0
        total = 0
        TP_TEST = 0
        TN_TEST = 0
        FP_TEST = 0
        FN_TEST = 0
        for inputs, labels in zip(x_val, y_val):
            predicted_outputs = model(inputs)
            predicted = torch.round(predicted_outputs)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            pred_pos = predicted == 1
            pred_neg = predicted == 0
            true_pos = labels == 1
            true_neg = labels == 0
            TP_TEST += (pred_pos & true_pos).sum().item()
            FP_TEST += (pred_pos & true_neg).sum().item()
            FN_TEST += (pred_neg & true_pos).sum().item()
            TN_TEST += (pred_neg & true_neg).sum().item()

            val_loss += criterion(predicted_outputs, labels.float()).item()
            n_batches += 1

        epoch_val_loss = val_loss / max(n_batches, 1)
        epoch_val_correct = correct
        epoch_val_total = total
        precision_test = TP_TEST/(TP_TEST+FP_TEST+0.001)
        recall_test = TP_TEST/(TP_TEST+FN_TEST+0.001)
        f1_test = TP_TEST/(TP_TEST+0.5*(FP_TEST + FN_TEST+0.001))

    history["train"]["loss"].append(epoch_train_loss)
    history["val"]["loss"].append(epoch_val_loss)

    elapsed_time = time.time() - start_time

    print('-' * 80)
    print(f'Epoch: {epoch+1:03}/{epochs} | Time: {elapsed_time:.4f}s | Train loss: {epoch_train_loss:.4f} | Exactitud {epoch_train_correct, epoch_train_total}| Precision: {precision_train} | Recall: {recall_train} | F1: {f1_train} | Dev loss: {epoch_val_loss:.4f} | Exactitud {epoch_val_correct, epoch_val_total}| Precision: {precision_test} | Recall: {recall_test} | F1: {f1_test} |')
    train_error.append(history["train"]["loss"][-1])
    val_error.append(history["val"]["loss"][-1])

In [None]:
# Confusion-matrix counts left by the last epoch: training pass first, then the
# validation pass (the validation loop stores its counts in the *_TEST names).
(TP_TRAIN, TN_TRAIN, FP_TRAIN, FN_TRAIN), (TP_TEST, TN_TEST, FP_TEST, FN_TEST)

In [None]:
# Evaluate the classifier on the held-out test split.
# Set eval mode explicitly instead of relying on the state left by the training cell.
model.eval()
with torch.no_grad():
    val_loss = 0.0
    n_batches = 0
    correct = 0
    total = 0
    TP_TEST = 0
    TN_TEST = 0
    FP_TEST = 0
    FN_TEST = 0
    for inputs, labels in zip(x_test, y_test):
        predicted_outputs = model(inputs)
        # Threshold the probabilities at 0.5.
        predicted = torch.round(predicted_outputs)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Confusion-matrix counts (positive class = label 1), vectorised.
        pred_pos = predicted == 1
        pred_neg = predicted == 0
        true_pos = labels == 1
        true_neg = labels == 0
        TP_TEST += (pred_pos & true_pos).sum().item()
        FP_TEST += (pred_pos & true_neg).sum().item()
        FN_TEST += (pred_neg & true_pos).sum().item()
        TN_TEST += (pred_neg & true_neg).sum().item()

        # Accumulate a plain float rather than a tensor.
        val_loss += criterion(predicted_outputs, labels.float()).item()
        n_batches += 1

    # max(..., 1) avoids a division by zero if the loader yields no batches.
    epoch_val_loss = val_loss / max(n_batches, 1)
    epoch_val_correct = correct
    epoch_val_total = total
    # The 0.001 terms keep the ratios defined when a denominator would be zero.
    precision_test = TP_TEST/(TP_TEST+FP_TEST+0.001)
    recall_test = TP_TEST/(TP_TEST+FN_TEST+0.001)
    f1_test = TP_TEST/(TP_TEST+0.5*(FP_TEST + FN_TEST+0.001))

print(epoch_val_correct,  epoch_val_total, precision_test, recall_test, f1_test)

In [None]:
# Confusion-matrix counts from the test-set evaluation in the previous cell.
(TP_TEST, TN_TEST, FP_TEST, FN_TEST)

In [None]:
# Save only the model weights (state_dict) to a file named "clasificador" in
# the current working directory.
# NOTE(review): the retraining cell loads "clasificador" from an absolute path;
# confirm both refer to the same file.
torch.save(model.state_dict(),"clasificador")

Reentrenamiento del modelo de clasificación

In [None]:
# Build the retraining set from the next pair of saved blocks: the first file
# listed is labelled 1 and every following file is labelled 0.
earthquake_signals = ['array_earthquake_50000_99999', 'array_noise_50000_99999']
x = None
y = None
for i, block_name in enumerate(earthquake_signals):
  earthquake = np.load(f'C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado/{block_name}.npy')
  # One label per waveform in the block just loaded.
  block_labels = np.ones(len(earthquake)) if i == 0 else np.zeros(len(earthquake))
  x = earthquake if i == 0 else np.concatenate((x, earthquake))
  y = block_labels if i == 0 else np.concatenate((y, block_labels))
  print(i)

In [None]:
# Select the metadata rows whose 'trace_name' appears in `y`, then display them.
# NOTE(review): in this section the preceding cell builds `y` from np.ones /
# np.zeros (class labels), not trace names, so this filter presumably matches
# nothing. Confirm whether this cell is a leftover from the detection section.
eventos_elegidos = df_event[df_event['trace_name'].isin(y)]
eventos_elegidos

In [None]:
# Convert to tensors; swapping axes 1 and 2 puts the size-3 axis before the
# sample axis, which is the layout the model's first convolution consumes.
x = torch.tensor(x, dtype=torch.float32).transpose(1, 2)
y = torch.tensor(y).long()

# Two chained 90/10 splits with a fixed seed: first hold out the test set,
# then carve the validation set out of the remaining training data.
split_opts = dict(test_size=0.1, random_state=1)
x_train, x_test, y_train, y_test = train_test_split(x, y, **split_opts)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, **split_opts)

In [None]:
# Display the shape of the training tensor produced by the split above.
x_train.shape

In [None]:
batch = 128
# Inputs and targets get separate, unshuffled loaders; the training cells zip
# them together, so both must iterate in the same order.
loader_opts = dict(batch_size=batch, shuffle=False)
x_train, y_train = DataLoader(x_train, **loader_opts), DataLoader(y_train, **loader_opts)
x_val, y_val = DataLoader(x_val, **loader_opts), DataLoader(y_val, **loader_opts)
x_test, y_test = DataLoader(x_test, **loader_opts), DataLoader(y_test, **loader_opts)

In [None]:
import time

# Same architecture as the first training run.
model = SAN(stage_blocks = [4,4,4,4], downsampling_dims = [12, 24, 36, 48], upsampling_dims = [36, 24, 12, 1])

# Load the weights of the saved model.
checkpoint = torch.load(r"C:\Users\Vic_l\Documents\Maestría en Ciencias de la Computación\Tesis\clasificador")
model.load_state_dict(checkpoint)

# Loss function and optimizer.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 20
train_error = []
val_error = []

history = {"train": {"loss": [], "acc": []}, "val": {"loss": [], "acc": []}}
for epoch in range(0, epochs):
    start_time = time.time()

    # ---- training pass ----
    train_loss = 0.0
    n_batches = 0
    correct = 0
    total = 0
    TP_TRAIN = 0
    TN_TRAIN = 0
    FP_TRAIN = 0
    FN_TRAIN = 0
    model.train()
    for inputs, labels in zip(x_train, y_train):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()

        # .item() stores a plain float; accumulating the tensor itself kept the
        # autograd graph of every batch alive for the whole epoch.
        train_loss += loss.item()
        n_batches += 1

        # Threshold the probabilities at 0.5; metrics need no gradients.
        predicted = torch.round(outputs.detach())
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Confusion-matrix counts (positive class = label 1), vectorised.
        pred_pos = predicted == 1
        pred_neg = predicted == 0
        true_pos = labels == 1
        true_neg = labels == 0
        TP_TRAIN += (pred_pos & true_pos).sum().item()
        FP_TRAIN += (pred_pos & true_neg).sum().item()
        FN_TRAIN += (pred_neg & true_pos).sum().item()
        TN_TRAIN += (pred_neg & true_neg).sum().item()

    # max(..., 1) avoids a division by zero if the loader yields no batches.
    epoch_train_loss = train_loss / max(n_batches, 1)
    epoch_train_correct = correct
    epoch_train_total = total
    # The 0.001 terms keep the ratios defined when a denominator would be zero.
    precision_train = TP_TRAIN/(TP_TRAIN+FP_TRAIN+0.001)
    recall_train = TP_TRAIN/(TP_TRAIN+FN_TRAIN+0.001)
    f1_train = TP_TRAIN/(TP_TRAIN+0.5*(FP_TRAIN + FN_TRAIN+0.001))

    # ---- validation pass (the *_TEST counters hold validation counts) ----
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        n_batches = 0
        correct = 0
        total = 0
        TP_TEST = 0
        TN_TEST = 0
        FP_TEST = 0
        FN_TEST = 0
        for inputs, labels in zip(x_val, y_val):
            predicted_outputs = model(inputs)
            predicted = torch.round(predicted_outputs)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            pred_pos = predicted == 1
            pred_neg = predicted == 0
            true_pos = labels == 1
            true_neg = labels == 0
            TP_TEST += (pred_pos & true_pos).sum().item()
            FP_TEST += (pred_pos & true_neg).sum().item()
            FN_TEST += (pred_neg & true_pos).sum().item()
            TN_TEST += (pred_neg & true_neg).sum().item()

            val_loss += criterion(predicted_outputs, labels.float()).item()
            n_batches += 1

        epoch_val_loss = val_loss / max(n_batches, 1)
        epoch_val_correct = correct
        epoch_val_total = total
        precision_test = TP_TEST/(TP_TEST+FP_TEST+0.001)
        recall_test = TP_TEST/(TP_TEST+FN_TEST+0.001)
        f1_test = TP_TEST/(TP_TEST+0.5*(FP_TEST + FN_TEST+0.001))

    history["train"]["loss"].append(epoch_train_loss)
    history["val"]["loss"].append(epoch_val_loss)

    elapsed_time = time.time() - start_time

    print('-' * 80)
    print(f'Epoch: {epoch+1:03}/{epochs} | Time: {elapsed_time:.4f}s | Train loss: {epoch_train_loss:.4f} | Exactitud {epoch_train_correct, epoch_train_total}| Precision: {precision_train} | Recall: {recall_train} | F1: {f1_train} | Dev loss: {epoch_val_loss:.4f} | Exactitud {epoch_val_correct, epoch_val_total}| Precision: {precision_test} | Recall: {recall_test} | F1: {f1_test} |')
    train_error.append(history["train"]["loss"][-1])
    val_error.append(history["val"]["loss"][-1])

In [None]:
# Save only the retrained weights (state_dict) to "clasificador_reentrenado"
# in the current working directory.
torch.save(model.state_dict(),"clasificador_reentrenado")

Lectura y carga de bloques para el modelo de detección

In [None]:
# Load the waveform blocks together with their companion label files and stack
# them in the order listed, so row k of `x` and entry k of `y` come from the
# same position of the same pair of files.
earthquake_signals = ['array_earthquake_0_49999', 'array_earthquake_50000_99999']
labels_signal = ['labels_earthquake_0_49999', 'labels_earthquake_50000_99999']
x = None
y = None
for i, (signal_name, label_name) in enumerate(zip(earthquake_signals, labels_signal)):
  earthquake = np.load(f'C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado/{signal_name}.npy')
  label = np.load(f'C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado/{label_name}.npy')
  x = earthquake if i == 0 else np.concatenate((x, earthquake))
  y = label if i == 0 else np.concatenate((y, label))
  print(i)

In [None]:
# Display the shapes of the stacked waveforms and of their label array.
x.shape, y.shape

In [None]:
# Fetch the metadata row of every trace in `y`, in the order of `y`.
# A boolean `isin` filter keeps the CSV's own row order, which is not guaranteed
# to match the order of the waveforms in `x`; the arrival samples taken from
# this frame are later paired with `x` row by row, so order matters.
# Looking rows up by label preserves the order of `y` and raises KeyError if a
# trace has no metadata row. Duplicate trace names in the CSV are reduced to
# their first occurrence so the lookup stays one-to-one.
eventos_elegidos = (
    df_event
    .drop_duplicates(subset='trace_name')
    .set_index('trace_name', drop=False)
    .loc[y]
    .reset_index(drop=True)
)
eventos_elegidos

In [None]:
# Extract the P and S arrival columns as NumPy arrays, one entry per selected row.
# Only `p_wave` is used by the following cell; `s_wave` is not read in the
# visible cells.
p_wave = eventos_elegidos['p_arrival_sample'].to_numpy()
s_wave = eventos_elegidos['s_arrival_sample'].to_numpy()

In [None]:
# Convert to tensors; swapping axes 1 and 2 puts the size-3 axis before the
# sample axis. The target of each trace is its P arrival sample, as an integer.
x = torch.tensor(x, dtype=torch.float32).transpose(1, 2)
y = torch.tensor(p_wave).long()

# Two chained 90/10 splits with a fixed seed: first hold out the test set,
# then carve the validation set out of the remaining training data.
split_opts = dict(test_size=0.1, random_state=1)
x_train, x_test, y_train, y_test = train_test_split(x, y, **split_opts)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, **split_opts)

In [None]:
batch = 128
# Inputs and targets get separate, unshuffled loaders; the training cells zip
# them together, so both must iterate in the same order.
loader_opts = dict(batch_size=batch, shuffle=False)
x_train, y_train = DataLoader(x_train, **loader_opts), DataLoader(y_train, **loader_opts)
x_val, y_val = DataLoader(x_val, **loader_opts), DataLoader(y_val, **loader_opts)
x_test, y_test = DataLoader(x_test, **loader_opts), DataLoader(y_test, **loader_opts)

Modelo de detección de ondas

In [None]:
class LayerNorm2d(nn.Module):
    """Per-channel normalisation over the two trailing axes of a 4-D tensor.

    Each (sample, channel) slice is standardised with its own mean and
    (default, unbiased) variance over axes 2 and 3, then scaled by a learnable
    per-channel `gamma` and shifted by a learnable per-channel `beta`.
    """

    def __init__(self, dim):
        super().__init__()
        param_shape = (1, dim, 1, 1)  # broadcasts over batch and both trailing axes
        self.gamma = nn.Parameter(torch.ones(*param_shape))
        self.beta = nn.Parameter(torch.zeros(*param_shape))

    def forward(self, x):
        reduce_axes = (2, 3)
        centred = x - x.mean(dim=reduce_axes, keepdim=True)
        spread = x.var(dim=reduce_axes, keepdim=True)
        # 1e-8 keeps the square root away from zero for constant slices.
        normalised = centred / torch.sqrt(spread + 1e-8)
        return normalised * self.gamma + self.beta

# Downsampling stage: strided convolution along axis 2 followed by normalisation
class Downsample_Module(nn.Module):
    """Embed and downsample a (batch, channels, length, 1) tensor.

    A `(win_size, 1)` convolution with stride `(dsr, 1)` and padding
    `(win_size // 2, 0)` maps `in_chans` to `embed_dim` channels; the result is
    passed through `LayerNorm2d`.
    """

    def __init__(self, in_chans=3, win_size=7, dsr=2, embed_dim=64):
        super().__init__()
        kernel = (win_size, 1)
        step = (dsr, 1)
        pad = (win_size // 2, 0)
        self.emb = nn.Conv2d(in_chans, embed_dim, kernel_size=kernel, stride=step, padding=pad)
        self.bn = LayerNorm2d(embed_dim)

    def forward(self, x):
        return self.bn(self.emb(x))

# Attention stage: convolutional gate multiplied onto the input
class Attention_Module(nn.Module):
    """Gate the input with a map built from three stacked convolutions.

    Two depthwise convolutions along axis 2 (kernel 9, then kernel 11, both
    length-preserving) are followed by a 1x1 convolution; the result multiplies
    the input element-wise.
    """

    def __init__(self, dim):
        super().__init__()
        unit = (1, 1)
        # groups=dim makes each of these convolutions depthwise.
        self.conv1 = nn.Conv2d(dim, dim, kernel_size=(9, 1), stride=unit, padding=(4, 0), groups=dim)
        self.conv_sequential = nn.Conv2d(dim, dim, kernel_size=(11, 1), stride=unit, padding=(5, 0), groups=dim)
        self.conv2 = nn.Conv2d(dim, dim, kernel_size=unit, stride=unit)

    def forward(self, x):
        original = x.clone()
        gate = self.conv1(x)
        gate = self.conv_sequential(gate)
        gate = self.conv2(gate)
        return original * gate

# Upsampling stage: transposed convolution along axis 2, 1x1 projection, head
class UPsample_Module(nn.Module):
    """Upsample a (batch, dim, length, 1) tensor and optionally append a skip tensor.

    Args:
        dim: Number of input channels.
        out_dim: Number of channels after the 1x1 projection.
        k_size: Kernel length of the transposed convolution.
        stride: Upsampling factor along axis 2. May be an int or a
            `(stride_h, stride_w)` pair, of which only the first entry is used.
            The tuple default previously raised TypeError because it was used
            directly in integer arithmetic.
        last: Variant selector. `2` uses no output padding; `3` uses a plain
            ReLU head with no normalisation; any other value (including the
            default) uses output padding 1 with the LayerNorm2d + GELU head.
    """

    def __init__(self, dim, out_dim, k_size=7, stride=(2, 1), last=False):
        super(UPsample_Module, self).__init__()
        self.last = last
        # Normalise the stride to a single integer for axis 2.
        if isinstance(stride, (tuple, list)):
            stride = stride[0]
        pad = (k_size - stride + 1) // 2
        out_pad = (0, 0) if last == 2 else (1, 0)

        # The layer order is kept so state_dict keys (up.0, up.1, ...) are unchanged.
        layers = [
            nn.ConvTranspose2d(dim, dim, kernel_size=(k_size, 1), stride=(stride, 1), padding=(pad, 0),
                               groups=1, output_padding=out_pad),
            nn.Conv2d(dim, out_dim, kernel_size=(1, 1)),
        ]
        if last == 3:
            layers.append(nn.ReLU())
        else:
            layers.append(LayerNorm2d(out_dim))
            layers.append(nn.GELU(approximate='tanh'))
        self.up = nn.Sequential(*layers)

    def forward(self, x, cat_x=None):
        """Upsample `x`; if `cat_x` is given, concatenate it along the channel axis."""
        x = self.up(x)
        if cat_x is not None:
            x = torch.cat([x, cat_x], dim=1)
        return x

class SAN_Block(nn.Module):
    """Residual block: 1x1 projection, norm, GELU, attention, 1x1 projection, norm.

    `cff_hid` and `cff_out` are accepted but not used by this implementation.
    """

    def __init__(self, dim, cff_hid=None, cff_out=None):
        super().__init__()
        pointwise = (1, 1)
        self.proj_1 = nn.Conv2d(dim, dim, kernel_size=pointwise)
        self.bn1 = LayerNorm2d(dim)
        self.act1 = nn.GELU(approximate='tanh')

        self.attn = Attention_Module(dim)

        self.proj_2 = nn.Conv2d(dim, dim, kernel_size=pointwise)
        self.bn2 = LayerNorm2d(dim)

    def forward(self, x):
        shortcut = x.clone()
        hidden = self.proj_1(x)
        hidden = self.bn1(hidden)
        hidden = self.act1(hidden)
        hidden = self.attn(hidden)
        hidden = self.proj_2(hidden)
        hidden = self.bn2(hidden)
        # Residual connection around the whole block.
        return shortcut + hidden

class SAN(nn.Module):
    """Encoder-decoder that outputs one score per input sample.

    The encoder is a stack of `Downsample_Module` + `SAN_Block` stages. The
    decoder ('p' head) mirrors it with `UPsample_Module`s, concatenating the
    matching encoder feature map after every upsampling step except the last.

    Args:
        stage_blocks: Number of `SAN_Block`s per encoder stage; its length sets
            the number of stages.
        downsampling_dims: Output channels of each encoder stage.
        down_strides: Stride along the sample axis of each encoder stage; the
            decoder uses them in reverse order.
        upsampling_dims: Output channels of each decoder stage.

    The default list arguments are only read, never mutated.
    """

    def __init__(self, stage_blocks=[2, 2, 2, 2], downsampling_dims=[8, 16, 32, 64], down_strides=[4, 3, 2, 2],
                 upsampling_dims=[32, 16, 8, 1]):
        super(SAN, self).__init__()
        self.num_stages = len(stage_blocks)
        for i in range(self.num_stages):
            down = Downsample_Module(
                in_chans=3 if i == 0 else downsampling_dims[i - 1],
                win_size=7,
                dsr=down_strides[i],
                embed_dim=downsampling_dims[i]
            )
            blocks = nn.ModuleList(
                [SAN_Block(downsampling_dims[i], cff_hid=downsampling_dims[i] * 4) for _ in range(stage_blocks[i])]
            )
            norm = LayerNorm2d(downsampling_dims[i])
            setattr(self, f"down_{i + 1}", down)
            setattr(self, f"blocks_{i + 1}", blocks)
            setattr(self, f"norm_{i + 1}", norm)

        for task in ['p']:
            cur_dim = downsampling_dims[-1]
            for j in range(self.num_stages - 1):
                out_dim = upsampling_dims[j]
                cat_dim = downsampling_dims[-j - 2]
                # `last=j` selects the UPsample_Module variant for this step.
                setattr(self, f'{task}_up_{j + 1}', UPsample_Module(cur_dim, out_dim, stride=down_strides[-j - 1], last=j))
                # The next stage sees the upsampled channels plus the skip channels.
                cur_dim = out_dim + cat_dim
            # Final step: no skip connection, variant 3 of UPsample_Module.
            setattr(self, f'{task}_up_{self.num_stages}', UPsample_Module(cur_dim, upsampling_dims[-1], stride=down_strides[0], last=3))

    def forward_features(self, x):
        """Run every encoder stage and return the list of per-stage feature maps."""
        feats = []
        for i in range(self.num_stages):
            win_embed = getattr(self, f"down_{i + 1}")
            blocks = getattr(self, f"blocks_{i + 1}")
            norm = getattr(self, f"norm_{i + 1}")
            x = win_embed(x)
            for block in blocks:
                x = block(x)
            x = norm(x)
            feats.append(x)
        return feats

    def forward(self, x):
        """Map a (batch, 3, length) input to a (batch, length) score tensor."""
        # Add a trailing singleton axis so the 2-D convolutions can be used.
        x = x.view(x.shape[0], x.shape[1], x.shape[2], 1)
        feats = self.forward_features(x)
        pred = feats[-1]
        for j in range(self.num_stages - 1):
            # Upsample and concatenate the encoder feature map of matching length.
            pred = getattr(self, f'p_up_{j + 1}')(pred, feats[-j - 2])
        pred = getattr(self, f'p_up_{self.num_stages}')(pred)
        # Use the input length rather than a hard-coded 6000. This is identical
        # for 6000-sample traces and still fails loudly if the decoder output
        # does not match the input length.
        return pred.view(x.shape[0], x.shape[2])

Entrenamiento del modelo de detección

In [None]:
import time

# Detector instance used for this training run.
model = SAN(stage_blocks = [4,4,4,4], downsampling_dims = [12, 24, 36, 48], upsampling_dims = [36, 24, 12, 1])

# Loss function and optimizer. The target of each trace is the index of its
# P arrival sample, so the per-sample scores are treated as class logits.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 10
softmax = nn.Softmax(dim=1)
train_error = []
val_error = []

history = {"train": {"loss": [], "acc": []}, "val": {"loss": [], "acc": []}}

for epoch in range(0, epochs):
    start_time = time.time()

    # ---- training pass ----
    train_loss = 0.0
    n_batches = 0
    correct = 0
    total = 0
    error = 0
    TP_TRAIN = 0
    TN_TRAIN = 0
    FP_TRAIN = 0
    FN_TRAIN = 0
    model.train()
    for inputs, labels in zip(x_train, y_train):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # .item() stores a plain float; accumulating the tensor itself kept the
        # autograd graph of every batch alive for the whole epoch.
        train_loss += loss.item()
        n_batches += 1

        # Highest softmax probability and its sample index; no gradients needed.
        value, predicted = torch.max(softmax(outputs.detach()), 1)
        total += labels.size(0)
        # Absolute picking error in samples.
        delta = torch.abs(predicted - labels)
        error += torch.sum(delta.float()).item()
        correct += (predicted == labels).sum().item()

        # Confusion counts, vectorised: a pick is "confident" when its
        # probability is at least 0.05 and "close" when within 10 samples.
        confident = value >= 0.05
        unsure = value < 0.05
        close = delta <= 10
        far = delta > 10
        TP_TRAIN += (confident & close).sum().item()
        FP_TRAIN += (confident & far).sum().item()
        FN_TRAIN += (unsure & close).sum().item()
        TN_TRAIN += (unsure & far).sum().item()

    # max(..., 1) avoids a division by zero if the loader yields no batches.
    epoch_train_loss = train_loss / max(n_batches, 1)
    epoch_train_error = error / max(total, 1)
    epoch_train_correct = correct
    epoch_train_total = total
    # The 0.001 terms keep the ratios defined when a denominator would be zero.
    precision_train = TP_TRAIN/(TP_TRAIN+FP_TRAIN+0.001)
    recall_train = TP_TRAIN/(TP_TRAIN+FN_TRAIN+0.001)
    f1_train = TP_TRAIN/(TP_TRAIN+0.5*(FP_TRAIN + FN_TRAIN+0.001))

    # ---- validation pass (the *_TEST counters hold validation counts) ----
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        n_batches = 0
        correct = 0
        total = 0
        error = 0
        TP_TEST = 0
        TN_TEST = 0
        FP_TEST = 0
        FN_TEST = 0
        for inputs, labels in zip(x_val, y_val):
            predicted_outputs = model(inputs)
            value, predicted = torch.max(softmax(predicted_outputs), 1)

            total += labels.size(0)
            delta = torch.abs(predicted - labels)
            error += torch.sum(delta.float()).item()
            correct += (predicted == labels).sum().item()

            confident = value >= 0.05
            unsure = value < 0.05
            close = delta <= 10
            far = delta > 10
            TP_TEST += (confident & close).sum().item()
            FP_TEST += (confident & far).sum().item()
            FN_TEST += (unsure & close).sum().item()
            TN_TEST += (unsure & far).sum().item()

            val_loss += criterion(predicted_outputs, labels).item()
            n_batches += 1

        epoch_val_loss = val_loss / max(n_batches, 1)
        epoch_val_error = error / max(total, 1)
        epoch_val_correct = correct
        epoch_val_total = total
        precision_test = TP_TEST/(TP_TEST+FP_TEST+0.001)
        recall_test = TP_TEST/(TP_TEST+FN_TEST+0.001)
        f1_test = TP_TEST/(TP_TEST+0.5*(FP_TEST + FN_TEST+0.001))

    # The "acc" slots store the mean absolute picking error in samples.
    history["train"]["loss"].append(epoch_train_loss)
    history["train"]["acc"].append(epoch_train_error)
    history["val"]["loss"].append(epoch_val_loss)
    history["val"]["acc"].append(epoch_val_error)

    elapsed_time = time.time() - start_time

    print('-' * 80)
    print(f'Epoch: {epoch+1:03}/{epochs} | Time: {elapsed_time:.4f}s | Train loss: {epoch_train_loss:.4f} | Train error: {epoch_train_error:.4f} | Exactitud {epoch_train_correct, epoch_train_total}| Precision: {precision_train} | Recall: {recall_train} | F1: {f1_train} | Dev loss: {epoch_val_loss:.4f} | val error: {epoch_val_error:.4f} | Exactitud {epoch_val_correct, epoch_val_total}| Precision: {precision_test} | Recall: {recall_test} | F1: {f1_test} |')
    train_error.append(history["train"]["loss"][-1])
    val_error.append(history["val"]["loss"][-1])

In [None]:
# Confusion counts left by the last epoch: training pass first, then the
# validation pass (the validation loop stores its counts in the *_TEST names).
(TP_TRAIN, TN_TRAIN, FP_TRAIN, FN_TRAIN), (TP_TEST, TN_TEST, FP_TEST, FN_TEST)

In [None]:
# Evaluate the detector on the held-out test split.
# Set eval mode explicitly instead of relying on the state left by the training cell.
model.eval()
with torch.no_grad():
    val_loss = 0.0
    n_batches = 0
    correct = 0
    total = 0
    error = 0
    TP_TEST = 0
    TN_TEST = 0
    FP_TEST = 0
    FN_TEST = 0
    batch_deltas = []
    for inputs, labels in zip(x_test, y_test):
        predicted_outputs = model(inputs)
        # Highest softmax probability and its sample index for each trace.
        value, predicted = torch.max(softmax(predicted_outputs), 1)

        total += labels.size(0)
        # Absolute picking error in samples.
        delta = torch.abs(predicted - labels)
        error += torch.sum(delta.float()).item()
        correct += (predicted == labels).sum().item()
        # Collect per-trace errors and concatenate once after the loop.
        batch_deltas.append(delta.float())

        # Confusion counts, vectorised: a pick is "confident" when its
        # probability is at least 0.05 and "close" when within 10 samples.
        confident = value >= 0.05
        unsure = value < 0.05
        close = delta <= 10
        far = delta > 10
        TP_TEST += (confident & close).sum().item()
        FP_TEST += (confident & far).sum().item()
        FN_TEST += (unsure & close).sum().item()
        TN_TEST += (unsure & far).sum().item()

        # Accumulate a plain float rather than a tensor.
        val_loss += criterion(predicted_outputs, labels).item()
        n_batches += 1

    # Per-trace absolute errors over the whole test split (read by later cells).
    media = torch.cat(batch_deltas, 0) if batch_deltas else torch.tensor([])

    # max(..., 1) avoids a division by zero if the loader yields no batches.
    epoch_val_loss = val_loss / max(n_batches, 1)
    epoch_val_error = error / max(total, 1)
    epoch_val_correct = correct
    epoch_val_total = total
    # Report 0.0 instead of raising ZeroDivisionError when a denominator is
    # zero; otherwise the values are the same as before.
    precision_test = TP_TEST/(TP_TEST+FP_TEST) if (TP_TEST+FP_TEST) else 0.0
    recall_test = TP_TEST/(TP_TEST+FN_TEST) if (TP_TEST+FN_TEST) else 0.0
    f1_denominator = TP_TEST+0.5*(FP_TEST + FN_TEST)
    f1_test = TP_TEST/f1_denominator if f1_denominator else 0.0

print(epoch_val_error, epoch_val_correct,  epoch_val_total, precision_test, recall_test, f1_test)

In [None]:
# Keep only the per-trace errors of at most 50 samples, as plain Python floats.
valores = [delta_i.item() for delta_i in media if delta_i <= 50]

In [None]:
# Convert the filtered errors to a NumPy array and display how many were kept.
valores = np.array(valores)
valores.shape

In [None]:
# Mean and standard deviation of the filtered picking errors (in samples).
np.mean(valores), np.std(valores)

Visualización

In [None]:
# Plot the model output and the first input channel around the labelled sample
# of trace `i`, using the tensors left by the last evaluated batch
# (`inputs`, `labels`, `predicted`, `predicted_outputs`).
i = 0
ventana = 100  # half-width of the plotted window, in samples

label_sample = int(labels[i])
predicted_sample = int(predicted[i])
n_samples = predicted_outputs.shape[-1]

# Clamp the window to the trace. Without this, a label closer than `ventana`
# to the start gave a negative slice start, which wraps around and yields an
# empty or wrong slice that no longer matches the x-axis.
inicio = max(label_sample - ventana, 0)
fin = min(label_sample + ventana, n_samples)
muestras = np.arange(inicio, fin)

# Create plot
fig, ax1 = plt.subplots()

ax1.set_xlabel('Time samples')
ax1.set_ylabel('Output', color = 'red')
a = predicted_outputs[i].detach().numpy()[inicio:fin]
ax1.plot(muestras, a, color = 'red')
ax1.tick_params(axis ='y', labelcolor = 'red')

# Twin axis for the waveform amplitude
ax2 = ax1.twinx()

ax2.set_ylabel('Amplitude', color = 'blue')
ax2.plot(muestras, inputs[i][0].detach().numpy()[inicio:fin], color = 'blue')
ax2.tick_params(axis ='y', labelcolor = 'blue')

# Vertical markers: labelled sample (blue) and predicted sample (red)
plt.axvline(label_sample, color='blue', linestyle='--', label='Línea Vertical')
plt.axvline(predicted_sample, color='red', linestyle='--', label='Línea Vertical')

plt.show()

In [None]:
# Save only the detector weights (state_dict) to 'detector_onda_p_1.pt' in the
# current working directory.
torch.save(model.state_dict(), 'detector_onda_p_1.pt')

Reentrenamiento del modelo de detección de fases

In [None]:
# Directory that holds the saved waveform / label blocks.
dataset_dir = 'C:/Users/Vic_l/Documents/Maestría en Ciencias de la Computación/Tesis/dataset_mezclado'

# Blocks to load; the two lists are parallel (same position = same block).
earthquake_signals = ['array_earthquake_200000_249999', 'array_earthquake_250000_299999']
labels_signal = ['labels_earthquake_200000_249999', 'labels_earthquake_250000_299999']

# Load every block and join them along the first axis in a single concatenate
# call. Building the result this way leaves no per-block array bound to a
# global name once the cell finishes, so the individual blocks can be freed.
x = np.concatenate([np.load(f'{dataset_dir}/{nombre}.npy') for nombre in earthquake_signals])
y = np.concatenate([np.load(f'{dataset_dir}/{nombre}.npy') for nombre in labels_signal])

In [None]:
# Sanity check: display the shapes of the loaded waveforms and of their labels.
x.shape, y.shape

In [None]:
# Pick the metadata rows for the loaded traces *in the order of `y`*.
# A plain `isin` filter would return the rows in `df_event` order, which only
# lines up with the waveforms in `x` if both happen to be ordered identically.
# `get_indexer` returns, for every name in `y`, its row position in `df_event`
# (-1 when the name is absent) and refuses to run if `trace_name` has duplicates.
posiciones = pd.Index(df_event['trace_name']).get_indexer(y)
faltantes = int((posiciones < 0).sum())
if faltantes:
    raise KeyError(f'{faltantes} trace names in y were not found in df_event')
eventos_elegidos = df_event.iloc[posiciones]
eventos_elegidos

In [None]:
# Extract the 'p_arrival_sample' and 's_arrival_sample' columns of the selected
# events as NumPy arrays (presumably the P- and S-wave arrival sample indices —
# confirm against the dataset description).
p_wave = eventos_elegidos['p_arrival_sample'].to_numpy()
s_wave = eventos_elegidos['s_arrival_sample'].to_numpy()

In [None]:
# `as_tensor` reuses the NumPy buffer when the array is already float32, whereas
# `torch.tensor` always copies the whole waveform array.
x = torch.as_tensor(x, dtype=torch.float32)
# Swap axes 1 and 2 (presumably (N, samples, channels) -> (N, channels, samples);
# confirm against the saved arrays).
# NOTE: this cell rebinds `x`, so running it twice swaps the axes back.
x = torch.transpose(x, 1, 2)
# The P-arrival values become the integer targets.
y = torch.tensor(p_wave).long()

# Hold out 10% for testing, then 10% of the remainder for validation.
# The fixed random_state keeps both splits reproducible.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=1)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=1)

In [None]:
batch = 128

# Wrap every split in its own DataLoader. Inputs and targets live in separate
# loaders that the training/evaluation loops walk in lock-step with zip(), so
# shuffling must stay off for the pairs to remain aligned.
x_train, y_train, x_val, y_val, x_test, y_test = (
    DataLoader(particion, batch_size=batch, shuffle=False)
    for particion in (x_train, y_train, x_val, y_val, x_test, y_test)
)

In [None]:
import time


def _confusion_counts(value, delta, threshold=0.05, tolerance=10):
    """Count (TP, FP, FN, TN) for one batch of picks.

    A pick counts as "positive" when its softmax peak `value` reaches
    `threshold`, and as "correct" when its absolute error `delta` is within
    `tolerance` samples. Both arguments are 1-D tensors of the same length.
    """
    confiado = value >= threshold
    dudoso = value < threshold
    cerca = delta <= tolerance
    lejos = delta > tolerance
    return (
        (confiado & cerca).sum().item(),
        (confiado & lejos).sum().item(),
        (dudoso & cerca).sum().item(),
        (dudoso & lejos).sum().item(),
    )


def _ratio(numerador, denominador):
    """Return numerador / denominador, or 0.0 when the denominator is zero."""
    return numerador / denominador if denominador else 0.0


# Rebuild the architecture and resume from the weights saved earlier.
model = SAN(stage_blocks = [4,4,4,4], downsampling_dims = [12, 24, 36, 48], upsampling_dims = [36, 24, 12, 1])
checkpoint = torch.load("detector_onda_p_1.pt")
model.load_state_dict(checkpoint)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)

epochs = 10
softmax = nn.Softmax(dim=1)
train_error = []
val_error = []

# NOTE: the "acc" entries store the mean absolute error (predicted vs. label
# index), not an accuracy; the key name is kept for compatibility.
history = {"train": {"loss": [], "acc": []}, "val": {"loss": [], "acc": []}}

for epoch in range(0, epochs):
    start_time = time.time()

    # ---- Training pass ----
    train_loss = 0.0
    correct = 0
    total = 0
    error = 0
    TP_TRAIN = TN_TRAIN = FP_TRAIN = FN_TRAIN = 0
    model.train()
    for b, (inputs, labels) in enumerate(zip(x_train, y_train), start=1):
        optimizer.zero_grad()
        outputs = model(inputs)

        # Bookkeeping only: no gradients are needed for the metrics.
        with torch.no_grad():
            value, predicted = torch.max(softmax(outputs), 1)
            delta = torch.abs(predicted - labels)
        total += labels.size(0)
        error += torch.sum(delta.float()).item()
        correct += (predicted == labels).sum().item()

        tp, fp, fn, tn = _confusion_counts(value, delta)
        TP_TRAIN += tp
        FP_TRAIN += fp
        FN_TRAIN += fn
        TN_TRAIN += tn

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Accumulate a plain float: adding the tensor itself would keep every
        # batch's autograd history alive for the whole epoch.
        train_loss += loss.item()

    epoch_train_loss = train_loss/b
    epoch_train_error = error/total
    epoch_train_correct = correct
    epoch_train_total = total
    precision_train = _ratio(TP_TRAIN, TP_TRAIN + FP_TRAIN)
    recall_train = _ratio(TP_TRAIN, TP_TRAIN + FN_TRAIN)
    f1_train = _ratio(TP_TRAIN, TP_TRAIN + 0.5*(FP_TRAIN + FN_TRAIN))

    # ---- Validation pass ----
    # The *_TEST names are kept because a later cell displays them.
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        correct = 0
        total = 0
        error = 0
        TP_TEST = TN_TEST = FP_TEST = FN_TEST = 0
        for b, (inputs, labels) in enumerate(zip(x_val, y_val), start=1):
            predicted_outputs = model(inputs)
            value, predicted = torch.max(softmax(predicted_outputs), 1)

            total += labels.size(0)
            delta = torch.abs(predicted - labels)
            error += torch.sum(delta.float()).item()
            correct += (predicted == labels).sum().item()

            tp, fp, fn, tn = _confusion_counts(value, delta)
            TP_TEST += tp
            FP_TEST += fp
            FN_TEST += fn
            TN_TEST += tn

            test_loss = criterion(predicted_outputs, labels)
            val_loss += test_loss.item()
        epoch_val_loss = val_loss/b
        epoch_val_error = error/total
        epoch_val_correct = correct
        epoch_val_total = total
        precision_test = _ratio(TP_TEST, TP_TEST + FP_TEST)
        recall_test = _ratio(TP_TEST, TP_TEST + FN_TEST)
        f1_test = _ratio(TP_TEST, TP_TEST + 0.5*(FP_TEST + FN_TEST))

    history["train"]["loss"].append(epoch_train_loss)
    history["train"]["acc"].append(epoch_train_error)
    history["val"]["loss"].append(epoch_val_loss)
    history["val"]["acc"].append(epoch_val_error)

    elapsed_time = time.time() - start_time

    print('-' * 80)
    print(f'Epoch: {epoch+1:03}/{epochs} | Time: {elapsed_time:.4f}s | Train loss: {epoch_train_loss:.4f} | Train error: {epoch_train_error:.4f} | Exactitud {epoch_train_correct, epoch_train_total}| Precision: {precision_train} | Recall: {recall_train} | F1: {f1_train} | Dev loss: {epoch_val_loss:.4f} | val error: {epoch_val_error:.4f} | Exactitud {epoch_val_correct, epoch_val_total}| Precision: {precision_test} | Recall: {recall_test} | F1: {f1_test} |')
    train_error.append(history["train"]["loss"][-1])
    val_error.append(history["val"]["loss"][-1])

In [None]:
# Persist the retrained model's parameters (its state_dict) to 'detector_onda_p_2.pt'.
torch.save(model.state_dict(), 'detector_onda_p_2.pt')

In [None]:
# Display the confusion counts left by the last epoch: the training tuple first,
# then the *_TEST tuple (which the training cell fills from the validation split).
(TP_TRAIN, TN_TRAIN, FP_TRAIN, FN_TRAIN), (TP_TEST, TN_TEST, FP_TEST, FN_TEST)

In [None]:
threshold = 0.05  # minimum softmax peak for a pick to count as "positive"
tolerancia = 10   # maximum |predicted - label| for a pick to count as "correct"

# Make sure layers such as dropout / batch-norm run in inference mode.
model.eval()

val_loss = 0.0
correct = 0
total = 0
error = 0
TP_TEST = TN_TEST = FP_TEST = FN_TEST = 0
desviaciones = []  # per-batch absolute errors, joined once after the loop

with torch.no_grad():
    for b, (inputs, labels) in enumerate(zip(x_test, y_test), start=1):
        predicted_outputs = model(inputs)
        value, predicted = torch.max(softmax(predicted_outputs), 1)

        total += labels.size(0)
        delta = torch.abs(predicted - labels)
        error += torch.sum(delta.float()).item()
        correct += (predicted == labels).sum().item()
        desviaciones.append(delta.float())

        # Performance metrics, computed on whole tensors instead of element by element.
        confiado = value >= threshold
        dudoso = value < threshold
        cerca = delta <= tolerancia
        lejos = delta > tolerancia
        TP_TEST += (confiado & cerca).sum().item()
        FP_TEST += (confiado & lejos).sum().item()
        FN_TEST += (dudoso & cerca).sum().item()
        TN_TEST += (dudoso & lejos).sum().item()

        # Loss, accumulated as a plain float.
        test_loss = criterion(predicted_outputs, labels)
        val_loss += test_loss.item()

# Absolute error of every test sample, as one 1-D float tensor.
media = torch.cat(desviaciones) if desviaciones else torch.tensor([])

epoch_val_loss = val_loss/b
epoch_val_error = error/total
epoch_val_correct = correct
epoch_val_total = total
# Guard the ratios so an empty denominator yields 0.0 instead of ZeroDivisionError.
precision_test = TP_TEST/(TP_TEST+FP_TEST) if (TP_TEST+FP_TEST) else 0.0
recall_test = TP_TEST/(TP_TEST+FN_TEST) if (TP_TEST+FN_TEST) else 0.0
f1_test = TP_TEST/(TP_TEST+0.5*(FP_TEST + FN_TEST)) if (TP_TEST+FP_TEST+FN_TEST) else 0.0

print(epoch_val_error, epoch_val_correct,  epoch_val_total, precision_test, recall_test, f1_test)

In [None]:
# Keep, as plain Python floats, only the absolute errors of at most 50 samples.
valores = media[media <= 50].tolist()

In [None]:
# Convert the filtered errors to a NumPy array; the trailing expression
# displays its shape (i.e. how many samples passed the filter).
valores = np.array(valores)
valores.shape

In [None]:
# Mean and standard deviation of the filtered absolute errors.
np.mean(valores), np.std(valores)

In [None]:
# Signed difference between predicted and labelled indices for the batch
# currently held in `predicted` / `labels` (the last one the loop above processed).
predicted-labels

In [None]:
# Replace the raw outputs of the current batch with their softmax.
# NOTE(review): this overwrites `predicted_outputs` in place, so running the
# cell more than once applies softmax repeatedly.
predicted_outputs =softmax(predicted_outputs)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns  # needed for set_theme below; not imported by the notebook's import cell

# Apply the default theme
sns.set_theme(style="white")
i = 4  # index of the trace, inside the current batch, to draw

# `predicted_outputs` was already turned into softmax probabilities by the cell
# above, so it is plotted as is: a second softmax over values in [0, 1] would
# flatten the curve towards a uniform distribution.
a = predicted_outputs[i].detach().numpy()
senal = inputs[i][0].detach().numpy()

# Left axis: model output over the whole trace.
fig, ax1 = plt.subplots(figsize=(10, 3))
ax1.set_xlabel('Tiempo')
ax1.set_ylabel('Amplitude de la salida', color='black')
ax1.plot(np.arange(a.shape[0]), a, color='red', label='Predicción')

# Right axis (twin): first channel of the input waveform.
ax2 = ax1.twinx()
ax2.set_ylabel('Amplitud de la señal', color='black')
ax2.plot(np.arange(senal.shape[0]), senal, color='black', label='Señal',linewidth=0.5)

# Vertical markers: blue = labelled sample, red = predicted sample.
ax2.axvline(int(labels[i]), color='blue', linestyle='--', label='Etiqueta real')
ax2.axvline(int(predicted[i]), color='red', linestyle='--', label='Etiqueta predicha')

# Display legends
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')

# Save the figure to disk (PNG) before showing it.
plt.savefig('s_fn.png')

plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

i = 4          # index of the trace, inside the current batch, to draw
ventana = 100  # half-width of the plotted window, in samples

etiqueta = int(labels[i])
prediccion = int(predicted[i])

# `predicted_outputs` was already turned into softmax probabilities by an
# earlier cell, so it is used as is: a second softmax over values in [0, 1]
# would flatten the curve towards a uniform distribution.
salida = predicted_outputs[i].detach().numpy()
senal = inputs[i][0].detach().numpy()

# Clamp the window to the trace. A negative start is read by NumPy as
# "count from the end" and an end past the trace is silently clipped, so in
# both cases the slice would no longer match the x axis and plotting would fail.
inicio = max(etiqueta - ventana, 0)
fin = min(etiqueta + ventana, salida.shape[0], senal.shape[0])
muestras = np.arange(inicio, fin)
a = salida[inicio:fin]

# Left axis: model output around the labelled sample.
fig, ax1 = plt.subplots()
ax1.set_xlabel('Tiempo')
ax1.set_ylabel('Amplitude de la salida', color='black')
ax1.plot(muestras, a, color='red', label='Predicción')

# Right axis (twin): first channel of the input waveform over the same window.
ax2 = ax1.twinx()
ax2.set_ylabel('Amplitud de la señal', color='black')
ax2.plot(muestras, senal[inicio:fin], color='black', label='Señal')

# Vertical markers: blue = labelled sample, red = predicted sample.
ax2.axvline(etiqueta, color='blue', linestyle='--', label='Etiqueta real')
ax2.axvline(prediccion, color='red', linestyle='--', label='Etiqueta predicha')

# Display legends
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')

# Save the figure to disk (PNG) before showing it.
plt.savefig('s_fn_amp.png')

plt.show()


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Example confusion-matrix counts, entered by hand:
# true positives, true negatives, false positives, false negatives.
tp, tn, fp, fn = 77671, 4881, 17296, 3176

# Expand the four counts into per-sample sequences of real and predicted
# classes, group by group (TP, TN, FP, FN), so scikit-learn can tabulate them.
cantidades = [tp, tn, fp, fn]
y_true = np.repeat([1, 0, 0, 1], cantidades).tolist()
y_pred = np.repeat([1, 0, 1, 0], cantidades).tolist()

# Build the confusion matrix (rows = real class, columns = predicted class).
conf_matrix = confusion_matrix(y_true, y_pred)

# Draw it as an annotated heatmap on an explicit figure/axes pair.
etiquetas_clase = ['Negative', 'Positive']
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=etiquetas_clase,
    yticklabels=etiquetas_clase,
    ax=ax,
)
ax.set_xlabel('Predicción')
ax.set_ylabel('Realidad')
ax.set_title('Matriz de Confusión')

# Save the figure to disk (PNG), then display it.
fig.savefig('matriz_confusion.png')
plt.show()
