<a href="https://www.kaggle.com/code/nadaarfaoui/amazon-electronics-visual-search-with-cnn?scriptVersionId=289233174" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

**Problem:** Directly classifying thousands of product SKUs is difficult — many items look similar and per-SKU accuracy suffers.

**Solution (two stages):**

1. **Brand classification (coarse):** Train a CNN to predict the product brand (fewer classes, easier to learn).
2. **Similarity search (fine):** Within the predicted brand, compare embeddings (cosine similarity) to find the most visually similar product.

**Why it works:** Fewer classes for the CNN (brand-level) and an embedding-based nearest-neighbor step for fine-grained retrieval — this improves accuracy and scalability.

**When to use:** When the dataset has many visually-similar product variants and a brand-level filter meaningfully reduces search space.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Load the merged electronics catalogue; malformed CSV lines are skipped
# instead of aborting the read.
df = pd.read_csv(
    "/kaggle/input/merged-amazon-electronics-dataset/merged_electronics_dataset.csv",
    on_bad_lines='skip',
)

# Show the first rows, the last rows and the (rows, columns) shape.
for preview in (df.head(), df.tail(), df.shape):
    print(preview)


In [None]:
# List the column names of the freshly loaded CSV.
print(df.columns)


In [None]:
# Create 'brand' column by taking the first whitespace-separated word of 'name'.
# Missing names stay missing (instead of becoming the literal brand "nan") and
# blank names yield NaN (instead of raising IndexError), so the later
# dropna(subset=[..., 'brand', ...]) can actually remove them.
df['brand'] = (
    df['name'].astype(str).str.split().str[0].where(df['name'].notna())
)

# Check
print(df[['name', 'brand']].head())


In [None]:
# Confirm that the 'brand' column is now part of the frame.
print(df.columns)


In [None]:
# Rows need an image URL, a brand and a name to be usable downstream.
required_columns = ['image', 'brand', 'name']
df_clean = df.dropna(subset=required_columns)
df_clean = df_clean.reset_index(drop=True)  # contiguous 0..n-1 index
print(df_clean.shape)

In [None]:
# Keep only the columns used by the pipeline. The explicit copy makes this an
# independent frame, so the column assignments in later cells
# (e.g. df_clean['image_path'] = None) do not operate on a slice of the
# previous frame.
df_clean = df_clean[['brand', 'image','name']].copy()


In [None]:
#download images
import os
import requests
from tqdm import tqdm

# Folder to save images
image_dir = '/kaggle/working/images'
os.makedirs(image_dir, exist_ok=True)

# Download images
df_clean['image_path'] = None  # new column for local image path

failed_downloads = 0
# A single Session reuses connections across the many requests.
with requests.Session() as session:
    for idx, row in tqdm(df_clean.iterrows(), total=df_clean.shape[0]):
        url = row['image']
        if pd.isna(url):
            continue
        url = str(url)
        ext = url.split('.')[-1].split('?')[0]  # get jpg/png extension
        # Files are named after the row index of df_clean at download time.
        file_path = os.path.join(image_dir, f"{idx}.{ext}")
        try:
            response = session.get(url, timeout=5)
            # Without this check a 404/503 error page would be written to
            # disk as if it were an image.
            response.raise_for_status()
            with open(file_path, 'wb') as f:
                f.write(response.content)
        except (requests.RequestException, OSError):
            # Best effort: skip this row, but only for network/file errors so
            # that interrupting the cell (KeyboardInterrupt) still works.
            failed_downloads += 1
            continue
        df_clean.at[idx, 'image_path'] = file_path

print("Failed downloads:", failed_downloads)

# Keep only rows where download succeeded
df_clean = df_clean.dropna(subset=['image_path']).reset_index(drop=True)
print(df_clean.shape)


In [None]:
import os
import numpy as np
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

# Correct imports for loading and converting images
from tensorflow.keras.utils import load_img, img_to_array


In [None]:
# Fit an integer encoder on the brand strings, then store the codes.
le_brand = LabelEncoder().fit(df_clean['brand'])
df_clean['brand_encoded'] = le_brand.transform(df_clean['brand'])

# Number of distinct brand codes present in the frame.
num_brands = df_clean['brand_encoded'].nunique()
print("Number of brands:", num_brands)


In [None]:
# Convert brand_encoded to strings: the Keras generators built later use this
# column as y_col with class_mode='categorical'.
df_clean['brand_encoded_str'] = df_clean['brand_encoded'].astype(str)


In [None]:
from PIL import Image

# Collect the paths PIL can open and verify.
valid_paths = []
for path in df_clean['image_path']:
    try:
        # The context manager closes the file handle that Image.open keeps.
        with Image.open(path) as img:
            img.verify()  # checks if image can be opened
    except Exception:
        # PIL raises several unrelated exception types for broken files.
        # Catching Exception (not a bare except) keeps the skip-on-failure
        # behaviour while letting KeyboardInterrupt stop the loop.
        continue
    valid_paths.append(path)

# Keep only rows with valid images
df_clean = df_clean[df_clean['image_path'].isin(valid_paths)].reset_index(drop=True)
print("Number of valid images:", len(df_clean))


In [None]:
# Re-encode brands after filtering valid images
from sklearn.preprocessing import LabelEncoder
le_brand = LabelEncoder()
df_clean['brand_encoded'] = le_brand.fit_transform(df_clean['brand'])
df_clean['brand_encoded_str'] = df_clean['brand_encoded'].astype(str)
num_brands = df_clean['brand_encoded'].nunique()
print('Number of brands after re-encoding:', num_brands)

# Explicit class order "0", "1", "2", ... so that softmax index i is exactly
# LabelEncoder code i. When the order is inferred, the string labels are sorted
# alphanumerically ("0", "1", "10", "100", ...), which no longer matches the
# encoder and makes le_brand.inverse_transform(argmax) return the wrong brand.
class_names = [str(code) for code in range(len(le_brand.classes_))]

# Prepare image data generators
# NOTE(review): the Keras EfficientNet documentation describes inputs in the
# [0, 255] range (rescaling is part of the model) - confirm whether the 1/255
# rescale should be dropped. It has to change together with the /255.0 in the
# embedding and prediction cells, so it is left as-is here.
# Data augmentation for brand classification (training subset only)
datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=0.2,  # 80% train, 20% validation
    horizontal_flip=True,
    rotation_range=20,
    zoom_range=0.2
)

# Validation images must not be randomly flipped/rotated/zoomed, otherwise the
# validation metrics are measured on distorted data. The same validation_split
# selects the same rows because the split is made by row position.
val_datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=0.2
)

train_gen = datagen.flow_from_dataframe(
    dataframe=df_clean,
    x_col='image_path',
    y_col='brand_encoded_str',  # use string labels
    classes=class_names,
    target_size=(224,224),
    batch_size=32,
    class_mode='categorical',   # categorical works with string labels
    subset='training'
)

val_gen = val_datagen.flow_from_dataframe(
    dataframe=df_clean,
    x_col='image_path',
    y_col='brand_encoded_str',
    classes=class_names,
    target_size=(224,224),
    batch_size=32,
    class_mode='categorical',
    subset='validation'
)

#CNN will learn to classify brands, not individual products yet.

In [None]:
# Transfer-learning classifier: ImageNet EfficientNetB0 backbone + small head.
base_model = EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)
base_model.trainable = False  # pretrained layers stay fixed

# Head: global average pooling -> dropout -> softmax over the brands.
head = base_model.output
for layer in (
    GlobalAveragePooling2D(),
    Dropout(0.5),
    Dense(num_brands, activation='softmax'),
):
    head = layer(head)

model = Model(inputs=base_model.input, outputs=head)
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
model.summary()


In [None]:
# Recount the classes so the output layer matches the filtered data.
num_brands = df_clean['brand_encoded_str'].nunique()
print("Number of brands after filtering:", num_brands)

# Rebuild the network from scratch with the updated class count
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# NOTE(review): unlike the earlier build, the backbone is not frozen here, so
# fit() updates all EfficientNet weights - confirm this is intended.
base_model = EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)
pooled = GlobalAveragePooling2D()(base_model.output)
regularized = Dropout(0.5)(pooled)
output = Dense(num_brands, activation='softmax')(regularized)
model = Model(inputs=base_model.input, outputs=output)

# Compile with a small learning rate
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(learning_rate=1e-4),
    metrics=['accuracy'],
)


In [None]:
# Train the brand classifier for 10 epochs.
# Step counts are left to Keras: the generators report their own length, so
# every epoch covers all samples including the final partial batch. The
# previous `samples // batch_size` dropped that batch and evaluated to 0 steps
# whenever a subset held fewer images than one batch.
history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=10,
)

#After training, the CNN can predict the brand from a new product image

In [None]:
# Save the entire CNN model (architecture + weights) as an HDF5 file in the
# Kaggle working directory.
model.save('/kaggle/working/brand_cnn_model.h5')


In [None]:
import pickle

# Persist the fitted LabelEncoder next to the model so predicted class codes
# can be turned back into brand names later.
with open('/kaggle/working/brand_encoder.pkl', 'wb') as encoder_file:
    pickle.dump(le_brand, encoder_file)


**CNN for Brand Classification**

- **Backbone:** EfficientNetB0 (pretrained on ImageNet).
- **Head:** GlobalAveragePooling2D → Dropout → Dense(softmax).
- **Task:** predict the product *brand* (not individual product SKUs).

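**Notes:** Transfer learning provides strong visual features from the pretrained backbone. In the model that is actually trained (the rebuilt one), the backbone is not frozen, so it is fine-tuned together with the classification head (pooling, dropout, dense) on your brand labels.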

**Summary:** the model uses a pretrained convolutional backbone with a custom classification head for brand-level prediction.

In [None]:
import os
from PIL import Image

# Scan the download folder: remember readable images, delete unreadable files.
valid_files = []
for filename in os.listdir('/kaggle/working/images'):
    path = os.path.join('/kaggle/working/images', filename)
    try:
        # Close the handle before any deletion happens.
        with Image.open(path) as img:
            img.verify()  # Check if image is readable
    except Exception:
        # Exception rather than a bare except: interrupting the cell must not
        # fall into this branch and delete a perfectly good file.
        print(f"Removing corrupted image: {path}")
        os.remove(path)  # Delete corrupted file immediately
    else:
        valid_files.append(path)


In [None]:
# Drop the rows whose image did not survive the folder scan.
is_verified = df_clean['image_path'].isin(valid_files)
df_clean = df_clean.loc[is_verified].reset_index(drop=True)
print("Remaining valid images:", len(df_clean.index))


In [None]:
# Narrow the frame to what the retrieval steps read: product name, source URL,
# local image path and brand. The copy detaches it from the wider frame.
downstream_columns = ['name', 'image', 'image_path', 'brand']
df_clean = df_clean.loc[:, downstream_columns].copy()
print(df_clean.head())


In [None]:
import os

# A row is only usable if its image file is present on disk.
file_present = df_clean['image_path'].map(os.path.isfile)
df_clean = df_clean[file_present].reset_index(drop=True)

print("Remaining images:", len(df_clean.index))


In [None]:
#Step 5: Extract image embeddings for similarity search
#We will create feature vectors for all images to compare images within the same brand.

from tensorflow.keras.preprocessing import image
import numpy as np
from tqdm import tqdm
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D

# Embedding network: the classifier's backbone followed by global average
# pooling, producing one feature vector per image.
pooled_features = GlobalAveragePooling2D()(base_model.output)
embedding_model = Model(inputs=base_model.input, outputs=pooled_features)

# image_path -> 1-D embedding. Paths are used as keys so the lookup stays
# valid even when df_clean is filtered or re-indexed.
# NOTE(review): the /255.0 scaling mirrors the training generators; see the
# Keras EfficientNet input-range documentation before changing either side.
embeddings = {}
for img_path in tqdm(df_clean['image_path'], total=len(df_clean.index)):
    pixels = image.img_to_array(image.load_img(img_path, target_size=(224, 224)))
    batch = np.expand_dims(pixels / 255.0, axis=0)  # add the batch dimension
    embeddings[img_path] = embedding_model.predict(batch, verbose=0).flatten()


In [None]:
#Predict brand + find most similar product

def predict_product(img_path, top_k=1):
    """Predict the brand of an image, then retrieve the closest product(s).

    Args:
        img_path: Path of the query image.
        top_k: How many of the most similar products to return. With the
            default of 1 a single product name is returned; with a larger
            value a list of up to ``top_k`` names, best match first.

    Returns:
        ``(brand_name, predicted_product)``. ``predicted_product`` is ``None``
        when no indexed product carries the predicted brand.
    """
    # 1) Predict brand (same 224x224 / 255 preprocessing as the generators)
    img = image.load_img(img_path, target_size=(224, 224))
    x = image.img_to_array(img) / 255.0
    x = np.expand_dims(x, axis=0)

    brand_pred = model.predict(x)  # shape (1, num_brands): class probabilities
    brand_idx = int(np.argmax(brand_pred))

    # The softmax index follows the training generator's class ordering, which
    # is not necessarily the LabelEncoder's integer order. Translate
    # index -> generator label (the stringified encoder code) -> brand name.
    index_to_label = {index: label for label, index in train_gen.class_indices.items()}
    brand_code = int(index_to_label[brand_idx])
    brand_name = le_brand.inverse_transform([brand_code])[0]

    # 2) Candidate products of the predicted brand (embeddings keyed by path)
    same_brand = df_clean['brand'] == brand_name
    brand_products = df_clean.loc[same_brand, ['name', 'image_path']].reset_index(drop=True)
    if brand_products.empty:
        # Nothing to compare against; avoid a shape error in cosine_similarity.
        return brand_name, None
    brand_embeddings = np.array([embeddings[path] for path in brand_products['image_path']])

    # 3) Similarity between the query embedding and every candidate
    img_emb = embedding_model.predict(x).flatten().reshape(1, -1)
    similarities = cosine_similarity(img_emb, brand_embeddings).flatten()

    # 4) Rank candidates, best first. The stable sort keeps the earliest row
    #    on ties, matching np.argmax for the top-1 case.
    ranked = np.argsort(-similarities, kind='stable')[:max(int(top_k), 1)]
    ranked_names = brand_products.loc[ranked, 'name'].tolist()
    predicted_product = ranked_names[0] if top_k == 1 else ranked_names

    return brand_name, predicted_product


In [None]:
# Ensure brand column exists on the original dataframe (don't overwrite df_clean here).
# Missing names stay NaN instead of turning into the brand "nan", and blank
# names give NaN instead of raising IndexError.
df['brand'] = (
    df['name'].astype(str).str.split().str[0].where(df['name'].notna())
)


In [None]:
# Smoke test of the two-stage pipeline on one downloaded image.
query_image = '/kaggle/working/images/0.jpg'
brand, product = predict_product(query_image)
print("Predicted brand:", brand)
print("Predicted product:", product)


In [None]:
# Save the trained brand classifier as an HDF5 file in the Kaggle working
# directory.
model.save('/kaggle/working/brand_classifier.h5')


In [None]:
import pickle

# Store the fitted brand LabelEncoder alongside the saved classifier.
with open('/kaggle/working/le_brand.pkl', 'wb') as encoder_file:
    pickle.dump(le_brand, encoder_file)


**Pipeline Overview**

1. Input image
2. CNN predicts the brand
3. Filter dataset to products of the predicted brand
4. Compute cosine similarity between image embeddings
5. Return the most visually similar product

This hybrid pipeline reduces the search space (brand → similarity) for more accurate fine-grained retrieval.

**Hybrid Approach — Brand Classification + Embedding Similarity**

**Goal:** Given a product image, identify the exact product efficiently and robustly.

**Problem (direct per-SKU classification):**
- Datasets contain thousands of product SKUs; many look visually similar.
- Training a CNN to classify every single SKU is slow, memory-intensive, and often low accuracy.

**Solution (two-stage hybrid):**
1. **Brand classification (coarse):** a CNN predicts the product brand (far fewer classes).
2. **Similarity search (fine):** within the predicted brand, compare image embeddings using cosine similarity to find the most visually similar product.

**Why this works:**
- Reduces the search space (brand → product), making nearest-neighbor retrieval tractable.
- Leverages pretrained backbones to extract strong visual features; embeddings capture fine-grained differences.

**When to use / benefits:**
- Useful when many SKUs per brand exist and brand acts as a meaningful filter.
- Easier to scale and update: add new products by computing embeddings rather than retraining the full classifier.

**Example:** CNN predicts *Samsung* → limit candidates to Samsung products → return the product with highest cosine similarity in embedding space.

**Results & Next Steps**

- Current results were not satisfactory with the initial direct approach.
- Switched to the hybrid pipeline: brand classification + embedding similarity retrieval.
- Next: evaluate retrieval accuracy, tune embedding model, and reduce noisy images.

**Key Benefits of the Hybrid Pipeline**

- Extracts deep visual features via a pretrained backbone.
- Embeds all product images into a common vector space for comparison.
- Uses cosine similarity for efficient nearest-neighbor retrieval.
- Predicts brand (CNN) and then the specific product (embedding search).

In [None]:
import os, glob, pickle
from tqdm import tqdm
from PIL import Image
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D
from sklearn.metrics.pairwise import cosine_similarity

# ---------------- Configuration ----------------
CSV_PATH = "/kaggle/input/merged-amazon-electronics-dataset/merged_electronics_dataset.csv"
IMAGE_DIR = "/kaggle/working/images"
INPUT_SIZE = 300
RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# ---------------- Load CSV ----------------
df = pd.read_csv(CSV_PATH, on_bad_lines='skip')
if 'name' not in df.columns or 'image' not in df.columns:
    raise ValueError("CSV must contain 'name' and 'image' columns.")

# ---------------- Extract brand ----------------
# First word of the name; missing or blank names give NaN and are dropped below.
df['brand'] = df['name'].astype(str).str.split().str[0].where(df['name'].notna())

# ---------------- Map CSV rows to existing images ----------------
# The download step named each file "<row index>.<ext>", where the index is
# the row position after dropping incomplete rows and resetting the index.
# Rebuild that numbering and pair rows with files through the number in the
# file name. Pairing a sorted file list with the first N rows is wrong: the
# sort is lexicographic ("0", "1", "10", "100", ...) and failed downloads
# leave gaps, so names and images would not belong together.
df = df.dropna(subset=['image', 'brand', 'name']).reset_index(drop=True)

path_by_row = {}
for path in glob.glob(os.path.join(IMAGE_DIR, "*.jpg")):  # adjust extension if needed
    stem = os.path.splitext(os.path.basename(path))[0]
    if stem.isdigit() and int(stem) < len(df):
        path_by_row[int(stem)] = path

df = df.loc[sorted(path_by_row)].copy()
df['image_path'] = [path_by_row[row_id] for row_id in df.index]
df = df.reset_index(drop=True)
print("Images and rows:", len(df))

# ---------------- Build embedding model ----------------
base_model = EfficientNetB3(include_top=False, input_shape=(INPUT_SIZE, INPUT_SIZE, 3), weights='imagenet')
embedding_output = GlobalAveragePooling2D()(base_model.output)
embedding_model = Model(inputs=base_model.input, outputs=embedding_output)
print("Embedding model created.")

# ---------------- Create embeddings ----------------
print("Creating embeddings for all product images...")
# NOTE: embeddings are keyed by product name (the prediction function relies
# on that), so rows sharing a name overwrite each other; the last one wins.
embeddings = {}
product_rows = []
skipped_images = 0

for _, row in tqdm(df.iterrows(), total=len(df)):
    try:
        # Only image decoding is treated as best-effort. PIL raises several
        # unrelated exception types for damaged files, hence Exception - but
        # not a bare except, so the loop can still be interrupted.
        with Image.open(row['image_path']) as raw:
            img = raw.convert('RGB').resize((INPUT_SIZE, INPUT_SIZE))
    except Exception:
        skipped_images += 1
        continue
    arr = np.expand_dims(preprocess_input(np.array(img, dtype=np.float32)), 0)
    emb = embedding_model.predict(arr, verbose=0)
    embeddings[row['name']] = emb.flatten()
    product_rows.append({'name': row['name'], 'brand': row['brand'], 'image_path': row['image_path']})

print("Unreadable images skipped:", skipped_images)

with open("/kaggle/working/embeddings.pkl", "wb") as f:
    pickle.dump(embeddings, f)

products_df = pd.DataFrame(product_rows)
products_df.to_csv("/kaggle/working/products_index.csv", index=False)
print("Saved embeddings and product index.")

# ---------------- Prediction function (top-1) ----------------
def predict_product(img_path):
    """Return the indexed product most similar to the image at ``img_path``.

    The image is embedded with ``embedding_model`` and compared by cosine
    similarity against every vector in ``embeddings``.

    Args:
        img_path: Path of the query image.

    Returns:
        ``(top_brand, top_name, top_score)``: brand and name of the best
        match and its cosine similarity as a float.

    Raises:
        ValueError: If ``embeddings`` is empty.
    """
    if not embeddings:
        # Fail with a clear message instead of a shape error from sklearn.
        raise ValueError("No product embeddings are available to compare against.")

    # Same preprocessing as used when the index was built.
    with Image.open(img_path) as raw:
        img = raw.convert('RGB').resize((INPUT_SIZE, INPUT_SIZE))
    arr = np.expand_dims(preprocess_input(np.array(img, dtype=np.float32)), 0)
    emb = embedding_model.predict(arr, verbose=0).reshape(1, -1)

    # Compare with all products
    candidate_names = list(embeddings)
    candidate_embs = np.stack([embeddings[n] for n in candidate_names])
    sims = cosine_similarity(emb, candidate_embs).flatten()

    top_idx = int(sims.argmax())  # top-1
    top_name = candidate_names[top_idx]
    # Several rows may share a name; the first one supplies the brand.
    top_brand = products_df.loc[products_df['name'] == top_name, 'brand'].values[0]
    top_score = float(sims[top_idx])

    return top_brand, top_name, top_score

# ---------------- Quick test ----------------
# Query with the first .jpg (in sorted order) from the image folder.
jpg_files = sorted(glob.glob(os.path.join(IMAGE_DIR, "*.jpg")))
sample_image = jpg_files[0]
brand, name, score = predict_product(sample_image)

print("Sample prediction:")
for label, value in (("Brand:", brand), ("Product Name:", name), ("Similarity:", score)):
    print(label, value)
