In [1]:
#@title Download dataset

!git clone https://github.com/Yessense/llm_planning_airi
!cd llm_planning_airi

Cloning into 'llm_planning_airi'...
remote: Enumerating objects: 43, done.[K
remote: Counting objects: 100% (43/43), done.[K
remote: Compressing objects: 100% (36/36), done.[K
remote: Total 43 (delta 10), reused 31 (delta 6), pack-reused 0[K
Unpacking objects: 100% (43/43), 13.62 MiB | 3.63 MiB/s, done.


In [2]:
#@title Instal dependancies

!pip install accelerate
!pip install bitsandbytes
!pip install -q datasets loralib sentencepiece
!pip install -q git+https://github.com/zphang/transformers@c3dc391
!pip install -q git+https://github.com/huggingface/peft.git

Collecting accelerate
  Downloading accelerate-0.21.0-py3-none-any.whl (244 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/244.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.2/244.2 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: accelerate
Successfully installed accelerate-0.21.0
Collecting bitsandbytes
  Downloading bitsandbytes-0.40.2-py3-none-any.whl (92.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.5/92.5 MB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bitsandbytes
Successfully installed bitsandbytes-0.40.2
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m486.2/486.2 kB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m75.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1

## Датасет

In [3]:
PATH_TO_DATA = '/content/llm_planning_airi/data'

In [4]:
from dataclasses import dataclass, field
from typing import List, Optional, Set
from pprint import pformat

@dataclass(frozen=True)
class SceneObj:
    name: str = ''

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return str(self)


@dataclass
class Action:
    name: str = ''

    def __str__(self) -> str:
        return self.name

    # def __repr__(self) -> str:
    #     return str(self)

    @property
    def is_filled(self):
        return self.name != ''


@dataclass
class Arg:
    name: str = ''

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return str(self)




@dataclass
class Subtask:
    action: Action = field(default_factory=Action)
    n_arguments: int = 0
    arguments: List[Arg] = field(default_factory=lambda: [])
    text: str = ''

    @property
    def has_text(self):
        return self.text != ''



@dataclass
class AlfredPlan:
    goal: str = ''
    task_type: str = ''
    subtasks: List[Subtask] = field(default_factory=lambda: [])
    scene_objects: List[SceneObj] = field(default_factory=lambda: [])
    text: str = ''

    @property
    def scene_objects_set(self) -> Set[str]:
        return set(str(obj) for obj in self.scene_objects)

    def __str__(self):
        out = f"AlfredPlan(\ngoal: {self.goal}\nsubtasks: {pformat(self.subtasks)}\nobjects: {self.scene_objects}\ntask_type: {self.task_type}\ntext:{repr(self.text)}\n)"
        return out

    def __repr__(self):
        return str(self)


In [5]:
from typing import List, Optional, Set
import torch
import json

from torch.utils.data import Dataset

class AlfredDataset(Dataset):
    def __init__(self,
                 split: str,
                 path_to_data_dir: str,
                 available_task_types: Optional[Set[str]] = None,
                 ) -> None:
        super().__init__()

        self._split = split

        if available_task_types is not None:
            self._available_task_types = available_task_types
        else:
            self._available_task_types = set()

        self._available_objects = set()
        self._plans: List[AlfredPlan] = []

        print(f"Loading split: {self._split}...  ", end='')

        with open(f'{path_to_data_dir}/{self.split}_highlevel.json', 'r') as f:
            data = json.load(f)

            for entry in data:
                # task type
                # 'pick_heat_then_place_in_recep'
                task_type = entry['task_type']
                if available_task_types is not None:
                    if task_type not in available_task_types:
                        continue

                # even than check if task type is possible
                self._available_task_types.add(task_type)

                # May not have subtasks
                list_of_actions = entry['list_of_actions']
                if list_of_actions is None:
                    continue

                # Parse list of actions
                # ['potato', 'SliceObject']
                subtasks = []
                for obj, action in list_of_actions:
                    subtask = Subtask(
                        action=Action(action),
                        arguments=[Arg(obj)],
                        n_arguments=1)  # only one argument
                    subtasks.append(subtask)

                # scene objects
                # ['StoveBurner', 'ButterKnife', 'SoapBottle', 'CounterTop', 'Bowl',
                # 'Microwave', 'Window', 'Fork', 'Egg', 'StoveKnob',
                # 'GarbageCan', 'Plate', 'Mug', 'Sink', ...]
                scene_objects = [SceneObj(name=obj) for obj in entry['objects']]
                self._available_objects.update(scene_objects)

                # Combine all information into a Plan
                plan = AlfredPlan(
                    goal=entry['goal'],
                    task_type=entry['task_type'],
                    subtasks=subtasks,
                    scene_objects=scene_objects)

                self._plans.append(plan)
            print("Done.")

    def __len__(self):
        return len(self.plans)

    def __getitem__(self, index) -> AlfredPlan:
        return self.plans[index]

    @property
    def plans(self) -> List[AlfredPlan]:
        return self._plans

    @property
    def available_task_types(self) -> Set[str]:
        return self._available_task_types

    @property
    def available_objects(self) -> Set[SceneObj]:
        return self._available_objects

    @property
    def split(self):
        return self._split

In [6]:
split = 'valid_seen' #@param ["valid_seen", "valid_unseen"]
dataset = AlfredDataset(split=split, path_to_data_dir=PATH_TO_DATA)
len(dataset)

Loading split: valid_seen...  Done.


814

In [7]:
idx = 782 #@param {type:"slider", min:0, max:813, step:1}
dataset[idx]

AlfredPlan(
goal: put two soap bars in bin
subtasks: [Subtask(action=Action(name='PickupObject'),
         n_arguments=1,
         arguments=[soapbar],
         text=''),
 Subtask(action=Action(name='PutObject'),
         n_arguments=1,
         arguments=[garbagecan],
         text=''),
 Subtask(action=Action(name='PickupObject'),
         n_arguments=1,
         arguments=[soapbar],
         text=''),
 Subtask(action=Action(name='PutObject'),
         n_arguments=1,
         arguments=[garbagecan],
         text='')]
objects: [SoapBar, SoapBottle, CounterTop, Window, Candle, Towel, GarbageCan, Mirror, Cloth, Plunger, ScrubBrush, SprayBottle, ShowerDoor, ShowerGlass, Sink, HandTowel, Cabinet, ToiletPaper, Faucet, Toilet, LightSwitch, TowelHolder, HandTowelHolder, ToiletPaperHanger, SinkBasin]
task_type: pick_two_obj_and_place
text:''
)

## Модель

In [8]:
from abc import ABC, abstractmethod
from typing import Any


class BaseLLMPlanningModel(ABC):
    """ Base class for LLM models"""
    _name: str

    @property
    def name(self):
        return self._name

    def __init__(self,
                 name: str,
                 **kwargs) -> None:
        self._name = name

        print(f"Loading {name} tokenizer...")
        self.tokenizer = self._load_tokenizer()

        print(f"Loading {name} model...")
        self.model = self._load_model()
        print(f'Model: {name}\n')

        self._prepare_for_generation()
        self._prepare_for_scoring()

    def _prepare_for_generation(self) -> None:
        """Define pipeline, etc."""
        return None

    def _prepare_for_scoring(self) -> None:
        """Define scoring for saycan mode"""
        pass

    @abstractmethod
    def generate_text(self, prompt: str, **kwargs) -> str:
        """ Generate text"""
        pass

    @abstractmethod
    def score_text(self, **kwargs) -> Any:
        """ Score text for saycan approach """
        pass

    @abstractmethod
    def _load_model(self) -> Any:
        """Load model"""
        pass

    @abstractmethod
    def _load_tokenizer(self) -> Any:
        """Load tokenizer"""
        pass


In [9]:
import torch
from transformers import pipeline
from typing import Any
from accelerate import infer_auto_device_map
import pprint
from peft import PeftModel
from transformers import LLaMATokenizer, LLaMAForCausalLM, GenerationConfig


def generate_prompt(instruction):
    return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{instruction}

### Response:"""


class AlpacaLora7B(BaseLLMPlanningModel):
    LLAMA7B_NAME = "decapoda-research/llama-7b-hf"
    MODEL_NAME = "tloen/alpaca-lora-7b"

    def __init__(self,
                 name: str = "alpaca_lora",
                 max_new_tokens = 45,
                 **kwargs) -> None:
        self.max_new_tokens = max_new_tokens
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        super().__init__(name=name, **kwargs)

    def score_text(self, **kwargs) -> Any:
        pass

    def _load_model(self) -> Any:
        model = LLaMAForCausalLM.from_pretrained(
            self.LLAMA7B_NAME,
            load_in_8bit=True,
            device_map="auto",
        )
        model = PeftModel.from_pretrained(model, self.MODEL_NAME)
        print(
            f"Model device map: \n{pprint.pformat(model.hf_device_map)}")
        return model

    def _load_tokenizer(self) -> Any:
        tokenizer = LLaMATokenizer.from_pretrained(self.LLAMA7B_NAME)
        return tokenizer

    def _prepare_for_generation(self) -> None:
        self.generation_pipeline = pipeline("text-generation",
                                            model=self.model,
                                            tokenizer=self.tokenizer)

    def generate_text(self, prompt):
        prompt = generate_prompt(prompt)

        output = self.generation_pipeline(prompt,
                                          do_sample=False,
                                          return_full_text=False,
                                          max_new_tokens=self.max_new_tokens)

        return output[0]['generated_text']

In [10]:
model = AlpacaLora7B()

Loading alpaca_lora tokenizer...


Downloading tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/141 [00:00<?, ?B/s]



Loading alpaca_lora model...


Downloading (…)lve/main/config.json:   0%|          | 0.00/427 [00:00<?, ?B/s]

Downloading (…)model.bin.index.json:   0%|          | 0.00/25.5k [00:00<?, ?B/s]

Downloading (…)l-00001-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00002-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00003-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00004-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00005-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00006-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00007-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00008-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00009-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00010-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00011-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00012-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00013-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00014-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00015-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00016-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00017-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00018-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00019-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00020-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00021-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00022-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00023-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00024-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00025-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00026-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00027-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00028-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00029-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00030-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00031-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00032-of-00033.bin:   0%|          | 0.00/405M [00:00<?, ?B/s]

Downloading (…)l-00033-of-00033.bin:   0%|          | 0.00/524M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/33 [00:00<?, ?it/s]

Downloading (…)neration_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Downloading (…)/adapter_config.json:   0%|          | 0.00/399 [00:00<?, ?B/s]

Downloading adapter_model.bin:   0%|          | 0.00/67.2M [00:00<?, ?B/s]

Model device map: 
{'': 0}
Model: alpaca_lora



The model 'PeftModelForCausalLM' is not supported for text-generation. Supported models are ['BartForCausalLM', 'BertLMHeadModel', 'BertGenerationDecoder', 'BigBirdForCausalLM', 'BigBirdPegasusForCausalLM', 'BioGptForCausalLM', 'BlenderbotForCausalLM', 'BlenderbotSmallForCausalLM', 'BloomForCausalLM', 'CamembertForCausalLM', 'CodeGenForCausalLM', 'CTRLLMHeadModel', 'Data2VecTextForCausalLM', 'ElectraForCausalLM', 'ErnieForCausalLM', 'GitForCausalLM', 'GPT2LMHeadModel', 'GPT2LMHeadModel', 'GPTNeoForCausalLM', 'GPTNeoXForCausalLM', 'GPTNeoXJapaneseForCausalLM', 'GPTJForCausalLM', 'LLaMAForCausalLM', 'MarianForCausalLM', 'MBartForCausalLM', 'MegatronBertForCausalLM', 'MvpForCausalLM', 'OpenAIGPTLMHeadModel', 'OPTForCausalLM', 'PegasusForCausalLM', 'PLBartForCausalLM', 'ProphetNetForCausalLM', 'QDQBertLMHeadModel', 'ReformerModelWithLMHead', 'RemBertForCausalLM', 'RobertaForCausalLM', 'RobertaPreLayerNormForCausalLM', 'RoCBertForCausalLM', 'RoFormerForCausalLM', 'Speech2Text2ForCausalLM', 

In [11]:
prompt = 'Test\n' #@param {type:"string"}
model.generate_text(prompt)



'\nTest passed.'

## Prompt generation

In [12]:
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Dict, Set



class BasePromptBuilder(ABC):
    _name: str
    TERMINATING_STRING: str = ""
    _initial_prompt: str = ""
    inital_prompt_length: int = 0
    stop_words: List[str] = []
    available_tasks_types: Set[str] = set()

    def __init__(self,
                 name: str = '',
                 **kwargs):
        self._name = name
        print(f"Initializing {name} prompt builder")

    def _add_to_initial_prompt(self, text: str) -> None:
        self._initial_prompt += text

    @property
    def name(self):
        return self._name

    def fill_action(self, subtask: Subtask) -> None:
        pass

    def fill_actions(self, plan: AlfredPlan) -> None:
        pass

    @abstractmethod
    def get_action(self, subtask_text: str) -> str:
        return ' '

    @abstractmethod
    def subtask_to_prompt(self, subtask: Subtask) -> str:
        pass

    @abstractmethod
    def example_to_prompt(self, plan: AlfredPlan) -> str:
        """Translates an example into a text prompt."""
        pass

    @abstractmethod
    def build_prefix(self) -> str:
        """Build introduction (task definition)."""
        pass

    @abstractmethod
    def plan_to_prompt(self, plan: AlfredPlan) -> str:
        """Combine actions in one prompt"""

    @abstractmethod
    def build_task_examples_prompt(self, **kwargs) -> str:
        """Add several examples to the initial prompt."""
        pass

    @abstractmethod
    def get_query(self, goal: str) -> str:
        """Translate evaluated task."""
        pass

    @abstractmethod
    def cut_answer_from_generated_text(self, generated_text: str) -> Optional[str]:
        """Cut answer from generated text"""
        pass

    def generated_answer_to_plan(self, answer: str) -> Any:
        """Translate generated answer to plan."""
        pass

    def extract_subtask_texts(self, generated_text: str) -> List[str]:
        """"""
        pass

    def get_one_prompt(self, plan: AlfredPlan) -> str:
        pass

    def cut_step_from_generated_text(self, generated_text: str) -> Optional[str]:
        pass


In [13]:
#@title Prepositions { display-mode: "form" }

PREPOSITIONS = {'ArmChair' : 'on',
 'Bed' : 'on',
 'Bowl' : 'in',
 'Box' : 'in',
 'BathtubBasin' : 'in',
 'Cabinet'  : 'in',
 'CoffeeMachine' : 'in',
 'CoffeeTable' : 'on',
 'CounterTop' : 'on',
 'Desk' : 'on',
 'DiningTable' : 'on',
 'Drawer' : 'in',
 'Dresser' : 'in',
 'Fridge' : 'in',
 'GarbageCan' : 'in',
 'HandTowelHolder' : 'on',
 'LaundryHamper' : 'in',
 'Microwave' : 'in',
 'Mug' : 'in',
 'Cup' : 'in',
 'Ottoman' : 'on',
 'Pan' : 'on',
 'Plate' : 'on',
 'Pot' : 'in',
 'Safe' : 'in',
 'Shelf' : 'on',
 'SideTable' : 'on',
 'SinkBasin' : 'in',
 'Sofa' : 'on',
 'StoveBurner' : 'on',
 'TVStand' : 'on',
 'Toaster' : 'in',
 'Toilet' : 'on',
 'ToiletPaperHanger' : 'on',
 'TowelHolder' : 'on',
 'Cart': 'in'}


In [14]:
#@title Prompt Builder { display-mode: "form" }
import pprint
import re
from typing import List, Optional

from typing import List, Optional

import numpy as np



class SaycanPromptBuilder(BasePromptBuilder):
    TERMINATING_STRING = "done"

    def get_action(self, subtask_text: str) -> str:
        action = subtask_text.split()[0]
        return action

    def __init__(self,
                 prompt_allowed_tasks: List[str],
                 name: str = 'saycan',
                 n_examples_per_task_type: int = 3,
                 **kwargs):
        super().__init__(name, **kwargs)

        self._initial_prompt = ""
        self._initial_prompt += self.build_prefix()
        self.n_examples_per_task_type = n_examples_per_task_type

        self.prompt_allowed_tasks = dataset.available_task_types
        self.available_tasks_types = dataset.available_task_types

        print(f'Allowed tasks:\n'
                         f'{pprint.pformat(self.available_tasks_types)}')

        self._preps = {k.lower(): PREPOSITIONS[k] for k in PREPOSITIONS}
        self._initial_prompt += self.build_task_examples_prompt(
            dataset=dataset)
        print(f'Prompt:\n' + self._initial_prompt)

        self.inital_prompt_length = len(self._initial_prompt)

        self.stop_pattern = re.compile(f'\\d+\\. {self.TERMINATING_STRING}.')
        self.stop_step_pattern = re.compile(r'(\s*\d+\.\s*)*(.*?)(?=\.|,)')
        self.extract_actions_pattern = re.compile(
            r'( \d. ([A-Za-z\s]+)(\.|,))')

        self.stop_words = ["done."]

        self.metrics = ["accuracy", "lcs", "aem"]

    def fill_action(self, subtask: Subtask) -> None:
        if not subtask.has_text:
            raise ValueError(f"Subtask {subtask} has no text.")
        subtask.action.name = self.get_action(subtask_text=subtask.text)

    def fill_actions(self, plan: AlfredPlan) -> None:
        for subtask in plan.subtasks:
            self.fill_action(subtask=subtask)

    def subtask_to_prompt(self, subtask: Subtask):
        act = str(subtask.action)
        obj = str(subtask.arguments[0])

        instr: str

        if 'Sliced' in obj:
            obj = f'sliced {obj[:-6]}'
        if act == 'PickupObject':
            instr = f'Pick up the {obj.lower()}'
        elif act == 'PutObject':
            instr = f'Put it {self._preps[obj]} the {obj.lower()}'
        elif act == 'ToggleObject':
            instr = f'Toggle on the {obj.lower()}'
        elif act == 'SliceObject':
            instr = f'Slice the {obj.lower()}'
        elif act == 'CleanObject':
            instr = f'Clean the {obj.lower()}'
        elif act == 'HeatObject':
            instr = f'Heat the {obj.lower()}'
        elif act == 'CoolObject':
            instr = f'Cool the {obj.lower()}'
        else:
            raise ValueError(f'Invalid action: {act}')
        return instr.lower()

    def get_query(self, goal: str) -> str:
        question = "How would you" #@param {type: "string"}
        query = f"Human: {question} {goal}?\n"
        query += "Robot:"
        return query

    def plan_to_prompt(self, plan: AlfredPlan) -> str:
        prompt = ""
        # Add actions
        for i, subtask in enumerate(plan.subtasks, start=1):
            subtask.text = self.subtask_to_prompt(subtask)

            prompt += f" {i}. {subtask.text}"
            if i == len(plan.subtasks):
                prompt += f", {i+1}. {self.TERMINATING_STRING}.\n"
            else:
                prompt += ","
        plan.text = prompt.strip()
        return prompt

    def example_to_prompt(self, plan: AlfredPlan) -> str:
        # Add query
        prompt = self.get_query(plan.goal)
        prompt += self.plan_to_prompt(plan)

        return prompt

    def build_prefix(self) -> str:
        prompt = "Robot: Hi there, I’m a robot operating in a house.\nRobot: You can ask me to do various tasks and I’ll tell you the sequence of actions I would do to accomplish your task.\n" #@param {type: "raw"}
        return prompt

    def build_task_examples_prompt(self,
                                   dataset: AlfredDataset) -> str:
        prompt: str = ""

        # indices with the same type of task
        task_types = np.array([plan.task_type
                               for plan in dataset], dtype=object)
        types_idx = [np.argwhere(task_types == task_type)
                     for task_type in dataset.available_task_types]
        for type_idx in types_idx:
            selected_indices = np.random.choice(type_idx.flatten(),
                                                size=self.n_examples_per_task_type,
                                                replace=False)
            for ind in selected_indices:
                prompt += self.example_to_prompt(dataset[ind])

        return prompt

    def get_one_prompt(self, plan: AlfredPlan) -> str:
        prompt = self._initial_prompt + self.get_query(plan.goal)
        return prompt

    def cut_step_from_generated_text(self, generated_text: str) -> Optional[str]:
        stop_match = self.stop_step_pattern.search(generated_text)

        if stop_match is not None:
            if stop_match.groups()[-1].isnumeric():
                return None
            else:
                return stop_match.groups()[-1]

    def cut_answer_from_generated_text(self, generated_text: str) -> Optional[str]:
        """Cut answer on first terminating string appearing in generated text"""

        stop_match = self.stop_pattern.search(generated_text)

        if stop_match is not None:
            return generated_text[:stop_match.end()].strip(' \n\t')

        return generated_text.strip(' \n\t')

    def extract_subtask_texts(self, generated_text: str) -> List[str]:
        # string = " 1. pick up the bread, 2. cool the bread, 3. put it on the counter, 4. done."
        # [(' 1. pick up the bread,', 'pick up the bread', ','), (' 2. cool the bread,', 'cool the
        #   bread', ','), (' 3. put it on the counter,', 'put it on the counter', ','), (' 4. done.', 'done', '.')]

        match = re.findall(self.extract_actions_pattern, ' ' + generated_text)
        subtask_texts = [m[1] for m in match]
        return subtask_texts[:-1]  # without the terminating string

In [15]:
prompt_builder = SaycanPromptBuilder(dataset.available_task_types)

Initializing saycan prompt builder
Allowed tasks:
{'look_at_obj_in_light',
 'pick_and_place_simple',
 'pick_and_place_with_movable_recep',
 'pick_clean_then_place_in_recep',
 'pick_cool_then_place_in_recep',
 'pick_heat_then_place_in_recep',
 'pick_two_obj_and_place'}
Prompt:
Robot: Hi there, I’m a robot operating in a house.
Robot: You can ask me to do various tasks and I’ll tell you the sequence of actions I would do to accomplish your task.
Human: How would you cool down the cup inside the microwave.?
Robot: 1. pick up the cup, 2. cool the cup, 3. put it in the microwave, 4. done.
Human: How would you put a chilled egg in the microwave.?
Robot: 1. pick up the egg, 2. cool the egg, 3. put it in the microwave, 4. done.
Human: How would you put a chilled head of lettuce on the counter.?
Robot: 1. pick up the lettuce, 2. cool the lettuce, 3. put it on the countertop, 4. done.
Human: How would you to wash the red cloth.?
Robot: 1. pick up the cloth, 2. clean the cloth, 3. put it in the c

## Plan generation method

In [16]:
from abc import ABC, abstractmethod
from typing import List


class BasePlanGenerationMethod(ABC):
    """ Base class for plan generation methods. """
    model: BaseLLMPlanningModel
    prompt_builder: BasePromptBuilder

    def __init__(self,
                 model: BaseLLMPlanningModel,
                 prompt_builder: BasePromptBuilder,
                 **kwargs):
        self.model = model
        self.prompt_builder = prompt_builder
        self.setup()

    @abstractmethod
    def setup(self) -> None:
        """ Prepare model for current method.
        Add step classifier, etc.
        """
        pass

    @abstractmethod
    def predict_plan(self, target_plan, idx: int) -> AlfredPlan:
        """ Predict complete plan"""
        pass


In [17]:
from typing import Dict, List


class FullPlanGenerationMethod(BasePlanGenerationMethod):
    def __init__(self,
                 model: BaseLLMPlanningModel,
                 prompt_builder: BasePromptBuilder,
                 **kwargs):
        super().__init__(model, prompt_builder, **kwargs)

    def setup(self) -> None:
        pass

    def predict_plan(self, target_plan: AlfredPlan, idx: int=0) -> AlfredPlan:
        prompt = self.prompt_builder.get_one_prompt(target_plan)

        pred_plan = self.model.generate_text(prompt)
        pred_plan = self.prompt_builder.cut_answer_from_generated_text(
            pred_plan)

        return AlfredPlan(text=pred_plan)


In [18]:
method = FullPlanGenerationMethod(model=model,
                                  prompt_builder=prompt_builder)

In [19]:
gt_plan = dataset[1]
print(gt_plan)
print(f"GT Plan: {prompt_builder.plan_to_prompt(gt_plan)}")
predicted_plan = method.predict_plan(gt_plan)
print(f"Predicted plan: {predicted_plan.text}")

AlfredPlan(
goal: put a cooked piece of potato in the sink.
subtasks: [Subtask(action=Action(name='SliceObject'),
         n_arguments=1,
         arguments=[potato],
         text=''),
 Subtask(action=Action(name='PickupObject'),
         n_arguments=1,
         arguments=[potatoSliced],
         text=''),
 Subtask(action=Action(name='HeatObject'),
         n_arguments=1,
         arguments=[potatoSliced],
         text=''),
 Subtask(action=Action(name='PutObject'),
         n_arguments=1,
         arguments=[sinkbasin],
         text='')]
objects: [StoveBurner, ButterKnife, SoapBottle, CounterTop, Bowl, Microwave, Window, Fork, Egg, StoveKnob, GarbageCan, Plate, Mug, Sink, CoffeeMachine, Lettuce, Pan, Cabinet, Potato, Knife, Spatula, SaltShaker, Curtains, Cup, Faucet, PaperTowelRoll, Toaster, Pot, Pencil, Spoon, DishSponge, LightSwitch, Apple, Fridge, Bread, PepperShaker, Drawer, SinkBasin, WineBottle, Tomato]
task_type: pick_heat_then_place_in_recep
text:''
)
GT Plan:  1. slice the 

## Metrics

In [20]:
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union


class BaseMetric(ABC):
    @abstractmethod
    def __init__(self, **kwargs):
        pass

    @abstractmethod
    def __call__(self, pred: Any, target: Any) -> float:
        pass


class BasePlanMetric(ABC):
    _name: str

    @abstractmethod
    def __init__(self,
                 prompt_builder: BasePromptBuilder,
                 **kwargs):
        self._prompt_builder = prompt_builder

    @abstractmethod
    def __call__(self, pred: AlfredPlan, target: AlfredPlan) -> float:
        return 0.

    @property
    def name(self):
        return self._name


class BaseMetrics():
    def __init__(self,
                 prompt_builder: BasePromptBuilder,
                 metric_classes: List,
                 **kwargs):
        self._metrics: List[BasePlanMetric] = [metric_cls(prompt_builder)
                                               for metric_cls in metric_classes]
        self.prompt_builder = prompt_builder

    @abstractmethod
    def update(self,
               predicted_plan: AlfredPlan,
               target_plan: AlfredPlan,
               plan_idx: int) -> None:
        pass

    @abstractmethod
    def calculate_metrics(self):
        pass


In [21]:
from typing import Dict, List, Any


class ListAccuracy(BaseMetric):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __call__(self, pred: List, target: List):
        answer: bool = False
        if len(pred) == len(target):
            if all(p == t for p, t in zip(pred, target)):
                answer = True
        return int(answer)


class LCS(BaseMetric):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __call__(self, pred: List, target: List) -> float:
        """Time Complexity: O(n*m)"""
        m = len(pred)
        n = len(target)

        # declaring the array for storing the dp values
        L: List[List[Any]] = [[None] * (n + 1) for _ in range(m + 1)]

        for i in range(m + 1):
            for j in range(n + 1):
                if i == 0 or j == 0:
                    L[i][j] = 0
                elif pred[i - 1] == target[j - 1]:
                    L[i][j] = L[i - 1][j - 1] + 1
                else:
                    L[i][j] = max(L[i - 1][j], L[i][j - 1])

        return L[m][n] / max(m, n)


class ActionsExactMatch(BasePlanMetric):
    _name: str = 'AEM'

    def __init__(self, prompt_builder: BasePromptBuilder, **kwargs):
        super().__init__(prompt_builder=prompt_builder, **kwargs)
        self._list_acc = ListAccuracy()
        self._name = 'AEM'

    def __call__(self, pred: AlfredPlan, target: AlfredPlan):
        """Check if actions in ground_true_plan and pred_plan are exactly the same."""
        for plan in [pred, target]:
            self._prompt_builder.fill_actions(plan)

        pred_actions = [subtask.action.name for subtask in pred.subtasks]
        target_actions = [subtask.action.name for subtask in target.subtasks]

        return self._list_acc(pred_actions, target_actions)


class ActionsLCS(BasePlanMetric):
    _name: str = 'A-LCS'

    def __init__(self, prompt_builder: BasePromptBuilder, **kwargs):
        super().__init__(prompt_builder=prompt_builder, **kwargs)
        self._lcs = LCS()

    def __call__(self, pred: AlfredPlan, target: AlfredPlan):
        """Check if actions in ground_true_plan and pred_plan are exactly the same."""
        for plan in [pred, target]:
            self._prompt_builder.fill_actions(plan)

        pred_actions = [subtask.action.name for subtask in pred.subtasks]
        target_actions = [subtask.action.name for subtask in target.subtasks]

        return self._lcs(pred_actions, target_actions)


class PlanExactMatch(BasePlanMetric):
    _name: str = 'PEM'

    def __init__(self, prompt_builder: BasePromptBuilder, **kwargs):
        super().__init__(prompt_builder=prompt_builder, **kwargs)
        self._list_acc = ListAccuracy()

    def __call__(self, pred: AlfredPlan, target: AlfredPlan):
        for plan in [pred, target]:
            for subtask in plan.subtasks:
                if not subtask.has_text:
                    raise ValueError(f"Subtask {subtask} has no text.")

        pred_plan = [subtask.text for subtask in pred.subtasks]
        target_plan = [subtask.text for subtask in target.subtasks]

        return self._list_acc(pred_plan, target_plan)


class PlanLCS(BasePlanMetric):
    _name: str = 'P-LCS'

    def __init__(self, prompt_builder: BasePromptBuilder, **kwargs):
        super().__init__(prompt_builder=prompt_builder, **kwargs)
        self._lcs = LCS()

    def __call__(self, pred: AlfredPlan, target: AlfredPlan):
        for plan in [pred, target]:
            for subtask in plan.subtasks:
                if not subtask.has_text:
                    raise ValueError(f"Subtask {subtask} has no text.")

        pred_plan = [subtask.text for subtask in pred.subtasks]
        target_plan = [subtask.text for subtask in target.subtasks]

        return self._lcs(pred_plan, target_plan)


class AlfredMetrics(BaseMetrics):
    def __init__(self,
                 prompt_builder: BasePromptBuilder,
                 allowed_tasks_types: List[str],
                 **kwargs):
        metric_classes = [ActionsExactMatch,
                          ActionsLCS, PlanExactMatch, PlanLCS]
        super().__init__(metric_classes=metric_classes,
                         prompt_builder=prompt_builder)
        self._metric_dict: Dict[str, Dict] = {task_type: {
            metric.name: {'n_updates': 0,
                          'value': 0.}
            for metric in self._metrics}
            for task_type in allowed_tasks_types}

    def update(self,
               predicted_plan: AlfredPlan,
               target_plan: AlfredPlan,
               plan_idx: int) -> None:

        description: List[str] = []
        subtask_texts: List[str] = self.prompt_builder.extract_subtask_texts(predicted_plan.text)
        predicted_plan.subtasks = [Subtask(text=text) for text in subtask_texts]
        print(f"Predicted plan: {predicted_plan.text}")

        self.prompt_builder.plan_to_prompt(target_plan)
        print(f"GT plan:        {target_plan.text}")

        for metric in self._metrics:
            value = metric(predicted_plan, target_plan)
            self._metric_dict[target_plan.task_type][metric.name]['n_updates'] += 1
            self._metric_dict[target_plan.task_type][metric.name]['value'] += value
            description.append(f"{metric.name}: {value:0.3f}")

        # self.logger.log_plans(predicted_plan, target_plan, plan_idx)
        metrics_string = ", ".join(desc for desc in description)
        print(f"Calculated metrics: [{metrics_string}]")

    def calculate_metrics(self) -> Dict[str, Dict[str, float]]:
        total_metrics = {}

        column_names = list([metric.name for metric in self._metrics])
        data = []

        for task_type, metric_dict in self._metric_dict.items():
            total_metrics[task_type] = {}
            for metric_name in metric_dict:
                if metric_dict[metric_name]['n_updates'] == 0:
                    total_metrics[task_type][metric_name] = 0.
                else:
                    total_metrics[task_type][metric_name] = (
                        metric_dict[metric_name]['value'] / metric_dict[metric_name]['n_updates'])

            metric_values_list = [total_metrics[task_type][metric_name] for metric_name in column_names]
            data.append([task_type] + metric_values_list)

        column_names = ["Task type"] + column_names


        print(column_names)
        return total_metrics


In [22]:
dataset.available_task_types

{'look_at_obj_in_light',
 'pick_and_place_simple',
 'pick_and_place_with_movable_recep',
 'pick_clean_then_place_in_recep',
 'pick_cool_then_place_in_recep',
 'pick_heat_then_place_in_recep',
 'pick_two_obj_and_place'}

In [23]:
# Run once for item in dataset
gt_plan = dataset[1]
metrics = AlfredMetrics(prompt_builder,
                        allowed_tasks_types = dataset.available_task_types)

metrics.update(predicted_plan, gt_plan, idx)

metrics.calculate_metrics()

Predicted plan: 1. pick up the cooked potato, 2. put it in the sink, 3. done.
GT plan:        1. slice the potato, 2. pick up the sliced potato, 3. heat the sliced potato, 4. put it in the sinkbasin, 5. done.
Calculated metrics: [AEM: 0.000, A-LCS: 0.500, PEM: 0.000, P-LCS: 0.000]
['Task type', 'AEM', 'A-LCS', 'PEM', 'P-LCS']


{'pick_cool_then_place_in_recep': {'AEM': 0.0,
  'A-LCS': 0.0,
  'PEM': 0.0,
  'P-LCS': 0.0},
 'pick_clean_then_place_in_recep': {'AEM': 0.0,
  'A-LCS': 0.0,
  'PEM': 0.0,
  'P-LCS': 0.0},
 'pick_heat_then_place_in_recep': {'AEM': 0.0,
  'A-LCS': 0.5,
  'PEM': 0.0,
  'P-LCS': 0.0},
 'pick_two_obj_and_place': {'AEM': 0.0,
  'A-LCS': 0.0,
  'PEM': 0.0,
  'P-LCS': 0.0},
 'pick_and_place_simple': {'AEM': 0.0, 'A-LCS': 0.0, 'PEM': 0.0, 'P-LCS': 0.0},
 'pick_and_place_with_movable_recep': {'AEM': 0.0,
  'A-LCS': 0.0,
  'PEM': 0.0,
  'P-LCS': 0.0},
 'look_at_obj_in_light': {'AEM': 0.0, 'A-LCS': 0.0, 'PEM': 0.0, 'P-LCS': 0.0}}

## Evaluation

In [24]:
from torch.utils.data import Subset
from pprint import pprint

split = 'valid_seen' #@param ["valid_seen", "valid_unseen"]
dataset = AlfredDataset(split=split, path_to_data_dir=PATH_TO_DATA)
start_idx = 0 #@param {type: "integer"}
n = 10 #@param {type: "integer"}
indices = list(range(start_idx, start_idx + n * 3, 3))
subset = Subset(dataset, indices)

metrics = AlfredMetrics(prompt_builder,
                        allowed_tasks_types = dataset.available_task_types)
for idx, target_plan in enumerate(subset):
    predicted_plan = method.predict_plan(target_plan, idx)
    print(f"{idx}. Goal: {target_plan.goal}")
    metrics.update(predicted_plan, target_plan, idx)
    print()

Loading split: valid_seen...  Done.
0. Goal: place a cooked potato slice in the sink
Predicted plan: 1. pick up the potato, 2. heat the potato, 3. put it in the sink, 4. done.
GT plan:        1. slice the potato, 2. pick up the sliced potato, 3. heat the sliced potato, 4. put it in the sinkbasin, 5. done.
Calculated metrics: [AEM: 0.000, A-LCS: 0.750, PEM: 0.000, P-LCS: 0.000]

1. Goal: place a vase on a coffee table
Predicted plan: 1. pick up the vase, 2. put it on the coffee table, 3. done.
GT plan:        1. pick up the vase, 2. put it on the coffeetable, 3. done.
Calculated metrics: [AEM: 1.000, A-LCS: 1.000, PEM: 0.000, P-LCS: 0.500]

2. Goal: place a knife on the microwave oven table
Predicted plan: 1. pick up the knife, 2. put it on the microwave oven table, 3. done.
GT plan:        1. pick up the knife, 2. put it on the sidetable, 3. done.
Calculated metrics: [AEM: 1.000, A-LCS: 1.000, PEM: 0.000, P-LCS: 0.500]

3. Goal: place a rinsed knife inside a drawer.
Predicted plan: 1. 



8. Goal: put clean spoon on the table
Predicted plan: 1. pick up the spoon, 2. put it on the table, 3. done.
GT plan:        1. pick up the ladle, 2. clean the ladle, 3. put it on the diningtable, 4. done.
Calculated metrics: [AEM: 0.000, A-LCS: 0.667, PEM: 0.000, P-LCS: 0.000]

9. Goal: slice an apple or vegetable followed by rinsing it
Predicted plan: 1. Pick up the apple or vegetable, 2. Slice the apple or vegetable, 3. Rinse the apple or vegetable, 4. Done.
GT plan:        1. slice the apple, 2. pick up the sliced apple, 3. clean the sliced apple, 4. put it on the diningtable, 5. done.
Calculated metrics: [AEM: 0.000, A-LCS: 0.000, PEM: 0.000, P-LCS: 0.000]



In [25]:
total_metrics = metrics.calculate_metrics()
average_metrics = {'AEM': [], 'PEM': [], 'A-LCS': [], 'P-LCS': []}

for task_name, metric_values in total_metrics.items():
    for metric_name, metric_value in metric_values.items():
        average_metrics[metric_name].append(metric_value)

average_metrics = {k: f'{sum(v)/7:0.3f}' for k, v in average_metrics.items()}

pprint(total_metrics)
pprint(average_metrics)

['Task type', 'AEM', 'A-LCS', 'PEM', 'P-LCS']
{'look_at_obj_in_light': {'A-LCS': 0.0, 'AEM': 0.0, 'P-LCS': 0.0, 'PEM': 0.0},
 'pick_and_place_simple': {'A-LCS': 1.0, 'AEM': 1.0, 'P-LCS': 0.5, 'PEM': 0.0},
 'pick_and_place_with_movable_recep': {'A-LCS': 0.5,
                                       'AEM': 0.0,
                                       'P-LCS': 0.0,
                                       'PEM': 0.0},
 'pick_clean_then_place_in_recep': {'A-LCS': 0.4444444444444444,
                                    'AEM': 0.0,
                                    'P-LCS': 0.1111111111111111,
                                    'PEM': 0.0},
 'pick_cool_then_place_in_recep': {'A-LCS': 0.75,
                                   'AEM': 0.0,
                                   'P-LCS': 0.25,
                                   'PEM': 0.0},
 'pick_heat_then_place_in_recep': {'A-LCS': 0.625,
                                   'AEM': 0.0,
                                   'P-LCS': 0.0,
                 