### Install and prep CLIP lib

Interacting with CLIP nb: https://github.com/openai/CLIP/blob/main/notebooks/Interacting_with_CLIP.ipynb

In [None]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

In [None]:
import numpy as np
import torch
from pkg_resources import packaging

print("Torch version:", torch.__version__)

In [None]:
import clip

clip.available_models()

### Dataset filtering

In [None]:
import pandas as pd

# for image-text embeddings, for image embeddings

# NOTE(review): read_pickle unpickles arbitrary objects — only load this file from a trusted source.
df_og = pd.read_pickle("../input/merged-df-with-gold/merged_df_with_gold_freq1.pkl")
# careful with freq=2

In [None]:
def anonymize_and_fix_amps(tweet):
    """Mask user mentions and decode HTML-escaped ampersands in a tweet.

    The input is coerced with ``str()`` and split on whitespace. Every token
    that starts with ``@`` is replaced by the placeholder ``@USER``; in every
    other token each ``&amp;`` is replaced by ``&``, including occurrences
    embedded in a longer token such as ``Q&amp;A``. Tokens are re-joined with
    single spaces, so runs of whitespace collapse.

    Args:
        tweet: raw tweet text (any object; converted with ``str``).

    Returns:
        The cleaned tweet as a single-space-separated string.
    """
    cleaned = []
    for tok in str(tweet).split():
        if tok.startswith('@'):
            cleaned.append('@USER')
        else:
            cleaned.append(tok.replace('&amp;', '&'))
    return ' '.join(cleaned)

df_og['tweet'] = df_og['tweet'].apply(anonymize_and_fix_amps)

In [None]:
# Keep only the rows whose T_Surprise column is populated.
has_t_surprise = df_og.T_Surprise.notnull()
df_gold = df_og[has_t_surprise]

In [None]:
# Drop the rows whose M_gold_multi_label has length 0. The explicit .copy() makes
# df_gold an independent frame, so the column assignments in the following cells
# write to it directly instead of to a slice (pandas SettingWithCopyWarning).
df_gold = df_gold[df_gold.M_gold_multi_label.str.len() != 0].copy()

In [None]:
len(df_gold)

### Evaluate silver labels

In [None]:
def remove_neutral_smt(labels):
    """Return a new list of ``labels`` without "Something else" and "Neutral".

    The order of the remaining labels is preserved; the input is not modified.
    """
    return [lab for lab in labels if lab != "Something else" and lab != "Neutral"]

In [None]:
df_gold.M_gold_multi_label, df_gold.T_gold_multi_label

In [None]:
df_gold['M_gold_multi_label'] = df_gold['M_gold_multi_label'].apply(remove_neutral_smt)

In [None]:
df_gold['T_gold_multi_label'] = df_gold['T_gold_multi_label'].apply(remove_neutral_smt)

In [None]:
df_gold.iloc[0].T_gold_multi_label, df_gold.iloc[0].multi_label

In [None]:
from sklearn.metrics import classification_report

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score


#y_train_silvgold = merged_all_labels['M_gold_multi_label']

# The two multi-label columns to compare (each cell is a list of label strings).
y_1 = df_gold['M_gold_multi_label']
y_2 = df_gold['T_gold_multi_label']

# Fit ONE binarizer on the union of both columns and only transform afterwards.
# Calling fit_transform twice re-fits the binarizer on the second column, so the
# two indicator matrices could get different (or differently ordered) columns and
# the per-class report would compare unrelated labels.
mlb = MultiLabelBinarizer()
mlb.fit(list(y_1) + list(y_2))
y_1 = mlb.transform(y_1)
y_2 = mlb.transform(y_2)
labels = mlb.classes_

# T_gold_multi_label is the reference here, M_gold_multi_label the prediction.
print(classification_report(y_true=y_2, y_pred=y_1, target_names=labels))

In [None]:
print(classification_report(y_true=y_1, y_pred=y_2, target_names=labels))

In [None]:
df = df_gold.explode('path_photos')

In [None]:
len(df)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows of `df` as the test split (fixed random_state for repeatability).
# NOTE(review): `df` was exploded on path_photos in an earlier cell, so rows that came
# from the same original row can land on both sides of the split — confirm this is acceptable.
train, test = train_test_split(df, test_size=0.20,shuffle=True, random_state=42)

In [None]:
df_og = df_og.explode('path_photos')

In [None]:
# NOTE(review): this rebinds `df` to the un-exploded df_gold, discarding the
# `df_gold.explode('path_photos')` result assigned to `df` in an earlier cell.
# Later cells read `df.path_photos` — confirm which frame they are meant to see.
df = df_gold

In [None]:
len(df)

In [None]:
# Build the pool that excludes the held-out test rows: stack df_og and test, then
# drop EVERY row whose path_photos value occurs more than once (keep=False), so a
# row present in both frames disappears entirely.
# NOTE(review): keep=False also removes rows whose path_photos is repeated inside
# df_og itself — confirm that is acceptable.
df_notest = pd.concat([df_og,test])
df_notest = df_notest.drop_duplicates(subset='path_photos', keep=False).reset_index(drop=True)

In [None]:
len(df_notest)

In [None]:
# capitalize the silver labels too, for consistency

def capitalize_emo(labels):
    """Return a list with ``.capitalize()`` applied to every label, in order."""
    capitalized = []
    for label in labels:
        capitalized.append(label.capitalize())
    return capitalized

df_notest['multi_label'] = df_notest['multi_label'].apply(lambda x: capitalize_emo(x))

In [None]:
# gold labels + silver labels

from copy import deepcopy

def _gold_missing(value):
    """True when a gold-label cell is a float (NaN placeholder) or an empty list."""
    return isinstance(value, float) or (isinstance(value, list) and len(value) == 0)


# Work on a deep copy so df_notest keeps its original gold columns.
merged_all_labels = deepcopy(df_notest)
for idx, row in merged_all_labels.iterrows():
    # Use the row yielded by iterrows (label-based) rather than a positional
    # lookup, so this also works when the index is not exactly 0..n-1.
    if _gold_missing(row.T_gold_multi_label):
        merged_all_labels.at[idx, "T_gold_multi_label"] = row.multi_label
    if _gold_missing(row.M_gold_multi_label):
        merged_all_labels.at[idx, "M_gold_multi_label"] = row.multi_label

### With base model

In [None]:
import clip
import torch
import numpy as np 

# Use the first GPU when CUDA is available, otherwise run on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# Load the pretrained weights directly onto `device`; calling model.cuda()
# unconditionally would raise on a machine without a GPU.
model, preprocess = clip.load("ViT-B/32", device=device)
model.eval()  # inference mode
input_resolution = model.visual.input_resolution
context_length = model.context_length
vocab_size = model.vocab_size

print("Model parameters:", f"{np.sum([int(np.prod(p.shape)) for p in model.parameters()]):,}")
print("Input resolution:", input_resolution)
print("Context length:", context_length)
print("Vocab size:", vocab_size)

In [None]:
# Report the CUDA environment. The device name is only queried when a GPU is
# present, because torch.cuda.get_device_name() raises on CPU-only machines.
print(f'Cuda is available: {torch.cuda.is_available()}')
print(f'Available devices: {torch.cuda.device_count()}')
if torch.cuda.is_available():
    print(f'Device name: {torch.cuda.get_device_name()}')

from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, ConcatDataset
from torch.utils.data import TensorDataset, random_split, Dataset
from torch import nn, optim

from PIL import Image
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from numpy import cumsum
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import random

# Seed the Python and NumPy global RNGs for reproducibility.
seed = 0
random.seed(seed)  # must be a call: assigning to random.seed would replace the function
np.random.seed(seed)

### Or with fine-tuned model

In [None]:
del model

In [None]:
import clip

# Use the first GPU when CUDA is available, otherwise run on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

model, preprocess = clip.load("ViT-B/32",device=device,jit=False) #Must set jit=False for training
# map_location remaps the stored tensors onto `device`, so a checkpoint that was
# saved on a GPU can still be loaded on a CPU-only machine.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
checkpoint = torch.load("../input/clip-ft-256-1-lr5/clip-ft-256-1-lr5.pt", map_location=device)

# Enable the three lines below if you use CLIP's default model settings (not the training
# settings). For example, if context_length was set to 100 during training because the
# strings were very long, assign 100 to checkpoint['model_state_dict']["context_length"].
#checkpoint['model_state_dict']["input_resolution"] = model.input_resolution #default is 224
#checkpoint['model_state_dict']['context_length'] = model.context_length # default is 77
#checkpoint['model_state_dict']['vocab_size'] = model.vocab_size 

model.load_state_dict(checkpoint['model_state_dict'])

-----

In [None]:
device = "cuda:0" if torch.cuda.is_available() else "cpu" 

In [None]:
import tensorflow as tf

#### Obtain and preprocess **multimodal X** values (aka **image-text embeddings**)

In [None]:
from PIL import Image

def extract_and_concatenate(text, image):
    """Encode one tweet and one image with CLIP and concatenate the two embeddings.

    Args:
        text: tweet text; tokenized with ``truncate=True``.
        image: path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        A TensorFlow tensor holding the image features followed by the text
        features, concatenated along axis 1.
    """
    # Run both encoders without autograd: nothing is back-propagated here, and a
    # tensor that still requires grad cannot be converted with .numpy().
    with torch.no_grad():
        with Image.open(image) as img:  # close the file handle once preprocessed
            image_input = preprocess(img).unsqueeze(0).to(device)
        image_features = model.encode_image(image_input)

        text_tokens = clip.tokenize([text], truncate=True).to(device)
        text_features = model.encode_text(text_tokens)

    return tf.concat(
        values=[image_features.cpu().numpy(), text_features.cpu().numpy()],
        axis=1,
    )

images = df.path_photos.values
texts = df.tweet.values

import tensorflow as tf

# Embed every (tweet, image) pair, keeping the order of `texts` / `images`.
X = []
for curr_txt, curr_img in zip(texts, images):
    X.append(extract_and_concatenate(curr_txt, curr_img))

In [None]:
import pickle

# Persist the embeddings to disk with pickle.
# NOTE(review): X_train is not assigned in the visible cells above (the preceding loop
# fills `X`), so this fails on a fresh Restart & Run All — confirm the intended variable.
with open('clip-ft-64-2-silvergold.pkl', 'wb') as b:
    pickle.dump(X_train,b)

In [None]:
images_test = test.path_photos.values
texts_test = test.tweet.values

import tensorflow as tf

# Embed every (tweet, image) pair of the test split, in row order.
X_test = []
for curr_txt, curr_img in zip(texts_test, images_test):
    X_test.append(extract_and_concatenate(curr_txt, curr_img))

In [None]:
# NOTE(review): this cell is an exact copy of the previous one; running both recomputes
# every test embedding a second time. Keep only one of the two cells.
images_test = test.path_photos.values
texts_test = test.tweet.values

import tensorflow as tf

X_test = []

# Embed every (tweet, image) pair of the test split, in row order.
for idx,p in enumerate(images_test):
    curr_img = images_test[idx]
    curr_txt = texts_test[idx]
    conc = extract_and_concatenate(curr_txt, curr_img)
    X_test.append(conc)

In [None]:
len(X_test)+len(X_train)+len(X_train_gold)

# X_train: both silver and gold
# X_train_gold: gold only
# X_test: gold only

#### Obtain and preprocess **textual X** values (aka **text embeddings**)

In [None]:
del texts
del X

In [None]:
import tensorflow as tf

def extract_textual(text):
    """Encode one tweet with CLIP's text encoder.

    Args:
        text: tweet text; tokenized with ``truncate=True``.

    Returns:
        A TensorFlow tensor built from the text features.
    """
    # Tokenize and encode without autograd: no gradients are needed here, and a
    # tensor that still requires grad cannot be converted with .numpy().
    with torch.no_grad():
        tokens = clip.tokenize([text], truncate=True).to(device)
        text_features = model.encode_text(tokens)

    return tf.convert_to_tensor(text_features.cpu().numpy())

texts = df.tweet.values

# One text embedding per tweet, in the order of `texts`.
X = []
for curr_txt in texts:
    X.append(extract_textual(curr_txt))

In [None]:
# Text embeddings for the tweets of the test split.
texts_2 = test.tweet.values

X_test = []

for curr_txt in texts_2:
    # Read each tweet from texts_2 (the test split), not from the `texts` array of `df`.
    features = extract_textual(curr_txt)
    X_test.append(features)

#### Obtain and preprocess **visual X** values (aka **image embeddings**)

In [None]:
del X

In [None]:
from PIL import Image

def extract_visual(image):
    """Encode one image with CLIP's image encoder.

    Args:
        image: path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        A TensorFlow tensor built from the image features.
    """
    # Preprocess and encode without autograd: no gradients are needed here, and a
    # tensor that still requires grad cannot be converted with .numpy().
    with torch.no_grad():
        with Image.open(image) as img:  # close the file handle once preprocessed
            image_input = preprocess(img).unsqueeze(0).to(device)
        image_features = model.encode_image(image_input)

    return tf.convert_to_tensor(image_features.cpu().numpy())

images = df.path_photos.values

# One image embedding per path, in the order of `images`.
X = []
for image in images:
    X.append(extract_visual(image))

-----

In [None]:
# Stack the per-sample embeddings (each element must expose .numpy()) into NumPy
# arrays and drop size-1 axes.
# NOTE(review): X_train and X_train_gold are not built as lists of embeddings in the
# visible cells above — confirm where they come from.
X_train = np.squeeze(np.asarray([i.numpy() for i in X_train]))
X_test = np.squeeze(np.asarray([i.numpy() for i in X_test]))
X_train_gold = np.squeeze(np.asarray([i.numpy() for i in X_train_gold]))


In [None]:
# NOTE(review): repeats the conversion done in the previous cell. Once X_train / X_test
# are NumPy arrays their rows have no .numpy() method, so running both cells in order
# raises AttributeError — keep only one of the two.
X_train = np.squeeze(np.asarray([i.numpy() for i in X_train]))
X_test = np.squeeze(np.asarray([i.numpy() for i in X_test]))

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer

#y_train_silvgold = merged_all_labels['M_gold_multi_label']
# NOTE(review): train labels come from M_gold_multi_label and test labels from
# T_gold_multi_label — confirm the mismatch is intentional.
y_train = train['M_gold_multi_label']
y_test = test['T_gold_multi_label']


# X_train: both silver and gold
# X_train_gold: gold only
# X_test: gold only

# Fit the binarizer on the training labels only and reuse it for the test labels,
# so both matrices share the same columns in the same order. Re-fitting on the test
# labels could produce a different class set and silently misalign the columns.
mlb = MultiLabelBinarizer()
yt_train = mlb.fit_transform(y_train)
yt_test = mlb.transform(y_test)

In [None]:
len(X)

In [None]:
from sklearn.preprocessing import MultiLabelBinarizer


# Stack the per-sample embeddings in X into one NumPy array and drop size-1 axes.
# NOTE(review): X is overwritten with its converted form, so re-running this cell fails
# (NumPy rows have no .numpy() method).
X = np.squeeze(np.asarray([i.numpy() for i in X]))

# Labels for the same rows, read from the T_gold_multi_label column of `df`.
y = df['T_gold_multi_label']

# Binary indicator matrix: one column per class found in y (see mlb.classes_).
mlb = MultiLabelBinarizer()
yt = mlb.fit_transform(y)

In [None]:
len(y_train_silv), len(yt_test)

In [None]:
len(X), len(yt)

In [None]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, yt, test_size=0.15,shuffle=True, random_state=42)

In [None]:
from keras import backend as K
from keras.layers import Dropout
from keras.models import Model
from keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam, schedules, SGD
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from keras.models import Sequential
from keras.layers import Dense, Input, LSTM
from keras.utils.vis_utils import plot_model
import tensorflow as tf 

tf.config.run_functions_eagerly(True)

#keras custom f1_score metric
def f1(y_true, y_pred):
    """F1 score computed over all elements of the batch, usable as a Keras metric.

    Targets, predictions and their element-wise product are clipped to [0, 1]
    and rounded before being summed; K.epsilon() guards every division.
    """
    # Counts over the whole batch.
    hits = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_pos = K.sum(K.round(K.clip(y_true, 0, 1)))
    predicted_pos = K.sum(K.round(K.clip(y_pred, 0, 1)))

    precision_val = hits / (predicted_pos + K.epsilon())
    recall_val = hits / (actual_pos + K.epsilon())
    return 2*((precision_val*recall_val)/(precision_val+recall_val+K.epsilon()))

# Feed-forward classifier: 512-d input -> Dense(400) -> Dropout -> Dense(200) -> Dropout
# -> 10 sigmoid outputs (one independent probability per output unit).
model_1 = Sequential(name="ANN-1")
model_1.add(Input(shape=(512,), name='input'))  # is 512 when using only visual or textual embeddings
                                 # & 1024 when using multimodal embeddings
model_1.add(Dense(400, activation='relu', name="feedforward_1"))
model_1.add(Dropout(0.4, name='dropout_0.4'))

model_1.add(Dense(200, activation='relu', name="feedforward_2"))
model_1.add(Dropout(0.2, name="dropout_0.2"))
# NOTE(review): 10 output units here, 8 in the later rebuild of model_1 — the count
# presumably has to match the number of columns of the label matrix; confirm.
model_1.add(Dense(10, activation='sigmoid', name="output"))
model_1.output_shape  # bare expression in the middle of a cell: its value is not displayed

#opt = tf.keras.optimizers.SGD(learning_rate=0.002, nesterov=True)
# Binary cross-entropy with Adam; reports the custom f1 metric and accuracy, in that order.
model_1.compile(loss='binary_crossentropy',
              optimizer="adam",
              metrics=[f1, "accuracy"],)

In [None]:
# Mark every existing layer of model_1 as non-trainable.
# NOTE(review): Keras applies a changed `trainable` flag when the model is compiled
# afterwards; no compile() call follows before model_1 is deleted and rebuilt in a
# later cell — confirm whether this freeze is ever used for training.
for l in model_1.layers:   
    l.trainable = False

In [None]:
model_1.add(Dense(10, activation='sigmoid', name="output2"))

In [None]:
[l.trainable for l in model_1.layers]

In [None]:
model_1.layers

In [None]:
from keras import backend as K
from keras.layers import Dropout
from keras.models import Model
from keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam, schedules, SGD
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from keras.models import Sequential
from keras.layers import Dense, Input, LSTM
from keras.utils.vis_utils import plot_model
import tensorflow as tf 

tf.config.run_functions_eagerly(True)

#keras custom f1_score metric
def f1(y_true, y_pred):
    """F1 score computed over all elements of the batch, usable as a Keras metric.

    Combines the nested precision and recall helpers; K.epsilon() guards every division.

    NOTE(review): identical to the f1 function defined in an earlier cell; this
    definition replaces that one when the cell runs. Keep a single definition.
    """
    def recall(y_true, y_pred):
        """Recall metric."""

        # Values are clipped to [0, 1] and rounded before summing.
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = true_positives / (possible_positives + K.epsilon())
        return recall

    def precision(y_true, y_pred):
        """Precision metric."""

        # Values are clipped to [0, 1] and rounded before summing.
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision
        
    # The helper names are rebound to their results from here on.
    precision = precision(y_true, y_pred)
    recall = recall(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, yt, test_size=0.15,
                                                   shuffle=True, random_state=42)

# Discard the previous model_1 and rebuild it for 1024-d inputs with 8 sigmoid outputs.
# NOTE(review): `del model_1` raises NameError when no model_1 exists yet (e.g. if the
# earlier model cell was not run) — confirm the intended execution order.
del model_1
model_1 = Sequential(name="ANN-1")
model_1.add(Input(shape=(1024,), name='input'))  # is 512 when using only visual or textual embeddings
                                 # & 1024 when using multimodal embeddings
model_1.add(Dense(400, activation='relu', name="feedforward_1"))
model_1.add(Dropout(0.4, name='dropout_0.4'))

model_1.add(Dense(200, activation='relu', name="feedforward_2"))
model_1.add(Dropout(0.2, name="dropout_0.2"))
model_1.add(Dense(8, activation='sigmoid', name="output"))
model_1.output_shape  # bare expression in the middle of a cell: its value is not displayed

#opt = tf.keras.optimizers.SGD(learning_rate=0.002, nesterov=True)
# Binary cross-entropy with Adam; reports the custom f1 metric and accuracy, in that order.
model_1.compile(loss='binary_crossentropy',
              optimizer="adam",
              metrics=[f1, "accuracy"],)

In [None]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, yt, test_size=0.15,shuffle=True, random_state=42)

# Smaller variant: 1024-d input -> Dense(20) -> Dropout -> 10 sigmoid outputs.
# NOTE(review): model_2 is given the same Keras name ("ANN-1") as model_1 — confirm.
model_2 = Sequential(name="ANN-1")
model_2.add(Input(shape=(1024,), name='input'))  # is 512 when using only visual or textual embeddings
                                 # & 1024 when using multimodal embeddings


model_2.add(Dense(20, activation='relu', name="feedforward_2"))
model_2.add(Dropout(0.2, name="dropout_0.2"))
model_2.add(Dense(10, activation='sigmoid', name="output"))
model_2.output_shape  # bare expression in the middle of a cell: its value is not displayed

#opt = tf.keras.optimizers.SGD(learning_rate=0.002, nesterov=True)
# Binary cross-entropy with Adam; reports the custom f1 metric and accuracy, in that order.
model_2.compile(loss='binary_crossentropy',
              optimizer="adam",
              metrics=[f1, "accuracy"],)

In [None]:
# Stop once val_loss has not improved for 20 consecutive epochs.
es = EarlyStopping(monitor='val_loss', patience=20, verbose=1)
# Roughly ten batches over X_train; clamp to 1 so that fewer than 10 samples do
# not produce a batch size of 0.
BATCH = max(1, len(X_train) // 10)

print('Train...')
# The last 15% of the training data is held out by Keras for validation.
model_1.fit(X_train, y_train,
          batch_size=BATCH,
          epochs=300,
          validation_split=0.15,
          shuffle=True,
          callbacks=[es]
          )

In [None]:
# Same training recipe as the previous cell, applied to model_silver / yt_silv.
# NOTE(review): neither model_silver nor yt_silv is defined in the visible cells —
# this cell fails on a fresh Restart & Run All unless they are created elsewhere.
es = EarlyStopping(monitor='val_loss', patience=20, verbose=1)
BATCH = int(len(X_train)/10)

print('Train...')
model_silver.fit(X_train, yt_silv,
          batch_size=BATCH,
          epochs=300,
          validation_split=0.15,
          shuffle=True,
          callbacks=[es]
          )

In [None]:
# evaluate() returns [loss, f1, accuracy]: the loss first, then the metrics in the
# order they were passed to model_1.compile(metrics=[f1, "accuracy"]).
predict= model_1.evaluate(x=X_test, y=y_test)

print("Keras F1")
print("ANN-1:")
print("Loss:"+str(predict[0]))
print("F1:"+str(predict[1]))
print("Accuracy:"+str(predict[2]))

from sklearn.metrics import f1_score

# Sigmoid outputs per class; .round() below turns them into 0/1 decisions.
y_pred = model_1.predict([X_test], verbose=3)

# NOTE(review): result_1 is computed but neither printed nor displayed in this cell.
result_1 = f1_score(y_true=y_test.round(), y_pred=y_pred.round(), labels=None, average="weighted")

from sklearn.metrics import classification_report
# Class names come from the most recently fitted MultiLabelBinarizer.
labels=mlb.classes_
print(classification_report(y_true=y_test.round(), y_pred=y_pred.round(), target_names=labels))

In [None]:
# Same evaluation as for model_1, applied to model_silver against yt_test.
# NOTE(review): model_silver is not defined in the visible cells — confirm its origin.
predict= model_silver.evaluate(x=X_test, y=yt_test)

print("Keras F1")
print("ANN-1:")
print("Loss:"+str(predict[0]))
print("F1:"+str(predict[1]))
print("Accuracy:"+str(predict[2]))

from sklearn.metrics import f1_score

# Sigmoid outputs per class; .round() below turns them into 0/1 decisions.
y_pred = model_silver.predict([X_test], verbose=3)

# Weighted-average F1 over the classes, displayed as the cell output.
result_1 = f1_score(y_true=yt_test.round(), y_pred=y_pred.round(), labels=None, average="weighted")
result_1

In [None]:
from sklearn.metrics import f1_score

# Sigmoid outputs per class; .round() below turns them into 0/1 decisions.
y_pred = model_1.predict([X_test], verbose=3)

# Weighted-average F1 of model_1 on the test split, displayed as the cell output.
# NOTE(review): repeats the prediction and f1_score call of an earlier evaluation cell.
result_1 = f1_score(y_true=y_test.round(), y_pred=y_pred.round(), labels=None, average="weighted")
result_1

### Save the embeddings

In [None]:
import pickle

# Persist whatever `X` currently holds with pickle.
# NOTE(review): the file name says "visual", but `X` is reassigned by several cells
# (multimodal, textual, visual) — confirm X holds the visual embeddings at this point.
with open('clip-ft-256-1-lr5-visual.pkl', 'wb') as b:
    pickle.dump(X,b)