In [1]:
"""
@Description :   The 4th step of main.py, where LLM & VLM are implemented
@Author      :   Yan Ding 
@Time        :   2023/09/03 21:54:15
"""

'''
pip install openai==0.27.0
pip install inflect
pip install modelscope -U
pip install transformers accelerate tiktoken -U
pip install einops transformers_stream_generator -U
pip install "pillow==9.*" -U
pip install torchvision
pip install matplotlib -U
'''

import torch
import os
import math
import numpy as np
import inflect
import sys
import openai
from modelscope import snapshot_download, AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation import GenerationConfig

2023-09-11 01:02:45,769 - modelscope - INFO - PyTorch version 2.0.1 Found.
2023-09-11 01:02:45,773 - modelscope - INFO - Loading ast index from /root/.cache/modelscope/ast_indexer
2023-09-11 01:02:45,832 - modelscope - INFO - Loading done! Current index file version is 1.9.0, with md5 324dfd599f06dff38089c5f9fda58af5 and a total number of 921 components indexed


In [26]:
# ----------------------------------------------------------------
# Step 4.0: Configuration and assumptions
# ----------------------------------------------------------------
CONFIG = {
    "api_key": "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",
    "gpt_model": "text-davinci-003",
    "model_id": "qwen/Qwen-VL-Chat",
    "revision": "v1.0.3"
}
openai.api_key = CONFIG["api_key"]


# We have an assumption that we know object categories, and exact object name
# 数据初始化
fruit_names = ["banana", "apple", "pear"]
food_names = ["potato_chip", "pastry", "pie"]
beverage_names = ['bottle']
utensil_names = ['knife', 'fork', 'spoon']
container_names = ['bowl', 'mug', 'cup', 'plate']
table_name = ["counter", "table"]
cabinet_name = ["cabinet"]
appliance_names = ['fridge', 'microwave']
object_names = fruit_names + food_names + beverage_names + utensil_names + container_names + table_name + cabinet_name + appliance_names

# 将对象按类型分组
object_groups = {
    'fruit_names': fruit_names,
    'food_names': food_names,
    'beverage_names': beverage_names,
    'utensil_names': utensil_names,
    'container_names': container_names,
    'table_name': table_name,
    'cabinet_name': cabinet_name,
    'appliance_names': appliance_names
}

# 可能的位置列表
locations = container_names + table_name + cabinet_name + appliance_names

predicates = {
    'fruit_names': ['on', 'in'],
    'food_names': ['on', 'in'],
    'beverage_names': ['on', 'in'],
    'utensil_names': ['on', 'in'],
    'container_names': ['empty'],
    'table_name': [],
    'cabinet_name': ['open'],
    'appliance_names': ['open']
}

all_predicate_based_prompts = []
# 遍历所有对象和位置生成提示
for obj_group, obj_list in object_groups.items():
    for obj in obj_list:
        for loc in locations:
            # 过滤掉 'in counter' 和 'in table'
            if 'on' in predicates[obj_group] and loc in table_name:
                all_predicate_based_prompts.append([f"is {obj} on {loc}?", f"{obj}", f"{loc}"])
            if 'in' in predicates[obj_group] and loc not in table_name:
                all_predicate_based_prompts.append([f"is {obj} in {loc}?", f"{obj}", f"{loc}"])
        if 'empty' in predicates[obj_group]:
            all_predicate_based_prompts.append([f"is {obj} empty?", f"{obj}"])
        if 'open' in predicates[obj_group]:
            all_predicate_based_prompts.append([f"is {obj} open?", f"{obj}"])

# # 输出结果
# for prompt in all_predicate_based_prompts:
#     print(prompt)

# 创建同义词或映射字典
synonyms_dict = {
    "refrigerator freezer": "fridge",
    "refrigerator": "fridge",
    "freezer": "fridge",
    "microwave oven": "microwave"
}

In [20]:
# ----------------------------------------------------------------
# Step 4.1: Load VLM models
# ----------------------------------------------------------------
model_dir = snapshot_download(CONFIG["model_id"], revision=CONFIG["revision"])
torch.manual_seed(1234)
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto", trust_remote_code=True, fp16=True).eval()
print(f"model weights is in {model_dir}")
# 'Users can modify configuration in generation_config.json'
model.generation_config = GenerationConfig.from_pretrained(model_dir, trust_remote_code=True)

2023-09-11 01:27:57,862 - modelscope - INFO - Use user-specified model revision: v1.0.3


Loading checkpoint shards:   0%|          | 0/10 [00:00<?, ?it/s]

model weights is in /root/.cache/modelscope/hub/qwen/Qwen-VL-Chat


In [21]:
# ----------------------------------------------------------------
# Step 4.2: Utils functions
# ----------------------------------------------------------------
def extract_bounding_box(): #TODO
    for object_index in range(len(objects_name)):
        object = objects_in_description[object_index]
        response, history = model.chat(tokenizer, f'please output the bounding box of {object}', history=history)
        print(response)
        image = tokenizer.draw_bbox_on_latest_picture(response, history)
        image.save(f'/home/image_output/view_{camera_index}_object_{object_index}.jpg')

def find_image_path(image_name):
    """
    Helper function to find the image path.
    """
    for ext in ['.jpg', '.png']:
        image_path = f'/home/input/{image_name}{ext}'
        if os.path.exists(image_path):
            return image_path
    raise FileNotFoundError(f"No image found for {image_name} in supported formats (.jpg, .png).")


def vlm(image_name, prompt):
    """
    This function queries a Vision-Language Model (VLM) for a given image name and prompt.
    This function deos not need need history.
    
    Args:
        image_name: Name of the image file without extension. The function will attempt to find both .jpg and .png extensions.
        prompt: The text prompt/question that you want to ask the VLM about the image.
    """
    
    image_path = find_image_path(image_name)
    query = tokenizer.from_list_format([{'image': image_path}, {'text': prompt}])
    response, history = model.chat(tokenizer, query=query, history=None)
    
    return response, history

def vlm_history(prompt, image_history):
    """
    Queries a Vision-Language Model (VLM) using a given image and prompt, while considering previous chat history.
    
    Args:
        image_name (str): The base name of the image without its file extension.
                        This function attempts to locate the image using both .jpg and .png extensions.
        image_history (list): Previous interactions with the VLM. Expected to be tokenized.
        prompt (str): The text prompt/question for the VLM regarding the image.
    """
    response, history = model.chat(tokenizer, query=prompt, history=image_history)
    
    return response

def convert_prompt_to_format(initial_state):
    """
    Convert the initial state prompts into a structured format based on their content.
    For example, a prompt like 'is spoon on counter?' --> ['on', 'spoon', 'counter']
    
    Args:
        initial_state (dict): A dictionary where the keys are camera identifiers (or similar) and the values 
                            are lists of prompts related to that camera.
    """
    def parse_prompt(prompt):
        words = prompt.split()
        
        # Check for 'on' or 'in' predicate
        if ' on ' in prompt:
            return ['on', words[1], words[3].replace('?', '')]
        elif ' in ' in prompt:
            return ['in', words[1], words[3].replace('?', '')]
        elif ' open' in prompt:
            return ['open', words[1].replace('?', '')]
        elif ' empty' in prompt:
            return ['empty', words[1].replace('?', '')]
        else:
            # If the prompt doesn't match any of the known formats, return the original prompt as a single string
            return [prompt]

    formatted_state = {}
    for camera, prompts in initial_state.items():
        formatted_prompts = [parse_prompt(prompt) for prompt in prompts]
        formatted_state[camera] = formatted_prompts

    return formatted_state

def llm(prompt):
  sampling_params = {"n": 1,  # sampling number
                      "max_tokens": 32,
                      "temperature": 0,
                      "top_p": 1,
                      "logprobs": 1,
                      "presence_penalty": 0,
                      "frequency_penalty": 0,
                      "stop": ['\\n', '.']}
  raw_response = openai.Completion.create(engine=CONFIG["gpt_model"], prompt=prompt, **sampling_params)
  responses = [raw_response['choices'][i]['text'] for i in range(sampling_params['n'])]
  mean_probs = [math.exp(np.mean(raw_response['choices'][i]['logprobs']['token_logprobs'])) for i in range(sampling_params['n'])]
  responses = [sample.strip().lower() for sample in responses]
  return responses, mean_probs

def query_llm_for_objects(response):
    task = "The task is to extract object names in the sentence."
    example1 = "Input: There is an apple and a bowl. Output: apple, bowl"
    example2 = "Input: There is an oven on the counter. Output: oven, counter"
    question = f"Input: {response}"
    prompt = f'{task} \n {example1} \n {example2} \n {question}.'
    response, _ = llm(prompt)

    items = response[0].split(':')[-1]
    items_list = [item.strip() for item in items.split(',') if item.strip()]
    normalized_items = [synonyms_dict.get(item, item) for item in items_list]

    p = inflect.engine()
    return [p.singular_noun(item) or item for item in normalized_items if any(item in obj_name for obj_name in object_names)]

def select_prompts(objects):
    return [
        sentence[0] 
        for sentence in all_predicate_based_prompts 
        if (len(sentence) == 3 and sentence[1] in objects and sentence[2] in objects) or
           (len(sentence) != 3 and sentence[1] in objects)
    ]

In [22]:
# ----------------------------------------------------------------
# Step 4.3: Do image captioning
# ----------------------------------------------------------------
vlm_responses = {}  # store responses from vlm
histories = {}  # store histories from vlm
num_view = 6
for index in range(num_view):
    index += 1
    image_name = f'view_{index}'
    prompt = 'please descripe the input image.'
    response, history = vlm(image_name, prompt)
    
    # store results
    vlm_responses[image_name] = response
    histories[image_name] = history

print('VLM has output response about image captioning!')
# print(f'vlm_responses:{vlm_responses}')
# print(f'histories:{histories}')

VLM has output response about image captioning!


In [23]:
# ----------------------------------------------------------------
# Step 4.4: Extract objects from VLM's response
# ----------------------------------------------------------------
extracted_objects = {}
for image_name, response in vlm_responses.items():
  extracted_objects[image_name] = query_llm_for_objects(response)
  print('Objects in image {}: {}'.format(image_name, extracted_objects[image_name]))

Objects in image view_1: ['counter', 'apple']
Objects in image view_2: ['microwave']
Objects in image view_3: ['microwave']
Objects in image view_4: ['cabinet']
Objects in image view_5: ['fridge', 'microwave']
Objects in image view_6: ['fridge']


In [24]:
# ----------------------------------------------------------------
# Step 4.5: Select predicate-based prompts
# ----------------------------------------------------------------
selected_predicate_based_prompts = {}

for image_name, objects in extracted_objects.items():
    matched_prompts = []
    for sentence in all_predicate_based_prompts:
        object1 = sentence[1]
        object2 = sentence[2] if len(sentence) == 3 else None

        # 若object1在extracted_objects中，并且object2在extracted_objects中或者为None，那么将此句子加入matched_prompts
        if object1 in objects and (not object2 or object2 in objects):
            matched_prompts.append(sentence[0])

    selected_predicate_based_prompts[image_name] = matched_prompts

for image_name, predicate_based_prompts in selected_predicate_based_prompts.items():
    print(f'For {image_name}, predicate_based_prompts: {predicate_based_prompts}')

# print(f'selected_predicate_based_prompts={selected_predicate_based_prompts}')

For view_1, predicate_based_prompts: ['in the image, is apple on counter?']
For view_2, predicate_based_prompts: ['in the image, is microwave open?']
For view_3, predicate_based_prompts: ['in the image, is microwave open?']
For view_4, predicate_based_prompts: ['in the image, is cabinet open?']
For view_5, predicate_based_prompts: ['in the image, is fridge open?', 'in the image, is microwave open?']
For view_6, predicate_based_prompts: ['in the image, is fridge open?']


In [25]:
# ----------------------------------------------------------------
# Step 4.6: Convert prompts
# ----------------------------------------------------------------
initial_state = {}
for index_view in range(num_view):
    index_view = f'view_{index_view + 1}'
    for index_prompt in range(len(selected_predicate_based_prompts[index_view])):
        prompt = selected_predicate_based_prompts[index_view][index_prompt]
        image_history = histories[index_view]
        response = vlm_history(prompt, image_history)
        print(f'response:{response}')
        
        if 'yes' in response.lower():
            if image_name not in initial_state:
                initial_state[index_view] = []
            initial_state[index_view].append(prompt)

formatted_state = convert_prompt_to_format(initial_state)
print(f'initial_state={formatted_state}')

response:Yes, there is an apple on the counter in the image.
response:No, the microwave is closed in the image.
response:Yes, the microwave is open in the image.
response:Yes, the cabinet is open in the image.
response:Yes, the refrigerator is open in the image.
response:No, the microwave is closed in the image.
response:Yes, the fridge is open in the image.
initial_state={'view_1': [['on', 'the', 'is']], 'view_3': [['open', 'the']], 'view_4': [['open', 'the']], 'view_5': [['open', 'the']], 'view_6': [['open', 'the']]}
