### Impostazione iniziale codice

In [None]:
%reset -f
#reset variabili 

import os
os.environ["KERAS_BACKEND"] = "tensorflow"
from keras import layers, models
import keras 
import os
import timm
import tensorflow
#os.environ['CURL_CA_BUNDLE'] = ''

import numpy
import json
import pickle
import torch
from matplotlib import pyplot as plt
numpy.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})

from transformers import AutoTokenizer, AutoModelForSequenceClassification




# variante di BERT preaddestrata in compiti similari
models = ["cardiffnlp/twitter-roberta-large-emotion-latest",
          #"MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli",
          "sileod/deberta-v3-small-tasksource-nli"
          ]


md = []
# ottenimento tokenizer e modello BERT pre-addestrato
for model in models:
    # codice per caricare tutto offline
    #model = ("models/"+model).replace("/","--")
    #cached = "C:\\Users\\USER\\.cache\\huggingface\\hub\\"
    #model = cached+model+"\\snapshots\\"
    #model = model +"\\"+os.listdir(model)[0] + "\\"
    tokenizer = AutoTokenizer.from_pretrained(model)#,local_files_only=True)
    model = AutoModelForSequenceClassification.from_pretrained(model,output_hidden_states=True)#,local_files_only=True)
    md.append((tokenizer,model))

# Estrazione features da CNN
# Mobilenetv3 fa preprocessing automatico

tf_models = [
    (keras.applications.MobileNetV3Large,'expanded_conv_14_squeeze_excite_avg_pool','imagenet'),
    ]
timm_models = [
    ("hf_hub:timm/mobilenetv4_hybrid_large.e600_r384_in1k","test"),
               ]
cnns = []
for model in tf_models:
    # Estrazione features https://keras.io/api/applications/#usage-examples-for-image-classification-models
    base_model = model[0](weights=model[2], include_top=False)
    pooling_layer = model[1]
    #base_model.summary(line_length = 200)
    #modello di estrazione di features dal global average pooling
    CNN = keras.models.Model(inputs=base_model.input, outputs=base_model.get_layer(pooling_layer).output)
    cnns.append((CNN,"tf"))

for model in timm_models:
    base_model = timm.create_model(model[0], pretrained=True,num_classes=0)
    base_model.eval()
    data_config = timm.data.resolve_model_data_config(base_model)
    transforms = timm.data.create_transform(**data_config, is_training=False)
    def cn(val):
        data = transforms(val).unsqueeze(0)
        return base_model(data)
    cnns.append((cn,"torch"))

base_path = os.path.dirname(os.path.abspath("__file__"))

### Impostazione path di base del dataset

In [None]:

trainpath = base_path + "\\train"
trainjson = trainpath+"\\train.json"
train_images = trainpath+"\\train_images\\"


testpath = base_path + "\\test"
testjson = testpath+"\\en_subtask2a_test_unlabeled.json"
test_images = testpath+"\\test_images\\"

validation_path = base_path + "\\validation"
validationjson = validation_path+ "\\validation.json"
validation_images = validation_path+"\\validation_images\\"

devpath = base_path + "\\dev"
devjson = devpath+"\\dev_subtask2a_en.json"
dev_images = devpath+"\\dev_images\\"


Task 2b

In [None]:

trainpath = base_path + "\\train"
trainjson = trainpath+"\\train_2b.json"
train_images = trainpath+"\\train_images\\"


testpath = base_path + "\\test"
testjson = testpath+"\\en_subtask2b_test_unlabeled.json"
test_images = testpath+"\\test_images\\"

validation_path = base_path + "\\validation"
validationjson = validation_path+ "\\val_2b.json"
validation_images = validation_path+"\\validation_images\\"

devpath = base_path + "\\dev"
devjson = devpath+"\\dev_subtask2b_en.json"
dev_images = devpath+"\\dev_images\\"

Estrazione info generiche da dataset di cui:
* numero classi
* probabilità varie(alcune scartate)
* label dele classi
* statistiche del numero di label per istanza

In [None]:
#getting the classes
task = 1
with open(trainjson,"r",encoding="utf8") as f:
    trainreadjson = json.load(f)

var = set()
meancalc = []
for elem in trainreadjson:
    if(task == 0):

        if len(var)>23:
            break
        for j in elem["labels"]:
            var.add(j)
    else:
        if len(var)>2:
            break
        var.add(elem["label"])
#print(var)
var = sorted(var)
prob = dict.fromkeys(var, 0)
if task == 0:
    prob_2 = dict.fromkeys(range(9),0)
else:
    prob_2 = dict.fromkeys(var, 0)
total = 0
for elem in trainreadjson:
    if(task == 0):
        prob_2[len(elem["labels"])] += 1
        for j in elem["labels"]:
            prob[j] += 1
            total += 1
    else:
        prob_2[elem["label"]] += 1
        prob[elem["label"]] += 1
        total += 1

for key in prob.keys():
    prob[key] = prob[key]/total

final_prob_vec = []
for elem in var:
    final_prob_vec.append(prob[elem])
print(prob_2)
for elem in prob_2.keys():
    prob_2[elem] = prob_2[elem]/total
print(prob_2)
#var


Estrazione embeddings testuali e unione di essi alle features dalle immagini dei vari dataset usati

In [None]:
def preprocess_embeddings(json,imagepath):
    leghjson = len(json)
    extracted_unit = 0
    for value in json:
        value["tokenized"] = []
        #value["context_full_embedding"] = []
        value["context_CLS_embedding"] = []
        value["context_no_CLS_embedding"] = []
        tokenized = None
        for mod in md:
            tokenizer = mod[0]
            model = mod[1]
            #print(value["labels"])
            # estrazione embeddings
            try:
                tokenized = tokenizer(value["text"],return_tensors="pt")
                v = model(**tokenized)
            except RuntimeError:
                print("cut")
                # handling di testi troppo lunghi, estraggo la parte centrale dato che si suppone la più importante
                leng_token = len(value["text"])//3
                tokenized = tokenizer(value["text"][leng_token:2*leng_token],return_tensors="pt")
                v = model(**tokenized)
            # estrazione ultimo layer di logits
            value["tokenized"].append(tokenized)
            last_layer = v.hidden_states[-1][0].detach().numpy()

            # salvataggio embedding perchè potrebbe tornare utile
            #value["context_full_embedding"].append(last_layer)

            # sono stati effettuati diversi test su quale embedding delle frasi fosse migliore e se escludere il CLS,
            # ma alla fine è stato utilizzato comunque quello
            value["context_CLS_embedding"].append(last_layer[0])

            # due embeddings in cui si fa la media e nell'altro la normalizzazione 
            # degli altri vettori dell'embedding senza il CLS
            #value["context_AVG_embedding"] = tensorflow.math.reduce_sum(last_layer[1:],axis=0)/numpy.mean(last_layer[1:],axis=0)
            last_layer = tensorflow.math.reduce_sum(last_layer[1:],axis=0)
            last_layer = last_layer/numpy.linalg.norm(last_layer,2)
            value["context_no_CLS_embedding"].append(last_layer)
        value["image_embedding"] = []
        for CNN in cnns:
            if(CNN[1] == "tf"):
                #estrazione degli embeddings dalle immagini
                image = keras.utils.img_to_array(keras.utils.load_img(imagepath+value["image"]))
                image = numpy.expand_dims(image, axis=0)
                image = CNN[0].predict(image,verbose=0)
        
                while type(image[0]) == numpy.ndarray:
                    image = image[0]
                value["image_embedding"].append(image)
            else:
                image =Image.open(imagepath+value["image"]).convert('RGB')
                image = CNN[0](image)
                image = image.detach().numpy()
                while type(image[0]) == numpy.ndarray:
                    image = image[0]
                value["image_embedding"].append(image)
            
        # estrazione dei label ove disponibili(il try catch serve per i casi in cui non ci sono i label)
        try:
            if(task == 0):
                
                value["y"] = [1 if(x in value["labels"]) else 0 for x in var]
                sum = numpy.sum(value["y"])
                value["y"] = value["y"]
                value["y_total"] = [1 if(x+1 == sum) else 0 for x in range(len(prob_2.keys()))]
            else:
                value["y"] = [1 if(x == value["label"]) else 0 for x in var]
        except:
            pass
        extracted_unit += 1
        #percentuale di completamento estrazione
        print(extracted_unit/leghjson)
# concatenazione embeddings testuali e di immagine e creazione metrice di input per le reti neurali
def convert_to_model(dataset,cls=True,old=False):

    if cls:
        val = "context_CLS_embedding"
    else:
        val = "context_no_CLS_embedding"
    if old:
        val = "context_AVG_embedding"
    few = dataset
    # concatenazione di tutti gli embeddings in modo da vedere se riesco a tirare fuori qualcosa da più modelli

    input_x = []
    for v in few:
        var = []
        for j in v[val]:
            var.append(j)
        for j in v["image_embedding"]:
            var.append(j)
        var = numpy.concatenate(var)
        var = var.flatten()
        input_x.append(var)

    input_x = numpy.array(input_x)
    try:
        y = numpy.array([v["y"] for v in few])
        if(task == 0):

            y_total = numpy.array([v["y_total"] for v in few])
        else:
            y_total = []
        return(input_x,y,y_total)
    except:
        return input_x

vera estrazione features e salvataggio in variabili
#### l'estrazione features è molto lunga, lo script fa in modo da salvare le features estratte in modo che l'estrazione sia fatta una volta sola

In [None]:
from pathlib import Path

file = Path("./objs.pkl")
if not file.exists():
    with open(trainjson,"r",encoding="utf8") as f:
        trainreadjson = json.load(f)
    train = trainreadjson#[0:trainsize]
    preprocess_embeddings(train,train_images)

    with open(testjson,"r",encoding="utf8") as f:
        testreadjson = json.load(f)
    test = testreadjson
    preprocess_embeddings(test,test_images)

    with open(validationjson,"r",encoding="utf8") as f:
        validreadjson = json.load(f)
    validat_x = validreadjson
    preprocess_embeddings(validat_x,validation_images)

    with open(devjson,"r",encoding="utf8") as f:
        devreadjson = json.load(f)
    dev_x = devreadjson
    preprocess_embeddings(dev_x,dev_images)
    # salvataggio oggetti in un file pickle in modo da non dover riestrarre tutto ogni volta
    with open('objs.pkl', 'wb') as f:  # Python 3: open(..., 'wb')
        pickle.dump([train, test, validat_x, dev_x], f)


In [None]:
with open("objs.pkl","rb") as f:  # Python 3: open(..., 'rb')
    train, test, validat_x, dev_x = pickle.load(f)

estrazione embedding testo+immagini dei diversi dataset

In [None]:
input_x,y,y_total = convert_to_model(train)
input_test = convert_to_model(test)
input_validation,y2,y_total2 = convert_to_model(validat_x)
input_dev,y3,y_total3 = convert_to_model(dev_x)

#normalizzazione non porta migliorie, quindi è stata scritta in modo che sia modificabile, ma non serve
#normalizer = layers.Normalization()
#normalizer.adapt(input_x)
normalizer = lambda x:x

#variabili di gestione reti neurali
anothervar = input_x.shape[1]
shapey = len(y[0])
if(task == 0):
    shapey_toy = len(y_total[0])

funzioni di creazione reti neurali

In [None]:

inpt = layers.Input(shape=(anothervar,))

def build_NN1():
    layers = keras.layers
    NN1 = keras.models.Model
    

    ##########
    l8 = layers.Dense(anothervar//6, activation='relu')(inpt)
    l8 = layers.Dropout(0.3)(l8)
    l8 = layers.Dense(shapey, activation='linear')(l8)
    simg_predictions = layers.Dense(shapey, activation='softmax')(l8)
    ##########
    NN1 = NN1(inpt,simg_predictions)
    return NN1

def build_NN2():
    
    NN2 = keras.models.Model
    add_layer = layers.Dense(anothervar//8, activation='relu')(inpt)
    l8 = layers.Dropout(0.3)(add_layer)
    l8 = layers.Dense(shapey_toy, activation='linear')(l8)
    sigm2_extractpredict = layers.Dense(shapey_toy, activation='softmax')(l8)
    NN2 = NN2(inpt,sigm2_extractpredict)
    return NN2




manipolazione dataset in modo da poter manipolare train-dev-validation

In [None]:
#input_x = input_validation
#y = y2
#y_total =y_total2

input_x = numpy.concatenate([input_x,input_dev])
y = numpy.concatenate([y,y3])
y_total = numpy.concatenate([y_total,y_total3])
input_x = numpy.concatenate([input_x,input_validation])
y = numpy.concatenate([y,y2])
y_total = numpy.concatenate([y_total,y_total2])

#input_validation = numpy.concatenate([input_dev,input_validation])
#y2 = numpy.concatenate([y3,y2])
#y_total2 = numpy.concatenate([y_total3,y_total2])

# ensemble di reti neurali performa peggio di utilizzarne solo 1
# il codice è per un ensemble con reti neurali con diversi dataset in allenamento,
# ma è stato impostato per usare una sola rete
div = 1
leng_input = len(input_x)//div

input_x = [input_x[j*leng_input:(j+1)*leng_input] for j in range(div)]
y = [y[j*leng_input:(j+1)*leng_input] for j in range(div)]
y_tot = [y_total[j*leng_input:(j+1)*leng_input] for j in range(div)]


In [None]:
# impostando il codice bene è possibile riallenare le reti
# la logica era però usa e getta e non è stata sviluppata completamente 

retrain = False
if not retrain:
    listt_NN1 = []
for j in range(div):
    iter = input_x[j]
    y_iter = y[j]
    if not retrain:
        NN1 = build_NN1()
        NN1.compile(loss=['binary_crossentropy'], optimizer='adam', metrics=['accuracy'])
        history = NN1.fit(x=normalizer(iter),y=y_iter,validation_data=(normalizer(input_validation),y2),batch_size=128, epochs=8)
        retrain=True
    else:
        NN1 = NN1
        
    #cerco di ridurre il più possibile la loss di validation
    NN1.compile(loss=['binary_crossentropy'], optimizer='sgd', metrics=['accuracy'])                                        #1000
    history = NN1.fit(x=normalizer(iter),y=y_iter,validation_data=(normalizer(input_validation),y2),batch_size=8500, epochs=1*(1+j))
    NN1.acc = history.history['val_accuracy']
    listt_NN1.append(NN1)

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

codice non utilizzato per il modello finale, ma su cui sono stati fatti dei test di predizione del numero di labels

In [None]:
#retrain = False
#if not retrain:
#    listt_NN2 = []
#or j in range(div):
#   if not retrain:
#       NN2 = build_NN2()
#   else:
#       NN2 = listt_NN2[j]
#   iter = input_x[j]
#   y_iter = y_tot[j]
#   NN2.compile(loss=['categorical_crossentropy'], optimizer='adam', metrics=['accuracy'])
##   history = NN2.fit(x=normalizer(iter),y=y_iter,validation_data=(normalizer(input_validation),y_total2),batch_size=128, epochs=3)
##   NN2.compile(loss=['categorical_crossentropy'], optimizer='sgd', metrics=['accuracy'])
##   history = NN2.fit(x=normalizer(iter),y=y_iter,validation_data=(normalizer(input_validation),y_total2),batch_size=8500, epochs=65)
##   listt_NN2.append(NN2)
##
#plt.plot(history.history['loss'])
#plt.plot(history.history['val_loss'])
#plt.title('model loss')
#plt.ylabel('loss')
#plt.xlabel('epoch')
#plt.legend(['train', 'val'], loc='upper left')
#plt.show()#

In [None]:
listt_NN1 = [NN1]

funzione di supporto per estrazione predizioni e per convertire i risultati in json per mandarli al semEval

In [None]:
task = 1
l=list(var)
def pred_to_json_ensemble(list_NN1,list_NN2,input_validation,validat_x,filename):
    inp = normalizer(input_validation)
    k = list_NN1[0].predict(inp,verbose=0)
    for v in list_NN1[1:]:
        k = k + v.predict(inp,verbose=0)
    res = k
    """
    k = list_NN2[0].predict(inp,verbose=0)
    for v in list_NN2[1:]:
        k = k + v.predict(inp,verbose=0)
    """
    res = (res,k)
    
    #res = (NN1.predict(inp,verbose=0),NN2.predict(inp,verbose=0))
    s = []
    for v,k in zip(res[0],res[1]):
        # prima provavo a prevedere il numero di classi in base ad una rete neurale secondaria
        """
        c=(numpy.argmax(k)+2)
        if c > len(k):
            c = k-1
        if c == 0:
            c=1
        ind = numpy.argpartition(v,-c)[-c:]
        fin = [1 if x in ind else 0 for x in range(len(v))]
        """
        # treeshold calcolato in validazione, metodo che non usa la seconda neural network
        if task == 0:
            fin = [1 if v[x]> 0.215*div else 0 for x in range(len(v))]
        else:
            m = max(v)
            fin = [1 if m == v[x] else 0 for x in range(len(v))]
        
        s.append(fin)
        
    s = numpy.array(s)
    k = numpy.array([((lambda mx:[1 if mx==x else 0 for x in range(6)]) ((lambda v:numpy.argmax(v))(j))) for j in res[1]])
    newdict = []
    for v,j in zip(validat_x,s):
        dic = {}
        labelcontainer = []
        i = 0
        while i<len(l):
            if(j[i]==1):
                #print(l[i])
                labelcontainer.append(l[i])
            i+=1
        
        if task == 0:
            dic["labels"] = labelcontainer
        else:
            dic["label"] = labelcontainer[0]
        dic["id"]=v["id"]
        dic["text"]=v["text"]
        newdict.append(dic)
    with open(filename, "w") as outfile: 
        json.dump(newdict, outfile)

In [None]:
listt_NN2 = []

In [None]:
pred_to_json_ensemble(listt_NN1,listt_NN2,input_validation,validat_x,"./validation/predictions.json")
pred_to_json_ensemble(listt_NN1,listt_NN2,input_test,test,"predictions.json.txt")

codice per modello di rete neurale utilizzato/salvato al semEval, ho utilizzato una tipologia di allenamento custom, su cui ho avuto fortuna nell'effettuare manipolazione del dataset

In [None]:
#import keras
#NN1 = keras.saving.load_model("NN1.keras")
#NN2 = keras.saving.load_model("NN2.keras")
#listt_NN1 = [NN1]
#listt_NN2 = [NN2]
#pred_to_json_ensemble(listt_NN1,listt_NN2,input_validation,validat_x,"predictions.json")
#pred_to_json_ensemble(listt_NN1,listt_NN2,input_test,test,"predictions.json.txt")

funzioni dai salvataggio dei modelli keras quando riuscivo a raggiungere un buon minimo in validazione

In [None]:
#NN1.save("NN1.keras")
#NN2.save("NN2.keras")

In [None]:
#NN1.summary()