In [2]:
# dependencies
import kagglehub
from kagglehub import KaggleDatasetAdapter
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
import keras_tuner
import os
import requests
from tqdm import tqdm
import matplotlib.pyplot as plt

In [2]:
# CSV file inside the Kaggle dataset that should be read
file_path = "products.csv"

# Pull the latest version of the dataset and load that CSV through the
# pandas adapter; the result is what every later cell calls `df`.
dataset_handle = "poorveshchaudhari/amazon-fashion-products"
df = kagglehub.dataset_load(KaggleDatasetAdapter.PANDAS, dataset_handle, file_path)

In [3]:
# Drop rows that lack an image URL or a rating, then keep just those two
# columns. The original row labels are preserved (the index is not reset).
df = df.dropna(subset=["image_url", "rating"])[["image_url", "rating"]]

# First hold out 20% as the test set, then hold out 20% of the remainder as
# the validation set. The fixed random_state keeps both splits repeatable.
train_val_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.2, random_state=42)

In [None]:
# Folder (relative to the working directory) where downloaded product images
# are stored. Creating it is a no-op when it already exists.
IMG_DIR = "images"
os.makedirs(name=IMG_DIR, exist_ok=True)

def download_images(df):
    """Download every image listed in ``df["image_url"]`` into ``IMG_DIR``.

    Each file is saved as ``<index label>.jpg``, where the label is the row's
    own DataFrame index value (not its position). ``make_datasets`` rebuilds
    the file paths from ``sub_dataframe.index``, so naming by label is what
    keeps every image paired with the rating of the same row, even when the
    index has gaps (for example after ``dropna``).

    Note: files created by an earlier run that named images by row position
    are indistinguishable from valid files and will be skipped; clear
    ``IMG_DIR`` once before re-running if such files exist.

    Args:
        df: DataFrame with an ``image_url`` column.

    Returns:
        None. Failed or non-200 downloads are reported with ``print`` and
        skipped, so one bad URL never aborts the whole run.
    """
    for idx, url in tqdm(df["image_url"].items(), total=len(df)):
        filename = os.path.join(IMG_DIR, f"{idx}.jpg")
        if os.path.exists(filename):  # skip if already downloaded
            continue
        try:
            r = requests.get(url, timeout=10)
            if r.status_code != 200:
                print(f"Skipping {url}: HTTP {r.status_code}")
                continue
            # Write to a temporary name and rename afterwards, so an
            # interrupted write never leaves a truncated "<idx>.jpg" that a
            # later run would treat as already downloaded.
            partial = filename + ".part"
            with open(partial, "wb") as f:
                f.write(r.content)
            os.replace(partial, filename)
        except Exception as e:
            print(f"Error downloading {url}: {e}")

# Fetch the images for every row of the cleaned table; the train/val/test
# splits are all subsets of `df`, so they share the files in IMG_DIR.
download_images(df)

In [None]:
# prepare the data

# Number of (image, rating) pairs per batch produced by make_datasets.
BATCH_SIZE = 32
# Size every image is resized to when it is loaded from disk.
IMG_SIZE = (128, 128)

def fetch_images(path, label):
    """Load one image file and return it with its label as float32 values.

    Runs eagerly inside ``tf.py_function`` (see ``tf_fetch_image``), which is
    why ``path`` arrives as a tensor whose bytes must be decoded.

    Args:
        path: string tensor holding the image file path.
        label: the rating that belongs to the image.

    Returns:
        ``(image, label)`` where ``image`` is a float32 array of shape
        ``(*IMG_SIZE, 3)`` with pixel values divided by 255, and ``label`` is
        a ``np.float32`` scalar. If the file cannot be loaded for any reason,
        the image is replaced by an all-zero array of the same shape.
    """
    image_path = path.numpy().decode('utf-8')

    try:
        pil_image = tf.keras.utils.load_img(image_path, target_size=IMG_SIZE)
        # pixel values are in [0, 255]; dividing by 255 rescales them to [0, 1]
        pixels = tf.keras.utils.img_to_array(pil_image) / 255.0
    except Exception:
        # best-effort fallback: missing/corrupt file -> black placeholder image
        pixels = np.zeros((*IMG_SIZE, 3), dtype=np.float32)

    return pixels.astype(np.float32), np.float32(label)

def tf_fetch_image(url, label):
    """Wrap ``fetch_images`` so it can be used inside ``Dataset.map``.

    Args:
        url: string tensor with the image file path.
        label: the rating for that image.

    Returns:
        ``(image, label)`` tensors of dtype float32 whose static shapes are
        set to ``(*IMG_SIZE, 3)`` and ``()`` respectively.
    """
    image, rating = tf.py_function(
        func=fetch_images,
        inp=[url, label],
        Tout=[tf.float32, tf.float32],
    )

    # Record the static shapes explicitly on the py_function outputs so later
    # stages (batching, the model input) see fixed shapes.
    image.set_shape((*IMG_SIZE, 3))
    rating.set_shape(())
    return image, rating

def make_datasets(sub_dataframe, shuffle=True):
    """Build a batched ``tf.data`` pipeline of (image, rating) pairs.

    Image paths are derived from the DataFrame's index labels as
    ``IMG_DIR/<label>.jpg``. Files that are missing or unreadable are turned
    into all-zero images by ``fetch_images`` rather than raising.
    NOTE(review): this requires the files in IMG_DIR to be named by index
    label - confirm the download step uses the same convention.

    Args:
        sub_dataframe: DataFrame with a ``rating`` column; its index labels
            identify the image files.
        shuffle: when True (the default, matching the previous behaviour) the
            elements are shuffled with a 1000-element buffer before batching.
            Pass False for validation/test data so batches are identical on
            every pass.

    Returns:
        A dataset yielding batches of ``BATCH_SIZE`` (image, rating) pairs,
        with prefetching enabled.
    """
    paths = [os.path.join(IMG_DIR, f"{i}.jpg") for i in sub_dataframe.index]

    ds = tf.data.Dataset.from_tensor_slices((paths, sub_dataframe["rating"].values))
    ds = ds.map(tf_fetch_image, num_parallel_calls = tf.data.AUTOTUNE)
    if shuffle:
        ds = ds.shuffle(1000)
    return ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# Only the training split is shuffled. Validation and test batches stay in a
# fixed order so metrics computed per batch are reproducible between runs.
train_ds = make_datasets(train_df)
val_ds = make_datasets(val_df, shuffle=False)
test_ds = make_datasets(test_df, shuffle=False)

In [None]:
# here we can visualize the data (optional)

# The first loop prints the shape of one batch and its labels, e.g.
#
#   (32, 128, 128, 3) [3.3, ...., 1.5]
#
# Here we have:
# - a batch of 32 images (BATCH_SIZE)
# - 128x128 each (IMG_SIZE) and their 3 respective channels
# - [3.3, ..., 1.5] the array of ratings for each image
for img, lbl in train_ds.take(1):
    print(img.shape, lbl.numpy())

# The second loop shows the pixel values of the first image of a batch.
#
# For each image we have:
# - first axis: rows (height = 128)
# - second axis: columns (width = 128)
# - third axis: channels (RGB = 3)
for imgs, labels in train_ds.take(1):
    # index 0 is the first image, matching the label printed below
    print("Image shape:", imgs[0].shape)
    print("Pixel values (first image):")
    print(imgs[0].numpy())


In [None]:
def r2_score(y_true, y_pred):
    """Coefficient of determination (R^2) of one batch, usable as a Keras metric.

    Both inputs are flattened to 1-D before the subtraction. The labels in
    this notebook are scalars per sample, i.e. shape ``(batch,)``, while the
    model's last layer outputs ``(batch, 1)``; if the framework hands the two
    tensors over without matching their ranks, ``y_true - y_pred`` would
    broadcast to ``(batch, batch)`` and the result would be meaningless.
    Flattening gives the same value as before whenever the shapes already
    agree.

    Args:
        y_true: ground-truth ratings.
        y_pred: predicted ratings.

    Returns:
        Scalar tensor ``1 - SS_res / (SS_tot + epsilon)``; epsilon avoids a
        division by zero when all labels in the batch are equal.

    NOTE(review): as a function metric this is computed per batch and then
    averaged by Keras, so it approximates the dataset-wide R^2 - confirm if
    exact figures matter.
    """
    y_pred = tf.reshape(y_pred, [-1])
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])
    ss_res = tf.reduce_sum(tf.square(y_true - y_pred))
    ss_tot = tf.reduce_sum(tf.square(y_true - tf.reduce_mean(y_true)))
    return 1 - ss_res / (ss_tot + tf.keras.backend.epsilon())

def rmse(y_true, y_pred):
    """Root-mean-squared error of one batch, usable as a Keras metric.

    Both inputs are flattened to 1-D first so that labels of shape
    ``(batch,)`` and predictions of shape ``(batch, 1)`` are compared element
    by element instead of broadcasting to ``(batch, batch)``. When the shapes
    already agree the result is unchanged.

    Args:
        y_true: ground-truth ratings.
        y_pred: predicted ratings.

    Returns:
        Scalar tensor ``sqrt(mean((y_true - y_pred) ** 2))``.
    """
    y_pred = tf.reshape(y_pred, [-1])
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), [-1])
    return tf.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))

In [None]:
def build_cnn_model(hp):
    """Build and compile a CNN that regresses a rating from an image.

    Intended as the hypermodel function for KerasTuner: every tunable choice
    is requested from ``hp``.

    Search space:
        - ``num_conv_layers``: 1-3 blocks of Conv2D + MaxPooling2D
        - ``filters_<i>``: 32-128 filters (step 32) for conv block ``i``
        - ``kernel_size``: 3 or 5; requested under one name for every block,
          so presumably shared by all conv layers
        - ``num_dense_layers``: 0-2 hidden dense layers
        - ``units_dense_<j>``: 64-256 units (step 64) for dense layer ``j``
        - ``activation``: relu or tanh; one name for all dense layers
        - ``learning_rate``: 1e-2, 1e-3 or 1e-4 for Adam

    Args:
        hp: a ``keras_tuner.HyperParameters``-like object.

    Returns:
        A compiled ``tf.keras.Sequential`` model with MSE loss and the
        metrics ``mae``, ``rmse`` and ``r2_score`` (in that order).
    """

    model = tf.keras.Sequential()

    # input layer for images; the shape is derived from IMG_SIZE so the model
    # always matches the images produced by the data pipeline
    model.add(tf.keras.layers.Input(shape=(*IMG_SIZE, 3)))

    # hyperparameter: number of conv layers, 1–3
    for i in range(hp.Int("num_conv_layers", 1, 3)):

        model.add(tf.keras.layers.Conv2D(
            filters = hp.Int(f"filters_{i}", min_value = 32, max_value = 128, step = 32),
            kernel_size = hp.Choice("kernel_size", values = [3, 5]),
            activation = "relu",
            padding = "same"
        ))
        # halve the spatial resolution after every conv layer
        model.add(tf.keras.layers.MaxPooling2D(pool_size=2))

    # flatten to connect with dense layers
    model.add(tf.keras.layers.Flatten())

    # hyperparameter: number of dense layers, 0–2
    for j in range(hp.Int("num_dense_layers", 0, 2)):
        model.add(tf.keras.layers.Dense(
            units = hp.Int(f"units_dense_{j}", min_value = 64, max_value = 256, step = 64),
            activation = hp.Choice("activation", ["relu", "tanh"])
        ))

    # output layer (regression on rating)
    model.add(tf.keras.layers.Dense(1, activation="linear"))

    # hyperparameter: learning rate
    hp_lr = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4])

    model.compile(
        optimizer = tf.keras.optimizers.Adam(learning_rate=hp_lr),
        loss = "mse",
        metrics = ["mae", rmse, r2_score]
    )

    return model


In [None]:
# Smoke test: build the model once with a fresh HyperParameters object to
# check that build_cnn_model runs before starting the search.
build_cnn_model(keras_tuner.HyperParameters())

In [None]:
# Random search over the space declared in build_cnn_model: up to 10 trials,
# ranked by validation MAE. Results are written under my_dir/cnn_tuning, and
# overwrite=True starts from scratch instead of resuming earlier results.
tuner = keras_tuner.RandomSearch(
    build_cnn_model,
    objective="val_mae",
    max_trials=10,
    overwrite=True,
    directory="my_dir",
    project_name="cnn_tuning",
)

# Print the hyperparameters the tuner will sample from.
tuner.search_space_summary()

In [None]:
# Run the search: each trial trains for 5 epochs on train_ds and is scored on
# val_ds; verbose=0 silences the per-epoch training logs.
tuner.search(train_ds, epochs=5, validation_data=val_ds, verbose=0)

In [None]:
# Show the trials of the finished search together with their scores.
tuner.results_summary()

In [None]:
# extract the best model: ask the tuner for a single model and take it out of
# the returned list
best_model = tuner.get_best_models(1)[0]
best_model.summary()

In [None]:
# evaluate() returns the loss followed by one value per compiled metric, in
# the order passed to model.compile in build_cnn_model: mae, rmse, r2_score.
# That is four values, so all four must be unpacked.
loss, mae, rmse_val, r2_val = best_model.evaluate(test_ds)
print(f"Test Loss: {loss:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test RMSE: {rmse_val:.4f}")
print(f"Test R2: {r2_val:.4f}")