# Local Models playbook

In [1]:
!pip install -qU "torch" "python-dotenv" "chromadb" "sentence-transformers" "transformers" "psycopg2-binary" "rich"


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## General

In [33]:
import abc
import logging
import os
from datetime import datetime, timedelta
from typing import List, Dict, Any
from pprint import pprint
import time

import chromadb
import psutil
import torch
from chromadb import Documents, EmbeddingFunction, Embeddings
from dotenv import load_dotenv, find_dotenv
from jinja2 import Environment, meta
from psycopg2 import connect
from rich import print as rprint
from rich.console import Console
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers.utils import get_json_schema

In [16]:
# Load environment variables from the nearest .env file, then create the
# shared rich console used for styled output in later cells.
load_dotenv(find_dotenv())
console = Console()

# Local models: short alias -> Hugging Face model id.
# Aliases ending in "-mini" are the smaller variant of each family; the cells
# below skip every alias that contains "mini".
local_models = dict(
    [
        ("llama-mini", "meta-llama/Llama-3.2-1B-Instruct"),
        ("llama", "meta-llama/Llama-3.2-3B-Instruct"),
        ("qwen-mini", "Qwen/Qwen2.5-3B-Instruct"),
        ("qwen", "Qwen/Qwen2.5-7B-Instruct"),
        ("gemma-mini", "google/gemma-2-2b-it"),
        ("gemma", "google/gemma-2-9b-it"),
        ("phi-mini", "microsoft/Phi-4-mini-instruct"),
        ("phi", "microsoft/Phi-4-multimodal-instruct"),
    ]
)


In [28]:
# Print the raw chat template of every non-"mini" model so the formats can be
# compared side by side.
for model_name, model_id in local_models.items():
    if "mini" not in model_name:
        console.print(f"Model: {model_name}", style="bold green")
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        # Plain print keeps the template text free of any rich styling.
        print(tokenizer.get_chat_template())
        separator = "*" * 50
        console.print(separator, style="bold black")

{{- bos_token }}
{%- if custom_tools is defined %}
    {%- set tools = custom_tools %}
{%- endif %}
{%- if not tools_in_user_message is defined %}
    {%- set tools_in_user_message = true %}
{%- endif %}
{%- if not date_string is defined %}
    {%- if strftime_now is defined %}
        {%- set date_string = strftime_now("%d %b %Y") %}
    {%- else %}
        {%- set date_string = "26 Jul 2024" %}
    {%- endif %}
{%- endif %}
{%- if not tools is defined %}
    {%- set tools = none %}
{%- endif %}

{#- This block extracts the system message, so we can slot it into the right place. #}
{%- if messages[0]['role'] == 'system' %}
    {%- set system_message = messages[0]['content']|trim %}
    {%- set messages = messages[1:] %}
{%- else %}
    {%- set system_message = "" %}
{%- endif %}

{#- System message #}
{{- "<|start_header_id|>system<|end_header_id|>\n\n" }}
{%- if tools is not none %}
    {{- "Environment: ipython\n" }}
{%- endif %}
{{- "Cutting Knowledge Date: December 2023\n" }}
{{- 

{%- if tools %}
    {{- '<|im_start|>system\n' }}
    {%- if messages[0]['role'] == 'system' %}
        {{- messages[0]['content'] }}
    {%- else %}
        {{- 'You are Qwen, created by Alibaba Cloud. You are a helpful assistant.' }}
    {%- endif %}
    {{- "\n\n# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>" }}
    {%- for tool in tools %}
        {{- "\n" }}
        {{- tool | tojson }}
    {%- endfor %}
    {{- "\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call><|im_end|>\n" }}
{%- else %}
    {%- if messages[0]['role'] == 'system' %}
        {{- '<|im_start|>system\n' + messages[0]['content'] + '<|im_end|>\n' }}
    {%- else %}
        {{- '<|im_start|>system\nYou are Qwen, created by Alibaba C

{{ bos_token }}{% if messages[0]['role'] == 'system' %}{{ raise_exception('System role not supported') }}{% endif %}{% for message in messages %}{% if (message['role'] == 'user') != (loop.index0 % 2 == 0) %}{{ raise_exception('Conversation roles must alternate user/assistant/user/assistant/...') }}{% endif %}{% if (message['role'] == 'assistant') %}{% set role = 'model' %}{% else %}{% set role = message['role'] %}{% endif %}{{ '<start_of_turn>' + role + '
' + message['content'] | trim + '<end_of_turn>
' }}{% endfor %}{% if add_generation_prompt %}{{'<start_of_turn>model
'}}{% endif %}


{% for message in messages %}{% if message['role'] == 'system' and 'tools' in message and message['tools'] is not none %}{{ '<|' + message['role'] + '|>' + message['content'] + '<|tool|>' + message['tools'] + '<|/tool|>' + '<|end|>' }}{% else %}{{ '<|' + message['role'] + '|>' + message['content'] + '<|end|>' }}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ '<|assistant|>' }}{% else %}{{ eos_token }}{% endif %}


In [29]:
# Creating dummy functions for testing

def current_time():
    """Get the current local time as a string."""
    # NOTE(review): this function is passed to apply_chat_template as a tool;
    # presumably its docstring/signature feed the tool description, so both
    # are kept verbatim -- confirm before rewording.
    now = datetime.now()
    return str(now)

def multiply(a: float, b: float):
    """
    A function that multiplies two numbers
    
    Args:
        a: The first number to multiply
        b: The second number to multiply
    """
    # NOTE(review): used as a tool for apply_chat_template; the docstring and
    # annotations are presumably turned into the tool schema, so they are left
    # exactly as they were -- confirm before rewording.
    product = a * b
    return product


# Callables handed to the chat templates as available tools.
tools = [current_time, multiply]

# Sample conversation that opens with a system message.
messages = [
    {"role": role, "content": content}
    for role, content in (
        ("system", "Your name is Iida, You are a helpful assistant."),
        ("user", "Tell me something about large language models."),
        ("assistant", "Large language models are powerful models that can generate human-like text."),
        ("user", "Can you show me an example of a large language model?"),
    )
]

# Sample conversation without a system message (user/assistant turns only).
fallback_messages = [
    {"role": role, "content": content}
    for role, content in (
        ("user", "Hello, how are you?"),
        ("assistant", "I'm doing great. How can I help you today?"),
        ("user", "I'd like to show off how chat templating works!"),
    )
]

In [None]:
# Render the sample conversation with each full-size model's chat template.
for model_name, model_id in local_models.items():
    if "mini" in model_name:
        continue
    # Print memory usage
    print(f"Memory usage: {psutil.virtual_memory().percent}%")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    console.print(f"Model: {model_name}", style="red on white")
    try:
        # First try the conversation that includes a system message.
        text = tokenizer.apply_chat_template(
            messages, tools=tools, tokenize=False, add_generation_prompt=True
        )
    except Exception as e:
        # Some templates reject a system message (the Gemma template printed
        # above raises 'System role not supported'). Retrying the very same
        # call could only fail again, so fall back to the conversation that
        # has no system message.
        print(f"Error: {e}")
        text = tokenizer.apply_chat_template(
            fallback_messages, tools=tools, tokenize=False, add_generation_prompt=True
        )
    # Delete the tokenizer to free up memory
    del tokenizer
    console.print(text, style="italic magenta on yellow")
    print("\n\n")

Memory usage: 82.8%





Memory usage: 82.6%





Memory usage: 82.5%





Memory usage: 82.6%







### Base classes

In [None]:
# Creating a base class for the models, since we will be experimenting with different models which have different requirements
class BaseLLM(abc.ABC):
    """
    Abstract base class for LLM models, defining common functionality.

    The constructor loads a tokenizer and a causal language model, moves the
    model to CUDA when available (CPU otherwise) and loads the RAG prompt.
    Subclasses must implement ``chat`` and ``format_prompt``; they may
    override ``generate_response`` to add prompt formatting.

    Attributes:
    ----------
    logger : logging.Logger
        Logger named after the concrete class.
    device : torch.device
        Device the model is moved to.
    system_prompt : str
        System prompt given at construction time.
    max_history : int
        Maximum number of (prompt, response) pairs kept in ``history``.
    history : list of tuple
        (prompt, response) pairs recorded by ``generate_text``.
    chat_history : list of dict
        Initialised empty here; not used by this base class.
    tokenizer, model
        Objects loaded by ``load_model_and_tokenizer``.
    rag_prompt : str
        Prompt text set by ``load_rag_prompt``.
    """

    # Default RAG prompt, used when the RAG_PROMPT environment variable is not set.
    _DEFAULT_RAG_PROMPT = """You are an advanced AI assistant with expertise in retrieving and synthesizing information from provided references. Your role is to analyze the given documents and accurately answer the question based on their content.

## Context:
You will be provided with multiple documents, each containing relevant information. Each document is referenced with a unique identifier. Your response should be derived strictly from the given documents while maintaining clarity and conciseness. If the documents do not contain sufficient information, indicate that explicitly.

## Documents:
{documents}  # Placeholder for dynamically inserting documents along with their references.

## Instructions:
1. **Extract information** only from the provided documents.
2. **Cite references** where applicable by mentioning the document identifier.
3. **Maintain coherence** while summarizing details from multiple sources.
4. **Avoid speculation** or adding external knowledge.
5. **If unclear**, state that the answer is not available in the provided documents.

## User's Question:
{question}  # Placeholder for dynamically inserting the user's query.

## Expected Output:
- A **concise and accurate** response based on the referenced documents.
- **Citations** to the corresponding documents where relevant.
- A disclaimer if the answer cannot be found within the given context.
"""

    def __init__(
        self, model: str, max_history: int = 5, system_prompt: str = "", **kwargs
    ):
        """
        Initialises state, then loads the model, tokenizer and RAG prompt.

        Parameters:
        ----------
        model : str
            Model name or path passed to the ``from_pretrained`` loaders.
        max_history : int, optional
            Maximum number of history entries to keep (default is 5).
        system_prompt : str, optional
            System prompt stored on the instance (default is "").
        kwargs : dict
            Extra keyword arguments forwarded to both ``from_pretrained`` calls.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.system_prompt = system_prompt
        self.max_history = max_history
        self.history: List[tuple] = []
        self.chat_history: List[Dict[str, str]] = []

        # Load model and tokenizer
        self.tokenizer = None
        self.model = None
        self.rag_prompt: str = None
        self.load_model_and_tokenizer(model, **kwargs)
        self.load_rag_prompt()

    def load_model_and_tokenizer(self, model: str, **kwargs) -> None:
        """
        Loads the tokenizer and model and moves the model to ``self.device``.

        ``kwargs`` are forwarded to both loaders. ``torch_dtype`` is a model
        loading option, so it is only passed to the model loader.
        """
        self.logger.info("Initializing tokenizer and model...")
        self.tokenizer = AutoTokenizer.from_pretrained(model, **kwargs)
        self.model = AutoModelForCausalLM.from_pretrained(
            model, torch_dtype=torch.bfloat16, **kwargs
        )
        self.model.to(self.device)

        self.logger.info("Loaded model: %s", model)
        self.logger.info("Model type: %s", type(self.model).__name__)
        self.logger.info("Number of parameters: %s", self.model.num_parameters())
        self.logger.info("Device: %s", self.device.type)

    def get_token_count(self, text: str) -> int:
        """
        Gets the token count of the given text, i.e. the length of the
        ``input_ids`` the tokenizer returns for it.
        """
        return len(self.tokenizer(text)["input_ids"])

    def trim_conversation(self, conversation_history, token_limit) -> List:
        """
        Trims the conversation history to fit within the given token limit.

        ``conversation_history`` is an iterable of (user, assistant) text
        pairs. The oldest pairs are dropped first until the combined token
        count is no greater than ``token_limit``. The input is not modified;
        a new list of (user, assistant) tuples is returned.
        """
        if not conversation_history:
            return []

        total_tokens = 0
        tokenized_history = []
        for user, assistant in conversation_history:
            pair_tokens = self.get_token_count(user) + self.get_token_count(assistant)
            total_tokens += pair_tokens
            tokenized_history.append((user, assistant, pair_tokens))

        # Drop from the front (oldest first) until the budget is met.
        while total_tokens > token_limit and tokenized_history:
            removed_entry = tokenized_history.pop(0)
            total_tokens -= removed_entry[2]

        return [(entry[0], entry[1]) for entry in tokenized_history]

    def clear_history(self) -> None:
        """Clears the stored conversation history."""
        # NOTE(review): only ``history`` is reset; ``chat_history`` is left
        # untouched -- confirm that this is intended.
        self.history = []

    def add_to_history(self, user_input, model_response) -> None:
        """Adds an interaction to history and maintains max history size."""
        self.history.append((user_input, model_response))
        # Drop every surplus entry, not just one, so the cap also holds when
        # max_history was lowered or history was pre-populated beyond it.
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            del self.history[:overflow]

    def generate_text(
        self,
        prompt: str,
        max_new_tokens: int = 120,
        skip_special_tokens: bool = False,
        **kwargs,
    ) -> str:
        """
        Generates text based on the given prompt.

        Parameters:
        ----------
        prompt : str
            The prompt text to generate text from.
        max_new_tokens : int, optional
            The maximum length of the generated text (default is 120).
        skip_special_tokens : bool, optional
            Flag to indicate if special tokens should be skipped (default is False).
        kwargs : dict
            Extra keyword arguments forwarded to ``model.generate``.
            ``pad_token_id`` defaults to the tokenizer's EOS token id and may
            be overridden here.

        Returns:
        -------
        str
            The decoded first output sequence, or the string
            "Error generating response" if tokenization or generation raised.
            Successful results are also appended to ``history``.
        """

        self.logger.info("Generating response for prompt: %s", prompt)
        try:
            with torch.inference_mode():
                inputs = self.tokenizer(prompt, return_tensors="pt")
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
                self.logger.debug("Tokenized inputs: %s", inputs)

                # Merge instead of passing pad_token_id as a separate keyword
                # so a caller-supplied pad_token_id does not collide with it.
                generation_kwargs = {
                    "pad_token_id": self.tokenizer.eos_token_id,
                    **kwargs,
                }

                # perf_counter is monotonic, so the elapsed time cannot be
                # skewed by wall-clock adjustments.
                _start_time = time.perf_counter()
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    **generation_kwargs,
                )
                _end_time = time.perf_counter()
                self.logger.debug("Time taken: %.2f seconds", _end_time - _start_time)

        except Exception as e:
            # Deliberately best-effort: log (with traceback) and return a
            # sentinel string rather than propagating the error.
            self.logger.exception("Error generating response: %s", e)
            return "Error generating response"

        decoded_output = self.tokenizer.decode(
            outputs[0], skip_special_tokens=skip_special_tokens
        )
        self.logger.debug("Generated response: %s", decoded_output)

        # Store conversation in history
        self.add_to_history(prompt, decoded_output)

        return decoded_output

    def generate_response(self, prompt: str, **kwargs) -> str:
        """
        Default inference entry point used by ``__call__``.

        Delegates directly to ``generate_text``. Subclasses may override this
        to format the prompt before generation.
        """
        return self.generate_text(prompt, **kwargs)

    @abc.abstractmethod
    def chat(
        self, prompt: str, clear_session: bool = False, **kwargs
    ) -> Dict[str, Any]:
        """
        Abstract method for chatting with the model.
        """
        pass

    @abc.abstractmethod
    def format_prompt(
        self,
        prompt: str,
        system_prompt: str = None,
        prompt_template: str = None,
        tools_schema: str = None,
        documents: List[Dict] = None,
    ) -> str:
        """
        Abstract method for formatting the chat prompt.
        """
        pass

    def __call__(self, prompt: str, **kwargs) -> str:
        """
        Enables direct inference by calling the model instance.

        Forwards to ``generate_response`` (the subclass override if present).
        """
        return self.generate_response(prompt, **kwargs)

    def __repr__(self):
        """
        Official string representation for debugging.
        """
        return f"{self.__class__.__name__}(model={self.model.name_or_path!r}, device={self.device})"

    def __str__(self):
        """
        User-friendly string representation.
        """
        return f"{self.__class__.__name__} running on {self.device.type}, max history: {self.max_history}"

    def __len__(self):
        """
        Returns the number of stored conversation history entries.
        """
        return len(self.history)

    def __getitem__(self, index):
        """
        Retrieves conversation history entries like an array.
        """
        return self.history[index]

    def load_rag_prompt(self):
        """
        Loads the RAG prompt into ``self.rag_prompt``.

        The ``RAG_PROMPT`` environment variable takes precedence whenever it
        is set (even to an empty string); otherwise the built-in default
        prompt with ``{documents}`` and ``{question}`` placeholders is used.
        """
        env_prompt = os.environ.get("RAG_PROMPT")
        if env_prompt is not None:
            self.rag_prompt = env_prompt
            self.logger.info("Loaded RAG prompt from environment variable.")
        else:
            self.rag_prompt = self._DEFAULT_RAG_PROMPT
            self.logger.info("Loaded default RAG prompt.")

## Llama

In [None]:
class LocalLLM(BaseLLM):
    """
    A class to represent a DOTLLM model for text generation.

    Attributes:
    ----------
    model : str
        The model name or path.
    max_history : int, optional
        The maximum number of history entries to keep (default is 5).
    local_files_only : bool, optional
        Flag to indicate if the model is local or remote (default is False).
    tokenizer : AutoTokenizer
        The tokenizer for the model.
    model : AutoModelForCausalLM
        The model for causal language modeling.
    history : list
        The history of text inputs.
    """

    def __init__(
        self,
        model: str,
        max_history: int = 100,
        prompt_template: str = None,
        system_prompt: str = None,
        **kwargs
    ):
        """
        Constructs all the necessary attributes for the DOTLLM object.

        Parameters:
        ----------
        model : str
            The model name or path.
        max_history : int, optional
            The maximum number of history entries to keep (default is 100).
        prompt_template : str, optional
            The template for the prompt (default is None).
            Expects the prompt template to be a string with atleast the following placeholders:
                {system_prompt}, {user_prompt}.
            example:
                "<|begin_of_text|>
                <|start_header_id|>system<|end_header_id|>{system_prompt}<|eot_id|>
                <|start_header_id|>user<|end_header_id|>{user_prompt}<|eot_id|>
                <|start_header_id|>assistant<|end_header_id|>"
        system_prompt : str, optional
            The system prompt text (default is "You are a helpful AI assistant").
            Note: This is only used if prompt_template is provided.
        kwargs : dict,
            Additional keyword arguments for the model and tokenizer.
        """
        super().__init__(model, max_history, system_prompt, **kwargs)
        self.prompt_template = prompt_template
        self.logger.debug("Default role of the AI assistant: %s", system_prompt)
        self.tool_calling_prompt = """You are an expert in composing functions. You are given a question and a set of possible functions. 
Based on the question, you will need to make one or more function/tool calls to achieve the purpose. 
If none of the function can be used, point it out. If the given question lacks the parameters required by the function,
also point it out. You should only return the function call in tools call sections.

If you decide to invoke any of the function(s), you MUST put it in the format of [func_name1(params_name1=params_value1, params_name2=params_value2...), func_name2(params)]\n
You SHOULD NOT include any other text in the response.

Here is a list of functions in JSON format that you can invoke.\n\n{functions}\n"""

    def format_prompt(
        self,
        prompt: str,
        system_prompt: str = None,
        prompt_template: str = None,
        tools_schema: str = None,
        documents: List[Dict] = None,
    ) -> str:
        """
        Formats the prompt using the prompt template.
        """

        system_prompt = system_prompt or self.system_prompt

        if tools_schema:
            self.logger.debug("Formatting prompt with tool schema")
            formatted_prompt = self.tool_calling_prompt.format(functions=tools_schema)
            system_prompt = formatted_prompt

        if prompt_template:
            self.logger.debug("Formatting prompt with template")
            formatted_prompt = prompt_template.format(
                system_prompt=system_prompt, user_prompt=prompt
            )
            input_prompt = formatted_prompt
        else:
            input_prompt = prompt

        return input_prompt

    def generate_response(
        self,
        prompt: str,
        system_prompt: str = None,
        max_new_tokens: int = 120,
        skip_special_tokens: bool = False,
        formatted_prompt: bool = False,
        **kwargs
    ) -> str:
        """
        Generates text based on the given prompt.

        Parameters:
        ----------
        prompt : str
            The prompt text to generate text from.
        max_new_tokens : int, optional
            The maximum length of the generated text (default is 120).
        skip_special_tokens : bool, optional
            Flag to indicate if special tokens should be skipped (default is False).
        system_prompt : str, optional
            The system prompt text (default is None).
            If it's given, it will be used instead of the default system prompt.
        formatted_prompt : bool, optional
            Flag to indicate if the prompt is already formatted (default is False).
            If True, the prompt will be used as is without formatting.

        Returns:
        -------
        str
            The generated text.
        """
        system_prompt = system_prompt or self.system_prompt

        if self.prompt_template and not formatted_prompt:
            self.logger.debug("Formatting prompt with template")
            formatted_prompt = self.prompt_template.format(
                system_prompt=system_prompt, user_prompt=prompt
            )
            input_prompt = formatted_prompt
        else:
            input_prompt = prompt

        model_response = self.generate_text(
            input_prompt,
            max_new_tokens=max_new_tokens,
            skip_special_tokens=skip_special_tokens,
            **kwargs,
        )
        return model_response