<a href="https://colab.research.google.com/github/hungryjins/Fashion_rec/blob/main/05_Hybrid_search_module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import base64
import getpass
import json
import os

import numpy as np
import openai
import pandas as pd
import requests
from PIL import Image
from tqdm.notebook import tqdm
pd.set_option('display.max_columns', None)

## Table of Contents

#### 1. Extract item attributes with GPT-4V (matching our attribute schema) and convert them to text
#### 2. Text search: top-100 (broad search)
#### 3. Secondary search using image search

## Enrich image with descriptions

In [None]:
# initialize openai
# The key is taken from the environment; we only prompt when it is missing, so
# an already-configured OPENAI_API_KEY is never overwritten by a placeholder
# and no secret is stored in the notebook source.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OPENAI_API_KEY: ")
openai.api_key = os.environ["OPENAI_API_KEY"]

```python
"'main_category', 'silhouette', 'silhouette_fit', 'waistline',
       'length', 'collar_type', 'neckline_type', 'sleeve_type',
       'pocket_type', 'opening_type', 'non-textile material type',
       'leather', 'textile finishing, manufacturing techniques',
       'textile pattern', 'animal', 'other'"
```

In [None]:
from search_utils import clothes_detector
from transformers import YolosFeatureExtractor, YolosForObjectDetection

In [None]:
# Checkpoint used for the detection weights (presumably YOLOS fine-tuned on
# Fashionpedia, going by the checkpoint name — confirm on the model card).
MODEL_NAME = "valentinafeve/yolos-fashionpedia"

# The feature extractor comes from the 'hustvl/yolos-small' checkpoint, while
# the detection model itself is loaded from MODEL_NAME.
feature_extractor = YolosFeatureExtractor.from_pretrained('hustvl/yolos-small')
model = YolosForObjectDetection.from_pretrained(MODEL_NAME)

# Convert to three-channel RGB right after opening the file.
image = Image.open("test_images/test_image5.jpg").convert("RGB")

In [None]:
# Run the detector on the test image. Later cells call `.items()` on the result
# and index it with a label such as 'shoe', so it is used as a mapping from
# detected label to cropped image.
cropped_images = clothes_detector(image, feature_extractor, model)

In [None]:
image

In [None]:
cropped_images

In [None]:
cropped_images

In [None]:
cropped_images['shoe']

In [None]:
# initialize openai
# SECURITY: an API key used to be hardcoded in this cell. Treat that key as
# leaked and rotate it; the key is now read from the environment (or prompted
# for once) so it never appears in the notebook source.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OPENAI_API_KEY: ")
openai.api_key = os.environ["OPENAI_API_KEY"]

In [None]:
def encode_image(image_path):
  """Read the file at `image_path` in binary mode and return it base64-encoded as a str."""
  with open(image_path, "rb") as handle:
    raw_bytes = handle.read()
  encoded = base64.b64encode(raw_bytes)
  return encoded.decode('utf-8')

In [None]:
import io
import base64

In [None]:
# Read the image using GPT and create a description

def describe_clothes(image, label, openai_key, timeout=120):
  """Ask the OpenAI chat-completions endpoint to describe one clothing item.

  The image is re-encoded as an in-memory JPEG, base64-encoded, and sent as a
  data URL together with a prompt listing the attributes to report.

  Args:
    image: Image object exposing `mode`, `convert` and `save` (a PIL image;
      presumably one crop produced by `clothes_detector` — confirm with caller).
    label: Name of the item to focus on; substituted into the prompt.
    openai_key: API key sent as the Bearer token.
    timeout: Seconds to wait for the HTTP response before giving up.

  Returns:
    The text content of the first choice in the API response.

  Raises:
    requests.HTTPError: The API answered with a non-success status code.
    requests.Timeout: No response arrived within `timeout` seconds.
  """
  # JPEG cannot store alpha or palette data; normalise such images to RGB
  # instead of letting `save` fail.
  if image.mode not in ("RGB", "L", "CMYK"):
    image = image.convert("RGB")

  buffer = io.BytesIO()
  # Save the image to the buffer in JPEG format
  image.save(buffer, format="JPEG")
  image_data = buffer.getvalue()

  base64_image = base64.b64encode(image_data).decode('utf-8')
  image_desc_prompt = """Focus on {} inside the image.
        Identify the attributes of the item.
        The attributes you should answer are :
        - clothes_type
        - color
        - silhouette
        - silhouette_fit
        - waistline
        - sleeve_type
        - collar_type
        - length
        - gender
        - patterns
        - textile_pattern

        Ignore the attributes you cannot answer.
        Keep the answer simple and clear, having max three words per attribute.
  """.format(label)

  headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {openai_key}"
  }

  payload = {
    # NOTE(review): this is a preview model name and may have been retired by
    # the provider — confirm it is still served before relying on this call.
    "model": "gpt-4-vision-preview",
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": image_desc_prompt
          },
          {
            "type": "image_url",
            "image_url": {
              "url": f"data:image/jpeg;base64,{base64_image}"
            }
          }
        ]
      }
    ],
    "max_tokens": 300
  }

  # Without a timeout a stalled connection would block the notebook forever.
  response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers=headers,
    json=payload,
    timeout=timeout,
  )

  # Surface API failures with the server's message instead of an opaque
  # KeyError on 'choices' further down.
  if not response.ok:
    raise requests.HTTPError(
      f"OpenAI API request failed ({response.status_code}): {response.text}",
      response=response,
    )

  return response.json()['choices'][0]['message']['content']

In [None]:
cropped_images.items()

In [None]:
# Build {label: description text} by sending every crop to describe_clothes.
descriptions = dict()

for i, img in cropped_images.items():
    # `i` is the mapping key; it is both printed as progress and passed as the
    # label the prompt should focus on.
    print(i)
    desc = describe_clothes(img, i, openai.api_key)
    descriptions[i] = desc

In [None]:
descriptions

In [None]:
from search_utils import fashion_query_transformer, text_search
from image_utils import draw_images

Convert the generated descriptions into a search query with `fashion_query_transformer`:

In [None]:
# The whole descriptions dict is passed as its string representation.
text_query = fashion_query_transformer(str(descriptions))
text_query

In [None]:
from splade.splade.models.transformer_rep import Splade
from transformers import AutoTokenizer

# One model id is used for both the SPLADE model and its tokenizer.
splade_model_id = 'naver/splade-cocondenser-ensembledistil'

splade_model = Splade(splade_model_id, agg='max')
# Keep the model on CPU and switch it to evaluation mode.
splade_model.to('cpu')
splade_model.eval()

splade_tokenizer = AutoTokenizer.from_pretrained(splade_model_id)

In [None]:
from pinecone import Pinecone

# SECURITY: the Pinecone key used to be hardcoded here. Treat that key as leaked
# and rotate it; the key is now read from the environment (or prompted for once).
if not os.environ.get("PINECONE_API_KEY"):
    os.environ["PINECONE_API_KEY"] = getpass.getpass("PINECONE_API_KEY: ")
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
# index number check
# index_list = pc.list_indexes().indexes

# index description
index = pc.Index("fastcampus")
# index.describe_index_stats()

# CLIP
from image_utils import fetch_clip, extract_img_features, draw_images

# NOTE: this rebinds `model`, which an earlier cell bound to the YOLOS detector,
# to the CLIP model. Re-running the detection cell after this one would pass the
# CLIP model to clothes_detector.
model, processor, tokenizer = fetch_clip(model_name="patrickjohncyh/fashion-clip")

In [None]:
# First-stage (broad) retrieval with top_k=100 and hybrid=False. `model` and
# `tokenizer` are the CLIP objects bound by fetch_clip in the previous cell.
results = text_search(index, text_query, model, tokenizer, splade_model, splade_tokenizer, top_k=100, hybrid=False)
# The keys are the categories that later cells iterate over.
results.keys()

In [None]:
len(results['tights, stockings'].matches)

In [None]:
# Collect, per result key, the image path stored in each match's metadata.
paths = dict()
for k,v in results.items():
    paths[k] = [i['metadata']['img_path'] for i in v['matches']]

# show images
for k,v in paths.items():
    print(k)
    draw_images([Image.open(i) for i in v[:10]]) # display only 10

### Perform another search within the matching images (image search)

In [None]:
local_db = pd.read_csv("local_db.csv")
# The 'values' column is read as text from the CSV; parse each cell as JSON so
# later cells can use it numerically (np.dot is applied to these entries).
local_db['values'] = local_db['values'].apply(json.loads)
local_db.shape

In [None]:
local_db.head(3)

Collect the IDs returned by the text search, grouped by category.

In [None]:
results.keys()

In [None]:
# One single-key dict {category: [match ids]} per category, in result order.
ids = list()

for category, value in results.items():
    # Named `match_ids` rather than `id`: assigning to `id` at notebook level
    # would replace the built-in id() for every later cell in the kernel.
    match_ids = [match['id'] for match in value['matches']]
    ids.append({category: match_ids})

In [None]:
ids[0].keys(), list(ids[0].values())[0][:3], "..."

In [None]:
ids[1]['shoe']

In [None]:
ids[0]['tights, stockings']

In [None]:
# Convert image to embedding
# cropped_images is also needed

# Number of ids kept per category after re-ranking.
TOP_K_IMAGE_MATCHES = 10


# Defined once, before the loop, instead of being re-created on every iteration.
def calculate_dot_products(embedding, df, column_name):
    """Return a Series holding the dot product of `embedding` with each entry of df[column_name]."""
    dot_products = df[column_name].apply(lambda x: np.dot(embedding, x))
    return dot_products


final_results = list()

for search_result in ids:
    category = list(search_result.keys())[0]
    search_ids = list(search_result.values())[0]

    # A category that came only from the text query has no crop to embed; keep
    # the text ranking for it (same rule as additional_search) instead of
    # failing with a KeyError.
    if category not in cropped_images:
        final_results.append({category: search_ids[:TOP_K_IMAGE_MATCHES]})
        continue

    # Get relevant items
    filtered_local_db = local_db.loc[local_db['vdb_id'].isin(search_ids)]

    img_emb = extract_img_features(cropped_images[category], processor, model)

    # Calculate dot products
    dot_products = calculate_dot_products(img_emb.cpu().numpy()[0], filtered_local_db, 'values')

    # Index labels of the highest-scoring rows
    top_indices = dot_products.nlargest(TOP_K_IMAGE_MATCHES).index

    # vdb_ids of those rows, best score first
    top_similar_ids = filtered_local_db.loc[top_indices, 'vdb_id'].tolist()

    final_results.append({category:top_similar_ids})

In [None]:
cropped_images

In [None]:
filtered_local_db

In [None]:
final_results

In [None]:
# Show the re-ranked images, one row of images per category.
for search_result in final_results:
    # Each entry is a single-key dict {category: [ids]}.
    category = list(search_result.keys())[0]
    # NOTE: this rebinds `paths` (a dict in an earlier cell) to a list of ids.
    paths = list(search_result.values())[0]

    # Build the image file path from each id: <dataset dir>/cropped_images/<id>.jpg
    full_paths = [os.path.join("imaterialist-fashion-2020-fgvc7", "cropped_images", i+".jpg") for i in paths]
    print(category)
    draw_images([Image.open(i) for i in full_paths])

## Converting to Function

In [None]:
from search_utils import get_single_text_embedding

def calculate_dot_products(embedding, df, column_name):
    """Score every entry of ``df[column_name]`` against ``embedding``.

    Returns a Series, indexed like ``df``, whose values are
    ``np.dot(embedding, entry)`` for each entry of the column.
    """
    def _score(vector):
        return np.dot(embedding, vector)

    return df[column_name].apply(_score)


def get_top_indices(db, input_data, category, clip_processor, clip_model, clip_tokenizer, top_k, type='image'):
    """Rank the rows of `db` against one query and return the best `vdb_id`s.

    Args:
        db: DataFrame with a 'values' column (one embedding per row) and a
            'vdb_id' column.
        input_data: A single cropped image when type='image', or a single
            string when type='text'.
        category: Key under which the result is returned.
        clip_processor, clip_model, clip_tokenizer: Objects forwarded to the
            embedding helpers.
        top_k: Maximum number of ids to return.
        type: 'image' or 'text'; selects how `input_data` is embedded.

    Returns:
        A single-key dict {category: [vdb_id, ...]}, highest dot product first.

    Raises:
        ValueError: If `type` is neither 'image' nor 'text'.
    """
    # Previously an unknown type fell through both branches and failed later
    # on an unbound local; reject it up front with a clear message.
    if type not in ('image', 'text'):
        raise ValueError(f"type must be 'image' or 'text', got {type!r}")

    # Nothing to rank: return early rather than embedding the query and calling
    # nlargest on an empty Series.
    if len(db) == 0:
        return {category: []}

    if type == 'image':
        # input_data should be a single cropped image
        emb = extract_img_features(input_data, clip_processor, clip_model)
        query_vector = emb.cpu().numpy()[0]
    else:
        # input_data should be a single string of text
        emb = get_single_text_embedding(input_data, clip_model, clip_tokenizer)
        query_vector = np.array(emb)[0]

    dot_products = calculate_dot_products(query_vector, db, 'values')

    # Index labels of the top_k highest-scoring rows
    top_indices = dot_products.nlargest(top_k).index

    # vdb_ids of those rows, best score first
    top_similar_ids = db.loc[top_indices, 'vdb_id'].tolist()

    return {category: top_similar_ids}


def additional_search(local_db, cropped_images, search_results, clip_processor, clip_model, clip_tokenizer, top_k=10,
                      image_dir=os.path.join("imaterialist-fashion-2020-fgvc7", "cropped_images")):
    """Re-rank first-stage search results with image similarity.

    For every label in `search_results`:
      * if a crop exists for that label, the first-stage matches stored in
        `local_db` under that label are re-ranked against the crop via
        `get_top_indices`;
      * otherwise the first `top_k` first-stage ids are kept as they are.

    Args:
        local_db: DataFrame with 'vdb_id', 'name' and 'values' columns.
        cropped_images: Mapping from label to cropped image.
        search_results: Mapping from label to a result whose 'matches' entries
            each carry an 'id'.
        clip_processor, clip_model, clip_tokenizer: Forwarded to `get_top_indices`.
        top_k: Number of ids kept per label.
        image_dir: Directory that holds '<id>.jpg' files; defaults to the
            dataset's cropped-image folder.

    Returns:
        dict mapping each label to a list of image file paths.
    """
    # ids returned by the first-stage search, per label, in match order
    ids_by_label = {
        label: [match['id'] for match in result['matches']]
        for label, result in search_results.items()
    }
    all_ids = [vdb_id for label_ids in ids_by_label.values() for vdb_id in label_ids]

    # From the overall items, retrieve only the ones that were retrieved in the first search
    db = local_db.loc[local_db['vdb_id'].isin(all_ids)]

    refined_result = dict()
    for label, label_ids in ids_by_label.items():
        if label in cropped_images:
            # Label exists in both text and image: re-rank by image similarity.
            candidates = db.loc[db['name'] == label]
            ranked = get_top_indices(candidates, cropped_images[label], label, clip_processor, clip_model, clip_tokenizer, top_k, type='image')
            top_ids = ranked[label]
        else:
            # Label exists only in text: keep the first-stage order.
            top_ids = label_ids[:top_k]

        refined_result[label] = [os.path.join(image_dir, vdb_id + ".jpg") for vdb_id in top_ids]

    return refined_result

## Test

In [None]:
# initialize openai
# SECURITY: an API key used to be hardcoded in this cell. Treat that key as
# leaked and rotate it; the key is now read from the environment (or prompted
# for once) so it never appears in the notebook source.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OPENAI_API_KEY: ")
openai.api_key = os.environ["OPENAI_API_KEY"]

In [None]:
# Same checkpoint names as the first detection cell.
MODEL_NAME = "valentinafeve/yolos-fashionpedia"

feature_extractor = YolosFeatureExtractor.from_pretrained('hustvl/yolos-small')
# Rebinds `model` to the YOLOS detector; an earlier cell had reassigned this
# name to the CLIP model returned by fetch_clip.
model = YolosForObjectDetection.from_pretrained(MODEL_NAME)

In [None]:
# CLIP objects get their own names here, so `model` stays bound to the detector.
clip_model, clip_processor, clip_tokenizer = fetch_clip(model_name="patrickjohncyh/fashion-clip")

In [None]:
from search_utils import clothes_detector

In [None]:
# Normalise to three-channel RGB, as the first demo cell does, so the detector
# and the JPEG re-encoding in describe_clothes do not depend on the file's mode.
image = Image.open("test_images/test.jpg").convert("RGB")
# image = fix_channels(ToTensor()(image))
image

In [None]:
# Same detector call as in the demo section, plus a fourth positional argument
# of 0.5 (presumably the detection score threshold — confirm in search_utils).
cropped_images = clothes_detector(image, feature_extractor, model, 0.5)

In [None]:
cropped_images

In [None]:
# Build {label: description text} for the new test image's crops.
descriptions = dict()

for i, img in cropped_images.items():
    # `i` is the mapping key; it is printed as progress and used as the label
    # the prompt should focus on.
    print(i)
    desc = describe_clothes(img, i, openai.api_key)
    descriptions[i] = desc

In [None]:
# Print each label followed by its description, separated by a blank line.
for i, v in descriptions.items():
    print(i)
    print(v)
    print()

In [None]:
# The whole descriptions dict is passed as its string representation.
text_query = fashion_query_transformer(str(descriptions))
text_query

In [None]:
# First-stage (broad) retrieval with top_k=100 and hybrid=False, this time with
# the explicitly named CLIP model and tokenizer.
results = text_search(index, text_query, clip_model, clip_tokenizer, splade_model, splade_tokenizer, top_k=100, hybrid=False)
results.keys()

In [None]:
# Collect, per result key, the image path stored in each match's metadata.
paths = dict()
for k,v in results.items():
    paths[k] = [i['metadata']['img_path'] for i in v['matches']]

# show images
for k,v in paths.items():
    print(k)
    draw_images([Image.open(i) for i in v[:10]]) # display only 10

In [None]:
# Second stage: re-rank the text-search results against the crops.
# NOTE: `final_results` was a list of single-key dicts in the exploratory
# section; here it is rebound to the dict returned by additional_search.
final_results = additional_search(local_db, cropped_images, results, clip_processor, clip_model, clip_tokenizer)

In [None]:
# Show the re-ranked images per category; `v` is the list of image file paths.
for k,v in final_results.items():
    print(k)
    draw_images([Image.open(i) for i in v])