In [1]:
import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import joblib as jl
import cebra.datasets
from cebra import CEBRA

from matplotlib.collections import LineCollection
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F


In [2]:
hippocampus_pos = cebra.datasets.init('rat-hippocampus-single-achilles')
neural_data = torch.tensor(cebra.datasets.init('rat-hippocampus-single-achilles').neural, dtype=torch.float32)
labels = torch.tensor(cebra.datasets.init('rat-hippocampus-single-achilles').continuous_index.numpy(), dtype=torch.float32)




  neural_data = torch.tensor(cebra.datasets.init('rat-hippocampus-single-achilles').neural, dtype=torch.float32)


In [3]:
class ConvNetVAE(nn.Module):
    def __init__(self, num_features, num_hidden_units, latent_dim):
        super(ConvNetVAE, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=num_features, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=latent_dim, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=latent_dim, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_features, kernel_size=5, stride=1, padding=5),
            nn.ReLU()
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=latent_dim, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_features, kernel_size=5, stride=1, padding=2),
            nn.ReLU()
        )

    def forward(self, x):
        x = x.unsqueeze(1).permute(0, 2, 1)
        #print(x.shape)
        encoded = self.encoder(x)
        #print(encoded.shape)
        decoded = self.decoder(encoded)
        #print(decoded.shape)
        return encoded.squeeze(2), decoded.permute(0, 2, 1).squeeze(1)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Adjust latent dimension as needed
latent_dim = 10  # This is a hyperparameter you can tune
model = ConvNetVAE(num_features=120, num_hidden_units=16, latent_dim=latent_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=3e-4)

In [4]:
def reconstruction_loss(reconstructed, original):
    return F.mse_loss(reconstructed, original)

In [5]:
# DataLoader setup
dataset = TensorDataset(neural_data, labels)
loader = DataLoader(dataset, batch_size=512, shuffle=True)

In [6]:
def pretrain_epoch(loader, model, optimizer):
    model.train()
    total_loss = 0
    for data, _ in loader:
        data = data.to(device)
        optimizer.zero_grad()
        _, reconstructed = model(data)
        loss = reconstruction_loss(reconstructed, data)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

# Pretrain the model
pretrain_epochs = 100
for epoch in range(pretrain_epochs):
    loss = pretrain_epoch(loader, model, optimizer)
    if (epoch + 1) % 100 == 0:
        print(f"Pretrain Epoch {epoch + 1}, Loss: {loss:.4f}")

Pretrain Epoch 100, Loss: 0.0499


In [7]:
def info_nce_loss(features, labels, temperature=1.0, threshold=0.3):
    # Normalize features to unit length for cosine similarity
    features = F.normalize(features, dim=1)
    
    # Calculate the cosine similarity matrix
    similarity_matrix = torch.matmul(features, features.T)
    
    # Calculate Euclidean distances between labels for defining positive pairs
    labels_diff = torch.cdist(labels[:,0].unsqueeze(1), labels[:,0].unsqueeze(1))
    
    # Create a mask for positive samples based on a threshold in label space
    positive_mask = (labels_diff <= threshold).float()

    # Calculate exponentiated similarities scaled by temperature
    exp_sim = torch.exp(similarity_matrix / temperature)
    
    # Compute sums of exponentiated similarities where masks apply
    pos_sum = torch.sum(exp_sim * positive_mask, dim=1)
    all_sum = torch.sum(exp_sim, dim=1)
    
    # Calculate the InfoNCE loss
    loss = -torch.log(pos_sum / all_sum + 1e-6)

    return torch.mean(loss)


In [9]:
# Optionally save the model state
torch.save(model.state_dict(), 'pretrained_model.pth')


# Load the entire state dict from the file
full_state_dict = torch.load('pretrained_model.pth', map_location=device)

# Extract just the encoder part of the state dict
encoder_state_dict = {key.replace('encoder.', ''): value for key, value in full_state_dict.items() if key.startswith('encoder.')}

# Load the adjusted state dict into the encoder
model.encoder.load_state_dict(encoder_state_dict)

<All keys matched successfully>

In [11]:
class ConvNet(nn.Module):
    def __init__(self, num_features, num_hidden_units, output_dim):
        super(ConvNet, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels=num_features, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=output_dim, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1)
        )

    def forward(self, x):
        x = x.unsqueeze(1).permute(0, 2, 1)  # Change shape to [batch_size, channels, length]
        x = self.conv_layers(x)
        x = x.squeeze(2)  # Remove the last dimension after pooling
        return x


def info_nce_loss(features, labels, temperature=1.0, threshold=0.3):
    # Normalize features to unit length for cosine similarity
    features = F.normalize(features, dim=1)
    
    # Calculate the cosine similarity matrix
    similarity_matrix = torch.matmul(features, features.T)
    
    # Calculate Euclidean distances between labels for defining positive pairs
    labels_diff = torch.cdist(labels[:,0].unsqueeze(1), labels[:,0].unsqueeze(1))
    
    # Create a mask for positive samples based on a threshold in label space
    positive_mask = (labels_diff <= threshold).float()

    # Calculate exponentiated similarities scaled by temperature
    exp_sim = torch.exp(similarity_matrix / temperature)
    
    # Compute sums of exponentiated similarities where masks apply
    pos_sum = torch.sum(exp_sim * positive_mask, dim=1)
    all_sum = torch.sum(exp_sim, dim=1)
    
    # Calculate the InfoNCE loss
    loss = -torch.log(pos_sum / all_sum + 1e-6)

    return torch.mean(loss)

# Example usage in a training loop
# Assume `features` and `labels` are outputs from your model and target labels respectively
# loss = info_nce_loss(features, labels, temperature=1.0, threshold=0.1)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize model, optimizer
model = ConvNet(num_features=120, num_hidden_units=16, output_dim=3).to(device)
model.encoder.load_state_dict(torch.load('pretrained_model.pth', map_location=device)) 
optimizer = optim.Adam(model.parameters(), lr=3e-4)


AttributeError: 'ConvNet' object has no attribute 'encoder'

NameError: name 'ConvNet' is not defined

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Define the ConvNet with VAE-like architecture
class ConvNetVAE(nn.Module):
    def __init__(self, num_features, num_hidden_units, latent_dim):
        super(ConvNetVAE, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=num_features, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.Conv1d(in_channels=num_hidden_units, out_channels=latent_dim, kernel_size=5, stride=1, padding=5),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=latent_dim, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_hidden_units, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.ConvTranspose1d(in_channels=num_hidden_units, out_channels=num_features, kernel_size=5, stride=1, padding=2),
            nn.ReLU()
        )

    def forward(self, x):
        x = x.unsqueeze(1).permute(0, 2, 1)
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded.squeeze(2), decoded.permute(0, 2, 1).squeeze(1)

# Reconstruction loss definition
def reconstruction_loss(reconstructed, original):
    return F.mse_loss(reconstructed, original)

# DataLoader setup
neural_data = torch.randn(10178, 120)  # Simulated neural data (timesteps, features)
labels = torch.randn(10178, 3)         # Simulated continuous labels
dataset = TensorDataset(neural_data, labels)
loader = DataLoader(dataset, batch_size=512, shuffle=True)

# Pretrain the VAE model
def pretrain_epoch(loader, model, optimizer):
    model.train()
    total_loss = 0
    for data, _ in loader:
        data = data.to(device)
        optimizer.zero_grad()
        _, reconstructed = model(data)
        loss = reconstruction_loss(reconstructed, data)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
latent_dim = 10
model = ConvNetVAE(num_features=120, num_hidden_units=16, latent_dim=latent_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=3e-4)

# Run pretraining
pretrain_epochs = 50
for epoch in range(pretrain_epochs):
    loss = pretrain_epoch(loader, model, optimizer)
    if (epoch + 1) % 10 == 0:
        print(f"Pretrain Epoch {epoch + 1}, Loss: {loss:.4f}")

# Save the pre-trained encoder model
torch.save(model.encoder.state_dict(), 'encoder_pretrained.pth')

# Load the encoder for contrastive learning
model.encoder.load_state_dict(torch.load('encoder_pretrained.pth'))

# Define the InfoNCE loss function for contrastive learning
def info_nce_loss(features, labels, temperature=1.0, threshold=0.3):
    features = F.normalize(features, dim=1)
    similarity_matrix = torch.matmul(features, features.T)
    labels_diff = torch.cdist(labels[:,0].unsqueeze(1), labels[:,0].unsqueeze(1))
    positive_mask = (labels_diff <= threshold).float()
    exp_sim = torch.exp(similarity_matrix / temperature)
    pos_sum = torch.sum(exp_sim * positive_mask, dim=1)
    all_sum = torch.sum(exp_sim, dim=1)
    loss = -torch.log(pos_sum / all_sum + 1e-6)
    return torch.mean(loss)

# Training loop for contrastive learning
def train_contrastive_epoch(loader, model, optimizer):
    model.train()
    total_loss = 0
    for data, targets in loader:
        data, targets = data.to(device), targets.to(device)
        optimizer.zero_grad()
        features, _ = model(data)  # Use only encoder outputs
        loss = info_nce_loss(features, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

# Run contrastive training
contrastive_epochs = 50
for epoch in range(contrastive_epochs):
    loss = train_contrastive_epoch(loader, model, optimizer)
    if (epoch + 1) % 10 == 0:
        print(f"Contrastive Epoch {epoch + 1}, Loss: {loss:.4f}")

Pretrain Epoch 10, Loss: 0.9970
Pretrain Epoch 20, Loss: 0.9909
Pretrain Epoch 30, Loss: 0.9885
Pretrain Epoch 40, Loss: 0.9848
Pretrain Epoch 50, Loss: 0.9832
Contrastive Epoch 10, Loss: 1.9103
Contrastive Epoch 20, Loss: 1.9009
Contrastive Epoch 30, Loss: 1.8992
Contrastive Epoch 40, Loss: 1.8960
Contrastive Epoch 50, Loss: 1.8989
