In [1]:
import pandas as pd
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer
import argparse
from torchvision.models import resnet50

  warn(


In [57]:
#获取训练文本
def get_texts_from_textsPath(folder_path,df):
    texts=[]
    for i in df['guid']:
        file = "./data/"+str(i)+".txt"
        with open(file, "r",encoding="GB18030") as infile:
            content = infile.read()
            texts.append(content)
    return texts

In [None]:
#获取训练图片
def get_valid_imagesPath_from_directory(folder_path ,df):
    image_paths = []
    for ind in df['guid']:
        image_path = folder_path+str(ind)+".jpg"
        image = cv2.imread(image_path)
        height,width,channels = image.shape
        image_paths.append(image_path)
    
    return image_paths

In [58]:
#数据预处理

train_label_path = "train.txt"
train_label_df = pd.read_csv(train_label_path,sep=",")

#将情感标签替换为数字
column_dict = {"positive": 0, "negative": 1,"neutral":2}
new_df = train_label_df.replace({"tag": column_dict})
labels = list(new_df['tag'])

In [60]:
# 原始数据
image_paths = get_valid_imagesPath_from_directory("./data/",new_df)
texts = get_texts_from_textsPath("./data/",new_df)
print(texts[:10])
print(image_paths[:10])

['RT @AmitSwami77: The conspirators have an evil eye & are now set to physically attack Asaram Bapu Ji! #WeDemandSafety4Bapuji http://t.co/N8…\n', 'Waxwing trills, Chickadees calling "here sweetie", enthusiastic athletes, blue sky & snow at #ualbertafarm #UAlberta \n', '@NYSE is looking a little despondent today...??? http://t.co/o5xiKyJgT7\n', 'FERVENT | S,M,L | 140k free PLASTIC CLIP, keychain rubber AND sticker 085725737197 / 28ae36f3 \n', 'Nice day chilling in the park yesterday relieved my mood for a short while. #friends #summer #outside #depression \n', 'Ford : F-350 Lariat 6.4L 2008 Lariat Heated Leather Rear Camera 2008 ford f 250 diesel 4 x… \n', 'RT @MOVIEMEMORlES: Furious 7 http://t.co/CEPxKf3QlY\n', '@MattSmith1230 @ProFlowers The flowers look like a dejected King Tritan: \n', '#廃墟 #廃線 #abandoned #写真撮ってる人と繋がりたい #写真好きな人と繋がりたい \n', "RT @Pablothemako: UPDATE!Navy discarded ilegal fishing after boarding chinese vessels in #Chile's Excl Econ Zone htt… \n"]
['./data/4597.jpg', '

In [62]:
# 划分训练集
image_paths_train, image_paths_val, texts_train, texts_val, labels_train, labels_val = train_test_split(
    image_paths, texts, labels, test_size=0.2, random_state=5)

In [64]:
#文本预处理
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')#分词器
pretrained_model = BertModel.from_pretrained("bert-base-multilingual-cased")

max_length = 147

def text_preprocess(texts):
    tokenized_texts = [tokenizer(text,padding='max_length',max_length=max_length,truncation=True,return_tensors="pt") for text in texts]
    return tokenized_texts

Some weights of the model checkpoint at bert-base-multilingual-cased were not used when initializing BertModel: ['cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [65]:
tokenized_texts_train = text_preprocess(texts_train)
tokenized_texts_val = text_preprocess(texts_val)

In [7]:
# 图像数据预处理
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(), 
])

In [66]:
# 定义训练集类
class Dataset(Dataset):
    def __init__(self, image_paths, tokenized_texts, labels,transform=None):
        self.image_paths = image_paths     
        self.transform = transform
        self.input_ids = [x['input_ids'] for x in tokenized_texts]
        self.attention_mask = [x['attention_mask'] for x in tokenized_texts]
        self.labels = labels

    def __getitem__(self, index):
        input_ids = torch.tensor(self.input_ids[index])
        attention_mask = torch.tensor(self.attention_mask[index])
        labels = torch.tensor(self.labels[index])
        image_path = self.image_paths[index]
        image = Image.open(image_path)
        image = self.transform(image)
        return image ,input_ids, attention_mask, labels
    def __len__(self):
        return len(self.input_ids)

In [67]:
dataset_train = Dataset(image_paths_train, tokenized_texts_train, labels_train, transform)
dataset_val = Dataset(image_paths_val,tokenized_texts_val, labels_val, transform)

#run test
x_train = Dataset(image_paths_train[:1000], tokenized_texts_train[:1000], labels_train[:1000], transform)
x_val = Dataset(image_paths_val[:200],tokenized_texts_val[:200], labels_val[:200], transform)

In [16]:
# 图片特征提取模型
class ImageFeatureExtractor(nn.Module):
    def __init__(self):
        super(ImageFeatureExtractor, self).__init__()
        self.resnet = resnet50(pretrained=True) 
    
    def forward(self, image):
        features = self.resnet(image)
        return features

In [17]:
# 文本特征提取模型
class TextFeatureExtractor(nn.Module):
    def __init__(self):
        super(TextFeatureExtractor, self).__init__()
        self.bert = pretrained_model

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]  
        output = pooled_output
        return output

In [18]:
# 多模态融合模型定义
class MixModel(nn.Module):
    def __init__(self, num_classes,option):
        super(MixModel, self).__init__()
        self.image_extractor = ImageFeatureExtractor()  
        self.text_encoder = TextFeatureExtractor()
        self.option=option
        
        #仅图像
        self.classifier0 = nn.Sequential(
            nn.Dropout(p=0.4),
            nn.Linear(1000, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),
             nn.Linear(256, num_classes),
            nn.ReLU(inplace=True),
           
        )
        #仅文本
        self.classifier1 = nn.Sequential(
            nn.Dropout(p=0.4),
            nn.Linear(768, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),
            nn.Linear(256, num_classes),
            nn.ReLU(inplace=True),
        )
        #多模态融合
        self.classifier2 = nn.Sequential(
            nn.Dropout(p=0.4),
            nn.Linear(1768, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),
            nn.Linear(1024, num_classes),
            nn.ReLU(inplace=True),
        )

    
    def forward(self, image, input_ids,attention_mask):
        if(self.option==0):#图
            image_features = self.image_extractor(image)
            output = image_features
            output = self.classifier0(image_features)
        elif(self.option==1):#文本
            text_features = self.text_encoder(input_ids, attention_mask)
            output = self.classifier1(text_features)
        else:#图文
            image_features = self.image_extractor(image)
            text_features = self.text_encoder(input_ids,attention_mask)
            fusion_features = torch.cat((text_features,image_features), dim=-1)
            output = self.classifier2(fusion_features)
        return output

In [19]:
def train_model(model, train_loader, criterion, optimizer,device):
    model.train()  
    running_loss = 0
    total_correct = 0 
    for images, input_ids, attention_mask, labels in train_loader:
        images = images.to(device)
        input_ids = input_ids.squeeze(1).to(device)
        attention_mask = attention_mask.to(device)    
        labels = labels.to(device)  
        optimizer.zero_grad()     
        outputs = model(images, input_ids,attention_mask)
        _, preds = torch.max(outputs, 1)
        total_correct += torch.sum(preds == labels)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()   
        running_loss += loss.item()
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = total_correct.item() / len(train_loader.dataset)
    return epoch_loss, epoch_acc

In [43]:
# 预测
def predict_model(model, test_loader,device):
    model.eval()
    predictions = []
    for images,input_ids, attention_mask,  _ in test_loader:
        images = images.to(device)
        input_ids = input_ids.squeeze(1).to(device)
        attention_mask = attention_mask.to(device)
        with torch.no_grad():
            outputs = model(images, input_ids,attention_mask)
            _, preds = torch.max(outputs, 1)
        predictions.extend(preds.cpu().numpy())
    return predictions

In [30]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()
lr = [0.001,0.01,0.1]
batch_size = 10
best_acc = 0
best_model = None

loader_train = DataLoader(x_train, batch_size=batch_size, shuffle=True)
loader_val = DataLoader(x_val, batch_size=batch_size, shuffle=False)

#既有图片又有文本
for l in lr:
    model = MixModel(3,2)
    optimizer = torch.optim.Adam(model.parameters(), l)
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss, train_acc = train_model(model, loader_train, criterion, optimizer,device)
        val_predictions = predict_model(model, loader_val,device)
        #数组格式
        val_predictions = np.array(val_predictions)
        val_labels = np.array(labels_val)
        sum = 0
        for x in range(len(val_predictions)):
            if (val_predictions[x] == val_labels[x]):
                       sum += 1
        val_acc = sum / len(val_labels)
        if(val_acc>best_acc):
            best_acc = val_acc
            best_model = model
        print(f"lr: {l}, Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
print("最佳准确率")
print(best_acc)

  input_ids = torch.tensor(self.input_ids[index])
  attention_mask = torch.tensor(self.attention_mask[index])


KeyboardInterrupt: 

In [None]:
#只有图片
for l in lr:
    model = MixModel(3,0)
    optimizer = torch.optim.Adam(model.parameters(), lr=l)
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss, train_acc = train_model(model, loader_train, criterion, optimizer,device)
        val_predictions = predict_model(model, loader_val,device)
        # 计算验证集准确率    
        val_predictions = np.array(val_predictions)
        val_labels = np.array(labels_val)
        val_acc = (val_predictions == val_labels).sum() / len(val_labels)
        if(val_acc>best_acc):
            best_acc = val_acc
        print(f"lr: {l}, Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
print("最佳准确率")
print(best_acc)

In [None]:
#只有文本
for l in lr:
    model = MixModel(3,1)
    optimizer = torch.optim.Adam(model.parameters(), lr=l)
    num_epochs = 10
    for epoch in range(num_epochs):
        train_loss, train_acc = train_model(model, loader_train, criterion, optimizer,device)
        val_predictions = predict_model(model, loader_val,device)
        # 计算验证集准确率    
        val_predictions = np.array(val_predictions)
        val_labels = np.array(labels_val)
        val_acc = (val_predictions == val_labels).sum() / len(val_labels)
        if(val_acc>best_acc):
            best_acc = val_acc
        print(f"lr: {l}, Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
print("最佳准确率")
print(best_acc)

In [54]:
#加载测试集并预处理
test_path = "test_without_label.txt"
test_df = pd.read_csv(test_path,sep=",")
test_df.iloc[:,-1]=0
test_labels = np.array(test_df['tag'])

#image_paths
image_paths_test = get_valid_imagesPath_from_directory("./data/",test_df)
test_texts = get_texts_from_textsPath("./data/",test_df)

tokenized_texts_test = text_preprocess(test_texts)
dataset_test = Dataset(image_paths_test, tokenized_texts_test, test_labels, transform)
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)

In [68]:
#最优模型预测
test_predictions = predict_model(best_model, loader_test, device)  
test_predictions = np.array(test_predictions)

  input_ids = torch.tensor(self.input_ids[index])
  attention_mask = torch.tensor(self.attention_mask[index])


In [None]:
# 读取测试数据文件
test_data_file = pd.read_csv("test_without_label.txt")['guid'].values

# 将标签和预测结果写入test_without_label文件
with open('test_without_label.txt', 'w') as f:
    f.write('guid,tag\n')
    for i in range(len(test_data_file)):
        if predict_final[i] == 0:
            f.write(str(test_data_file[i]) + ',' + "positive" + '\n')
        elif predict_final[i] == 1:
            f.write(str(test_data_file[i]) + ',' + "negative" + '\n')
        else:
            f.write(str(test_data_file[i]) + ',' + "neutral" + '\n')

In [1]:
pip freeze > requirements.txt

Note: you may need to restart the kernel to use updated packages.


