In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
import pandas as pd
import numpy as np
import pickle
from PIL import Image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions

from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from sklearn.metrics import f1_score

In [None]:
def macro_f1_score(y_true, y_pred):
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    return macro_f1

In [None]:
labelsPath = '../data/labels_pd_pickle'
file = open(labelsPath,'rb')
data = pickle.load(file)

data.drop(columns=['text_ocr'])


taskA = data.take([0,2,7],axis=1)

one_hot_encoded = pd.get_dummies(taskA['overall_sentiment'],dtype=int)
taskA['ohe'] = one_hot_encoded.values.tolist()
labels = one_hot_encoded.columns.tolist()
taskA['text_corrected'] = taskA['text_corrected'].astype('string')
codes, uniques = pd.factorize(taskA['overall_sentiment'])
taskA['class'] = codes


print(taskA['class'].value_counts())
taskAbalanced = pd.concat([
    taskA[taskA['class'] == c].sample(n=151, replace=True) for c in taskA['class'].unique()
])
print(taskAbalanced['class'].value_counts())

taskAbalanced = taskAbalanced.sample(frac=1).reset_index(drop=True)

taskAbalanced.head()


In [None]:
taskAbalanced['overall_sentiment'].unique()

In [None]:
images_folder = '../data/images/'
images = []
df = taskAbalanced
for filename in df['image_name'].values:  # Zmień 'nazwa_pliku' na nazwę kolumny z nazwami plików w DataFrame
    img = Image.open(images_folder + filename)  # Tworzenie ścieżki do pliku obrazu
    img = img.resize((224, 224))  # Dopasowanie rozmiaru obrazu, jeśli to konieczne
    img = img.convert('RGB')  # Konwertuj do formatu RGB
    img_array = np.array(img)  # Konwersja obrazu PIL na tablicę numpy
    img_array = preprocess_input(img_array)
    images.append(img_array)  # Dodanie obrazu do listy
images = np.stack(images)

In [None]:
images[:].shape

In [None]:
import matplotlib.pyplot as plt
plt.imshow(images[1])
plt.show()
df.iloc[1]

In [None]:
# from sklearn.model_selection import train_test_split

# Xtrain, Xtest, ytrain, ytest = train_test_split(df['text_corrected'],df['class'],shuffle=False)
# Xtrain = np.asarray(Xtrain).astype(np.str)
# ytrain = np.asarray(ytrain).astype(np.int)
# Xtest = np.asarray(Xtest).astype(np.str)
# ytest = np.asarray(ytest).astype(np.int)

from sklearn.model_selection import KFold

kf = KFold(n_splits=5,shuffle=True, random_state=777)

trainFolds={}
testFolds={}

for i, (train_index, test_index) in enumerate(kf.split(df['text_corrected'])):
    trainFolds[i] = (df['text_corrected'][train_index].reset_index(drop=True),images[train_index], df['class'][train_index].reset_index(drop=True))
    testFolds[i]  = (df['text_corrected'][test_index].reset_index(drop=True),images[test_index] ,  df['class'][test_index].reset_index(drop=True))
    

In [None]:
def foldProccess(i,Xtrain, Xtest, ytrain, ytest, imagesTrain, imagesTest):    
    bert_preprocess = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3")
    bert_encoder = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4")
    text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
    preprocessed_text = bert_preprocess(text_input)
    bertOutputs = bert_encoder(preprocessed_text)
    l = tf.keras.layers.Dropout(0.1, name="dropout")(bertOutputs['pooled_output'])
#     base_model = ResNet50(weights='imagenet', include_top=False)
#     for layer in base_model.layers:
#         layer.trainable = False
#     #ResNet50
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(1024, activation='relu')(x)
#     #HEAD
#     concatenated = concatenate([x, l])
    
    
    connect = Dense(512, activation='relu')(l)
    l = tf.keras.layers.Dense(5, activation='softmax', name="output")(connect)
    # Use inputs and outputs to construct a final model
    model = tf.keras.Model(inputs=[text_input], outputs = [l])
    METRICS = [
        tf.keras.metrics.SparseCategoricalAccuracy(),
        tf.keras.metrics.SparseCategoricalCrossentropy()
    ]
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=METRICS)
    
    
    
    history = model.fit(Xtrain, ytrain, epochs=15,validation_split=0.1)#, callbacks=[early_stopping])
    model.save(f'models/cross4/{i}_multimod_bert_res50_e15_val0_1')
    model.evaluate(Xtest, ytest)
    ypred = model.predict(Xtest)
    z = np.argmax(ypred, axis=1)
    macro_f1 = macro_f1_score(ytest, z)
    print("Macro F1 Score:", macro_f1)

In [None]:
for i in range(0,5)[:]:
    Xtrain, imagesTrain, ytrain = trainFolds[i][0], trainFolds[i][1], trainFolds[i][2] 
    Xtest, imagesTest, ytest = testFolds[i][0], testFolds[i][1], testFolds[i][2]
    foldProccess(i,Xtrain,  Xtest, ytrain, ytest, imagesTrain, imagesTest)

In [None]:
model.save('models/multimod_bert_uni_e15_val0_1')

In [None]:
model.evaluate([Xtest, images[566:]], ytest)

In [None]:
ypred = model.predict([Xtest, images[566:]])
z = np.argmax(ypred, axis=1)

In [None]:
macro_f1 = macro_f1_score(ytest, z)
print("Macro F1 Score:", macro_f1)

In [None]:
history.history

In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
    # Wyświetlenie wartości straty dla zbioru treningowego i walidacyjnego
    plt.plot(history.history['loss'], label='loss')
    plt.plot(history.history['sparse_categorical_accuracy'], label='sparse_categorical_accuracy')
    plt.plot(history.history['sparse_categorical_crossentropy'], label='sparse_categorical_crossentropy')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss vs. Epoch')
    plt.legend()
    plt.show()
    
    # Wyświetlenie wartości dokładności dla zbioru treningowego i walidacyjnego
    #plt.plot(history.history['loss'], label='loss')
    plt.plot(history.history['val_loss'], label='val_loss')
    plt.plot(history.history['val_sparse_categorical_crossentropy'], label='val_sparse_categorical_crossentropy')
    plt.xlabel('Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy vs. Epoch')
    plt.legend()
    plt.show()
plot_history(history)