In [8]:

COLAB = False

if COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    directry = '/content/drive/My Drive/IML/task3/task3_s2h9lr3rs/'
else:
    directry = 'task3_s2h9lr3rs/'

# Prepare data

In [9]:
import pandas as pd

df_train = pd.read_csv(directry+'train.csv')
df_test = pd.read_csv(directry+'test.csv')

In [10]:
print(df_train.shape)
df_train.head()

(112000, 2)


Unnamed: 0,Sequence,Active
0,DKWL,0
1,FCHN,0
2,KDQP,0
3,FNWI,0
4,NKRM,0


In [11]:
import numpy
from sklearn import preprocessing

def split_features(features):
    """Split one feature into 4 features
    
    A feature consists of 4 characters. 
    This function splits these into separated 4 features each of that consists of 1 character(1 row become 4 rows)
    (e.g.) ABCD -> A, B, C, D -> (00....1..0), (01....0), (100...0), (0.....1) 
    
    Args:
        features(series): Original features
    
    Returns:
        new_features(numpy): New features shaped (num_samples, 4, 20(one hot encoding))
    
    """
    tmp = features.values.tolist()
    new_features = pd.DataFrame.to_numpy(pd.get_dummies(pd.DataFrame(data=[list(tmp[i]) for i in range(len(tmp))])))
    new_features = numpy.reshape(new_features, (new_features.shape[0], 20, -1), order='F')
   
    return new_features.transpose(0, 2, 1) # reshape as (num_sample, time_t, onehotencoding)

In [5]:
def split_trainset(df_trainset):
    """Split train df into features X and labels y
    
    Features X further be splited into 4 chars using split_features function.
    
    Args:
        df_features(df): Dataset each of whose row contains a feature and label
    
    Returns:
        new_X(numpy): Features of training set
        new_y(numpy): Labels of training set

    """
    new_X = split_features(df_trainset['Sequence'])
    new_y = pd.Series(data=df_trainset['Active'])
    return new_X,  new_y

In [6]:
# Make train and test dataset using split_trainset function above
train_X, train_y = split_trainset(df_train)
test_X = split_features(df_test['Sequence'])

# Define an LSTM model using keras and train it



In [1]:
import tensorflow as tf
import keras
from keras import backend as K
from keras.models import Sequential
from keras.layers.core import Dense
from keras.layers.recurrent import LSTM
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from sklearn.utils import class_weight
import numpy as np
from numpy.random import seed
import matplotlib.pyplot as plt


seed(0)
tf.random.set_seed(0)

ModuleNotFoundError: No module named 'keras'

In [None]:
input_dim = 20            
output_dim = 1        
hidden_units_1 = 128
timesteps = 4            
batch_size = 64             
num_of_training_epochs = 100
learning_rate = 0.001        

In [None]:
def f1_metric(y_true, y_pred):
    """Compute f1 score
    """
    tp = K.sum(K.cast(y_true*y_pred, 'float'), axis=0)
    tn = K.sum(K.cast((1-y_true)*(1-y_pred), 'float'), axis=0)
    fp = K.sum(K.cast((1-y_true)*y_pred, 'float'), axis=0)
    fn = K.sum(K.cast(y_true*(1-y_pred), 'float'), axis=0)

    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    f1 = 2*precision*recall / (precision+recall+K.epsilon())
    f1 = tf.where(tf.math.is_nan(f1), tf.zeros_like(f1), f1)
    return K.mean(f1) 

In [None]:
def f1_loss(y_true, y_pred):
    """f1-scored loss

    It's differentiable
    """
    return 1 - f1_metric(y_true, y_pred)

In [None]:
from sklearn.utils import class_weight
model = Sequential()
model.add(LSTM(hidden_units_1, input_shape=(timesteps, input_dim), return_sequences=False))
model.add(Dense(output_dim, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=[f1_metric])
weights = class_weight.compute_class_weight('balanced', np.unique(train_y), train_y)
model.summary()

In [None]:
checkpoint_path = 'weights.hdf5'
best_checkpoint = ModelCheckpoint(checkpoint_path, save_best_only=True, monitor='f1_metric', mode='max')

In [None]:
history = model.fit(
    train_X, train_y,
    batch_size=batch_size,
    epochs=num_of_training_epochs,
    validation_split=0.1,
    class_weight=weights,
    callbacks=[best_checkpoint]
)

model.load_weights(checkpoint_path)

In [None]:
 def compare_results(history):
    """Visualize the result of the training

    """
    # Set Parameters
    f1_score = history.history['f1_metric']
    val_f1_score = history.history['val_f1_metric']

    epochs = range(len(f1_score))

    # plot f1 score
    plt.plot(epochs, f1_score, 'bo' ,label = 'training f1')
    plt.plot(epochs, val_f1_score, 'b' , label= 'validation f1')
    plt.title('Training and Validation acc')
    plt.legend()
    plt.figure()
    plt.show()


In [None]:
#show Accuracy and Loss History
compare_results(history)

# Make predictions on test data

In [None]:
results = pd.DataFrame(np.where(model.predict(test_X)>0.5, 1, 0))

In [None]:
results.to_csv(directry+'prediction.csv', index=False, header=None)