## Flare prediction with LSTMs
This notebook can be used to train an LSTM to perform flare predictions.

In [None]:
import glob
import numpy as np
import tensorflow as tf
from tensorflow import keras
from matplotlib import pyplot as plt
from tensorflow.keras.layers import Dense, RNN, LSTM, Flatten, TimeDistributed, LSTMCell, GlobalAveragePooling2D, Input, Dropout, Attention
from tensorflow.keras.layers import RepeatVector, Conv2D, SimpleRNN, GRU, Reshape, ConvLSTM2D, Conv2DTranspose, Conv3DTranspose
from tensorflow.keras import regularizers
from sklearn.utils import shuffle
import keras.backend as K
from sklearn.metrics import classification_report

# Colab only: mount Google Drive at /content/gdrive so that files stored on the
# Drive can be read through regular file paths below that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Load the training, validation and test data from Google Drive

In [None]:
# Directory on the mounted Drive that holds the pre-split .npy arrays.
# Change this single constant to point the notebook at another copy of the data.
DATA_DIR = '/content/gdrive/MyDrive/ADL_project/Flare_Prediction/Multi_label_classification'


def load_split(split, data_dir=DATA_DIR):
    """Load the input and target arrays of one dataset split.

    Parameters
    ----------
    split : str
        Prefix used in the file names: 'train', 'val' or 'test'.
    data_dir : str
        Directory containing ``<split>_input.npy`` and ``<split>_target.npy``.

    Returns
    -------
    tuple of np.ndarray
        ``(inputs, targets)`` exactly as stored on disk.

    Raises
    ------
    ValueError
        If the two arrays do not contain the same number of samples.
    """
    inputs = np.load(f'{data_dir}/{split}_input.npy')
    targets = np.load(f'{data_dir}/{split}_target.npy')
    # Fail here rather than deep inside model.fit / model.predict.
    if len(inputs) != len(targets):
        raise ValueError(
            f"{split}: {len(inputs)} inputs but {len(targets)} targets"
        )
    return inputs, targets


train_input, train_target = load_split('train')
val_input, val_target = load_split('val')
test_input, test_target = load_split('test')

# Shuffle the training set only; the fixed seed keeps the order reproducible
# across kernel restarts. Inputs and targets are permuted together.
train_input, train_target = shuffle(train_input, train_target, random_state=0)


binary = False  # set to True to treat the task as a two-class problem


def to_binary_target(target):
    """Collapse one-hot targets into two classes: class 0 versus all others.

    Parameters
    ----------
    target : np.ndarray
        3-D array indexed as ``target[sample, :, class]``; the middle axis is
        presumably of length 1 (the reshape below relies on that) - confirm
        against the stored data.

    Returns
    -------
    np.ndarray
        Array of shape ``(n_samples, 1, 2)`` where column 0 is set for samples
        whose class-0 entry equals 1 and column 1 is set for the remaining
        samples.
    """
    is_class0 = target[:, :, 0] == 1
    collapsed = np.zeros(shape=(target.shape[0], 1, 2))
    collapsed[is_class0.any(axis=1), :, 0] = 1
    collapsed[(~is_class0).any(axis=1), :, 1] = 1
    return collapsed


if binary:
    train_target = to_binary_target(train_target)
    val_target = to_binary_target(val_target)
    test_target = to_binary_target(test_target)

# Flatten every target to (n_samples, n_classes). The sample count is taken
# from the array itself instead of being hard-coded, so the cell keeps working
# when the dataset size changes; n_classes is 2 in binary mode, otherwise
# whatever class width the stored targets have.
train_target = train_target.reshape(train_target.shape[0], -1)
val_target = val_target.reshape(val_target.shape[0], -1)
test_target = test_target.reshape(test_target.shape[0], -1)

Custom F1 metric and differentiable F1 loss function for training the model

In [None]:
def f1(y_true, y_pred):
    """Macro-averaged F1 score computed on rounded predictions.

    Intended as a Keras metric. Predictions are rounded to 0/1 first, so the
    result is not differentiable; use ``f1_loss`` for training.

    Parameters
    ----------
    y_true : tensor
        Ground-truth indicator matrix, one column per class.
    y_pred : tensor
        Predicted scores with the same shape as ``y_true``.

    Returns
    -------
    tensor
        Scalar: per-class F1 over the batch axis, averaged over classes.
    """
    # Cast BEFORE any arithmetic: multiplying integer/float64 targets with
    # float32 predictions would otherwise raise a dtype mismatch.
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.round(tf.cast(y_pred, tf.float32))

    # Per-class counts, accumulated over the batch axis.
    tp = tf.reduce_sum(y_true * y_pred, axis=0)
    fp = tf.reduce_sum((1 - y_true) * y_pred, axis=0)
    fn = tf.reduce_sum(y_true * (1 - y_pred), axis=0)

    # epsilon keeps the divisions finite for classes absent from the batch.
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    score = 2 * precision * recall / (precision + recall + K.epsilon())
    score = tf.where(tf.math.is_nan(score), tf.zeros_like(score), score)
    return tf.reduce_mean(score)

def f1_loss(y_true, y_pred):
    """Differentiable ("soft") macro-F1 loss: ``1 - mean(per-class F1)``.

    Unlike a thresholded F1 score, the predictions are NOT rounded: the raw
    scores are used as soft counts so that gradients can flow through them.

    Parameters
    ----------
    y_true : tensor
        Ground-truth indicator matrix, one column per class.
    y_pred : tensor
        Predicted scores with the same shape as ``y_true``.

    Returns
    -------
    tensor
        Scalar loss; lower is better.
    """
    # Cast BEFORE any arithmetic: multiplying integer/float64 targets with
    # float32 predictions would otherwise raise a dtype mismatch.
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)

    # Soft per-class counts, accumulated over the batch axis.
    tp = tf.reduce_sum(y_true * y_pred, axis=0)
    fp = tf.reduce_sum((1 - y_true) * y_pred, axis=0)
    fn = tf.reduce_sum(y_true * (1 - y_pred), axis=0)

    # epsilon keeps the divisions finite for classes absent from the batch.
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    soft_f1 = 2 * precision * recall / (precision + recall + K.epsilon())
    soft_f1 = tf.where(tf.math.is_nan(soft_f1), tf.zeros_like(soft_f1), soft_f1)
    return 1 - tf.reduce_mean(soft_f1)

Model

In [None]:
# The width of the softmax layer must equal the number of target columns
# (2 in binary mode, otherwise the class width of the stored targets). Reading
# it from the data keeps the model and the targets consistent in either mode.
num_classes = train_target.shape[-1]

# Two stacked LSTMs followed by a small dense classifier head.
# Inputs are sequences of 10 time steps with 40 features each.
model = tf.keras.Sequential()
model.add(LSTM(256, input_shape=(10, 40), return_sequences=True))
model.add(LSTM(59))
model.add(Dropout(0.2))
model.add(Dense(200, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(num_classes, activation='softmax'))

# NOTE(review): decay_steps is far larger than the number of optimizer steps a
# 20-epoch run with batch size 500 performs, so the learning rate stays close
# to its initial value - confirm this is intended.
initial_learning_rate = .002
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=False)

# Train on the soft F1 loss and monitor accuracy plus the thresholded F1
# metric defined in this notebook. (The previous tfa.metrics.F1Score relied on
# a `tfa` module that is never imported and was fixed to 5 classes.)
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=lr_schedule),
    loss=f1_loss,
    metrics=['accuracy', f1],
)

model.summary()


Training the model

In [None]:
# Training hyper-parameters, named so they are easy to find and tune.
EPOCHS = 20
BATCH_SIZE = 500

# Fit on the training split and evaluate on the validation split after every
# epoch; `history` keeps the per-epoch loss and metric values.
history = model.fit(
    train_input,
    train_target,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    shuffle=True,
    validation_data=(val_input, val_target),
)

Evaluate the model on the test set

In [None]:
# Class scores predicted for every test sample, shape (n_samples, n_classes).
pred_scores = model.predict(test_input)
n_classes = pred_scores.shape[1]

# Most likely class per sample, and the true class read from the one-hot
# targets. argmax always yields exactly one label per row, so y_true and
# y_pred are guaranteed to have the same length.
y_pred = np.argmax(pred_scores, axis=1)
y_true = np.argmax(test_target, axis=1)

# One-hot encoded predictions, kept under the original names `prediction`
# and `pred` for any later cell that uses them.
prediction = np.zeros(shape=pred_scores.shape)
prediction[np.arange(len(y_pred)), y_pred] = 1
pred = prediction

# Human-readable names are only known for the two-class setting; in the
# multi-class setting the report falls back to the integer class labels.
target_names = ['No Flare', 'Flare'] if n_classes == 2 else None
print(classification_report(
    y_true,
    y_pred,
    labels=np.arange(n_classes),  # report every class, even if never predicted
    target_names=target_names,
))
