In [1]:
# imports 
import os
import time
import requests
import json 
from typing import List, Dict, Union
from pathlib import Path, PosixPath
from collections import defaultdict
import random

# import openai 
import litellm




In [2]:
# OpenAI API key
# The key is read from the environment instead of being pasted into the notebook,
# so it never ends up in version control or in shared copies of this file.
# Export OPENAI_API_KEY in the shell (or a .env loader) before starting the kernel;
# litellm picks the key up from that same environment variable.
OAI_API = os.environ.get("OPENAI_API_KEY", "")
if not OAI_API:
    # Warn instead of raising so the cells that do not call the API can still run.
    print("WARNING: OPENAI_API_KEY is not set; API calls will fail until it is exported.")

# Wrapper 

In [3]:
# wrapper for API

# model names 

# MODEL_NAMES = defaultdict(lambda: None)
# MODEL_NAMES.update({
#     "gpt-4": None, 
#     "turbo": None, 
#     "davinci": None,
#     "chatgpt-3.5": None 
# })

# class 

class Model:
    """
    Wrapper class for LLM APIs (for now, just gpt 4 and 3.5), called through litellm.
    The API is designed to make it as easy as possible to conduct experiments.

    DETAILS:
    1.) iid sampling: prompt the model with the same prompt multiple times and store
        the outputs in a jsonl file (see `completion`).
    2.) iterative self-improvement: prompt the model, then repeatedly ask it to improve
        upon its previous output (see `self_improv`).

    3.) (TODO later) run evaluations: there is a script policy_vis.py for evaluating model
        performance. We will build this into the API so that it is very easy to evaluate.

    Prompt arguments follow one convention everywhere: a `str` is used verbatim as the
    prompt text, a path-like object (e.g. `pathlib.Path`) is read from disk.
    Result files are JSON Lines: one JSON object per line.
    """

    def __init__(self,
                 model: str,
                 temp: float = 0.7,
                 model_name: str = None,
                 prompt: Union[str, Path] = None,
                 self_improv: Union[str, Path] = None,
                 save_path: Union[str, Path] = None):
        """
        Args:
            model: model identifier passed to litellm (e.g. "gpt-3.5-turbo").
            temp: default sampling temperature.
            model_name: label used in result filenames; defaults to `model`.
            prompt: default task prompt (text, or a path to a file containing it).
            self_improv: default improvement instruction (text, or a path to a file).
            save_path: directory for result files; nothing is saved when None.
        """
        # model configuration
        # Read the key from the environment so the class does not depend on a notebook global.
        self.oai_api: str = os.environ.get("OPENAI_API_KEY")
        self.model_name: str = model_name if model_name else model
        self.model: str = model
        self.temp: float = temp
        self.timestamp: str = None  # timestamp of the last run

        # prompting (None is allowed here; a prompt can be supplied per call instead)
        self.prompt = self._load_text(prompt)
        self.sample_size: int = 5  # TODO: set this to 100 for full experiment
        self.token_limit: int = 100  # TODO: set this to 1000 for full experiment

        self.self_improv_prompt = self._load_text(self_improv)
        self.improv_count: int = 2  # TODO: allow larger values for full experiment

        # saving results
        self.save_path: Path = Path(save_path) if save_path is not None else None

    def __repr__(self) -> str:
        """
        Pretty print the following information:
        - model name
        - temp
        - token limit
        - sample size
        - prompt (first 20 chars followed by ...)
        - self improv prompt (first 20 chars followed by ...)
        """
        prompt_preview = (self.prompt[:20] + '...') if self.prompt else 'None'
        improv_preview = (self.self_improv_prompt[:20] + '...') if self.self_improv_prompt else 'None'
        return (f"Model(name={self.model_name}, temp={self.temp}, token_limit={self.token_limit}, "
                f"sample_size={self.sample_size}, prompt={prompt_preview}, "
                f"self_improv_prompt={improv_preview})")

    # utility functions
    @staticmethod
    def _load_text(source):
        """Return prompt text: None/str are returned unchanged, path-like objects are read from disk."""
        if source is None or isinstance(source, str):
            return source
        if isinstance(source, os.PathLike):
            # read_text() opens and closes the file, so no handle is leaked
            return Path(source).read_text()
        raise TypeError(f"Expected str or path-like prompt, got {type(source).__name__}")

    @staticmethod
    def _build_improv_prompt(instruction, previous_output):
        """Compose the prompt for one improvement step from the instruction and the previous output."""
        if not instruction:
            return f"Improve upon the following: {previous_output}"
        return f"{instruction.rstrip()}\n\n{previous_output}"

    @staticmethod
    def _append_jsonl(filename, record):
        """Append one JSON object as a line to `filename`, creating parent directories if needed."""
        path = Path(filename)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open('a') as file:  # append mode: earlier results are kept
            file.write(json.dumps(record) + "\n")

    def _result_path(self, experiment_type, sample_size, temp, token_limit) -> Path:
        """Build the result file path for the current run (uses self.timestamp)."""
        # "/" in a model identifier (e.g. "provider/model") would otherwise create sub-directories
        safe_name = str(self.model_name).replace("/", "-")
        filename = f"results_{safe_name}_{experiment_type}_{self.timestamp}_{sample_size}_{temp}_{token_limit}.jsonl"
        return Path(self.save_path) / filename

    def _run_completion(self, prompt, temp, token_limit):
        """Send a single-message chat completion request and return the text of the first choice."""
        response = litellm.completion(
            model=self.model,
            messages=[{"content": prompt, "role": "user"}],
            max_tokens=token_limit,
            temperature=temp
        )
        # The response may be a plain dict or an object exposing attributes; support both.
        if isinstance(response, dict):
            message_content = response['choices'][0]['message']['content']
        else:
            message_content = response.choices[0].message.content
        return message_content

    def _save_result_to_file(self, filename: Union[str, Path], result):
        """
        Append a single result to a file as one JSON line of the form {"result": ...}.
        """
        self._append_jsonl(filename, {"result": result})

    def completion(self, prompt: Union[str, Path] = None, sample_size: int = None, temp: float = None, token_limit: int = None):
        """
        Get model output for a prompt, optionally sampling it several times (iid sampling).

        Args:
            prompt: prompt text, or a path to a file containing it; falls back to self.prompt.
            sample_size: number of independent samples; None or <= 1 means a single completion.
            temp: sampling temperature; falls back to self.temp.
            token_limit: max tokens per completion; falls back to self.token_limit.

        Returns:
            List of response strings, one per sample. When self.save_path is set, each
            response is also appended to a jsonl file as soon as it arrives, so a failure
            part-way through a long run does not lose the samples already collected.

        Raises:
            ValueError: if no prompt is given and no default prompt is configured.
        """
        if prompt is not None:
            final_prompt = self._load_text(prompt)
        elif self.prompt:
            final_prompt = self.prompt
        else:
            raise ValueError("Prompt can't be None.")

        temperature = temp if temp is not None else self.temp
        max_tokens = token_limit if token_limit is not None else self.token_limit
        self.timestamp = time.strftime("%Y%m%d-%H%M%S")

        if sample_size is None or sample_size <= 1:
            experiment_type, n_samples = "single", 1
        else:
            experiment_type, n_samples = "iid", sample_size

        # The filename records the values actually used, not the (possibly None) arguments.
        filename = (self._result_path(experiment_type, n_samples, temperature, max_tokens)
                    if self.save_path else None)

        responses = []
        for _ in range(n_samples):
            response = self._run_completion(final_prompt, temperature, max_tokens)
            if filename is not None:
                self._save_result_to_file(filename, response)
            responses.append(response)
        return responses

    def self_improv(self, prompt: Union[str, Path] = None, self_improv_prompt: Union[str, Path] = None, improv_count: int = None, sample_count: int = 1, temp: float = None, token_limit: int = None, final_result_only: bool = True):
        """
        Run iterative self-improvement with sampling.

        The model is first prompted with the task prompt. Then, for each of the remaining
        `improv_count - 1` rounds, it is asked `sample_count` times to improve upon the
        current output. No evaluator is wired in yet, so the first sample of each round is
        carried forward as the current output (TODO: select by evaluation score).

        Args:
            prompt: task prompt (text or path); falls back to self.prompt.
            self_improv_prompt: improvement instruction (text or path); falls back to
                self.self_improv_prompt, then to "Improve upon the following:".
            improv_count: total number of rounds including the initial completion;
                falls back to self.improv_count.
            sample_count: number of samples drawn at each improvement round.
            temp: sampling temperature; falls back to self.temp.
            token_limit: max tokens per completion; falls back to self.token_limit.
            final_result_only: if True only the final output is saved; otherwise every
                output of every round is saved together with its prompt and round index.

        Returns:
            The final improved output (str).

        Raises:
            ValueError: if no task prompt is given and no default prompt is configured.
        """
        if prompt:
            base_prompt = self._load_text(prompt)
        elif self.prompt:
            base_prompt = self.prompt
        else:
            raise ValueError("Prompt can't be None.")

        instruction = self._load_text(self_improv_prompt) if self_improv_prompt else self.self_improv_prompt
        improv_iterations = improv_count if improv_count is not None else self.improv_count
        temperature = temp if temp is not None else self.temp
        max_tokens = token_limit if token_limit is not None else self.token_limit
        self.timestamp = time.strftime("%Y%m%d-%H%M%S")

        # Initial completion that starts the self-improvement process
        current_output = self._run_completion(base_prompt, temperature, max_tokens)
        records = [{"prompt": base_prompt, "iteration": 0, "result": current_output}]

        # Each round improves on the output carried forward from the previous round
        for iteration in range(1, improv_iterations):
            step_prompt = self._build_improv_prompt(instruction, current_output)
            candidates = [self._run_completion(step_prompt, temperature, max_tokens)
                          for _ in range(sample_count)]
            records.extend({"prompt": step_prompt, "iteration": iteration, "result": candidate}
                           for candidate in candidates)
            if candidates:
                current_output = candidates[0]

        # Optionally, save outputs to a file
        if self.save_path:
            filename = self._result_path("self_improv", sample_count, temperature, max_tokens)
            if final_result_only:
                self._save_result_to_file(filename, current_output)
            else:
                for record in records:
                    self._append_jsonl(filename, record)

        return current_output


In [4]:
def generate_prompt(template: Union[str, Path], task_descr: Union[Dict[str, str], Path], prompt_path: Path = Path("prompt.txt")) -> str:
    """
    Fill in a template (str.format-style placeholders) with the fields of a task description.
    Prints the result and also saves it to `prompt_path`.

    Args:
        template: template text, or a Path to a file containing it.
        task_descr: mapping of placeholder name -> text, or a Path to a JSON file holding
            such a mapping. Keys the template does not reference are ignored.
        prompt_path: file the filled-in prompt is written to (overwritten if it exists).

    Returns:
        The filled-in prompt text.

    Raises:
        KeyError: if the template references a field missing from `task_descr`.
    """
    # Path (not PosixPath) so the check also holds for other Path flavours, e.g. on Windows
    if isinstance(template, Path):
        template = template.read_text()
    if isinstance(task_descr, Path):
        # The file holds a JSON object; it must be parsed into a dict before ** unpacking
        task_descr = json.loads(task_descr.read_text())

    filled_template = template.format(**task_descr)
    print(filled_template)
    with open(prompt_path, 'w') as file:
        file.write(filled_template)

    return filled_template

# Evaluation Function 

In [5]:
# implement evaluation function 

# Experiments

In [6]:

# Template for the prompt
# Filled in with str.format(**task_description_dict) by generate_prompt, so every task entry
# must provide these three keys: task_description, observation_description, api_description.
# A trailing backslash inside the string joins the line with the next one (no newline is
# emitted); explicit "\n" escapes mark the intended paragraph breaks.
template = """\
You are responsible for designing a value function to solve the following task: 
{task_description}\n\n\
You will write a python `Value`, which should be initializable without any parameters from the user, object which has one method:
- `def value(observation)` which takes in an observation and returns the value of the observation. The \
output should be normalized between -1 and 1.
Note: You should not assume any exploration outside of what is learned during the agent's single rollout in \
the environment. This means you should not rely on Q-learning, requiring extra exploration.\n\n\
The observation space is defined formally as: 
{observation_description}\n\n\
You are allowed to use any python library you want but should not assume access \
to any other external resources (such as models with downloadable weights) unless otherwise specified. \
In particular you can assume access to the following APIs: \
{api_description}\n\n\
You should only write the Value class and nothing else. \
You are encouraged to be as creative as possible, do not simply copy one of the exemplars if given. \
All code should be written in a single, large code block.
"""

In [7]:
# Task descriptions keyed by environment name. Each entry supplies the fields substituted
# into the prompt template via str.format: task_description, observation_description and
# api_description are required by the template; the action/reward/exemplar fields are
# extra material that the current template does not reference.
envs = {
    "chess": dict( 
        task_description="""
        You are a chess world champion. Win the chess game. You are playing white. You have no time constraints.  
""", 
        # NOTE(review): the text below says "chess.Boards()" (the python-chess class is
        # chess.Board) and "propertylegal_moves" (a docs-scrape artifact). It is left
        # untouched here because this exact prompt has already been used for recorded runs;
        # fix it before starting a fresh experiment.
        observation_description="""
        observation: chess.Boards() object from the python-chess library. It has the following attributes which may be useful:
        
        move_stack: List[Move]
        The move stack. Use Board.push(), Board.pop(), Board.peek() and Board.clear_stack() for manipulation.

        propertylegal_moves: LegalMoveGenerator
        A dynamic list of legal moves.
        Note: To use this as a list you will need to do list(board.legal_moves)

        import chess

        board = chess.Board()
        board.legal_moves.count()
        20
        bool(board.legal_moves)
        True
        move = chess.Move.from_uci("g1f3")
        move in board.legal_moves
        True
        Wraps generate_legal_moves() and is_legal().


        checkers()→ SquareSet[source]
        Gets the pieces currently giving check.

        Returns a set of squares.


        is_check()→ bool[source]
        Tests if the current side to move is in check.

        gives_check(move: Move)→ bool[source]
        Probes if the given move would put the opponent in check. The move must be at least pseudo-legal.


        is_checkmate()→ bool[source]
        Checks if the current position is a checkmate.

        is_stalemate()→ bool[source]
        Checks if the current position is a stalemate.

        is_insufficient_material()→ bool[source]
        Checks if neither side has sufficient winning material (has_insufficient_material()).

        has_insufficient_material(color: chess.Color)→ bool[source]
        Checks if color has insufficient winning material.

        This is guaranteed to return False if color can still win the game.

        The converse does not necessarily hold: The implementation only looks at the material, including the colors of bishops, but not considering piece positions. So fortress positions or positions with forced lines may return False, even though there is no possible winning line.


        find_move(from_square: chess.Square, to_square: chess.Square, promotion: chess.PieceType | None = None)→ Move[source]
        Finds a matching legal move for an origin square, a target square, and an optional promotion piece type.

        For pawn moves to the backrank, the promotion piece type defaults to chess.QUEEN, unless otherwise specified.

        Castling moves are normalized to king moves by two steps, except in Chess960.

        Raises
        :
        IllegalMoveError if no matching legal move is found.
""", 
        api_description="""
        None
""", 
        action_description="""
        None
""",
        reward_description="""
        None
""",
        action_exemplar="""
        None
"""
    ), 

    "Blackjack-v1": dict(
        task_description="""Win the blackjack hand. Each round will be separate, independent of the ones before.
""",
        observation_description="""observation: Tuple[int, int, bool] where:\n\
observation[0] = The sum of your cards\n\
observation[1] = The dealer's showing card sum.\n\
observation[2] = True if you have an Ace, False otherwise\n\
""",
        # Required by the prompt template; without it template.format(**env) raises KeyError.
        api_description="""None
""",
        # The action/reward/exemplar text previously here described chess moves (copy-paste);
        # it now describes the Blackjack-v1 action space and rewards.
        action_description="""action: int such that\n\
- 0: stick (stop taking cards)\n\
- 1: hit (take another card)\n\
""",
        reward_description="""A reward of +1 is given for winning the hand, -1 for losing it and 0 for a draw.
""",
        action_exemplar="""\
- a_1 = 1  # My sum is 12 and the dealer shows a 10, so I hit
- a_2 = 0  # My sum is now 19, so I stick
""",
    ),

    "MiniGrid-BlockedUnlockPickup-v0": dict(
        task_description="""You are an agent in 2-D gridworld. The agent has to pick up a box which is placed in another room, behind a locked door. \
The door is also blocked by a ball which the agent has to move before it can unlock the door. \
Hence, the agent has to learn to move the ball, pick up the key, open the door and pick up the object in the other room.
""",
        observation_description="""You can only see a (7, 7) square of tiles in the direction you are facing. \
Formally `observation: Dict('direction': Discrete(4), 'image':  array: (7, 7, 3)))` \
where:
- observation['direction'] with 0: right, 1: down, 2: left, 3: up\n\
- observation['image'] array with shape (7, 7, 3) with each tile in the (7, 7) grid encoded as the triple (object: int, color: int, state: int) where
    - object with 0: unseen, 1: empty, 2: wall, 3: floor, 4: door, 5: key, 6: ball, 7: box, 8: goal, 9: lava
    - color with 0: red, 1: green, 2: blue, 3: purple, 4: yellow, 5: grey
    - state with 0: door open, 1: door closed, 2: door locked
Note, the agent is always located at observation['image'][3][6] with \
observation['image'][2] to the left and observation['image'][4] to the right and observation['image'][3][5] forward
""",
        # Required by the prompt template; without it template.format(**env) raises KeyError.
        api_description="""None
""",
        # NOTE(review): only actions 0-3 are listed; the task text mentions moving the ball
        # and opening the door, which presumably need further actions (drop / toggle) —
        # confirm against the MiniGrid action space before using this field in a prompt.
        action_description="""action: int such that\n\
- 0: turn left\n\
- 1: turn right\n\
- 2: move forward\n\
- 3: pickup item\n
""",
        reward_description="""A reward of ‘1 - 0.9 * (step_count / max_steps)’ is given for success, and ‘0’ for failure.
""",
        action_exemplar="""\
- a_1 = 1  # I don't see anything so turn right
- a_2 = 2  # I see the key to my forward left and the door to my right so I walk toward both
- a_3 = 2  # Walk forward again toward key
- a_4 = 0  # Turn toward key
- a_5 = 3  # pickup key
- a_6 = 1  # Turn right to pickup ball to my right
- a_7 = 2  # Walk toward ball
- a_8 = 3  # pickup ball in front of me
- a_9 = 2  # Walk forward towards door
- a_10 = 2  # Walk through door with key
- a_11 = 2  # Walk forward
- a_12 = 1  # I see the chest to my right so I turn right
- a_13 = 2  # Walk toward chest
- a_14 = 3  # Pickup chest
""",
    ),
}

In [8]:
# Build the prompt for a single task taken from the `envs` table; "chess" is used here.
prompt_path = Path("chess_prompt.txt")  # file the filled-in prompt is written to
task_descr = envs["chess"]

# Fill the template with the task fields, save it to `prompt_path`, and show the prompt text
generate_prompt(template=template, task_descr=task_descr, prompt_path=prompt_path)

You are responsible for designing a value function to solve the following task: 

        You are a chess world champion. Win the chess game. You are playing white. You have no time constraints.  


You will write a python `Value`, which should be initializable without any parameters from the user, object which has one method:
- `def value(observation)` which takes in an observation and returns the value of the observation. The output should be normalized between -1 and 1.
Note: You should not assume any exploration outside of what is learned during the agent's single rollout in the environment. This means you should not rely on Q-learning, requiring extra exploration.

The observation space is defined formally as: 

        observation: chess.Boards() object from the python-chess library. It has the following attributes which may be useful:
        
        move_stack: List[Move]
        The move stack. Use Board.push(), Board.pop(), Board.peek() and Board.clear_stack() for manipulati

'You are responsible for designing a value function to solve the following task: \n\n        You are a chess world champion. Win the chess game. You are playing white. You have no time constraints.  \n\n\nYou will write a python `Value`, which should be initializable without any parameters from the user, object which has one method:\n- `def value(observation)` which takes in an observation and returns the value of the observation. The output should be normalized between -1 and 1.\nNote: You should not assume any exploration outside of what is learned during the agent\'s single rollout in the environment. This means you should not rely on Q-learning, requiring extra exploration.\n\nThe observation space is defined formally as: \n\n        observation: chess.Boards() object from the python-chess library. It has the following attributes which may be useful:\n        \n        move_stack: List[Move]\n        The move stack. Use Board.push(), Board.pop(), Board.peek() and Board.clear_stack(

In [9]:
# Build the model wrapper used by the experiment cells below. The task prompt is read from
# the file at `prompt_path`; results are written under the "results" directory.
# NOTE(review): the variable is named `gpt4` but the configured model is "gpt-3.5-turbo".
# NOTE(review): `self_improv` is passed as a str, which Model.__init__ keeps verbatim as the
# prompt text rather than reading a file — pass a Path if "self_improv.txt" is meant to be a
# file name (the commented-out call at the end of the notebook uses "self-improv.txt"; confirm).
gpt4 = Model(model="gpt-3.5-turbo", temp=0.7, prompt=prompt_path, self_improv="self_improv.txt", save_path="results")

In [10]:
# Re-point the existing wrapper at a model / output directory without rebuilding it.
# NOTE(review): both values equal what the constructor call already set, so this cell is
# currently redundant. Only `model` is reassigned here; `model_name` (used in result
# filenames) keeps the value chosen at construction time.
gpt4.model="gpt-3.5-turbo"
gpt4.save_path = Path("results")

In [15]:
# iid sampling run: request 100 independent completions of the chess prompt, overriding the
# instance defaults with temp=1 and token_limit=3000. Each response is appended to a jsonl
# file under gpt4.save_path.
# NOTE: this issues one API request per sample (100 requests) — expect it to be slow and billable.
gpt4.completion(prompt = prompt_path, sample_size=100, temp=1, token_limit=3000)

In [None]:
# gpt4.self_improv(prompt = prompt_path, self_improv_prompt=Path("self-improv.txt"), improv_count=2, temp=0.7, token_limit=10)