<a href="https://colab.research.google.com/github/adimis-ai/Large-Language-Model-LLM-Wrapper-from-Scratch-using-Openai-models/blob/main/Large%20Language%20Model%20(LLM)%20Wrapper%20from%20Scratch%20using%20Openai%20models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
#!pip install openai
#!pip install --upgrade tiktoken

In [28]:
import openai
import tiktoken
import time
from typing import List, Union
from pydantic import BaseModel

In [29]:
# Read the OpenAI API key from the environment rather than committing a secret
# to the notebook. Set OPENAI_API_KEY before running the cells below; when it
# is unset the key is an empty string and API calls will fail authentication.
# NOTE(review): the key that used to be hard-coded here has been published and
# should be revoked.
import os

API_KEY = os.environ.get("OPENAI_API_KEY", "")

In [30]:
class ChatMessage(BaseModel):
    """Schema for one chat message: a speaker role plus its text content.

    NOTE(review): the wrappers in this file use this class only as a type
    annotation and index messages as plain dicts (``msg["content"]``).
    """
    # Speaker of the message; this file uses "user" and "assistant".
    role: str
    # Text body of the message.
    content: str

In [31]:
# Utility functions
def count_tokens(text, model_name="gpt-3.5-turbo"):
    """Return how many tokens ``text`` encodes to with ``model_name``'s tokenizer.

    Args:
        text: The string to tokenize. Empty or ``None`` text counts as 0 tokens.
        model_name: Name of the model whose tiktoken encoding should be used.

    Returns:
        The number of tokens as an ``int``.
    """
    if not text:
        return 0
    try:
        encoding = tiktoken.encoding_for_model(model_name)
    except KeyError:
        # tiktoken has no tokenizer mapping for this model name; use a
        # general-purpose encoding so the count is an estimate, not a crash.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

def handle_error(response, retry_count, max_retry_attempts, retry_wait_time):
    """Decide whether a failed API call should be retried, sleeping first if so.

    Args:
        response: The failed response or the raised API error. Its HTTP status
            is read from ``status_code`` or, failing that, ``http_status``
            (the attribute the wrappers' ``OpenAIError`` objects carry).
        retry_count: Number of retries already performed.
        max_retry_attempts: Upper bound on retries.
        retry_wait_time: Seconds to sleep before the caller retries.

    Returns:
        ``True`` after sleeping when the status is 429, 500 or 503 and retries
        remain; ``False`` otherwise (including when no status is available).
    """
    status = getattr(response, "status_code", None)
    if status is None:
        # Error objects may expose the HTTP status under a different name.
        status = getattr(response, "http_status", None)

    if status in {429, 500, 503} and retry_count < max_retry_attempts:
        print(f"Retrying after {retry_wait_time} seconds...")
        time.sleep(retry_wait_time)
        return True
    return False

In [32]:
# Chat Model Wrapper
class ChatModelWrapper:
    """Wrapper around ``openai.ChatCompletion`` with optional conversation memory.

    When memory is enabled, earlier messages and assistant replies are kept in
    ``self.memory`` and prepended to each new request; the oldest entries are
    dropped when the token budget (``max_completion_token``) would be exceeded.
    """

    def __init__(self, api_key, use_memory=True, max_completion_token=2000, model_name="gpt-3.5-turbo"):
        """Store the configuration and set the module-level ``openai.api_key``.

        Args:
            api_key: OpenAI API key.
            use_memory: Whether previous messages are sent with each request.
            max_completion_token: Total token budget for prompt plus completion.
            model_name: Default chat model for requests.
        """
        openai.api_key = api_key
        self.memory = []
        self.max_completion_token = max_completion_token
        self.model_name = model_name
        self.use_memory = use_memory

    def _manage_memory(self, current_prompt_content, max_tokens):
        """Drop the oldest remembered messages until memory fits the budget.

        The budget is ``max_completion_token`` minus the tokens of the current
        prompt and the requested completion size. Stops once memory is empty,
        even if the budget is still exceeded.
        """
        budget = self.max_completion_token - (count_tokens(current_prompt_content) + max_tokens)
        # Measure each entry once, the same way for summing and for removal.
        sizes = [count_tokens(msg["role"] + msg["content"]) for msg in self.memory]
        remaining = sum(sizes)

        drop = 0
        while drop < len(sizes) and remaining > budget:
            remaining -= sizes[drop]
            drop += 1
        # Single slice deletion instead of repeated pop(0).
        del self.memory[:drop]

    def _generate_prompt(self, messages, max_tokens):
        """Return the message list to send: trimmed memory + ``messages``, or just ``messages``."""
        if not self.use_memory:
            return messages
        current_prompt_content = " ".join(message["content"] for message in messages)
        self._manage_memory(current_prompt_content, max_tokens)
        return self.memory + messages

    def _chat_completion(self, messages: List[ChatMessage], max_tokens: int = 128, **kwargs) -> openai.ChatCompletion:
        """Call the chat completion endpoint, retrying on retryable errors.

        Args:
            messages: Messages to send (dicts with a ``"content"`` key).
            max_tokens: Completion size requested from the API.
            **kwargs: Extra API parameters. ``max_retry_attempts`` (default 3),
                ``retry_wait_time`` (default 60) and ``model`` (default
                ``self.model_name``) are consumed here and not forwarded as-is.

        Returns:
            The API response; the string ``"Error: Total tokens exceed the
            limit."`` when the request would exceed the budget; or ``None``
            when the call failed and was not (or no longer) retried.
        """
        total_tokens = sum(count_tokens(msg["content"]) for msg in messages)
        if total_tokens + max_tokens > self.max_completion_token:
            return "Error: Total tokens exceed the limit."

        # Separate retry/model options from genuine API parameters so they are
        # neither sent to the API nor passed twice.
        request_options = dict(kwargs)
        max_retry_attempts = request_options.pop("max_retry_attempts", 3)
        retry_wait_time = request_options.pop("retry_wait_time", 60)
        model = request_options.pop("model", self.model_name)

        retry_count = 0
        while retry_count < max_retry_attempts:
            try:
                return openai.ChatCompletion.create(
                    model=model,
                    messages=messages,
                    max_tokens=max_tokens,
                    **request_options
                )
            except openai.error.OpenAIError as e:
                if not handle_error(e, retry_count, max_retry_attempts, retry_wait_time):
                    return None
                print("Error in _chat_completion: ", e)
                retry_count += 1
        return None

    def generate_response(self, messages: List[ChatMessage], max_tokens: int = 128, **kwargs) -> openai.ChatCompletion:
        """Send ``messages`` (plus memory, if enabled) and record the reply in memory.

        Args:
            messages: New messages for this turn.
            max_tokens: Completion size; used for memory trimming and the request.
            **kwargs: Forwarded to ``_chat_completion``.

        Returns:
            Whatever ``_chat_completion`` returned (response, error string or
            ``None``), or ``"Error: No input messages."`` for empty input.
        """
        if not messages:
            return "Error: No input messages."

        prompt = self._generate_prompt(messages, max_tokens)
        # Pass max_tokens through so the caller's value is actually used.
        response = self._chat_completion(prompt, max_tokens, **kwargs)

        # Error strings and None have no usable ``choices``; return them as-is.
        choices = getattr(response, "choices", None)
        if choices and self.use_memory:
            try:
                reply_text = choices[0].message["content"]
            except (AttributeError, KeyError, IndexError, TypeError):
                # Unexpected response shape: skip memory rather than fail.
                return response
            self.memory.extend(messages)
            self.memory.append({"role": "assistant", "content": reply_text})
        return response

    def set_model(self, model_name):
        """Change the default model used for subsequent requests."""
        self.model_name = model_name

    def set_memory_usage(self, use_memory):
        """Enable or disable sending remembered messages with requests."""
        self.use_memory = use_memory

    def prioritize_messages(self, messages: List[ChatMessage]):
        """Return ``messages`` sorted by their ``"timestamp"`` key, newest first.

        Messages without a timestamp are treated as timestamp 0.
        """
        return sorted(messages, key=lambda msg: msg.get("timestamp", 0), reverse=True)

    def split_long_conversation(self, messages: List[ChatMessage], max_tokens_per_chunk):
        """Group consecutive messages into chunks of at most ``max_tokens_per_chunk`` tokens.

        A single message larger than the limit is placed in a chunk of its own.

        Returns:
            A list of non-empty message lists, in the original order.
        """
        split_chunks = []
        current_chunk = []
        current_chunk_tokens = 0

        for msg in messages:
            msg_tokens = count_tokens(msg["content"])
            if current_chunk_tokens + msg_tokens <= max_tokens_per_chunk:
                current_chunk.append(msg)
                current_chunk_tokens += msg_tokens
                continue
            # Only close a chunk that has content; otherwise an oversized
            # first message would produce a leading empty chunk.
            if current_chunk:
                split_chunks.append(current_chunk)
            current_chunk = [msg]
            current_chunk_tokens = msg_tokens

        if current_chunk:
            split_chunks.append(current_chunk)

        return split_chunks

In [33]:
# Completion Model Wrapper
class CompletionModelWrapper:
    """Wrapper around ``openai.Completion`` that keeps a User/AI transcript as memory.

    Each successful exchange is stored in ``self.memories`` as
    ``{"USER": prompt, "AI": reply}``; when memory is enabled the transcript is
    prepended to the next prompt.
    """

    def __init__(self, api_key, use_memory=True, max_completion_token=3000, model_name="text-davinci-003"):
        """Store the configuration and set the module-level ``openai.api_key``.

        Args:
            api_key: OpenAI API key.
            use_memory: Whether the stored transcript is included in prompts.
            max_completion_token: Total token budget for memory, prompt and completion.
            model_name: Completion model to call.
        """
        openai.api_key = api_key
        self.memories = []
        self.max_completion_token = max_completion_token
        self.model_name = model_name
        self.use_memory = use_memory
        self.max_retry_attempts = 3

    @staticmethod
    def _memory_tokens(memory):
        """Return the token count of one stored exchange (user text + AI text)."""
        # Token counts are always taken with the text-davinci-003 tokenizer.
        return (
            count_tokens(memory["USER"], model_name="text-davinci-003")
            + count_tokens(memory["AI"], model_name="text-davinci-003")
        )

    def _manage_memory(self, current_prompt, max_tokens):
        """Drop the oldest exchanges until memory fits the remaining token budget.

        The budget is ``max_completion_token`` minus the current prompt's tokens
        and ``max_tokens``. Stops once memory is empty, even if the budget is
        still exceeded (e.g. a single reply larger than the budget).
        """
        prompt_tokens = count_tokens(current_prompt, model_name="text-davinci-003")
        budget = self.max_completion_token - (prompt_tokens + max_tokens)

        sizes = [self._memory_tokens(memory) for memory in self.memories]
        remaining = sum(sizes)

        drop = 0
        while drop < len(sizes) and remaining > budget:
            remaining -= sizes[drop]
            drop += 1
        del self.memories[:drop]

    def _format_conversation(self, current_prompt):
        """Return the prompt to send: the transcript plus the new turn, or the bare prompt."""
        if not self.use_memory:
            return current_prompt
        history = "\n".join(f"User: {memory['USER']}\nAI: {memory['AI']}" for memory in self.memories)
        return history + f"\nUser: {current_prompt}\nAI:"

    def _completion(self, prompt: str, max_tokens: int = 2000, temperature=1.0, **kwargs) -> openai.Completion:
        """Call the completion endpoint, retrying on retryable errors.

        Args:
            prompt: The new user prompt (memory is added here if enabled).
            max_tokens: Completion size requested from the API.
            temperature: Sampling temperature.
            **kwargs: Extra API parameters. ``retry_wait_time`` (default 60) is
                consumed here and not forwarded to the API.

        Returns:
            The API response, or ``None`` when the call failed and was not (or
            no longer) retried.
        """
        prompt_with_memory = self._format_conversation(prompt)

        # retry_wait_time configures the retry loop; it is not an API parameter.
        request_options = dict(kwargs)
        retry_wait_time = request_options.pop("retry_wait_time", 60)

        retry_count = 0
        while retry_count < self.max_retry_attempts:
            try:
                return openai.Completion.create(
                    model=self.model_name,
                    prompt=prompt_with_memory,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    **request_options
                )
            except openai.error.OpenAIError as e:
                if not handle_error(e, retry_count, self.max_retry_attempts, retry_wait_time):
                    return None
                print("Error in _completion: ", e)
                retry_count += 1
        return None

    def generate_response(self, prompt: str, max_tokens: int = 2000, temperature=1.0, **kwargs) -> openai.Completion:
        """Complete ``prompt``, store the exchange in memory, and return the response.

        Returns:
            The API response, or ``None`` when the request failed.
        """
        res = self._completion(prompt, max_tokens, temperature, **kwargs)
        if res:
            self.memories.append({
                "USER": prompt,
                "AI": res.choices[0].text.strip(),
            })
            self._manage_memory(prompt, max_tokens)  # Dynamic memory management
        return res

    def set_model(self, model_name):
        """Change the model used for subsequent requests."""
        self.model_name = model_name

    def set_memory_usage(self, use_memory):
        """Enable or disable including the transcript in prompts."""
        self.use_memory = use_memory

In [34]:
# LLM Wrapper
class LLMWrapper:
    """Facade that routes requests to a chat or a completion wrapper by ``model_type``."""

    def __init__(self, api_key, model_type, use_memory=True, max_chat_completion_token=3000, model_name="gpt-3.5-turbo"):
        """Create both underlying wrappers with the same settings.

        Args:
            api_key: OpenAI API key.
            model_type: ``"Chat"`` or ``"Completion"``; selects the wrapper used.
            use_memory: Whether the wrappers keep conversation memory.
            max_chat_completion_token: Token budget passed to both wrappers.
            model_name: Model name passed to both wrappers.
        """
        self.api_key = api_key
        self.model_type = model_type
        self.use_memory = use_memory
        self.max_chat_completion_token = max_chat_completion_token
        self.model_name = model_name
        self.chat_wrapper = ChatModelWrapper(self.api_key, self.use_memory, self.max_chat_completion_token, self.model_name)
        # NOTE(review): the completion wrapper receives the same model_name,
        # whose default is a chat model name — confirm callers using
        # model_type="Completion" pass a completion model.
        self.completion_wrapper = CompletionModelWrapper(self.api_key, self.use_memory, self.max_chat_completion_token, self.model_name)

    @staticmethod
    def _failure_message(res):
        """Return an error string when ``res`` is not a usable response, else ``None``."""
        if isinstance(res, str):
            # The wrappers report some failures as plain error strings.
            return res
        if res is None or not getattr(res, "choices", None):
            return "Error: No response received from the model."
        return None

    def generate_response(self, messages: List[ChatMessage], max_tokens: int = 2000, **kwargs) -> Union[openai.ChatCompletion, openai.Completion, str]:
        """Generate a reply with the wrapper selected by ``model_type``.

        Args:
            messages: For ``"Chat"``, the list of message dicts; for
                ``"Completion"``, this value is passed on as the prompt.
            max_tokens: Completion size forwarded to the wrapper.
            **kwargs: Extra options forwarded to the wrapper.

        Returns:
            The first choice's message (chat) or its stripped text
            (completion). An error string is returned when the wrapper failed
            or ``model_type`` is not recognised.
        """
        if self.model_type == "Chat":
            res = self.chat_wrapper.generate_response(messages, max_tokens, **kwargs)
            failure = self._failure_message(res)
            if failure is not None:
                return failure
            return res.choices[0].message

        if self.model_type == "Completion":
            res = self.completion_wrapper.generate_response(messages, max_tokens, **kwargs)
            failure = self._failure_message(res)
            if failure is not None:
                return failure
            return res.choices[0].text.strip()

        return "Invalid model_type specified."

### Chatbot Test using LLMWrapper

In [None]:
# Create an instance of your LLMWrapper for chat-based interactions
chatbot = LLMWrapper(API_KEY, model_type='Chat')

print("Chatbot: Hello! How can I assist you today?")

# Simple interactive loop: read a line, send it as a user message, print the reply.
while True:
    try:
        user_input = input("You: ")
    except (EOFError, KeyboardInterrupt):
        # Input stream closed or the user interrupted: end the session cleanly.
        print("\nChatbot: Goodbye!")
        break

    if user_input.strip().lower() == 'exit':
        print("Chatbot: Goodbye!")
        break
    if not user_input.strip():
        # Nothing to send; ask again instead of spending an API call.
        continue

    user_message = {
        "role": "user",
        "content": user_input
    }

    response = chatbot.generate_response([user_message], max_tokens=128, temperature=0.7)
    # generate_response is annotated as possibly returning a plain string
    # (an error message); print that directly instead of reading `.content`.
    reply = getattr(response, "content", response)
    print(f"Chatbot: {reply}")

### Section-wise Article Generator Test using ChatModelWrapper and CompletionModelWrapper

In [None]:
# Test functions
def generate_metadata_sections():
    """Ask the chat model for one short text per article section.

    Each request is sent through a single ``ChatModelWrapper`` created with
    ``API_KEY``, one user message at a time.

    Returns:
        A list with the content of the first choice of every response, in
        request order.
    """
    chat_wrapper = ChatModelWrapper(API_KEY)

    requests = (
        "Please provide an introduction for a travel destination.",
        "Describe the local cuisine and dining experiences.",
        "Highlight must-visit attractions on the island.",
        "Share insights about the island's culture and traditions.",
        "Provide recommendations for accommodations and lodging.",
        "Conclude the article with a summary of Santorini's charm.",
    )

    def ask(text):
        # One user-role message per request.
        reply = chat_wrapper.generate_response(
            [{"role": "user", "content": text}], max_tokens=128, temperature=0.7
        )
        return reply.choices[0].message["content"]

    return [ask(text) for text in requests]

def write_article_sections(metadata_sections):
    """Expand each metadata text into an article section with the completion model.

    Args:
        metadata_sections: Iterable of topic texts, one per section.

    Returns:
        A list with the stripped text of the first choice of every completion,
        in the same order as ``metadata_sections``.
    """
    completion_wrapper = CompletionModelWrapper(API_KEY)
    instruction = "Generate an article section about the following topic:"

    def write(topic):
        # The instruction goes on its own line above the topic text.
        result = completion_wrapper.generate_response(
            f"{instruction}\n{topic}", max_tokens=800, temperature=0.7
        )
        return result.choices[0].text.strip()

    return [write(topic) for topic in metadata_sections]

In [None]:
if __name__ == "__main__":
    # Generate the per-section metadata first, then the article text for each.
    metadata_sections = generate_metadata_sections()
    article_sections = write_article_sections(metadata_sections)

    separator = "=" * 80
    for i, (metadata, section) in enumerate(zip(metadata_sections, article_sections), start=1):
        # Print metadata and content for section i, each followed by a rule.
        print(
            f"Section {i} - Metadata:",
            metadata,
            separator,
            f"Section {i} - Section Content:",
            section,
            separator,
            sep="\n",
        )