In [31]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
from typing import Optional
from unicore.modules import TransformerEncoderLayer, LayerNorm

# Define the dataset
class CustomDataset(Dataset):
    def __init__(self, matrix_data, vector_data):
        self.matrix_data = matrix_data
        self.vector_data = vector_data

    def __len__(self):
        return len(self.matrix_data)

    def __getitem__(self, idx):
        return self.matrix_data[idx], self.vector_data[idx]

# Define the 2D matrix encoder (similar to an image encoder)
class MatrixEncoder(nn.Module):
    def __init__(self, input_channels, output_dim):
        super(MatrixEncoder, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(input_channels, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, output_dim)
        )

    def forward(self, x):
        return self.cnn(x)

# Define the 1D vector encoder (similar to a text encoder)
class VectorEncoder(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(VectorEncoder, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

# Define the Transformer encoder with pair
class TransformerEncoderWithPair(nn.Module):
    def __init__(
        self,
        encoder_layers: int = 6,
        embed_dim: int = 128,
        ffn_embed_dim: int = 3072,
        attention_heads: int = 4,
        emb_dropout: float = 0.1,
        dropout: float = 0.1,
        attention_dropout: float = 0.1,
        activation_dropout: float = 0.0,
        max_seq_len: int = 256,
        activation_fn: str = "gelu",
        post_ln: bool = False,
        no_final_head_layer_norm: bool = False,
    ) -> None:

        super().__init__()
        self.emb_dropout = emb_dropout
        self.max_seq_len = max_seq_len
        self.embed_dim = 128
        self.attention_heads = 4
        self.emb_layer_norm = LayerNorm(self.embed_dim)
        if not post_ln:
            self.final_layer_norm = LayerNorm(self.embed_dim)
        else:
            self.final_layer_norm = None

        if not no_final_head_layer_norm:
            self.final_head_layer_norm = LayerNorm(attention_heads)
        else:
            self.final_head_layer_norm = None

        self.layers = nn.ModuleList(
            [
                TransformerEncoderLayer(
                    embed_dim=self.embed_dim,
                    ffn_embed_dim=ffn_embed_dim,
                    attention_heads=attention_heads,
                    dropout=dropout,
                    attention_dropout=attention_dropout,
                    activation_dropout=activation_dropout,
                    activation_fn=activation_fn,
                    post_ln=post_ln,
                )
                for _ in range(encoder_layers)
            ]
        )

    def forward(
        self,
        emb: torch.Tensor,
        attn_mask: Optional[torch.Tensor] = None,
        padding_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:

        bsz = emb.size(0)
        seq_len = emb.size(1)
        x = self.emb_layer_norm(emb)
        x = F.dropout(x, p=self.emb_dropout, training=self.training)

        # account for padding while computing the representation
        if padding_mask is not None:
            x = x * (1 - padding_mask.unsqueeze(-1).type_as(x))

        if attn_mask is None:
            attn_mask = torch.zeros((bsz, 1, seq_len, seq_len), device=emb.device).repeat(1, self.attention_heads, 1, 1).view(-1, seq_len, seq_len)

        for i in range(len(self.layers)):
            x, attn_mask, _ = self.layers[i](
                x, padding_mask=padding_mask, attn_bias=attn_mask, return_attn=True
            )

        if self.final_layer_norm is not None:
            x = self.final_layer_norm(x)

        return x, attn_mask

# Define the CLIP model
class CLIPModel(nn.Module):
    def __init__(self, matrix_encoder, vector_encoder, transformer_encoder):
        super(CLIPModel, self).__init__()
        self.matrix_encoder = matrix_encoder
        self.vector_encoder = vector_encoder
        self.transformer_encoder = transformer_encoder

    def forward(self, matrix, vector):
        matrix_features = self.matrix_encoder(matrix)
        vector_features = self.vector_encoder(vector)
        transformer_input = torch.cat((matrix_features.unsqueeze(1), vector_features.unsqueeze(1)), dim=1)
        transformer_output, _ = self.transformer_encoder(transformer_input)
        return transformer_output[:, 0, :], transformer_output[:, 1, :]

# Contrastive loss function
def contrastive_loss(matrix_features, vector_features, temperature=0.07):
    print("=== Contrastive Loss Calculation ===")
    # Normalize the features
    matrix_features = F.normalize(matrix_features, dim=-1)
    print("Normalized Matrix Features:", matrix_features)
    vector_features = F.normalize(vector_features, dim=-1)
    print("Normalized Vector Features:", vector_features)

    # Compute the logits
    logits = torch.matmul(matrix_features, vector_features.t()) / temperature
    print("Logits (Similarity Matrix):", logits)
    labels = torch.arange(len(matrix_features)).to(matrix_features.device)

    # Calculate positive pair loss (loss1)
    positive_logit = torch.diag(logits)
    print("Positive Logits (Diagonal Elements):", positive_logit)
    loss1 = -torch.mean(torch.log(torch.exp(positive_logit) / torch.sum(torch.exp(logits), dim=1)))
    print("Loss1 (Positive Pair Loss):", loss1)

    # Calculate negative pair loss (loss2)
    negative_logits = logits[~torch.eye(len(matrix_features), dtype=bool)].view(len(matrix_features), -1)
    print("Negative Logits:", negative_logits)
    loss2 = -torch.mean(torch.log(1 - torch.exp(negative_logits) / torch.sum(torch.exp(logits), dim=1, keepdim=True)))
    print("Loss2 (Negative Pair Loss):", loss2)

    # Combine loss1 and loss2
    loss = (loss1 + loss2) / 2
    print("Final Loss (Average of Loss1 and Loss2):", loss)
    return loss

# Training loop
def train_clip(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0
    for batch_idx, (matrix, vector) in enumerate(dataloader):
        matrix, vector = matrix.to(device), vector.to(device)

        # Forward pass
        matrix_features, vector_features = model(matrix, vector)
        print(f"Batch {batch_idx + 1}/{len(dataloader)}: Matrix features: {matrix_features.shape}, Vector features: {vector_features.shape}")

        # Compute loss
        loss = contrastive_loss(matrix_features, vector_features)
        print(f"Loss for batch {batch_idx + 1}: {loss.item()}")

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

# Example usage
def main():
    # Sample data (replace with actual data)
    matrix_data = torch.randn(100, 3, 32, 32)  # 100 samples of 3x32x32 matrices
    vector_data = torch.randn(100, 50)          # 100 samples of 1D vectors with 50 elements each

    # Hyperparameters
    output_dim = 128
    batch_size = 16
    learning_rate = 1e-3
    num_epochs = 10

    # Create dataset and dataloader
    dataset = CustomDataset(matrix_data, vector_data)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Define device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize model, optimizer
    matrix_encoder = MatrixEncoder(input_channels=3, output_dim=output_dim)
    vector_encoder = VectorEncoder(input_dim=50, output_dim=output_dim)
    transformer_encoder = TransformerEncoderWithPair()
    model = CLIPModel(matrix_encoder, vector_encoder, transformer_encoder).to(device)
    print(model)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model
    for epoch in range(num_epochs):
        loss = train_clip(model, dataloader, optimizer, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss:.4f}")

    # Save the trained model
    torch.save(model.state_dict(), "clip_model.pth")

    # Load the trained model and test with new input
    model.load_state_dict(torch.load("clip_model.pth", map_location=device),)
    model.eval()

    # Define new inputs
    matrix_input = torch.randn(1, 3, 32, 32).to(device)  # A single 3x32x32 matrix
    vector_input = torch.randn(1, 50).to(device)          # A single 1D vector with 50 elements

    # Pass the inputs through the model
    with torch.no_grad():
        matrix_features, vector_features = model(matrix_input, vector_input)

    # Print the output features
    print("Matrix features output:", matrix_features)
    print("Vector features output:", vector_features)

if __name__ == "__main__":
    main()


CLIPModel(
  (matrix_encoder): MatrixEncoder(
    (cnn): Sequential(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): ReLU()
      (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Flatten(start_dim=1, end_dim=-1)
      (7): Linear(in_features=2048, out_features=128, bias=True)
    )
  )
  (vector_encoder): VectorEncoder(
    (fc): Sequential(
      (0): Linear(in_features=50, out_features=128, bias=True)
      (1): ReLU()
      (2): Linear(in_features=128, out_features=128, bias=True)
    )
  )
  (transformer_encoder): TransformerEncoderWithPair(
    (emb_layer_norm): LayerNorm(torch.Size([128]), eps=1e-05, elementwise_affine=True)
    (final_layer_norm): LayerNorm(torch.Size([128]), eps=1e-05, elementwise_affine=True)
    (final

  model.load_state_dict(torch.load("clip_model.pth", map_location=device),)


In [21]:
output_dim = 128
batch_size = 16
learning_rate = 1e-3
num_epochs = 10
# Create dataset and dataloader


# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Initialize model, optimizer
matrix_encoder = MatrixEncoder(input_channels=3, output_dim=output_dim)
vector_encoder = VectorEncoder(input_dim=50, output_dim=output_dim)
transformer_encoder = TransformerEncoderWithPair()
model = CLIPModel(matrix_encoder, vector_encoder, transformer_encoder).to(device)
print(model)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Load the trained model and test with new input
model.load_state_dict(torch.load("clip_model.pth", map_location=device))
model.eval()

# Define new inputs
matrix_input = torch.randn(1, 3, 32, 32).to(device)  # A single 3x32x32 matrix
vector_input = torch.randn(1, 50).to(device)          # A single 1D vector with 50 elements

# Pass the inputs through the model
with torch.no_grad():
    matrix_features, vector_features = model(matrix_input, vector_input)

# Print the output features
print("Matrix features output:", matrix_features)
print("Vector features output:", vector_features)

CLIPModel(
  (matrix_encoder): MatrixEncoder(
    (cnn): Sequential(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): ReLU()
      (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Flatten(start_dim=1, end_dim=-1)
      (7): Linear(in_features=2048, out_features=128, bias=True)
    )
  )
  (vector_encoder): VectorEncoder(
    (fc): Sequential(
      (0): Linear(in_features=50, out_features=128, bias=True)
      (1): ReLU()
      (2): Linear(in_features=128, out_features=128, bias=True)
    )
  )
  (transformer_encoder): TransformerEncoderWithPair(
    (emb_layer_norm): LayerNorm(torch.Size([128]), eps=1e-05, elementwise_affine=True)
    (final_layer_norm): LayerNorm(torch.Size([128]), eps=1e-05, elementwise_affine=True)
    (final

  model.load_state_dict(torch.load("clip_model.pth", map_location=device))


In [33]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
from typing import Optional
from unicore.modules import TransformerEncoderLayer, LayerNorm

# Define the dataset
class CustomDataset(Dataset):
    def __init__(self, matrix_data, vector_data):
        self.matrix_data = matrix_data
        self.vector_data = vector_data

    def __len__(self):
        return len(self.matrix_data)

    def __getitem__(self, idx):
        return self.matrix_data[idx], self.vector_data[idx]

# Define the 2D matrix encoder (similar to an image encoder)
class MatrixEncoder(nn.Module):
    def __init__(self, input_channels, output_dim):
        super(MatrixEncoder, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(input_channels, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, output_dim)
        )

    def forward(self, x):
        return self.cnn(x)

# Define the 1D vector encoder (similar to a text encoder)
class VectorEncoder(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(VectorEncoder, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

# Define the Transformer encoder with pair
class TransformerEncoderWithPair(nn.Module):
    def __init__(
        self,
        encoder_layers: int = 6,
        embed_dim: int = 128,
        ffn_embed_dim: int = 3072,
        attention_heads: int = 4,
        emb_dropout: float = 0.1,
        dropout: float = 0.1,
        attention_dropout: float = 0.1,
        activation_dropout: float = 0.0,
        max_seq_len: int = 256,
        activation_fn: str = "gelu",
        post_ln: bool = False,
        no_final_head_layer_norm: bool = False,
    ) -> None:

        super().__init__()
        self.emb_dropout = emb_dropout
        self.max_seq_len = max_seq_len
        self.embed_dim = 128
        self.attention_heads = 4
        self.emb_layer_norm = LayerNorm(self.embed_dim)
        if not post_ln:
            self.final_layer_norm = LayerNorm(self.embed_dim)
        else:
            self.final_layer_norm = None

        if not no_final_head_layer_norm:
            self.final_head_layer_norm = LayerNorm(attention_heads)
        else:
            self.final_head_layer_norm = None

        self.layers = nn.ModuleList(
            [
                TransformerEncoderLayer(
                    embed_dim=self.embed_dim,
                    ffn_embed_dim=ffn_embed_dim,
                    attention_heads=attention_heads,
                    dropout=dropout,
                    attention_dropout=attention_dropout,
                    activation_dropout=activation_dropout,
                    activation_fn=activation_fn,
                    post_ln=post_ln,
                )
                for _ in range(encoder_layers)
            ]
        )

    def forward(
        self,
        emb: torch.Tensor,
        attn_mask: Optional[torch.Tensor] = None,
        padding_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:

        bsz = emb.size(0)
        seq_len = emb.size(1)
        x = self.emb_layer_norm(emb)
        x = F.dropout(x, p=self.emb_dropout, training=self.training)

        # account for padding while computing the representation
        if padding_mask is not None:
            x = x * (1 - padding_mask.unsqueeze(-1).type_as(x))

        if attn_mask is None:
            attn_mask = torch.zeros((bsz, 1, seq_len, seq_len), device=emb.device).repeat(1, self.attention_heads, 1, 1).view(-1, seq_len, seq_len)

        for i in range(len(self.layers)):
            x, attn_mask, _ = self.layers[i](
                x, padding_mask=padding_mask, attn_bias=attn_mask, return_attn=True
            )

        if self.final_layer_norm is not None:
            x = self.final_layer_norm(x)

        return x, attn_mask

# Define the CLIP model
class CLIPModel(nn.Module):
    def __init__(self, matrix_encoder, vector_encoder, transformer_encoder):
        super(CLIPModel, self).__init__()
        self.matrix_encoder = matrix_encoder
        self.vector_encoder = vector_encoder
        self.transformer_encoder = transformer_encoder

    def forward(self, matrix, vector):
        matrix_features = self.matrix_encoder(matrix)
        vector_features = self.vector_encoder(vector)
        transformer_input = torch.cat((matrix_features.unsqueeze(1), vector_features.unsqueeze(1)), dim=1)
        transformer_output, _ = self.transformer_encoder(transformer_input)
        return transformer_output[:, 0, :], transformer_output[:, 1, :]

# Contrastive loss function
def contrastive_loss(matrix_features, vector_features, temperature=0.07):
    # Normalize the features
    matrix_features = F.normalize(matrix_features, dim=-1)
    vector_features = F.normalize(vector_features, dim=-1)

    # Compute the logits
    logits = torch.matmul(matrix_features, vector_features.t()) / temperature
    labels = torch.arange(len(matrix_features)).to(matrix_features.device)

    # Calculate positive pair loss (loss1)
    positive_logit = torch.diag(logits)
    loss1 = -torch.mean(torch.log(torch.exp(positive_logit) / torch.sum(torch.exp(logits), dim=1)))

    # Calculate negative pair loss (loss2)
    negative_logits = logits[~torch.eye(len(matrix_features), dtype=bool)].view(len(matrix_features), -1)
    loss2 = -torch.mean(torch.log(1 - torch.exp(negative_logits) / torch.sum(torch.exp(logits), dim=1, keepdim=True)))

    # Combine loss1 and loss2
    loss = (loss1 + loss2) / 2
    return loss

# Training loop
def train_clip(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0
    for batch_idx, (matrix, vector) in enumerate(dataloader):
        matrix, vector = matrix.to(device), vector.to(device)

        # Forward pass
        matrix_features, vector_features = model(matrix, vector)

        # Compute loss
        loss = contrastive_loss(matrix_features, vector_features)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

# Example usage
def main():
    # Sample data (replace with actual data)
    matrix_data = torch.randn(1000, 3, 32, 32)  # 100 samples of 3x32x32 matrices
    vector_data = torch.randn(1000, 50)          # 100 samples of 1D vectors with 50 elements each

    # Hyperparameters
    output_dim = 128
    batch_size = 16
    learning_rate = 1e-3
    num_epochs = 10

    # Create dataset and dataloader
    dataset = CustomDataset(matrix_data, vector_data)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Define device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize model, optimizer
    matrix_encoder = MatrixEncoder(input_channels=3, output_dim=output_dim)
    vector_encoder = VectorEncoder(input_dim=50, output_dim=output_dim)
    transformer_encoder = TransformerEncoderWithPair()
    model = CLIPModel(matrix_encoder, vector_encoder, transformer_encoder).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train the model
    for epoch in range(num_epochs):
        loss = train_clip(model, dataloader, optimizer, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss:.4f}")

    # Save the trained model
    torch.save(model.state_dict(), "clip_model.pth")

    # Load the trained model and test with new input
    model.load_state_dict(torch.load("clip_model.pth", map_location=device))
    model.eval()

    # Define new inputs
    matrix_input = torch.randn(1, 3, 32, 32).to(device)  # A single 3x32x32 matrix
    vector_input = torch.randn(1, 50).to(device)          # A single 1D vector with 50 elements

    # Pass the inputs through the model
    with torch.no_grad():
        matrix_features, vector_features = model(matrix_input, vector_input)

    # Print the output features
    print("Matrix features output:", matrix_features)
    print("Vector features output:", vector_features)

if __name__ == "__main__":
    main()


Epoch [1/10], Loss: 0.2217
Epoch [2/10], Loss: 0.0262
Epoch [3/10], Loss: 0.0198
Epoch [4/10], Loss: 0.0141
Epoch [5/10], Loss: 0.0109
Epoch [6/10], Loss: 0.0114
Epoch [7/10], Loss: 0.0111
Epoch [8/10], Loss: 0.0105
Epoch [9/10], Loss: 0.0096
Epoch [10/10], Loss: 0.0081
Matrix features output: tensor([[-0.1062, -0.6231,  0.0466,  0.6175, -0.2106, -1.3731, -1.3836, -0.1255,
          0.2300, -0.1454,  0.0505,  0.3020, -1.9088, -0.6142,  0.8860, -0.3528,
         -0.5218, -0.3380,  1.1101, -0.2486, -0.2771,  0.9125, -0.8314,  2.3649,
         -0.5489, -1.5923, -2.7846, -0.9512, -0.2407, -0.6325,  0.1340,  1.1260,
         -1.3846, -1.5218,  1.7004, -2.0450, -0.1582, -0.8042,  0.1150,  0.6896,
         -0.5543,  0.0840,  0.4283, -0.7652,  2.7006,  0.0977,  1.7894, -0.4454,
          0.6268,  0.2087,  0.7317,  1.0694,  0.6220,  0.0481, -1.5968,  0.4783,
         -0.1574, -0.4091, -0.8259,  0.3940,  0.3037,  0.9135, -0.4993,  0.3582,
          1.1283, -0.0860, -1.0280, -0.9660, -0.0829,  1.

  model.load_state_dict(torch.load("clip_model.pth", map_location=device))
