Code for PacMAP

# First for Resnet

In [None]:
class ResidualUnit(nn.Module):
    def __init__(self, n_filters_in, n_filters_out, kernel_size=17, stride=1, dropout_rate=0.2, 
                 preactivation=True, postactivation_bn=False, activation_function='relu'):
        super(ResidualUnit, self).__init__()
        self.preactivation = preactivation
        self.postactivation_bn = postactivation_bn
        self.dropout_rate = dropout_rate

        # Activation function
        if activation_function == 'relu':
            self.activation = nn.ReLU(inplace=True)
        else:
            raise NotImplementedError("Activation function '{}' not implemented.".format(activation_function))

        self.bn1 = nn.BatchNorm1d(n_filters_in)
        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size, stride=1, padding=kernel_size//2, bias=False)
        self.bn2 = nn.BatchNorm1d(n_filters_out)
        self.conv2 = nn.Conv1d(n_filters_out, n_filters_out, kernel_size, stride=stride, padding=kernel_size//2 - 1, bias=False)
        self.dropout = nn.Dropout(dropout_rate)
        self.downsample = stride != 1 or n_filters_in != n_filters_out
        if self.downsample:
            self.conv_shortcut = nn.Conv1d(n_filters_in, n_filters_out, 1, stride=stride, bias=False)

    def forward(self, x):
        identity = x

        out = x
        if self.preactivation:
            out = self.bn1(out)
            out = self.activation(out)
        out = self.conv1(out)
        out = self.bn2(out)
        out = self.activation(out)
        if self.dropout_rate > 0:
            out = self.dropout(out)
        out = self.conv2(out)

        if self.downsample:
            identity = self.conv_shortcut(identity)

        out += identity
        if not self.preactivation or self.postactivation_bn:
            out = self.bn2(out)
            out = self.activation(out)
        return out

class ResNet1DRegression(nn.Module):
    def __init__(self, input_length=100, kernel_size=16):
        super(ResNet1DRegression, self).__init__()
        self.conv1 = nn.Conv1d(1, 64, kernel_size, stride=1, padding=kernel_size//2, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        
        self.res1 = ResidualUnit(64, 128, kernel_size=kernel_size, stride=3)
        self.res2 = ResidualUnit(128, 196, kernel_size=kernel_size, stride=3)
        self.res3 = ResidualUnit(196, 256, kernel_size=kernel_size, stride=2)
        self.res4 = ResidualUnit(256, 320, kernel_size=kernel_size, stride=2)

        self.flatten_dim = self._get_flatten_dim(input_length)
        self.fc = nn.Linear(self.flatten_dim, 1)

    def _get_flatten_dim(self, input_length):
        with torch.no_grad():
            x = torch.zeros(1, 1, input_length)
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
            x = self.res1(x)
            x = self.res2(x)
            x = self.res3(x)
            x = self.res4(x)
            flatten_dim = x.view(1, -1).size(1)
        return flatten_dim

    def forward(self, x):
        # x shape: [batch_size, 1, input_length]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.res1(x)
        x = self.res2(x)
        x = self.res3(x)
        x = self.res4(x)

        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
    
    def get_latent(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.res1(x)
        x = self.res2(x)
        x = self.res3(x)
        x = self.res4(x)
        latent = x.view(x.size(0), -1)
        
        return latent

def train_age_resnet(device, X, Y, lr=0.001, batch_size=256, num_epoch=16, end_factor=0.1, use_tqdm=True, input_length=100):
    model = ResNet1DRegression(input_length=input_length).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.LinearLR(
        optimizer, start_factor=1.0, end_factor=end_factor, total_iters=(num_epoch * len(X)) // batch_size
    )
    criterion = nn.MSELoss()  # MSE loss for regression
    
    epoch_iterator = tqdm(range(num_epoch), desc="Training Epochs") if use_tqdm else range(num_epoch)
    for epoch in epoch_iterator:
        # Shuffle data at each epoch
        permutation = np.random.permutation(len(X))
        X = X[permutation]
        Y = Y[permutation]
        
        for batch_idx in range(0, len(X), batch_size):
            batch_data = X[batch_idx:batch_idx+batch_size]
            batch_target = Y[batch_idx:batch_idx+batch_size]
            
            # Prepare tensors
            batch_data = torch.tensor(batch_data, dtype=torch.float32).unsqueeze(1).to(device)
            batch_target = torch.tensor(batch_target, dtype=torch.float32).view(-1, 1).to(device)
            
            optimizer.zero_grad()
            pred = model(batch_data)
            loss = criterion(pred, batch_target)
            loss.backward()
            optimizer.step()
            scheduler.step()
            
            epoch_iterator.set_description(f"Epoch {epoch+1}, loss: {loss.item():.5f}")
    
    return model

# Testing function for the ResNet regression model
def test_age_resnet(model, device, X):
    model.eval()
    predictions = []
    with torch.no_grad():
        for i in range(len(X)):
            data = torch.tensor(X[i], dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
            output = model(data)
            predictions.append(output.cpu().numpy())
    predictions = np.concatenate(predictions, axis=0)
    return predictions

In [None]:
import pacmap
import matplotlib.pyplot as plt
import numpy
model = ResNet1DRegression(input_length=100).to(device)
# Load the saved state dictionary into the model
#fold = random.randint(1, 5)
fold=4
model.load_state_dict(torch.load("ModelOutputNormalized_Resnet/resnet_lr0001_model_state_fold{}.pth".format(fold)))
model.eval()

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold, train_test_split

def extract_latent(model, device, X):
    model.eval()
    latent_list = []
    dataset = TensorDataset(torch.tensor(X, dtype=torch.float32).unsqueeze(1))
    loader = DataLoader(dataset, batch_size=1024, shuffle=False)
    with torch.no_grad():
        for batch in loader:
            x_batch = batch[0].to(device)
            latent = model.get_latent(x_batch)
            latent_list.append(latent.cpu().numpy())
    latent_data = np.concatenate(latent_list, axis=0)
    return latent_data

    
kf = KFold(n_splits=5, shuffle=True, random_state=seed)
indices = np.arange(len(X))

for fold_idx, (train_val_idx, _) in enumerate(kf.split(indices)):
    if (fold_idx + 1) == fold:
        train_fold_idx, val_fold_idx = train_test_split(train_val_idx, test_size=0.20, random_state=seed)
        X_val_fold = X[val_fold_idx]
        break
        
latent_data = extract_latent(model, device, X_val_fold)


In [None]:
embedding = pacmap.PaCMAP(n_components=2, random_state=42)
latent_2d = embedding.fit_transform(latent_data)

In [None]:
plt.figure(figsize=(8, 6))
scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=Y[val_fold_idx], cmap='viridis')
cbar = plt.colorbar(scatter)
cbar.set_label('Age', fontsize=20) 
cbar.ax.tick_params(labelsize=18)
plt.xlabel("Component 1", fontsize=20)
plt.ylabel("Component 2", fontsize=20)
plt.title(f"PacMAP of ResNet PPG Signal Embeddings", fontsize=22)
plt.xticks([]) 
plt.yticks([])  
plt.tight_layout()
#plt.show()
plt.savefig("pacmap_resnet.png", dpi=300, bbox_inches='tight')
plt.close()


# Now for SMoLK

In [None]:
class LearnedFiltersAge(nn.Module):
    def __init__(self, num_kernels=24):
        super(LearnedFiltersAge, self).__init__()
        # 100 point signals so chose 25, 15, 10 instead of 192, 96, 64
        self.conv1 = nn.Conv1d(1, num_kernels, kernel_size=100, stride=1, bias=True)
        self.conv2 = nn.Conv1d(1, num_kernels, kernel_size=50, stride=1, bias=True)
        self.conv3 = nn.Conv1d(1, num_kernels, kernel_size=20, stride=1, bias=True)
        
        self.linear = nn.Linear(num_kernels*3 + 51, 1)  # 51 instead of 321 for size of power spectrum
    
    def forward(self, x, powerspectrum):
        c1 = F.leaky_relu(self.conv1(x)).mean(dim=-1) # shape of x is [B, 1, 100]
        c2 = F.leaky_relu(self.conv2(x)).mean(dim=-1)
        c3 = F.leaky_relu(self.conv3(x)).mean(dim=-1)
        
        aggregate = torch.cat([c1, c2, c3, powerspectrum], dim=1) # shape of powerspectrum is [B, 51]
        aggregate = self.linear(aggregate)
        
        return aggregate
    
    def get_latent(self, x, powerspectrum):
        c1 = F.leaky_relu(self.conv1(x)).mean(dim=-1)
        c2 = F.leaky_relu(self.conv2(x)).mean(dim=-1)
        c3 = F.leaky_relu(self.conv3(x)).mean(dim=-1)
        
        # Concatenate to form latent representation
        latent = torch.cat([c1, c2, c3, powerspectrum], dim=1)
        return latent

def train_age(device, X, Y, num_kernels=24, lr=0.001, batch_size=256, num_epoch=16, end_factor=0.1, use_tqdm=True):
    # compute power spectra for X
    PowerSpectra = []
    for i in tqdm(range(0, len(X)), desc="Computing Power Spectra"):
        PowerSpectra.append(periodogram(X[i], fs=100)[1])
    PowerSpectra = np.float32(PowerSpectra)
    
    model = LearnedFiltersAge(num_kernels=num_kernels).to(device)
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.LinearLR(
        optimizer, start_factor=1.0, end_factor=end_factor, total_iters=(num_epoch * len(X)) // batch_size
    )
    criterion = nn.MSELoss() # MSE Loss instead of CrossEntropy
    
    epoch_iterator = tqdm(range(num_epoch), desc="Training Epochs") if use_tqdm else range(num_epoch)
    for epoch in epoch_iterator:
        # Shuffle data at each epoch
        permutation = np.random.permutation(len(X))
        X = X[permutation]
        Y = Y[permutation]
        PowerSpectra = PowerSpectra[permutation]
        
        for batch_idx in range(0, len(X), batch_size):
            batch_data = X[batch_idx:batch_idx+batch_size]
            batch_power = PowerSpectra[batch_idx:batch_idx+batch_size]
            batch_target = Y[batch_idx:batch_idx+batch_size]
            
            # Prepare tensors
            batch_data = torch.tensor(batch_data, dtype=torch.float32).unsqueeze(1).to(device) 
            batch_power = torch.tensor(batch_power, dtype=torch.float32).to(device)
            batch_target = torch.tensor(batch_target, dtype=torch.float32).view(-1, 1).to(device)
            
            optimizer.zero_grad()
            pred = model(batch_data, batch_power)
            loss = criterion(pred, batch_target)
            loss.backward()
            optimizer.step()
            scheduler.step()
            
            epoch_iterator.set_description(f"Epoch {epoch+1}, loss: {loss.item():.5f}")
    
    return model

def test_age(model, device, X):
    # compute power spectra for X
    PowerSpectra = []
    for i in range(len(X)):
        PowerSpectra.append(periodogram(X[i], fs=100)[1])
    PowerSpectra = np.stack(PowerSpectra)
    
    model.eval()
    predictions = []
    with torch.no_grad():
        for i in range(0, len(X)):
            data = torch.tensor(X[i], dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device) 
            powerspectrum = torch.tensor(PowerSpectra[i], dtype=torch.float32).unsqueeze(0).to(device) 
            output = model(data, powerspectrum)
            predictions.append(output.cpu().numpy())
    predictions = np.concatenate(predictions, axis=0)
    return predictions

In [None]:
import pacmap
import matplotlib.pyplot as plt
import numpy
model = LearnedFiltersAge(num_kernels=384).to(device)
# Load the saved state dictionary into the model
#fold = random.randint(1, 5)
fold = 4
model.load_state_dict(torch.load("ModelOutputNormalized/smolk_model_lr0001_384_200_fold{}.pth".format(fold)))
model.eval()

In [None]:
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import KFold, train_test_split


In [None]:
def extract_latent_smolk(model, device, X, fs=100, batch_size=1024):
    # Compute power spectra for each sample in X.
    power_spectra = []
    for i in tqdm(range(len(X)), desc="Computing Power Spectra for Latent Extraction"):
        # periodogram returns (freqs, power), and here we take the power spectrum.
        power_spectra.append(periodogram(X[i], fs=fs)[1])
    power_spectra = np.array(power_spectra).astype(np.float32)
    
    # Create a dataset with both the raw signal and its corresponding power spectrum.
    # Expected shapes: X -> [N, signal_length] and power_spectra -> [N, 51] (or however many bins you obtain)
    dataset = TensorDataset(
        torch.tensor(X, dtype=torch.float32).unsqueeze(1),   # Add channel dimension: [N, 1, signal_length]
        torch.tensor(power_spectra, dtype=torch.float32)
    )
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    
    latent_list = []
    model.eval()
    with torch.no_grad():
        for batch in loader:
            x_batch, power_batch = batch
            x_batch = x_batch.to(device)
            power_batch = power_batch.to(device)
            # Use the get_latent method to extract the representation
            latent = model.get_latent(x_batch, power_batch)
            latent_list.append(latent.cpu().numpy())
    
    latent_data = np.concatenate(latent_list, axis=0)
    return latent_data

def visualize_latent_with_pacmap(latent_data, labels=None):
    embedding = pacmap.PaCMAP(n_components=2, random_state=42)
    latent_2d = embedding.fit_transform(latent_data)
    
    plt.figure(figsize=(8, 6))
    if labels is not None:
        scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=labels, cmap='viridis')
        plt.colorbar(scatter, label='Age')
    else:
        plt.scatter(latent_2d[:, 0], latent_2d[:, 1], cmap='viridis')
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.title("Latent Space Embedding via PaCMAP")
    plt.show()


kf = KFold(n_splits=5, shuffle=True, random_state=seed)
indices = np.arange(len(X))

for fold_idx, (train_val_idx, _) in enumerate(kf.split(indices)):
    if (fold_idx + 1) == fold:
        train_fold_idx, val_fold_idx = train_test_split(train_val_idx, test_size=0.20, random_state=seed)
        X_val_fold = X[val_fold_idx]
        break

latent_data = extract_latent_smolk(model, device, X_val_fold)
visualize_latent_with_pacmap(latent_data, labels=Y[val_fold_idx])

In [None]:
embedding = pacmap.PaCMAP(n_components=2, random_state=42)
latent_2d = embedding.fit_transform(latent_data)



In [None]:
plt.figure(figsize=(8, 6))
scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=Y[val_fold_idx], cmap='viridis')
cbar = plt.colorbar(scatter)
cbar.set_label('Age', fontsize=26) 
cbar.ax.tick_params(labelsize=22)
plt.xlabel("Component 1", fontsize=26)
plt.ylabel("Component 2", fontsize=26)
plt.title(f"PacMAP of SMoLK PPG Embeddings", fontsize=28)
plt.xticks([]) 
plt.yticks([])  
plt.tight_layout()
#plt.show()
plt.savefig("pacmap_smolk.png", dpi=300, bbox_inches='tight')
plt.close()