In [1]:
import os
import openai
import json
from tqdm.notebook import tqdm
import numpy as np
import re
from PIL import Image
from dotenv import load_dotenv
load_dotenv()

# initialize openai
openai.api_key = os.environ["OPENAI_API_KEY"]

In [3]:
sample_image = Image.open('../data/room-dataset/living/living_200.jpg')
# sample_image ## 거실 이미지

- search의 종류
    - step 1 (유사한 분위기를 연출한 거실 이미지 탐색)
        - image to image
        - text to image
            - caption, gpt-4v description, etc
    - step 2 (이미지 내에 있는 물건들을 활용)
        - 각 가구들끼리의 이미지 유사도 측정 (img-emb-sim 측정)
    - (추가) filtering (meta data)

- 이미지에서 정보를 최대한 많이 추출하여 데이터 포인트로 생성
    - 이미지의 전반적인 분위기 (image embeddings)
    - 이미지에 대한 설명 (image description using GPT-4V)
    - 이미지 내에 있는 가구들 (object detection)

### Preprocessing

- 이미지 설명 생성 (description generation)
- object detection (words)
- 각 이미지의 좌표 위치 생성

### 1. GPT-4V를 활용하여 다양한 정보 추출

- Rate limit을 고려하여 GPT-4V api call

참고 : https://platform.openai.com/docs/guides/vision

Rate limit 확인 : https://platform.openai.com/account/limits

In [None]:
import base64
import requests

def encode_image(image_path):
  """Return the contents of the file at ``image_path`` as a base64 text string.

  The file is read in binary mode and the base64 bytes are decoded as UTF-8,
  which makes the result safe to embed in a ``data:`` URL or a JSON payload.
  """
  with open(image_path, "rb") as image_file:
    raw_bytes = image_file.read()
  encoded = base64.b64encode(raw_bytes)
  return encoded.decode('utf-8')

#### GPT-4v를 활용하여 이미지 설명 생성

In [None]:
text_prompt = """Please analyze the living room image provided.  
Include 'Color Scheme', 'Lighting', 'Spatial Layout', and 'Architectural Features' with descriptions based on the room's characteristics.
The output should be formatted in a JSON-like dictionary structure. Each image should be done separately.

Example output :

```json
  {
    "Color Scheme": <Description about color scheme>,
    "Lighting": <Description about lighting>,
    "Spatial Layout": <Description about spatial layouts >,
    "Architectural Features": <Descrption about architectural features>
  }
```
"""

# HTTP headers for calling the Chat Completions REST endpoint directly via `requests`.
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {openai.api_key}"
}

# Base64-encode two sample images so they can be embedded inline as data URLs.
img = encode_image('../data/room-dataset/living/living_18.jpg')
img2 = encode_image('../data/room-dataset/living/living_5.jpg')

# A single user message: the text prompt followed by both images.
# NOTE(review): no "detail" key is set on the image entries, so the API's
# default detail level applies — confirm this matches the low-res assumption
# used in the rate-limit estimate elsewhere in this notebook.
payload = {
  "model": "gpt-4-vision-preview",
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": text_prompt
        },
        {
          "type": "image_url",
          "image_url": {
            "url": f"data:image/jpeg;base64,{img}"
          }
        },
        {
          "type": "image_url",
          "image_url": {
            "url": f"data:image/jpeg;base64,{img2}"
          }
        }
      ]
    },
  ],
  # Upper bound on the number of tokens generated for the reply.
  "max_tokens": 1000
}


In [None]:
# # Tier 2 limits

# TPM = 20000
# RPM = 100
# RPD = 1000

# Tier 1 limits
# (presumably tokens/minute, requests/minute, requests/day — confirm on the
# account limits page)

TPM = 10000
RPM = 100
RPD = 1000

low_res = True # low resolution: 512x512 images are used as input

# NOTE(review): the request payloads in this notebook do not set the image
# "detail" option, so the low-res token cost assumed here may not be what the
# API actually charges — confirm.
if low_res:
    # Token costs assumed by this notebook; verify against the current OpenAI
    # vision pricing documentation.
    token_per_img = 65
    text_token = 115  # presumably the approximate token count of text_prompt — confirm
    # Each call sends two images plus the text prompt; the number of calls per
    # minute is capped by whichever of the token and request limits is tighter.
    print("1분에 최대 {}번의 api call 가능.".format( min(TPM//(token_per_img*2+text_token), RPM)))
else:
    # High-res cost is only described here, not computed.
    print("기본 65 토큰 + 512px 사이즈로 crop 된 이미지 개수 x 129 토큰")

In [None]:
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
print(response.json())

In [None]:
output = response.json()['choices'][0]['message']['content']

In [None]:
print(output)

- GPT-4V는 JSON 형식의 출력을 보장하지 않는다
    - 따라서 전반적인 image description을 받은 후 검증이 필요함

#### GPT-4v를 활용하여 이미지 설명 생성 (이미지 100개에 적용)

In [None]:
def describe_image(input_prompt, image_paths, openai_key,
                   model="gpt-4-vision-preview", max_tokens=1000, timeout=120):
  """Send a text prompt plus one or more images to the chat completions API.

  Args:
    input_prompt: Text instruction placed before the images in the message.
    image_paths: Iterable of image file paths; each file is base64-encoded
      with ``encode_image`` and embedded as a JPEG data URL.
    openai_key: API key used for the ``Authorization`` header.
    model: Model name to request (defaults to the value previously hard-coded).
    max_tokens: Upper bound on generated tokens (default unchanged: 1000).
    timeout: Seconds to wait for the HTTP response before giving up, so a
      stalled connection cannot hang a long batch loop indefinitely.

  Returns:
    The text content of the first choice in the API response.

  Raises:
    RuntimeError: If the API answers with a non-success status, or with a
      body that lacks the expected ``choices[0].message.content`` field.
      The message carries the server's response so rate-limit and auth
      failures are visible instead of surfacing as ``KeyError: 'choices'``.
  """
  headers = {
      "Content-Type": "application/json",
      "Authorization": f"Bearer {openai_key}"
      }

  # The message content is the prompt followed by one image entry per path.
  content = [{"type": "text", "text": input_prompt}]
  for path in image_paths:
    content.append({
      "type": "image_url",
      "image_url": {
        "url": f"data:image/jpeg;base64,{encode_image(path)}"
      }
    })

  payload = {
      "model": model,
      "messages": [{"role": "user", "content": content}],
      "max_tokens": max_tokens
      }

  response = requests.post(
      "https://api.openai.com/v1/chat/completions",
      headers=headers,
      json=payload,
      timeout=timeout,
  )
  if not response.ok:
    raise RuntimeError(
        f"Chat completions request failed with HTTP {response.status_code}: {response.text}"
    )

  body = response.json()
  try:
    return body['choices'][0]['message']['content']
  except (KeyError, IndexError, TypeError) as exc:
    raise RuntimeError(f"Unexpected chat completions response: {body}") from exc

In [None]:
# Images numbered 1 through 100 (they are later sent two at a time).
# Only the top level of the directory is listed; a missing directory raises
# FileNotFoundError right here rather than an IndexError on an empty walk.
_living_dir = '../data/room-dataset/living'
img_paths = [
    name for name in os.listdir(_living_dir)
    if os.path.isfile(os.path.join(_living_dir, name))
]
img_paths = [i for i in img_paths if i!=".DS_Store"]


def _image_index(name):
    """Return the integer between the first '_' and the following '.', or None.

    Files that do not follow the ``<prefix>_<number>.<ext>`` pattern yield
    None so they are skipped instead of crashing the filter.
    """
    parts = name.split('_')
    if len(parts) < 2:
        return None
    token = parts[1].split('.')[0]
    return int(token) if token.isdigit() else None


# `x in range(...)` is a constant-time check for ints and is False for None.
img_paths = [i for i in img_paths if _image_index(i) in range(1, 101)]

def extract_number(filename):
    """Return the first run of digits in ``filename`` as an int (0 if there is none).

    Used as a sort key so that e.g. ``living_2`` orders before ``living_10``.
    """
    found = re.search(r'\d+', filename)
    if found is None:
        return 0
    return int(found.group())

# Order the file names numerically, then turn each into a path under the
# living-room image directory.
img_paths = [
    os.path.join('../data/room-dataset/living', name)
    for name in sorted(img_paths, key=extract_number)
]

# Group consecutive paths two at a time; the last group is shorter when the
# number of images is odd.
batches = []
for start in range(0, len(img_paths), 2):
    batches.append(img_paths[start : start+2])

# Filled by the API loop: "<name1>#<name2>" -> raw model response text.
outputs = {}

- GPT-4V는 JSON 형태의 아웃풋을 강제할 수 없음
- 단순 이미지 description을 활용하여 유사도 측정을 해도 무방함

In [None]:
## Billing caution: every iteration is a paid GPT-4V API call
for batch in tqdm(batches):
    # Build the key before calling the API so nothing can fail after the
    # paid request. os.path.basename is separator-aware, and joining over the
    # whole batch also covers a final batch holding a single image.
    batch_key = "#".join(os.path.basename(path) for path in batch)

    # Skip batches that already have a result, so re-running this cell after
    # a partial failure does not pay for the same images twice.
    if batch_key in outputs:
        continue

    outputs[batch_key] = describe_image(text_prompt, batch, openai.api_key)

In [None]:
# with open("../data/room-dataset/room_descriptions.json", 'w') as file:
#     json.dump(outputs, file)

In [None]:
with open("../data/room-dataset/room_descriptions.json", 'r') as file:
    outputs = json.load(file)

In [None]:
print(outputs['living_1.jpg#living_2.jpg'])

In [None]:
print(outputs['living_11.jpg#living_12.jpg'])

In [None]:
def parse_response(text):
    """Extract every top-level JSON object embedded in free-form text.

    The text is scanned for ``{`` and a real JSON decoder is run from each
    candidate position. Unlike a non-greedy ``{...}`` regex, this handles
    nested objects and ``}`` characters inside string values correctly.

    Args:
        text: Model output that may mix prose, code fences and JSON objects.

    Returns:
        A list of the decoded objects (dicts) in order of appearance. Brace
        groups that are not valid JSON are skipped; the list is empty when
        nothing could be decoded.
    """
    decoder = json.JSONDecoder()
    found = []
    pos = text.find('{')
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            # Not JSON starting here; try the next opening brace.
            pos = text.find('{', pos + 1)
            continue
        found.append(obj)
        # Resume after the decoded object so nested braces are not revisited.
        pos = text.find('{', end)
    return found

In [None]:
print(outputs['living_11.jpg#living_12.jpg'])

In [None]:
parse_response(outputs['living_11.jpg#living_12.jpg'])

In [None]:
# Keys of batches whose raw response could not be parsed into two objects.
failed = list()

for k, v in outputs.items():
    # Entries already replaced by parsed results (e.g. when this cell is run
    # a second time) are left alone instead of being reported as failures.
    if not isinstance(v, str):
        continue

    try:
        parsed = parse_response(v)
    except ValueError:
        # json.JSONDecodeError is a ValueError; anything else is a real bug
        # and should surface rather than be swallowed.
        failed.append(k)
        continue

    # Each batch holds two images and is later unpacked as exactly two
    # descriptions, so any other count is treated as a parsing failure.
    if len(parsed) != 2:
        failed.append(k)
    else:
        outputs[k] = parsed

In [None]:
len(failed)

In [None]:
failed

dict 안에 dict가 있는 등 일관된 결과를 얻기 힘들다

In [None]:
print(outputs['living_91.jpg#living_92.jpg'])

#### chat_completion api를 활용하여 비정형 text를 json format으로 변경

In [None]:
output_formatting_prompt = """Using the provided text, find the smallest format of json there is and store them in a list as separate elements.
The ouput list should have two json objects found from the provided text.

Desired output :
{'list': [{'Image 1': {'Color Scheme': <Color Scheme>,
    'Lighting': <Lighting>,
    'Spatial Layout': <Spatial Layout>,
    'Architectural Features': <Architectural Features>}},
  {'Image 2': {'Color Scheme': <Color Scheme>,
    'Lighting': <Lighting>,
    'Spatial Layout': <Spatial Layout>,
    'Architectural Features': <Architectural Features>}}]}

Provided text : """

def normal_chat_completion(input_prompt, model='gpt-4-turbo-preview'):
    """Run one chat completion in JSON-object response mode.

    Args:
        input_prompt: Text sent as the user message.
        model: Name of the chat model to call.

    Returns:
        The response object returned by ``client.chat.completions.create``;
        callers read ``.choices[0].message.content`` from it.
    """
    system_message = {
        "role": "system",
        "content": 'You are a smart and intelligent program that understands information and provides output in JSON format',
    }
    user_message = {"role": "user", "content": input_prompt}

    # A new client is created for every call.
    return openai.OpenAI().chat.completions.create(
        model=model,
        response_format={"type": "json_object"},
        messages=[system_message, user_message],
    )

In [None]:
info = normal_chat_completion(output_formatting_prompt + outputs['living_15.jpg#living_16.jpg'])

In [None]:
print(info.choices[0].message.content)

In [None]:
failed

In [None]:
# Raw chat-completion responses for the batches listed in `failed`,
# keyed by the same batch key used in `outputs`.
parse_step1 = dict()

# Billing caution: one chat-completion API call per failed batch.
for f in tqdm(failed):
    # The unparsed response text is appended to the formatting prompt so the
    # model can rewrite it as JSON.
    parsed = normal_chat_completion(output_formatting_prompt+outputs[f])
    parse_step1[f] = parsed

In [None]:
def reformat_json(input_dict):
    """Flatten ``{'list': [{label: value}, ...]}`` into ``[value, ...]``.

    Every value of every dict under ``input_dict['list']`` is collected in
    iteration order; the wrapping keys (e.g. 'Image 1') are discarded.
    """
    return [
        entry[label]
        for entry in input_dict['list']
        for label in entry
    ]

In [None]:
# Replace each previously unparseable entry in `outputs` with the list of
# descriptions recovered from the chat-completion response.
for k, v in parse_step1.items():
    # The message content is expected to be a JSON string with a top-level
    # 'list' key (the shape requested by the formatting prompt); json.loads or
    # reformat_json raises if the model returned something else.
    v = json.loads(v.choices[0].message.content)
    outputs[k] = reformat_json(v)

In [None]:
outputs['living_81.jpg#living_82.jpg']

In [None]:
# Split every two-image batch entry into one entry per image file name.
indiv_outputs = {}

for pair_key, pair_values in outputs.items():
    # Keys look like "<name1>#<name2>"; values hold exactly two descriptions.
    first_name, second_name = pair_key.split("#")
    first_desc, second_desc = pair_values

    indiv_outputs[first_name] = first_desc
    indiv_outputs[second_name] = second_desc

In [None]:
indiv_outputs['living_11.jpg']

In [None]:
indiv_outputs['living_1.jpg']

In [None]:
# Normalise each description so that every field is a single value:
# list-valued fields are joined into one space-separated string, anything
# else is kept unchanged.
fixed = {}

for image_name, description in indiv_outputs.items():
    fixed[image_name] = {
        field: (' '.join(value) if isinstance(value, list) else value)
        for field, value in description.items()
    }

In [None]:
fixed['living_1.jpg']

In [None]:
# with open("../data/room-dataset/room_descriptions_parsed.json", 'w') as file:
#     json.dump(fixed, file)

In [None]:
with open("../data/room-dataset/room_descriptions_parsed.json", 'r') as file:
    final_outputs = json.load(file)

In [None]:
final_outputs['living_1.jpg']

### 2-1. Yolo를 활용하여 가구 detect

- YOLO class?

In [None]:
from utils import detect_objects

In [None]:
import yolov5

# 출처 : https://pypi.org/project/yolov5/

# load pretrained model
# ('yolov5s.pt' is resolved by the yolov5 package; presumably fetched
# automatically when not present locally — confirm)
model = yolov5.load('yolov5s.pt')

# set model parameters
# These attributes configure the non-maximum-suppression step of inference.
model.conf = 0.3  # NMS confidence threshold
model.iou = 0.45  # NMS IoU threshold
model.agnostic = False  # NMS class-agnostic
model.multi_label = False  # NMS multiple labels per box
model.max_det = 1000  # maximum number of detections per image

In [None]:
detections = detect_objects('../data/room-dataset/living/living_18.jpg', model)

In [None]:
detections[0].show()

In [None]:
detections

In [None]:
def filter_furniture(detections, furniture_names=('chair', 'couch', 'bed', 'dining table'), min_score=0.5):
    """Keep only confident furniture detections.

    Args:
        detections: Sequence whose element ``[1]`` is a dict with the keys
            'boxes', 'scores', 'categories' and 'labels'. 'labels' is iterated
            as a sequence of class names; the other three must support
            indexing with a list of booleans (assumed to be NumPy arrays as
            produced by ``detect_objects`` — TODO confirm).
        furniture_names: Class names treated as furniture.
        min_score: A detection is kept only if its score is strictly greater
            than this value.

    Returns:
        A dict with the same four keys, restricted to the kept detections.
        'labels' is returned as a plain list.
    """
    result = detections[1]

    # One plain bool per detection; kept as a list, the form the original
    # indexing relied on.
    keep = [
        bool((label in furniture_names) and (score > min_score))
        for label, score in zip(result['labels'], result['scores'])
    ]

    furniture_detected = {}
    furniture_detected['boxes'] = result['boxes'][keep]
    furniture_detected['scores'] = result['scores'][keep]
    furniture_detected['categories'] = result['categories'][keep]
    furniture_detected['labels'] = [label for label, kept in zip(result['labels'], keep) if kept]

    return furniture_detected

In [None]:
a = filter_furniture(detections)

In [None]:
a

#### 2-2. 가구들을 crop + 좌표 추출

In [None]:
# Filtered furniture detections per image path.
# NOTE(review): this rebinds `detections`, which earlier cells used for the
# raw result of a single detect_objects call — re-running those earlier cells
# after this one changes what `detections` refers to.
detections = dict()

for img in tqdm(img_paths):
    # Run the detector on one image, then keep only the furniture detections.
    detect = detect_objects(img, model)
    detections_parsed = filter_furniture(detect)
    detections[img] = detections_parsed

In [None]:
detections['../data/room-dataset/living/living_10.jpg']

In [None]:
def crop_bbox(pil_image, bbox):
    """Crop ``pil_image`` to ``bbox`` and return the cropped image.

    ``bbox`` must unpack into exactly four values in the order
    (x_min, y_min, x_max, y_max); they are passed to ``pil_image.crop`` as a
    tuple.
    """
    left, top, right, bottom = bbox
    return pil_image.crop((left, top, right, bottom))

def normalize_image(pil_image, target_size=(224, 224)):
    """Resize an image and return it as an 8-bit PIL image.

    Args:
        pil_image: Source PIL image.
        target_size: (width, height) passed to ``resize``.

    Returns:
        A new PIL image built from the resized pixel data.

    The pixel data is scaled to [0, 1] and back to [0, 255], as before. The
    scaled values are now rounded (and clipped) before the cast to uint8: in
    float32, ``x / 255 * 255`` can land just below the original integer, and a
    plain ``astype('uint8')`` truncates, which can lower a pixel value by one.
    """
    # resizing
    resized_image = pil_image.resize(target_size, Image.LANCZOS)

    # normalization: pixel values to [0, 1]
    np_image = np.array(resized_image).astype('float32')
    np_image /= 255.0

    # back to 8-bit, rounding to the nearest integer instead of truncating
    restored = np.clip(np.rint(np_image * 255.0), 0, 255).astype('uint8')
    normalized_image = Image.fromarray(restored)
    return normalized_image

In [None]:
def crop_and_extract_coords(detection_results, base_path='../data/room-dataset/living_cropped/'):
    """Crop every detected box out of its image and save it as a small JPEG.

    Args:
        detection_results: Mapping of image path -> detection dict; only the
            'boxes' entry is read, each box being (x_min, y_min, x_max, y_max).
        base_path: Directory that receives the crops. It is created if it
            does not exist. Files are named ``<image stem>_<box index>.jpg``.

    Returns:
        ``detection_results`` itself, unchanged.
    """
    # Create the output directory up front so saving does not fail on a
    # fresh checkout.
    if base_path:
        os.makedirs(base_path, exist_ok=True)

    for image_path, details in detection_results.items():
        # Same stem rule as before (text before the first '.'), but taken from
        # a separator-aware basename.
        stem = os.path.basename(image_path).split('.')[0]

        # The context manager closes the underlying file once all crops of
        # this image have been written.
        with Image.open(image_path) as opened:
            # Standardise on RGB: JPEG cannot store alpha or palette data, and
            # a single mode keeps the downstream processing consistent.
            pil_image = opened if opened.mode == 'RGB' else opened.convert('RGB')

            for i, bbox in enumerate(details['boxes']):
                # Crop and normalize the image
                cropped_image = crop_bbox(pil_image, bbox)
                normalized_image = normalize_image(cropped_image, target_size=(112, 112))

                # Save the normalized image
                save_path = os.path.join(base_path, f"{stem}_{i}.jpg")
                normalized_image.save(save_path)

    return detection_results

In [None]:
detections_parsed = crop_and_extract_coords(detections)

In [None]:
detections_parsed['../data/room-dataset/living/living_1.jpg']

In [None]:
def convert_np_to_lists(obj):
    """Recursively replace NumPy values with plain Python equivalents.

    - ``np.ndarray`` -> nested lists (``tolist``)
    - NumPy scalars (``np.generic``) -> Python scalars (``item``)
    - dicts, lists and tuples are rebuilt with their contents converted
      (a tuple stays a tuple, a list stays a list)
    - anything else is returned unchanged

    The result is meant to be safe to pass to ``json.dump``.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        # e.g. np.float32 — not JSON serialisable as-is
        return obj.item()
    if isinstance(obj, dict):
        return {k: convert_np_to_lists(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_np_to_lists(v) for v in obj]
    if isinstance(obj, tuple):
        return tuple(convert_np_to_lists(v) for v in obj)
    return obj

In [None]:
detections_parsed = convert_np_to_lists(detections_parsed)

In [None]:
# with open("../data/room-dataset/room_detections_parsed.json", 'w') as file:
#     json.dump(detections_parsed, file)