In [None]:

# =========
# IMPORTING
# =========

# Importing necessary libraries and modules for the code.

import gdown  # Library for downloading datasets
import os  # Library for managing file paths
import json  # Library for working with JSON files
import pickle  # Library for serialization and deserialization of Python objects
import re  # Library for regular expressions
import cv2  # Library for image processing
import concurrent.futures  # Library for parallel computing
import random  # Library for random number generation
import shutil  # Library for file and directory operations
import pandas as pd  # Library for data manipulation and analysis
import numpy as np  # Library for numerical operations
import matplotlib.pyplot as plt  # Library for plotting images and graphs
from PIL import Image  # Library for image processing
from google.colab import files  # Library for file upload and download in Google Colab
from numpy.linalg import norm  # Library for vector normalization
from tqdm import tqdm  # Library for creating progress bars
import tensorflow as tf  # Library for deep learning models and operations
from keras.preprocessing.image import ImageDataGenerator  # Keras module for image data preprocessing
from sklearn.model_selection import train_test_split  # Library for splitting data into train and test sets
from sklearn.utils import shuffle  # Library for shuffling data
from keras.callbacks import EarlyStopping  # Keras callback for early stopping during model training
from keras.models import load_model  # Keras function for loading pre-trained models
from keras.layers import GlobalMaxPooling2D  # Keras layer for global max pooling
from keras.applications import ResNet50  # Pre-trained CNN model in Keras
from keras.applications.resnet import preprocess_input  # Function for preprocessing input images
from sklearn.neighbors import NearestNeighbors  # Library for performing nearest neighbor search
import matplotlib.image as mpimg  # Library for loading and displaying images

In [None]:
# ==================================
# DOWNLOAD AND PREPARING THE DATASET
# ==================================

# Data Download

file_id = '1F0oXhVQfmv3qsISc5stW8EiVEV7N-OF_'
url = f'https://drive.google.com/uc?id={file_id}'

output = '/content/dataset.zip'  # dataset

gdown.download(url, output, quiet=False)

# Unzip
! unzip "/content/dataset.zip"

# Merge all the JSON files into a single DataFrame (written to CSV in a later cell)
def process_json_file(file_path, base_dir=None):
    """Flatten one outfit-set JSON file into a list of per-item records.

    Args:
        file_path: Path of the JSON file to read.
        base_dir: Root directory used to build each item's image path. When
            omitted, the module-level ``base_directory`` is used, so existing
            single-argument callers (e.g. ``executor.map``) keep working.

    Returns:
        A list with one dict per entry of the file's ``items`` array. Each dict
        holds ``setId``, ``file_path`` (``<base_dir>/<user>/<setId>/<itemId>_m.jpg``),
        ``price``, ``itemName``, ``expressions`` and ``category_x_color``.
        If reading or parsing fails at any point, the error is printed and an
        empty list is returned (no partial results for that file).
    """
    try:
        # Resolved inside the try block so a missing global is reported like
        # any other processing error instead of crashing the worker.
        root = base_directory if base_dir is None else base_dir

        # Explicit UTF-8: the metadata contains Japanese text, and the platform
        # default encoding is not guaranteed to be UTF-8.
        with open(file_path, 'r', encoding='utf-8') as json_file:
            json_data = json.load(json_file)

        setId = json_data['setId']
        user = json_data['user']

        processed_items = []
        for item in json_data['items']:
            # A separate name is used for the image path so that `file_path`
            # still refers to the JSON file in the error message below.
            image_path = os.path.join(root, str(user), str(setId), str(item['itemId']) + '_m.jpg')

            processed_items.append({
                'setId': setId,
                'file_path': image_path,
                'price': item['price'],
                'itemName': item['itemName'],
                'expressions': item['expressions'],
                'category_x_color': item['category x color']})

        return processed_items
    except Exception as e:
        print(f"Error processing JSON file: {file_path}")
        print(f"Error message: {str(e)}")
        return []

def convert_json_to_csv(base_directory):
    """Build one DataFrame from every ``.json`` file found under ``base_directory``.

    The directory tree is walked recursively, each JSON file is handed to
    ``process_json_file`` on a thread pool, and the per-file record lists are
    concatenated (in the order the files were discovered) into a DataFrame.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Gather the path of every JSON file below the root directory.
        json_paths = [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(base_directory)
            for name in names
            if name.endswith('.json')
        ]

        # map() yields results in submission order; flatten them into one list.
        records = [
            record
            for per_file_records in pool.map(process_json_file, json_paths)
            for record in per_file_records
        ]

    return pd.DataFrame(records)

# Root folder that is scanned for JSON files; process_json_file also reads this
# global when it builds each item's image path.
base_directory = '/content/IQON3000'
# One row per item across all JSON files found under the root folder.
df = convert_json_to_csv(base_directory)


In [None]:
# ===================
# PREPARE THE DATASET
# ===================

# Create new columns "category" and "color" in the DataFrame.
# n=1 splits on the first separator only, so exactly two columns are produced
# even when a value contains the separator more than once.
df[['category', 'color']] = df['category_x_color'].str.split(' × ', n=1, expand=True)

# Map each category to Top, Bottom or Accessories.
# 'Tシャツ' (T-shirt) is an upper-body garment, so it is listed with the tops;
# labelling it as a bottom would teach the classifier the wrong class.
# NOTE(review): 'タンクトップ' (tank top) and 'トップス' (tops) are still treated as
# accessories and therefore dropped below — confirm whether that is intended.
top_categories = ['ニット' , 'ブラウス' , 'コート' , 'ジャケット' , 'カーディガン' , 'パーカー' , 'ダウンジャケット' , 'チュニック' , 'ワンピース' , 'Tシャツ']
bottom_categories = ['ロングパンツ' , 'スカート' , 'ショートパンツ' , 'レッグウェア' , 'ロングスカート']
accessories_categories = ['ピアス' , 'ショルダーバッグ' , '浴衣' , '水着' , 'タンクトップ' , 'パンプス' , 'トップス' , 'ネックレス' , 'サンダル' , 'アンダーウェア' , 'ブレスレット' , 'スニーカー' , 'インテリア' , 'コスメ' , 'ルームウェア' , '腕時計' , 'トートバッグ' , 'ブーツ' , 'クラッチバッグ' , 'ストール' , 'ハット' , 'ハンドバッグ' , 'サングラス' , 'リング' , 'メガネ' , 'ヘアアクセサリー' , 'リュック' , 'キャップ' , 'バッグ' , 'ベルト' , '靴' , '帽子' , 'フレグランス' , 'ニット帽' , 'ネイル' , 'ボストンバッグ' , '小物' , '財布' , '手袋' , 'ボディケア' , 'ブローチ' , '傘' , 'ファッション小物' , 'ステーショナリー' , 'アクセサリー' , 'ルームシューズ']
# Anything that is neither a top nor a bottom (including unknown categories) is
# labelled 'Accessories'; accessories_categories itself is kept for reference.
df['category_mapped'] = df['category'].map(lambda x: 'Top' if x in top_categories else ('Bottom' if x in bottom_categories else 'Accessories'))

# Drop the helper columns that are no longer needed.
df = df.drop(columns=["category_x_color", "category", "color"])

# Drop the accessories.
df = df[df["category_mapped"] != "Accessories"]

# Remove items whose setId occurs only once (a set needs at least two items).
df = df[df["setId"].duplicated(keep=False)]

# Save current progress
df.to_csv("dataset2.csv", index=False)

# Take a sample to train on.

# Keep a random 10% of the rows (the rest is discarded).
# NOTE(review): rows are sampled individually, so most sets lose members and are
# removed by the filter below; sampling whole setIds would keep sets intact.
df = df.sample(frac=0.1, random_state=42)

# Sampling can leave sets with a single item again; remove those as well.
df = df[df["setId"].duplicated(keep=False)]

# Save progress
df.to_csv("dataset3.csv", index=False)

# Report the DataFrame shape (a bare expression in the middle of a cell is not displayed).
print(df.shape)

# Check whether the image is corrupted or not
def is_image_valid(file_path):
    """Return True when ``file_path`` can be opened by Pillow and passes ``verify()``.

    Opening an image only parses its header, so ``verify()`` is called as well to
    let Pillow check the file for breakage. Any failure (missing file,
    unrecognised format, broken content) yields False.
    """
    try:
        # The context manager closes the underlying file handle; without it every
        # checked image would leave an open descriptor behind.
        with Image.open(file_path) as img:
            img.verify()
        return True
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit can still stop a long validation loop.
        return False

# Check every row: keep it only if the image file exists and can be opened.
# A boolean mask (rather than rebuilding the frame from iterrows) preserves the
# columns and dtypes, even when no row passes the check.
valid_mask = df['file_path'].map(
    lambda path: os.path.exists(path) and is_image_valid(path)
).astype(bool)

# Create valid dataframe
df = df[valid_mask]

# Save progress
df.to_csv("dataset4.csv", index=False)

# shape
df.shape

In [None]:
# ===============================================
# INITIAL TOP/DOWN MODELING TO TEST THE ALGORITHM
# ===============================================

# Split the dataframe into training data and test data
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Image preprocessing parameters
image_size = (150, 150)
batch_size = 32

# Training-data preprocessing: pixels are rescaled to [0, 1] and 20% of train_df
# is reserved for validation (no augmentation is configured here).
train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)  # pixel normalisation
train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    x_col='file_path',
    y_col='category_mapped',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='training'
)

# Validation-data preprocessing (the 'validation' subset of the same generator)
validation_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    x_col='file_path',
    y_col='category_mapped',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary',
    subset='validation'
)

# Build the CNN model: three conv/pool stages followed by a dense binary head
model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Compile the model
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# EarlyStopping callback: stop once val_accuracy has not improved by at least
# 0.01 for 3 epochs. restore_best_weights=True rolls the model back to the best
# epoch; otherwise the evaluated and saved weights would be those of the last
# epoch, i.e. `patience` epochs after the best one.
early_stopping = EarlyStopping(
    monitor='val_accuracy',
    patience=3,
    mode='max',
    min_delta=0.01,
    restore_best_weights=True
)

# Train on the training data and evaluate on the validation data each epoch
num_epochs = 100
history = model.fit(
    train_generator,
    epochs=num_epochs,
    validation_data=validation_generator,
    callbacks=[early_stopping]
)

# Evaluate the model on the held-out test data
test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_dataframe(
    dataframe=test_df,
    x_col='file_path',
    y_col='category_mapped',
    target_size=image_size,
    batch_size=batch_size,
    class_mode='binary'
)

test_loss, test_accuracy = model.evaluate(test_generator)
print('Akurasi pada data uji:', test_accuracy)

# Visualise loss and accuracy per epoch
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

# Save the model to a file
model.save('model_top_down.h5')

# Download the model
files.download("model_top_down.h5")

In [None]:
# =============
# MODEL TESTING
# =============

# Test the model on a single uploaded image

model = load_model('model_top_down.h5')

# Upload image
uploaded_image = files.upload()
if not uploaded_image:
    raise ValueError("No image was uploaded.")

# Load and preprocess the uploaded image
image_path = list(uploaded_image.keys())[0]
# convert('RGB') guarantees three channels: RGBA, palette or grayscale uploads
# would otherwise produce an array that does not match the model's
# (150, 150, 3) input shape.
with Image.open(image_path) as uploaded:
    image = uploaded.convert('RGB').resize((150, 150))
image = np.array(image) / 255.0  # same [0, 1] scaling as the training generator
image = np.expand_dims(image, axis=0)  # add the batch dimension

# Predict the category
# NOTE(review): assumes the training generator encoded 'Bottom' as 0 and 'Top'
# as 1 — confirm with train_generator.class_indices.
prediction = model.predict(image)
predicted_class = 'top' if prediction[0][0] > 0.5 else 'bottom'

# Display the uploaded image
plt.imshow(image[0])
plt.title(f"Predicted Category: {predicted_class}")
plt.axis('off')
plt.show()

In [None]:
# ===============================
# RECOMMENDATION DATA PREPARATION
# ===============================

# Drop every row whose set does not contain both a Top and a Bottom item,
# i.e. keep only sets with exactly two distinct category_mapped values.

has_both_categories = df.groupby('setId')['category_mapped'].transform('nunique') == 2

df = df[has_both_categories]
del has_both_categories

In [None]:
# ===========================
# FEATURE EXTRACTION AND SAVE
# ===========================

# Build the Top and Bottom subsets and their lists of image paths

top_directory_df = df[df['category_mapped'] == 'Top']
bottom_directory_df = df[df['category_mapped'] == 'Bottom']
top_directory = top_directory_df['file_path'].tolist()
bottom_directory = bottom_directory_df['file_path'].tolist()

# Save to pickle. Each file is opened in a `with` block so the handle is flushed
# and closed deterministically instead of being left to the garbage collector.

with open("top_directory.pkl", "wb") as pickle_file:
    pickle.dump(top_directory, pickle_file)
with open("bottom_directory.pkl", "wb") as pickle_file:
    pickle.dump(bottom_directory, pickle_file)
with open("top_directory_df.pkl", "wb") as pickle_file:
    pickle.dump(top_directory_df, pickle_file)
with open("bottom_directory_df.pkl", "wb") as pickle_file:
    pickle.dump(bottom_directory_df, pickle_file)

# Feature extraction: frozen ImageNet ResNet50 without its classification head,
# followed by global max pooling to get one feature vector per image.
resnet_model = ResNet50(weights='imagenet', include_top=False, input_shape=(150, 150, 3))
resnet_model.trainable = False

model = tf.keras.Sequential([
    resnet_model,
    GlobalMaxPooling2D()
])

model.save("feature_extract_model.h5")

def extract_feature(img_path, model):
  """Return the L2-normalised feature vector of the image at ``img_path``.

  The image is read with OpenCV, converted to RGB, resized to 150x150, passed
  through ``preprocess_input`` and then through ``model``; the flattened model
  output is divided by its Euclidean norm.

  Args:
    img_path: Path of the image file to read.
    model: Model used for prediction; called as ``model.predict(batch)``.

  Returns:
    A 1-D array holding the normalised features (returned unscaled if the
    model output is all zeros).

  Raises:
    ValueError: If the image cannot be read from ``img_path``.

  Note:
    Features cached before the RGB conversion was added were computed on
    BGR data and must be regenerated to stay comparable.
  """
  img = cv2.imread(img_path)
  if img is None:
    # cv2.imread signals failure by returning None instead of raising.
    raise ValueError(f"Could not read image: {img_path}")
  # OpenCV loads pixels as BGR, while the ResNet preprocess_input expects RGB
  # (it performs the RGB->BGR conversion itself).
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  img = cv2.resize(img, (150,150))
  expand_img = np.expand_dims(img, axis=0)  # add the batch dimension
  pre_img = preprocess_input(expand_img)
  result = model.predict(pre_img).flatten()
  length = norm(result)
  # Guard against division by zero for an all-zero feature vector.
  normalized = result/length if length > 0 else result
  return normalized

# Load the saved extractor once; the same model is used for both categories.

feature_extract_model = load_model("feature_extract_model.h5")

# Feature extraction for the "top" images.
# top_directory / bottom_directory are plain lists of file paths, so they are
# iterated directly (indexing a list with "file_path" raises TypeError). The
# resulting feature lists stay aligned, index by index, with those path lists.

top_image_features = [extract_feature(file, feature_extract_model) for file in tqdm(top_directory)]

# Feature extraction for the "bottom" images

bottom_image_features = [extract_feature(file, feature_extract_model) for file in tqdm(bottom_directory)]

# Export the features to pickle files

with open("top_feature_extraction.pkl", "wb") as pickle_file:
    pickle.dump(top_image_features, pickle_file)
with open("bottom_feature_extraction.pkl", "wb") as pickle_file:
    pickle.dump(bottom_image_features, pickle_file)


files.download("top_feature_extraction.pkl")
files.download("bottom_feature_extraction.pkl")

In [None]:
# ===================================
# RECOMMENDATION MODELING AND TESTING
# ===================================

# Recommend model
def recommend(features, feature_list, n_neighbors=6):
    """Return the indices of the stored feature vectors closest to ``features``.

    A brute-force Euclidean nearest-neighbour index is fitted on
    ``feature_list`` and queried with the single vector ``features``.

    Args:
        features: Query feature vector.
        feature_list: Collection of feature vectors to search.
        n_neighbors: Maximum number of neighbours to return (default 6). It is
            capped at ``len(feature_list)`` so that small catalogues do not
            make the query fail.

    Returns:
        The index array produced by ``kneighbors`` for the one query vector;
        row 0 holds the neighbour indices into ``feature_list``.
    """
    neighbor_count = min(n_neighbors, len(feature_list))
    neighbors = NearestNeighbors(n_neighbors=neighbor_count, algorithm='brute', metric='euclidean')
    neighbors.fit(feature_list)

    _, indices = neighbors.kneighbors([features])

    return indices

# Load the models and the extracted features

model_top_down = load_model("model_top_down.h5")
model_extraction = load_model("feature_extract_model.h5")


def _load_pickle(path):
    """Unpickle the object stored at ``path`` and close the file afterwards."""
    with open(path, "rb") as pickle_file:
        return pickle.load(pickle_file)


# NOTE: unpickling can execute arbitrary code — only load files that were
# produced by this notebook.
top_feature_list = np.array(_load_pickle("top_feature_extraction.pkl"))
bottom_feature_list = np.array(_load_pickle("bottom_feature_extraction.pkl"))

top_filenames = _load_pickle('top_directory.pkl')
bottom_filenames = _load_pickle("bottom_directory.pkl")

top_filenames_df = _load_pickle('top_directory_df.pkl')
bottom_filenames_df = _load_pickle('bottom_directory_df.pkl')

# Upload image

# Create the folder if it doesn't exist
folder_path = '/content/upload'
os.makedirs(folder_path, exist_ok=True)

# Upload image
uploaded_image = files.upload()
if not uploaded_image:
    raise ValueError("No image was uploaded.")

# Save the uploaded image to the folder
image_path = list(uploaded_image.keys())[0]
new_image_path = os.path.join(folder_path, image_path)
shutil.move(image_path, new_image_path)

# Load and preprocess the uploaded image
image_path = new_image_path
# convert('RGB') guarantees three channels: RGBA, palette or grayscale uploads
# would otherwise not match the classifier's (150, 150, 3) input.
with Image.open(image_path) as uploaded:
    image = uploaded.convert('RGB').resize((150, 150))
image = np.array(image) / 255.0
image = np.expand_dims(image, axis=0)

# Predict the category
# NOTE(review): assumes the classifier was trained with 'Bottom' -> 0 and
# 'Top' -> 1 — confirm against the training generator's class_indices.
prediction = model_top_down.predict(image)
predicted_class = 'top' if prediction[0][0] > 0.5 else 'bottom'

# Display the uploaded image
plt.imshow(image[0])
plt.title(f"Predicted Category: {predicted_class}")
plt.axis('off')
plt.show()

# The procedure is identical for both classes with the roles of the two
# catalogues swapped, so select the "same category" and "other category"
# artifacts once instead of duplicating the whole pipeline per branch.
if predicted_class == 'top':
    same_features, same_filenames, same_df = top_feature_list, top_filenames, top_filenames_df
    other_features, other_filenames, other_df = bottom_feature_list, bottom_filenames, bottom_filenames_df
else:
    same_features, same_filenames, same_df = bottom_feature_list, bottom_filenames, bottom_filenames_df
    other_features, other_filenames, other_df = top_feature_list, top_filenames, top_filenames_df

# Step 1: find the catalogue item of the same category closest to the upload.
features = extract_feature(image_path, model_extraction)
indices = recommend(features, same_features)
target_file = same_filenames[indices[0][0]]

# Step 2: get the setId of that item. The lookup uses the loaded same-category
# frame, so this cell only depends on the saved artifacts, not on an in-memory `df`.
matching_set_ids = same_df.loc[same_df['file_path'] == target_file, 'setId']
if matching_set_ids.empty:
    raise LookupError(f"No setId found for catalogue item: {target_file}")
set_id = matching_set_ids.iloc[0]

# Step 3: take the first item of the other category that belongs to the same set.
paired_paths = other_df.loc[other_df['setId'] == set_id, 'file_path']
if paired_paths.empty:
    raise LookupError(f"Set {set_id} has no item in the other category.")
recommended_path = paired_paths.iloc[0]

# Step 4: recommend the items of the other category closest to that paired item.
features = extract_feature(recommended_path, model_extraction)
indices = recommend(features, other_features)

# Keep (at most) the five nearest items.
target_files = [other_filenames[neighbor_index] for neighbor_index in indices[0][:5]]

# Look up the price of each recommended item ('Unknown' when it is missing).
price_output = []

for file in target_files:
    prices = other_df.loc[other_df['file_path'] == file, 'price'].values
    price_output.append(prices[0] if len(prices) > 0 else 'Unknown')

# Display each recommended image with its price as the title.
for recommended_file, recommended_price in zip(target_files, price_output):
    img = mpimg.imread(recommended_file)
    plt.imshow(img)
    plt.axis('off')
    plt.title(f"Price: {recommended_price}")
    plt.show()