In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.io import loadmat
import math
import os
from scipy import signal
from tqdm import tqdm
import torch
import torch.nn.functional as F
from torch import nn
import torch.optim as optim
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

In [None]:
def load_challenge_data(filename):
    """Load ECG files"""
    x = loadmat(filename)
    data = np.asarray(x['val'], dtype=np.float64)
    new_file = filename.replace('.mat','.hea')
    input_header_file = os.path.join(new_file)
    with open(input_header_file,'r') as f:
        header_data=f.readlines()
    return data, header_data

In [None]:
def downsample_ecg(ecg, samples=1000, leads=12):
    """Function used to downsample ECGs from 500Hz to 100Hz"""
    new_ecg = np.zeros((samples,leads))
    for i,j in enumerate(ecg):
        new_ecg[:,i] = signal.resample(j,samples)
    return new_ecg

In [None]:
print("Importing real ECGs..")
ECGs = []
for ecgfilename in tqdm(sorted(os.listdir("/kaggle/input/ptb-diagnostic-ecg-database/Training_PTB/"))):
    if ecgfilename.endswith(".mat"):
        data = load_challenge_data("/kaggle/input/ptb-diagnostic-ecg-database/Training_PTB/" + ecgfilename)[0]
        ECGs.append(downsample_ecg(data))
ECGs = abs(np.asarray(ECGs))
# Convert ECG unit from microvolt to millivolt
ECGs= ECGs/1000
print("A total of {} ECGs were loaded".format(len(ECGs)))
ECGs = ECGs[:100,:50,:]

In [None]:
plt.figure(figsize=(12,4))
plt.title("Real ECG - Lead I", fontsize=16)
plt.plot(ECGs[0,:,1])
plt.show()

In [None]:
data = torch.from_numpy(ECGs)

In [None]:
def scaled_dot_product(q, k, v, mask=None):
    d_k = q.size()[-1]
    scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
    attention = F.softmax(scaled, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)
    
    def forward(self, x, mask=None):
        batch_size, max_sequence_length, d_model = x.size()
        qkv = self.qkv_layer(x)
        qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)
        values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out

In [None]:
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape=parameters_shape
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta =  nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = inputs.mean(dim=dims, keepdim=True)
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        y = (inputs - mean) / std
        out = self.gamma * y  + self.beta
        return out

In [None]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        residual_x = x
        x = self.attention(x, mask=None)
        x = self.dropout1(x)
        x = self.norm1(x + residual_x)
        residual_x = x
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(x + residual_x)
        return x

In [None]:
class Encoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.layers = nn.Sequential(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])
    def forward(self, x):
        x = self.layers(x)
        return x

In [None]:
class PositionalEncoding(nn.Module):
    """ Implements the sinusoidal positional encoding.
    """
    def __init__(self, d_model, dropout_rate=0.1, max_len=50):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.dropout_rate = dropout_rate
        self.max_len = max_len
        self.dropout = nn.Dropout(dropout_rate)
        # compute positional encodings
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )  # (d_model,)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """ x: (batch_size, seq_len, d_model)
        """
        x = x + self.pe[:x.size(0), :]  # (batch_size, seq_len, d_model)
        x = self.dropout(x)  # (batch_size, seq_len, d_model)
        # x: (batch_size, seq_len, d_model)
        return x

In [None]:
class Generator(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer_encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
        self.conv2d = nn.Conv2d(12, 12, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = x.unsqueeze(1)
        x = x.permute(0,3,2, 1)
        x = self.conv2d(x)
        x = x.squeeze(3)
        x = x.permute(0,2,1)
        return x

class Discriminator(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer_encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
        self.fc = nn.Linear(d_model, 1)  # Binary classification

    def forward(self, x):
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)  # Global average pooling
        x = self.fc(x)
        return x

In [None]:
def jensen_shannon_distance(tensor1, tensor2):
    # Flatten tensors
    flat_tensor1 = tensor1.view(tensor1.size(0), -1)
    flat_tensor2 = tensor2.view(tensor2.size(0), -1)

    # Convert to probabilities using softmax
    probs1 = F.softmax(flat_tensor1, dim=1)
    probs2 = F.softmax(flat_tensor2, dim=1)

    # Calculate the average distribution
    avg_probs = 0.5 * (probs1 + probs2)

    # Calculate Jensen-Shannon divergence
    js_div1 = F.kl_div(probs1.log(), avg_probs, reduction='batchmean')
    js_div2 = F.kl_div(probs2.log(), avg_probs, reduction='batchmean')

    # Calculate Jensen-Shannon distance
    js_distance = 0.5 * (js_div1 + js_div2)

    return js_distance.item()

In [None]:
d_model = 12
num_heads = 4
drop_prob = 0.1
batch_size = 50
# max_sequence_length = 1000
max_sequence_length = 50
ffn_hidden = 48
num_layers = 5

In [None]:
idx = list(range(0, 100))
trainLoader = torch.utils.data.DataLoader(data,batch_size=batch_size,
                                          sampler = torch.utils.data.sampler.SubsetRandomSampler(idx))
#Set up an iterator for data loader and get the next batch
dataIter = iter(trainLoader)
ecgs = next(dataIter)

In [None]:
# Create instances of Generator and Discriminator
DC_G = Generator(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
DC_D = Discriminator(d_model, ffn_hidden, num_heads, drop_prob, num_layers)

In [None]:
cosine_similarities = []
js_distance = []

# Set up optimizers
G_opt = optim.Adam(DC_G.parameters(), lr=0.0002, betas=(0.5, 0.999))
D_opt = optim.Adam(DC_D.parameters(), lr=0.0002, betas=(0.5, 0.999))

G_losses = []
D_losses = []

best_G_loss = 0
best_D_loss = 0

real_label = 1.0
fake_label = 0.0

max_epochs = 20

for epoch in range(max_epochs):
  G_loss_run = 0.0
  D_loss_run = 0.0

  for i, data in enumerate(trainLoader):
    X = data 
    X = X.to(torch.float32)
    batch_size = X.size(0)
    label = torch.full((batch_size,), real_label)
    label_f = torch.full((batch_size,), fake_label)
    z = torch.rand( (batch_size, max_sequence_length, d_model))
    fixed_noise = torch.rand(50, 50, 12)

    ## DISCRIMINATOR TRAINING
    #train with real
    DC_D.zero_grad()
    D_real = DC_D(X)
    D_real = D_real.squeeze(1)
    D_real = torch.sigmoid(D_real)
    D_real_loss = F.binary_cross_entropy(D_real, label)  #loss -(1/m)(log D(x))

    #train with fake
    fake = DC_G(z)
    D_fake = DC_D(fake.detach())
    D_fake = D_fake.squeeze(1)
    D_fake = torch.sigmoid(D_fake)
    D_fake_loss = F.binary_cross_entropy(D_fake, label_f)  #loss -(1/m)(log(1-D(G(z))))
    D_loss = D_real_loss + D_fake_loss
    D_loss.backward()
    D_opt.step()

    #GENERATOR training
    D_fake = DC_D(fake)
    D_fake = D_fake.squeeze(1)
    D_fake = torch.sigmoid(D_fake)
    G_loss = F.binary_cross_entropy(D_fake, label)  #loss -(1/m)(log (1-D(G(z))))
    G_opt.zero_grad()
    G_loss.backward()
    G_opt.step()
    G_loss_run += G_loss.item()
    D_loss_run += D_loss.item()
    with torch.no_grad():
        real_data = X.reshape(batch_size, -1)
        samples = DC_G(fixed_noise).detach()
        fake_data = samples.reshape(batch_size, -1)
        cos_sim = cosine_similarity(real_data, fake_data)
        cosine_similarities.append(cos_sim.mean().item())
        js_dist = jensen_shannon_distance(real_data, fake_data)
        js_distance.append(js_dist)

  print('Epoch:{},   G_loss:{},    D_loss:{},    Cosine Similarity:{}, Jensen Shannon Distance{}'.format(epoch, G_loss_run / (i + 1), D_loss_run / (i + 1), cosine_similarities[-1],js_distance[-1]))
#   print('Epoch:{},   G_loss:{},    D_loss:{}'.format(epoch, G_loss_run/(i+1), D_loss_run/(i+1)))

  G_losses.append(G_loss_run/(i+1))
  D_losses.append(D_loss_run/(i+1))
  if epoch%5 == 0:
    with torch.no_grad():
        samples = DC_G(fixed_noise).detach()
        plt.figure(figsize=(12,4))
        plt.title("Synthetic ECG - Lead I", fontsize=16)
        plt.plot(samples[0,:,1])
        plt.show()
        
    #Saving the best model
    print("Saving the final model state dictionary...at DC_G.dth and DC_D.dth")
    torch.save(DC_G.state_dict(), "DC_G.dth")
    torch.save(DC_D.state_dict(), "DC_D.dth")

In [None]:
# Assuming tensor1 and tensor2 are your 3D tensors
tensor1 = X    # shape = torch.Size([500, 50, 12])
tensor2 = DC_G(fixed_noise).detach()   # shape = torch.Size([500, 50, 12])

# Convert tensors to NumPy arrays and flatten
flat_tensor1 = tensor1.reshape(500, -1).numpy()
flat_tensor2 = tensor2.reshape(500, -1).numpy()

# Apply t-SNE
tsne = TSNE(n_components=2, random_state=42)
tsne_result1 = tsne.fit_transform(flat_tensor1)
tsne_result2 = tsne.fit_transform(flat_tensor2)

# Plot t-SNE results
plt.subplot(1, 2, 2)
plt.scatter(tsne_result1[:, 0], tsne_result1[:, 1], marker='o', color='red', label='Real Data')
plt.scatter(tsne_result2[:, 0], tsne_result2[:, 1], marker='s', color='blue', label='Synthetic Data')
plt.title('t-SNE Visualization')
plt.xlabel('t-SNE Dimension 1')
plt.ylabel('t-SNE Dimension 2')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
avg_cosine_similarities = sum(cosine_similarities[-20:])/20
avg_js_distance = sum(js_distance[-20:])/20

In [None]:
avg_cosine_similarities

In [None]:
avg_js_distance

In [None]:
plt.figure(figsize=(12,4))
plt.title("Synthetic ECG - Lead I", fontsize=16)
plt.plot(samples[34,:,1])
plt.show()

In [None]:
G_loss.to_numpy()
plt.plot(G_loss)
plt.show()