In [2]:
# 多模态模型
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torch import nn
from torch.optim import Adam
from transformers import BertTokenizer, BertModel
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split
import os
import csv

# 数据加载部分
def load_labels(filename):
    labels = {}
    with open(filename, mode='r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        next(reader)  # 跳过标题行
        for row in reader:
            guid, tag = row
            labels[guid] = tag
    return labels

def load_data(data_dir, labels):
    data = []
    for guid, tag in labels.items():
        text_file = os.path.join(data_dir, f"{guid}.txt")
        image_file = os.path.join(data_dir, f"{guid}.jpg")

        # 读取文本数据
        with open(text_file, 'r', encoding='utf-8', errors='replace') as file:
            text_data = file.read()

        # 读取图像数据
        image_data = Image.open(image_file)

        data.append((guid, text_data, image_data, tag))
    return data

# 使用数据加载函数
labels = load_labels("train.txt")
data_dir = 'data'
data = load_data(data_dir, labels)

# 划分数据集
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)

# 图像转换函数
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 加载tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 自定义数据集类
class MultimodalDataset(Dataset):
    def __init__(self, data, tokenizer, max_text_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_text_len = max_text_len
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _, text, image, tag = self.data[idx]

        # 处理图像
        image = image_transform(image)

        # 处理文本
        text = self.tokenizer(text, padding='max_length', max_length=self.max_text_len, truncation=True, return_tensors="pt")

        # 获取标签
        label = self.label_mapping[tag]

        return image, text, label

# 自定义多模态网络类
class MultimodalNetwork(nn.Module):
    def __init__(self):
        super(MultimodalNetwork, self).__init__()
        self.image_model = models.resnet50(pretrained=True)
        self.text_model = BertModel.from_pretrained('bert-base-uncased')

        # 冻结ResNet50的参数
        for param in self.image_model.parameters():
            param.requires_grad = False

        # 修改ResNet50的最后一层以提取特征，而不是进行分类
        self.image_model.fc = nn.Identity()

        # 定义组合特征后的分类层
        self.classifier = nn.Linear(2048 + 768, 3)

    def forward(self, images, input_ids, attention_mask):
        # 提取图像特征
        img_features = self.image_model(images)

        # 提取文本特征
        text_outputs = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output

        # 合并特征
        combined_features = torch.cat((img_features, text_features), dim=1)

        # 分类
        logits = self.classifier(combined_features)

        return logits

# 为训练集和验证集创建 DataLoader
train_dataset = MultimodalDataset(train_data, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)

val_dataset = MultimodalDataset(val_data, tokenizer)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 模型、损失函数和优化器
model = MultimodalNetwork().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=1e-4)

# 训练循环
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    for i, (images, texts, labels) in enumerate(train_dataloader):
        images = images.to(device)
        input_ids = texts['input_ids'].squeeze(1).to(device)
        attention_mask = texts['attention_mask'].squeeze(1).to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images, input_ids, attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_dataloader)}], Loss: {loss.item()}')

    print(f'Epoch [{epoch + 1}/{num_epochs}] Finished Training')

    def evaluate_model(model, dataloader, file_name="error_analysis.txt"):
        model.eval()  # 设置模型为评估模式
        correct = 0
        total = 0

        with torch.no_grad(), open(file_name, "w") as file:
            for images, texts, labels in dataloader:
                images = images.to(device)
                input_ids = texts['input_ids'].squeeze(1).to(device)
                attention_mask = texts['attention_mask'].squeeze(1).to(device)
                labels = labels.to(device)

                outputs = model(images, input_ids, attention_mask)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # 收集并记录错误预测的样本
                mismatches = (predicted != labels).nonzero(as_tuple=False)
                for idx in mismatches:
                    actual_label = labels[idx].item()
                    predicted_label = predicted[idx].item()
                    file.write(f"Actual Label: {actual_label}, Predicted Label: {predicted_label}\n")

        accuracy = 100 * correct / total
        print(f'Accuracy of the model on the validation set: {accuracy} %')

    # 在每个epoch后使用修改后的评估函数
    evaluate_model(model, val_dataloader)

evaluate_model(model, val_dataloader)

# 加载测试数据
def load_test_data(test_file, data_dir):
    test_data = []
    with open(test_file, mode='r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        next(reader)  # 跳过标题行
        for row in reader:
            guid = row[0]
            text_file = os.path.join(data_dir, f"{guid}.txt")
            image_file = os.path.join(data_dir, f"{guid}.jpg")

            # 读取文本数据
            with open(text_file, 'r', encoding='utf-8', errors='replace') as file:
                text_data = file.read()

            # 读取图像数据
            image_data = Image.open(image_file)

            test_data.append((guid, text_data, image_data))
    return test_data

test_data = load_test_data('test_without_label.txt', data_dir)

# 创建测试数据集
class TestDataset(Dataset):
    def __init__(self, data, tokenizer, max_text_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_text_len = max_text_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        guid, text, image = self.data[idx]

        # 处理图像
        image = image_transform(image)

        # 处理文本
        text = self.tokenizer(text, padding='max_length', max_length=self.max_text_len, truncation=True, return_tensors="pt")

        return guid, image, text

test_dataset = TestDataset(test_data, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 预测函数
def predict(model, dataloader):
    model.eval()
    predictions = {}
    with torch.no_grad():
        for guid, images, texts in dataloader:
            images = images.to(device)
            input_ids = texts['input_ids'].squeeze(1).to(device)
            attention_mask = texts['attention_mask'].squeeze(1).to(device)

            outputs = model(images, input_ids, attention_mask)
            _, predicted = torch.max(outputs.data, 1)

            for g, p in zip(guid, predicted):
                predictions[g] = p.item()

    return predictions

# 进行预测
model_predictions = predict(model, test_dataloader)

# 转换预测结果为标签
predicted_labels = {guid: ['negative', 'neutral', 'positive'][label] for guid, label in model_predictions.items()}

print(predicted_labels)

# 将预测结果写入新文件
output_file = 'predicted_labels_0128.txt'

with open(output_file, 'w', encoding='utf-8') as file:
    file.write('guid,tag\n')

    # 写入预测的标签
    for guid, label in predicted_labels.items():
        file.write(f'{guid},{label}\n')

print(f'预测结果已保存到文件：{output_file}')

Epoch [1/10], Step [100/200], Loss: 0.7988011240959167
Epoch [1/10], Step [200/200], Loss: 0.5584688186645508
Epoch [1/10] Finished Training
Accuracy of the model on the validation set: 67.125 %
Epoch [2/10], Step [100/200], Loss: 0.6507971286773682
Epoch [2/10], Step [200/200], Loss: 0.6159613132476807
Epoch [2/10] Finished Training
Accuracy of the model on the validation set: 63.75 %
Epoch [3/10], Step [100/200], Loss: 0.7071638107299805
Epoch [3/10], Step [200/200], Loss: 0.7235562801361084
Epoch [3/10] Finished Training
Accuracy of the model on the validation set: 70.5 %
Epoch [4/10], Step [100/200], Loss: 0.3674232065677643
Epoch [4/10], Step [200/200], Loss: 0.20071054995059967
Epoch [4/10] Finished Training
Accuracy of the model on the validation set: 65.25 %
Epoch [5/10], Step [100/200], Loss: 0.9101855158805847
Epoch [5/10], Step [200/200], Loss: 1.0329341888427734
Epoch [5/10] Finished Training
Accuracy of the model on the validation set: 61.625 %
Epoch [6/10], Step [100/200]

In [1]:
#消融实验 单个epoch
import torch
from torchvision.models import resnet50, ResNet50_Weights
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torch import nn
from torch.optim import Adam
from transformers import BertTokenizer, BertModel
from PIL import Image
from sklearn.model_selection import train_test_split
import os
import csv

# 数据加载部分
def load_labels(filename):
    labels = {}
    with open(filename, mode='r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        next(reader)  # 跳过标题行
        for row in reader:
            guid, tag = row
            labels[guid] = tag
    return labels

def load_data(data_dir, labels):
    data = []
    for guid, tag in labels.items():
        text_file = os.path.join(data_dir, f"{guid}.txt")
        image_file = os.path.join(data_dir, f"{guid}.jpg")
        with open(text_file, 'r', encoding='utf-8', errors='replace') as file:
            text_data = file.read()
        image_data = Image.open(image_file)
        data.append((guid, text_data, image_data, tag))
    return data

labels = load_labels("train.txt")
data_dir = 'data'
data = load_data(data_dir, labels)
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# 图像转换函数
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 加载tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 仅文本数据的数据集类
class TextOnlyDataset(Dataset):
    def __init__(self, data, tokenizer, max_text_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_text_len = max_text_len
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _, text, _, tag = self.data[idx]
        text = self.tokenizer(text, padding='max_length', max_length=self.max_text_len, truncation=True, return_tensors="pt")
        label = self.label_mapping[tag]
        return text, label
    
class ImageOnlyDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _, _, image, tag = self.data[idx]
        image = image_transform(image)
        label = self.label_mapping[tag]
        return image, label

# 仅文本的模型
class TextOnlyModel(nn.Module):
    def __init__(self):
        super(TextOnlyModel, self).__init__()
        self.text_model = BertModel.from_pretrained('bert-base-uncased')
        self.classifier = nn.Linear(768, 3) 

    def forward(self, input_ids, attention_mask):
        text_outputs = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output
        logits = self.classifier(text_features)
        return logits

# 仅图像的模型
class ImageOnlyModel(nn.Module):
    def __init__(self):
        super(ImageOnlyModel, self).__init__()
        self.image_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        # 冻结ResNet50的参数
        for param in self.image_model.parameters():
            param.requires_grad = False
        self.image_model.fc = nn.Linear(self.image_model.fc.in_features, 3)  # 修改为分类层

    def forward(self, images):
        logits = self.image_model(images)
        return logits

# 创建DataLoader
text_only_train_dataset = TextOnlyDataset(train_data, tokenizer)
text_only_train_dataloader = DataLoader(text_only_train_dataset, batch_size=16, shuffle=True)

image_only_train_dataset = ImageOnlyDataset(train_data)
image_only_train_dataloader = DataLoader(image_only_train_dataset, batch_size=16, shuffle=True)

# 训练和评估函数
def train_model(model, dataloader, criterion, optimizer, num_epochs=1, is_text_model=False):
    model.train()  # 将模型设置为训练模式
    for epoch in range(num_epochs):
        for i, batch in enumerate(dataloader):
            # 如果是文本模型
            if is_text_model:
                texts, labels = batch
                input_ids = texts['input_ids'].squeeze(1).to(device)
                attention_mask = texts['attention_mask'].squeeze(1).to(device)
                labels = labels.to(device)
                outputs = model(input_ids, attention_mask)  # 传递正确的参数
            else:
                # 如果是图像模型
                images, labels = batch
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)

            # 重置梯度
            optimizer.zero_grad()

            # 前向传播
            loss = criterion(outputs, labels)

            # 反向传播和优化
            loss.backward()
            optimizer.step()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(dataloader)}], Loss: {loss.item()}')

    print(f'Epoch [{epoch + 1}/{num_epochs}] Finished Training')

def evaluate_model(model, dataloader, is_text_model=False):
    model_type = "Text" if is_text_model else "Image"
    model.eval()  # 将模型设置为评估模式
    total = 0
    correct = 0
    with torch.no_grad():
        for batch in dataloader:
            if is_text_model:
                texts, labels = batch
                input_ids = texts['input_ids'].squeeze(1).to(device)
                attention_mask = texts['attention_mask'].squeeze(1).to(device)
                labels = labels.to(device)
                outputs = model(input_ids, attention_mask)
            else:
                images, labels = batch
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Accuracy of the {model_type} Model on the validation set: {accuracy} %')

# 初始化模型、损失函数和优化器
text_model = TextOnlyModel().to(device)
image_model = ImageOnlyModel().to(device)

criterion = nn.CrossEntropyLoss()
text_optimizer = Adam(text_model.parameters(), lr=1e-4)
image_optimizer = Adam(image_model.parameters(), lr=1e-4)

# 训练和评估
train_model(text_model, text_only_train_dataloader, criterion, text_optimizer, is_text_model=True)
evaluate_model(text_model, text_only_train_dataloader, is_text_model=True)


train_model(image_model, image_only_train_dataloader, criterion, image_optimizer)
evaluate_model(image_model, image_only_train_dataloader)

  from .autonotebook import tqdm as notebook_tqdm


cuda
Epoch [1/1], Step [100/200], Loss: 1.0511832237243652
Epoch [1/1], Step [200/200], Loss: 0.6059260368347168
Epoch [1/1] Finished Training
Accuracy of the Text Model on the validation set: 76.84375 %
Epoch [1/1], Step [100/200], Loss: 0.9519345760345459
Epoch [1/1], Step [200/200], Loss: 0.8102669715881348
Epoch [1/1] Finished Training
Accuracy of the Image Model on the validation set: 61.71875 %


In [1]:
#消融实验
import torch
from torchvision.models import resnet50, ResNet50_Weights
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torch import nn
from torch.optim import Adam
from transformers import BertTokenizer, BertModel
from PIL import Image
from sklearn.model_selection import train_test_split
import os
import csv

# 数据加载部分
def load_labels(filename):
    labels = {}
    with open(filename, mode='r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        next(reader)  # 跳过标题行
        for row in reader:
            guid, tag = row
            labels[guid] = tag
    return labels

def load_data(data_dir, labels):
    data = []
    for guid, tag in labels.items():
        text_file = os.path.join(data_dir, f"{guid}.txt")
        image_file = os.path.join(data_dir, f"{guid}.jpg")
        with open(text_file, 'r', encoding='utf-8', errors='replace') as file:
            text_data = file.read()
        image_data = Image.open(image_file)
        data.append((guid, text_data, image_data, tag))
    return data

labels = load_labels("train.txt")
data_dir = 'data'
data = load_data(data_dir, labels)

train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# 图像转换函数
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 加载tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# 仅文本数据的数据集类
class TextOnlyDataset(Dataset):
    def __init__(self, data, tokenizer, max_text_len=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_text_len = max_text_len
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _, text, _, tag = self.data[idx]
        text = self.tokenizer(text, padding='max_length', max_length=self.max_text_len, truncation=True, return_tensors="pt")
        label = self.label_mapping[tag]
        return text, label
    
class ImageOnlyDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        _, _, image, tag = self.data[idx]
        image = image_transform(image)
        label = self.label_mapping[tag]
        return image, label

# 仅文本的模型
class TextOnlyModel(nn.Module):
    def __init__(self):
        super(TextOnlyModel, self).__init__()
        self.text_model = BertModel.from_pretrained('bert-base-uncased')
        self.classifier = nn.Linear(768, 3)  # 假设BERT的输出维度是768

    def forward(self, input_ids, attention_mask):
        text_outputs = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        text_features = text_outputs.pooler_output
        logits = self.classifier(text_features)
        return logits

# 仅图像的模型
class ImageOnlyModel(nn.Module):
    def __init__(self):
        super(ImageOnlyModel, self).__init__()
        self.image_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        # 冻结ResNet50的参数
        for param in self.image_model.parameters():
            param.requires_grad = False
        self.image_model.fc = nn.Linear(self.image_model.fc.in_features, 3)  # 修改为分类层

    def forward(self, images):
        logits = self.image_model(images)
        return logits

# 创建DataLoader
text_only_train_dataset = TextOnlyDataset(train_data, tokenizer)
text_only_train_dataloader = DataLoader(text_only_train_dataset, batch_size=16, shuffle=True)

image_only_train_dataset = ImageOnlyDataset(train_data)
image_only_train_dataloader = DataLoader(image_only_train_dataset, batch_size=16, shuffle=True)

# 训练和评估函数
def train_model(model, dataloader, criterion, optimizer, num_epochs=10, is_text_model=False):
    model.train()  # 将模型设置为训练模式
    for epoch in range(num_epochs):
        for i, batch in enumerate(dataloader):
            # 如果是文本模型
            if is_text_model:
                texts, labels = batch
                input_ids = texts['input_ids'].squeeze(1).to(device)
                attention_mask = texts['attention_mask'].squeeze(1).to(device)
                labels = labels.to(device)
                outputs = model(input_ids, attention_mask)  # 传递正确的参数
            else:
                # 如果是图像模型
                images, labels = batch
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)

            # 重置梯度
            optimizer.zero_grad()

            # 前向传播
            loss = criterion(outputs, labels)

            # 反向传播和优化
            loss.backward()
            optimizer.step()

            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(dataloader)}], Loss: {loss.item()}')

    print(f'Epoch [{epoch + 1}/{num_epochs}] Finished Training')

def evaluate_model(model, dataloader, is_text_model=False):
    model_type = "Text" if is_text_model else "Image"
    model.eval()  # 将模型设置为评估模式
    total = 0
    correct = 0
    with torch.no_grad():
        for batch in dataloader:
            if is_text_model:
                texts, labels = batch
                input_ids = texts['input_ids'].squeeze(1).to(device)
                attention_mask = texts['attention_mask'].squeeze(1).to(device)
                labels = labels.to(device)
                outputs = model(input_ids, attention_mask)
            else:
                images, labels = batch
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Accuracy of the {model_type} Model on the validation set: {accuracy} %')

# 初始化模型、损失函数和优化器
text_model = TextOnlyModel().to(device)
image_model = ImageOnlyModel().to(device)

criterion = nn.CrossEntropyLoss()
text_optimizer = Adam(text_model.parameters(), lr=1e-4)
image_optimizer = Adam(image_model.parameters(), lr=1e-4)

# 训练和评估
train_model(text_model, text_only_train_dataloader, criterion, text_optimizer, is_text_model=True)
evaluate_model(text_model, text_only_train_dataloader, is_text_model=True)


train_model(image_model, image_only_train_dataloader, criterion, image_optimizer)
evaluate_model(image_model, image_only_train_dataloader)

  from .autonotebook import tqdm as notebook_tqdm


cuda
Epoch [1/10], Step [100/200], Loss: 1.0498110055923462
Epoch [1/10], Step [200/200], Loss: 0.7482496500015259
Epoch [2/10], Step [100/200], Loss: 0.7246271967887878
Epoch [2/10], Step [200/200], Loss: 0.7929370403289795
Epoch [3/10], Step [100/200], Loss: 0.9160317182540894
Epoch [3/10], Step [200/200], Loss: 1.2740421295166016
Epoch [4/10], Step [100/200], Loss: 1.007752537727356
Epoch [4/10], Step [200/200], Loss: 0.9982258677482605
Epoch [5/10], Step [100/200], Loss: 0.9333962202072144
Epoch [5/10], Step [200/200], Loss: 1.058244228363037
Epoch [6/10], Step [100/200], Loss: 1.0649231672286987
Epoch [6/10], Step [200/200], Loss: 1.184614896774292
Epoch [7/10], Step [100/200], Loss: 0.991972029209137
Epoch [7/10], Step [200/200], Loss: 0.7762249708175659
Epoch [8/10], Step [100/200], Loss: 0.9536113739013672
Epoch [8/10], Step [200/200], Loss: 1.0249760150909424
Epoch [9/10], Step [100/200], Loss: 0.8444915413856506
Epoch [9/10], Step [200/200], Loss: 0.8965730667114258
Epoch [10