In [None]:
# ST-CNN Model for Identification of Pulse-like Ground Motions and Pulse Periods 
# Chenyang Zhu
# For the convenience of batch processing, the code here has been modified on the original basis

import csv
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from stockwell import st
from scipy.signal import chirp
from scipy.interpolate import interp2d
from tensorflow.keras import regularizers


def process_file(filename):
    with open(filename, 'r') as file:
        lines = file.readlines()
    dt,  npts= map(float, lines[0].split())
    at = np.array([float(line) for line in lines[1:]]) * 9.81
    assert len(at) == npts, "Data point mismatch"
    vt = np.cumsum(np.array(at) * dt)
    vt_cm = vt*100
    time = np.arange(0, npts * dt, dt)    
    return dt, npts, time, vt_cm

# Fault-normal ground motion at the Pacoima Dam (upper left abutment) seismic station during the 1971 San Fernando earthquake
file_example = "example.acc" 

# Generate velocity time series matrix
dt, npts, time, vt_cm = process_file(file_example)
T = []
T_ans = dt*(npts-1)
T.append(T_ans)
T = np.array(T).astype(np.float32)

# S-Transfrom
df = 1./(time[-1]-time[0])
fmin = 0  
fmax = 10  
fmin_samples = int(fmin/df)
fmax_samples = int(fmax/df)                        
stock = np.abs(st.st(vt_cm, fmin_samples, fmax_samples))
frequency = np.linspace(fmin, fmax, stock.shape[0])  
f = interp2d(time, frequency.flatten(), stock)

# Interpolation
new_time_100100 = np.linspace(time[0], time[-1], 100)
new_frequency_100100 = np.linspace(frequency.min(), frequency.max(), 100)
new_stock_100100 = f(new_time_100100, new_frequency_100100).reshape((100, 100))  
new_stock_100100_normalized = new_stock_100100.reshape((-1, 100, 100, 1))

new_time_200200 = np.linspace(time[0], time[-1], 200)
new_frequency_200200 = np.linspace(frequency.min(), frequency.max(), 200)
new_stock_200200 = f(new_time_200200, new_frequency_200200).reshape((200, 200)) 
new_stock_200200_normalized = new_stock_200200.reshape((-1, 200, 200, 1))

new_time_300300 = np.linspace(time[0], time[-1], 300)
new_frequency_300300 = np.linspace(frequency.min(), frequency.max(), 300)
new_stock_300300 = f(new_time_300300, new_frequency_300300).reshape((300, 300)) 
new_stock_300300_normalized = new_stock_300300.reshape((-1, 300, 300, 1))

In [None]:
# Strict Model

class Strict_ConvNet(tf.keras.Model):
    def __init__(self):
        super(Strict_ConvNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(200, 200, 1)) 
        self.pool1 = tf.keras.layers.MaxPooling2D((2, 2))  
        self.conv2 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.pool2 = tf.keras.layers.MaxPooling2D((2, 2)) 
        self.flatten = tf.keras.layers.Flatten() 
        self.fc1 = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001))  
        self.drop1 = tf.keras.layers.Dropout(0.5)
        self.fc2 = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.drop2 = tf.keras.layers.Dropout(0.5)
        self.fc3 = tf.keras.layers.Dense(3, activation='softmax') 
    def call(self, inputs):
        s, t = inputs
        x = self.conv1(s)
        x = self.pool1(x) 
        x = self.conv2(x) 
        x = self.pool2(x)
        x = self.flatten(x) 
        x = self.fc1(x)  
        x = tf.concat([x, t], axis=1) 
        x = self.drop1(x)      
        x = self.fc2(x)  
        x = self.drop2(x)
        x = self.fc3(x)
        return x

Strict_model = Strict_ConvNet()

Strict_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

Strict_model((np.zeros((1, 200, 200, 1)), np.zeros((1, 1))))

Strict_model.load_weights('200_200 strict 2 128.h5')

Strict_predictions = Strict_model.predict((new_stock_200200_normalized, tf.expand_dims(T, axis=1)))

Strict_predictions = np.array(Strict_predictions)

Strict_predicted_labels = np.argmax(Strict_predictions, axis=1)-1

# [1] == pulse-like; [0] == ambiguous; [-1] == non-pulse-like
print("Strict Identification: ", Strict_predicted_labels)

In [None]:
# General Model

class General_ConvNet(tf.keras.Model):
    def __init__(self):
        super(General_ConvNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(100, 100, 1)) 
        self.pool1 = tf.keras.layers.MaxPooling2D((2, 2))  
        self.conv2 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.pool2 = tf.keras.layers.MaxPooling2D((2, 2)) 
        self.conv3 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.pool3 = tf.keras.layers.MaxPooling2D((2, 2))
        self.conv4 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.pool4 = tf.keras.layers.MaxPooling2D((2, 2))
        self.flatten = tf.keras.layers.Flatten() 
        self.fc1 = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001))  
        self.drop1 = tf.keras.layers.Dropout(0.5)
        self.fc2 = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)) 
        self.drop2 = tf.keras.layers.Dropout(0.5)
        self.fc3 = tf.keras.layers.Dense(2, activation='softmax') 
    def call(self, inputs):
        s, t = inputs
        x = self.conv1(s)
        x = self.pool1(x) 
        x = self.conv2(x) 
        x = self.pool2(x)
        x = self.conv3(x) 
        x = self.pool3(x)
        x = self.conv4(x) 
        x = self.pool4(x)
        x = self.flatten(x) 
        x = self.fc1(x)  
        x = tf.concat([x, t], axis=1) 
        x = self.drop1(x)      
        x = self.fc2(x)  
        x = self.drop2(x)
        x = self.fc3(x)
        return x

General_model = General_ConvNet()

General_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

General_model((np.zeros((1, 100, 100, 1)), np.zeros((1, 1))))

General_model.load_weights('100_100 general 4 128.h5')

General_predictions = General_model.predict((new_stock_100100_normalized, tf.expand_dims(T, axis=1)))

General_predictions = np.array(General_predictions)

General_predicted_labels = np.argmax(General_predictions, axis=1)

# [1] == pulse-like; [0] == non-pulse-like
print("General Identification: ", General_predicted_labels)

In [None]:
# Tp Model

class Tp_ConvNet(tf.keras.Model):
    def __init__(self):
        super(Tp_ConvNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(300, 300, 1)) 
        self.pool1 = tf.keras.layers.MaxPooling2D((2, 2))  
        self.conv2 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu') 
        self.pool2 = tf.keras.layers.MaxPooling2D((2, 2)) 
        self.conv3 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu') 
        self.pool3 = tf.keras.layers.MaxPooling2D((2, 2)) 
        self.conv4 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu') 
        self.pool4 = tf.keras.layers.MaxPooling2D((2, 2)) 
        self.flatten = tf.keras.layers.Flatten() 
        self.fc1 = tf.keras.layers.Dense(128, activation='relu')  
        self.drop1 = tf.keras.layers.Dropout(0.01)
        self.fc2 = tf.keras.layers.Dense(128, activation='relu') 
        self.drop2 = tf.keras.layers.Dropout(0.01)
        self.fc3 = tf.keras.layers.Dense(1) 
    def call(self, inputs):
        s, t = inputs
        x = self.conv1(s) 
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x) 
        x = self.conv3(x)
        x = self.pool3(x)
        x = self.conv4(x)
        x = self.pool4(x)
        x = self.flatten(x)  
        x = self.fc1(x)  
        x = tf.concat([x, t], axis=1)      
        x = self.drop1(x)       
        x = self.fc2(x)  
        x = self.drop2(x)
        x = self.fc3(x)
        return x

Tp_model = Tp_ConvNet()

optimizer = tf.keras.optimizers.Adam(lr=0.001)
Tp_model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=['mae'])

Tp_model((np.zeros((1, 300, 300, 1)), np.zeros((1, 1))))

Tp_model.load_weights('300_300 Tp 4 128.h5')

Tp_predictions = Tp_model.predict((new_stock_300300_normalized, tf.expand_dims(T, axis=1)))

Tp_predictions = np.array(Tp_predictions)

print("Tp Identification: ", Tp_predictions)