In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


In [2]:
!pip install -U tensorflow-addons

Collecting tensorflow-addons
  Downloading tensorflow_addons-0.16.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
[?25l[K     |▎                               | 10 kB 21.5 MB/s eta 0:00:01[K     |▋                               | 20 kB 10.8 MB/s eta 0:00:01[K     |▉                               | 30 kB 9.1 MB/s eta 0:00:01[K     |█▏                              | 40 kB 8.2 MB/s eta 0:00:01[K     |█▌                              | 51 kB 6.4 MB/s eta 0:00:01[K     |█▊                              | 61 kB 7.5 MB/s eta 0:00:01[K     |██                              | 71 kB 6.7 MB/s eta 0:00:01[K     |██▍                             | 81 kB 6.4 MB/s eta 0:00:01[K     |██▋                             | 92 kB 7.1 MB/s eta 0:00:01[K     |███                             | 102 kB 7.3 MB/s eta 0:00:01[K     |███▏                            | 112 kB 7.3 MB/s eta 0:00:01[K     |███▌                            | 122 kB 7.3 MB/s eta 0:00:01[K     |███▉

In [3]:
import tensorflow_addons as tfa

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
SR_data = np.load('/content/drive/MyDrive/SR_100_samp.npy')

In [6]:
AFIB_data=np.load('/content/drive/MyDrive/AFIB_100_samp.npy')

In [7]:
SR_data.shape

(1000, 217, 334, 3)

In [8]:
input_shape=SR_data[0].shape

In [9]:
input_shape

(217, 334, 3)

In [10]:
class_SR=np.zeros((1000,1))
class_AFIB=np.ones((1000,1))
Y_data=np.append(class_SR,class_AFIB)
Y_data.shape

(2000,)

In [11]:
data=np.concatenate((SR_data,AFIB_data))

In [12]:
data.shape

(2000, 217, 334, 3)

In [13]:
X_train, X_test, Y_train, Y_test = train_test_split(data,Y_data,test_size=0.2,shuffle=True)


In [14]:
Y_train=Y_train.reshape(1600,1)
Y_test=Y_test.reshape(400,1)

In [15]:
Y_train.shape

(1600, 1)

In [16]:
num_classes = 2


In [17]:
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 128
num_epochs = 30
image_size = 100  # We'll resize input images to this size
patch_size = 10  # Size of the patches to be extract from the input images
num_patches = (image_size // patch_size) ** 2
projection_dim = 64
num_heads = 4
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

In [18]:
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# Compute the mean and the variance of the training data for normalization.
data_augmentation.layers[0].adapt(X_train)

In [20]:
class Patches(layers.Layer):
    def __init__(self, patch_size):
        super(Patches, self).__init__()
        self.patch_size = patch_size

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

In [23]:
class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super(PatchEncoder, self).__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

In [34]:
def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.nn.gelu)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x

In [38]:
def transformer_encoder(input_patches):
        layer1 = layers.LayerNormalization(epsilon=1e-6)(input_patches) #normalization
        attention_output = layers.MultiHeadAttention(  
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(layer1, layer1)   #multiheaded attention
        layer2 = layers.Add()([attention_output, input_patches]) #adding attention output with encoded patches
        layer3 = layers.LayerNormalization(epsilon=1e-6)(layer2)  #normalization
        layer3 = mlp(layer3, hidden_units=transformer_units, dropout_rate=0.1) #Multilayer perceptron
        input_patches = layers.Add()([layer3, layer2])    #adding MLP output with normalized encoded patches output
        return input_patches


In [33]:
def create_ECG_image_classifier():
    inputs = layers.Input(shape=input_shape)
    augmented = data_augmentation(inputs) #data augmentation
    patches = Patches(patch_size)(augmented) #splitting image into patches
    encoded_patches = PatchEncoder(num_patches, projection_dim)(patches)
    for _ in range(transformer_layers):
      encoded_patches=transformer_encoder(encoded_patches)   
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)  #aggregating data from all transformer blocks and layers
    representation = layers.Flatten()(representation)     #flatten the data for MLP
    representation = layers.Dropout(0.5)(representation)
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.5)
    logits = layers.Dense(num_classes)(features)  #final binary classification 
    model = keras.Model(inputs=inputs, outputs=logits)
    return model

In [39]:
def run_experiment(model):
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )

    checkpoint_filepath = "/tmp/checkpoint"
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    model.summary()

    history = model.fit(
        x=X_train,
        y=Y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[checkpoint_callback],
    )

    model.load_weights(checkpoint_filepath)
    _, accuracy, top_5_accuracy = model.evaluate(X_test, Y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    return history


ECG_image_classifier = create_ECG_image_classifier()
history = run_experiment(ECG_image_classifier)

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, 217, 334, 3  0           []                               
                                )]                                                                
                                                                                                  
 data_augmentation (Sequential)  (None, 100, 100, 3)  7          ['input_5[0][0]']                
                                                                                                  
 patches_6 (Patches)            (None, None, 300)    0           ['data_augmentation[4][0]']      
                                                                                                  
 patch_encoder_4 (PatchEncoder)  (None, 100, 64)     25664       ['patches_6[0][0]']        