In [1]:
import os
import gc
import cv2
import h5py
import random
import numpy as np
import pandas as pd
import tensorflow as tf

from tqdm import tqdm
from  matplotlib import pyplot as plt

In [2]:
mvsa_single_data_path = '../input/mvsasingle/MVSA_Single/data'
mvsa_single_label_path = '../input/mvsasingle/MVSA_Single/labelResultAll.txt'

IMAGE_SIZE = (224, 224)
NUM_CHANNELS = 3

In [3]:
def read_text_file(path, multi_line=False):
#     if multi_line == True:
#         lines = open(path, 'r', encoding='latin-1').readlines()
#         lines = [line.rstrip('\n') for line in lines]
#         return lines
    return open(path, 'r', encoding='latin-1').read()

def read_image_file(path):
    try:
        image = cv2.imread(path)[:, :, ::-1] #, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, IMAGE_SIZE, interpolation = cv2.INTER_AREA)

#         image = tf.keras.utils.load_img(path, target_size=IMAGE_SIZE)
#         image = tf.keras.preprocessing.image.img_to_array(image)
        invalid_ID = -1
    except:
        image = np.zeros((IMAGE_SIZE[0], IMAGE_SIZE[1], NUM_CHANNELS))
        invalid_ID = int(os.path.split(path)[1].split('.')[0])
    return image, invalid_ID

def read_labels_file(path):
    dataframe = pd.read_csv(path, sep="\s+|,", engine="python")
    return dataframe

In [4]:
def get_data_paths(path, extension):
    ''' Get list of data paths with input extension and sort by its filename (ID)
    path: Folder path
    extension: File extension wants to get
    '''
    paths = os.listdir(path)
    paths = list(filter(lambda x: x.endswith(extension), paths))
    paths.sort(key = lambda x : int(x.split('.')[0]))
    paths = [os.path.join(path, x) for x in paths]
    return paths

def get_image_with_id(path):
    filename = os.path.split(path)[1]
    ID = int(filename.split('.')[0])
    image = read_image_file(path)
    return (ID, image)

In [5]:
# there are 3 annotators labelling each modality labels in the MVSA-Multiple dataset
# merge those 3 label pairs into 1 pair by taking majority vote on each modality label
# since there are only 3 different labels, if 1 modality receives 3 different labels from 3 annotators
# => the data pair contains it is considered invalid
def merge_multi_label(dataframe):
    anno_1 = list(dataframe.loc[:, ['text', 'image']].itertuples(index=False, name=None))
    anno_2 = list(dataframe.loc[:, ['text.1', 'image.1']].itertuples(index=False, name=None))
    anno_3 = list(dataframe.loc[:, ['text.2', 'image.2']].itertuples(index=False, name=None))
    IDs = list(dataframe.iloc[:, 0])
    
    valid_pairs = []
    
    for i in range(len(anno_1)):
        pairs = [anno_1[i], anno_2[i], anno_3[i]]
        ID = IDs[i]
        
        text_labels = [pair[0] for pair in pairs]
        image_labels = [pair[1] for pair in pairs]
        
        max_occur_text_label = max(text_labels, key=text_labels.count)
        max_occur_image_label = max(image_labels, key=image_labels.count)

        if text_labels.count(max_occur_text_label) > 1 and image_labels.count(max_occur_image_label) > 1:
            valid_pair = (ID, max_occur_text_label, max_occur_image_label)
        else:
            valid_pair = (ID, 'invalid', 'invalid')
        valid_pairs.append(valid_pair)
    valid_dataframe = pd.DataFrame(valid_pairs, columns=['ID', 'text', 'image'])
    return valid_dataframe

def multimodal_label(text_label, image_label):
    if text_label == image_label:
        label = text_label
    elif (text_label == 'positive' and image_label == 'negative') or (text_label == 'negative' and image_label == 'positive'):
        label = 'invalid'
    elif (text_label == 'neutral' and image_label != 'neutral') or (text_label != 'neutral' or image_label == 'neutral'):
        label = image_label if text_label == 'neutral' else text_label
    return label

In [6]:
def create_multimodal_labels(path, multiple=False, mappings=False):
    dataframe = read_labels_file(path)
    
    if multiple == True:
        dataframe = merge_multi_label(dataframe)

    labels = []
    for label_pair in dataframe.loc[:, ['text', 'image']].values:
        label = multimodal_label(label_pair[0], label_pair[1])
        labels.append(label)
        
    if mappings == True:
        label_map = {}
        for i in range(len(labels)):
            ID = dataframe.iloc[i, 0]
            label_map[ID] = labels[i]            
        return label_map
    
    return np.array(labels, dtype='object')

def create_original_labels(path, multiple=False):
    dataframe = read_labels_file(path)
    
    if multiple == True:
        dataframe = merge_multi_label(dataframe)
        
    text_labels = dataframe['text'].to_numpy()
    image_labels = dataframe['image'].to_numpy()
    return text_labels, image_labels

def create_text_data(path):
    texts = []
    text_paths = get_data_paths(path, '.txt')
    
    print('Read text data')
    for text_path in tqdm(text_paths):
        text = read_text_file(text_path).rstrip('\n')
        texts.append(text)

    return texts

def create_image_data(path):
    images = []
    invalid_indices = []
    image_paths = get_data_paths(path, '.jpg')

    print('Read image data')
    for image_path in tqdm(image_paths):
        image, invalid_ID = read_image_file(image_path)
        images.append(image)

        if invalid_ID != -1:
            invalid_indices.append(invalid_ID)
            
    images = np.array(images, dtype='uint8')
    return images, invalid_indices

In [7]:
def remove_invalid(data, indices):
    valid_data = []
    for i in range(len(data)):
        if i not in indices:
            valid_data.append(data[i])
    return valid_data

In [8]:
def read_hdf5(path):
    read_file = h5py.File(path, 'r')
    
    feature_names = list(read_file.keys())
    loaded_data = []
    
    for name in feature_names:
        dataset = read_file[name][:]
        if dataset.dtype == np.dtype('object'):
            dataset = np.array([x.decode('utf-8') for x in dataset])            
        loaded_data.append((name, dataset))

    return loaded_data

In [9]:
def load_mvsa_data(path):
    data = read_hdf5(path)
    for x in data:
        if x[0] == 'texts':
            texts = x[1]
        if x[0] == 'multimodal-labels':
            labels = x[1]
        if x[0] == 'text-labels':
            text_labels = x[1]
        if x[0] == 'image-labels':
            image_labels = x[1]
            
    images_path = os.path.join(os.path.split(path)[0], os.path.split(path)[1].split('.')[0] + '-images.npz')
    npzfile = np.load(images_path)
    images = npzfile['arr_0']
        
    return texts, images, labels, text_labels, image_labels

In [10]:
mvsa_single_texts = create_text_data(mvsa_single_data_path)
mvsa_single_images, mvsa_single_images_invalid_indices = create_image_data(mvsa_single_data_path)
mvsa_single_multimodal_labels = create_multimodal_labels(mvsa_single_label_path)
mvsa_single_text_labels, mvsa_single_image_labels = create_original_labels(mvsa_single_label_path)
num_mvsa_single = len(mvsa_single_texts)

Read text data


100%|██████████| 4869/4869 [00:24<00:00, 195.41it/s]


Read image data


100%|██████████| 4869/4869 [01:00<00:00, 80.62it/s]


In [11]:
import numpy as np

def one_hot_encode(sentiments):
    """
    Converts an array of sentiments into a one-hot encoded array.
    
    Args:
        sentiments (list): List of sentiments ('positive', 'negative', 'neutral').
        
    Returns:
        numpy.ndarray: A 2D array with one-hot encoding for each sentiment.
    """
    # Define the mapping for one-hot encoding
    sentiment_map = {
        'positive': [1, 0, 0],
        'negative': [0, 1, 0],
        'neutral': [0, 0, 1]
    }
    
    # Create the one-hot encoded array
    one_hot_array = np.array([sentiment_map[sentiment] for sentiment in sentiments])
    return one_hot_array

In [12]:
mvsa_single_text_labels_one_hot = one_hot_encode(mvsa_single_text_labels)
print(mvsa_single_text_labels_one_hot[0:5])

[[0 0 1]
 [0 0 1]
 [0 0 1]
 [1 0 0]
 [1 0 0]]


In [13]:
mvsa_single_image_labels_one_hot = one_hot_encode(mvsa_single_image_labels)
print(mvsa_single_image_labels_one_hot[0:5])
print(mvsa_single_image_labels[0:5])

[[1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]]
['positive' 'positive' 'positive' 'positive' 'positive']


In [14]:
# Get duplicated text indices
# mvsa_single_texts_unique_indices = np.unique(mvsa_single_texts, return_index=True)[1]
# mvsa_single_texts_duplicated_indices = [i for i in range(num_mvsa_single) if i not in mvsa_single_texts_unique_indices]

# Get invalid label indices
mvsa_single_multimodal_labels_invalid_indices = [i for i in range(num_mvsa_single) if mvsa_single_multimodal_labels[i] == 'invalid']

In [15]:
print('Number of text-image pair in MVSA-Single:', num_mvsa_single)

mvsa_single_invalid_indices = []
# mvsa_single_invalid_indices.extend(mvsa_single_texts_duplicated_indices)
mvsa_single_invalid_indices.extend(mvsa_single_images_invalid_indices) # corrupted images
mvsa_single_invalid_indices.extend(mvsa_single_multimodal_labels_invalid_indices)
mvsa_single_invalid_indices = list(set(mvsa_single_invalid_indices))
print('Number of invalid data in MVSA-Single:', len(mvsa_single_invalid_indices))

mvsa_single_texts_valid = remove_invalid(mvsa_single_texts, mvsa_single_invalid_indices)
mvsa_single_images_valid = remove_invalid(mvsa_single_images, mvsa_single_invalid_indices)
mvsa_single_multimodal_labels_valid = remove_invalid(mvsa_single_multimodal_labels, mvsa_single_invalid_indices)
mvsa_single_text_labels_valid = remove_invalid(mvsa_single_text_labels, mvsa_single_invalid_indices)
mvsa_single_image_labels_valid = remove_invalid(mvsa_single_image_labels, mvsa_single_invalid_indices)

num_mvsa_single_valid = len(mvsa_single_texts_valid)
print('Number of text-image pair in MVSA-Single after removing invalid data:', num_mvsa_single_valid)

Number of text-image pair in MVSA-Single: 4869
Number of invalid data in MVSA-Single: 358
Number of text-image pair in MVSA-Single after removing invalid data: 4511


In [16]:
# save and load check data
with h5py.File('mvsa-single-{}.hdf5'.format(num_mvsa_single_valid), 'w') as f:
    f.create_dataset('texts', data = mvsa_single_texts_valid)
    f.create_dataset('images', data = mvsa_single_images_valid)
    f.create_dataset('multimodal-labels', data = mvsa_single_multimodal_labels_valid)
    f.create_dataset('text-labels', data = mvsa_single_text_labels_valid)
    f.create_dataset('image-labels', data = mvsa_single_image_labels_valid)
    
np.savez('./mvsa-single-{}-images'.format(num_mvsa_single_valid), mvsa_single_images_valid)
    
mvsa_single_texts_loaded, mvsa_single_images_loaded, \
mvsa_single_multimodal_labels_loaded, mvsa_single_text_labels_loaded, \
mvsa_single_image_labels_loaded = load_mvsa_data('./mvsa-single-{}.hdf5'.format(num_mvsa_single_valid))

print((mvsa_single_texts_valid == mvsa_single_texts_loaded).all())
print((mvsa_single_images_valid == mvsa_single_images_loaded).all())
print((mvsa_single_multimodal_labels_valid == mvsa_single_multimodal_labels_loaded).all())
print((mvsa_single_text_labels_valid == mvsa_single_text_labels_loaded).all())
print((mvsa_single_image_labels_valid == mvsa_single_image_labels_loaded).all())

True
True
True
True
True


In [17]:
mvsa_single_images = tf.transpose(mvsa_single_images, perm=[0, 3, 1, 2])

In [18]:
pip install transformers


  pid, fd = os.forkpty()


Note: you may need to restart the kernel to use updated packages.


In [19]:
from transformers import TFViTModel
from tensorflow.keras.layers import Dense, Input, GlobalAveragePooling1D, Lambda
from tensorflow.keras.models import Model
import tensorflow as tf

def create_vit_model(num_classes):
    # Input layer for images
    image_input = Input(shape=(3,224, 224), name="image_input", dtype=tf.float32)
    
    # Preprocessing: Normalize pixel values to the range [0, 1]
    normalized_input = Lambda(lambda x: x / 255.0, name="normalize")(image_input)
    
    # Load pre-trained Vision Transformer model
    vit_model = TFViTModel.from_pretrained("google/vit-base-patch16-224-in21k")

    # Wrap `vit_model` to handle pixel_values explicitly
    def vit_forward(inputs):
        return vit_model(pixel_values=inputs).last_hidden_state

    # Use Lambda layer to convert Keras tensor to TF tensor and pass to ViT
    vit_output = Lambda(vit_forward, name="vit_model", output_shape=(3,768))(normalized_input)

    # Pool across the sequence length
    pooled_output = GlobalAveragePooling1D()(vit_output)
    
    # Add dense layers for classification
    x = Dense(128, activation="relu")(pooled_output)
    output = Dense(num_classes, activation="softmax")(x)
    
    # Define the model
    model = Model(inputs=image_input, outputs=output)
    return model

# Example usage
num_classes = 3  # Replace with the number of classes in your dataset
vit_model = create_vit_model(num_classes)

# Print model summary
vit_model.summary()


config.json:   0%|          | 0.00/502 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

All PyTorch model weights were used when initializing TFViTModel.

All the weights of TFViTModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFViTModel for predictions without further training.


In [20]:
# Compile the model
vit_model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])



In [21]:
print(mvsa_single_images.shape)  # Should be (4869, 3, 224, 224)
print(mvsa_single_image_labels_one_hot.shape)  # Should match (4869, num_classes)


(4869, 3, 224, 224)
(4869, 3)


In [22]:
print(mvsa_single_images.shape) 

(4869, 3, 224, 224)


In [23]:
vit_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

In [24]:
# # Ensure input is in channels_first format for ViT: (batch_size, 3, 224, 224)
# mvsa_single_images = tf.transpose(mvsa_single_images, perm=[0, 3, 1, 2])
# print(mvsa_single_images.shape) 

# Initialize the model with a dummy input
dummy_input = tf.random.normal((1, 3, 224, 224))  # Batch size 1
_ = vit_model(dummy_input)


try:
    vit_model.fit(
        mvsa_single_images[0:3895],
        mvsa_single_image_labels_one_hot[0:3895],
        batch_size=32,
        epochs=10,
        validation_split=0.2
    )
except Exception as e:
    print("Error during training:", e)
    



Epoch 1/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 341ms/step - accuracy: 0.5180 - loss: 1.0212 - val_accuracy: 0.5302 - val_loss: 0.9837
Epoch 2/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 218ms/step - accuracy: 0.5762 - loss: 0.9097 - val_accuracy: 0.5340 - val_loss: 0.9530
Epoch 3/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 217ms/step - accuracy: 0.5878 - loss: 0.8672 - val_accuracy: 0.5571 - val_loss: 0.9258
Epoch 4/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 217ms/step - accuracy: 0.6291 - loss: 0.8172 - val_accuracy: 0.5623 - val_loss: 0.9024
Epoch 5/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 217ms/step - accuracy: 0.6611 - loss: 0.7894 - val_accuracy: 0.5841 - val_loss: 0.8846
Epoch 6/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 217ms/step - accuracy: 0.6802 - loss: 0.7610 - val_accuracy: 0.5879 - val_loss: 0.8673
Epoch 7/10
[1m98/98[

In [25]:
# Evaluate the model on the test dataset
test_loss, test_accuracy = vit_model.evaluate( mvsa_single_images[3896:], mvsa_single_image_labels_one_hot[3896:], batch_size=32)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")


[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 260ms/step - accuracy: 0.6343 - loss: 0.8345
Test Loss: 0.8334760069847107
Test Accuracy: 0.6166495084762573


In [26]:
# Get predictions for the test images
predictions = vit_model.predict(mvsa_single_images[3896:], batch_size=32)

# Convert predictions from probabilities to class labels
predicted_classes = tf.argmax(predictions, axis=1)  # Predicted class indices
true_classes = tf.argmax(mvsa_single_image_labels_one_hot[3896:], axis=1)       # True class indices (if labels are one-hot)

# Print a few predictions
print("Predicted classes:", predicted_classes.numpy()[:100])
print("True classes:", true_classes.numpy()[:100])


[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 320ms/step
Predicted classes: [0 2 0 1 0 0 2 0 0 2 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 2 0 0 1 2 0 0 0 0 0 0 0
 2 2 1 1 0 0 1 0 0 1 2 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 2 0 0 0 0 1 0 0 0 0 0
 2 0 0 0 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 1 1 0 0 2 0]
True classes: [1 1 1 1 1 0 2 0 0 2 2 2 2 2 0 1 1 0 0 0 0 0 0 1 0 1 1 2 1 2 0 1 0 1 1 0 0
 1 2 1 1 1 0 1 1 0 1 1 1 0 0 0 0 1 1 0 0 0 2 0 0 0 0 0 0 1 0 1 1 0 0 0 0 0
 2 0 0 0 0 0 0 0 0 2 2 0 0 2 0 0 2 0 0 0 0 0 0 0 2 0]


In [27]:
test_images=mvsa_single_images[3896:]

In [28]:
from sklearn.metrics import classification_report
import tensorflow as tf

# Step 1: Make predictions
predictions = vit_model.predict(test_images, batch_size=32)
predicted_classes = tf.argmax(predictions, axis=1).numpy()  # Convert predictions to class indices
true_classes = tf.argmax(mvsa_single_image_labels_one_hot[3896:], axis=1).numpy()       # Convert one-hot labels to class indices

# Step 2: Generate classification report
report = classification_report(true_classes, predicted_classes, target_names=['Positive','Negative','Neutral'])
print(report)


[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 169ms/step
              precision    recall  f1-score   support

    Positive       0.66      0.84      0.74       516
    Negative       0.53      0.27      0.36       270
     Neutral       0.51      0.50      0.51       187

    accuracy                           0.62       973
   macro avg       0.57      0.54      0.53       973
weighted avg       0.60      0.62      0.59       973

