# Multimodal MR dialogue dataset construction
### 1. Set up the OpenAI agent and generate conversations

In [1]:
import os
import time
from typing import Tuple, Dict, List

from langchain.agents import initialize_agent
from langchain.tools import StructuredTool
from langchain.agents import AgentType
from langchain.chat_models import ChatOpenAI
from langchain.agents import AgentExecutor
from langchain.memory import ConversationBufferMemory
from langchain.prompts.chat import MessagesPlaceholder
from langchain.schema.messages import SystemMessage
from callbacks import AgentCallbackHandler

In [2]:
# Load variables from a local .env file into the process environment.
# Config below reads OPENAI_API_KEY via os.environ, so this cell must run first.
from dotenv import load_dotenv
load_dotenv()

True

#### 1.1 Design a prompt for guideline-grounded conversation generation task

In [3]:
# One-sentence reminder of the generation task. It is embedded at the top of
# `task_instruction` and repeated at the start of every per-chunk query prompt.
# NOTE: this is prompt text sent to the LLM; do not reformat it casually.
short_task_reminder = 'The task it to generate multiple turns of conversations and called tools between trainer (assistant) and trainee (user) grounded on the task-specific guidelines and tools in LEGO XR application. '
# Full task description: roles of trainer/trainee, what each must do per step,
# and how a conversation has to be closed.
task_instruction = f"""
{short_task_reminder}
The trainer aims to teach the trainee how to accomplish the assembly task based on the task-specific guidelines, supported by an XR application.
Specifically, the trainee is wearing AR glasses to see both VR environment and real world.
The trainee knows nothing about the guidelines before trainer's guidance.
For each step,
the trainee must ask at least one deep-dive question, or request a troublesome issue if he or she cannot follow the guide, or call tools from XR application and learn how to use those tools;
the trainer must answer the question, assist the trainee, show them the responses of the execution of the tools.
At the end of a conversation,
first, trainer must ask if the trainee has accomplished the task and the trainee must tell if the trainee can accomplish the task;
second, trainer must ask how is user experience, and the trainee provide feedback on the user experience.
You must add a section title to separate which key point in the guideline in the generated conversation and generate until the final step of the guidelines.
"""

# Tool name -> natural-language description.
# The keys double as the tool/function names registered with the agent and as
# the method names looked up on the simulator, so they must stay in sync.
tool_descriptions = {
    "StartAssemble": "Useful Unity tool to initiate the assembly process.",
    "NextStep": "Useful Unity tool to move to the next assembly step.",
    "FrontStep": "Useful Unity tool to go back to the previous assembly step.",
    "Explode": "Useful Unity tool to trigger an explosion for detailed viewing.",
    "Recover": "Useful Unity tool to restore the initial state of AR objects after explosion.",
    "FinishedVideo": "Useful Unity tool to end the assembly process and show a video of the assembled LEGO bricks.",
    "ReShow": "Useful Unity tool to repeat the current assembly step.",
    "Enlarge": "Useful Unity tool to enlarge or zoom out the current object.",
    "Shrink": "Useful Unity tool to shrink or zoom in the current object.",
    "GoToStep": "Useful Unity tool to go to the given an assembly step number.",
    "Rotate": "Useful Unity tool to rotate the current object to a direction.",
    "ShowPieces": "Useful Unity tool to show all candidate LEGO pieces to be assembled.",
    "HighlightCorrectComponents": "Useful Unity tool to highlight correct attachment points and components.",
    "GetCurrentStep": "Useful Unity tool to get the number of the current step.",
    "GetRemainingStep": "Useful Unity tool to get the number of the remaining steps.",
    "CheckStepStatusVR": "Useful Unity tool to check if the current step in Unity is accomplished correctly or not. If the current assembly sequence recorded in Unity is the same as the manual assembly sequence, then it is correct, otherwise, it is incorrect.",
    "APICallObjectRecognitionAR": "Useful AR tool to call the VLM agent to identify LEGO pieces based on the provided video streaming data from AR glasses and highlights the recognized pieces in the AR environment.",
    "APICallCheckStepStatusAR": "Useful AR tool to call the VLM agent to determine if the current assembly step is completed correctly or not, using the provided video streaming data from AR glasses as input."
}

# System prompt handed to the agent. `{tool_descriptions}` interpolates the
# dict itself, so the "### Tools:" section contains the dict's Python repr.
sys_prompt = f"""
### Instruction:
{task_instruction}

### Tools:
{tool_descriptions}
"""

#### 1.2 Set up the dialogue agent with the prompt

In [7]:
class Config:
    """
    Contains the configuration of the LLM.

    Attributes:
        model: Name of the OpenAI chat model to use.
        OPENAI_API_KEY: Value of the ``OPENAI_API_KEY`` environment variable,
            or ``None`` when the variable is not set.
        temperature: Sampling temperature passed to the chat model.
        verbose: Verbosity flag passed to the chat model.
    """
    model = 'gpt-3.5-turbo-16k-0613'
    # os.environ.get() never raises; it returns None for a missing variable.
    # The key is deliberately never printed so it cannot leak into notebook
    # output or logs.
    OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
    temperature = 0.0
    verbose = True
    
    
class LegoAPIWrapper:
    """
    Exposes every tool name in ``tools`` as a zero-argument stub method.

    Each stub prints a notice that it was called and returns a placeholder
    response string, so the agent can "call" LEGO XR tools without a real
    Unity backend.

    :param tools: iterable/mapping whose items (keys, for a dict) are the
        tool names to expose as methods.
    """

    def __init__(self, tools):
        self.tools = tools

        # Dynamically create methods based on the function names
        for function_name in tools:
            setattr(self, function_name, self._create_class_method(function_name))

    def __getattr__(self, function_name):
        # __getattr__ only runs after normal lookup failed. Read `tools`
        # straight from the instance dict: going through `self.tools` would
        # re-enter __getattr__ and recurse forever whenever `tools` has not
        # been set (e.g. instances created via __new__, copy or pickle).
        tools = self.__dict__.get('tools', ())
        if function_name in tools:
            return self._create_class_method(function_name)
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{function_name}'")

    def _create_class_method(self, function_name):
        """Return a zero-argument stub that simulates calling ``function_name``."""
        def method():
            print(f"Unity: Method '{function_name}' has been called.")
            return f"Response of '{function_name}'"

        return method
        
def setup_memory() -> Tuple[Dict, ConversationBufferMemory]:
    """
    Prepares the prompt wiring and the conversation memory of the agent.

    The system prompt is taken from the module-level ``sys_prompt``; the
    memory is exposed to the prompt through a placeholder that shares its key.
    :return a tuple with the agent keyword pairs and the conversation memory.
    """
    memory_key = "memory"

    history_slot = MessagesPlaceholder(variable_name=memory_key)
    agent_kwargs = {
        "extra_prompt_messages": [history_slot],
        "system_message": SystemMessage(content=f"{sys_prompt}"),
    }

    # return_messages=True keeps the history as message objects, not one string.
    conversation_memory = ConversationBufferMemory(memory_key=memory_key, return_messages=True)
    return agent_kwargs, conversation_memory

# Names and descriptions come from `tool_descriptions`; the callables come
# from a LegoAPIWrapper built over the same mapping.
def setup_tools() -> List[StructuredTool]:
    """
    Builds one StructuredTool per entry of ``tool_descriptions``.

    :return the list of tools, in the iteration order of ``tool_descriptions``.
    """
    toolkit = LegoAPIWrapper(tool_descriptions)

    return [
        StructuredTool.from_function(
            func=getattr(toolkit, tool_name),
            name=tool_name,
            description=tool_description,
        )
        for tool_name, tool_description in tool_descriptions.items()
    ]

def setup_agent() -> AgentExecutor:
    """
    Assembles the OpenAI-functions agent: chat model, memory and tools.

    :return the agent executor returned by ``initialize_agent``.
    """
    cfg = Config()

    llm_settings = {
        "temperature": cfg.temperature,
        "model": cfg.model,
        "verbose": cfg.verbose,
    }
    chat_model = ChatOpenAI(**llm_settings)

    prompt_kwargs, conversation_memory = setup_memory()
    available_tools = setup_tools()

    executor = initialize_agent(
        available_tools,
        chat_model,
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=False,
        agent_kwargs=prompt_kwargs,
        memory=conversation_memory,
    )
    return executor

# Build the shared agent once at cell execution time; the generation code in
# section 1.4 calls `agent_executor.run(...)` on this module-level instance.
agent_executor: AgentExecutor = setup_agent()

#### 1.3 Generate a conversation grounded on simulated tool responses and add it to the agent's memory
What would be a good example of a conversation that calls the tools? We generate one from simulated tool responses rather than hand-writing an imagined example in the prompt.

In [5]:
import random

class UnityAssemblySimulator:
    """
    Stateful stand-in for the Unity/AR tool backend.

    Every tool is a method named exactly like its key in ``tool_descriptions``
    and returns a canned response string. Only the step counters are real
    state: ``current_step`` (1-based) and ``remaining_steps``.

    :param tool_descriptions: mapping of tool name -> description; only names
        present here can be invoked through ``simulate_response``.
    :param manual_total_step: total number of assembly steps in the manual.
    """

    def __init__(self, tool_descriptions, manual_total_step=10):
        self.tool_descriptions = tool_descriptions
        # Keep the total so every step-bound check uses the same limit
        # instead of a hard-coded 10.
        self.total_steps = manual_total_step
        self.current_step = 1
        self.remaining_steps = self.total_steps - self.current_step

    def StartAssemble(self):
        self.current_step = 1
        # Restarting must also reset the remaining-step counter, otherwise it
        # keeps the value of wherever the previous run stopped.
        self.remaining_steps = self.total_steps - self.current_step
        return "Started assembly. Begin with step 1."

    def NextStep(self):
        if self.current_step < self.total_steps:
            self.current_step += 1
            self.remaining_steps -= 1
            return f"Moved to the next assembly step: {self.current_step}. {self.remaining_steps} steps remaining."
        else:
            return "Assembly is already at the final step."

    def FrontStep(self):
        if self.current_step > 1:
            self.current_step -= 1
            self.remaining_steps += 1
            return f"Moved back to the previous assembly step: {self.current_step}. {self.remaining_steps} steps remaining."
        else:
            return "Already at the first assembly step."

    def Explode(self):
        return "Triggered explosion for detailed viewing. Use 'Recover' to restore the initial state."

    def Recover(self):
        return "Restored the initial state of AR objects after explosion."

    def FinishedVideo(self):
        return "Ended the assembly process. Showing a video of the assembled LEGO bricks."

    def ReShow(self):
        return f"Repeating the current assembly step: {self.current_step}."

    def Enlarge(self):
        return "Enlarged or zoomed out the current object."

    def Shrink(self):
        return "Shrunk or zoomed in the current object."

    def GoToStep(self, step_number):
        """Jump to ``step_number`` if it lies within 1..total_steps."""
        if 1 <= step_number <= self.total_steps:
            self.current_step = step_number
            self.remaining_steps = self.total_steps - step_number
            return f"Jumped to assembly step: {self.current_step}. {self.remaining_steps} steps remaining."
        else:
            return f"Invalid step number. Provide a step number between 1 and {self.total_steps}."

    def Rotate(self, direction):
        return f"Rotated the current object <{direction}>."

    def ShowPieces(self):
        return "Showing all candidate LEGO pieces to be assembled."

    def HighlightCorrectComponents(self):
        return "Highlighted correct attachment points and components."

    def GetCurrentStep(self):
        return f"The current assembly step is: {self.current_step}."

    def GetRemainingStep(self):
        return f"The number of remaining steps is: {self.remaining_steps}."

    def CheckStepStatusVR(self):
        # Assuming Unity and manual assembly sequences are the same for simplicity
        return "The current assembly step in Unity is accomplished correctly."

    def APICallObjectRecognitionAR(self):
        return "Called the VLM agent to identify LEGO pieces based on video streaming data from AR glasses."

    def APICallCheckStepStatusAR(self):
        return "Called the VLM agent to determine if the current assembly step is completed correctly based on video streaming data from AR glasses."

    def simulate_response(self, tool_name, *args):
        """
        Invoke the tool named ``tool_name`` with ``args`` and wrap its response.

        :return a sentence embedding the tool response, or an explanatory
            message when the tool is unknown or has no matching method.
        :raises TypeError: when ``args`` does not match the tool's signature
            (e.g. ``GoToStep`` called without a step number).
        """
        if tool_name in self.tool_descriptions:
            # Tool names map 1:1 onto method names.
            method_name = tool_name
            if hasattr(self, method_name):
                method = getattr(self, method_name)
                return f"Called the Unity tool <{method_name}> and got a response: {method(*args)}"
            else:
                return f"Method not implemented for tool '{tool_name}'."
        else:
            return f"Tool '{tool_name}' not found in the tool descriptions."

# Example usage:
# print(tool_descriptions)
simulator = UnityAssemblySimulator(tool_descriptions)

# Simulate responses for random tools
directions = ['Up', 'Down', 'Left', 'Right', 'None'] # https://docs.unity3d.com/2018.3/Documentation/ScriptReference/EventSystems.MoveDirection.html
# GoToStep and Rotate are the only simulated tools that take an argument.
# Choosing the argument by tool name (instead of retrying inside bare
# `except:` blocks) guarantees Rotate receives a direction rather than a
# random integer, and lets genuine errors surface instead of being swallowed.
tool_arg_factories = {
    'GoToStep': lambda: (random.randint(1, 10),),
    'Rotate': lambda: (random.choice(directions),),
}
tool_names = list(tool_descriptions)
called_tool_responses = ''
for _ in range(6):
    random_tool = random.choice(tool_names)
    make_args = tool_arg_factories.get(random_tool)
    tool_args = make_args() if make_args else ()
    called_tool_responses += '\n' + simulator.simulate_response(random_tool, *tool_args)

# Prompt asking the agent for a conversation that is consistent with the
# sampled tool calls and their simulated responses.
tool_grounded_conversation_prompt = f"""
Generate a trainee-trainer conversation, imaging the trainee's utterance has the intent of using the tools with the following responses:
{called_tool_responses}
"""
print(tool_grounded_conversation_prompt)
# Uncomment to actually query the model (costs API credits):
# generated_tool_grounded_conversation = agent_executor.run(tool_grounded_conversation_prompt, callbacks=[AgentCallbackHandler()])
# print('='*50, '>\n', generated_tool_grounded_conversation)


Generate a trainee-trainer conversation, imaging the trainee's utterance has the intent of using the tools with the following responses:

Called the Unity tool <Recover> and got a response: Restored the initial state of AR objects after explosion.
Called the Unity tool <CheckStepStatusVR> and got a response: The current assembly step in Unity is accomplished correctly.
Called the Unity tool <GetRemainingStep> and got a response: The number of remaining steps is: 9.
Called the Unity tool <GoToStep> and got a response: Jumped to assembly step: 1. 9 steps remaining.
Called the Unity tool <Recover> and got a response: Restored the initial state of AR objects after explosion.
Called the Unity tool <GetCurrentStep> and got a response: The current assembly step is: 1.


#### 1.4 Generate a conversation grounded on both simulated tool responses and guidelines.

In [None]:
# Root folder of the LEGO manuals. generate_conversation_per_file() reads
# `<manual_dir>/<name>/<name>.json` and writes its chunk outputs next to it.
manual_dir = '/media/Blue2TB3/jpei/vox_arta_dataset/manuals/lego'
import json
import math
from tqdm.notebook import tqdm
from pathlib import Path
import re

# NOTE(review): count_dialogue is not referenced anywhere else in the visible
# code — possibly a leftover; confirm before removing.
count_dialogue = 0
# Number of manual instructions grouped into one generation request.
chunk_size = 10
# Upper bound on whitespace-separated words kept from a chunk's instructions.
# NOTE(review): 16385 looks like a model context size in tokens, while the
# limit is applied to words — confirm the intended unit.
max_words = int(16385*0.9) 

def generate_conversation_per_file(fname, mdir=manual_dir):
    """
    Generate trainer/trainee conversations for one manual, chunk by chunk.

    Reads ``{mdir}/{fname}/{fname}.json``; the first entry of its
    ``instructions`` list is used as the summary, the remaining entries are
    split into chunks of ``chunk_size`` instructions. For each chunk a query
    prompt is built (randomly with or without simulated tool responses), sent
    to the module-level ``agent_executor`` and the prompt plus response is
    written to ``{mdir}/{fname}/{fname}_{i_chunk}.txt``.

    Chunks whose output file already exists and is larger than 1000 bytes are
    skipped; smaller existing files are regenerated.

    :param fname: name of the manual folder (and of the JSON file inside it).
    :param mdir: root directory containing the manual folders.
    """
    # Only keep the file open while parsing, not during the slow API calls.
    with open(f'{mdir}/{fname}/{fname}.json', 'r', encoding='utf-8') as fr:
        json_instructions = json.load(fr)['instructions']
    summary = json_instructions[0]['text']
    instructions = [d['text'] for d in json_instructions[1:]]

    directions = ['Up', 'Down', 'Left', 'Right', 'None'] # https://docs.unity3d.com/2018.3/Documentation/ScriptReference/EventSystems.MoveDirection.html
    # GoToStep and Rotate are the only simulated tools that take an argument.
    # Selecting the argument by tool name (rather than retrying in bare
    # `except:` blocks) makes Rotate receive a direction instead of a random
    # integer and keeps real errors visible.
    tool_arg_factories = {
        'GoToStep': lambda: (random.randint(1, 10),),
        'Rotate': lambda: (random.choice(directions),),
    }
    tool_names = list(tool_descriptions)

    ## Chunk the original instructions
    n = math.ceil(len(instructions)/chunk_size)
    for i_chunk in range(n):
        output_file = f'{mdir}/{fname}/{fname}_{i_chunk}.txt'
        start_index = i_chunk*chunk_size
        # Exclusive slice bound. Clamping to len(instructions) (not len-1)
        # keeps the final instruction of the manual in the last chunk.
        end_index = min(start_index+chunk_size, len(instructions))

        already_exists = os.path.exists(output_file)
        if already_exists and os.path.getsize(output_file)>1000:
            print('='*10, f'> Pass as already exist: {output_file}', f'; Chunk {i_chunk}/{n}', f'; Instruction indexes {start_index+1}: {end_index}', '*'*50)
            continue

        chuck_instruction_str = '\n'.join(instructions[start_index:end_index])
        # Truncate by word count so the prompt stays within the model context.
        chuck_instruction_str = ' '.join(chuck_instruction_str.split()[:max_words])
        print('*'*50, fname, f'; Chunk {i_chunk}/{n}', f'; Instruction indexes {start_index+1}: {end_index}', '*'*50)
        if already_exists:
            # Reaching this point with an existing file means it was too short.
            print('Re-generate longer conversations...')

        ### Simulate responses for random tools
        sampled_responses = []
        for _ in range(5):
            random_tool = random.choice(tool_names)
            make_args = tool_arg_factories.get(random_tool)
            tool_args = make_args() if make_args else ()
            sampled_responses.append(simulator.simulate_response(random_tool, *tool_args))
        # Drop duplicate responses while keeping the order they were sampled in.
        called_tool_responses = '\n'.join(dict.fromkeys(sampled_responses))

        # Prepare query prompt only grounded on manual => Topic match, but ONLY StartAssemble, NextStep can be called.
        query_prompt_1 = f"""
            {short_task_reminder}
            ### Guidelines:
            {summary}
            {chuck_instruction_str}
            ### Conversation:
        """.strip()

        ## Prepare query prompt add ToolAgent to memory => => Topic match, multiple tools are called.
        query_prompt_2 = f"""
            {short_task_reminder}
            Imagine some trainee's utterances have the intent of using the tools with the following responses:
            {called_tool_responses}
            ### Guidelines:
            {summary}
            {chuck_instruction_str}
            ### Conversation:
        """.strip()

        query_prompt = random.choice([query_prompt_1, query_prompt_2])

        # Flatten the prompt to a single line: every whitespace run, newlines
        # included, becomes one space. (A follow-up newline-collapsing
        # substitution was a no-op after this step and has been dropped; the
        # single-line format matches the files generated so far.)
        query_prompt = re.sub(r'\s+', ' ', query_prompt)
        print('$'*50, len(query_prompt.split()))
        ## Carefully call ChatGPT API as it costs credits!
        response = agent_executor.run(query_prompt, callbacks=[AgentCallbackHandler()])
        print(query_prompt)
        print('='*50, '>\n', response)
        # Explicit UTF-8: the dataset builder reads these files back as UTF-8.
        with open(output_file, 'w', encoding='utf-8') as fw:
            fw.write(query_prompt + '\n'+ response)
        time.sleep(10) # Limit is 60000 tokens per minute
    
# Walk the top level of the manual root and generate conversations for every
# manual folder; plain files (e.g. .DS_Store) are reported and skipped.
for folder_name in tqdm(os.listdir(manual_dir)):
    candidate = Path(os.path.join(manual_dir, folder_name))
    if not candidate.is_dir():
        print(f'Pass as not a folder {folder_name}')
        continue
    generate_conversation_per_file(folder_name)


  0%|          | 0/66 [00:00<?, ?it/s]

Pass as not a folder .DS_Store
************************************************** lego-60272-elite-police-boat-transport-readscr ; Chunk 6/27 ; Instruction indexes 61: 70 **************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ 418
The task it to generate multiple turns of conversations and called tools between trainer (assistant) and trainee (user) grounded on the task-specific guidelines and tools in LEGO XR application. ### Guidelines: LEGO Audio & Braille Building Instructions for the LEGO City set "Elite Police Boat Transport". Vroom! Clara the Criminal is making a twilight escape in a superfast speedboat loaded with stolen treasure. Mobilize LEGO City TV hero Duke DeTain and race to the scene in the armored off-roader. Then unload the cool police dinghy from the trailer, grab the night vision goggles and set off in hot pursuit before she escapes into the darkness. Another action-packed LEGO City Police adventure! On the box cover, you 

In [None]:
# Reset the agent when the accumulated conversation no longer fits the model
# context: setup_agent() builds a new executor with a fresh, empty
# ConversationBufferMemory, replacing the module-level instance.
agent_executor: AgentExecutor = setup_agent()

# 2. Dataset generation

In [None]:
import os
import json
import re
from tqdm import tqdm

def preprocess_conversations(input_folder, output_path):
    """
    Build a JSONL dataset from every ``.txt`` file below ``input_folder``.

    Each file is cleaned with ``preprocess_text``, split into input/output
    pairs with ``extract_conversation_pairs``, and all pairs are written to
    ``output_path`` as one JSON object per line. Summary statistics are
    printed at the end.

    :param input_folder: directory that is searched recursively for .txt files.
    :param output_path: path of the JSONL file to write; its parent directory
        is created when missing.
    """
    data = []

    # Recursively traverse the directory
    for root, dirs, files in os.walk(input_folder):
        # os.walk yields entries in arbitrary order; sorting (in place for
        # `dirs`, which steers the traversal) makes the dataset reproducible.
        dirs.sort()
        for filename in sorted(files):
            if not filename.endswith(".txt"):
                continue
            input_path = os.path.join(root, filename)

            # Read the content from the file
            with open(input_path, 'r', encoding='utf-8') as file:
                content = file.read()

            # Clean the text, then extract trainee-trainer conversation pairs
            preprocessed_content = preprocess_text(content)
            conversation_pairs = extract_conversation_pairs(preprocessed_content)

            # Append to the dataset
            data.extend(conversation_pairs)

    # Opening the output would fail if its parent directory did not exist yet.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save the dataset to a JSONL file
    with open(output_path, 'w', encoding='utf-8') as jsonl_file:
        for example in data:
            jsonl_file.write(json.dumps(example, ensure_ascii=False) + '\n')

    # Compute and print dataset statistics
    compute_dataset_statistics(data)

def preprocess_text(text):
    """
    Return ``text`` with every character removed that is neither an ASCII
    letter, an ASCII digit nor whitespace.

    This is a placeholder cleaning step; adapt it to the structure of the
    dataset (e.g. to keep punctuation or non-ASCII letters).
    """
    disallowed_chars = r'[^a-zA-Z0-9\n\s]'
    return re.sub(disallowed_chars, '', text)

def extract_conversation_pairs(text):
    """
    Pair up consecutive lines of ``text`` as input/output examples.

    Lines 1 and 2 form the first pair, lines 3 and 4 the second, and so on;
    a trailing unpaired line is ignored. Both sides are stripped of
    surrounding whitespace. This is a placeholder strategy; adapt it to the
    structure of the dataset.

    :return a list of ``{"input": ..., "output": ...}`` dicts.
    """
    rows = text.split('\n')
    # Even-indexed lines are treated as trainee turns, odd-indexed lines as
    # trainer turns; zip() stops at the shorter side, dropping a leftover line.
    trainee_rows = rows[0::2]
    trainer_rows = rows[1::2]
    return [
        {"input": trainee_row.strip(), "output": trainer_row.strip()}
        for trainee_row, trainer_row in zip(trainee_rows, trainer_rows)
    ]

def compute_dataset_statistics(dataset):
    """
    Print the number of examples and the average input/output length.

    Lengths are measured in characters. An empty dataset reports averages of
    0.00 instead of failing with a division by zero.

    :param dataset: sequence of dicts with string values under the keys
        "input" and "output".
    """
    num_examples = len(dataset)
    if num_examples:
        avg_input_length = sum(len(example["input"]) for example in dataset) / num_examples
        avg_output_length = sum(len(example["output"]) for example in dataset) / num_examples
    else:
        # No examples (e.g. no .txt files were found): nothing to average.
        avg_input_length = avg_output_length = 0.0

    print("Dataset Statistics:")
    print(f"Number of Examples: {num_examples}")
    print(f"Average Input Length: {avg_input_length:.2f} characters")
    print(f"Average Output Length: {avg_output_length:.2f} characters")

# Entry point: build the JSONL dataset from the generated conversation files.
# The input folder is the same manual root the generation step writes its
# per-chunk .txt files into.
if __name__ == "__main__":
    input_folder_path = '/media/Blue2TB3/jpei/vox_arta_dataset/manuals/lego'  # Replace with the actual path to your input folder
    output_jsonl_path = '/media/Blue2TB3/jpei/vox_arta_dataset/manuals/lego/to/output_dataset.jsonl'  # Replace with the desired path for the output JSONL file

    preprocess_conversations(input_folder_path, output_jsonl_path)
