Imports

In [1]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import obspy
from obspy import read
from obspy.signal.trigger import recursive_sta_lta, classic_sta_lta#, carl_sta_lta
from obspy.signal.trigger import plot_trigger, trigger_onset
from scipy.signal import butter, filtfilt, wiener, stft
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Input, Lambda, Concatenate
from tensorflow.keras import backend as K

Load and Preprocess the Data

In [None]:
def load_data(file_path):
    st = read(file_path)
    x = st[0].data
    arrival_time = st[0].stats.starttime.timestamp
    y = np.zeros_like(x)
    y[int(arrival_time)] = 1
    return x, y

def bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    y = filtfilt(b, a, data)
    return y

def preprocess_data(x, fs=100):
    x = bandpass_filter(x, 1.0, 10.0, fs)
    x = wiener(x)
    return x

Define the STA/LTA Algorithm

In [None]:
def apply_sta_lta(data, nsta, nlta, method='recursive'):
    if method == 'recursive':
        sta_lta = recursive_sta_lta(data, nsta, nlta)
    elif method == 'classic':
        sta_lta = classic_sta_lta(data, nsta, nlta)
    #elif method == 'carl':
        #sta_lta = carl_sta_lta(data, nsta, nlta)
    else:
        raise ValueError("Unsupported STA/LTA method")
    return sta_lta

def apply_stft(data, fs=100, nperseg=256):
    f, t, Zxx = stft(data, fs=fs, nperseg=nperseg)
    return np.abs(Zxx)



Define the 1D CNN Model with Learnable STA/LTA Windows

In [2]:
def create_model(input_shape):
    input_layer = Input(shape=input_shape)
    
    # Learnable STA/LTA windows
    nsta = Dense(1, activation='relu', name='nsta')(input_layer)
    nlta = Dense(1, activation='relu', name='nlta')(input_layer)
    
    # Apply STA/LTA
    sta_lta_layer = Lambda(lambda x: apply_sta_lta(x[0], K.cast(x[1], 'int32'), K.cast(x[2], 'int32')), name='sta_lta')([input_layer, nsta, nlta])

    # Apply STFT
    stft_layer = Lambda(lambda x: apply_stft(x), name='stft')(input_layer)

    # Combine STA/LTA and STFT
    combined = Concatenate()([sta_lta_layer, stft_layer])
    
    # CNN layers
    conv1 = Conv1D(16, kernel_size=3, activation='relu')(sta_lta_layer)
    #conv1 = Conv1D(16, kernel_size=3, activation='relu')(combined)
    pool1 = MaxPooling1D(pool_size=2)(conv1)
    conv2 = Conv1D(32, kernel_size=3, activation='relu')(pool1)
    pool2 = MaxPooling1D(pool_size=2)(conv2)
    flat = Flatten()(pool2)
    dense1 = Dense(64, activation='relu')(flat)
    output_layer = Dense(1, activation='sigmoid')(dense1)
    
    model = Model(inputs=input_layer, outputs=output_layer)
    return model

Adjust the Loss Function

In [None]:
def custom_loss(y_true, y_pred):
    loss = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    weight = tf.where(tf.equal(y_true, 1), 10.0, 1.0)
    return loss * weight

Train the Model

In [None]:
def train_model(model, x_train, y_train, epochs=50, batch_size=32):
    model.compile(optimizer='adam', loss=custom_loss, metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, validation_split=0.2)

 Evaluate the Model

In [None]:
def evaluate_model(model, x_test, y_test):    
    # Use model.evaluate to get the loss and accuracy
    loss, accuracy = model.evaluate(x_test, y_test)
    print(f'Test Loss: {loss:.4f}')
    print(f'Test Accuracy: {accuracy:.4f}')
    
    # Predict the output for the test set
    y_pred = model.predict(x_test)
    y_pred = (y_pred > 0.5).astype(int)
    
    # Compute additional metrics
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    print(f'Test Precision: {precision:.4f}')
    print(f'Test Recall: {recall:.4f}')
    print(f'Test F1 Score: {f1:.4f}')

Main Script

In [None]:

# Load and preprocess data
x, y = load_data('data.mseed')
x = preprocess_data(x)

# Reshape data for CNN
x = x.reshape(-1, len(x), 1)
y = y.reshape(-1, len(y), 1)

# Split the data into training and testing sets
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

# Scale the data
scaler = StandardScaler()
x_train = scaler.fit_transform(x_train.reshape(-1, x_train.shape[-1])).reshape(x_train.shape)
x_test = scaler.transform(x_test.reshape(-1, x_test.shape[-1])).reshape(x_test.shape)

# Create and train the model
model = create_model(input_shape=(x_train.shape[1], 1))
train_model(model, x_train, y_train, epochs=50, batch_size=32)

# Evaluate the model
evaluate_model(model, x_test, y_test)