In [35]:
import numpy as np
import datetime 
import pandas as pd
from obspy import read
from scipy import signal, stats
from obspy.signal.trigger import classic_sta_lta
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import classification_report, f1_score, precision_recall_curve, auc
from imblearn.over_sampling import SMOTE
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, LSTM, Dense, Dropout, BatchNormalization, Bidirectional, Attention, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import os
from tqdm import tqdm
import logging
import multiprocessing
from joblib import Parallel, delayed

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Determine the number of CPU cores to use
num_cores = multiprocessing.cpu_count()
logging.info(f"Number of CPU cores available: {num_cores}")

# Configure TensorFlow to use all available CPU cores
tf.config.threading.set_intra_op_parallelism_threads(num_cores)
tf.config.threading.set_inter_op_parallelism_threads(num_cores)

def extract_features(window, sampling_rate):
    """Extract advanced features from the seismic data window."""
    fft = np.fft.fft(window)
    freqs = np.fft.fftfreq(len(window), 1/sampling_rate)
    power_spectrum = np.abs(fft)**2
    
    return {
        'mean': np.mean(window),
        'std': np.std(window),
        'max': np.max(window),
        'min': np.min(window),
        'median': np.median(window),
        'skewness': stats.skew(window),
        'kurtosis': stats.kurtosis(window),
        'energy': np.sum(window**2),
        'rms': np.sqrt(np.mean(window**2)),
        'zero_crossings': np.sum(np.diff(np.sign(window)) != 0),
        'sta_lta': classic_sta_lta(window, int(0.5 * sampling_rate), int(10 * sampling_rate)).max(),
        'dominant_freq': freqs[np.argmax(power_spectrum)],
        'spectral_centroid': np.sum(freqs * power_spectrum) / np.sum(power_spectrum),
        'spectral_bandwidth': np.sqrt(np.sum(((freqs - np.sum(freqs * power_spectrum) / np.sum(power_spectrum))**2) * power_spectrum) / np.sum(power_spectrum)),
        'spectral_rolloff': freqs[np.where(np.cumsum(power_spectrum) >= 0.85 * np.sum(power_spectrum))[0][0]]
    }

def create_model(input_shape, n_features):
       # Current complex model architecture
       ts_input = Input(shape=input_shape)
       x = Conv1D(64, 3, activation='relu', padding='same', kernel_regularizer=l2(1e-4))(ts_input)
       x = BatchNormalization()(x)
       x = MaxPooling1D(2)(x)
       x = Conv1D(128, 3, activation='relu', padding='same', kernel_regularizer=l2(1e-4))(x)
       x = BatchNormalization()(x)
       x = MaxPooling1D(2)(x)
       x = Bidirectional(LSTM(64, return_sequences=True))(x)
       x = Dropout(0.3)(x)
       x = Bidirectional(LSTM(32, return_sequences=True))(x)
       x = Dropout(0.3)(x)
       attention = Attention()([x, x])
       x = tf.keras.layers.GlobalAveragePooling1D()(attention)
       feat_input = Input(shape=(n_features,))
       combined = Concatenate()([x, feat_input])
       x = Dense(64, activation='relu', kernel_regularizer=l2(1e-4))(combined)
       x = BatchNormalization()(x)
       x = Dropout(0.3)(x)
       x = Dense(32, activation='relu', kernel_regularizer=l2(1e-4))(x)
       x = BatchNormalization()(x)
       x = Dropout(0.3)(x)
       output = Dense(2, activation='softmax')(x)
       
       # Suggested simpler model architecture
       # ts_input = Input(shape=input_shape)
       # x = Conv1D(32, 3, activation='relu', padding='same')(ts_input)
       # x = MaxPooling1D(2)(x)
       # x = Bidirectional(LSTM(32, return_sequences=False))(x)
       # feat_input = Input(shape=(n_features,))
       # combined = Concatenate()([x, feat_input])
       # x = Dense(32, activation='relu')(combined)
       # x = Dropout(0.3)(x)
       # output = Dense(2, activation='softmax')(x)
       
       model = Model(inputs=[ts_input, feat_input], outputs=output)
       model.compile(optimizer=Adam(learning_rate=1e-3), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
       return model

def process_seismic_file(mseed_file, catalog_file, time_column, window_size=60, overlap=0.5):
    """Process a single seismic data file and return windowed data with labels."""
    try:
        st = read(mseed_file)
        tr = st[0]
        data = tr.data
        sampling_rate = tr.stats.sampling_rate
    except Exception as e:
        logging.error(f"Error reading mseed file {mseed_file}: {e}")
        return None, None, None

    try:
        cat = pd.read_csv(catalog_file)
        event_times = pd.to_datetime(cat[time_column])
    except Exception as e:
        logging.error(f"Error reading catalog file: {e}")
        if 'cat' in locals():
            logging.info(f"Available columns: {cat.columns.tolist()}")
        return None, None, None
    
    trace_start_time = tr.stats.starttime.datetime

    window_samples = int(window_size * sampling_rate)
    overlap_samples = int(window_samples * overlap)
    stride = window_samples - overlap_samples

    windows, labels, features = [], [], []

    for i in range(0, len(data) - window_samples, stride):
        window = data[i:i + window_samples]
        window_features = extract_features(window, sampling_rate)

        window_start_time = trace_start_time + datetime.timedelta(seconds=i / sampling_rate)
        window_end_time = window_start_time + datetime.timedelta(seconds=window_size)
        is_event = any((window_start_time <= event <= window_end_time) for event in event_times)

        windows.append(window)
        features.append(window_features)
        labels.append(1 if is_event else 0)

    return np.array(windows), features, np.array(labels)

def prepare_data(data_directory, catalog_file, time_column):
    """Prepare data for training and evaluation using parallel processing."""
    mseed_files = [os.path.join(data_directory, f) for f in os.listdir(data_directory) if f.endswith('.mseed')]
    
    # Use joblib to parallelize file processing
    results = Parallel(n_jobs=num_cores)(
        delayed(process_seismic_file)(mseed_file, catalog_file, time_column)
        for mseed_file in tqdm(mseed_files, desc="Processing files")
    )

    all_windows, all_features, all_labels = [], [], []
    for windows, features, labels in results:
        if windows is not None:
            all_windows.append(windows)
            all_features.extend(features)
            all_labels.append(labels)

    if not all_windows:
        raise RuntimeError("No data could be processed successfully")

    all_windows = np.vstack(all_windows)
    all_labels = np.concatenate(all_labels)

    feature_names = list(all_features[0].keys())
    feature_array = np.array([[f[name] for name in feature_names] for f in all_features])

    return all_windows, feature_array, all_labels, feature_names

def train_and_evaluate(data_directory, catalog_file):
    """Train and evaluate the model with improved techniques and CPU optimization."""
    logging.info("Examining catalog file structure...")
    cat = pd.read_csv(catalog_file)
    if cat is None:
        return None, None, None

    possible_time_columns = ['time', 'Time', 'timestamp', 'Timestamp', 'event_time', 'Event_Time', 'DateTime', 'time_abs']
    time_column = next((col for col in cat.columns if any(time_name.lower() in col.lower() for time_name in possible_time_columns)), None)

    if time_column is None:
        logging.error("Could not automatically identify time column. Available columns are:")
        logging.error(cat.columns.tolist())
        return None, None, None

    logging.info(f"Using '{time_column}' as the time column.")

    logging.info("Preparing data...")
    try:
        windows, features, labels, feature_names = prepare_data(data_directory, catalog_file, time_column)
    except Exception as e:
        logging.error(f"Error preparing data: {e}")
        return None, None, None

    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(features)

    tscv = TimeSeriesSplit(n_splits=5)
    histories = []

    for fold, (train_idx, val_idx) in enumerate(tscv.split(windows)):
        logging.info(f"\nFold {fold+1}")
        
        X_train_ts, X_val_ts = windows[train_idx], windows[val_idx]
        X_train_feat, X_val_feat = scaled_features[train_idx], scaled_features[val_idx]
        y_train, y_val = labels[train_idx], labels[val_idx]
        
        smote = SMOTE(random_state=42, n_jobs=num_cores)  # Use all CPU cores for SMOTE
        X_train_feat_resampled, y_train_resampled = smote.fit_resample(X_train_feat, y_train)
        
        X_train_ts_flat = X_train_ts.reshape(X_train_ts.shape[0], -1)
        X_train_ts_resampled, _ = smote.fit_resample(X_train_ts_flat, y_train)
        X_train_ts_resampled = X_train_ts_resampled.reshape(-1, X_train_ts.shape[1], 1)
        
        model = create_model(input_shape=(windows.shape[1], 1), n_features=features.shape[1])
        
        early_stopping = EarlyStopping(monitor='val_loss', mode='min', patience=10, restore_best_weights=True)
        model_checkpoint = ModelCheckpoint(f'model_fold_{fold+1}.keras', monitor='val_loss', mode='min', save_best_only=True)
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001)
        
        history = model.fit(
            [X_train_ts_resampled, X_train_feat_resampled], 
            y_train_resampled,
            validation_data=([X_val_ts[..., np.newaxis], X_val_feat], y_val),
            epochs=10,
            batch_size=32 * num_cores,
            callbacks=[early_stopping, model_checkpoint, reduce_lr]
        )
        histories.append(history)
        
        y_pred_proba = model.predict([X_val_ts[..., np.newaxis], X_val_feat])
        y_pred = np.argmax(y_pred_proba, axis=1)
        logging.info("\nValidation Set Performance:")
        logging.info(classification_report(y_val, y_pred))
        logging.info(f"F1 Score: {f1_score(y_val, y_pred, average='weighted')}")
        
        # Calculate Precision-Recall AUC
        precision, recall, _ = precision_recall_curve(y_val, y_pred_proba[:, 1])
        pr_auc = auc(recall, precision)
        logging.info(f"Precision-Recall AUC: {pr_auc}")

    try:
        model.save('lunar_seismic_model.keras')
        np.save('feature_names.npy', feature_names)
        logging.info("Model and feature names saved successfully")
    except Exception as e:
        logging.error(f"Error saving model: {e}")

    return model, scaler, feature_names

if __name__ == "__main__":
    data_directory = './data/lunar/training/data/S12_GradeA'
    catalog_file = './data/lunar/training/catalogs/apollo12_catalog_GradeA_final.csv'
    
    model, scaler, feature_names = train_and_evaluate(data_directory, catalog_file)

2024-10-06 10:25:11,468 - INFO - Number of CPU cores available: 12


RuntimeError: Intra op parallelism cannot be modified after initialization.