# Triggering Mechanism Development

> one window for triggering

> one window for termination

In current version, we only use the first window 

In [1]:
# Import Depenencies
import pandas as pd  # for data manipulation
import numpy as np  # for numerical operations
import os
import csv
import random

import scipy.io as sio  # for loading .mat files
from scipy import signal  # for signal processing
from scipy.signal import hilbert  # for signal processing

import plotly.graph_objects as go  # for data visualization
import plotly.express as px  # for data visualization
from plotly.subplots import make_subplots  # for data visualization
import matplotlib.pyplot as plt  # for plotting
import seaborn as sns


In [1]:
# file path
path1 = "E:\Research\SATM-DATA" # for cross-platform compatibility
path2 = "\Data_by_Event.mat"
path = path1 + path2
print(path)

E:\Research\SATM-DATA\Data_by_Event.mat


In [3]:
# raw data
RawData = sio.loadmat(path)
RawData = np.transpose(RawData['Data_by_Event_PT'])

# shape of raw data
print("Shape of Raw Data: ", RawData.shape)


Shape of Raw Data:  (38400, 6000)


In [4]:
# this step is to remove the data containing sensor faults

# extract useful data
num_events = 4
event_types = ['AN', 'EQ', 'IP', 'WD']
num_faults = 4

# ambient noise
num_an_start = 0
num_an_end = RawData.shape[0] / num_events / num_faults
assert num_an_end.is_integer(), "num_an_end is not an integer"

# earthquake
num_eq_start = 1*RawData.shape[0] / num_events
num_eq_end = 1*RawData.shape[0] / num_events + 1*RawData.shape[0] / num_events / num_faults

# impact
num_ip_start = 2*RawData.shape[0] / num_events
num_ip_end = 2*RawData.shape[0] / num_events + 1*RawData.shape[0] / num_events / num_faults

# strong wind
num_wd_start = 3*RawData.shape[0] / num_events
num_wd_end = 3*RawData.shape[0] / num_events + 1*RawData.shape[0] / num_events / num_faults

# extract data
RawData_AN = RawData[int(num_an_start):int(num_an_end), :]
RawData_EQ = RawData[int(num_eq_start):int(num_eq_end), :]
RawData_IP = RawData[int(num_ip_start):int(num_ip_end), :]
RawData_WD = RawData[int(num_wd_start):int(num_wd_end), :]

# stack data
Data = np.vstack((RawData_AN, RawData_EQ, RawData_IP, RawData_WD))

# check size
print("Shape of Data: ", Data.shape)


Shape of Data:  (9600, 6000)


In [5]:
# plot data samples
selected_samples = np.arange(0, num_events, 1) * int(Data.shape[0]/num_events)
print("Selected Samples: ", selected_samples)

# use plotly to plot data samples
fig = make_subplots(rows=4, cols=1, subplot_titles=event_types)

for i in range(num_events):
    fig.add_trace(go.Scatter(x=np.arange(0, 6000, 1), y=Data[selected_samples[i], :], mode = 'lines', name=event_types[i]),
                  row=i + 1, col=1)
    
fig.update_layout(height=1000, width=800, title_text="Data Samples")
fig.show()


Selected Samples:  [   0 2400 4800 7200]


In [6]:
# Triggering Mechanism

# trigger_duration
trigger_duration_min = 0.01
trigger_duration_max = 0.5 

sampling_rate = 100 # Hz

trigger_width_min = int(trigger_duration_min * sampling_rate)
trigger_width_max = int(trigger_duration_max * sampling_rate)

trigger_width_range = np.arange(trigger_width_min, trigger_width_max + 1, 1)

# print trigger_width_min, trigger_width_max
print("trigger_width_min: ", trigger_width_min)
print("trigger_width_max: ", trigger_width_max)

# trigger_threshold
trigger_threshold_min = 0.0
trigger_threshold_max = 0.5
trigger_threshold_step = 0.05

trigger_threshold_range = np.arange(trigger_threshold_min, trigger_threshold_max + trigger_threshold_step, trigger_threshold_step)

# print trigger_threshold_min, trigger_threshold_max
print("trigger_threshold_min: ", trigger_threshold_min)
print("trigger_threshold_max: ", trigger_threshold_max)


trigger_width_min:  1
trigger_width_max:  50
trigger_threshold_min:  0.0
trigger_threshold_max:  0.5


In [7]:
# Trigger Check
# trigger check function
# triggering logic: if the absolute value of the signal is greater than the threshold for a certain duration, then it is a trigger

def trigger_check(data, trigger_width, trigger_threshold):
    # data: input data
    # trigger_width: width of the trigger
    # trigger_threshold: threshold of the trigger
    
    # take absolute value
    data = np.abs(data)

    # minus the trigger threshold
    data = data - trigger_threshold

    # use a mask to check if the data is greater than 0
    data = data > 0

    # if there are trigger_width consecutive True, then it is a trigger, return 1 else return 0
    trigger_check = np.convolve(data, np.ones(trigger_width), 'valid') == trigger_width

    # if any true in trigger_check, then return 1 else return 0
    result = 1 if np.any(trigger_check) else 0
    
    return result
    
    

In [8]:
# test

test_result = trigger_check(Data[0, :], trigger_width_min, trigger_threshold_min)

print("Test Result: ", test_result)

Test Result:  1


so far, the triggering mechanism looks good

In [9]:
# define a structure to hold results
results = np.zeros((len(trigger_threshold_range), len(trigger_width_range), num_events, int(Data.shape[0]/num_events))) # dim 1: threshold, dim 2: width, dim 3: event type, dim 4: detailed results for each sample in each event type

In [10]:
for i in range(len(trigger_threshold_range)):
    # progress report
    print("[Progress]: ", i + 1, "/", len(trigger_threshold_range))
    for j in range(len(trigger_width_range)):
        # # progress report
        # print("<Progress>: ", j, "/", len(trigger_width_range))
        for k in range(num_events):
            for l in range(int(Data.shape[0]/num_events)):
                results[i, j, k, l] = trigger_check(Data[l + k*int(Data.shape[0]/num_events), :], trigger_width_range[j], trigger_threshold_range[i])

[Progress]:  1 / 11
[Progress]:  2 / 11
[Progress]:  3 / 11
[Progress]:  4 / 11
[Progress]:  5 / 11
[Progress]:  6 / 11
[Progress]:  7 / 11
[Progress]:  8 / 11
[Progress]:  9 / 11
[Progress]:  10 / 11
[Progress]:  11 / 11


In [11]:
# Metrics Calculation
# helper functions

# accuracy
def accuracy(TP, FP, TN, FN):
    return (TP + TN) / (TP + FP + TN + FN)

# precision
def precision(TP, FP, TN, FN):
    return TP / (TP + FP)

# recall
def recall(TP, FP, TN, FN):
    return TP / (TP + FN)

# Fbeta
BETA = 2
def Fbeta(TP, FP, TN, FN, beta):
    return (1 + beta**2) * (precision(TP, FP, TN, FN) * recall(TP, FP, TN, FN)) / ((beta**2) * precision(TP, FP, TN, FN) + recall(TP, FP, TN, FN))


In [12]:
# metrics
metrics = np.zeros((len(trigger_threshold_range), len(trigger_width_range), 8)) # 8 metrics: TP, FP, TN, FN, Accuracy, Precision, Recall, Fbeta

# calculate metrics
for i in range(len(trigger_threshold_range)):
    # progress report
    print("[Progress]: ", i+1, "/", len(trigger_threshold_range))
    for j in range(len(trigger_width_range)):
        # # progress report
        # print("<Progress>: ", j, "/", len(trigger_width_range))
        # calculate TP, FP, TN, FN
        for k in range(num_events):
            for l in range(int(Data.shape[0]/num_events)):
                if results[i, j, k, l] == 1: # positive
                    if k == 0: # ambient noise - false
                        metrics[i, j, 1] += 1
                    else: # earthquake, impact, strong wind - true
                        metrics[i, j, 0] += 1
                else: # negative
                    if k == 0: # ambient noise - true
                        metrics[i, j, 2] += 1
                    else: # earthquake, impact, strong wind - false
                        metrics[i, j, 3] += 1
        
        # calculate accuracy, precision, recall, Fbeta
        metrics[i, j, 4] = accuracy(metrics[i, j, 0], metrics[i, j, 1], metrics[i, j, 2], metrics[i, j, 3])
        metrics[i, j, 5] = precision(metrics[i, j, 0], metrics[i, j, 1], metrics[i, j, 2], metrics[i, j, 3])
        metrics[i, j, 6] = recall(metrics[i, j, 0], metrics[i, j, 1], metrics[i, j, 2], metrics[i, j, 3])
        metrics[i, j, 7] = Fbeta(metrics[i, j, 0], metrics[i, j, 1], metrics[i, j, 2], metrics[i, j, 3], BETA)

# find the best Fbeta and return its corresponding threshold and width
max_Fbeta = np.max(metrics[:, :, 7])
print("Max Fbeta: ", max_Fbeta)

max_Fbeta_index = np.where(metrics[:, :, 7] == max_Fbeta)
max_Fbeta_index = (max_Fbeta_index[0][0], max_Fbeta_index[1][0])# only take the first one
print("Max Fbeta Index: ", max_Fbeta_index)

# print the best threshold and width
best_threshold = trigger_threshold_range[max_Fbeta_index[0]]
print("Best Threshold: ", round(best_threshold, 2)) # print the best threshold, 3 digits in total, 2 digits after decimal point

best_width = trigger_width_range[max_Fbeta_index[1]]
print("Best Width (number of samples): ", best_width) # print the best width, unit is sample

# print the best width time, unit is millisecond
best_width_time = best_width / sampling_rate * 1000
print("Best Width (time): ", round(best_width_time), "ms") # print the best width, 3 digits in total, 2 digits after decimal point, unit is millisecond


[Progress]:  1 / 11
[Progress]:  2 / 11
[Progress]:  3 / 11
[Progress]:  4 / 11
[Progress]:  5 / 11
[Progress]:  6 / 11
[Progress]:  7 / 11
[Progress]:  8 / 11
[Progress]:  9 / 11
[Progress]:  10 / 11
[Progress]:  11 / 11
Max Fbeta:  0.9992220709582418
Max Fbeta Index:  (6, 4)
Best Threshold:  0.3
Best Width (number of samples):  5
Best Width (time):  50 ms


In [13]:
# use plotly to plot the Accuracy mesh
# x axis: trigger threshold, y axis: trigger width, z axis: Accuracy
fig = go.Figure(data=[go.Surface(z=metrics[:, :, 4], x=trigger_threshold_range, y=trigger_width_range*1000/sampling_rate)])

fig.update_layout(title='Accuracy Mesh', autosize=False,
                    width=800, height=800,
                    margin=dict(l=65, r=50, b=65, t=90),
                    scene=dict(xaxis_title='Threshold', yaxis_title='Duration', zaxis_title='Accuracy'))

fig.show()

In [14]:
# use plotly to plot the Precision mesh
# x axis: trigger threshold, y axis: trigger width, z axis: Precision
fig = go.Figure(data=[go.Surface(z=metrics[:, :, 5], x=trigger_threshold_range, y=trigger_width_range*1000/sampling_rate)])

fig.update_layout(title='Precision Mesh', autosize=False,
                    width=800, height=800,
                    margin=dict(l=65, r=50, b=65, t=90),
                    scene=dict(xaxis_title='Threshold', yaxis_title='Duration', zaxis_title='Precision'))

fig.show()

In [15]:
# use plotly to plot the Recall mesh
# x axis: trigger threshold, y axis: trigger width, z axis: Recall

fig = go.Figure(data=[go.Surface(z=metrics[:, :, 6], x=trigger_threshold_range, y=trigger_width_range*1000/sampling_rate)])

fig.update_layout(title='Recall Mesh', autosize=False,
                    width=800, height=800,
                    margin=dict(l=65, r=50, b=65, t=90),
                    scene=dict(xaxis_title='Threshold', yaxis_title='Duration', zaxis_title='Recall'))

fig.show()

In [16]:
# use plotly to plot the Fbeta mesh
# x axis: trigger threshold, y axis: trigger width, z axis: Fbeta
fig = go.Figure(data=[go.Surface(z=metrics[:, :, 7], x=trigger_threshold_range, y=trigger_width_range*1000/sampling_rate)])

# highlight the best Fbeta
fig.add_trace(go.Scatter3d(x=[best_threshold], y=[best_width_time], z=[max_Fbeta], mode='markers', marker=dict(size=10, color='red')))

fig.update_layout(title='Fbeta Mesh', autosize=False,
                    width=800, height=800,
                    margin=dict(l=65, r=50, b=65, t=90),
                    scene=dict(xaxis_title='Threshold', yaxis_title='Duration', zaxis_title='Fbeta'))

fig.show()

