# Enhanced Earthquake Prediction System using Deep Learning

This notebook implements an advanced earthquake prediction system using:
- Bidirectional LSTM with Attention
- Multi-layer Architecture
- Real-time USGS Data Integration

**Author:** abhinavuser  
**Date:** 2025-05-09

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import LayerNorm
from torch.utils.data import Dataset, DataLoader
import requests
from datetime import datetime, timedelta
from io import StringIO
import warnings
warnings.filterwarnings('ignore')

# Print versions and device info
print(f"NumPy version: {np.__version__}")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

# Set random seeds
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Define device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

In [None]:
class EarthquakeDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = torch.tensor(sequences, dtype=torch.float32)
        self.targets = torch.tensor(targets, dtype=torch.float32)
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]

class CustomRandomSampler(torch.utils.data.Sampler):
    def __init__(self, data_source):
        self.data_source = data_source
        
    def __iter__(self):
        indices = torch.randperm(len(self.data_source)).tolist()
        return iter(indices)
    
    def __len__(self):
        return len(self.data_source)

In [None]:
class EnhancedEarthquakePredictionModel(nn.Module):
    def __init__(self, input_size=10, hidden_size=128, num_layers=3, output_size=3):
        super().__init__()
        
        self.hidden_size = hidden_size
        
        # Input projection
        self.input_proj = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            LayerNorm(hidden_size)
        )
        
        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=0.3
        )
        
        # Attention mechanism
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1)
        )
        
        # Output layers
        self.fc1 = nn.Linear(hidden_size * 2, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)
        self.fc3 = nn.Linear(hidden_size // 2, output_size)
        
        self.dropout = nn.Dropout(0.2)
        self.layer_norm1 = LayerNorm(hidden_size)
        self.layer_norm2 = LayerNorm(hidden_size // 2)
    
    def attention_net(self, lstm_output):
        attention_weights = self.attention(lstm_output)
        attention_weights = F.softmax(attention_weights, dim=1)
        context = torch.sum(attention_weights * lstm_output, dim=1)
        return context
    
    def forward(self, x):
        # Input projection
        x = self.input_proj(x)
        
        # LSTM
        lstm_out, _ = self.lstm(x)
        
        # Attention
        context = self.attention_net(lstm_out)
        
        # Multi-layer prediction
        out = self.fc1(context)
        out = self.layer_norm1(out)
        out = F.relu(out)
        out = self.dropout(out)
        
        residual = out
        out = self.fc2(out)
        out = self.layer_norm2(out)
        out = F.relu(out)
        out = self.dropout(out)
        
        predictions = self.fc3(out)
        return predictions

In [None]:
class EarthquakeDataProcessor:
    def __init__(self, start_date=None, end_date=None):
        self.start_date = start_date or (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
        self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')
        self.base_url = 'https://earthquake.usgs.gov/fdsnws/event/1/query'
    
    def load_data(self):
        """Load earthquake data from USGS API"""
        params = {
            'format': 'csv',
            'starttime': self.start_date,
            'endtime': self.end_date,
            'minmagnitude': 2.5
        }
        
        response = requests.get(self.base_url, params=params)
        
        if response.status_code == 200:
            self.df = pd.read_csv(StringIO(response.text))
            print(f'Loaded {len(self.df)} earthquake records')
            return self.df
        else:
            raise Exception(f"Failed to fetch data: Status code {response.status_code}")
    
    def preprocess_data(self):
        """Preprocess earthquake data"""
        self.df['datetime'] = pd.to_datetime(self.df['time'])
        
        # Extract temporal features
        self.df['hour'] = self.df['datetime'].dt.hour
        self.df['day'] = self.df['datetime'].dt.day
        self.df['month'] = self.df['datetime'].dt.month
        self.df['year'] = self.df['datetime'].dt.year
        self.df['dayofweek'] = self.df['datetime'].dt.dayofweek
        
        # Calculate additional features
        self.df['depth_km'] = self.df['depth'].abs()
        self.df['energy'] = 10 ** (1.5 * self.df['mag'])
        
        # Normalize features
        features = ['latitude', 'longitude', 'depth_km', 'mag', 'energy']
        self.scaler = StandardScaler()
        self.df[features] = self.scaler.fit_transform(self.df[features])
        
        return self.df
    
    def create_sequences(self, seq_length=24):
        """Create sequences for time series prediction"""
        features = ['latitude', 'longitude', 'depth_km', 'mag', 'energy',
                   'hour', 'day', 'month', 'year', 'dayofweek']
        
        sequences = []
        targets = []
        
        for i in range(len(self.df) - seq_length):
            seq = self.df[features].iloc[i:i+seq_length].values
            target = self.df[['latitude', 'longitude', 'mag']].iloc[i+seq_length].values
            sequences.append(seq)
            targets.append(target)
        
        return np.array(sequences, dtype=np.float32), np.array(targets, dtype=np.float32)

In [None]:
def train_model(model, train_loader, val_loader, epochs=100, learning_rate=0.001):
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=learning_rate,
        epochs=epochs,
        steps_per_epoch=len(train_loader),
        pct_start=0.3
    )
    
    best_val_loss = float('inf')
    patience = 10
    patience_counter = 0
    
    history = {'train_loss': [], 'val_loss': []}
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            scheduler.step()
            train_loss += loss.item()
        
        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                val_loss += criterion(output, target).item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        
        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            patience_counter = 0
        else:
            patience_counter += 1
        
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break
        
        if epoch % 5 == 0:
            print(f'Epoch: {epoch+1}/{epochs}')
            print(f'Training Loss: {train_loss:.6f}')
            print(f'Validation Loss: {val_loss:.6f}')
            print('-' * 50)
    
    # Load best model
    model.load_state_dict(torch.load('best_model.pth'))
    return model, history

In [None]:
def plot_training_history(history):
    import matplotlib.pyplot as plt
    
    plt.figure(figsize=(10, 6))
    plt.plot(history['train_loss'], label='Training Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.title('Model Loss Over Time')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
def main():
    try:
        # Initialize data processor and load data
        processor = EarthquakeDataProcessor()
        data = processor.load_data()
        processed_data = processor.preprocess_data()
        
        # Create sequences
        sequences, targets = processor.create_sequences(seq_length=24)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            sequences, targets, test_size=0.2, random_state=42
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42
        )
        
        print(f"Training set shape: {X_train.shape}")
        print(f"Validation set shape: {X_val.shape}")
        print(f"Test set shape: {X_test.shape}")
        
        # Create datasets and dataloaders
        batch_size = min(32, len(X_train) // 10)
        train_dataset = EarthquakeDataset(X_train, y_train)
        val_dataset = EarthquakeDataset(X_val, y_val)
        test_dataset = EarthquakeDataset(X_test, y_test)
        
        train_sampler = CustomRandomSampler(train_dataset)
        
        train_loader = DataLoader(
            train_dataset, 
            batch_size=batch_size,
            sampler=train_sampler,
            drop_last=False
        )
        val_loader = DataLoader(
            val_dataset, 
            batch_size=batch_size,
            shuffle=False,
            drop_last=False
        )
        test_loader = DataLoader(
            test_dataset, 
            batch_size=batch_size,
            shuffle=False,
            drop_last=False
        )
        
        # Initialize model
        model = EnhancedEarthquakePredictionModel(
            input_size=X_train.shape[2],
            hidden_size=128,
            num_layers=3,
            output_size=3
        )
        
        # Train model
        trained_model, history = train_model(
            model,
            train_loader,
            val_loader,
            epochs=100,
            learning_rate=0.001
        )
        
        print('Training complete!')
        
        # Plot training history
        plot_training_history(history)
        
        # Evaluate on test set
        model.eval()
        test_loss = 0
        criterion = nn.MSELoss()
        
        predictions = []
        actuals = []
        
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += criterion(output, target).item()
                
                predictions.extend(output.cpu().numpy())
                actuals.extend(target.cpu().numpy())
        
        test_loss /= len(test_loader)
        print(f'Final Test Loss: {test_loss:.6f}')
        
        # Save predictions
        predictions = np.array(predictions)
        actuals = np.array(actuals)
        np.save('predictions.npy', predictions)
        np.save('actuals.npy', actuals)
        
        # Save model
        torch.save({
            'model_state_dict': trained_model.state_dict(),
            'scaler': processor.scaler,
            'history': history
        }, 'final_model.pth')
        
        print('Model and predictions saved successfully!')
        
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        import traceback
        traceback.print_exc()

        if __name__ == '__main__':
            main()