## Build and Train the Model, then generate figures

(Run this after running the data collection notebook)

In [None]:
import pandas as pd
import numpy as np

from tensorflow import keras
from tensorflow.keras import layers
from kerastuner.tuners import RandomSearch
from keras import backend as K
import matplotlib.pyplot as plt

#web scraping
import requests
import bs4
import lxml.etree as xml

#getting waveforms
import obspy
from obspy.clients.fdsn import Client
from obspy import UTCDateTime
from obspy import read

from obspy.signal.trigger import classic_sta_lta
from obspy.signal.trigger import plot_trigger

import sklearn.metrics as metrics
from sklearn.metrics import roc_curve

client = Client("IRIS")

import pickle

data_cols = ["ISC EventID", "Event Date", "Event Time", "Arrival Date", "Arrival Time", "Lat", "Long", "MagType", "Mag", "Station", "Channel", "ISC Phase", "Distance (Deg)", "IsExplosion", "SampRate", "Samples"]

with open("x_train.pic", "rb") as fp:
  x_train = pickle.load(fp)
with open("y_train.pic", "rb") as fp:
  y_train = pickle.load(fp)
with open("x_val.pic", "rb") as fp:
  x_val = pickle.load(fp)
with open("y_val.pic", "rb") as fp:
  y_val = pickle.load(fp)
with open("x_test.pic", "rb") as fp:
  x_test = pickle.load(fp)
with open("y_test.pic", "rb") as fp:
  y_test = pickle.load(fp)


num_classes = len(np.unique(y_train))

with open("train_info.pic", "rb") as fp:
  df_train = pickle.load(fp)
with open("val_info.pic", "rb") as fp:
  df_val = pickle.load(fp)
with open("test_info.pic", "rb") as fp:
  df_test = pickle.load(fp)

# Original data is from 60 seconds before to 120 seconds after arrival
# This changes it to 10 seconds before to 80 seconds after arrival
x_train = x_train[:,20*50:-20*40,:]
x_val = x_val[:,20*50:-20*40,:]
x_test = x_test[:,20*50:-20*40,:]

In [None]:
def acc(pred, t): #calculates the accuracy of predictions on training data at threshold t
    pos = 0
    neg = 0
    truepos = 0
    trueneg = 0
    for i in range (y_train.shape[0]):
        if y_train[i] == 1:
            pos += 1
            if pred[i] >= t:
                truepos += 1
        else:
            neg += 1
            if pred[i] < t:
                trueneg += 1

    print ("Total Accuracy:")
    print ((truepos+trueneg)/(pos+neg))
    print ("Positive Recall:")
    print (truepos/pos)
    print ("Negative Recall:")
    print (trueneg/neg)

In [None]:
#Build the model

def make_model(input_shape):
    input_layer = keras.layers.Input(input_shape)
    
    conv1 = keras.layers.Conv1D(filters=16, kernel_size=27, padding="same")(input_layer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)
    
    pool = keras.layers.MaxPooling1D()(conv1)
    
    bl1 = keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True))(pool)
    bl1 = keras.layers.BatchNormalization()(bl1)
    bl1 = keras.layers.Dropout(0.2)(bl1)
    
    l1 = keras.layers.LSTM(64, return_sequences=False)(bl1)
    l1 = keras.layers.BatchNormalization()(l1)
    l1 = keras.layers.Dropout(0.1)(l1)
    
    d1 = keras.layers.Dense(64, activation="relu")(l1)
    d1 = keras.layers.BatchNormalization()(d1)
    d1 = keras.layers.Dropout(0.1)(d1)
        
    output_layer = keras.layers.Dense(1, activation="sigmoid")(d1)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)


model = make_model(input_shape=x_train.shape[1:])

In [None]:
#Train the model

callbacks = [
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001
    ),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),
]
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)


history = model.fit(
    x_train,
    y_train,
    batch_size=32,
    epochs=60,
    callbacks=callbacks,
    verbose=1,
    shuffle=True,
    validation_data=(x_val, y_val),
)

print ("")
print ("Model Trained")
print ("")

pred = model.predict(x_train, verbose=0)
acc(pred, 0.5)
print ("\n\n")
#acc(pred, 0.65)
#print ("\n\n")
#acc(pred, 0.75)

#Plot training and validation accuracy
metric = "accuracy"
plt.figure()
plt.plot(history.history[metric])
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

# ROC curve
fpr, tpr, threshold = metrics.roc_curve(y_train, pred)
roc_auc = metrics.auc(fpr, tpr)
plt.title('Receiver Operating Characteristic')
plt.plot(fpr, tpr, 'b', label = 'AUC = %0.2f' % roc_auc)
plt.legend(loc = 'lower right')
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()

plt.scatter(np.arange(0, pred.shape[0], 1), pred, s=3)

In [None]:
# Test the model on the test data

pred = model.predict(x_test, verbose=0)

t = 0.5
pos = 0
neg = 0
truepos = 0
trueneg = 0
for i in range (y_test.shape[0]):
    if y_test[i] == 1:
        pos += 1
        if pred[i] >= t:
            truepos += 1
    else:
        neg += 1
        if pred[i] < t:
            trueneg += 1

print ("Total Accuracy:")
print ((truepos+trueneg)/(pos+neg))
print ("Positive Recall:")
print (truepos/pos)
print ("Negative Recall:")
print (trueneg/neg)

# ROC curve
fpr, tpr, threshold = metrics.roc_curve(y_test, pred)
roc_auc = metrics.auc(fpr, tpr)
plt.title('Receiver Operating Characteristic')
plt.plot(fpr, tpr, 'b', label = 'AUC = %0.2f' % roc_auc)
plt.legend(loc = 'lower right')
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()

## Plot figures

In [None]:
def SetPlotRC(): # Sets fonts to latex
    plt.rcParams['text.usetex'] = True
    
    #plt.rcParams['text.latex.preamble'] = [r'\usepackage{sansmath}', r'\sansmath'] #Force sans-serif math mode (for axes labels)
    plt.rcParams['font.family'] = 'sans-serif' # ... for regular text
    plt.rcParams['font.sans-serif'] = 'Computer Modern Sans serif' # Choose a nice font here

In [None]:
# Accuracy vs. max sta/lta value (within 10 seconds of arrival)

# Revert to full samples to calculate the cft value within 10 seconds of the arrival
with open("x_test.pic", "rb") as fp:
  x_test = pickle.load(fp)

y = []
for i in range (x_test.shape[0]):
    cft = classic_sta_lta(x_test[i,:,0], int(5 * 20), int(20 * 20))
    y.append(max(cft[50*20:70*20]))
    print (max(cft[50*20:70*20]))

right = np.zeros(8)
total = np.zeros(8)

for i in range (x_test.shape[0]):
    ind = (y[i] - 2) * 4
    ind = int(ind)
    if ind >= 0:
        total[ind] += 1
        if int(pred[i] + 0.5) == y_test[i]:
            right[ind] += 1
            
perc = right/total
perc *= 100

xt = []
for i in range (8):
    xt.append(str(0.25*i+2.0))# + "-" + str(5*i+25))

N = len(perc)
ind = np.arange(N)
width = 0.8
plt.figure(figsize=(6, 4.5))
SetPlotRC()

plt.bar(ind+(1-width)/2, perc, width)

plt.ylabel('\% accuracy', fontsize=20)
plt.xlabel('Max STA/LTA Value', fontsize=20)
#plt.title('Accuracy by Max STA/LTA Value')

plt.yticks(np.arange(0, 101, 20), fontsize=15)
plt.xticks(ind - width / 2, xt, fontsize=15)


plt.tight_layout()
#plt.savefig("acc_sta_lta.pdf")

plt.show()

In [None]:
#Accuracy based on distance (degrees)

#Ranges of 20 from 20-180 degrees
right = np.zeros((8))
total = np.zeros((8))

i = 0
for ind, row in df_test.iterrows():
    d = float(row["Distance (Deg)"])
    index = int(d-20) // 20
    
    #print (d, index)
    total[index] += 1
    if int(pred[i]+0.5) == int(row["IsExplosion"]):
        right[index] += 1
    i += 1

right = np.array(right)
total = np.array(total)

perc = right/total

for i in range (len(perc)):
    if total[i] == 0:
        perc[i] = 0;
    perc[i] *= 100

xt = []
for i in range (8):
    xt.append(str(20*i+20))# + "-" + str(5*i+25))
    
N = len(perc)
ind = np.arange(N) 
width = 0.8
plt.figure(figsize=(6, 4.5))

plt.bar(ind+(1-width)/2, perc, width)
#plt.bar(ind+(1-width)/2, und, width, color = (0.1, 0.1, 0.1))
SetPlotRC()

plt.ylabel('\% accuracy', fontsize=20)
plt.xlabel('Distance Detected (Degrees)', fontsize=20)
#plt.title('Accuracy by distance detected', fontsize=20)

plt.xticks(ind - width / 2, xt, fontsize=15)#, fontname='DejaVu Sans')
plt.yticks(np.arange(0, 101, 20), fontsize=15)#, fontname='DejaVu Sans')
#plt.yticks(ind - width / 2, xt, fontsize=15)


plt.tight_layout()
#plt.savefig("acc_dist.pdf")

plt.show()

In [None]:
#Training examples based on distance (degrees)

#Ranges of 20 from 20-155 degrees
total = np.zeros((8))

i = 0
for ind, row in df_train.iterrows():
    d = float(row["Distance (Deg)"])
    index = int(d-20) // 20
    
    #print (d, index)
    total[index] += 1
    i += 1

total = np.array(total)

xt = []
for i in range (8):
    xt.append(str(20*i+20))# + "-" + str(5*i+25))
    
N = len(total)
ind = np.arange(N) 
width = 0.8
plt.figure(figsize=(6, 4))

SetPlotRC()

plt.bar(ind+(1-width)/2, total, width)
#plt.bar(ind+(1-width)/2, und, width, color = (0.1, 0.1, 0.1))


plt.ylabel('\# training examples', fontsize=20)
plt.xlabel('Distance Detected (Degrees)', fontsize=20)
#plt.title('Training examples by distance detected')

plt.xticks(ind - width / 2, xt, fontsize=15)
plt.yticks(fontsize=15)
#plt.yticks(ind - width / 2, xt, fontsize=15)


plt.tight_layout()
#plt.savefig("train_dist.pdf")

plt.show()

In [None]:
#Accuracy based on Magnitude

#Ranges of 0.5 from 3.0-6.9 degrees
right = []
total = []
for i in range(8):
    right.append(0)
    total.append(0)

i = 0
for ind, row in df_test.iterrows():
    if row["Mag"] == '':
        i += 1
        continue
    m = float(row["Mag"])
    index = int((m-3) * 2)
    
    #print (m, index)
    total[index] += 1
    if int(pred[i]+0.5) == int(row["IsExplosion"]):
        right[index] += 1
    i += 1

right = np.array(right)
total = np.array(total)

perc = right/total

for i in range (len(perc)):
    if total[i] == 0:
        perc[i] = 0;
    perc[i] *= 100
    
xt = []
for i in range (8):
    xt.append(str(0.5*i+3.0))# + "-" + str(5*i+25))

N = len(perc)
ind = np.arange(N)
width = 0.8
plt.figure(figsize=(6, 4.5))
#SetPlotRC()

plt.bar(ind+(1-width)/2, perc, width)
#plt.bar(ind+(1-width)/2, und, width, color = (0.1, 0.1, 0.1))


plt.ylabel('\% accuracy', fontsize=20)
plt.xlabel('Magnitude of Event (mb)', fontsize=20)
#plt.title('Accuracy by Magnitude')

plt.yticks(fontsize=15)
plt.xticks(ind - width / 2, xt, fontsize=15)


plt.tight_layout()
#plt.savefig("acc_mag.pdf")

plt.show()

In [None]:
#Training examples based on Magnitude

#Ranges of 0.5 from 3.0-6.9 degrees
right = []
total = []
for i in range(8):
    right.append(0)
    total.append(0)

i = 0
for ind, row in df_train.iterrows():
    if row["Mag"] == '':
        i += 1
        continue
    m = float(row["Mag"])
    index = int((m-3) * 2)
    
    #print (m, index)
    total[index] += 1
    
    i += 1

right = np.array(right)
total = np.array(total)

perc = right/total

for i in range (len(perc)):
    if total[i] == 0:
        perc[i] = 0;
    perc[i] *= 100
    
xt = []
for i in range (8):
    xt.append(str(0.5*i+3.0))# + "-" + str(5*i+25))

N = len(perc)
ind = np.arange(N)
width = 0.8
plt.figure(figsize=(6, 4))
SetPlotRC()

plt.bar(ind+(1-width)/2, total, width)
#plt.bar(ind+(1-width)/2, und, width, color = (0.1, 0.1, 0.1))


plt.ylabel('\# training examples', fontsize=20)
plt.xlabel('Magnitude of Event (mb)', fontsize=20)
#plt.title('Training examples by Magnitude')

plt.yticks(fontsize=15)
plt.xticks(ind - width / 2, xt, fontsize=15)

plt.tight_layout()
#plt.savefig("train_mag.pdf")

plt.show()

## Test Model on unseen rockbursts (treated as explosions)

In [None]:
with open("x_rock.pic", "rb") as fp:
  x_rock = pickle.load(fp)
with open("y_rock.pic", "rb") as fp:
  y_rock = pickle.load(fp)
with open("df_rock.pic", "rb") as fp:
  df_rock = pickle.load(fp)

In [None]:
pred2 = model.predict(x_rock, verbose=0)

t = 0.5
right = 0
for i in range (y_rock.shape[0]):
    if y_rock[i] == 1:
        if pred2[i] >= t:
            right += 1
    else:
        if pred2[i] < t:
            right += 1

print ("Total Accuracy:")
print ((right)/(y_rock.shape[0]))