In [None]:
!pip install transformers
!pip install -U sentence-transformers

In [47]:
# Importing packages
import requests
import pandas as pd
from PIL import Image
from urllib import request
from sentence_transformers import SentenceTransformer, util
from io import BytesIO
from matplotlib import pyplot as plt
import numpy as np
from collections import defaultdict
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras.callbacks import ModelCheckpoint
from keras.layers.core import Dense, Dropout
from keras import layers, Input, Model
from keras import backend as K
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.utils import load_img, img_to_array, to_categorical
from keras.initializers import Constant
from keras.models import Sequential, Model
from keras.layers import Concatenate
from keras.layers import Dense, Embedding, LSTM, Concatenate as Merge, Reshape, Dropout, Convolution2D, MaxPooling2D, ZeroPadding2D, Flatten
from keras.optimizers import Adam
import keras_preprocessing
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import average_precision_score
import torch
from transformers import BertTokenizer,BertModel,LxmertModel,LxmertTokenizer,BartModel,BartTokenizer

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert = BertModel.from_pretrained('bert-base-uncased')

# Dataset splitting
img_dir = "https://vizwiz.cs.colorado.edu//VizWiz_visualization_img/"
split_train = "train"
annotation_file_train = "https://vizwiz.cs.colorado.edu/VizWiz_final/vqa_data/Annotations/%s.json" %split_train
print(annotation_file_train)
split_test = "test"
annotation_file_test = "https://vizwiz.cs.colorado.edu/VizWiz_final/vqa_data/Annotations/%s.json" %split_test
print(annotation_file_test)
split_val = "val"
annotation_file_val = "https://vizwiz.cs.colorado.edu/VizWiz_final/vqa_data/Annotations/%s.json" %split_val
print(annotation_file_val)

In [8]:
split_data = requests.get(annotation_file_train, allow_redirects=True)
data_train = split_data.json()

split_data = requests.get(annotation_file_test, allow_redirects=True)
data_test = split_data.json()

split_data = requests.get(annotation_file_val, allow_redirects=True)
data_val = split_data.json()

In [39]:
st_model = SentenceTransformer('sentence-transformers/distilbert-base-nli-mean-tokens')
embeddings = st_model.encode(data_train[0]['question'])

#Bert sentence transformers
def sentence_extractor(sentence):
  return st_model.encode(sentence)

#VIT extractor
VITmodel = SentenceTransformer('clip-ViT-L-14')
def VIT_extract_Img(image_name):
  res = request.urlopen(image_name).read()
  img = Image.open(BytesIO(res))
  img_emb = VITmodel.encode(img)
  img_emb = np.array(img_emb)
  return img_emb

#Encode text descriptions
def VIT_extract_lang(question):
  text_emb = VITmodel.encode(question)
  return text_emb

def extract_question_features(questions):
  inputs=tokenizer(questions,return_tensors="pt")
  output=bert(**inputs)
  language_feature=output.pooler_output
  return language_feature

def resize(image_url):
  res = request.urlopen(image_url).read()
  img = Image.open(BytesIO(res)).resize((224,224))
  return img

vggmodel = VGG16(weights='imagenet', include_top=True)
#model.summary()
vgg_model = Model(inputs = vggmodel.inputs, outputs = vggmodel.layers[-2].output )
def image_extract(img):
  img_data = img_to_array(img)
  img_data = np.expand_dims(img_data, axis=0)
  img_data = preprocess_input(img_data)
  feature_maps = vgg_model.predict(img_data)
  return feature_maps

In [40]:
count=0
temp = [data_train,data_val,data_test]
for i in range(len(temp)):
  X = [] #feats
  y = [] #labels
  images_features=[]
  language_features=[]
  language_features_sen=[]
  all_possible_answers=set()
  X_VIT=[]
  y_b = []
  frequent_dict=defaultdict(int)
  for vq in temp[i]:
    if(len(X))==4:
      break

    if i !=2:
      answers = vq['answers']
      all_answers=defaultdict(int)

      for answer in answers:
        all_answers[answer['answer'].lower()]+=1

      all_answers=sorted(all_answers.items(),key=lambda x:x[1],reverse=True)
      if frequent_dict.get(all_answers[0][0],0)>=3:

        continue
      else:
        frequent_dict[all_answers[0][0]]+=1

      label = vq['answerable']
      y_b.append(label)
    
    # Extract features describing the image
    image_name = vq['image']
    image_url = img_dir + image_name

    img_emb=VIT_extract_Img(image_url)
    question = vq['question']
    q_emb=VIT_extract_lang(question)

    img = resize(image_url)
    image_feature= image_extract(img)
    images_features.append(image_feature)

    # Extract features describing the question
    question = vq['question']
    question_feature_sen = sentence_extractor(question)
    question_feature = extract_question_features(question).detach().numpy()
    language_features.append(question_feature)
    language_features_sen.append(question_feature_sen)

    # Multimodal feature representing both question and image (e.g. concatenate, multiply, etc.)
    multimodal_features = np.concatenate((question_feature, image_feature),axis=None)
    print(multimodal_features)
    VIT_mul_feature=np.concatenate((img_emb,q_emb),axis=None)
    # Prepare features and labels
    X.append(multimodal_features)
    X_VIT.append(VIT_mul_feature)

    if i !=2:
      try:
        top_n=5 if len(all_answers)>=5 else len(all_answers)
      except:
        print(all_answers)   
  
      answer_set=set(list( x[0] for x in all_answers)[:top_n])
      all_possible_answers=all_possible_answers.union(answer_set)
      gold_label = all_answers[0][0]
      y.append(gold_label)

    print(image_name)

  if i == 0:
    y_b_train = np.array(y_b)
    X_train = np.array(X)
    y_train = y
    
    images_features_train = images_features
    language_features_train = language_features
    language_features_sen_train = language_features_sen

    X_VIT_train = X_VIT
    encoder=LabelEncoder()
    all_possible_answers=list(set(all_possible_answers))
    answers_set=encoder.fit_transform(all_possible_answers)
    y_train = encoder.transform(y_train)
    train_labels = to_categorical(y_train,num_classes=len(all_possible_answers))
    y_train = np.array(train_labels)
    print("Train Shape: ", X_train.shape, y_train.shape, y_b_train.shape)

  if i == 1:
    y_b_val = np.array(y_b)
    X_val = np.array(X)
    y_val = y
    images_features_val = images_features
    language_features_val = language_features
    language_features_sen_val = language_features_sen
    X_VIT_val = X_VIT
    encoder=LabelEncoder()
    all_possible_answers=list(set(all_possible_answers))
    answers_set=encoder.fit_transform(all_possible_answers)
    y_val = encoder.transform(y_val)
    val_labels = to_categorical(y_val,num_classes=len(all_possible_answers))
    y_val = np.array(val_labels)
    print("Val Shape: ", X_val.shape, y_val.shape, y_b_val.shape)

  if i == 2:
    X_test = np.array(X)

    images_features_test = images_features
    language_features_test = language_features
    language_features_sen_test = language_features_sen
    X_VIT_test = X_VIT

[-0.94377875 -0.41452673 -0.69359535 ...  0.          0.
  0.        ]
VizWiz_train_00000000.jpg
[-0.89090514 -0.37024656 -0.6383059  ...  0.15287846  0.
  1.194839  ]
VizWiz_train_00000001.jpg
[-0.95164216 -0.51667565 -0.97550666 ...  0.          1.3009968
  0.15316188]
VizWiz_train_00000002.jpg
[-0.9272516  -0.39783087 -0.6313059  ...  0.          0.
  5.547513  ]
VizWiz_train_00000003.jpg
Train Shape:  (4, 4864) (4, 13) (4,)
[-0.8113056  -0.38658386 -0.86514163 ...  0.          0.
  1.7918983 ]
VizWiz_val_00000000.jpg
[-0.88212967 -0.40676525 -0.52065223 ...  0.          0.
  4.909209  ]
VizWiz_val_00000001.jpg
[-0.92578334 -0.40315208 -0.689809   ...  0.          1.3171558
  2.2084641 ]
VizWiz_val_00000002.jpg
[-0.8683339  -0.19161084  0.0600514  ...  0.          0.89025486
  1.3195412 ]
VizWiz_val_00000003.jpg
Val Shape:  (4, 4864) (4, 15) (4,)
[-0.9441264 -0.3928359 -0.8742506 ...  1.7149589  0.         2.2841873]
VizWiz_test_00000000.jpg
[-0.9636999  -0.41657716 -0.8599354  ... 

In [25]:
# Math definitions
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

print(embeddings.shape)
X_VIT_train = np.array(X_VIT_train)
print(X_VIT_train.shape)
print(img_emb.shape,q_emb.shape)

(768,)
(4, 1536)
(768,) (768,)


In [46]:
model = Sequential()
model.add(Dense(512, activation='relu', input_dim=(4864)))

model.add(Dense(64, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(256, activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(1, activation='softmax'))

filepath = "SimpleRNN_EM_model.h1"
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]
opt = tf.keras.optimizers.Adam(learning_rate=1e-05)
model.compile(loss='categorical_crossentropy',
optimizer=opt,
metrics=['accuracy'])
hist=model.fit(X_train,y_b_train,epochs=20,batch_size=128,validation_data=(X_val, y_b_val),callbacks=callbacks_list)

Epoch 1/20
Epoch 00001: val_loss improved from inf to 0.00000, saving model to SimpleRNN_EM_model.h1
Epoch 2/20
Epoch 00002: val_loss did not improve from 0.00000
Epoch 3/20
Epoch 00003: val_loss did not improve from 0.00000
Epoch 4/20
Epoch 00004: val_loss did not improve from 0.00000
Epoch 5/20
Epoch 00005: val_loss did not improve from 0.00000
Epoch 6/20
Epoch 00006: val_loss did not improve from 0.00000
Epoch 7/20
Epoch 00007: val_loss did not improve from 0.00000
Epoch 8/20
Epoch 00008: val_loss did not improve from 0.00000
Epoch 9/20
Epoch 00009: val_loss did not improve from 0.00000
Epoch 10/20
Epoch 00010: val_loss did not improve from 0.00000
Epoch 11/20
Epoch 00011: val_loss did not improve from 0.00000
Epoch 12/20
Epoch 00012: val_loss did not improve from 0.00000
Epoch 13/20
Epoch 00013: val_loss did not improve from 0.00000
Epoch 14/20
Epoch 00014: val_loss did not improve from 0.00000
Epoch 15/20
Epoch 00015: val_loss did not improve from 0.00000
Epoch 16/20
Epoch 00016: 

In [49]:
results = model.predict(X_test)



In [54]:
#predict the test and convert the idnex to the label class
result=model.predict(X_test)
y_test_pred = np.argmax(result, axis=1)
# y_test_gold= np.argmax(y_val, axis=1)
result=encoder.inverse_transform(y_test_pred)
print(list(result).count('unanswerable')/500)

0.0


In [None]:
gtlist = [x['answerable'] for x in data_train]

# Save the accuracies
acc_list = []
i = 0

# Compute accuracy for each image
for pred in result:

    # Get the GT answer list and preprocess
    gt_ans = gtlist[i] 
    gt_ans = [x['answer'] for x in gt_ans]
    gt_ans = [x.lower() for x in gt_ans]

    # Compute accuracy (compare with at least 3 human answers)
    cur_acc = np.minimum(1.0, gt_ans.count(pred)/3.0)

    acc_list.append(cur_acc)
    i +=1

print ('Accuracy: {}'.format(round(np.mean(acc_list), 2)))

In [63]:
#All answers
gtlist = [x['answerable'] for x in data_train]

#save the scores
y_test = []
pred = []

for i in range(0, 1000):
  y_test.append(gtlist[i])
  pred.append(results[i])

y_test = np.array(y_test)
pred = np.array(pred)

average_precision = average_precision_score(y_test, pred)

print("AP: {}".format(round(100*average_precision, 4)))

IndexError: ignored

In [None]:
embedding_model = Sequential()
embedding_model.add(Embedding(X_VIT_train.shape[0], X_VIT_train.shape[1], embeddings_initializer = Constant(X_VIT_train),	trainable = False))

image_model = Sequential()
image_model.add(Dense(X_VIT_train.shape[1],input_dim=4864,activation='linear'))
image_model.add(Reshape((1,X_VIT_train.shape[1])))

main_model = Sequential()
main_model.add(Concatenate([image_model,embedding_model]))
main_model.add(LSTM(1001))
main_model.add(Dropout(0.5))
main_model.add(Dense(1001,activation='sigmoid'))

# Compile and summarize model
main_model.compile(loss = 'binary_crossentropy', optimizer ='adam',metrics = ["accuracy",f1_m,precision_m, recall_m])
# main_model.summary()

# Train/save best model
filepath = "SimpleRNN_EM_model.h1"
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', mode = "min", verbose =1, save_best_only = True)

print(X_train.shape,y_b_train.shape,X_val.shape,y_b_val.shape)

hist=main_model.fit(X_train,y_b_train,epochs=20,batch_size=128,validation_data=(X_val, y_b_val),callbacks = [checkpoint],verbose = 1 )

In [None]:
EMBEDDING_SIZE = 500
embedding_layer = Embedding(int(X_VIT_train.shape[1]/2), int(X_VIT_train.shape[1]/2),embeddings_initializer= Constant(int(X_VIT_train.shape[1]/2)),trainable=False)

int_sequences_input = Input(shape=(None,), dtype="int64")
embedded_sequences = embedding_layer(int_sequences_input)
x = layers.Bidirectional(layers.SimpleRNN(100, return_sequences=True))(embedded_sequences)
x = layers.Bidirectional(layers.SimpleRNN(100))(x)
preds = layers.Dense(1, activation="sigmoid")(x)
model = Model(int_sequences_input, preds)

# summarize the model
model.summary()
model.compile(loss = 'binary_crossentropy', optimizer ='adam',metrics = ["accuracy",f1_m,precision_m, recall_m])

# Train and save the best model
filepath = "SimpleRNN_EM_model.h1"
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', mode = "min", verbose =1, save_best_only = True)

print(X_train.shape,y_b_train.shape,X_val.shape,y_b_val.shape)

hist=model.fit(X_train,y_b_train,epochs=20,batch_size=128,validation_data=(X_val, y_b_val),callbacks = [checkpoint],verbose = 1 )

In [None]:
#predict the test and convert the idnex to the label class
print(X_test)
result=model.predict(X_test)
y_test_pred = np.argmax(result, axis=1)
y_test_gold= np.argmax(y_val, axis=1)
result=encoder.inverse_transform(y_test_pred)

In [None]:
# convert the text feature into discrete value
encoder=LabelEncoder()
# all_possible_answers=list(all_possible_answers)
all_possible_answers=list(set(all_possible_answers))
answers_set=encoder.fit_transform(all_possible_answers)
y_train=encoder.transform(y_train)
y_val=encoder.transform(y_val)