In [13]:
import tensorflow as tf
from tensorflow.keras.layers import (
    Dense,
    Concatenate,
    Input,
    Embedding,
    Lambda,
    GlobalAveragePooling2D,
)
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from sklearn.model_selection import KFold
import pandas as pd
import requests
from PIL import Image
from io import BytesIO
import numpy as np

In [14]:
# Load the two semicolon-separated source tables.
# Later cells read the "name" column of the users table and the
# "author" / "img" columns of the artworks table.
user_df = pd.read_csv(filepath_or_buffer="../dataset/users.csv", sep=";")
artwork_df = pd.read_csv(filepath_or_buffer="../dataset/artworks.csv", sep=";")

In [15]:
def preprocess_image(image_url, timeout=10):
    """Download an image and return it as a (224, 224, 3) array scaled to [0, 1].

    Args:
        image_url: URL of the image to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a single stalled connection blocks the whole
            preprocessing loop indefinitely.

    Returns:
        A NumPy array of shape (224, 224, 3) holding RGB values divided by 255.

    Raises:
        FileNotFoundError: If the server answers with a status other than 200.
        requests.exceptions.RequestException: Propagated unchanged for
            connection errors and timeouts.
    """
    response = requests.get(image_url, timeout=timeout)
    if response.status_code != 200:
        raise FileNotFoundError(
            f"Impossibile scaricare l'immagine dall'URL: {image_url}"
        )

    # The context manager releases the decoder's resources once the pixel data
    # has been copied into the converted/resized image.
    with Image.open(BytesIO(response.content)) as image:
        resized = image.convert("RGB").resize((224, 224))

    return np.array(resized) / 255.0

In [16]:
def create_triples(user_df, artwork_df):
    """Build one (profile, positive, negative) training triple per user.

    For every user with at least two artworks, the user's artworks are
    shuffled; the last one is held out as the positive item ``i`` and the
    remaining ones form the profile ``Pu``. Holding out the same row that is
    removed from the profile guarantees the positive item never appears inside
    the profile it is scored against. The negative item ``j`` is drawn from the
    artworks of other authors.

    Args:
        user_df: DataFrame with a "name" column.
        artwork_df: DataFrame with an "author" column.

    Returns:
        A list of ``(Pu, i, j)`` tuples. ``Pu`` is a DataFrame with one or more
        rows; ``i`` and ``j`` are single-row DataFrames. Users with fewer than
        two artworks, or with no artwork by another author to use as a
        negative, are skipped.
    """
    triples = []
    for _, user_row in user_df.iterrows():
        is_own = artwork_df["author"] == user_row["name"]
        user_interactions = artwork_df[is_own]
        non_interacted = artwork_df[~is_own]

        # Need one item to hold out plus at least one profile item, and at
        # least one candidate negative (sampling an empty frame raises).
        if len(user_interactions) < 2 or non_interacted.empty:
            continue

        shuffled = user_interactions.sample(frac=1)
        Pu = shuffled.iloc[:-1]
        i = shuffled.iloc[[-1]]  # double brackets keep a one-row DataFrame
        j = non_interacted.sample()
        triples.append((Pu, i, j))
    return triples

In [17]:
def preprocess_data(triples, image_preprocessor):
    """Turn (profile, positive, negative) DataFrame triples into image arrays.

    Args:
        triples: Iterable of ``(Pu, i, j)`` tuples whose elements expose an
            "img" column of image locations.
        image_preprocessor: Callable mapping one "img" value to an array.

    Returns:
        A list of ``(profile_images, positive_image, negative_image, meta)``
        tuples, where ``profile_images`` stacks one array per profile row and
        ``meta`` is the first row of the positive item's DataFrame.
    """

    def _first_image(frame):
        # Only the first row of the frame is used.
        return np.array(image_preprocessor(frame["img"].values[0]))

    processed_triples = []
    for profile, positive, negative in triples:
        profile_images = np.array(
            [image_preprocessor(location) for location in profile["img"].values]
        )
        record = (
            profile_images,
            _first_image(positive),
            _first_image(negative),
            positive.iloc[0],
        )
        processed_triples.append(record)

    return processed_triples

In [18]:
# Model hyper-parameters.
# `embedding_dim` sets the width of the shared item-embedding Dense layers.
# `pu_dim` and `margin` are not referenced by any other visible cell
# (custom_triplet_loss carries its own default margin).
embedding_dim: int = 200
pu_dim: int = 400
margin: float = 0.4

In [19]:
# ImageNet-initialised ResNet50 without its classification head or built-in
# pooling; it is used purely as a fixed feature extractor.
resnet = ResNet50(include_top=False, weights="imagenet", pooling=None)

# Freeze every backbone layer so training only updates the layers added on top.
for backbone_layer in resnet.layers:
    backbone_layer.trainable = False


def extract_features(image):
    """Run images through the frozen ResNet50 and average-pool the result.

    Args:
        image: Batch of images accepted by ``resnet``.

    Returns:
        The backbone's feature maps reduced by global average pooling over the
        spatial axes, i.e. one feature vector per image.
    """
    # NOTE(review): preprocess_image scales pixels to [0, 1], whereas the Keras
    # ResNet50 docs describe `resnet50.preprocess_input` as the expected input
    # preprocessing for the ImageNet weights. Confirm whether that is intended.
    feature_maps = resnet(image)
    spatial_pool = GlobalAveragePooling2D()
    return spatial_pool(feature_maps)

In [20]:
def custom_reduce_sum(x, y):
    """Multiply ``x`` and ``y`` element-wise and sum the product over axis 1."""
    elementwise_product = x * y
    return K.sum(elementwise_product, axis=1)

In [21]:
def custom_triplet_loss(y_true, y_pred, margin=0.4):
    """Hinge-style ranking loss over concatenated (positive, negative) scores.

    Args:
        y_true: Unused; present only because Keras passes it to every loss.
        y_pred: Tensor whose last axis is split into two equal halves: the
            positive-item score followed by the negative-item score.
        margin: Amount by which the positive score must exceed the negative
            score before a sample stops contributing to the loss.

    Returns:
        The mean of ``max(0, margin + score_j - score_i)``.
    """
    positive_score, negative_score = tf.split(
        y_pred, num_or_size_splits=2, axis=-1
    )
    violation = margin + negative_score - positive_score
    hinge = tf.maximum(0.0, violation)
    return tf.reduce_mean(hinge)

In [22]:
# --- Inputs -----------------------------------------------------------------
# input_pu carries a fixed-size profile of 3 images per user (prepare_inputs
# pads or truncates to the same count); input_i / input_j are the positive and
# negative candidate images.
input_pu = Input(shape=(3, 224, 224, 3), name="input_pu")
input_i = Input(shape=(224, 224, 3), name="input_i")
input_j = Input(shape=(224, 224, 3), name="input_j")

# --- Backbone features ------------------------------------------------------
# Fold the profile axis into the batch axis so the backbone sees an ordinary
# 4-D image batch, then unfold it again. This replaces a per-sample `map_fn`
# and lets the feature width come from the backbone output itself instead of a
# hand-written `output_shape`.
pu_as_image_batch = Lambda(
    lambda x: tf.reshape(x, (-1, 224, 224, 3)), name="pu_fold_profile_axis"
)(input_pu)
pu_features = Lambda(
    lambda x: tf.reshape(x, (-1, 3, x.shape[-1])), name="pu_unfold_profile_axis"
)(extract_features(pu_as_image_batch))

i_features = extract_features(input_i)
j_features = extract_features(input_j)

# --- Shared item embedding --------------------------------------------------
# The same two Dense layers embed profile items and candidate items.
dense_layer_1 = Dense(embedding_dim, activation="selu", name="dense_layer_1")
dense_layer_2 = Dense(embedding_dim, activation="selu", name="dense_layer_2")

# Dense operates on the last axis, so it can be applied directly to the
# (batch, profile, features) tensor. Calling trainable layers inside a Lambda
# would hide their variables from Keras' weight tracking.
reduced_pu = dense_layer_2(dense_layer_1(pu_features))
reduced_i = dense_layer_2(dense_layer_1(i_features))
reduced_j = dense_layer_2(dense_layer_1(j_features))

# --- Profile aggregation ----------------------------------------------------
# Summarise the profile with both its mean and its max over the profile axis.
average_pooled_pu = Lambda(lambda x: K.mean(x, axis=1), output_shape=(embedding_dim,))(
    reduced_pu
)
max_pooled_pu = Lambda(lambda x: K.max(x, axis=1), output_shape=(embedding_dim,))(
    reduced_pu
)

pooled_pu = Concatenate()([average_pooled_pu, max_pooled_pu])

# --- User tower -------------------------------------------------------------
pu_dense_1 = Dense(300, activation="selu", name="pu_dense_1")(pooled_pu)
pu_dense_2 = Dense(200, activation="selu", name="pu_dense_2")(pu_dense_1)

# The user vector is dotted with the item embeddings downstream, so its width
# has to equal embedding_dim.
final_pu = Dense(embedding_dim, activation="selu", name="pu_dense_3")(pu_dense_2)

In [23]:
def _dot_product_score(pair):
    """Return the per-row dot product of a [user_vector, item_vector] pair.

    ``keepdims=True`` keeps a trailing axis of size 1 so two scores can be
    concatenated along the last axis.
    """
    user_vector, item_vector = pair
    return K.sum(user_vector * item_vector, axis=1, keepdims=True)


# Preference scores of the user vector against the positive (i) and the
# negative (j) item, laid out side by side as [score_i, score_j].
score_i = Lambda(_dot_product_score)([final_pu, reduced_i])
score_j = Lambda(_dot_product_score)([final_pu, reduced_j])
output_scores = Concatenate(axis=-1)([score_i, score_j])

In [24]:
def build_curatornet_model():
    """Build a new, independently initialised CuratorNet-style scoring model.

    All trainable layers are created inside this function, so every call
    returns a model with its own freshly initialised weights. Wrapping
    module-level layer objects instead would make every "new" model share, and
    keep training, the same weights, which leaks information between
    cross-validation folds. Only the frozen ``resnet`` backbone (via
    ``extract_features``) is shared.

    Returns:
        An uncompiled Keras ``Model`` with inputs named "input_pu"
        (3 profile images), "input_i" and "input_j" (one 224x224 RGB image
        each) and a single output whose last axis holds
        ``[score_i, score_j]``.
    """
    tf.keras.backend.clear_session()

    profile_size = 3  # must match target_image_count used by prepare_inputs
    image_shape = (224, 224, 3)

    profile_in = Input(shape=(profile_size,) + image_shape, name="input_pu")
    positive_in = Input(shape=image_shape, name="input_i")
    negative_in = Input(shape=image_shape, name="input_j")

    # Fold the profile axis into the batch axis for the backbone, then unfold.
    folded = Lambda(
        lambda x: tf.reshape(x, (-1,) + image_shape), name="pu_fold_profile_axis"
    )(profile_in)
    profile_features = Lambda(
        lambda x: tf.reshape(x, (-1, profile_size, x.shape[-1])),
        name="pu_unfold_profile_axis",
    )(extract_features(folded))

    # Item embedding shared by profile items and both candidates.
    item_dense_1 = Dense(embedding_dim, activation="selu", name="dense_layer_1")
    item_dense_2 = Dense(embedding_dim, activation="selu", name="dense_layer_2")

    def embed_item(features):
        return item_dense_2(item_dense_1(features))

    profile_items = embed_item(profile_features)
    positive_item = embed_item(extract_features(positive_in))
    negative_item = embed_item(extract_features(negative_in))

    # Aggregate the profile with mean and max over the profile axis.
    pooled_profile = Concatenate()(
        [
            Lambda(lambda x: K.mean(x, axis=1), name="pu_mean_pool")(profile_items),
            Lambda(lambda x: K.max(x, axis=1), name="pu_max_pool")(profile_items),
        ]
    )

    hidden = Dense(300, activation="selu", name="pu_dense_1")(pooled_profile)
    hidden = Dense(200, activation="selu", name="pu_dense_2")(hidden)
    user_vector = Dense(embedding_dim, activation="selu", name="pu_dense_3")(hidden)

    def dot_score(pair):
        return K.sum(pair[0] * pair[1], axis=1, keepdims=True)

    positive_score = Lambda(dot_score, name="score_i")([user_vector, positive_item])
    negative_score = Lambda(dot_score, name="score_j")([user_vector, negative_item])

    curatornet = Model(
        inputs=[profile_in, positive_in, negative_in],
        outputs=Concatenate(axis=-1)([positive_score, negative_score]),
    )
    return curatornet

# curatorenet_model = build_curatornet_model()
# curatorenet_model.compile(optimizer="adam", loss=custom_triplet_loss)
# curatorenet_model.summary()

In [25]:
def prepare_inputs(
    processed_triples, users_df, target_image_count=3, expected_length=183
):
    """Stack preprocessed triples into fixed-shape model input arrays.

    Args:
        processed_triples: Iterable of
            ``(profile_images, positive_image, negative_image, meta)`` tuples.
            ``meta`` is ignored.
        users_df: Unused; kept so existing callers keep working.
        target_image_count: Number of profile images per sample. Shorter
            profiles are zero-padded along the first axis, longer ones are
            truncated.
        expected_length: Number of samples every returned array must have.
            Longer inputs are truncated; shorter inputs are repeated
            cyclically until the length is reached exactly.

    Returns:
        Dict with keys "input_pu", "input_i" and "input_j" mapping to NumPy
        arrays. If there are no samples the arrays are returned empty; if a
        key cannot be stacked into an array it is left as a list. A warning is
        printed in both cases.
    """
    inputs = {
        "input_pu": [],
        "input_i": [],
        "input_j": [],
    }

    for pu_images, i_image, j_image, _meta in processed_triples:
        pu_images = np.asarray(pu_images)
        profile_count = pu_images.shape[0]
        if profile_count < target_image_count:
            # Pad only the leading (profile) axis with zero images.
            pad_spec = [(0, target_image_count - profile_count)]
            pad_spec += [(0, 0)] * (pu_images.ndim - 1)
            pu_images = np.pad(pu_images, pad_spec, mode="constant")
        elif profile_count > target_image_count:
            pu_images = pu_images[:target_image_count]

        inputs["input_pu"].append(pu_images)
        inputs["input_i"].append(i_image)
        inputs["input_j"].append(j_image)

    for key, samples in inputs.items():
        try:
            stacked = np.array(samples)
        except (ValueError, TypeError) as e:
            print(f"Warning: Could not convert {key} to numpy array: {e}")
            continue

        current_length = len(stacked)
        if current_length == 0:
            # Nothing to repeat; report it instead of dividing by zero.
            print(f"Warning: {key} is empty; cannot adjust to {expected_length}")
        elif current_length != expected_length:
            print(f"Adjusting {key} from {current_length} to {expected_length}")
            # Cyclic indexing both truncates (expected < current) and repeats
            # (expected > current), always yielding exactly expected_length.
            stacked = stacked[np.arange(expected_length) % current_length]

        inputs[key] = stacked
    return inputs

In [26]:
def convert_inputs_to_tensors(inputs):
    """Replace array-like values of ``inputs`` with TensorFlow tensors, in place.

    Args:
        inputs: Dict whose NumPy-array or list values should become tensors.
            Values of any other type are left untouched.

    Returns:
        The same dict object that was passed in.
    """
    convertible_types = (np.ndarray, list)
    for key in list(inputs):
        current = inputs[key]
        if isinstance(current, convertible_types):
            inputs[key] = tf.convert_to_tensor(current)
    return inputs

In [27]:
# Seed NumPy's global generator so the DataFrame.sample() draws inside
# create_triples produce the same triples on every Restart & Run All.
np.random.seed(42)

# Build the triples, download and preprocess their images, then stack them
# into model inputs. The download step is slow because it fetches every image.
triples = create_triples(user_df, artwork_df)
preprocessed_triples = preprocess_data(triples, preprocess_image)
inputs = prepare_inputs(preprocessed_triples, user_df)

# Assign the result instead of leaving the call as the cell's last expression,
# which would render every tensor in the cell output.
inputs = convert_inputs_to_tensors(inputs)

KeyboardInterrupt: 

In [None]:
n_splits = 5
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

# Materialise every input as a NumPy array once so folds can be index-sliced.
input_arrays = {key: np.asarray(val) for key, val in inputs.items()}

# Size the dummy targets from the inputs actually fed to the model:
# prepare_inputs may have changed the sample count relative to len(triples).
# The loss ignores y_true, so the values only need the right length.
n_samples = len(input_arrays["input_pu"])
y = np.zeros((n_samples, 1))

fold = 1
val_losses = []
val_accuracies = []
# The inputs dict is keyed "input_pu" / "input_i" / "input_j".
for train_index, test_index in kf.split(input_arrays["input_pu"]):
    print(f"Fold {fold}")
    print(f"Train Index: {train_index}")
    print(f"Test Index: {test_index}")
    X_train = {key: val[train_index] for key, val in input_arrays.items()}
    X_test = {key: val[test_index] for key, val in input_arrays.items()}
    y_train, y_test = y[train_index], y[test_index]

    print("divisione completata")

    model = build_curatornet_model()

    # NOTE(review): with all-zero targets of shape (n, 1) and a 2-column
    # output, "accuracy" presumably resolves to sparse categorical accuracy,
    # i.e. the share of samples where score_i is the larger score. Confirm.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.000001),
        loss=custom_triplet_loss,
        metrics=["accuracy"],
    )

    print("compilazione completata")

    model.fit(X_train, y_train, epochs=10, verbose=0, batch_size=32)

    print("addestramento completato")

    val_loss, val_accuracy = model.evaluate(X_test, y_test, verbose=0)

    print(
        f"Fold {fold} - Validation Loss: {val_loss} - Validation Accuracy: {val_accuracy}"
    )

    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)
    fold += 1

average_val_loss = np.mean(val_losses)
print(
    f"Mean Validation Loss after {n_splits}-Fold Cross Validation: {average_val_loss}"
)

# The per-fold accuracies were collected but never summarised.
average_val_accuracy = np.mean(val_accuracies)
print(
    f"Mean Validation Accuracy after {n_splits}-Fold Cross Validation: {average_val_accuracy}"
)