<a href="https://colab.research.google.com/github/IvanMadman/Gemma7b-llama3.8b-gemma2.9b.colab/blob/main/LLM_Gemma7b_Gemma2_9b_Llama3_8b_IT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**INFORMATION!**

This is a very simple notebook for testing, in a Colab-hosted environment, three of the most well-known LLMs in their small Instruct versions. It is a very basic implementation, but it is CUDA-enabled and should be fast enough for our needs.
It uses llama-cpp-python and a Gradio UI.

Be aware that I'm using the standard high-level completion API for testing purposes (passing a single "prompt" string instead of a "messages[]" list), although the chat completion method is the recommended one.

**HOW IT WORKS**

**Before running anything, make sure your runtime is GPU-enabled, or the CUDA installation will fail!**

Just run the first cell and let it install everything. When it is done, run the second cell; after 2-3 minutes you should get your Gradio link to access the UI. Choose which model you want to use and click "Load Model"; it will be downloaded automatically (you can watch the running cell to see the estimated time left). When you want to change the model, the notebook checks the available storage space and deletes the other models if needed, unloads the current model to free up memory, and then downloads and loads the new one.

In [None]:
# Install llama-cpp-python, adding the project's "cu122" wheel index as an extra
# package source (the index path suggests CUDA 12.2 builds).
!pip install llama-cpp-python \
  --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu122
# Install pinned torch / torchvision / torchaudio versions from the PyTorch
# "cu121" wheel index.
!pip install torch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 --index-url https://download.pytorch.org/whl/cu121
# Install the UI library (gradio) and the model download client (huggingface_hub).
!pip install gradio huggingface_hub

In [None]:
import gradio as gr
from llama_cpp import Llama
import os
from huggingface_hub import hf_hub_download
from tqdm.auto import tqdm
import torch
import shutil
import gc

# Catalogue of the models offered in the UI, keyed by the display name shown in
# the model dropdown. Each entry holds:
#   repo_id  - Hugging Face repository that hosts the GGUF file
#   filename - the GGUF file to download from that repository
#   settings - per-model values that load_model() forwards to Llama()
model_info = {
    "Gemma 7B": {
        "repo_id": "mlabonne/gemma-7b-it-GGUF",
        "filename": "gemma-7b-it.Q5_K_M.gguf",
        "settings": {
            "n_ctx": 8192,
            "n_batch": 1024,
            "rope_scaling_type": 1,
            "rope_freq_base": 10000,
            "rope_freq_scale": 1.0,
        },
    },
    "Llama 3 8B Instruct": {
        "repo_id": "bartowski/Llama-3-Instruct-8B-SPPO-Iter3-GGUF",
        "filename": "Llama-3-Instruct-8B-SPPO-Iter3-Q6_K.gguf",
        "settings": {
            "n_ctx": 4096,
            "n_batch": 512,
            "rope_scaling_type": 0,
            # 0 means "take the RoPE base from the model file". Llama 3 was
            # trained with a much larger base than 10000, so forcing 10000
            # here would override the correct value stored in the GGUF.
            "rope_freq_base": 0.0,
            "rope_freq_scale": 1.0,
        },
    },
    "Gemma 2 9B": {
        "repo_id": "bartowski/gemma-2-9b-it-GGUF",
        "filename": "gemma-2-9b-it-Q5_K_M.gguf",
        "settings": {
            "n_ctx": 8192,
            "n_batch": 512,
            "rope_scaling_type": 1,
            "rope_freq_base": 10000,
            "rope_freq_scale": 1.0,
        },
    },
}

# Function to download models
def download_model(repo_id, filename):
    """Download one file from a Hugging Face repository.

    Args:
        repo_id: Repository identifier, e.g. "owner/name".
        filename: Name of the file inside the repository.

    Returns:
        The local path reported by ``hf_hub_download``, or ``None`` if the
        download raised any exception (the error is printed, not re-raised).
    """
    try:
        # ``resume_download`` is intentionally not passed: it is deprecated in
        # huggingface_hub (downloads resume automatically when possible), and
        # once the argument is removed passing it would raise a TypeError that
        # the handler below would misreport as a download failure.
        model_path = hf_hub_download(repo_id=repo_id, filename=filename)
    except Exception as e:
        # Best-effort by design: callers treat None as "download failed".
        print(f"Error downloading model: {e}")
        return None
    print(f"Download completed: {model_path}")
    return model_path

# Module-level state shared by the callbacks below: the currently loaded model
# object and the system prompt. Both start out as None.
current_model, system_prompt = None, None
# Function to check available disk space
def get_free_space(path):
    """Return the free space on the filesystem holding *path*, in whole GiB.

    The byte count reported by ``shutil.disk_usage`` is floor-divided by
    2**30, so anything below 1 GiB is reported as 0.
    """
    bytes_per_gib = 1 << 30
    _total, _used, free_bytes = shutil.disk_usage(path)
    return free_bytes // bytes_per_gib

# Function to clear the downloaded model folder
def clear_model_folder():
    """Delete ``~/.cache/huggingface/hub`` and everything in it, if it exists.

    Prints a message saying whether the folder was removed or was not found.
    """
    hub_cache = os.path.expanduser("~/.cache/huggingface/hub")
    if not os.path.exists(hub_cache):
        print("Model folder not found.")
        return
    shutil.rmtree(hub_cache)
    print("Model folder cleared.")

# Function to unload the current model and clear GPU memory
def unload_model():
    """Release the currently loaded model, if any, and free cached memory.

    Resets the module-level ``current_model`` to ``None``. If the model object
    exposes a ``close()`` method it is called first so the underlying
    resources are released explicitly instead of waiting for garbage
    collection. Errors raised by ``close()`` are printed and otherwise ignored
    so that unloading never blocks loading the next model.
    """
    global current_model
    if current_model is not None:
        # Newer llama-cpp-python releases provide Llama.close(); older ones do
        # not, hence the getattr lookup.
        close = getattr(current_model, "close", None)
        if callable(close):
            try:
                close()
            except Exception as e:
                print(f"Error while closing model: {e}")
        current_model = None
    # Collect first so objects that only became unreachable just now are
    # actually freed before the CUDA cache is flushed.
    gc.collect()
    torch.cuda.empty_cache()
    print("Model unloaded and GPU memory cleared.")

# Function to load a model
def load_model(model_name):
    """Download (if needed) and load the model registered under *model_name*.

    Steps, in order:
      1. Look the name up in ``model_info``; an unknown name leaves the
         currently loaded model untouched.
      2. Unload the current model to free memory.
      3. If fewer than 10 GB are free, clear the model cache folder.
      4. Download the GGUF file and construct a ``Llama`` instance, storing it
         in the module-level ``current_model``.

    Args:
        model_name: A key of ``model_info`` (the dropdown selection).

    Returns:
        A human-readable status string describing success or the failure
        reason; this function does not raise for download or load errors.
    """
    global current_model

    # Validate before unloading so a bad selection does not throw away a
    # working model.
    model_info_dict = model_info.get(model_name)
    if not model_info_dict:
        return f"Model {model_name} not found in the model_info dictionary."

    repo_id = model_info_dict['repo_id']
    filename = model_info_dict['filename']
    settings = model_info_dict['settings']

    # Unload the current model if one is loaded
    unload_model()

    # Check available space. This runs inside a UI callback, so it must not
    # block on console input(); the cache is cleared automatically instead.
    free_space = get_free_space(".")
    if free_space < 10:  # Assuming we need at least 10GB free space
        print(f"Low disk space ({free_space}GB available). Clearing model folder...")
        clear_model_folder()

    print(f"Downloading {model_name}...")
    model_path = download_model(repo_id, filename)
    if not model_path:
        return f"Failed to download {model_name}."

    print(f"Loading {model_name}...")
    try:
        # Only arguments that the Llama constructor actually defines are
        # passed. The previous extras (max_tokens, embedding_mode,
        # use_parallel_residual, f16_kv) are not constructor parameters in
        # current llama-cpp-python and had no effect.
        current_model = Llama(
            model_path=model_path,
            n_gpu_layers=-1,  # Use all available GPU layers
            n_ctx=settings['n_ctx'],
            n_batch=settings['n_batch'],
            n_threads=os.cpu_count(),
            offload_kqv=True,
            use_mlock=False,
            use_mmap=True,
            verbose=True,
            tensor_split=None,
            rope_scaling_type=settings['rope_scaling_type'],
            rope_freq_base=settings['rope_freq_base'],
            rope_freq_scale=settings['rope_freq_scale'],
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        return f"Failed to load {model_name}. Error: {str(e)}"

    print(f"{model_name} loaded successfully!")
    return f"{model_name} loaded successfully with model-specific optimizations!"



# Function to generate a response
def _canonical_role(role, assistant_role):
    """Map a stored history label to the role name the target model expects.

    "user" and "You" (any casing) become "user"; every other label
    ("assistant", "model", "Assistant", ...) becomes *assistant_role*.
    """
    if str(role).lower() in ("user", "you"):
        return "user"
    return assistant_role


def generate_response(message, history, max_tokens, model_name):
    """Build a model-specific prompt from the chat so far and generate a reply.

    Args:
        message: The new user message.
        history: Previous turns as a sequence of ``(role, content)`` pairs, in
            the form this function itself appends them; ``None`` is treated as
            an empty history.
        max_tokens: Forwarded to the model call as ``max_tokens``.
        model_name: Display name of the selected model. "Gemma 7B" and
            "Gemma 2 9B" use the Gemma turn markers; anything else uses the
            Llama 3 header markers.

    Returns:
        A ``(textbox_value, new_history)`` tuple. ``textbox_value`` is ``""``
        on success (and when no model is loaded), or the error text if
        generation raised. ``new_history`` is *history* plus the new user turn
        and the reply (or notice / error message).
    """
    global system_prompt
    history = list(history or [])
    print(history)

    # Must be checked before touching the model: calling reset() on None would
    # raise instead of showing the notice below.
    if current_model is None:
        return "", history + [("You", message), ("Assistant", "Please select and load a model first.")]

    current_model.reset()

    # Prepare the prompt based on the selected model. Stored role labels are
    # normalised so that turns recorded under another model, or by the notice
    # and error paths, still map to a role this model's template knows.
    if model_name in ["Gemma 7B", "Gemma 2 9B"]:
        assistant_role = "model"
        parts = [
            f"<start_of_turn>{_canonical_role(role, assistant_role)}\n{content}\n<end_of_turn>\n"
            for role, content in history
        ]
        parts.append("<start_of_turn>user\n" + message + "\n<end_of_turn>\n<start_of_turn>model\n")
        stop = ["<end_of_turn>", "<start_of_turn>"]
    else:  # Llama 3 8B Instruct
        assistant_role = "assistant"
        # The prompt is rebuilt from scratch on every call, so the system turn
        # has to be emitted every time, not only for the first message.
        system_prompt = "You are a helpful AI assistant. Answer the user's questions to the best of your ability."
        parts = [f"<|start_header_id|>system<|end_header_id|>\n\n{system_prompt}<|eot_id|>\n\n"]
        parts.extend(
            f"<|start_header_id|>{_canonical_role(role, assistant_role)}<|end_header_id|>\n\n{content}<|eot_id|>\n\n"
            for role, content in history
        )
        parts.append(f"<|start_header_id|>user<|end_header_id|>\n\n{message}<|eot_id|>\n\n")
        parts.append("<|start_header_id|>assistant<|end_header_id|>\n\n")
        stop = ["<|eot_id|>", "<|start_header_id|>"]

    prompt = "".join(parts)
    print(prompt)

    # Generate response
    try:
        response = current_model(
            prompt,
            max_tokens=max_tokens,
            stop=stop,
            echo=False,
            temperature=0.7,
            top_p=0.9,
            repeat_penalty=1.1,
            top_k=40
        )
        torch.cuda.empty_cache()
        print(response)
        ai_message = response['choices'][0]['text'].strip()
    except Exception as e:
        error_message = f"An error occurred while generating the response: {str(e)}"
        return error_message, history + [("user", message), (assistant_role, error_message)]

    return "", history + [("user", message), (assistant_role, ai_message)]

# Gradio interface
# Components are created inside the Blocks context in the order they appear
# below: a model-selection row, a max-tokens row, the chat area, the message
# box, a clear button and a status box.
with gr.Blocks() as demo:
    gr.Markdown("# LLM Chat Interface (GPU Accelerated)")

    with gr.Row():
        # Dropdown choices are the display names (keys) of model_info.
        model_dropdown = gr.Dropdown(choices=list(model_info.keys()), label="Select Model")
        load_button = gr.Button("Load Model")

    with gr.Row():
        # Range 10..8192 in steps of 10, default 512; the value is passed to
        # generate_response as max_tokens.
        max_tokens_slider = gr.Slider(10, 8192, step=10, value=512, label="Max Tokens")

    # The chatbot is both an input (current history) and an output (updated
    # history) of generate_response.
    chatbot = gr.Chatbot(render_markdown=True)
    msg = gr.Textbox(label="Your message")
    clear = gr.Button("Clear Chat")

    load_status = gr.Textbox(label="Model Status")

    def load_model_wrapper(model_name):
        """Forward the selected model name to load_model and return its status string."""
        return load_model(model_name)

    # Clicking "Load Model" writes load_model's status string to the status box.
    load_button.click(load_model_wrapper, inputs=[model_dropdown], outputs=[load_status])
    # Submitting the message box calls generate_response; its two return values
    # update the message box and the chatbot respectively.
    msg.submit(generate_response, inputs=[msg, chatbot, max_tokens_slider, model_dropdown], outputs=[msg, chatbot])
    # "Clear Chat" sets the chatbot value to None, which empties the history.
    clear.click(lambda: None, None, chatbot, queue=False)

# Report whether torch can see a CUDA device, and name device 0 if it can
_cuda_ok = torch.cuda.is_available()
print(f"CUDA available: {_cuda_ok}")
if _cuda_ok:
    _device_name = torch.cuda.get_device_name(0)
    print(f"CUDA device: {_device_name}")

# Launch the interface
# queue() is enabled before launching. share=True requests a shareable Gradio
# link (the link the notebook instructions tell you to open); debug=True and
# inline=False are passed through to launch() as-is. NOTE(review): debug=True
# presumably keeps the cell running so logs stay visible; confirm against the
# Gradio launch() documentation.
demo.queue().launch(share=True, debug=True, inline=False)