In [1]:
import matplotlib.pyplot as plt
import tensorflow as tf
import datetime
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, BatchNormalization, MaxPooling2D, Reshape, LSTM, add, AveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import glorot_uniform, he_uniform
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from utils import *


Device mapping: no known devices.


## Data Loading

### Raw Data

In [None]:
# Load the raw train/test splits through the project helper in utils.
# NOTE(review): subject=None presumably selects every subject — confirm in utils.init_data.
(X_train, y_train,
 X_test, y_test) = init_data(subject=None, verbose=True)

### Data Augmentation

In [2]:
# Requires X_train / y_train / X_test / y_test from the "Raw Data" cell; run that
# cell first, otherwise this one fails with a NameError on X_train.

# preprocess_data hands back the splits in (train, test, valid) order.
(X_train_aug, y_train_aug,
 X_test_aug, y_test_aug,
 X_valid_aug, y_valid_aug) = preprocess_data(
    X_train, y_train, X_test, y_test, verbose=True
)

# load_data takes and returns the splits in (train, valid, test) order.
# The y_*_aug names are rebound to its outputs; the lower-case x_*_aug arrays
# are the ones later passed to model.fit / model.evaluate.
(x_train_aug, y_train_aug,
 x_valid_aug, y_valid_aug,
 x_test_aug, y_test_aug) = load_data(
    X_train_aug, y_train_aug,
    X_valid_aug, y_valid_aug,
    X_test_aug, y_test_aug,
    verbose=True,
)

NameError: name 'X_train' is not defined

### Data CWT

In [None]:
# Reload the raw data and split it into train / test / valid.
# Side effect: X_train, y_train, X_test and y_test are rebound to the
# *preprocessed* splits below, so re-run the "Raw Data" cell before re-running
# the "Data Augmentation" cell.
# NOTE(review): `np` and `ricker` are not imported explicitly in this notebook;
# presumably they arrive via `from utils import *` — confirm.
X_train, y_train, X_test, y_test = init_data(subject=None)
(X_train, y_train,
 X_test, y_test,
 X_valid, y_valid) = preprocess_data(X_train, y_train, X_test, y_test, verbose=True)

# Six wavelet scales (1..6); this matches the size-6 axis of the CWT model's
# input_shape = (250, 6, 22).
scales = np.arange(1, 7)

# Continuous wavelet transform of each split, evaluated in train, valid, test order.
X_train_cwt, X_valid_cwt, X_test_cwt = (
    cwt_transform(split, scales, ricker, verbose=True)
    for split in (X_train, X_valid, X_test)
)

# The CWT outputs are not passed through preprocess_data / load_data again;
# labels are one-hot encoded and the axes rearranged directly below.

# One-hot encode the labels into 4 classes.
y_train_cwt, y_valid_cwt, y_test_cwt = (
    to_categorical(labels, 4)
    for labels in (y_train, y_valid, y_test)
)

# Exchange axes 1 and 3 of every CWT array.
# NOTE(review): presumably this turns (N, 22, 6, 250) into (N, 250, 6, 22) to
# match the model input — confirm against cwt_transform's output layout.
x_train_cwt, x_valid_cwt, x_test_cwt = (
    np.swapaxes(coeffs, 1, 3)
    for coeffs in (X_train_cwt, X_valid_cwt, X_test_cwt)
)


## MaxPool_CNN_v2 w/Residual block

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Dense, Flatten, add, Activation
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam

def residual_block(X, filters, dropout, kernel_size=(4,1), adjust_filters=False):
    """Build a two-convolution residual block on top of tensor ``X``.

    The main path is Conv2D(ELU) -> BatchNorm -> Dropout -> Conv2D (linear)
    -> BatchNorm -> Dropout. Its output is summed with the shortcut branch and
    the sum goes through a final ELU activation.

    Args:
        X: Input tensor of the block.
        filters: Number of filters for both main-path convolutions and for the
            optional shortcut projection.
        dropout: Rate handed to each ``Dropout`` layer in the main path.
        kernel_size: Kernel size of the two main-path convolutions.
        adjust_filters: If True, the shortcut is projected with a 1x1
            convolution followed by batch normalization so that it carries
            ``filters`` channels. If False, ``X`` is added as-is, so the caller
            must make sure its shape already matches the main path's output.

    Returns:
        The output tensor of the block.

    Note:
        Every convolution uses an L2 kernel regularizer whose strength is read
        from the module-level ``weight_decay`` at call time; that global must
        be defined before this function is called.
    """
    def conv_options():
        # Settings shared by all convolutions in the block. A new l2 regularizer
        # object is created on every call, one per layer.
        return dict(
            padding='same',
            kernel_regularizer=l2(weight_decay),
            kernel_initializer='he_uniform',
        )

    # Shortcut branch: identity, or a 1x1 projection when requested.
    shortcut = X
    if adjust_filters:
        projection = Conv2D(filters=filters, kernel_size=(1,1), **conv_options())
        shortcut = projection(shortcut)
        shortcut = BatchNormalization()(shortcut)

    # Main path, stage 1: convolution with ELU, then normalization and dropout.
    out = Conv2D(filters=filters, kernel_size=kernel_size, activation='elu', **conv_options())(X)
    out = BatchNormalization()(out)
    out = Dropout(dropout)(out)

    # Main path, stage 2: linear convolution; the non-linearity comes after the sum.
    out = Conv2D(filters=filters, kernel_size=kernel_size, **conv_options())(out)
    out = BatchNormalization()(out)
    out = Dropout(dropout)(out)

    # Merge the two branches and apply the ELU activation.
    merged = add([out, shortcut])
    return Activation('elu')(merged)

# Stop training once the validation loss has failed to improve for `patience`
# consecutive epochs, then roll the model back to its best epoch.
early_stop = EarlyStopping(monitor='val_loss',         # quantity to monitor
                           patience=7,                 # epochs without improvement before stopping
                           verbose=1,                  # log when stopping is triggered
                           mode='min',                 # explicit: a lower val_loss counts as improvement
                           restore_best_weights=True)  # restore weights from the best epoch

# Hyperparameters for this run.
# `weight_decay` is also read as a global by residual_block(), so it must be
# assigned before the residual blocks are built below.
dropout = 0.5
learning_rate = 0.0003
weight_decay = 0.003
epochs = 25

# Per-sample input shape (batch dimension excluded).
# NOTE(review): presumably (time steps, 1, EEG channels) — confirm against load_data.
input_shape = (250,1,22)

# Functional-API entry point; the Input layer alone defines the input shape, so
# the first Conv2D no longer receives a redundant `input_shape=` argument.
X_input = Input(input_shape)

# Stem: convolution along the first axis, then halve that axis with max pooling.
X = Conv2D(filters=22, kernel_size=(4,1), padding='same', activation='elu',
           kernel_regularizer=l2(weight_decay), kernel_initializer='he_uniform')(X_input)
X = MaxPooling2D(pool_size=(2,1), strides=(2,1), padding='same')(X)
X = BatchNormalization()(X)
X = Dropout(dropout)(X)

# Two residual stages (44 then 88 filters). Each projects its shortcut to the
# new filter count and is followed by another 2x downsampling of the first axis.
X = residual_block(X, 44, dropout, adjust_filters=True)
X = MaxPooling2D(pool_size=(2,1), strides=(2,1), padding='same')(X)

X = residual_block(X, 88, dropout, adjust_filters=True)
X = MaxPooling2D(pool_size=(2,1), strides=(2,1), padding='same')(X)

# Classification head: flatten and map straight to a 4-way softmax.
X = Flatten()(X)
X = Dense(4, activation='softmax', kernel_regularizer=l2(weight_decay),
          bias_regularizer=l2(weight_decay), kernel_initializer='glorot_uniform')(X)

# Create model
model = Model(inputs=X_input, outputs=X, name='ResNetCNN')

# Compile the model.
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated and
# is warned about or rejected by newer Keras releases.
optimizer = Adam(learning_rate=learning_rate)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# Train on the augmented data, validating on the held-out validation split.
results = model.fit(x_train_aug,
                    y_train_aug,
                    batch_size=32,
                    epochs=epochs,
                    validation_data=(x_valid_aug, y_valid_aug),
                    callbacks=[early_stop],
                    verbose=1)


In [None]:
# Evaluate on the test split. The model was compiled with a loss plus
# metrics=['accuracy'], so score[0] is the loss and score[1] the accuracy.
score = model.evaluate(
    x_test_aug,
    y_test_aug,
    verbose=0,
)
print(f'Test accuracy:{score[1]}')

In [None]:
# Call fit() again on the same `model` object for up to 50 more epochs (still
# subject to early stopping); `results` is overwritten with the new history.
results = model.fit(
    x_train_aug,
    y_train_aug,
    batch_size=32,
    epochs=50,
    validation_data=(x_valid_aug, y_valid_aug),
    callbacks=[early_stop],
    verbose=1,
)

# Re-evaluate on the test split; score[1] is the accuracy metric.
score = model.evaluate(
    x_test_aug,
    y_test_aug,
    verbose=0,
)
print(f'Test accuracy:{score[1]}')

### Train with CWT data

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Dense, Flatten, add, Activation
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam

def residual_block(X, filters, dropout, kernel_size=(4,1), adjust_filters=False):
    """Build a two-convolution residual block on top of tensor ``X``.

    This repeats the definition from the earlier model cell so that the CWT
    cell can be run on its own.

    The main path is Conv2D(ELU) -> BatchNorm -> Dropout -> Conv2D (linear)
    -> BatchNorm -> Dropout. Its output is summed with the shortcut branch and
    the sum goes through a final ELU activation.

    Args:
        X: Input tensor of the block.
        filters: Number of filters for both main-path convolutions and for the
            optional shortcut projection.
        dropout: Rate handed to each ``Dropout`` layer in the main path.
        kernel_size: Kernel size of the two main-path convolutions.
        adjust_filters: If True, the shortcut is projected with a 1x1
            convolution followed by batch normalization so that it carries
            ``filters`` channels. If False, ``X`` is added as-is, so the caller
            must make sure its shape already matches the main path's output.

    Returns:
        The output tensor of the block.

    Note:
        Every convolution uses an L2 kernel regularizer whose strength is read
        from the module-level ``weight_decay`` at call time; that global must
        be defined before this function is called.
    """
    def regularized_conv(size, activation=None):
        # One Conv2D with the settings shared across the block; a new l2
        # regularizer object is created for every layer.
        return Conv2D(
            filters=filters,
            kernel_size=size,
            padding='same',
            activation=activation,
            kernel_regularizer=l2(weight_decay),
            kernel_initializer='he_uniform',
        )

    # Shortcut branch: identity, or a 1x1 projection when requested.
    skip = X
    if adjust_filters:
        skip = regularized_conv((1,1))(skip)
        skip = BatchNormalization()(skip)

    # Main path, stage 1: convolution with ELU, then normalization and dropout.
    main = regularized_conv(kernel_size, activation='elu')(X)
    main = BatchNormalization()(main)
    main = Dropout(dropout)(main)

    # Main path, stage 2: linear convolution; the non-linearity comes after the sum.
    main = regularized_conv(kernel_size)(main)
    main = BatchNormalization()(main)
    main = Dropout(dropout)(main)

    # Merge the two branches and apply the ELU activation.
    summed = add([main, skip])
    return Activation('elu')(summed)

# Stop training once the validation loss has failed to improve for `patience`
# consecutive epochs, then roll the model back to its best epoch.
early_stop = EarlyStopping(monitor='val_loss',         # quantity to monitor
                           patience=7,                 # epochs without improvement before stopping
                           verbose=1,                  # log when stopping is triggered
                           mode='min',                 # explicit: a lower val_loss counts as improvement
                           restore_best_weights=True)  # restore weights from the best epoch

# Hyperparameters for the CWT run.
# `weight_decay` is also read as a global by residual_block(), so it must be
# assigned before the residual block is built below.
dropout = 0.5
learning_rate = 0.0005
weight_decay = 0.05
epochs = 25

# Per-sample input shape (batch dimension excluded); the size-6 axis matches the
# six wavelet scales used when building the CWT data.
# NOTE(review): presumably (time steps, scales, EEG channels) — confirm against
# the swapaxes step in the CWT data cell.
input_shape = (250,6,22)

# Functional-API entry point; the Input layer alone defines the input shape, so
# the first Conv2D no longer receives a redundant `input_shape=` argument.
X_input = Input(input_shape)

# Stem: the (4,1) kernel convolves along the first axis only; max pooling then
# halves that axis and leaves the second axis untouched.
X = Conv2D(filters=22, kernel_size=(4,1), padding='same', activation='elu',
           kernel_regularizer=l2(weight_decay), kernel_initializer='he_uniform')(X_input)
X = MaxPooling2D(pool_size=(2,1), strides=(2,1), padding='same')(X)
X = BatchNormalization()(X)
X = Dropout(dropout)(X)

# Single residual stage (44 filters) with a projected shortcut, followed by
# another 2x downsampling of the first axis.
X = residual_block(X, 44, dropout, adjust_filters=True)
X = MaxPooling2D(pool_size=(2,1), strides=(2,1), padding='same')(X)

# Classification head: flatten and map straight to a 4-way softmax.
X = Flatten()(X)
X = Dense(4, activation='softmax', kernel_regularizer=l2(weight_decay),
          bias_regularizer=l2(weight_decay), kernel_initializer='glorot_uniform')(X)

# Create model
model = Model(inputs=X_input, outputs=X, name='ResNetCNN')

# Compile the model.
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated and
# is warned about or rejected by newer Keras releases.
optimizer = Adam(learning_rate=learning_rate)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# Train on the CWT features, validating on the held-out validation split.
results = model.fit(x_train_cwt,
                    y_train_cwt,
                    batch_size=32,
                    epochs=epochs,
                    validation_data=(x_valid_cwt, y_valid_cwt),
                    callbacks=[early_stop],
                    verbose=1)


In [None]:
# Evaluate on the CWT test split. The model was compiled with a loss plus
# metrics=['accuracy'], so score[0] is the loss and score[1] the accuracy.
score = model.evaluate(
    x_test_cwt,
    y_test_cwt,
    verbose=0,
)
print(f'Test accuracy:{score[1]}')

In [None]:
# Call fit() again on the same `model` object for up to `epochs` more epochs
# (still subject to early stopping); `results` is overwritten with the new history.
results = model.fit(
    x_train_cwt,
    y_train_cwt,
    batch_size=32,
    epochs=epochs,
    validation_data=(x_valid_cwt, y_valid_cwt),
    callbacks=[early_stop],
    verbose=1,
)

# Re-evaluate on the CWT test split; score[1] is the accuracy metric.
score = model.evaluate(
    x_test_cwt,
    y_test_cwt,
    verbose=0,
)
print(f'Test accuracy:{score[1]}')