In [None]:
import json
import pandas as pd
import warnings
import openai
from PIL import Image
import os
from tqdm import tqdm
import numpy as np
from image_utils import fetch_clip, draw_images

# Ignore warnings
warnings.filterwarnings('ignore')
# pandas dataframe display
pd.set_option('display.max_columns', None)

# 텍스트 전처리

In [None]:
attributes = pd.read_csv("attribute_specific.csv")
new_df = pd.read_csv("clothes_final2.csv")

In [None]:
## pineconeDB에 upsert!!
from pinecone import Pinecone

pc = Pinecone(api_key="74e30e50-02fa-4e55-9bff-affa6a3817a0")
# index 개수 확인
# index_list = pc.list_indexes().indexes

# index description
index = pc.Index("fastcampus")
index.describe_index_stats()

In [None]:
# initialize openai
os.environ['OPENAI_API_KEY']= "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"
openai.api_key = os.environ["OPENAI_API_KEY"]

In [None]:
model, processor, tokenizer = fetch_clip(model_name="patrickjohncyh/fashion-clip")

In [None]:
from search_utils import get_single_text_embedding

In [None]:
input_text = "black dog"

d = get_single_text_embedding(input_text, model, tokenizer)

result = index.query(
    vector=d[0],
    top_k=5,
    # filter={"category": {"$eq": "dress"}},
    include_metadata=True
)

paths = [i['metadata']['img_path'] for i in result.matches]

draw_images([Image.open(i) for i in paths])

## 첫 gateway
- 패션과 관련된 토픽인지 여부를 판단
- semantic router도 사용 가능하지만, router를 지정해줘야 하기 때문에 한계가 있음 (27 classes)
- 사용자의 인풋을 처음으로 받는 구간
    - -> openai chat completion을 사용하여 사용자들의 text input이 실제 우리들이 받아서 처리할 내용인지 여부를 판단

In [None]:
from pydantic import BaseModel
from typing import List, Literal

from llama_index.program.openai import OpenAIPydanticProgram

In [None]:
# initialize openai
os.environ['OPENAI_API_KEY']= "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"
openai.api_key = os.environ["OPENAI_API_KEY"]

In [None]:
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-4-turbo-preview")

In [None]:
first_gateway_prompt = """Input text : {text_input}
Using the input text, do the following
- clothes_topic : Determine whether the text describes a clothes. The output should be a python boolean.
- fashion_item : Determine whether it mentions a specific fashion items such as boots or shirt, umbrella etc. The output should be a python boolean.
"""

class first_gateway(BaseModel):
    "Data model to determine whether the text describes a fashion type or clothes type."
    clothes_topic: bool
    fashion_item: bool

program = OpenAIPydanticProgram.from_defaults(
    output_cls=first_gateway, prompt_template_str=first_gateway_prompt, llm=llm,verbose=True
)

output = program(
    text_input="street fashion"
)

print(output)

In [None]:
for t in ['bohemian style boots', 'old school', 'a cup of tea', 'umbrella', 'a black hat', 'suit and tie', 'wedding apparel',
          '파란색 패션 양말', '자동차']:
    print(t)
    print(program(
                text_input=t
            ))
    print("-"*20)

## 두 번째 gateway
- 사용자들의 인풋을 우리가 원하는 인풋의 형태로 변형해주는 과정
- 카테고리 강제를 통해 search space 제한

In [None]:
input_text = "black jacket"

d = get_single_text_embedding(input_text, model, tokenizer)

result = index.query(
    vector=d[0],
    top_k=10,
    filter={"category": {"$eq": "jacket"}},
    include_metadata=True
)

paths = [i['metadata']['img_path'] for i in result.matches]

draw_images([Image.open(i) for i in paths])

In [None]:
input_text = "black jacket"

d = get_single_text_embedding(input_text, model, tokenizer)

result = index.query(
    vector=d[0],
    top_k=10,
    filter={"category": {"$eq": "jacket"}},
    include_metadata=True
)

paths = [i['metadata']['img_path'] for i in result.matches]

draw_images([Image.open(i) for i in paths])

In [None]:
second_gateway_prompt = """Input text : {text_input}.
Using the input text, do the following.

First, divide the items listed in the sentence, ensuring that descriptive words for each item are kept together during the separation.
Second, for each item listed, do the following :
    - Categorize the clothes type mentioned from the input.
        - From the options below, choose the clothes type mentioned. : 
            'pants', 'shirt, blouse', 'jacket', 'top, t-shirt, sweatshirt',
            'dress', 'shoe', 'glasses', 'skirt', 'bag, wallet', 'belt',
            'headband, head covering, hair accessory', 'sock', 'hat', 'watch',
            'glove', 'tights, stockings', 'sweater', 'tie', 'shorts', 'scarf',
            'coat', 'vest', 'umbrella', 'cardigan', 'cape', 'jumpsuit',
            'leg warmer'
        - a suit is part of jacket
        - If none of the above is mentioned, say "None"
    - Refine the text into a comma-separated string of attributes
        -  as an example, the text 'casual, urban-inspired jacket with bold graphics and loose-fitting designs'
        would be converted to 'casual, urban-inspired, jacket, bold graphics, loose-fit'.
        - another example, the text 'color Pink, - silhouette Straight, - silhouette_fit Loose'
        would be converted to 'color pink, silhouette Straight, silhouette_fit Loose'.
        - do not hesitate to repeat the modifiers for each item.
The output should be in English.
"""

class second_gateway_list(BaseModel):
    "Data model to categorize the clothing type, and refine text into a specific format."
    clothes_type: Literal['pants', 'shirt, blouse', 'jacket', 'top, t-shirt, sweatshirt',
                            'dress', 'shoe', 'glasses', 'skirt', 'bag, wallet', 'belt',
                            'headband, head covering, hair accessory', 'sock', 'hat', 'watch',
                            'glove', 'tights, stockings', 'sweater', 'tie', 'shorts', 'scarf',
                            'coat', 'vest', 'umbrella', 'cardigan', 'cape', 'jumpsuit',
                            'leg warmer', "None"]
    refined_text: str

class second_gateway(BaseModel):
    "Data model to list items."
    items: List[second_gateway_list]

program = OpenAIPydanticProgram.from_defaults(
    output_cls=second_gateway, prompt_template_str=second_gateway_prompt, llm=llm, verbose=False
)

output = program(
    text_input="street fashion boots"
)
print(output)

In [None]:
for t in ['bohemian style pants', 'street fashion', 'a black hat', 'suit and tie', 'wedding apparel',
          '파란색 패션 양말', 'old school', 'umbrella']:
    print(t)
    print(program(
                text_input=t
            ))
    print("-"*20)

In [None]:
third_gateway_prompt = """Input text : {text_input}.
Using the input text, do the following.
    - Refine the text into a comma-separated string of attributes
        -  as an example, the text 'casual, urban-inspired jacket with bold graphics and loose-fitting designs'
        would be converted to 'casual, urban-inspired, jacket, bold graphics, loose-fit'
        - do not hesitate to repeat the modifiers for each item.
"""

class third_gateway_list(BaseModel):
    "Data model to reformat an input text."
    refined_text: str

class third_gateway(BaseModel):
    "Data model to list items."
    items: List[third_gateway_list]

program = OpenAIPydanticProgram.from_defaults(
    output_cls=third_gateway, prompt_template_str=third_gateway_prompt, llm=llm, verbose=False
)

output = program(
    text_input="bohemian style clothes"
)
print(output)

In [None]:
for t in ['bohemian style pants', 'street fashion', 'a black hat', 'suit and tie', 'wedding apparel',
          '파란색 패션 양말', 'old school', 'umbrella', "I want a black jacket with gold zippers"]:
    print(t)
    print(program(
                text_input=t
            ))
    print("-"*20)

## 사용자 input 처리 경로

In [None]:
def pass_first_gateway(input_text, llm, verbose=False):
    first_gateway_prompt = """Input text : {text_input}
    Using the input text, do the following
    - clothes_topic : Determine whether the subject it is related to fashion or clothes. The output should be a python boolean.
    - Determine whether it mentions a specific fashion items such as boots or shirt, umbrella etc. The output should be a python boolean.
    """
    
    class first_gateway(BaseModel):
        "Data model to determine whether the text is related to clothes."
        clothes_topic: bool
        fashion_item: bool

    program = OpenAIPydanticProgram.from_defaults(
        output_cls=first_gateway, prompt_template_str=first_gateway_prompt, llm=llm,verbose=verbose
    )

    output = program(
        text_input=input_text
    )

    return output.dict()

In [None]:
def pass_second_gateway(text_input, llm, verbose=False):
    second_gateway_prompt = """Input text : {text_input}.
    Using the input text, do the following.

    First, divide the items listed in the sentence, ensuring that descriptive words for each item are kept together during the separation.
    Second, for each item listed, do the following :
        - Categorize the clothes type mentioned from the input.
            - From the options below, choose the clothes type mentioned. : 
                'pants', 'shirt, blouse', 'jacket', 'top, t-shirt, sweatshirt',
                'dress', 'shoe', 'glasses', 'skirt', 'bag, wallet', 'belt',
                'headband, head covering, hair accessory', 'sock', 'hat', 'watch',
                'glove', 'tights, stockings', 'sweater', 'tie', 'shorts', 'scarf',
                'coat', 'vest', 'umbrella', 'cardigan', 'cape', 'jumpsuit',
                'leg warmer'
            - a suit is part of jacket
            - If none of the above is mentioned, say "None"
        - Refine the text into a comma-separated string of attributes
            -  as an example, the text 'casual, urban-inspired jacket with bold graphics and loose-fitting designs'
            would be converted to 'casual, urban-inspired, jacket, bold graphics, loose-fit'.
            - another example, the text 'color Pink, - silhouette Straight, - silhouette_fit Loose'
            would be converted to 'color pink, silhouette Straight, silhouette_fit Loose'.
            - do not hesitate to repeat the modifiers for each item.
    The output should be in English.
    """

    class second_gateway_list(BaseModel):
        "Data model to categorize the clothing type, and refine text into a specific format."
        clothes_type: Literal['pants', 'shirt, blouse', 'jacket', 'top, t-shirt, sweatshirt',
                            'dress', 'shoe', 'glasses', 'skirt', 'bag, wallet', 'belt',
                            'headband, head covering, hair accessory', 'sock', 'hat', 'watch',
                            'glove', 'tights, stockings', 'sweater', 'tie', 'shorts', 'scarf',
                            'coat', 'vest', 'umbrella', 'cardigan', 'cape', 'jumpsuit',
                            'leg warmer']
        refined_text: str

    class second_gateway(BaseModel):
        "Data model to list items."
        items: List[second_gateway_list]

    program = OpenAIPydanticProgram.from_defaults(
        output_cls=second_gateway, prompt_template_str=second_gateway_prompt, llm=llm, verbose=verbose
    )

    output = program(
        text_input=text_input
    )

    return output.dict()

In [None]:
def pass_third_gateway(text_input, llm, verbose=False):
    
    third_gateway_prompt = """Input text : {text_input}.
    Using the input text, do the following.
        - Refine the text into a comma-separated string of attributes
            -  as an example, the text 'casual, urban-inspired jacket with bold graphics and loose-fitting designs'
            would be converted to 'casual, urban-inspired, jacket, bold graphics, loose-fit'
            - do not hesitate to repeat the modifiers for each item.
    """

    class third_gateway_list(BaseModel):
        "Data model to reformat an input text."
        refined_text: str

    class third_gateway(BaseModel):
        "Data model to list items."
        items: List[third_gateway_list]

    program = OpenAIPydanticProgram.from_defaults(
        output_cls=third_gateway, prompt_template_str=third_gateway_prompt, llm=llm, verbose=verbose
    )

    output = program(
        text_input=text_input
    )
    return output.dict()

## Search module과 연결

In [None]:
- vans shoes

=> {"shoes":'vans shoes'}

```python
def text_search(index, items_dict, model, tokenizer, splade_model, splade_tokenizer, top_k=10, hybrid=False):
    search_results = dict()
    for item in items_dict['items']:
        text_emb = get_single_text_embedding(item['refined_text'], model, tokenizer)
        if hybrid:
            sparse_vector = gen_sparse_vector(item['refined_text'], splade_model, splade_tokenizer)
        else:
            sparse_vector=None
        
        if 'clothes_type' in list(item.keys()):
            search_result = index.query(
                            vector=text_emb[0],
                            sparse_vector=sparse_vector,
                            top_k=top_k,
                            filter={"category": {"$eq": item['clothes_type']}},
                            include_metadata=True
                        )
            search_results[item['clothes_type']] = search_result
        else:
            search_result = index.query(
                            vector=text_emb[0],
                            sparse_vector=sparse_vector,
                            top_k=top_k,
                            include_metadata=True
                        )
            search_results['all'] = search_result
    return search_results
```

In [None]:
from search_utils import text_search

In [None]:
## pineconeDB에 upsert!!
from pinecone import Pinecone

pc = Pinecone(api_key="74e30e50-02fa-4e55-9bff-affa6a3817a0")
# index 개수 확인
# index_list = pc.list_indexes().indexes

# index description
index = pc.Index("fastcampus")
index.describe_index_stats()

In [None]:
from splade.splade.models.transformer_rep import Splade
from transformers import AutoTokenizer

splade_model_id = 'naver/splade-cocondenser-ensembledistil'

splade_model = Splade(splade_model_id, agg='max')
splade_model.to('cpu')
splade_model.eval()

splade_tokenizer = AutoTokenizer.from_pretrained(splade_model_id)

In [None]:
from image_utils import fetch_clip
# fetch CLIP model
model, processor, tokenizer = fetch_clip(model_name="patrickjohncyh/fashion-clip")

In [None]:
from llama_index.llms.openai import OpenAI

# initialize openai
os.environ['OPENAI_API_KEY']= "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"
openai.api_key = os.environ["OPENAI_API_KEY"]

llm = OpenAI(model="gpt-4-turbo-preview")

In [None]:
example_text = "Street fashioned boots and jacket, with colorful socks"

In [None]:
first_gateway_output = pass_first_gateway(example_text, llm)
first_gateway_output

In [None]:
second_gateway_output = pass_second_gateway(example_text, llm)
second_gateway_output

In [None]:
pass_third_gateway(example_text, llm)

In [None]:
from search_utils import get_single_text_embedding, gen_sparse_vector
from image_utils import draw_images

### Define user journey

In [None]:
example_text = "Street fashioned boots and jacket, with colorful socks"

first_gateway_output = pass_first_gateway(example_text, llm)
print("first_gateway_output : ", first_gateway_output)
if (first_gateway_output['clothes_topic']):
    print("Passed the first gateway. Moving on to the second gateway...")
    if (not first_gateway_output['fashion_item']):
        
        print("However, specific item is not found. Searching the whole database.")
        gateway_output = pass_third_gateway(example_text, llm)
        filter=False
    else:
        gateway_output = pass_second_gateway(example_text, llm)
        filter=True
    search_results = text_search(index, gateway_output, model, tokenizer, splade_model, splade_tokenizer, top_k=10)

In [None]:
paths = dict()
for k,v in search_results.items():
    paths[k] = [i['metadata']['img_path'] for i in v['matches']]

for k,v in paths.items():
    print(k)
    draw_images([Image.open(i) for i in v])

In [None]:
example_text = "street fashion"

first_gateway_output = pass_first_gateway(example_text, llm)

if (first_gateway_output['clothes_topic']):
    print("Passed the first gateway. Moving on to the second gateway...")
    if (not first_gateway_output['fashion_item']):
        
        print("However, specific item is not found. Searching the whole database.")
        gateway_output = pass_third_gateway(example_text, llm)
        filter=False
    else:
        gateway_output = pass_second_gateway(example_text, llm)
        filter=True
    search_results = text_search(index, gateway_output, model, tokenizer, splade_model, splade_tokenizer, top_k=10)

In [None]:
paths = dict()
for k,v in search_results.items():
    paths[k] = [i['metadata']['img_path'] for i in v['matches']]

for k,v in paths.items():
    print(k)
    draw_images([Image.open(i) for i in v])

In [None]:
def fashion_query_transformer(text_input):

    llm = OpenAI(model="gpt-4-turbo-preview")

    #### text가 패션과 관련된 항목인지 여부를 판단
    first_gateway_output = pass_first_gateway(text_input, llm)
    print(first_gateway_output)

    if (first_gateway_output['clothes_topic']):
        # print("Passed the first gateway. Moving on to the second gateway...")
        if (not first_gateway_output['fashion_item']):
            # print("However, specific item is not found. Searching the whole database.")
            gateway_output = pass_third_gateway(text_input, llm)
        else:
            done=False
            while not done:
                try:
                    gateway_output = pass_second_gateway(text_input, llm)
                    done=True
                except:
                    continue
    else:
        return None
    return gateway_output

In [None]:
def text_search(index, items_dict, model, tokenizer, splade_model, splade_tokenizer, top_k=10, hybrid=False):
    search_results = dict()
    for item in items_dict['items']:
        text_emb = get_single_text_embedding(item['refined_text'], model, tokenizer)
        if hybrid:
            sparse_vector = gen_sparse_vector(item['refined_text'], splade_model, splade_tokenizer)
        else:
            sparse_vector=None
        
        if 'clothes_type' in list(item.keys()):
            search_result = index.query(
                            vector=text_emb[0],
                            sparse_vector=sparse_vector,
                            top_k=top_k,
                            filter={"category": {"$eq": item['clothes_type']}},
                            include_metadata=True
                        )
            search_results[item['clothes_type']] = search_result
        else:
            search_result = index.query(
                            vector=text_emb[0],
                            sparse_vector=sparse_vector,
                            top_k=top_k,
                            include_metadata=True
                        )
            search_results['all'] = search_result
    return search_results

## Test
- 패션과 무관한 텍스트 -> None
- 패션 스타일 텍스트 -> hybrid search
- 패션 아이템 텍스트 -> apply filter & hybrid search

In [None]:
input = "a fluffy cat"
sparse_query = gen_sparse_vector(input, splade_model, splade_tokenizer)

# 인풋을 체크 및 변환
text_query = fashion_query_transformer(input)

if text_query:
    # search
    result = text_search(index, text_query, model, tokenizer, splade_model, splade_tokenizer, top_k=10)

    # 이미지들의 path 가져오기
    paths = dict()
    for k,v in result.items():
        paths[k] = [i['metadata']['img_path'] for i in v['matches']]

    # 이미지들 show
    for k,v in paths.items():
        print(k)
        draw_images([Image.open(i) for i in v])
else:
    print("패션과 무관한 텍스트 입니다")

In [None]:
input = "vans shoes with formal suit and a red tie for a wedding"
text_query = fashion_query_transformer(input)

if text_query:
    print(text_query)
    # search
    sparse_query = gen_sparse_vector(input, splade_model, splade_tokenizer)
    result = text_search(index, text_query, model, tokenizer, splade_model, splade_tokenizer, top_k=10)

    # 이미지들의 path 가져오기
    paths = dict()
    for k,v in result.items():
        paths[k] = [i['metadata']['img_path'] for i in v['matches']]

    # 이미지들 show
    for k,v in paths.items():
        print(k)
        draw_images([Image.open(i) for i in v])
else:
    print(text_query)

In [None]:
input = "creative fashion"

# 인풋을 체크 및 변환
text_query = fashion_query_transformer(input)

if text_query:
    # search
    result = text_search(index, text_query, model, tokenizer, splade_model, splade_tokenizer, top_k=10)

    # 이미지들의 path 가져오기
    paths = dict()
    for k,v in result.items():
        paths[k] = [i['metadata']['img_path'] for i in v['matches']]

    # 이미지들 show
    for k,v in paths.items():
        print(k)
        draw_images([Image.open(i) for i in v])
else:
    print(text_query)