In [11]:
import torch
import torch.nn as nn

class MultiHeadCrossAttention(nn.Module):
    def __init__(self, input_dim1, input_dim2, num_heads, head_dim):
        super(MultiHeadCrossAttention, self).__init__()
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.total_dim = num_heads * head_dim
        
        # Linear transformations for queries and keys
        self.linear_q1 = nn.Linear(input_dim1, self.total_dim)
        self.linear_k2 = nn.Linear(input_dim2, self.total_dim)
        
        # Linear transformation for values
        self.linear_v1 = nn.Linear(input_dim1, self.total_dim)
        
        # Linear transformation for the output of multi-head attention
        self.linear_out = nn.Linear(self.total_dim, self.total_dim)
        
    def split_heads(self, x):
        # Reshape the input tensor to separate heads
        batch_size, seq_len, _ = x.size()
        x = x.view(batch_size, seq_len, self.num_heads, self.head_dim)
        return x.permute(0, 2, 1, 3)
    
    def forward(self, x1, x2):
        # Linearly transform queries, keys, and values
        q1 = self.linear_q1(x1)
        k2 = self.linear_k2(x2)
        v1 = self.linear_v1(x1)
        
        # Split heads for queries, keys, and values
        q1 = self.split_heads(q1)
        k2 = self.split_heads(k2)
        v1 = self.split_heads(v1)
        
        # Compute scaled dot-product attention
        attn_weights = torch.matmul(q1, k2.permute(0, 1, 3, 2)) / (self.head_dim ** 0.5)
        attn_weights = torch.nn.functional.softmax(attn_weights, dim=-1)
        output = torch.matmul(attn_weights, v1)
        
        # Concatenate the multi-head outputs and linearly transform
        output = output.permute(0, 2, 1, 3).contiguous().view(x1.size(0), -1, self.total_dim)
        output = self.linear_out(output)
        
        return output

# 示例用法
input_dim1 = 1280
input_dim2 = 300
num_heads = 4
head_dim = 64

multi_head_attention = MultiHeadCrossAttention(input_dim1, input_dim2, num_heads, head_dim)

# 输入示例
x1 = torch.randn(32, 10, input_dim1)  # 输入1，形状为(batch_size, sequence_length, input_dim1)
x2 = torch.randn(32, 20, input_dim2)  # 输入2，形状为(batch_size, sequence_length, input_dim2)
print('x1:',x1.shape)
print('x2:',x2.shape)
output = multi_head_attention(x1, x2)
print("输出形状:", output.shape)

x1: torch.Size([32, 10, 1280])
x2: torch.Size([32, 20, 300])


RuntimeError: Expected batch2_sizes[0] == bs && batch2_sizes[1] == contraction_size to be true, but got false.  (Could this error message be improved?  If so, please report an enhancement request to PyTorch.)

In [20]:
import torch
import torch.nn as nn

class MultiHeadCrossAttention(nn.Module):
    def __init__(self, input_dim1, input_dim2, num_heads, head_dim):
        super(MultiHeadCrossAttention, self).__init__()
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.total_dim = num_heads * head_dim
        
        # Linear transformations for queries and keys
        self.linear_q1 = nn.Linear(input_dim1, self.total_dim)
        self.linear_k2 = nn.Linear(input_dim2, self.total_dim)
        
        # Linear transformation for values
        self.linear_v1 = nn.Linear(input_dim1, self.total_dim)
        
        # Linear transformation for the output of multi-head attention
        self.linear_out = nn.Linear(self.total_dim, self.total_dim)
        
    def split_heads(self, x):
        # Reshape the input tensor to separate heads
        batch_size, seq_len, _ = x.size()
        x = x.view(batch_size, seq_len, self.num_heads, self.head_dim)
        return x.permute(0, 2, 1, 3)
    
    def forward(self, x1, x2):
        # Linearly transform queries, keys, and values
        q1 = self.linear_q1(x1)
        k2 = self.linear_k2(x2)
        v1 = self.linear_v1(x1)
        print('q1:',q1.shape)
        print('k2:',k2.shape)
        print('v1:',v1.shape)
        # Split heads for queries, keys, and values
        q1 = self.split_heads(q1)
        k2 = self.split_heads(k2)
        v1 = self.split_heads(v1)
        print('v1_split:',v1.shape)
        print('q1_split:',q1.shape)
        print('k2_split:',k2.shape)
        # Compute scaled dot-product attention
        attn_weights = torch.matmul(q1, k2.permute(0, 1, 3, 2)) / (self.head_dim ** 0.5)
        attn_weights = torch.nn.functional.softmax(attn_weights, dim=-1)
        print('attn_weight:',attn_weights.shape)
        print('v1:',v1.shape)
        output = torch.matmul(attn_weights, v1)
        
        # Concatenate the multi-head outputs and linearly transform
        output = output.permute(0, 2, 1, 3).contiguous().view(x1.size(0), -1, self.total_dim)
        print('output:',output.shape)
        output = self.linear_out(output)
        
        return output

class MLPWithMultiHeadCrossAttention(nn.Module):
    def __init__(self, input_dim1, input_dim2, num_heads, head_dim, hidden_dim, num_classes):
        super(MLPWithMultiHeadCrossAttention, self).__init__()
        self.multi_head_attention = MultiHeadCrossAttention(input_dim1, input_dim2, num_heads, head_dim)
        self.fc1 = nn.Linear(num_heads * head_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        
    def forward(self, x1, x2):
        print('x1:',x1.shape)
        print('x2:',x2.shape)
        
        # Multi-Head Cross Attention
        attention_output = self.multi_head_attention(x1, x2)
        print('attention_output:',attention_output.shape)
        # Flatten and pass through MLP layers
        flattened_output = attention_output.view(attention_output.size(0), -1)
        print('flattened_output:',flattened_output.shape)
        out = self.fc1(flattened_output)
        out = self.relu(out)
        out = self.fc2(out)
        
        return out

# 示例用法
input_dim1 = 1280
input_dim2 = 300
num_heads = 4
head_dim = 64
hidden_dim = 256
num_classes = 10  # 替换为你的分类类别数
num_epoches=200

model = MLPWithMultiHeadCrossAttention(input_dim1, input_dim2, num_heads, head_dim, hidden_dim, num_classes)
print('model:',model)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 输入示例
#x1 = torch.randn(32, 10, input_dim1)  # 输入1，形状为(batch_size, sequence_length, input_dim1)
#x2 = torch.randn(32, 20, input_dim2)  # 输入2，形状为(batch_size, sequence_length, input_dim2)

#output = model(x1, x2)
#print("输出形状:", output.shape)

model: MLPWithMultiHeadCrossAttention(
  (multi_head_attention): MultiHeadCrossAttention(
    (linear_q1): Linear(in_features=1280, out_features=256, bias=True)
    (linear_k2): Linear(in_features=300, out_features=256, bias=True)
    (linear_v1): Linear(in_features=1280, out_features=256, bias=True)
    (linear_out): Linear(in_features=256, out_features=256, bias=True)
  )
  (fc1): Linear(in_features=256, out_features=256, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=256, out_features=10, bias=True)
)


In [41]:
import numpy as np

# 生成 2x3 的随机矩阵
#a=np.random.randint(100,1028)
#a = np.random.rand(100, 1028)
#print(a)
a=np.random.randint(low=0,high=1000,size=(100,27,1280),dtype='int')
a=torch.from_numpy(a)
a=torch.tensor(a,dtype=torch.float)

#b=np.random.rand(100,300)
#print(b)
b=np.random.randint(low=0,high=1000,size=(100,35,300),dtype='int')
b=torch.from_numpy(b)
b=torch.tensor(b,dtype=torch.float)

label=np.random.randint(low=6,high=10,size=(100),dtype='int')
#print(label,label.shape)
label=torch.from_numpy(label)

  if __name__ == '__main__':
  from ipykernel import kernelapp as app


In [42]:
import torch
from torch.utils.data import Dataset,DataLoader


class myDataset(Dataset):
    def __init__(self,a,b,label):
        #super.__init__()
        self.a=a
        self.b=b
        self.label=label
    def __getitem__(self,idx):
        x1=self.a[idx]
        x2=self.b[idx]
        label=self.label[idx]
        return (x1,x2,label)
    def __len__(self,):
        return len(self.a)
        

In [43]:
train_dataset=myDataset(a,b,label)
train_loader=DataLoader(train_dataset,batch_size=32,num_workers=0,shuffle=True)

In [44]:
for epoch in range(num_epoches):
    for x1_batch, x2_batch, labels in train_loader:  # dataloader是你的数据加载器
        optimizer.zero_grad()
        outputs = model(x1_batch, x2_batch)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

x1: torch.Size([32, 27, 1280])
x2: torch.Size([32, 35, 300])
q1: torch.Size([32, 27, 256])
k2: torch.Size([32, 35, 256])
v1: torch.Size([32, 27, 256])
v1_split: torch.Size([32, 4, 27, 64])
q1_split: torch.Size([32, 4, 27, 64])
k2_split: torch.Size([32, 4, 35, 64])
attn_weight: torch.Size([32, 4, 27, 35])
v1: torch.Size([32, 4, 27, 64])


RuntimeError: Expected batch2_sizes[0] == bs && batch2_sizes[1] == contraction_size to be true, but got false.  (Could this error message be improved?  If so, please report an enhancement request to PyTorch.)

In [14]:
q1=torch.randn(10,4,10,64)
k2=torch.randn(10,4,10,64)
v1=torch.randn(10,4,10,64)
#print(q1)
#print(k2)
head_dim=4
attn_weights = torch.matmul(q1, k2.permute(0, 1, 3, 2)) / (head_dim ** 0.5)
print(attn_weights.shape)
output = torch.matmul(attn_weights, v1)
print(output.shape)

torch.Size([10, 4, 10, 10])
torch.Size([10, 4, 10, 64])


In [10]:
import torch

# 创建两个四维张量，形状分别为 (batch_size, channels, height, width)
input1 = torch.randn(32, 3, 64, 64)
input2 = torch.randn(32, 3, 64, 64)

# 执行矩阵乘法操作
result = torch.matmul(input1, input2)

# 输出结果的形状
print(result.shape)

torch.Size([32, 3, 64, 64])


In [108]:
class MultiHeadAttention(nn.Module):
    # n_heads：多头注意力的数量
    # hid_dim：每个词输出的向量维度
    def __init__(self, input_dim1, input_dim2, n_heads,head_dim, dropout=0.4):
        super(MultiHeadAttention, self).__init__()
        self.head_dim = head_dim
        self.n_heads = n_heads

        # 强制 hid_dim 必须整除 h
        assert head_dim % n_heads == 0
        # 定义 W_q 矩阵
        self.w_q = nn.Linear(input_dim1, head_dim)
        # 定义 W_k 矩阵
        self.w_k = nn.Linear(input_dim2, head_dim)
        # 定义 W_v 矩阵
        self.w_v = nn.Linear(input_dim1, head_dim)
        self.fc = nn.Linear(head_dim, head_dim)
        self.do = nn.Dropout(dropout)
        # 缩放
        self.scale = torch.sqrt(torch.FloatTensor([head_dim // n_heads]))

    def forward(self, query, key, value, mask=None):
        # K: [64,10,300], batch_size 为 64，有 12 个词，每个词的 Query 向量是 300 维
        # V: [64,10,300], batch_size 为 64，有 10 个词，每个词的 Query 向量是 300 维
        # Q: [64,12,300], batch_size 为 64，有 10 个词，每个词的 Query 向量是 300 维
        bsz = query.shape[0]
        Q = self.w_q(query)
        K = self.w_k(key)
        V = self.w_v(value)
        print('Q,K,V:',Q.shape,K.shape,V.shape)
        # 这里把 K Q V 矩阵拆分为多组注意力，变成了一个 4 维的矩阵
        # 最后一维就是是用 self.hid_dim // self.n_heads 来得到的，表示每组注意力的向量长度, 每个 head 的向量长度是：300/6=50
        # 64 表示 batch size，6 表示有 6组注意力，10 表示有 10 词，50 表示每组注意力的词的向量长度
        # K: [64,10,300] 拆分多组注意力 -> [64,10,6,50] 转置得到 -> [64,6,10,50]
        # V: [64,10,300] 拆分多组注意力 -> [64,10,6,50] 转置得到 -> [64,6,10,50]
        # Q: [64,12,300] 拆分多组注意力 -> [64,12,6,50] 转置得到 -> [64,6,12,50]
        # 转置是为了把注意力的数量 6 放到前面，把 10 和 50 放到后面，方便下面计算
        Q = Q.view(bsz, -1, self.n_heads, self.head_dim //
                   self.n_heads).permute(0, 2, 1, 3)
        print('Q:',Q.shape)
        K = K.view(bsz, -1, self.n_heads, self.head_dim //
                   self.n_heads).permute(0, 2, 1, 3)
        V = V.view(bsz, -1, self.n_heads, self.head_dim //
                   self.n_heads).permute(0, 2, 1, 3)
        print('Q,K,V:',Q.shape,K.shape,V.shape)
        # 第 1 步：Q 乘以 K的转置，除以scale
        # [64,6,12,50] * [64,6,50,10] = [64,6,12,10]
        # attention：[64,6,12,10]
        attention = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        print('attention:',attention.shape)
        # 把 mask 不为空，那么就把 mask 为 0 的位置的 attention 分数设置为 -1e10
        if mask is not None:
            attention = attention.masked_fill(mask == 0, -1e10)

        # 第 2 步：计算上一步结果的 softmax，再经过 dropout，得到 attention。
        # 注意，这里是对最后一维做 softmax，也就是在输入序列的维度做 softmax
        # attention: [64,6,12,10]
        attention = self.do(torch.softmax(attention, dim=-1))

        # 第三步，attention结果与V相乘，得到多头注意力的结果
        # [64,6,12,10] * [64,6,10,50] = [64,6,12,50]
        # x: [64,6,12,50]
        #x = torch.matmul(attention, V)
        x = torch.matmul(V,attention)
        # 因为 query 有 12 个词，所以把 12 放到前面，把 5 和 60 放到后面，方便下面拼接多组的结果
        # x: [64,6,12,50] 转置-> [64,12,6,50]
        x = x.permute(0, 2, 1, 3).contiguous()
        # 这里的矩阵转换就是：把多组注意力的结果拼接起来
        # 最终结果就是 [64,12,300]
        # x: [64,12,6,50] -> [64,12,300]
        x = x.view(bsz, -1, self.n_heads * (self.head_dim // self.n_heads))
        x = self.fc(x)
        return x


# batch_size 为 64，有 12 个词，每个词的 Query 向量是 300 维
#query = torch.rand(64, 12, 300)
# batch_size 为 64，有 12 个词，每个词的 Key 向量是 300 维
#key = torch.rand(64, 10, 1280)
# batch_size 为 64，有 10 个词，每个词的 Value 向量是 300 维
#value = torch.rand(64, 12, 300)
#attention = MultiheadAttention(hid_dim=300, n_heads=6, dropout=0.1)
#output = attention(query, key, value)
## output: torch.Size([64, 12, 300])
#print(output.shape)
#————————————————
#版权声明：本文为CSDN博主「小郭小郭学富五车」的原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接及本声明。
#原文链接：https://blog.csdn.net/qq_42750193/article/details/122715902

In [109]:
class MLPWithMultiHeadCrossAttention(nn.Module):
    def __init__(self, input_dim1, input_dim2, num_heads, head_dim, hidden_dim, num_classes):
        super(MLPWithMultiHeadCrossAttention, self).__init__()
        self.multi_head_attention = MultiHeadAttention(input_dim1, input_dim2, num_heads, head_dim)
        self.fc1 = nn.Linear(num_heads * head_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        
    def forward(self, x1, x2):
        print('x1:',x1.shape)
        print('x2:',x2.shape)
        
        # Multi-Head Cross Attention
        attention_output = self.multi_head_attention(x1, x2,x1)
        print('attention_output:',attention_output.shape)
        # Flatten and pass through MLP layers
        flattened_output = attention_output.view(attention_output.size(0), -1)
        print('flattened_output:',flattened_output.shape)
        out = self.fc1(flattened_output)
        out = self.relu(out)
        out = self.fc2(out)
        
        return out

# 示例用法
input_dim1 = 1280
input_dim2 = 300
num_heads = 4
head_dim = 64
hidden_dim = 256
num_classes = 10  # 替换为你的分类类别数
num_epoches=200

model = MLPWithMultiHeadCrossAttention(input_dim1, input_dim2, num_heads, head_dim, hidden_dim, num_classes)
print('model:',model)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 输入示例
#x1 = torch.randn(32, 10, input_dim1)  # 输入1，形状为(batch_size, sequence_length, input_dim1)
#x2 = torch.randn(32, 20, input_dim2)  # 输入2，形状为(batch_size, sequence_length, input_dim2)

#output = model(x1, x2)
#print("输出形状:", output.shape)

model: MLPWithMultiHeadCrossAttention(
  (multi_head_attention): MultiHeadAttention(
    (w_q): Linear(in_features=1280, out_features=64, bias=True)
    (w_k): Linear(in_features=300, out_features=64, bias=True)
    (w_v): Linear(in_features=1280, out_features=64, bias=True)
    (fc): Linear(in_features=64, out_features=64, bias=True)
    (do): Dropout(p=0.4, inplace=False)
  )
  (fc1): Linear(in_features=256, out_features=256, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=256, out_features=10, bias=True)
)


In [110]:
import numpy as np

# 生成 2x3 的随机矩阵
#a=np.random.randint(100,1028)
#a = np.random.rand(100, 1028)
#print(a)
a=np.random.randint(low=0,high=1000,size=(100,27,1280),dtype='int')
a=torch.from_numpy(a)
a=torch.tensor(a,dtype=torch.float)

#b=np.random.rand(100,300)
#print(b)
b=np.random.randint(low=0,high=1000,size=(100,35,300),dtype='int')
b=torch.from_numpy(b)
b=torch.tensor(b,dtype=torch.float)

label=np.random.randint(low=6,high=10,size=(100),dtype='int')
#print(label,label.shape)
label=torch.from_numpy(label)

  if __name__ == '__main__':
  from ipykernel import kernelapp as app


In [112]:
import torch
from torch.utils.data import Dataset,DataLoader


class myDataset(Dataset):
    def __init__(self,a,b,label):
        #super.__init__()
        self.a=a
        self.b=b
        self.label=label
    def __getitem__(self,idx):
        x1=self.a[idx]
        x2=self.b[idx]
        label=self.label[idx]
        return (x1,x2,label)
    def __len__(self,):
        return len(self.a)
        

In [113]:
train_dataset=myDataset(a,b,label)
train_loader=DataLoader(train_dataset,batch_size=32,num_workers=0,shuffle=True)

In [114]:
for epoch in range(num_epoches):
    for x1_batch, x2_batch, labels in train_loader:  # dataloader是你的数据加载器
        optimizer.zero_grad()
        outputs = model(x1_batch, x2_batch)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

x1: torch.Size([32, 27, 1280])
x2: torch.Size([32, 35, 300])
Q,K,V: torch.Size([32, 27, 64]) torch.Size([32, 35, 64]) torch.Size([32, 27, 64])
Q: torch.Size([32, 4, 27, 16])
Q,K,V: torch.Size([32, 4, 27, 16]) torch.Size([32, 4, 35, 16]) torch.Size([32, 4, 27, 16])
attention: torch.Size([32, 4, 27, 35])


RuntimeError: Expected batch2_sizes[0] == bs && batch2_sizes[1] == contraction_size to be true, but got false.  (Could this error message be improved?  If so, please report an enhancement request to PyTorch.)