In [2]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = '3'
import pandas as pd
import numpy as np
import tensorflow as tf
tf.Graph()

# The Location of train_data，train_labels，test_data，test_labels
# DataSet Address
DIR = 'E:/eeg-prac/Dataset/Excels/'

# Model Saver Address
SAVE = 'E:/eeg-prac/Models/'

In [3]:
train_data = pd.read_csv(DIR + 'Training_labels.csv', header=None)
train_data = np.array(train_data).astype('float32')

train_labels = pd.read_csv(DIR + 'Training_labels.csv', header=None)
train_labels = np.array(train_labels)

test_data = pd.read_csv(DIR + 'Test_data.csv', header=None)
test_data = np.array(test_data).astype('float32')

test_labels = pd.read_csv(DIR + 'Test_labels.csv', header=None)
test_labels = np.array(test_labels)

In [4]:
batch_size = 64
n_batch = train_data.shape[0] // batch_size

In [5]:
def weight_variable(shape):
    initial = tf.random.truncated_normal(shape, stddev=0.01)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.01, shape=shape)
    return tf.Variable(initial)

def variable_summaries(name, var):
    with tf.name_scope('summaries'):
        mean = tf.reduce_mean(var)
        tf.summary.scalar('mean/'+ name, mean)

        with tf.name_scope('stddev'):
            stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
            tf.summary.scalar('stddev/'+name, stddev)

        tf.summary.scalar('max/'+name, tf.reduce_max(var))
        tf.summary.scalar('min/'+name, tf.reduce_min(var))
        tf.summary.histogram('histogram/'+name, var)
        

In [6]:
from tensorflow.keras import layers
from tensorflow.keras.models import Model

input_shape = (640,)
inputs = tf.keras.Input(shape=input_shape, name='Input_Data')
x_reshape = layers.Reshape((32, 20, 1), name='Reshape_Data')(inputs)

In [7]:
from tensorflow.keras import layers, models, initializers, regularizers
model = models.Sequential()


In [8]:
model.add(layers.Conv2D(
    filters=32, 
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='SAME',
    use_bias=True,
    kernel_initializer=initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=initializers.Constant(value=0.01),
    name='Convolutional_1'
))
model.add(layers.LeakyReLU(name='h_conv1_Acti'))
model.add(layers.Dropout(
    rate=0.5,  
    name='h_conv1_drop'
))

In [9]:
model.add(layers.Conv2D(
    filters=32, 
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='SAME',
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Convolutional_2'
))
model.add(layers.BatchNormalization(name='h_conv2_BN'))
model.add(layers.Dropout(rate=0.5, name='h_conv2_drop'))


In [10]:
model.add(layers.Conv2D(
    filters=64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='SAME',
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Convolutional_3'
))
model.add(layers.LeakyReLU(name='h_conv3_Acti'))
model.add(layers.Dropout(rate=0.5, name='h_conv3_drop'))

In [11]:
model.add(layers.MaxPooling2D(
    pool_size=(2, 2),
    strides=(2, 2),
    padding='SAME',
    name='Pooling_1'
))

In [12]:
model.add(layers.Conv2D(
    filters=64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='VALID',  
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Convolutional_4'
))
model.add(layers.BatchNormalization(name='h_conv4_BN'))
model.add(layers.LeakyReLU(name='h_conv4_Acti'))
model.add(layers.Dropout(rate=0.5, name='h_conv4_drop'))

In [13]:
model.add(layers.Conv2D(
    filters=64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='SAME',
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Convolutional_5'
))
model.add(layers.BatchNormalization(name='h_conv5_BN'))
model.add(layers.LeakyReLU(name='h_conv5_Acti'))


In [14]:
model.add(layers.Conv2D(
    filters=128,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='SAME',
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Convolutional_6'
))
model.add(layers.BatchNormalization(name='h_conv6_BN'))
model.add(layers.LeakyReLU(name='h_conv6_Acti'))
model.add(layers.Dropout(rate=0.5, name='h_conv6_drop'))


In [17]:
model.add(layers.MaxPooling2D(
    pool_size=(2, 2),
    strides=(2, 2),
    padding='SAME',
    name='Pooling_2'
))

model.add(layers.Flatten(name='Flatten'))
model.add(layers.Dense(
    512,
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    name='Fully_Connected_1'
))
model.add(layers.BatchNormalization(name='h_fc1_BN'))
model.add(layers.LeakyReLU(name='h_fc1_Acti'))
model.add(layers.Dropout(rate=0.5, name='h_fc1_drop'))

In [20]:
model.add(layers.Dense(
    4,
    use_bias=True,
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.01),
    bias_initializer=tf.keras.initializers.Constant(value=0.01),
    activation='softmax',  # Softmax activation function is used for the output layer
    name='OP_Layer'
))

In [19]:
def euclidean_distance_loss(y_true, y_pred):
    return tf.reduce_mean(tf.square(y_true - y_pred))

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
              loss=euclidean_distance_loss,
              metrics=['mean_squared_error']) 

In [21]:
y_true = None  
y_pred = model.predict(test_data)  

class SpecificClassAccuracy(tf.keras.metrics.Metric):
    def __init__(self, class_id, name='specific_class_accuracy', **kwargs):
        super(SpecificClassAccuracy, self).__init__(name=name, **kwargs)
        self.class_id = class_id
        self.correct_count = self.add_weight(name='correct_count', initializer='zeros')
        self.total_count = self.add_weight(name='total_count', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true_class = tf.equal(tf.argmax(y_true, 1), self.class_id)
        y_pred_class = tf.equal(tf.argmax(y_pred, 1), self.class_id)
        correct = tf.cast(tf.logical_and(y_true_class, y_pred_class), self.dtype)
        self.correct_count.assign_add(tf.reduce_sum(correct))
        self.total_count.assign_add(tf.reduce_sum(tf.cast(y_true_class, self.dtype)))

    def result(self):
        return self.correct_count / self.total_count

    def reset_states(self):
        self.correct_count.assign(0.)
        self.total_count.assign(0.)

t1_accuracy = SpecificClassAccuracy(class_id=0)
t2_accuracy = SpecificClassAccuracy(class_id=1)
t3_accuracy = SpecificClassAccuracy(class_id=2)
t4_accuracy = SpecificClassAccuracy(class_id=3)

t1_accuracy.update_state(y_true, y_pred)
t2_accuracy.update_state(y_true, y_pred)
t3_accuracy.update_state(y_true, y_pred)
t4_accuracy.update_state(y_true, y_pred)

writer = tf.summary.create_file_writer('logs')
with writer.as_default():
    tf.summary.scalar('Task 1 Accuracy', t1_accuracy.result(), step=0)
    tf.summary.scalar('Task 2 Accuracy', t2_accuracy.result(), step=0)
    tf.summary.scalar('Task 3 Accuracy', t3_accuracy.result(), step=0)
    tf.summary.scalar('Task 4 Accuracy', t4_accuracy.result(), step=0)
    writer.flush()

ValueError: in user code:

    File "C:\Users\spsin\AppData\Local\Temp\ipykernel_22612\295768917.py", line 12, in update_state  *
        y_true_class = tf.equal(tf.argmax(y_true, 1), self.class_id)

    ValueError: Attempt to convert a value (None) with an unsupported type (<class 'NoneType'>) to a Tensor.


In [None]:
class KappaMetric(tf.keras.metrics.Metric):
    def __init__(self, name='kappa_metric', **kwargs):
        super(KappaMetric, self).__init__(name=name, **kwargs)
        # Placeholder initialization for all the variables that will be used to compute Kappa
        self.p0 = self.add_weight(name='p0', initializer='zeros')
        self.pe = self.add_weight(name='pe', initializer='zeros')
        self.Test_Set_Num = None  # Replace with appropriate value

    def update_state(self, *args, **kwargs):
        # Update methods to calculate p0 and pe, considering the variables might be like T1_T1, T1_T2, etc.
        pass
    
    def result(self):
        return (self.p0 - self.pe) / (1 - self.pe)

    def reset_states(self):
        self.p0.assign(0)
        self.pe.assign(0)

kappa_metric = KappaMetric()

model.compile(
    optimizer='adam',   
    loss='categorical_crossentropy',  
    metrics=[kappa_metric]  
)

tb_callback = tf.keras.callbacks.TensorBoard(log_dir='./logs', update_freq='epoch')

model.fit(
    x=train_data, 
    y=train_labels, 
    epochs=2019, 
    batch_size=batch_size,  # Define batch_size appropriately
    verbose=1, 
    callbacks=[tb_callback], 
    validation_data=(test_data, test_labels)
)

output_prediction = model.predict(test_data)
np.savetxt("prediction.csv", output_prediction, delimiter=",")
np.savetxt("labels.csv", test_labels, delimiter=",")