In [7]:
import openai 
import requests 
import numpy as np 
from retrying import retry
from typing import List, Optional, Union

install and import basic package for GPT

In [8]:
!python3 -m pip install pandas bs4 wikipedia sqlalchemy torch matplotlib
import pandas as pd 
import requests
from bs4 import BeautifulSoup
import wikipedia
import os,sys,pathlib
import cv2
import datetime
import sqlalchemy
import torch
import matplotlib.pyplot as plt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m23.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m


## OpenAI API Setting

In [9]:
import openai
import logging
from termcolor import colored
# load from key file
with open('./rilab_key.txt') as f:
    OPENAI_API_KEY = key = f.read()
openai.api_key = OPENAI_API_KEY

## GPTChat Class 


In [10]:
class GPTChatClass:
    def __init__(self, model="gpt-4", role_msg = 'You are a helpful assistant.'):
        self.model = model
        self.messages = [{"role": "system", "content": f'{role_msg}'}]  # initial system message
        self.response = None
        self.logger = logging.getLogger(__name__)

    def add_message(self, role, content):
        if not role or not content:
            self.logger.error("Role or content is missing.")
            return
        self.messages.append({"role": role, "content": content})

    @retry(stop_max_attempt_number=7, wait_exponential_multiplier=1000, wait_exponential_max=10000)
    def __call__(self, user_input : str = 'hi'):
        if not user_input:
            self.logger.error("User input is missing.")
            return
        self.add_message("user", user_input)
        try:
            self.response = openai.ChatCompletion.create(
                model=self.model,
                messages=self.messages
            )
        except Exception as e:
            self.logger.error(f"Failed to query the model: {e}")
            return None
        self.add_message("assistant", self.get_response_content())  # save the assistant's response
        return self.get_response_content()

    def get_response_content(self):
        if self.response:
            return self.response["choices"][0]["message"]["content"]
        else:
            self.logger.warning("Response is empty.")
            return None

    def get_response_status(self):
        if self.response:
            return self.response["choices"][0]["finish_reason"]
        else:
            self.logger.warning("Response is empty.")
            return None
    
    def print_history(self):
        for message in self.messages:
            role = message["role"]
            content = message["content"]

            if role == "user":
                formatted_message = colored("User : ", attrs=["bold"]) + content
            elif role == "assistant":
                formatted_message = colored("Assistant : ", attrs=["bold"]) + colored(content, "green", "on_dark_grey")
            else:
                #formatted_message = content  # System message, no formatting
                continue

            print(formatted_message)
            print("="*50)

In [6]:
gptchat = GPTChatClass(role_msg = 'You are a helpful assistant.')
gptchat(user_input = 'hi')
# Assitant : Hello! How can I assist you today?
gptchat(user_input = 'Who is the current korean president?')
# Assitant : The current president of South Korea, as of 2022, is Moon Jae-in. He has been in office since May 10, 2017.

'The current president of South Korea, as of 2022, is Moon Jae-in.'

### Print Chat History

In [5]:
gptchat.print_history()

You are a helpful assistant.
[1mUser : [0mhi
[1mAssistant : [0m[100m[32mHello! How can I assist you today?
[0m
[1mUser : [0mWho is the current korean president?
[1mAssistant : [0m[100m[32mThe current president of South Korea, as of 2022, is Moon Jae-in. He assumed office on May 10, 2017.[0m


## Make GPT use Code and Execute it

In [6]:
import sys 
from io import StringIO
import re

# import for python execution
import numpy as np
import pandas as pd 
import requests
from bs4 import BeautifulSoup
import wikipedia

class GPTAgent(GPTChatClass):
    SYSTEM_PROMPT = """You are a helpful assistant. You can get code execution result generate by code block
```python
(code contents)
```.
Then, you will get output as results\nCurrently, only python execution is available.

For example, 

### User : Can you me the 52nd fibonacci number?

### Assistant : 
To calculate 52nd fibonacci number, I think I need to make code to calculate it.
First, I need to make efficient Python function using memoization to compute Fibonacci numbers. which avoids redundant computation which can occur in a simple recursive implementation:
```python
def fibonacci(n, memo={}):
if n in memo:
    return memo[n]
elif n <= 2:
    result = 1
else:
    result = fibonacci(n-1, memo) + fibonacci(n-2, memo)
memo[n] = result
return result

print(fibonacci(52))```

### User : Executing above code. You got following results:
32951280099

### Assistant : 52nd fibonacci number is 32951280099
"""
    MAX_REFINE_STEP = 3
    IMPORTE_PACKAGES = """
import numpy as np
import pandas as pd 
import requests
from bs4 import BeautifulSoup
import wikipedia
"""

    def __init__(self, model="gpt-4"):
        self.model = model
        self.messages = [{"role": "system", "content": f'{self.SYSTEM_PROMPT}\nInitially imported packages:\n```python{self.IMPORTE_PACKAGES}```'}]  # initial system message
        self.response = None
        self.logger = logging.getLogger(__name__)

    def code_exec(self, code: str = ''):
        # using code as tool (code execution)
        stdout_backup = sys.stdout
        sys.stdout = StringIO()

        try:
            exec(code)
            output = sys.stdout.getvalue().strip()
            return output
        except Exception as e:
            # Capture and print any exceptions or errors
            import traceback
            error_log = traceback.format_exc()
            return error_log
        finally:
            # Restore the original stdout
            sys.stdout = stdout_backup
    
    def extract_code_blocks(self, text : str):
        pattern = r'```python(.*?)```' # now just for python
        code_blocks = re.findall(pattern, text, re.DOTALL)
        return [block.strip() for block in code_blocks]

    def code_step(self, 
                  user_prompt:str = 'Executing above code. You got following results:\n{output}\nIf result not shown, you should add print statement to see the result.'):
        # check response has code
        
        last_assistant_response = self.get_response_content()
        code_blocks = self.extract_code_blocks(text = last_assistant_response)

        if len(code_blocks)<1:
            # No code block presented
            return False 
        
        # if code blocks 
        # join all code blocks
        full_code_str = '\n'.join(code_blocks)
        # exectute
        output = self.code_exec(full_code_str)
        
        # attach output
        #self.add_message("user", f'Executing above code:\n{full_code_str}\nGot results:\n{output}')
        self.add_message("user", user_prompt.format(output=output))
        return True
    
    def code_step_local_loop(self,
                             user_prompt:str = 'Executing above code. You got following results:\n{output}\nIf result not shown, you should add print statement to see the result.'):
        code_flag = self.code_step()
        
        # refine code 
        for i in range(self.MAX_REFINE_STEP):
            if not code_flag:
                break
            else:
                self.generate()
                code_flag = self.code_step(user_prompt=user_prompt)

    @retry(stop_max_attempt_number=7, wait_exponential_multiplier=1000, wait_exponential_max=10000)
    def generate(self, max_tokens : int = 1024,
                       stop: Optional[Union[str, list]] = None,):
        try:
            self.response = openai.ChatCompletion.create(
                model=self.model,
                messages=self.messages,
                max_tokens=max_tokens,
                stop = stop,
            )
        except Exception as e:
            self.logger.error(f"Failed to query the model: {e}")
            return None
        self.add_message("assistant", self.get_response_content())  # save the assistant's response

    def __call__(self, user_input : str = 'hi',
                       max_tokens : int = 1024,
                       stop: Optional[Union[str, list]] = None,):
        if not user_input:
            self.logger.error("User input is missing.")
            return
        self.add_message("user", user_input)
        self.generate(max_tokens = max_tokens, stop=stop)

        ######################
        ### take code step ###
        ######################
        self.code_step_local_loop()
        return self.get_response_content()

In [9]:
gpt_agent = GPTAgent()

gpt_agent('Who is current korean president?') # original gpt4 answer : 문재인
gpt_agent.print_history()

You are a helpful assistant. You can get code execution result generate by code block
```python
(code contents)
```.
Then, you will get output as results
Currently, only python execution is available.

For example, 

### User : Can you me the 52nd fibonacci number?

### Assistant : 
To calculate 52nd fibonacci number, I think I need to make code to calculate it.
First, I need to make efficient Python function using memoization to compute Fibonacci numbers. which avoids redundant computation which can occur in a simple recursive implementation:
```python
def fibonacci(n, memo={}):
if n in memo:
    return memo[n]
elif n <= 2:
    result = 1
else:
    result = fibonacci(n-1, memo) + fibonacci(n-2, memo)
memo[n] = result
return result

print(fibonacci(52))```

### User : Executing above code. You got following results:
32951280099

### Assistant : 52nd fibonacci number is 32951280099

Initially imported packages:
```python
import numpy as np
import pandas as pd 
import requests
from bs4 i

## Save code for future use

prompt dummy

In [7]:
SYSTEM_PROMPT = """You are a helpful assistant. You can get code execution result generate by code block
```python
(code contents)
```.
Then, you will get output as results
Currently, only python execution is available.
! Make sure print it you want to check to see the code result.

"""

IMPORTE_PACKAGES = """
import os,sys,pathlib
import datetime
import numpy as np 
import pandas as pd 
import requests
from bs4 import BeautifulSoup
import wikipedia
import cv2
import sqlalchemy
import matplotlib.pyplot as plt
import torch
"""

SAVE_PROMPT = """
If you think, code is re-usable by fequent times and the last executed code is successfully work as exepcted then generalized code for future use.

Here is last generated code :
```python
{code}
```
"""

CODE_OBS_PROMPT = """
Executing above code. You got following results:
[! You may not see the result if you not print it.]
{output}
Modify the code for better performance if needed."""

`ToolSaveAgent` 
1. (first step) : getting user msg and generate,refine code with `code_step_local_loop` 
1. (second step) :  `save_code` to generalize the code for future use. 

In [8]:
# import for python execution
import importlib.util

class ToolSaveAgent(GPTAgent):

    """
        This agent will create tool(code) and save this code then reuse it.
    """

    def __init__(self, model="gpt-4"):
        self.model = model
        self.messages = [{"role": "system", "content": f'{SYSTEM_PROMPT}\nInitially imported packages:\n```python{IMPORTE_PACKAGES}```'}]  # initial system message
        self.response = None
        self.logger = logging.getLogger(__name__)

        # code save related
        self.code_save_path = './gpt_gen_codes'

        # post define max refine step
        self.MAX_REFINE_STEP = 7
        self.last_executed_code = ''

    def code_exec(self, code: str = ''):
        # using code as tool (code execution)
        stdout_backup = sys.stdout
        sys.stdout = StringIO()

        include_pkg_code = f'#import pkg\n{IMPORTE_PACKAGES}\n\n{code}'
        
        try:
            exec(include_pkg_code)
            output = sys.stdout.getvalue().strip()
            self.last_executed_code = code
            return output
        except Exception as e:
            # Capture and print any exceptions or errors
            import traceback
            error_log = traceback.format_exc()
            return error_log
        finally:
            # Restore the original stdout
            sys.stdout = stdout_backup
    
    def code_step(self, 
                  user_prompt:str = 'Executing above code. You got following results:\n{output}\nIf result not shown, you should add print statement to see the result.'):
        # check response has code
        
        last_assistant_response = self.get_response_content()
        code_blocks = self.extract_code_blocks(text = last_assistant_response)

        if len(code_blocks)<1:
            # No code block presented
            return False 
        
        # if code blocks 
        # join all code blocks
        full_code_str = '\n'.join(code_blocks)
        # exectute
        output = self.code_exec(full_code_str)
        
        # attach output
        #self.add_message("user", f'Executing above code:\n{full_code_str}\nGot results:\n{output}')
        self.add_message("user", user_prompt.format(output=output))

        self.logger.warn(f'\ncode : {full_code_str}\noutput: {output}')
        return True

    def save_code(self, max_tokens : int = 1024,
                       stop: Optional[Union[str, list]] = None,):
                
        # save the code 
        self.add_message("user", SAVE_PROMPT.format(code = self.last_executed_code))
        self.generate(max_tokens = max_tokens,
                      stop       = stop)
        self.add_message("user", f'Make sure this is also works for other example, you may give test script to test this.')
        self.generate(max_tokens = max_tokens,
                      stop       = stop)

        self.code_step_local_loop(user_prompt=CODE_OBS_PROMPT)
        
        #print(self.messages[-1])
        #print("---"*30)
        

    def __call__(self, user_input : str = 'hi',
                       max_tokens : int = 1024,
                       stop: Optional[Union[str, list]] = None,):
        
        ######################
        ###   user input   ###
        ######################
        if not user_input:
            self.logger.error("User input is missing.")
            return
        self.add_message("user", user_input)
        self.generate(max_tokens = max_tokens,
                      stop       = stop)

        ######################
        ### take code step ###
        ######################
        self.code_step_local_loop()

        ######################
        ### save the code  ###
        ######################
        self.save_code()

        return self.get_response_content()

In [9]:
tool_save_agent = ToolSaveAgent()

tool_save_agent('Who is the current korean president?') # original gpt4 answer : 문재인
tool_save_agent.print_history()

  self.logger.warn(f'\ncode : {full_code_str}\noutput: {output}')

code : def current_korean_president():
    current_president_page = wikipedia.page("President of South Korea")
    soup = BeautifulSoup(current_president_page.html(), "html.parser")
    table = soup.find_all('table', Class_='infobox vcard')[0]
    for i in table.find_all('tr'):
        if "Incumbent" in i.text:
            president_name = i.text.split("\n")[2]
            return president_name

current_korean_president()
output: Traceback (most recent call last):
  File "/var/folders/fd/4tclvst511lf5fztg9vflt5r0000gn/T/ipykernel_15937/3690587708.py", line 31, in code_exec
    exec(include_pkg_code)
  File "<string>", line 25, in <module>
  File "<string>", line 19, in current_korean_president
IndexError: list index out of range


code : president_info = wikipedia.page("List of presidents of South Korea")
soup = BeautifulSoup(president_info.html(), "html.parser")
table = soup.find_all('table', class_='wikitable')[0]

# 

[1mUser : [0mWho is the current korean president?
[1mAssistant : [0m[100m[32mCurrently, my execution only supports simple programming code, my search parameters do not contain updated real-time data or events.

You can look up this type of information by using real-time search engines like Google. 

Let me try execute wikipedia module and scrap data from it. Let's find this out together.
```python
def current_korean_president():
    current_president_page = wikipedia.page("President of South Korea")
    soup = BeautifulSoup(current_president_page.html(), "html.parser")
    table = soup.find_all('table', Class_='infobox vcard')[0]
    for i in table.find_all('tr'):
        if "Incumbent" in i.text:
            president_name = i.text.split("\n")[2]
            return president_name

current_korean_president()
```
This function scraps the data from the Wikipedia page and finds out the current President of South Korea.[0m
[1mUser : [0mExecuting above code. You got following resul