In [None]:
# Mount Google Drive inside the Colab VM; the feather files read later in this
# notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd
from tensorflow.keras import layers, preprocessing
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import pyarrow.feather as feather

In [None]:
# Common filename prefix of the per-patient feather files. The loader completes
# it with "<patient number>).feather", e.g. ".../signal_sync_EDF (1).feather".
edf_path="/content/drive/MyDrive/temp/signal_sync_EDF ("

In [None]:
# Load every patient's recording, keep every 500th row, and tag the rows with
# the patient number. The per-patient frames are collected in a list and
# concatenated once: calling pd.concat inside the loop re-copies all rows
# loaded so far on every iteration (quadratic in the number of files).
patient_frames = []

for i in range(1, 23):
    path = edf_path + str(i) + ').feather'
    edf_temp = feather.read_table(path).to_pandas()
    # Downsample first, then add the column on the small copy, so the
    # full-rate frame is never modified or widened.
    patient_frames.append(edf_temp.iloc[::500, :].assign(Patient=i))

# Original row indices are kept (no ignore_index), as before.
edf = pd.concat(patient_frames)

In [None]:
# One list of SpO2 readings per patient. groupby sorts its keys by default, so
# the outer list is ordered by ascending patient number.
X_data = [list(spo2) for _, spo2 in edf.groupby('Patient')['SpO2']]

In [None]:
# Sanity check: X_data holds one sequence per distinct 'Patient' value.
len(X_data)

22

In [None]:
# Drop the combined frame to free memory; the cells below work from X_data.
del edf

In [None]:
# Right-pad every sequence to the length of the longest one so they stack into
# a single float32 array. The pad value is pad_sequences' default (0), which is
# the value the Masking layer in the model is configured to ignore.
X_data = preprocessing.sequence.pad_sequences(
    X_data,
    dtype='float32',
    padding="post",
)


In [None]:
# Regression targets, one per sequence, positionally aligned with X_data
# (row k of X_data belongs to patient k + 1).
# NOTE(review): the meaning/units of these values are not stated anywhere in
# the notebook, and three values appear twice (patients 7/15, 11/12, 14/17) —
# confirm these are genuine and not copy-paste duplicates.
y = np.array([
    32.98709474346868,    # patient 1
    40.34420652391435,    # patient 2
    18.013110808499857,   # patient 3
    61.88520676153185,    # patient 4
    43.288490284005974,   # patient 5
    22.309859154929576,   # patient 6
    51.50981226076919,    # patient 7
    33.02068965517241,    # patient 8
    29.995583689091713,   # patient 9
    39.90900644003716,    # patient 10
    56.37456501107244,    # patient 11
    56.37456501107244,    # patient 12
    23.339089339271105,   # patient 13
    44.145334434351774,   # patient 14
    51.50981226076919,    # patient 15
    24.066003807911994,   # patient 16
    44.145334434351774,   # patient 17
    70.04614242844697,    # patient 18
    36.30885122410546,    # patient 19
    31.33331048898331,    # patient 20
    79.17007141080755,    # patient 21
    49.21925595452256,    # patient 22
])

In [None]:
# Append a trailing axis of size 1 so each timestep carries a single feature,
# matching the model's Input(shape=(None, 1)).
X_data = np.expand_dims(X_data, axis=-1)

In [None]:
# Hold out 20% of the sequences for testing; the fixed random_state makes the
# split repeatable across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X_data,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Number of sequences that ended up in the training split.
len(X_train)

17

In [None]:
# Display the training targets selected by the split.
y_train

array([22.30985915, 79.17007141, 56.37456501, 61.88520676, 43.28849028,
       70.04614243, 23.33908934, 36.30885122, 44.14533443, 18.01311081,
       39.90900644, 49.21925595, 33.02068966, 56.37456501, 51.50981226,
       31.33331049, 51.50981226])

In [None]:
# Encoder hyperparameters, passed to transformer_encoder below:
#   d_model         - per-head key size and width of the feed-forward output
#   nhead           - number of attention heads
#   dim_feedforward - hidden width of the feed-forward sub-layer
d_model, nhead, dim_feedforward = 16, 4, 16


In [None]:
def transformer_encoder(input_layer, d_model, nhead, dim_feedforward):
    """Apply one post-norm Transformer encoder block to a sequence tensor.

    The block is self-attention followed by a position-wise feed-forward
    network; each sub-layer is wrapped in a residual connection and a
    LayerNormalization.

    Args:
        input_layer: Keras tensor whose last axis is the feature axis. If its
            feature width differs from ``d_model`` it is first projected to
            ``d_model`` with a Dense layer.
        d_model: Width the block operates at; also used as ``key_dim`` of the
            attention heads and as the output width of the feed-forward net.
        nhead: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward network.

    Returns:
        Keras tensor with the same leading axes as ``input_layer`` and
        ``d_model`` features on the last axis.
    """
    # Project to d_model when the widths differ. Without this the block runs
    # on the raw feature width; for a width of 1, LayerNormalization over a
    # single feature has zero variance and returns only its bias, so every
    # timestep collapses to the same value and the input signal is lost.
    x = input_layer
    if x.shape[-1] != d_model:
        x = layers.Dense(d_model)(x)

    # Self-attention sub-layer: residual + norm.
    attn_out = layers.MultiHeadAttention(num_heads=nhead, key_dim=d_model)(x, x)
    x = layers.Add()([x, attn_out])
    x = layers.LayerNormalization()(x)

    # Feed-forward sub-layer.
    ffn = tf.keras.Sequential(
        [
            layers.Dense(dim_feedforward, activation="relu"),
            layers.Dense(d_model),
        ]
    )
    ffn_out = ffn(x)

    # The residual must come from this sub-layer's own input (the normalized
    # attention output), not from the block's raw input: skipping from the raw
    # input bypasses attention and, when widths differ, only works through
    # broadcasting a shift that the following LayerNormalization cancels.
    seq_output = layers.Add()([x, ffn_out])
    seq_output = layers.LayerNormalization()(seq_output)

    return seq_output


In [None]:
# Variable-length sequences with a single feature per timestep.
input_layer = layers.Input(shape=(None, 1))

# Timesteps equal to 0. are flagged as padding for the downstream layers.
# NOTE(review): a genuine reading of exactly 0 would be masked as well —
# confirm that value cannot occur in the data.
masking_layer = layers.Masking(mask_value=0.)

x = transformer_encoder(
    masking_layer(input_layer),
    d_model=d_model,
    nhead=nhead,
    dim_feedforward=dim_feedforward,
)

# Collapse the time axis by averaging, then regress a single scalar.
x = layers.GlobalAveragePooling1D()(x)
output_layer = layers.Dense(1, activation='linear')(x)

model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
model.compile(optimizer='adam', loss='mean_squared_error')

In [None]:
# Train one sequence per gradient step (batch_size=1) for 20 passes over the
# training set; verbose=2 prints one summary line per epoch.
model.fit(
    X_train,
    y_train,
    batch_size=1,
    epochs=20,
    verbose=2,
)


In [None]:
# Score the held-out sequences with mean squared error.
y_pred = model.predict(X_test)
test_mse = mean_squared_error(y_true=y_test, y_pred=y_pred)
print(f"Test MSE: {test_mse}")