In [1]:


import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)


import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))



/kaggle/input/mvsasingle/MVSA_Single/labelResultAll.txt
/kaggle/input/mvsasingle/MVSA_Single/data/1893.txt
/kaggle/input/mvsasingle/MVSA_Single/data/1711.txt
/kaggle/input/mvsasingle/MVSA_Single/data/4682.txt
/kaggle/input/mvsasingle/MVSA_Single/data/5064.txt
/kaggle/input/mvsasingle/MVSA_Single/data/3504.txt
/kaggle/input/mvsasingle/MVSA_Single/data/1269.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/3863.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/1773.txt
/kaggle/input/mvsasingle/MVSA_Single/data/623.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/559.txt
/kaggle/input/mvsasingle/MVSA_Single/data/3750.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/2008.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/1812.txt
/kaggle/input/mvsasingle/MVSA_Single/data/1093.txt
/kaggle/input/mvsasingle/MVSA_Single/data/2081.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/4417.txt
/kaggle/input/mvsasingle/MVSA_Single/data/3919.jpg
/kaggle/input/mvsasingle/MVSA_Single/data/4503.txt
/kaggle/input/mvsasingle/MVS

In [2]:
import os
import re
import collections
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
import pandas as pd
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from collections import Counter
import nltk

In [3]:
def textCleaning(s):
    s = re.sub('[^A-Za-z]+', ' ', s)
    return s

In [4]:
def preProcessing(x_train, y_train, x_test, y_test):
    vocab = []
    stop_words = set(stopwords.words('english'))

    for sent in x_train:
        temp = ''.join(sent)
        for word in temp.split():
            word = word.lower()
            word = textCleaning(word)
            if word not in stop_words and word != '':
                vocab.append(word)
    
    corpus = Counter(vocab)
    corpus_ = sorted(corpus, key=corpus.get, reverse=True)
    
    onehot_dict = {w: i + 1 for i, w in enumerate(corpus_)}
    
    x_final_train, x_final_test = [], []

    for sent in x_train:
        temp = ''.join(sent)
        x_final_train.append([onehot_dict[textCleaning(word.lower())] for word in temp.split() if textCleaning(word) in onehot_dict.keys()])
    for sent in x_test:
        temp = ''.join(sent)
        x_final_test.append([onehot_dict[textCleaning(word.lower())] for word in temp.split() if textCleaning(word) in onehot_dict.keys()])
        
    y_final_train = [1 if label == 'positive' else 0 for label in y_train]
    y_final_test = [1 if label == 'positive' else 0 for label in y_test]
    
    return x_final_train, x_final_test, y_final_train, y_final_test, onehot_dict

In [5]:
def load_text_data(data_folder):
    texts = []
    filenames = os.listdir(data_folder)
    filenames.sort(key=lambda x: int(x[:-4]))  # Sort the filenames numerically
    for filename in filenames:
        if filename.endswith(".txt"):
            with open(os.path.join(data_folder, filename), 'r', encoding='latin-1') as file:
                text = file.read().strip()
                texts.append(text)
    return texts

In [6]:
def load_labels(result_file, filenames):
    labels_text = []
    labels_image = []
    with open(result_file, 'r') as file:
        next(file)  # Skip header
        for line in file:
            parts = line.strip().split('\t')
            text_id = int(parts[0])  # Extract the text ID
            text_label, image_label = parts[1].split(',')  # Extract the labels
            if text_id <= len(filenames):  # Check if the ID is within the range of filenames
                labels_text.append(text_label.strip())  # Append the text label
                labels_image.append(image_label.strip())  # Append the image label
    return labels_text, labels_image

In [7]:
def padding_(data, seqLen):
    features = np.zeros((len(data), seqLen), dtype=int)
    for i, rev in enumerate(data):
        if len(rev) != 0:
            features[i, -len(rev):] = np.array(rev)[:seqLen]
    return features

In [8]:

class LSTM(nn.Module):
    def __init__(self, inputDimension, hiddenSize, numLayer, batchFirst, outputSize, glove):
        super(LSTM, self).__init__()
        
        self.input_size = inputDimension
        self.hidden_size = hiddenSize
        self.num_layer = numLayer
        self.batch_first = batchFirst
        self.outputSize = outputSize
        
        self.embedding = nn.Embedding.from_pretrained(glove.vectors)
        self.lstm = nn.LSTM(input_size=inputDimension,
                            hidden_size=hiddenSize,
                            num_layers=numLayer,
                            bidirectional=False,
                            batch_first=batchFirst)
        self.dropout = nn.Dropout(0.3)
        self.linear = nn.Linear(hiddenSize, outputSize)
        self.sig = nn.Sigmoid()
    
    def forward(self, input):
        batch_size = input.size(0)
        h = torch.zeros(self.num_layer, batch_size, self.hidden_size).to(input.device)
        c = torch.zeros(self.num_layer, batch_size, self.hidden_size).to(input.device)

        embd = self.embedding(input)
        output, (hidden_, cell_) = self.lstm(embd, (h, c))

        output = self.dropout(output)
        output = self.linear(output[:, -1, :])
        output = self.sig(output)

        return output, (hidden_, cell_)

In [9]:
data_folder = "/kaggle/input/mvsasingle/MVSA_Single/data/"
texts = load_text_data(data_folder)

result_file = '/kaggle/input/mvsasingle/MVSA_Single/labelResultAll.txt'
filenames = os.listdir(data_folder)
labels_text, labels_image = load_labels(result_file, filenames)

In [10]:
x_train, x_test, y_train, y_test = train_test_split(texts, labels_text, stratify=labels_text)

x_train, x_test, y_train, y_test, onehot_dict = preProcessing(x_train, y_train, x_test, y_test)

x_train = np.array(x_train, dtype=object)
x_test = np.array(x_test, dtype=object)
y_train = np.array(y_train, dtype=np.int64)
y_test = np.array(y_test, dtype=np.int64)

In [11]:
x_train_pad = padding_(x_train, 500)
x_test_pad = padding_(x_test, 500)

In [12]:
glove_path = "/kaggle/input/glove-6b300/glove.6B.300d.txt"

# Load GloVe embeddings
from torchtext.vocab import Vectors
glove = Vectors(name=glove_path)

print("Loaded GloVe embeddings successfully!")


100%|█████████▉| 399999/400000 [00:44<00:00, 9012.72it/s]


Loaded GloVe embeddings successfully!


In [13]:

print(glove)

<torchtext.vocab.vectors.Vectors object at 0x7b3740495900>


In [14]:
inputDimension = 300  # Dimension of GloVe embeddings
hiddenSize = 64
numLayer = 2
batchFirst = True
outputSize = 3  # Adjusting for 3 classes (negative, neutral, positive)

text_lstm = LSTM(inputDimension, hiddenSize, numLayer, batchFirst, outputSize, glove)
print(text_lstm)

LSTM(
  (embedding): Embedding(400000, 300)
  (lstm): LSTM(300, 64, num_layers=2, batch_first=True)
  (dropout): Dropout(p=0.3, inplace=False)
  (linear): Linear(in_features=64, out_features=3, bias=True)
  (sig): Sigmoid()
)


In [15]:
# Map sentiment labels to integers
sentiment_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}
labels_image_int = [sentiment_mapping[label] for label in labels_image]

# Initialize list for images and processed labels
images = []
processed_labels = []

In [16]:
# Define image transformation
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

In [17]:
image_folder = '/kaggle/input/mvsasingle/MVSA_Single/data'
for i in range(1, len(labels_image) + 1):
    image_path = os.path.join(image_folder, f'{i}.jpg')
    if os.path.exists(image_path):
        image = Image.open(image_path).convert('RGB')
        image = transform(image)
        images.append(image)
        processed_labels.append(labels_image_int[i-1])

In [18]:
# Convert lists to tensors
images_tensor = torch.stack(images)
labels_image_tensor = torch.tensor(processed_labels, dtype=torch.long)

# Verify shapes
print(images_tensor.shape)  # Should match the number of images
print(labels_image_tensor.shape)  # Should match the number of labels

torch.Size([4609, 3, 224, 224])
torch.Size([4609])


In [19]:
import torch
import torch.nn as nn
import torchvision.models as models

# Path to your uploaded ResNet-50 pre-trained weights
weights_path = "/kaggle/input/resnet50-0676ba61-pth/resnet50-0676ba61.pth"

# Initialize the ResNet-50 architecture
resnet = models.resnet50()

# Load the pre-trained weights
resnet.load_state_dict(torch.load(weights_path))

# Modify the final fully connected layer to match your task
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 3)  # 3 sentiment classes: negative, neutral, positive

# Move the model to GPU if available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet = resnet.to(device)

# Print the model architecture
print(resnet)


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [20]:
class FusionModel(nn.Module):
    def __init__(self, text_model, image_model, output_size):
        super(FusionModel, self).__init__()
        self.text_model = text_model
        self.image_model = image_model
        
        self.text_output_size = 3  # From text_output.shape[1]
        self.image_output_size = 3  # From image_output.shape[1]
        
        self.fc1 = nn.Linear(self.text_output_size + self.image_output_size, 128)
        self.fc2 = nn.Linear(128, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text_input, image_input):
        text_output, _ = self.text_model(text_input)
        image_output = self.image_model(image_input)
        
        combined = torch.cat((text_output, image_output), dim=1)
        x = self.fc1(combined)
        x = self.fc2(x)
        output = self.sigmoid(x)
        return output

In [21]:
fusion_model = FusionModel(text_lstm, resnet, outputSize).to(device)
print(fusion_model)

FusionModel(
  (text_model): LSTM(
    (embedding): Embedding(400000, 300)
    (lstm): LSTM(300, 64, num_layers=2, batch_first=True)
    (dropout): Dropout(p=0.3, inplace=False)
    (linear): Linear(in_features=64, out_features=3, bias=True)
    (sig): Sigmoid()
  )
  (image_model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats

In [22]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(fusion_model.parameters(), lr=0.001)

In [23]:
from torch.utils.data import TensorDataset, DataLoader

# Define batch size
batch_size = 32

# Create TensorDataset and DataLoader for training data
train_data = TensorDataset(torch.tensor(x_train_pad, dtype=torch.long), images_tensor[:len(x_train_pad)], torch.tensor(y_train, dtype=torch.long))
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# Create TensorDataset and DataLoader for testing data
test_data = TensorDataset(torch.tensor(x_test_pad, dtype=torch.long), images_tensor[:len(x_test_pad)], torch.tensor(y_test, dtype=torch.long))
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

In [24]:
num_epochs = 10

for epoch in range(num_epochs):
    fusion_model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    
    for batch_idx, (text_inputs, image_inputs, labels) in enumerate(train_loader):
        text_inputs, image_inputs, labels = text_inputs.to(device), image_inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = fusion_model(text_inputs, image_inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)
    
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct_predictions / total_predictions
    
    epoch_accuracy_percentage = epoch_accuracy * 100
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy_percentage:.2f}%')

Epoch [1/10], Loss: 0.8655, Accuracy: 64.50%
Epoch [2/10], Loss: 0.8515, Accuracy: 64.45%
Epoch [3/10], Loss: 0.8420, Accuracy: 64.45%
Epoch [4/10], Loss: 0.8205, Accuracy: 64.45%
Epoch [5/10], Loss: 0.7998, Accuracy: 64.45%
Epoch [6/10], Loss: 0.7784, Accuracy: 64.45%
Epoch [7/10], Loss: 0.7691, Accuracy: 64.45%
Epoch [8/10], Loss: 0.7555, Accuracy: 64.50%
Epoch [9/10], Loss: 0.7529, Accuracy: 64.45%
Epoch [10/10], Loss: 0.7482, Accuracy: 64.45%


In [25]:
fusion_model.eval()

FusionModel(
  (text_model): LSTM(
    (embedding): Embedding(400000, 300)
    (lstm): LSTM(300, 64, num_layers=2, batch_first=True)
    (dropout): Dropout(p=0.3, inplace=False)
    (linear): Linear(in_features=64, out_features=3, bias=True)
    (sig): Sigmoid()
  )
  (image_model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats

In [26]:
# Save the trained model
torch.save(fusion_model.state_dict(), 'fusion_model.pth')
print('Model saved to fusion_model.pth')

Model saved to fusion_model.pth


In [27]:
# Load the saved model
fusion_model.load_state_dict(torch.load('fusion_model.pth'))
fusion_model.eval()

FusionModel(
  (text_model): LSTM(
    (embedding): Embedding(400000, 300)
    (lstm): LSTM(300, 64, num_layers=2, batch_first=True)
    (dropout): Dropout(p=0.3, inplace=False)
    (linear): Linear(in_features=64, out_features=3, bias=True)
    (sig): Sigmoid()
  )
  (image_model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats

In [28]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
from IPython.display import display
from ipywidgets import widgets
from ipywidgets import FileUpload
from io import BytesIO

In [29]:
def preprocess_image(image_path):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    image = image.unsqueeze(0)  # Add batch dimension
    return image.to(device)

In [30]:
def preprocess_text(text, onehot_dict, seq_len=500):
    tokens = text.lower().split()
    indices = [onehot_dict.get(textCleaning(word), 0) for word in tokens]
    padded = np.zeros(seq_len, dtype=int)
    if len(indices) != 0:
        padded[-len(indices):] = np.array(indices)[:seq_len]
    tensor = torch.tensor(padded, dtype=torch.long)
    tensor = tensor.unsqueeze(0)  # Add batch dimension
    return tensor.to(device)

In [31]:
def predict_sentiment(fusion_model, image_bytes, caption, onehot_dict):
    image = preprocess_image(image_bytes)
    text_tensor = preprocess_text(caption, onehot_dict)
    
    with torch.no_grad():
        outputs = fusion_model(text_tensor, image)
        _, predicted = torch.max(outputs, 1)
    
    sentiment_labels = ['negative', 'neutral', 'positive']
    sentiment = sentiment_labels[predicted.item()]
    return sentiment

In [32]:
# Example cell to input the image path and caption
image_path = '/kaggle/input/mvsasingle/MVSA_Single/data/21.jpg'  # Replace with your image path
caption = 'Thank you to Eastwood 8th grader Sam L. for helping hang 7th grade #SafeDates Valentines Day cards. #Caring'  # Replace with your caption

In [33]:
# Ensure the model and onehot_dict are already defined
sentiment = predict_sentiment(fusion_model, image_path, caption, onehot_dict)
print(f"The sentiment of the post is: {sentiment}")

The sentiment of the post is: negative
