In [None]:
import obspy
from datetime import datetime
import obspy.signal
import obspy.signal.filter
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from collections import OrderedDict
plt.style.use('ggplot')

catalog = pd.read_csv('./data/lunar/training/catalogs/apollo12_catalog_GradeA_final.csv')
data_path = './data/lunar/training/data/S12_GradeA/'

quake_data = OrderedDict()
for quake in catalog.iloc:
    filename = quake["filename"]
    time = quake["time_rel(sec)"]
    if quake_data.get(quake["evid"]) is None:
        quake_data[quake["evid"]] = [(filename, time)]
    else:
        quake_data[quake["evid"]].append((filename, time))

# display(quake_data)

# traces = [obspy.read(f)[0].detrend().filter("highpass", freq=1, corners=1) for f in quake_data]

print(f"loaded {len(quake_data)} traces")

In [29]:
def plot_trigger(trace, cft, thr_on, thr_off, real=None, show=True):
    """
    Plot characteristic function of trigger along with waveform data and
    trigger On/Off from given thresholds.

    :type trace: :class:`~obspy.core.trace.Trace`
    :param trace: waveform data
    :type cft: :class:`numpy.ndarray`
    :param cft: characteristic function as returned by a trigger in
        :mod:`obspy.signal.trigger`
    :type thr_on: float
    :param thr_on: threshold for switching trigger on
    :type thr_off: float
    :param thr_off: threshold for switching trigger off
    :type show: bool
    :param show: Do not call `plt.show()` at end of routine. That way,
        further modifications can be done to the figure before showing it.
    """
    import matplotlib.pyplot as plt
    from obspy.signal.trigger import trigger_onset
    df = trace.stats.sampling_rate
    npts = trace.stats.npts
    t = np.arange(npts, dtype=np.float32) / df
    fig = plt.figure()
    ax1 = fig.add_subplot(211)
    ax1.plot(t, trace.data, 'k')
    ax2 = fig.add_subplot(212, sharex=ax1)
    ax2.plot(t, cft, 'k')
    on_off = np.array(trigger_onset(cft, thr_on, thr_off))
    i, j = ax1.get_ylim()
    try:
        ax1.vlines(on_off[:, 0] / df, i, j, color='r', lw=2,
                   label="Trigger On")
        ax1.vlines(on_off[:, 1] / df, i, j, color='b', lw=2,
                   label="Trigger Off")
        if real is not None:
            ax1.vlines(real, i, j, color='pink', lw=2, label="Truth Picked")
        ax1.legend()
    except IndexError:
        pass
    ax2.axhline(thr_on, color='red', lw=1, ls='--')
    ax2.axhline(thr_off, color='blue', lw=1, ls='--')
    ax2.set_xlabel("Time after %s [s]" % trace.stats.starttime.isoformat())
    fig.suptitle(trace.id)
    fig.canvas.draw()
    if show:
        plt.show()

In [30]:
def analyze(trace: obspy.Trace, true_time=0, show=False):
    from obspy.signal.trigger import z_detect, trigger_onset
    import matplotlib.pyplot as plt

    lta = 1200
    cF = z_detect(trace.data, int(lta * trace.stats.sampling_rate))

    on = 0.4
    off = 0.2

    on_off = np.array(trigger_onset(cF, on, off))

    if show:
        _, ax = plt.subplots(1,1,figsize=(12,3))
        ax.plot(trace.times(), cF)
        plot_trigger(trace, cF, on, off, real=true_time)

    error = true_time - (on_off[:, 0] / trace.stats.sampling_rate)
    # print(on_off[:, 0] / trace.stats.sampling_rate)
    
    return error

def view(k, trace_file, show=False): 
    corr = 0
    for trace in trace_file:
        trace_data = obspy.read(data_path+trace[0]+".mseed")[0]
        trace_time = trace[1]

        error = analyze(trace_data, true_time=trace_time, show=show)

        print(f"[{k}] {"late" if error[0] > 0 else "early"} by {abs(error[0])} seconds")

        if abs(error[0]) < 1000.0:
            corr += 1
    return corr


In [None]:
correct = 0
for k, trace_file in quake_data.items():
    correct += view(k, trace_file)

print(f"{correct}/{len(catalog)}")

In [None]:
import glob
import obspy
import numpy as np
name_prefix = 'mars_stalta_'
data_path = './data/mars/test/data/'
data_files = glob.glob(f"{data_path}*.mseed")
data_files.sort()
results = []

print(f"loaded {len(data_files)} traces")

In [7]:
def plot_trigger(trace, cft, thr_on, thr_off, show=True, save=""):
    """
    Plot characteristic function of trigger along with waveform data and
    trigger On/Off from given thresholds.

    :type trace: :class:`~obspy.core.trace.Trace`
    :param trace: waveform data
    :type cft: :class:`numpy.ndarray`
    :param cft: characteristic function as returned by a trigger in
        :mod:`obspy.signal.trigger`
    :type thr_on: float
    :param thr_on: threshold for switching trigger on
    :type thr_off: float
    :param thr_off: threshold for switching trigger off
    :type show: bool
    :param show: Do not call `plt.show()` at end of routine. That way,
        further modifications can be done to the figure before showing it.
    """
    import matplotlib.pyplot as plt
    from obspy.signal.trigger import z_detect, trigger_onset
    df = trace.stats.sampling_rate
    npts = trace.stats.npts
    t = np.arange(npts, dtype=np.float32) / df
    fig = plt.figure()
    ax1 = fig.add_subplot(211)
    ax1.plot(t, trace.data, 'k')
    ax2 = fig.add_subplot(212, sharex=ax1)
    ax2.plot(t, cft, 'k')
    on_off = np.array(trigger_onset(cft, thr_on, thr_off))
    i, j = ax1.get_ylim()
    try:
        ax1.vlines(on_off[:, 0] / df, i, j, color='r', lw=2,
                   label="Trigger On")
        ax1.vlines(on_off[:, 1] / df, i, j, color='b', lw=2,
                   label="Trigger Off")
        ax1.legend()
    except IndexError:
        pass
    ax2.axhline(thr_on, color='red', lw=1, ls='--')
    ax2.axhline(thr_off, color='blue', lw=1, ls='--')
    ax2.set_xlabel("Time after %s [s]" % trace.stats.starttime.isoformat())
    fig.suptitle(trace.id)
    fig.canvas.draw()
    if save:
        plt.savefig(save, format='jpeg', bbox_inches='tight')
        # plt.close()
    if show:
        plt.show()
    

In [None]:
# this performs best on the moon.
for i, file in enumerate(data_files[:]):
    def analyze(trace: obspy.Trace, show=False):
        from obspy.signal.trigger import z_detect, trigger_onset, classic_sta_lta
        import matplotlib.pyplot as plt

        lta = 1800
        cF = z_detect(trace.data, int(lta * trace.stats.sampling_rate))

        on = 1.0
        off = 0.5

        on_off = np.array(trigger_onset(cF, on, off))

        if show:
            _, ax = plt.subplots(1,1,figsize=(12,3))
            ax.plot(trace.times(), cF)
            plot_trigger(trace, cF, on, off, show=False, save=f"images/{name_prefix}{i}.jpeg")
            plt.close()
        
        return on_off

    s = obspy.read(file).detrend().filter("bandpass", freqmin=0.5, freqmax=1)
    results.append(analyze(s[0], show=True))

results

In [None]:
# this performs best on the moon.
for i, file in enumerate(data_files[:]):
    def analyze(trace: obspy.Trace, show=False):
        from obspy.signal.trigger import z_detect, trigger_onset, classic_sta_lta
        import matplotlib.pyplot as plt

        lta = 300
        cF = z_detect(trace.data, int(lta * trace.stats.sampling_rate))

        on = 1.0
        off = 0.5

        on_off = np.array(trigger_onset(cF, on, off))

        if show:
            _, ax = plt.subplots(1,1,figsize=(12,3))
            ax.plot(trace.times(), cF)
            plot_trigger(trace, cF, on, off, show=True, save=f"images/{name_prefix}{i}.jpeg")
            plt.close()
        
        return on_off

    s = obspy.read(file).detrend().filter("bandpass", freqmin=0.5, freqmax=1)
    results.append(analyze(s[0], show=True))

results