In [1]:
import os, requests
from openai import OpenAI

In [20]:
from abc import ABC, abstractmethod

class Base_Planner(ABC):
    """The base class for Planner."""

    def __init__(self):
        super().__init__()
        self.dialogue_system = ''                  
        self.dialogue_user = ''
        self.dialogue_logger = ''         
        self.show_dialogue = False
        self.llm_model = None
        self.llm_url = None
    def reset(self, show=False):
        self.dialogue_user = ''
        self.dialogue_logger = ''
        self.show_dialogue = show
        
    ## initial prompt, write in 'prompt/task_info.json
    def initial_planning(self, description, example):
        if self.llm_model is None:
            assert "no select Large Language Model"
        prompts = description + example
        self.dialogue_system += description + "\n"
        self.dialogue_system += example + "\n"

        ## set system part
        server_flag = 0
        server_error_cnt = 0
        while server_error_cnt<10:
            try:
                url = self.llm_url
                headers = {'Content-Type': 'application/json'}
                
                data = {'model': self.llm_model, "messages":[{"role": "system", "content": prompts}]}
                response = requests.post(url, headers=headers, json=data)
                
                if response.status_code == 200:
                    print('Yes')
                    result = response.json()                    
                    server_flag = 1
                                
                   
                if server_flag:
                    break
                    
            except Exception as e:
                server_error_cnt += 1
                print(e)    

In [24]:
description = """
What is the capital of France. Write just one word.
"""

example = """"""
p.llm_model

'bartowski/gemma-2-9b-it-GGUF'

In [23]:
p = Base_Planner()
p.llm_url = 'http://localhost:1234/v1/completions'
p.llm_model = 'bartowski/gemma-2-9b-it-GGUF'
p.initial_planning(description, example)

KeyboardInterrupt: 

In [25]:
# Example: reuse your existing OpenAI setup
from openai import OpenAI

# Point to the local server
client = OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")

completion = client.chat.completions.create(
  model="bartowski/gemma-2-9b-it-GGUF",
  messages=[
    {"role": "system", "content": "Always answer with one word."},
    {"role": "user", "content": "What is the capital of France?"}
  ],
  temperature=0.7,
)

print(completion.choices[0].message)

ChatCompletionMessage(content='Paris', role='assistant', function_call=None, tool_calls=None)


In [30]:
completion.choices[0].message.content

'Paris'

In [31]:
class SimpleDoorKey_Planner(Base_Planner):
    def __init__(self, seed=0):
        super().__init__()
        
        self.mediator = SimpleDoorKey_Mediator()
        if seed %2 ==0:
            self.llm_model = "vicuna-7b-3"
            self.llm_url = 'http://10.106.27.11:8003/v1/chat/completions'
        else:
            self.llm_model = "vicuna-7b-0"
            self.llm_url = 'http://10.106.27.11:8000/v1/chat/completions'

    def __call__(self, input):
        return self.forward(input)
    
    def reset(self, show=False):
        self.dialogue_user = ''
        self.dialogue_logger = ''
        self.show_dialogue = show
        ## reset dialogue
        if self.show_dialogue:
            print(self.dialogue_system)

       
    def forward(self, obs):
        text = self.mediator.RL2LLM(obs)
        # print(text)
        plan = self.step_planning(text)
        
        self.dialogue_logger += text
        self.dialogue_logger += plan
        self.dialogue_user = text +"\n"
        self.dialogue_user += plan
        if self.show_dialogue:
            print(self.dialogue_user)
        skill = self.mediator.LLM2RL(plan)
        return skill

In [38]:
s = SimpleDoorKey_Planner()

NameError: name 'SimpleDoorKey_Mediator' is not defined

In [39]:
class SimpleDoorKey_Mediator(Base_Mediator):
    def __init__(self, ):
        super().__init__()

    # ## obs2text
    def RL2LLM(self, obs):
        context = ''
        if len(obs.shape) == 4:
            obs = obs[0,:,:,-4:]
        obs_object = copy.deepcopy(obs[:,:,0])
        agent_map = obs[:, :, 3]
        agent_pos = np.argwhere(agent_map != 4)[0]

        key_list = np.argwhere(obs_object==5)
        door_list = np.argwhere(obs_object==4)
        carry_object = None
        if len(key_list):
            for key in key_list:
                i, j = key
                color = obs[i,j,1]
                #object = f"{IDX_TO_COLOR[color]} key"
                object = "key"
                if (agent_pos == key).all():
                    carry_object = object
                else:
                    if context != "":
                        context += ", "
                    context += f"observed a {object}" 
                    self.obj_coordinate[object] = (i,j)

        if len(door_list):
            for door in door_list:
                i, j = door
                color = obs[i,j,1]
                #object = f"{IDX_TO_COLOR[color]} door"
                object = "door"
                if context != "":
                    context += ", "
                context += f"observed a {object}"
                self.obj_coordinate[object] = (i,j)

        if context == '':
            context += "observed {}".format('nothing')
        if carry_object is not None:
            if context != "":
                context += ", "
            context += f"carry {carry_object}"
        context = f"observation: {{{context}}} "
        return context
    
    def parser(self, text):
        # action:
        if "explore" in text:
            act = SKILL_TO_IDX["explore"]
        elif "go to" in text:
            act = SKILL_TO_IDX["go to object"]
        elif "pick up" in text:
            act = SKILL_TO_IDX["pickup"]
        elif "drop" in text:
            act = SKILL_TO_IDX["drop"]
        elif "toggle" in text:
            act = SKILL_TO_IDX["toggle"]
        else:
            print("Unknown Planning :", text)
            act = 6 # do nothing
        # object:
        try:
            if "key" in text:
                obj = OBJECT_TO_IDX["key"]
                coordinate = self.obj_coordinate["key"]
            elif "door" in text:
                obj = OBJECT_TO_IDX["door"]
                coordinate = self.obj_coordinate["door"]
            else:
                obj = OBJECT_TO_IDX["empty"]
                coordinate = None
        except:
            print("Unknown Planning :", text)
            act = 6 # do nothing
        return act, obj, coordinate
    
    
    def LLM2RL(self, plan):

        plan = re.findall(r'{(.*?)}', plan)
        lines = plan[0].split(',')
        skill_list = []
        for line in lines:

            action, object, coordinate = self.parser(line)
            goal = {}
            goal["action"] = action
            goal["object"] = object
            goal["coordinate"] = coordinate
            skill_list.append(goal)
        return skill_list

NameError: name 'Base_Mediator' is not defined

In [40]:
class Base_Mediator(ABC):
    """The base class for Base_Mediator."""

    def __init__(self):
        super().__init__()
        self.obj_coordinate = {}

    @abstractmethod
    def RL2LLM(self):
        pass
    @abstractmethod
    def LLM2RL(self):
        pass
    
    def reset(self):
        self.obj_coordinate = {}
