In [None]:
!pip install obspy



In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import spectrogram, welch
from obspy.signal.invsim import cosine_taper
from obspy.signal.filter import highpass
from obspy.signal.trigger import classic_sta_lta, plot_trigger, trigger_onset, recursive_sta_lta
import os
import zipfile
import glob
import random

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

zip_file_path = '/content/drive/MyDrive/space_apps_2024_seismic_detection.zip'

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    extracted_files = zip_ref.extractall('/content')

In [None]:
lunar_training_folder_path = './space_apps_2024_seismic_detection/data/lunar/training/data/S12_GradeA'
lunar_training_csv_files = glob.glob(os.path.join(lunar_training_folder_path, '*.csv'))

lunar_test_folder_path = './space_apps_2024_seismic_detection/data/lunar/test/data/S16_GradeA'
lunar_test_csv_files = glob.glob(os.path.join(lunar_test_folder_path, '*.csv'))

lunar_catalog_csv_file = './space_apps_2024_seismic_detection/data/lunar/training/catalogs/apollo12_catalog_GradeA_final.csv'


In [None]:
def load_true_triggers(file_path):
    labeled_data = pd.read_csv(file_path)
    true_triggers = {}
    for _, row in labeled_data.iterrows():
        filename = row['filename'].removesuffix('.csv')
        if filename not in true_triggers:
            true_triggers[filename] = []
        true_triggers[filename].append(row['time_rel(sec)'])
    return true_triggers

In [None]:
def implement_STA_LTA(df, sta_len, lta_len, thr_on, thr_off):
    time_rel = df['time_rel(sec)'].values
    velocity = df['velocity(m/s)'].values
    dt = time_rel[1] - time_rel[0]
    sampling_rate = 1 / dt

    cft = classic_sta_lta(velocity, int(sta_len * sampling_rate), int(lta_len * sampling_rate))
    on_off = np.array(trigger_onset(cft, thr_on, thr_off))
    detected_triggers = [time_rel[triggers[0]] for triggers in on_off if len(triggers) > 0]

    print(f"Detected triggers: {detected_triggers}")
    return detected_triggers

In [None]:
TOLERANCE = 1000

def match_triggers(true_triggers, detected_triggers):
    matches = []
    used_detected = set()

    for true_trigger in true_triggers:
        for i, detected_trigger in enumerate(detected_triggers):
            if i not in used_detected and abs(detected_trigger - true_trigger) <= TOLERANCE:
                matches.append((true_trigger, detected_trigger))
                used_detected.add(i)
                break

    return matches


def score_function(true_triggers, detected_triggers):
    # Match detected triggers to true triggers
    matches = match_triggers(true_triggers, detected_triggers)

    # Calculate precision, recall, and F1-score
    num_true = len(true_triggers)
    num_detected = len(detected_triggers)
    num_matches = len(matches)

    precision = num_matches / num_detected if num_detected > 0 else 0
    recall = num_matches / num_true if num_true > 0 else 0
    if precision + recall > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0

    return f1_score

In [None]:
# DO NOT TOUCH, LOADING FROM FOLDER
all_lunar_training_data = []
for file_path in lunar_training_csv_files:
    all_lunar_training_data.append({'df': pd.read_csv(file_path), 'filename': file_path.split('/')[-1].removesuffix('.csv')})
lunar_true_triggers = pd.read_csv(lunar_catalog_csv_file)

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-14-fab1c6258496>", line 4, in <cell line: 3>
    all_lunar_training_data.append({'df': pd.read_csv(file_path), 'filename': file_path.split('/')[-1].removesuffix('.csv')})
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/parsers/readers.py", line 1026, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/parsers/readers.py", line 626, in _read
    return parser.read(nrows)
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/parsers/readers.py", line 1923, in read
    ) = self._engine.read(  # type: ignore[attr-defined]
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/parsers/c_parser_wrapper.py", line 234, in read
    chunks = self._reader.read_low_memory(nrows)
  File "parsers.pyx", line 8

TypeError: object of type 'NoneType' has no len()

In [None]:
def objective_function(params):
    sta_len, lta_len, thr_on, thr_off = params
    total_f1 = 0

    for quak_file in all_lunar_training_data:
        print(f"Processing: {quak_file['filename']}")
        detected_triggers = implement_STA_LTA(quak_file['df'], sta_len, lta_len, thr_on, thr_off)

        # show_sound_plot(quak_file['filename'], detected_triggers)
        f1 = score_function(true_triggers.loc[true_triggers['filename'] == quak_file['filename']]['time_rel(sec)'], detected_triggers)
        total_f1 += f1

    return -total_f1

def evaluate_individual(individual):
    sta_len, lta_len, thr_on, thr_off = individual
    total_score = 0

    # Loop over each file in wave_files
    for file in wave_files:
        df = file['df']
        detected_triggers = implement_STA_LTA(df, sta_len, lta_len, thr_on, thr_off)
        total_score += score_function(detected_triggers, ground_truth)

    return -total_score,  # Return as a tuple, only difference from above, used for the evolutionary model

In [None]:
# Bayse optimizer
from skopt import gp_minimize
from skopt.space import Real
search_space = [
    Real(0.1, 10, name='sta_len'),
    Real(10, 100, name='lta_len'),
    Real(1.0, 3.0, name='thr_on'),
    Real(0.5, 2.0, name='thr_off')
]

result = gp_minimize(objective_function, search_space, n_calls=50)

best_params = result.x
print(f"Best parameters for Bayse optimizer: {best_params}")

In [None]:
# Gradient optimizer
from scipy.optimize import minimize
initial_guess = [1.0, 20.0, 2.0, 1.0]

bounds = [(0.1, 10), (10, 100), (1.0, 3.0), (0.5, 2.0)]

result = minimize(objective, initial_guess, bounds=bounds, method='L-BFGS-B')

best_params = result.x
print(f"Best parameters for Gradient optimizer: {best_params}")

In [None]:
# Evolutionary optimizer
from deap import base, creator, tools, algorithms

toolbox = base.Toolbox()
toolbox.register("attr_sta_len", np.random.uniform, 0.1, 10)
toolbox.register("attr_lta_len", np.random.uniform, 10, 100)
toolbox.register("attr_thr_on", np.random.uniform, 1.0, 3.0)
toolbox.register("attr_thr_off", np.random.uniform, 0.5, 2.0)

toolbox.register("individual", tools.initCycle, creator.Individual,
                 (toolbox.attr_sta_len, toolbox.attr_lta_len, toolbox.attr_thr_on, toolbox.attr_thr_off), n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("mate", tools.cxBlend, alpha=0.5)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=0.1, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("evaluate", evaluate_individual)

# Create initial population
population = toolbox.population(n=50)

# Perform evolutionary algorithm optimization
algorithms.eaSimple(population, toolbox, cxpb=0.5, mutpb=0.2, ngen=40, verbose=True)

# Get the best individual
best_individual = tools.selBest(population, 1)[0]
print(f"Best parameters for evolutionary model: {best_individual}")

In [None]:
{'sta_len': 220, 'lta_len': 700, 'thr_on': 3.0, 'thr_off': 1.3}
# optimize_parameters(all_lunar_training_data, lunar_true_triggers, tolerance=10)


{'sta_len': 220, 'lta_len': 700, 'thr_on': 3.0, 'thr_off': 1.3}

In [None]:
def show_sound_plot(filename, x_values=None):
    df = pd.read_csv(f'{lunar_test_folder_path}/{filename}.csv')
    time_rel = df['time_rel(sec)'].values
    velocity = df['velocity(m/s)']
    fig, ax = plt.subplots(1,1,figsize=(10,3))
    ax.plot(time_rel, velocity)
    ax.set_xlim([min(time_rel),max(time_rel)])

    if x_values is not None:
        if isinstance(x_values, list):  # Check if x_values is a list
            for x in x_values:
                ax.axvline(x=x, color='red', linestyle='--', label=f'x = {x}')
        else:  # If it's not a list, treat it as a single value
            ax.axvline(x=x_values, color='red', linestyle='--', label=f'x = {x_values}')
    plt.show()
