<a href="https://colab.research.google.com/github/cedamusk/final-year/blob/main/FinalProjectIteration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow obspy matplotlib scikit-learn

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
from obspy import UTCDateTime, Stream, Trace
from obspy.clients.fdsn import Client
from obspy.signal.trigger import classic_sta_lta
from obspy.signal.filter import bandpass
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (LSTM, Dense, Dropout, BatchNormalization,
                                     Bidirectional, SimpleRNN, GRU, Conv1D, MaxPooling1D)
from tensorflow.keras.callbacks import(
    EarlyStopping, ReduceLROnPlateau,
    ModelCheckpoint
)

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    classification_report, confusion_matrix,
    accuracy_score, roc_auc_score, roc_curve,
    precision_recall_curve
)
import seaborn as sns
import tensorflow as tf


In [None]:
def create_output_directory(base_dir='seismic_detection_outputs'):
  """Create (if necessary) the directory that receives plots and models.

  Args:
    base_dir: Path of the directory to create. Defaults to
      'seismic_detection_outputs', resolved against the current working
      directory.

  Returns:
    The same path that was passed in, so callers can reuse it directly.
  """
  # exist_ok=True keeps repeated runs (e.g. re-executing a notebook cell) safe.
  os.makedirs(base_dir, exist_ok=True)
  return base_dir

In [None]:
def fetch_iris_data(network, station, location, channel, origin_time, duration=120):
  """Request a waveform segment from the IRIS FDSN service.

  The requested window runs from `origin_time` to `origin_time + duration`.
  This is a best-effort helper: any exception raised while creating the
  client or requesting the data is printed to stdout and converted into a
  `None` return value instead of propagating.

  Returns:
    Whatever `Client.get_waveforms` returns on success, otherwise `None`.
  """
  try:
    iris_client=Client("IRIS")
    window_end=origin_time+duration
    return iris_client.get_waveforms(
        network, station, location, channel, origin_time, window_end
    )
  except Exception as err:
    # Deliberately broad: the caller treats None as "no real data available".
    print(f"Error fetching data from IRIS:{str(err)}")
    return None


In [None]:
def generate_synthetic_data(num_samples, sample_rate, event_duration, noise_level, num_events=3):
  """Generate a noisy synthetic trace containing decaying sinusoidal events.

  Between 1 and `num_events` events are placed at random start positions in
  the middle three quarters of the trace. Each event is the sum of two
  exponentially decaying sinusoids (3-8 Hz and 10-20 Hz) with random
  amplitude and decay constant. Randomness comes from NumPy's global
  `np.random` state, so seed it beforehand for reproducible output.

  Args:
    num_samples: Length of the generated trace in samples.
    sample_rate: Samples per second (int or float).
    event_duration: Nominal length of each event in seconds.
    noise_level: Standard deviation of the Gaussian background noise.
    num_events: Upper bound (inclusive) on the number of events.

  Returns:
    Tuple `(data, events, event_locations)`: the noisy trace, the noise-free
    event signal, and a list of `(start, end)` sample indices (end exclusive,
    never larger than `num_samples`).
  """
  time=np.arange(num_samples)/sample_rate
  background=np.random.normal(0, noise_level, num_samples)
  events=np.zeros(num_samples)
  event_locations=[]
  event_length=int(event_duration*sample_rate)

  for _ in range(np.random.randint(1, num_events+1)):
    event_start=int(np.random.randint(num_samples//8, num_samples*7//8))
    # Clamp to the trace length so the recorded location always describes
    # samples that actually exist (slicing silently truncated it before).
    event_end=min(event_start+event_length, num_samples)
    event_locations.append((event_start, event_end))

    # Two frequency components give a slightly more realistic waveform.
    freq1=np.random.uniform(3, 8)
    freq2=np.random.uniform(10, 20)
    amp1=np.random.uniform(1.0, 2.0)
    amp2=np.random.uniform(0.5, 1.5)
    decay1=np.random.uniform(0.1, 0.3)
    decay2=np.random.uniform(0.2, 0.4)

    event_time=time[event_start:event_end]-time[event_start]
    event=(
        amp1*np.sin(2*np.pi*freq1*event_time)*np.exp(-event_time/decay1)+
        amp2*np.sin(2*np.pi*freq2*event_time)*np.exp(-event_time/decay2)
    )
    # Superimpose rather than assign: overlapping events must add up instead
    # of the later one erasing part of the earlier one.
    events[event_start:event_end]+=event

  data=background+events
  return data, events, event_locations

In [None]:
def process_stream(stream, freqmin=0.5, freqmax=20):
  """Filter a copy of `stream` and compute an STA/LTA characteristic function.

  The input stream is left untouched; a copy is linearly detrended, tapered
  (10 % maximum) and band-pass filtered between `freqmin` and `freqmax`
  with a 6-corner zero-phase filter. The STA/LTA function is computed on the
  first trace of the filtered copy only.

  Returns:
    Tuple `(processed_stream, cft)`.
  """
  cleaned=stream.copy()
  cleaned.detrend('linear')
  cleaned.taper(max_percentage=0.1)
  cleaned.filter(
      'bandpass',
      freqmin=freqmin,
      freqmax=freqmax,
      corners=6,
      zerophase=True,
  )

  # STA/LTA window lengths in samples: 0.5 x and 10 x the sampling rate.
  first_trace=cleaned[0]
  short_window=int(0.5*first_trace.stats.sampling_rate)
  long_window=int(10*first_trace.stats.sampling_rate)
  characteristic=classic_sta_lta(first_trace.data, short_window, long_window)
  return cleaned, characteristic

In [None]:
def create_enhanced_windows(data, window_size, step, event_locations=None, cft=None):
  """Slice `data` into overlapping windows with engineered features and labels.

  Each window of `window_size` samples becomes a `(window_size, 6)` matrix
  whose columns are: raw signal, absolute amplitude, first derivative,
  second derivative, log(1 + |amplitude|), and the real part of the first
  `window_size // 2` FFT bins zero-padded to `window_size`.

  A window is labelled 1 when any of these holds, checked in order:
    1. it overlaps one of the `(start, end)` ranges in `event_locations`
       (end exclusive);
    2. the STA/LTA function `cft` exceeds 3.0 somewhere inside the window;
    3. its mean absolute amplitude exceeds 2.5 x the standard deviation of
       `|data|` (statistical fallback).

  Args:
    data: 1-D signal array.
    window_size: Number of samples per window.
    step: Hop between consecutive window starts, in samples.
    event_locations: Optional iterable of `(start, end)` sample ranges.
    cft: Optional STA/LTA characteristic function aligned with `data`.

  Returns:
    Tuple `(windows, labels)` of NumPy arrays with one entry per window.
  """
  windows=[]
  labels=[]

  # Loop-invariant: the fallback threshold depends on the whole trace only.
  anomaly_threshold=(np.abs(data).std() if len(data) else 0.0)*2.5

  def detect_event_by_trigger(window_start, window_end, cft=None):
    """Return True when the STA/LTA function exceeds 3.0 inside the window."""
    if cft is None:
      return False
    window_cft=cft[window_start:window_end]
    # cft may be shorter than data; an empty slice has no maximum.
    if len(window_cft)==0:
      return False
    return np.max(window_cft)>3.0 #Adjust threshold as needed

  for i in range(0, len(data)-window_size+1, step):
    window_end=i+window_size
    window=data[i:window_end]
    freq_features=np.fft.fft(window)[:window_size//2].real
    freq_features_padded=np.pad(freq_features, (0, window_size-len(freq_features)), 'constant')

    window_features=np.column_stack([
        window, #Raw signal
        np.abs(window), #Absolute amplitude
        np.gradient(window), #First derivative
        np.gradient(np.gradient(window)), #Second derivative
        np.log1p(np.abs(window)), #Log of absolute amplitude
        freq_features_padded, #Padded frequency-domain features
    ])

    label=0
    if event_locations:
      # Both ranges are half-open ([i, window_end) and [start, end)), so the
      # comparisons must be strict; windows that merely touch an event
      # boundary contain no event samples.
      if any(i<end and window_end>start for start, end in event_locations):
        label=1

    #Additional event detection using STA/LTA
    if label==0 and detect_event_by_trigger(i, window_end, cft):
      label=1

    #Fall back: statistical anomaly detection
    if label==0 and np.abs(window).mean()>anomaly_threshold:
      label=1

    windows.append(window_features)
    labels.append(label)
  return np.array(windows), np.array(labels)

In [None]:
def build_advanced_rnn_model(input_shape):
  """Build and compile the CNN-RNN hybrid binary classifier.

  Architecture: Conv1D + max pooling, bidirectional GRU, bidirectional LSTM,
  SimpleRNN, then two dense layers and a sigmoid output. Batch normalisation,
  dropout and L2 weight penalties are applied throughout.

  Args:
    input_shape: `(time_steps, n_features)` shape of one input window.

  Returns:
    A compiled Keras `Sequential` model trained with binary cross-entropy and
    reporting accuracy, precision and recall.
  """
  model=Sequential([
      #Convolutional layer for feature extraction
      Conv1D(64, kernel_size=5, activation='relu', input_shape=input_shape,
             kernel_regularizer=l2(0.001)),
      MaxPooling1D(pool_size=2),
      BatchNormalization(),

      #Bidirectional RNN layers
      Bidirectional(GRU(128, return_sequences=True,
                        kernel_regularizer=l2(0.001))),
      BatchNormalization(),
      Dropout(0.4),

      Bidirectional(LSTM(96, return_sequences=True,
                         kernel_regularizer=l2(0.001))),
      BatchNormalization(),
      Dropout(0.4),

      #Additional RNN layer; collapses the sequence to a single vector
      SimpleRNN(64, kernel_regularizer=l2(0.001)),
      BatchNormalization(),
      Dropout(0.3),

      #Dense layers for classification with stronger regularization
      Dense(128, activation='relu', kernel_regularizer=l2(0.002)),
      BatchNormalization(),
      Dropout(0.3),

      Dense(64, activation='relu', kernel_regularizer=l2(0.002)),
      Dense(1, activation='sigmoid'),
  ])

  optimizer=Adam(
      learning_rate=0.0003,
      beta_1=0.9,
      beta_2=0.999
  )

  model.compile(
      optimizer=optimizer,
      loss='binary_crossentropy',
      metrics=[
          'accuracy',
          # Explicit names: unnamed metrics are auto-numbered ('recall_1',
          # 'recall_2', ...) when this function is called repeatedly, which
          # would break callbacks that monitor 'val_recall'.
          tf.keras.metrics.Precision(name='precision'),
          tf.keras.metrics.Recall(name='recall'),
      ]
  )

  return model

In [None]:
def train_and_evaluate_model(X, y, base_dir):
  """Train and score the model with stratified 10-fold cross-validation.

  For every fold a fresh model is built, trained with early stopping,
  learning-rate reduction and best-model checkpointing, then evaluated on the
  held-out fold. Features are standardised per fold using statistics from
  the training split only.

  Outputs written under `base_dir`:
    * `best_models/best_model_fold_<k>.keras` - checkpoint with best val recall
    * `roc_curve_fold_<k>.png` - ROC curve of fold k
    * `fold_<k>/` - training history, confusion matrix and PR curve of fold k

  Args:
    X: 3-D feature array indexed as (window, time step, feature).
    y: Binary label array with one entry per window.
    base_dir: Existing directory that receives models and plots.

  Returns:
    Tuple `(model, cv_scores)`. `model` is the model trained on the LAST fold
    only; `cv_scores` maps 'accuracy', 'precision', 'recall', 'f1' and 'auc'
    to lists holding one value per fold.
  """
  #Stratified K-Fold cross validation
  kfold=StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

  #Tracking metrics
  cv_scores={
      'accuracy':[], 'precision':[],
      'recall':[], 'f1':[], 'auc':[]
  }

  #Model checkpoint directory
  model_dir=os.path.join(base_dir, 'best_models')
  os.makedirs(model_dir, exist_ok=True)

  n_features=X.shape[-1]

  for fold, (train_idx, val_idx) in enumerate(kfold.split(X, y), 1):
    print(f"\n=== Fold {fold}===")

    train_windows, val_windows=X[train_idx], X[val_idx]
    y_train, y_val=y[train_idx], y[val_idx]

    # Fit the scaler on the training split only. Fitting on the full dataset
    # leaks validation statistics into training and inflates the CV scores.
    scaler=StandardScaler()
    X_train=scaler.fit_transform(
        train_windows.reshape(-1, n_features)).reshape(train_windows.shape)
    X_val=scaler.transform(
        val_windows.reshape(-1, n_features)).reshape(val_windows.shape)

    # Drop the previous fold's Keras state: frees memory and resets automatic
    # layer/metric name counters so monitored metric names stay stable.
    tf.keras.backend.clear_session()

    #Fresh model for each fold
    model=build_advanced_rnn_model((X.shape[1], X.shape[2]))

    callbacks=[
        EarlyStopping(
            monitor='val_loss',
            patience=20,
            restore_best_weights=True
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.3,
            patience=10,
            min_lr=1e-6
        ),
        ModelCheckpoint(
            filepath=os.path.join(model_dir, f'best_model_fold_{fold}.keras'),
            monitor='val_recall',
            mode='max',
            save_best_only=True
        )
    ]

    #Train model with class weighting that emphasises the event class
    history=model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=150,
        batch_size=32,
        callbacks=callbacks,
        class_weight={0:1., 1: 3.},
        verbose=1
    )

    #Predictions and evaluation
    predictions=model.predict(X_val).flatten()
    binary_predictions=(predictions >0.5).astype(int)

    #Compute metrics
    cv_scores['accuracy'].append(accuracy_score(y_val, binary_predictions))
    cv_scores['precision'].append(precision_score(y_val, binary_predictions))
    cv_scores['recall'].append(recall_score(y_val, binary_predictions))
    cv_scores['f1'].append(f1_score(y_val, binary_predictions))
    cv_scores['auc'].append(roc_auc_score(y_val, predictions))

    #Print detailed classification report
    print("\nClassification Report:")
    print(classification_report(y_val, binary_predictions))

    # The ROC helper already puts the fold number in its file name. The other
    # helpers use fixed file names, so give each fold its own directory to
    # stop later folds overwriting earlier plots.
    fold_dir=os.path.join(base_dir, f'fold_{fold}')
    os.makedirs(fold_dir, exist_ok=True)

    plot_roc_curve(y_val, predictions, base_dir, fold)
    plot_training_history(history, fold_dir)
    plot_confusion_matrix(y_val, binary_predictions, fold_dir)
    plot_precision_recall_curve(y_val, predictions, fold_dir)

  #Print average cross-validation scores
  print("\n=== Cross-Validation Results ===")
  for metric, scores in cv_scores.items():
    print(f"{metric.capitalize()}: {np.mean(scores):.4f} (±{np.std(scores):.4f})")

  return model, cv_scores


In [None]:
def plot_roc_curve(y_true, y_scores, base_dir, fold):
  """Draw the ROC curve of one fold and save it as a PNG inside `base_dir`.

  The file is named `roc_curve_fold_<fold>.png`. A dashed diagonal marks the
  performance of a random classifier for reference.
  """
  false_pos_rate, true_pos_rate, _=roc_curve(y_true, y_scores)
  output_path=os.path.join(base_dir, f'roc_curve_fold_{fold}.png')

  fig, ax=plt.subplots(figsize=(8,6))
  ax.plot(false_pos_rate, true_pos_rate, color='blue', label='ROC curve')
  ax.plot([0,1], [0,1], color='red', linestyle='--', label='Random Classifier')
  ax.set_xlim([0.0, 1.0])
  ax.set_ylim([0.0, 1.05])
  ax.set_xlabel('False Positive Rate')
  ax.set_ylabel('True Positive Rate')
  ax.set_title(f'Receiver Operating Characteristic- Fold{fold}')
  ax.legend(loc='lower right')

  fig.savefig(output_path)
  plt.close(fig)

In [None]:
def plot_training_history(history, base_dir):
  """Save loss and accuracy curves (training vs. validation) as one PNG.

  Reads the 'loss', 'val_loss', 'accuracy' and 'val_accuracy' series from
  `history.history` and writes `training_metric.png` into `base_dir`.
  """
  # (subplot position, train key, val key, train label, val label, title, y label)
  panels=[
      (1, 'loss', 'val_loss', 'Training Loss', 'Validation Loss',
       "Model Loss", 'loss'),
      (2, 'accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
       "Model Accuracy", 'Accuracy'),
  ]

  fig=plt.figure(figsize=(15, 10))
  for position, train_key, val_key, train_label, val_label, title, ylabel in panels:
    # Top row of a 2x2 grid; the bottom row is intentionally left unused.
    ax=fig.add_subplot(2, 2, position)
    ax.plot(history.history[train_key], label=train_label)
    ax.plot(history.history[val_key], label=val_label)
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(ylabel)
    ax.legend()

  fig.tight_layout()
  fig.savefig(os.path.join(base_dir, 'training_metric.png'))
  plt.close(fig)

In [None]:
def plot_confusion_matrix(y_true, y_pred, base_dir):
  """Render the confusion matrix as an annotated heatmap and save it.

  The image is written to `confusion_matrix.png` inside `base_dir`, with both
  axes labelled 'No Event' / 'Event'.
  """
  class_names=['No Event', 'Event']
  matrix=confusion_matrix(y_true, y_pred)

  fig, ax=plt.subplots(figsize=(8,6))
  sns.heatmap(
      matrix,
      annot=True,
      fmt='d',
      cmap='Blues',
      xticklabels=class_names,
      yticklabels=class_names,
      ax=ax,
  )
  ax.set_title('Confusion Matrix')
  ax.set_xlabel('Predicted Label')
  ax.set_ylabel('True label')
  fig.tight_layout()
  fig.savefig(os.path.join(base_dir, 'confusion_matrix.png'))
  plt.close(fig)



In [None]:
def plot_precision_recall_curve(y_true, y_scores, base_dir):
  """Plot precision against recall and save it as `precision_recall_curve.png`.

  The curve is computed from the true labels and the predicted scores; the
  image is written into `base_dir`.
  """
  precision_values, recall_values, _=precision_recall_curve(y_true, y_scores)
  output_path=os.path.join(base_dir, 'precision_recall_curve.png')

  fig, ax=plt.subplots(figsize=(8,6))
  ax.plot(recall_values, precision_values, color='blue', label='Precision-Recall Curve')
  ax.set_title('Precision-Recall curve')
  ax.set_xlabel('Recall')
  ax.set_ylabel('Precision')
  ax.legend()
  fig.tight_layout()
  fig.savefig(output_path)
  plt.close(fig)

In [None]:
def plot_seismic_data(real_data, synthetic_data, combined_data, base_dir):
  """Save a three-panel comparison of real, synthetic and combined traces.

  The panels are stacked vertically in that order and the figure is written
  to `seismic_data_comparison.png` inside `base_dir`.
  """
  # (series, panel title, x-axis label), in top-to-bottom order
  panels=[
      (real_data, 'Real Seismic Data', 'Samples'),
      (synthetic_data, 'Synthetic Seismic Data', 'Sample'),
      (combined_data, 'Combined Seismic Data', 'Sample'),
  ]

  fig=plt.figure(figsize=(15, 10))
  for row, (series, title, xlabel) in enumerate(panels, start=1):
    ax=fig.add_subplot(3, 1, row)
    ax.plot(series)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Amplitude')

  fig.tight_layout()
  fig.savefig(os.path.join(base_dir, "seismic_data_comparison.png"))
  plt.close(fig)

In [None]:
def main():
    """Run the full pipeline: fetch, synthesise, window, train, plot.

    Real waveform data is requested from IRIS; if that fails, an all-zero
    trace of the configured length is used instead. A synthetic trace of the
    same length is added to it, the sum is cut into labelled windows, and the
    model is trained and evaluated with cross-validation. All artefacts are
    written to the output directory.
    """
    settings = {
        'window_size': 250,
        'step': 25,
        'sample_rate': 100,
        'num_samples': 25000
    }

    output_dir = create_output_directory()

    # Real earthquake recording used as the base signal.
    event_origin = UTCDateTime("2015-08-11T16:22:15.200000")
    real_stream = fetch_iris_data(
        network='YS',
        station='BAOP',
        location='',
        channel='BHZ',
        origin_time=event_origin,
        duration=120,
    )

    if real_stream is None:
        # No real data available: fall back to a flat trace and defaults.
        real_data = np.zeros(settings['num_samples'])
        sample_rate = settings['sample_rate']
        real_cft = None
    else:
        filtered_stream, real_cft = process_stream(real_stream)
        real_data = filtered_stream[0].data
        sample_rate = real_stream[0].stats.sampling_rate

    # Synthetic trace matching the real trace in length and sampling rate.
    synthetic_data, _, synthetic_event_locations = generate_synthetic_data(
        len(real_data), sample_rate, event_duration=2, noise_level=0.1
    )

    combined_data = real_data + synthetic_data

    # Windowed features and labels for the classifier.
    X, y = create_enhanced_windows(
        combined_data,
        settings['window_size'],
        settings['step'],
        synthetic_event_locations,
        real_cft,
    )

    train_and_evaluate_model(X, y, output_dir)
    plot_seismic_data(real_data, synthetic_data, combined_data, output_dir)

    print("\nModel training and evaluation complete. Check the output directory for visualizations.")

In [None]:
# Entry point: run the pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__=="__main__":
  main()