<a href="https://colab.research.google.com/github/cedamusk/AI-N-ML/blob/main/Seismic_LSTM_Earthquake_Signal_prediction_with_Synthetic_data_and_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install scikit-learn numpy pandas matplotlib scipy

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import matplotlib.pyplot as plt
from scipy.stats import skew, kurtosis
from scipy.signal import find_peaks, welch
import warnings
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from scipy.signal import stft
from scipy.stats import entropy
from typing import Tuple, List, Optional
from dataclasses import dataclass
from torch.nn import functional as F


In [None]:
def generate_synthetic_data(n_samples: int = 2000) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Generate synthetic earthquake data with main shocks and aftershocks

    Args:
        n_samples: Number of time points to generate

    Returns:
        Tuple containing:
        - time: Array of time points
        - signal: Combined signal (background + earthquakes + aftershocks)
        - earthquakes: Main earthquake signals
        - aftershocks: Aftershock signals
    """
    np.random.seed(42)
    time = np.linspace(0, 200, n_samples)

    # Generate complex background noise
    background = (
        np.random.normal(0, 0.03, n_samples) +
        0.02 * np.sin(2 * np.pi * time / 20) +  # Daily variation
        0.01 * np.sin(2 * np.pi * time / 100) +  # Seasonal variation
        0.005 * np.sin(2 * np.pi * time / 50)    # Additional periodic component
    )

    earthquakes = np.zeros(n_samples)
    aftershocks = np.zeros(n_samples)

    # Generate main earthquakes and aftershocks
    n_events = np.random.randint(4, 7)  # Random number of main events
    for i in range(n_events):
        # Main earthquake
        pos = np.random.randint(200, n_samples-400)
        magnitude = np.random.uniform(0.6, 1.2)
        duration = np.random.randint(20, 40)

        # Create more realistic earthquake shape with P-wave and S-wave components
        t = np.linspace(0, 3, duration)
        p_wave = 0.3 * magnitude * (t * np.exp(-2*t))  # P-wave arrives first
        s_wave = magnitude * (t * np.exp(-t))          # S-wave follows
        earthquake_shape = p_wave + s_wave

        earthquakes[pos:pos+duration] += earthquake_shape

        # Generate aftershocks with decay pattern
        n_aftershocks = np.random.randint(2, 5)
        decay_factor = 0.8  # Magnitude decay for successive aftershocks

        for j in range(n_aftershocks):
            # Increasing time gaps between aftershocks
            min_gap = 50 + j * 20
            max_gap = 200 + j * 30
            aftershock_pos = pos + np.random.randint(min_gap, max_gap)

            if aftershock_pos + 25 >= n_samples:
                continue

            aftershock_magnitude = magnitude * decay_factor**(j+1) * np.random.uniform(0.8, 1.2)
            aftershock_duration = np.random.randint(10, 25)

            t_after = np.linspace(0, 2, aftershock_duration)
            aftershock_shape = aftershock_magnitude * (t_after * np.exp(-1.5 * t_after))
            aftershocks[aftershock_pos:aftershock_pos+aftershock_duration] += aftershock_shape

    # Add some random micro-seismic events
    n_micro = np.random.randint(8, 15)
    for i in range(n_micro):
        pos = np.random.randint(0, n_samples-20)
        magnitude = np.random.uniform(0.1, 0.3)
        duration = np.random.randint(5, 15)
        t_micro = np.linspace(0, 1, duration)
        micro_shape = magnitude * (t_micro * np.exp(-2 * t_micro))
        earthquakes[pos:pos+duration] += micro_shape

    # Combine all components
    signal = background + earthquakes + aftershocks

    # Add some random spikes (instrument artifacts)
    n_spikes = np.random.randint(3, 8)
    spike_positions = np.random.choice(n_samples, n_spikes, replace=False)
    signal[spike_positions] += np.random.uniform(0.2, 0.4, n_spikes)

    return time, signal, earthquakes, aftershocks



In [None]:
class ModelConfig:
  sequence_length: int=50
  prediction_horizon: int=1
  hidden_size: int=128
  num_layers: int=3
  dropout: float=0.3
  learning_rate: float=0.001
  batch_size: int=32
  epochs: int=50
  early_stopping_patience: int=10
  input_size: int=36

In [None]:
class EarthquakeDataset(Dataset):
  def __init__(self, X: np.ndarray, y:np.ndarray):
    self.X=torch.FloatTensor(X)
    self.y=torch.FloatTensor(y)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    return self.X[idx], self.y[idx]

In [None]:
class AttentionLSTM(nn.Module):
  def __init__(self, input_size: int, config: ModelConfig):
    super(AttentionLSTM, self).__init__()

    self.input_size=input_size

    self.input_projection=nn.Sequential(
        nn.Linear(input_size, config.hidden_size),
        nn.LayerNorm(config.hidden_size),
        nn.ReLU(),
        nn.Dropout(config.dropout)
    )
    self.lstm1=nn.LSTM(
        input_size=34,
        hidden_size=config.hidden_size,
        num_layers=1,
        batch_first=True,
        bidirectional=True
    )
    self.lstm2=nn.LSTM(
        input_size=34,
        hidden_size=config.hidden_size,
        num_layers=1,
        batch_first=True,
        bidirectional=True
    )
    self.attention=nn.MultiheadAttention(
        embed_dim=config.hidden_size*2,
        num_heads=4,
        dropout=config.dropout
    )
    self.feature_projection=nn.Sequential(
        nn.Linear(config.hidden_size*2, config.hidden_size*2),
        nn.LayerNorm(config.hidden_size*2),
        nn.ReLU(),
        nn.Dropout(config.dropout),
        nn.Linear(config.hidden_size*2, config.hidden_size),
        nn.LayerNorm(config.hidden_size),
        nn.ReLU(),
        nn.Dropout(config.dropout/2)
    )
    self.regressor=nn.Sequential(
        nn.Linear(config.hidden_size, 128),
        nn.ReLU(),
        nn.Dropout(config.dropout/2),
        nn.Linear(128, 64),
        nn.ReLU(),
        nn.Dropout(config.dropout/4),
        nn.Linear(64, 1)
    )

    self.norm1=nn.LayerNorm(config.hidden_size*2)
    self.norm2=nn.LayerNorm(config.hidden_size*2)

  def forward(self, x:torch.Tensor)-> torch.Tensor:
    x=self.input_projection(x)

    lstm1_out, _=self.lstm1(x)
    lstm1_out=self.norm1(lstm1_out)

    lstm2_out, _=self.lstm2(lstm1_out)
    lstm2_out=self.norm2(lstm2_out+lstm1_out)

    attention_out, _=self.attention(
        lstm2_out.transpose(0,1),
        lstm2_out.transpose(0,1),
        lstm2_out.transpose(0,1)
    )
    attention_out=attention_out.transpose(0,1)

    pooled=torch.mean(attention_out, dim=1)

    features=self.feature_projection(pooled)
    return self.regressor(features).squeeze()


In [None]:
class EarthquakePredictor:
  def __init__(self, config: ModelConfig):
    self.config=config
    self.device=torch.device('cuda' if torch.cuda.is_available()else'cpu')
    self.model=None
    self.scaler_X=StandardScaler()
    self.scaler_y=StandardScaler()

    self.feature_names=[
        'mean','std', 'max', 'min', 'skew', 'kurtosis', 'median', 'ptp', 'entropy',
        'psd_mean', 'psd_std', 'psd_max', 'zxx_mean', 'zxx_std',
        'peak_count', 'peak_mean_height', 'peak_height_std', 'peak_interval',
        'extra1', 'extra2', 'extra3', 'extra4', 'extra5',
        'extra6', 'extra7', 'extra8', 'extra9', 'extra10',
        'poly_coef1', 'poly_coef2', 'poly_coef3',
        'poly_coef4', 'poly_coef5', 'poly_coef6'
    ]


  def extract_features(self, signal_window: np.ndarray)-> List[float]:
    features=[]
    with warnings.catch_warnings():
      warnings.simplefilter('ignore')

      base_features=[
          np.mean(signal_window),
          np.std(signal_window),
          np.max(signal_window),
          np.min(signal_window),
          skew(signal_window),
          kurtosis(signal_window),
          np.median(signal_window),
          np.ptp(signal_window),
          entropy(np.abs(signal_window)),
          np.percentile(signal_window, 25),
          np.percentile(signal_window, 75),
          np.sum(np.abs(np.diff(signal_window))),
          np.mean(np.abs(np.diff(signal_window))),
          np.std(np.diff(signal_window))
      ]
      features.extend(base_features)

      freqs, psd=welch(signal_window, fs=1.0, nperseg=min(len(signal_window), 256))
      f, t, Zxx=stft(signal_window, fs=1.0, nperseg=min(len(signal_window), 256))

      total_power=np.sum(psd)



      spectral_features=[
          np.mean(psd),
          np.std(psd),
          np.max(psd),
          np.mean(np.abs(Zxx)),
          np.std(np.abs(Zxx)),
          np.max(np.abs(Zxx)),
          np.sum(psd*freqs)/total_power if total_power>0 else 0,
          entropy(psd)
      ]
      features.extend(spectral_features)


      peaks, properties=find_peaks(signal_window, height=np.mean(signal_window))
      peak_features=[
          len(peaks),
          np.mean(properties["peak_heights"]) if len(peaks)>0 else 0,
          np.std(properties["peak_heights"]) if len(peaks) > 1 else 0,
          np.mean(np.diff(peaks)) if len(peaks) >1 else 0,
          np.max(properties["peak_heights"]) if len(peaks)>0 else 0,
          np.min(properties["peak_heights"]) if len(peaks)>0 else 0
      ]

      features.extend(peak_features)

      poly_features=list(np.polyfit(np.arange(len(signal_window)), signal_window,5))
      features.extend(poly_features)

      while len(features)<len(self.feature_names):
        features.append(0.0)

        features=features[:len(self.feature_names)]

    return features

  def prepare_sequences(self, signal: np.ndarray)-> Tuple[np.ndarray, np.ndarray]:

    X, y=[], []
    window_size=10

    for i in range(len(signal)- self.config.sequence_length - self.config.prediction_horizon +1):
      sequence=signal[i:(i+self.config.sequence_length)]
      features=[]

      for j in range(0, len(sequence)-window_size+1, window_size//2):
        window=sequence[j:j + window_size]
        features.append(self.extract_features(window))

      while len(features)<self.config.sequence_length:
        features.append([0.0]*len(self.feature_names))
      features=features[:self.config.sequence_length]

      X.append(features)
      y.append(signal[i+self.config.sequence_length+self.config.prediction_horizon-1])

    return np.array(X), np.array(y)

  def train(self, train_loader: DataLoader, val_loader: DataLoader)-> None:
    input_size=next(iter(train_loader))[0].shape[-1]
    self.config.input_size=input_size
    self.model=AttentionLSTM(input_size, self.config).to(self.device)

    optimizer=optim.Adam(self.model.parameters(), lr=self.config.learning_rate)
    scheduler=optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
    criterion=nn.HuberLoss()

    best_val_loss=float('inf')
    patience_counter=0
    best_model=None

    for epoch in range(self.config.epochs):
      self.model.train()
      train_loss=0
      for batch_X, batch_y in train_loader:
        batch_X, batch_y=batch_X.to(self.device), batch_y.to(self.device)
        optimizer.zero_grad()
        output=self.model(batch_X)
        loss=criterion(output, batch_y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
        optimizer.step()
        train_loss+=loss.item()

      self.model.eval()
      val_loss=0
      with torch.no_grad():
        for batch_X, batch_y in val_loader:
          batch_X, batch_y=batch_X.to(self.device), batch_y.to(self.device)
          output=self.model(batch_X)
          val_loss+=criterion(output, batch_y).item()

      scheduler.step(val_loss)

      if val_loss<best_val_loss:
        best_val_loss=val_loss
        best_model=self.model.state_dict().copy()
        patience_counter=0
      else:
        patience_counter+=1

        if patience_counter>=self.config.early_stopping_patience:
          print(f"Early stopping at epoch {epoch}")
          break

        if epoch % 5==0:
          print(f'Epoch{epoch}, Train Loss:{train_loss/len(train_loader):.6f},'
                f'Val Loss:{val_loss/len(val_loader):.6f}')
    self.model.load_state_dict(best_model)

  def predict(self, test_loader: DataLoader)->Tuple[np.ndarray, np.ndarray]:
    self.model.eval()
    predictions=[]
    actuals=[]

    with torch.no_grad():
      for batch_X, batch_y in test_loader:
        batch_X=batch_X.to(self.device)
        output=self.model(batch_X)
        predictions.extend(output.cpu().numpy())
        actuals.extend(batch_y.numpy())

    return np.array(predictions), np.array(actuals)

  def evaluate(self, predictions: np.ndarray, actuals: np.ndarray)-> dict:
    return{
        'mse': mean_squared_error(actuals, predictions),
        'mae':mean_absolute_error(actuals, predictions),
        'r2':r2_score(actuals, predictions)
    }
  def plot_results(self, predictions: np.ndarray, actuals: np.ndarray,
                   n_samples: int=200)-> None:

      plt.figure(figsize=(15, 10))

      plt.subplot(2,1,2)
      errors=actuals-predictions
      plt.hist(errors, bins=50, alpha=0.7)
      plt.axvline(x=0, color='r', linestyle='--', alpha=0.5)
      plt.title('Prediction Error Distribution')
      plt.xlabel('Error')
      plt.ylabel('Frequency')

      plt.tight_layout()
      plt.show()

  def set_input_size(self, X_train):
    if self.model is not None:
      input_size=X_train.shape[-1]
      self.model.input_projection[0]=nn.Linear(input_size, self.config.hidden_size)



In [None]:
def debug_input_sizes(X_train):
    """
    Debug function to check input sizes and shapes
    """
    print("X_train shape:", X_train.shape)
    print("Input feature details:")
    print("Total number of features:", X_train.shape[-1])
    print("Shape of a single sequence:", X_train[0].shape)

    # Breakdown of feature extraction
    time, signal, _, _ = generate_synthetic_data()
    predictor = EarthquakePredictor(ModelConfig())

    # Demonstrate feature extraction
    window_size = 10
    sequence = signal[:50]
    features = []
    for j in range(0, len(sequence) - window_size + 1, window_size // 2):
        window = sequence[j:j + window_size]
        extracted_features = predictor.extract_features(window)
        features.append(extracted_features)

    print("\nFeature extraction debug:")
    print("Number of windows:", len(features))
    print("Features per window:", len(features[0]))
    print("Total features per sequence:", len(features[0]) * len(features))


In [None]:
def debug_feature_extraction():
  config=ModelConfig()
  predictor=EarthquakePredictor(config)

  time, signal, _, _=generate_synthetic_data()

  X, y=predictor.prepare_sequences(signal)

  print("Feature Extraction debug:")
  print("X shape:", X.shape[1])
  print("Features per window:", X.shape[2])
  print("Feature names:", len(predictor.feature_names))

  print("\nFirst Sequence sample:")
  first_sequence=X[0]
  for i, window_features in enumerate(first_sequence):
    print(f"Window {i} feature count:{len(window_features)}")

  return X, y

In [None]:
def adjust_model_config(X_train):
    """
    Dynamically adjust model configuration based on input size
    """
    feature_size = X_train.shape[-1]
    config = ModelConfig()
    config.input_size = feature_size

    print(f"\nAdjusted Input Size: {feature_size}")
    return config

In [None]:
def main():
  config=ModelConfig()
  predictor=EarthquakePredictor(config)

  time, signal, _, _=generate_synthetic_data()

  X, y=predictor.prepare_sequences(signal)


  config.input_size=X.shape[-1]
  predictor.model.input_projection=torch.nn.Linear(config.input_size, config.hidden_size)

  print(f"Number of featurs per sequence: {config.input_size}")

  train_size=int(0.7*len(X))
  val_size=int(0.15*len(X))

  X_train=X[:train_size]
  y_train=y[:train_size]
  X_val=X[train_size:train_size+val_size]
  y_val=y[train_size:train_size+val_size]
  X_test=X[train_size+val_size:]
  y_test=y[train_size+val_size:]

  X_train_scaled=predictor.scaler_X.fit_transform(
      X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
  X_val_scaled=predictor.scaler_X.transform(
      X_val.reshape(-1, X_val.shape[-1])).reshape(X_val.shape)
  X_test_scaled=predictor.scaler_X.transform(
      X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

  y_train_scaled=predictor.scaler_y.fit_transform(y_train.reshape(-1,1)).flatten()
  y_val_scaled=predictor.scaler_y.transform(y_val.reshape(-1,1)).flatten()
  y_test_scaled=predictor.scaler_y.transform(y_test.reshape(-1,1)).flatten()

  train_dataset=EarthquakeDataset(X_train_scaled, y_train_scaled)
  val_dataset=EarthquakeDataset(X_val_scaled, y_val_scaled)
  test_dataset=EarthquakeDataset(X_test_scaled, y_test_scaled)

  train_loader=DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
  val_loader=DataLoader(val_dataset, batch_size=config.batch_size)
  test_loader=DataLoader(test_dataset, batch_size=config.batch_size)

  predictor.train(train_loader, val_loader)

  predictions, actuals=predictor.predict(test_loader)

  predictions=predictor.scaler_y.inverse_transform(
      predictions.reshape(-1, 1)).flatten()
  actuals=predictor.scaler_y.inverse_transform(actuals.reshape(-1,1)).flatten()

  metrics=predictor.evaluate(predictions, actuals)
  print("\nModel Metrics:")
  for metric, value in metrics.items():
    print(f"{metric.upper()}:{value:.4f}")

  predictor.plot_results(predictions, actuals)


In [None]:
if __name__=="__main__":
  main()