In [1]:
# Sure, here's an example of how you could implement contrastive learning for 4-channel physiological signals self-supervised learning with 3 masking methods in PyTorch:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.preprocessing import StandardScaler

class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super(ContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, x, y):
        batch_size = x.shape[0]
        embeddings = torch.cat([x, y], dim=0)
        similarity_matrix = torch.exp(torch.mm(embeddings, embeddings.t()) / self.temperature)
        mask = torch.eye(2 * batch_size, dtype=torch.bool)
        contrastive_matrix = similarity_matrix[~mask].view(2 * batch_size, -1)
        loss = (-torch.log(contrastive_matrix / torch.sum(similarity_matrix, dim=1, keepdim=True)))
        loss = torch.mean(loss)

        return loss

class Encoder(nn.Module):
    def __init__(self, input_dim=4):
        super(Encoder, self).__init__()
        self.input_dim = input_dim
        self.features = nn.Sequential(
            nn.Linear(self.input_dim, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 256),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool1d(1),
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        return x

class MaskingMethod1(nn.Module):
    def __init__(self):
        super(MaskingMethod1, self).__init__()

    def forward(self, x):
        batch_size = x.shape[0]
        t = np.random.randint(1, 3)
        indices = torch.randperm(batch_size)
        x1 = x[:batch_size//2, t:]
        x2 = x[indices[:batch_size//2], :t]
        masked_x1 = torch.cat([x1, torch.zeros_like(x1)], dim=1)
        masked_x2 = torch.cat([torch.zeros_like(x2), x2], dim=1)
        return masked_x1, masked_x2

class MaskingMethod2(nn.Module):
    def __init__(self):
        super(MaskingMethod2, self).__init__()

    def forward(self, x):
        batch_size = x.shape[0]
        t = np.random.randint(1, 3)
        indices = torch.randperm(batch_size)
        x1 = x[:batch_size//2, t:]
        x2 = x[indices[:batch_size//2], t:]
        masked_x1 = torch.cat([x1, torch.zeros_like(x1)], dim=1)
        masked_x2 = torch.cat([torch.zeros_like(x2), x2], dim=1)
        return masked_x1, masked_x2

class MaskingMethod3(nn.Module):
    def __init__(self):
        super(MaskingMethod3, self).__init__()

    def forward(self, x):
        batch_size = x.shape[0]
        t = np.random.randint(1, 3)
        indices = torch.randperm(batch_size)
        x1 = x[:batch_size//2, :]
        x2 = x[indices[:batch_size//2], :]
        x1[:, t:] = 0
        x2[:, :t] = 0
        return x1, x2

class ContrastiveModel(nn.Module):
    def __init__(self, input_dim=4):
        super(ContrastiveModel, self).__init__()
        self.encoder = Encoder(input_dim=input_dim)
        self.projector = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, 16),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.projector(x)
        return x

# Load the physiological signal dataset
data = np.random.rand(1000, 4)  # 1000 samples, 4 channels
scaler = StandardScaler()
data = scaler.fit_transform(data)
data = torch.from_numpy(data).float()

# Define the masking methods
masking_methods = [MaskingMethod1(), MaskingMethod2(), MaskingMethod3()]

# Define the contrastive model and optimizer
model = ContrastiveModel(input_dim=4)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [3]:
criterion = ContrastiveLoss()

for epoch in range(10):
    for i in range(0, len(data), 2):
        # Randomly select a masking method
        masking_method = np.random.choice(masking_methods)

        # Apply the masking method to the data
        x1, x2 = masking_method(data[i:i+2])

        # Encode the masked data
        z1 = model(x1)
        z2 = model(x2)

        # Compute the contrastive loss
        loss = criterion(z1, z2)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print training progress
        if i % 100 == 0:
            print(f"Epoch [{epoch}/{10}], Step [{i}/{len(data)}], Loss: {loss.item():.4f}")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x1 and 256x128)

In [None]:
# Sure, here's an example of how you could use the pre-trained contrastive model from the previous example in a binary classification downstream task:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

class Encoder(nn.Module):
    def __init__(self, input_dim=4):
        super(Encoder, self).__init__()
        self.input_dim = input_dim
        self.features = nn.Sequential(
            nn.Linear(self.input_dim, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 256),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool1d(1),
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        return x

class BinaryClassifier(nn.Module):
    def __init__(self, input_dim=16):
        super(BinaryClassifier, self).__init__()
        self.input_dim = input_dim
        self.classifier = nn.Sequential(
            nn.Linear(self.input_dim, 8),
            nn.ReLU(inplace=True),
            nn.Linear(8, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.classifier(x)
        return x

# Load the physiological signal dataset
data = np.random.rand(1000, 4)  # 1000 samples, 4 channels
labels = np.random.randint(0, 2, size=1000)  # 0 or 1 labels
scaler = StandardScaler()
data = scaler.fit_transform(data)
data = torch.from_numpy(data).float()
labels = torch.from_numpy(labels).float()

# Split the data into train and test sets
train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, random_state=42)

# Load the pre-trained contrastive model
contrastive_model = ContrastiveModel(input_dim=4)
contrastive_model.load_state_dict(torch.load("contrastive_model.pth"))

# Freeze the contrastive model weights
for param in contrastive_model.parameters():
    param.requires_grad = False

# Define the binary classification model and optimizer
encoder = Encoder(input_dim=4)
classifier = BinaryClassifier(input_dim=16)
optimizer = optim.Adam(classifier.parameters(), lr=1e-3)

# Train the binary classification model
criterion = nn.BCELoss()

for epoch in range(10):
    for i in range(0, len(train_data), 32):
        # Encode the training data using the pre-trained contrastive model
        with torch.no_grad():
            z = contrastive_model.encoder(train_data[i:i+32])

        # Train the binary classification model on the encoded data
        y_pred = classifier(z)
        loss = criterion(y_pred.squeeze(), train_labels[i:i+32])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print training progress
        if i % 100 == 0:
            print(f"Epoch [{epoch}/{10}], Step [{i}/{len(train_data)}], Loss: {loss.item():.4f}")

# Evaluate the binary classification model on the test set
with torch.no_grad():
    z_test = contrastive_model.encoder(test_data)
    y_pred_test = classifier(z_test)
    test_loss = criterion(y_pred_test.squeeze(), test_labels)

print(f"Test Loss: {test_loss.item():.4f}")
# The code defines a BinaryClassifier module that implements a simple binary classification model with a single hidden layer. The Encoder module is also defined, which is used to encode the physiological signals using the pre-trained contrastive model.
# During training, the code encodes the training data using the pre-trained contrastive model, and then trains the binary classification model on the encoded data. The binary classification model is trained using the Adam optimizer with a learning rate of 1e-3.
# The code prints the training progress every 100 steps. After 10 epochs of training, the binary classification model should have learned to predict the labels of the physiological signals with decent accuracy.
# Finally, the code evaluates the binary classification model on the test set and prints the test loss.