Multichannel autoencoder (n = 21 channels)

In [1]:
import pyedflib
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split

# Samples are represented in 16-bit two's complement, so every signal is read
# as digital (integer) values.

# Preprocessing constants (previously inlined as magic numbers).
SKIP_SAMPLES = 250     # leading samples dropped from every signal
BLOCK_LEN = 32         # samples per block along the time axis
EXCLUDED_SIGNALS = 9   # signals at the end of each file that are not loaded

# The recordings are looked up relative to the current working directory.
script_dir = os.getcwd()
file_path = os.path.join(script_dir, 'EDF', 'PD patient Frontal')

# os.listdir returns names in arbitrary order; sorting keeps the block order
# (and therefore the seeded train/test split below) reproducible between runs.
Files = sorted(name for name in os.listdir(file_path) if name.endswith('.edf'))
file_counter = len(Files)
badFiles = []
max_values = []
min_values = []

# First pass: find out which files can be opened and how many blocks each one
# contributes, so the output array can be allocated with the exact size.
blocks_per_file = {}
n = None
multiplier = None
for name in Files:
    path = os.path.join(file_path, name)
    try:
        f = pyedflib.EdfReader(path)
    except OSError:
        badFiles.append(name)
        continue
    try:
        number_of_samples = f.getNSamples()[0]
        blocks_per_file[name] = max((number_of_samples - SKIP_SAMPLES) // BLOCK_LEN, 0)
        # As before, the channel count and the digital-to-physical scale are
        # taken from the last file that could be opened.
        n = f.signals_in_file - EXCLUDED_SIGNALS
        multiplier = f.getPhysicalMaximum(0) / f.getDigitalMaximum(0)
    finally:
        # Always release the handle; the file is reopened in the second pass.
        f.close()

Files = [name for name in Files if name not in badFiles]
if not Files:
    raise RuntimeError("No readable .edf files found in " + file_path)

# Exact number of blocks over all readable files (replaces the hard-coded
# estimate that assumed a fixed number of recordings of known lengths).
TotalBlocks = sum(blocks_per_file[name] for name in Files)
BlockCount = 0
# Zero-initialised so no row can ever hold uninitialised memory; the channel
# axis uses n so it always matches the number of signals actually loaded.
full_data = np.zeros(shape=(TotalBlocks, n, BLOCK_LEN, 1))

# Second pass: cut every channel into consecutive BLOCK_LEN-sample blocks.
for name in Files:
    path = os.path.join(file_path, name)
    f = pyedflib.EdfReader(path)
    try:
        number_of_samples = f.getNSamples()[0]
        Nblocks = blocks_per_file[name]
        usable = Nblocks * BLOCK_LEN
        for i in range(n):
            signal = f.readSignal(i, digital=True)[SKIP_SAMPLES:SKIP_SAMPLES + usable]
            # Block j of channel i holds samples [j*BLOCK_LEN, (j+1)*BLOCK_LEN)
            # of the trimmed signal.
            full_data[BlockCount:BlockCount + Nblocks, i] = signal.reshape(Nblocks, BLOCK_LEN, 1)
    finally:
        f.close()
    BlockCount = BlockCount + Nblocks

assert BlockCount == TotalBlocks, "every pre-allocated block must have been filled"

train_data, test_data = train_test_split(full_data, test_size=0.05, random_state=42)
print(train_data.shape, test_data.shape)

(40062, 21, 32, 1) (2109, 21, 32, 1)


In [2]:
import tensorflow as tf
def prd_loss_dig2phy(y_true, y_pred):
    """Percentage root-mean-square difference after scaling by `multiplier`.

    Both tensors are multiplied by the module-level `multiplier` before the
    error is measured.

    Args:
        y_true: reference tensor.
        y_pred: reconstructed tensor, same shape as `y_true`.

    Returns:
        Scalar tensor: 100 * sqrt(sum((t - p)^2) / (sum(t^2) + epsilon)),
        where t and p are the scaled inputs.
    """
    scaled_true = y_true * multiplier
    scaled_pred = y_pred * multiplier
    error_energy = tf.reduce_sum(tf.square(scaled_true - scaled_pred))
    signal_energy = tf.reduce_sum(tf.square(scaled_true))
    # Keras' epsilon keeps the division finite when the reference is all zeros.
    ratio = error_energy / (signal_energy + tf.keras.backend.epsilon())
    return tf.sqrt(ratio) * 100

2024-06-29 18:07:41.075283: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [3]:
import tensorflow as tf
def prd_loss(y_true, y_pred):
    """Percentage root-mean-square difference between two tensors.

    Args:
        y_true: reference tensor.
        y_pred: reconstructed tensor, same shape as `y_true`.

    Returns:
        Scalar tensor: 100 * sqrt(sum((y_true - y_pred)^2) / (sum(y_true^2) + epsilon)).
    """
    error_energy = tf.reduce_sum(tf.square(y_true - y_pred))
    signal_energy = tf.reduce_sum(tf.square(y_true))
    # Keras' epsilon keeps the division finite when the reference is all zeros.
    ratio = error_energy / (signal_energy + tf.keras.backend.epsilon())
    return tf.sqrt(ratio) * 100

Compression ratio: CR = (21 × 32) / 112 = 6, i.e. each 21 × 32 input block is encoded into 112 values.

In [21]:
import tensorflow as tf
from tensorflow.keras import layers

# Scratch model: one Dense layer on a (128, 1) input. Dense acts on the last
# axis only, so the summary reports an output of (None, 128, 121).
model = tf.keras.Sequential([
    layers.Input(shape=(128, 1)),
    layers.Dense(121, activation='relu'),
])

# Show the layer shapes and parameter counts.
model.summary()


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_16 (Dense)             (None, 128, 121)          242       
Total params: 242
Trainable params: 242
Non-trainable params: 0
_________________________________________________________________


In [40]:
from tensorflow import keras
import numpy as np

# ---- Encoder: (21, 32, 1) -> (112, 1) -------------------------------------
# Per-layer output shapes (without the batch axis) are noted on the right.
input_data = keras.layers.Input(shape=(21, 32, 1), name="input_data")
encoder_layers = [
    keras.layers.Conv2D(16, (4, 4), activation='relu', padding='same'),  # (21, 32, 16)
    keras.layers.MaxPooling2D((1, 2), padding='same'),                   # (21, 16, 16)
    keras.layers.Conv2D(8, (4, 4), activation='relu', padding='same'),   # (21, 16, 8)
    keras.layers.MaxPooling2D((3, 1), padding='same'),                   # (7, 16, 8)
    keras.layers.Conv2D(1, (4, 4), activation='relu', padding='same'),   # (7, 16, 1), single filter
    keras.layers.Reshape((112, 1)),                                      # (112, 1)
]
encoded = input_data
for layer in encoder_layers:
    encoded = layer(encoded)

encoder = keras.models.Model(input_data, encoded, name="encoder")

# ---- Decoder: (112, 1) -> (21, 32, 1) -------------------------------------
encoded_input = keras.layers.Input(shape=(112, 1), name="encoded_input")
decoder_layers = [
    keras.layers.Reshape((7, 16, 1)),                                     # (7, 16, 1)
    keras.layers.Conv2D(8, (4, 4), activation='relu', padding='same'),    # (7, 16, 8)
    keras.layers.UpSampling2D((3, 1)),                                    # (21, 16, 8)
    keras.layers.Conv2D(16, (4, 4), activation='relu', padding='same'),   # (21, 16, 16)
    keras.layers.UpSampling2D((1, 2)),                                    # (21, 32, 16)
    keras.layers.Conv2D(8, (4, 4), activation='relu', padding='same'),    # (21, 32, 8)
    keras.layers.Conv2D(1, (4, 4), activation='linear', padding='same'),  # (21, 32, 1)
]
decoded = encoded_input
for layer in decoder_layers:
    decoded = layer(decoded)

decoder = keras.models.Model(encoded_input, decoded, name="decoder")

# ---- Autoencoder: encoder followed by decoder ------------------------------
autoencoder_input = keras.layers.Input(shape=(21, 32, 1), name="autoencoder_input")
encoded_data = encoder(autoencoder_input)
decoded_data = decoder(encoded_data)

autoencoder = keras.models.Model(autoencoder_input, decoded_data, name="autoencoder")

# Train against the scaled PRD loss; MSE is reported as an extra metric.
autoencoder.compile(optimizer='adam', loss=prd_loss_dig2phy, metrics=['mse'])

# Print the structure of both halves.
encoder.summary()
decoder.summary()


Model: "encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_data (InputLayer)      [(None, 21, 32, 1)]       0         
_________________________________________________________________
conv2d_121 (Conv2D)          (None, 21, 32, 16)        272       
_________________________________________________________________
max_pooling2d_52 (MaxPooling (None, 21, 16, 16)        0         
_________________________________________________________________
conv2d_122 (Conv2D)          (None, 21, 16, 8)         2056      
_________________________________________________________________
max_pooling2d_53 (MaxPooling (None, 7, 16, 8)          0         
_________________________________________________________________
conv2d_123 (Conv2D)          (None, 7, 16, 1)          129       
_________________________________________________________________
reshape_51 (Reshape)         (None, 112, 1)            0   

In [41]:
# Fit the autoencoder to reproduce its own input; 20% of the training data is
# held out for validation.
fit_options = dict(epochs=40, batch_size=64, shuffle=True, validation_split=0.2)
autoencoder.fit(train_data, train_data, **fit_options)

# Persist the two halves separately so each can be loaded on its own.
for sub_model, target_file in ((encoder, "encoder_model.h5"), (decoder, "decoder_model.h5")):
    sub_model.save(target_file)

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


: 