### Import modules

In [None]:
import math
import numpy as np
import tensorflow as tf
from pprint import pprint
from scipy.io import loadmat
from collections import Counter
from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

plt.style.use("ggplot")
%matplotlib inline

### Load data

In [None]:
# Each sample (row of X_train) lasts 30 seconds (sampling rate: 200 Hz).
eeg_data = loadmat("./eeg_data.mat")
x = eeg_data['X_train']

# Sleep-stage annotations. The two single-letter stages are stored with a
# trailing pad character, so they are normalised in place before encoding.
sleep_stages = eeg_data['y_train']
for padded, stripped in (('W ', 'W'), ('R ', 'R')):
    sleep_stages[sleep_stages == padded] = stripped
print(np.unique(sleep_stages))

# Integer-encode the stage names, then one-hot encode them as network targets.
le = LabelEncoder()
labels = le.fit_transform(sleep_stages)
y = to_categorical(labels)

In [None]:
# Count the samples of each encoded class, ordered by class index so the bars
# appear in the same order as the encoder's classes.
hx, hy = np.transpose(sorted(Counter(labels).items()))

# figure
fig, ax = plt.subplots(figsize=(10, 5))
fig.patch.set_facecolor("white")
ax.set(title="Class distribution")
ax.bar(hx, hy, width=0.4)
# Pin one tick per bar and label it with the decoded stage name. Setting the
# tick positions explicitly avoids relying on matplotlib's default locator,
# and passing the whole index array avoids calling inverse_transform on a
# scalar and hard-coding the number of classes.
ax.set_xticks(hx)
ax.set_xticklabels(le.inverse_transform(hx))
ax.set_xlabel("sleep stage")
ax.set_ylabel("volume")
total = np.sum(hy)
# Annotate every bar with its absolute count and its share of all samples.
for i, j in zip(hx, hy):
    ax.text(i - 0.3, j + 100, "%d (%.2f)" % (j, j/total))
plt.show()

In [None]:
# Dimensions of the raw signal matrix (one row per 30-second sample).
print(np.shape(x))

In [None]:
# Draw the first raw sample on an explicitly created axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(x[0])

## Sub-sampling the EEG signal

In [None]:
# Sub-sampling: keep every 6th point of each sample by plain striding.
# NOTE(review): no low-pass filter is applied before dropping points, so any
# content above the new Nyquist frequency will alias -- confirm this is
# acceptable for the signal at hand.
subsample_step = 6
x_sub = x[:, ::subsample_step]
print(x_sub.shape)

In [None]:
# Draw the first sub-sampled sample on an explicitly created axes.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(x_sub[0])

## CNN architecture

In [None]:
# Tunable hyper-parameters.
win_sec = 1           # convolution window length, in seconds
overlap = 0.2         # in seconds; converted to samples below and used as the stride
num_filters = 300
learning_rate = 1e-2

# Quantities derived from the data.
n, sample_size = x_sub.shape
freq = sample_size / 30   # points per second, each sample lasting 30 seconds
num_classes = y.shape[1]

# Geometry of the first convolution, expressed in samples.
win_size = int(win_sec * freq)
overlap_size = int(overlap * freq)
filter_size, stride = win_size, overlap_size

# Human-readable summary of the configuration.
summary_lines = [
    "%d samples" % n,
    "sampling rate: %.2f Hz" % freq,
    "sample size: %d" % sample_size,
    "",
    "window of %.2f seconds: filter size of %d samples" % (win_sec, win_size),
    "overlapping windows of %.2f seconds: stride of %d samples" % (overlap, overlap_size),
    "",
    "number of classes: %d" % num_classes,
    "number of filter: %d" % num_filters,
]
for line in summary_lines:
    print(line)

In [None]:
# helper functions 
def apply_conv1d(x, filter_size, stride, out_size):
    """Apply a 1-D convolution (implemented with ``conv2d``) plus a bias.

    Args:
        x: input tensor passed to ``tf.nn.conv2d``; the filter has one input
            channel and width 1, so this presumably expects a tensor shaped
            ``[batch, length, 1, 1]`` -- confirm against the caller.
        filter_size: number of positions the filter spans along axis 1.
        stride: step between consecutive filter positions along axis 1.
        out_size: number of filters (output channels).

    Returns:
        The convolution output with the bias added (no activation applied).
    """
    kernel = tf.Variable(
        tf.truncated_normal([filter_size, 1, 1, out_size], stddev=0.1),
        name='W',
    )
    # 'VALID' padding: only positions where the filter fits entirely are kept.
    convolved = tf.nn.conv2d(
        x,
        kernel,
        strides=[1, stride, 1, 1],
        padding='VALID',
        name='conv',
    )
    bias = tf.Variable(tf.constant(0.1, shape=[out_size]), name='b')
    return tf.nn.bias_add(convolved, bias)

def apply_max_pool(x, window, stride=1):
    """Max-pool ``x`` along axis 1.

    Args:
        x: 4-D input tensor.
        window: pooling window length along axis 1.
        stride: step between consecutive windows along axis 1 (default 1).

    Returns:
        The pooled tensor; with 'VALID' padding only complete windows are used.
    """
    return tf.nn.max_pool(
        x,
        ksize=[1, window, 1, 1],
        strides=[1, stride, 1, 1],
        padding='VALID',
        name='pool',
    )

def get_flat_size(x):
    """Return the number of values per example in a 4-D tensor.

    Multiplies the sizes of axes 1, 2 and 3 of ``x`` (axis 0 is ignored), which
    is the width needed to flatten each example into a vector.
    """
    dims = x.get_shape().as_list()
    height, width, channels = dims[1], dims[2], dims[3]
    return height * width * channels

def apply_fully_connected(x, in_size, out_size):
    """Apply a dense (fully connected) layer ``x @ W_o + b_o``.

    Args:
        x: 2-D input tensor whose second dimension is ``in_size``.
        in_size: number of input features per example.
        out_size: number of output units.

    Returns:
        The layer output (no activation applied), named ``preds``.
    """
    W_o = tf.Variable(
        tf.truncated_normal([in_size, out_size], stddev=0.1),
        name='W_o'
    )
    b_o = tf.Variable(tf.constant(0.1, shape=[out_size]), name='b_o')
    # Use the tensor passed in by the caller; the layer must not depend on a
    # notebook-level variable that happens to be called `h_o`.
    return tf.nn.xw_plus_b(x, W_o, b_o, name='preds')

In [None]:
%%time

# placeholders
x_input = tf.placeholder(tf.float32, shape=[None, sample_size], name='x_input')
y_input = tf.placeholder(tf.float32, shape=[None, num_classes], name='y_input')
dropout = tf.placeholder(tf.float32, name="dropout_keep_prob")

with tf.device("/cpu:0"):
    
    with tf.name_scope("expand"):
        x_expanded = tf.expand_dims(tf.expand_dims(x_input, -1), -1)
    
    with tf.name_scope("conv1d"):
        conv = apply_conv1d(x_expanded, filter_size, stride, num_filters)
        h = tf.nn.relu(conv, name='relu')
        
    with tf.name_scope("pooling"): 
        h_pool = apply_max_pool(h, 20, 5)
        out_size = get_flat_size(h_pool)
        h_flat = tf.reshape(h_pool, [-1, out_size])
        
    with tf.name_scope("dropout"):
        h_o = tf.nn.dropout(h_flat, dropout)

    with tf.name_scope("fully_connect"):
        preds = apply_fully_connected(h_o, out_size, num_classes)
        
    with tf.name_scope("loss"):
        losses = tf.nn.softmax_cross_entropy_with_logits(
            logits=preds,
            labels=y_input,
            name='losses'
        )
        loss = tf.reduce_mean(losses, name='loss')

    with tf.name_scope("optimization"):
        optimizer = tf.train.AdamOptimizer(learning_rate)
        train_op = optimizer.minimize(loss) 
        
    with tf.name_scope("accuracy"):
        labels_pred = tf.argmax(preds, axis=1) 
        labels_true = tf.argmax(y_input, axis=1)
        correct_predictions = tf.equal(labels_pred, labels_true)
        accuracy = tf.reduce_mean(tf.cast(correct_predictions, 'float'), name='accuracy')
        
    init = tf.global_variables_initializer()

In [None]:
# Smoke test: push the first ten examples through the network up to the
# flattened pooling output, only to check that the graph executes.
smoke_rows = slice(0, 10)
with tf.Session() as sess:
    sess.run(init)
    D = {
        x_input: x_sub[smoke_rows],
        y_input: y[smoke_rows],
        dropout: 1,
    }
    __ = sess.run(h_flat, D)

In [None]:
%%time
with tf.Session() as sess:
    sess.run(init)
    D = {x_input: x_sub, y_input: y, dropout: 1}
    test_out = sess.run(preds, feed_dict=D)
    
print(test_out[0])

In [None]:
%%time
with tf.Session() as sess:
    sess.run(init)
    D = {x_input: x_sub, y_input: y, dropout: 1}
    acc, lb_pred, lb_true = sess.run([accuracy, labels_pred, labels_true], feed_dict=D)
    
print(acc)
print(Counter(lb_pred))
print(Counter(lb_true))

In [None]:
# Training-loop settings.
batch_size = 16
num_epochs = 3
p = 1       # dropout keep probability fed during training steps
seed = 42

# Hold out a quarter of the data; the signals, the one-hot targets and the
# integer labels are split with the same shuffling.
split = train_test_split(
    x_sub, y, labels,
    train_size=0.75,
    random_state=seed,
)
(x_train, x_test,
 y_train, y_test,
 labels_train, labels_test) = split

In [None]:
%%time

np.random.seed(10)

with tf.Session() as sess:
    # init
    sess.run(init)
    acc_train = sess.run(accuracy, feed_dict={x_input: x_train, y_input: y_train, dropout: 1})
    acc_test = sess.run(accuracy, feed_dict={x_input: x_test, y_input: y_test, dropout: 1})
    print("init.: _ train acc: %0.2f test acc: %0.2f" % (acc_train, acc_test))
    
    # training
    losses = []
    for e in range(num_epochs):
        for i in range(x_train.shape[0] // batch_size):
            
            idx = i * batch_size
            idxn = min(x_train.shape[0] - 1, (i+1) * batch_size)
            batch_xs = x_train[idx: idxn]
            batch_ys = y_train[idx: idxn]
            feed_train = {x_input: batch_xs, y_input: batch_ys, dropout: p}
            __, l = sess.run([train_op, loss], feed_dict=feed_train)
            losses.append(l)
            
        acc_train = sess.run(accuracy, feed_dict={x_input: x_train, y_input: y_train, dropout: p})
        acc_test = sess.run(accuracy, feed_dict={x_input: x_test, y_input: y_test, dropout: 1})
        print("epoch: %d train acc: %0.2f test acc: %0.2f" % (e, acc_train, acc_test))
        
    file_writer = tf.summary.FileWriter('./tensorflow_summaries', sess.graph)
    
    # use trained model
    feed = {x_input: x_test, y_input: y_test, dropout: 1}
    lb_pred, lb_true = sess.run([labels_pred, labels_true], feed_dict=feed)
    
    # debug
    test_out = sess.run(preds, feed_dict=feed)

In [None]:
# Class counts in the most recently assigned training batch.
Counter(batch_ys.argmax(axis=1))

In [None]:
# Loss recorded at each training step, followed by the predicted and the true
# class counts.
plt.plot(losses)
print(Counter(lb_pred), Counter(lb_true), sep="\n")