In [1]:
import numpy as np
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [2]:
# Step 1: Load the moonquake type mapping CSV file
mapping_file = 'archive/data/lunar/training/catalogs/apollo12_catalog_GradeA_final.csv'  # Replace with your path
mapping_df = pd.read_csv(mapping_file)

In [3]:
# Step 2: Load individual moonquake CSV files and process them
def process_moonquake_file(filepath):
    # Load moonquake data
    df = pd.read_csv(filepath)

    # Use the correct column name for time_abs
    time_column = str.format('time_abs(%Y-%m-%dT%H:%M:%S.%f)')
    
    # Check if the time_abs column exists
    if time_column not in df.columns:
        print(f"Warning: '{time_column}' column missing in file {filepath}")
        return None
    
    # Parse time features from the absolute time
    df[time_column] = pd.to_datetime(df[time_column], format='%Y-%m-%dT%H:%M:%S.%f')
    df['hour'] = df[time_column].dt.hour
    df['day'] = df[time_column].dt.day
    df['month'] = df[time_column].dt.month
    
    # Calculate velocity statistics
    velocity_stats = {
        'velocity_mean': df['velocity(m/s)'].mean(),
        'velocity_max': df['velocity(m/s)'].max(),
        'velocity_min': df['velocity(m/s)'].min(),
        'velocity_std': df['velocity(m/s)'].std(),
    }
    
    # Combine with time-based features (you can add more if needed)
    features = {
        'hour': df['hour'].iloc[0],
        'day': df['day'].iloc[0],
        'month': df['month'].iloc[0]
    }
    
    features.update(velocity_stats)
    
    return features

In [4]:
# Directory where moonquake CSV files are stored
moonquake_data_dir = 'archive/data/lunar/training/data/S12_GradeA'  # Replace with your directory

In [5]:
# Step 3: Merge moonquake data with the mapping
data = []

for _, row in mapping_df.iterrows():
    filename = row['filename']
    mq_type = row['mq_type']
    
    # Load and process the corresponding moonquake file
    moonquake_filepath = os.path.join(moonquake_data_dir, filename+'.csv')
    if os.path.exists(moonquake_filepath):
        features = process_moonquake_file(moonquake_filepath)
        features['mq_type'] = mq_type
        data.append(features)

In [6]:
# Convert the list of dictionaries into a DataFrame
moonquake_df = pd.DataFrame(data)

In [7]:
moonquake_df.head()

Unnamed: 0,hour,day,month,velocity_mean,velocity_max,velocity_min,velocity_std,mq_type
0,0,19,1,-8.443134e-13,7.874026e-09,-8.185283e-09,3.530059e-10,impact_mq
1,0,25,3,-1.939339e-12,4.707866e-09,-4.603228e-09,3.86514e-10,impact_mq
2,0,26,3,-2.980386e-13,5.969005e-09,-6.144452e-09,3.219585e-10,impact_mq
3,0,25,4,-1.547089e-13,6.853803e-09,-6.155705e-09,3.383785e-10,impact_mq
4,0,26,4,-6.921802e-13,5.491012e-09,-4.475551e-09,3.009882e-10,deep_mq


In [8]:
# Step 1: Encode the target labels (moonquake types)
encoder = LabelEncoder()
moonquake_df['mq_type_encoded'] = encoder.fit_transform(moonquake_df['mq_type'])

In [9]:
# Step 2: Prepare the sequences of data (for RNN)
def create_sequences(data, sequence_length):
    sequences = []
    targets = []
    for i in range(len(data) - sequence_length):
        seq = data[i:i + sequence_length]
        target = data['mq_type_encoded'].iloc[i + sequence_length]
        sequences.append(seq)
        targets.append(target)
    return np.array(sequences), np.array(targets)

In [10]:
# Assuming you want to use a window of 10 time steps to predict the next event type
sequence_length = 10

# Prepare the sequence data
features = moonquake_df[['velocity_mean','velocity_max','velocity_min','velocity_std','mq_type_encoded']]  # Use velocity or more features if needed
X, y = create_sequences(features, sequence_length)

In [11]:
# Step 3: Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8, random_state=42)

In [12]:
# Calculate number of samples
n_samples = X_train.shape[0]  # or you can use len(X_train)
n_samples

52

In [13]:

# Reshape into (n_samples, sequence_length, n_features)
X_train = X_train.reshape((n_samples, sequence_length, 5))

# Similarly, reshape X_test
n_test_samples = X_test.shape[0]
X_test = X_test.reshape((n_test_samples, sequence_length, 5))

In [14]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

In [15]:
# Step 4: Build the RNN model (LSTM)
model = Sequential()

In [16]:
# Add an LSTM layer
model.add(LSTM(units=50, return_sequences=True, input_shape=(sequence_length, 5)))
model.add(Dropout(0.2))  # Dropout to prevent overfitting

  super().__init__(**kwargs)


In [17]:
# Add a second LSTM layer (optional, for deeper RNNs)
model.add(LSTM(units=50))
model.add(Dropout(0.2))

# Add a Dense layer for output
model.add(Dense(units=len(encoder.classes_), activation='softmax'))  # Multiclass classification


In [18]:
# Step 5: Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [19]:
# Step 6: Train the model
history = model.fit(X_train, y_train, epochs=20, batch_size=32, validation_data=(X_test, y_test))

Epoch 1/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 163ms/step - accuracy: 0.6154 - loss: 1.0193 - val_accuracy: 0.8462 - val_loss: 0.9410
Epoch 2/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - accuracy: 0.8558 - loss: 0.9412 - val_accuracy: 0.8462 - val_loss: 0.8684
Epoch 3/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - accuracy: 0.8454 - loss: 0.8588 - val_accuracy: 0.8462 - val_loss: 0.7920
Epoch 4/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8349 - loss: 0.7948 - val_accuracy: 0.8462 - val_loss: 0.7128
Epoch 5/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - accuracy: 0.8558 - loss: 0.6675 - val_accuracy: 0.8462 - val_loss: 0.6365
Epoch 6/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - accuracy: 0.8349 - loss: 0.6229 - val_accuracy: 0.8462 - val_loss: 0.5752
Epoch 7/20
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━

In [20]:
# Step 7: Evaluate the model on the test set
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test accuracy: {test_accuracy * 100:.2f}%")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.8462 - loss: 0.5292
Test accuracy: 84.62%


In [21]:
# Step 8: Make predictions
y_pred = model.predict(X_test)
predicted_classes = np.argmax(y_pred, axis=1)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 105ms/step


In [22]:
# Step 9: Decode the predictions back to moonquake types
predicted_mq_types = encoder.inverse_transform(predicted_classes)

In [26]:
# Print classification report
from sklearn.metrics import classification_report
print(classification_report(y_test, predicted_classes, target_names=encoder.classes_, labels=np.unique(y_test)))

              precision    recall  f1-score   support

     deep_mq       0.00      0.00      0.00         2
   impact_mq       0.85      1.00      0.92        11

    accuracy                           0.85        13
   macro avg       0.42      0.50      0.46        13
weighted avg       0.72      0.85      0.78        13



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
