<a href="https://www.kaggle.com/code/kuchiriel/llm-study-fast-inference-dynamic-hardware-and-llms?scriptVersionId=159668091" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
! pip install einops
! pip install torch
! pip install torchvision
! pip install google-api-python-client
#! pip install torch-xla
! pip install attention-sinks
! pip install git+https://github.com/huggingface/transformers
! pip install bitsandbytes
! pip install accelerate
! pip install tensorflow
! pip install numpy
! pip install SciPy
! pip install collections nltk re
# ! pip install ctransformers
# ! pip install ctransformers[cuda]
# ! pip install --upgrade scipy
# ! pip install datasets
# ! pip install gradio

In [None]:
#! pip install einops
#! pip install torch
#! pip install torchvision
#! pip install google-api-python-client
#! pip install attention-sinks
#! pip install git+https://github.com/huggingface/transformers
#! pip install bitsandbytes
#! pip install accelerate
#! pip install tensorflow
#! pip install numpy
#! pip install SciPy
#! pip install collections nltk re
##! pip install torch-xla
## ! pip install ctransformers
## ! pip install ctransformers[cuda]
## ! pip install --upgrade scipy
## ! pip install datasets
## ! pip install gradio

# Import Garbage Collector module
import gc

# Import os module
import os

import re

# Import the AutoModelForCausalLM and AutoTokenizer classes from the attention_sinks module
# TL;DR: attention_sinks adapts pre-trained LLMs to use a modified
# form of sliding window attention that remains able to produce fluent text indefinitely.
# https://github.com/tomaarsen/attention_sinks
from attention_sinks import AutoModelForCausalLM, AutoTokenizer

# Import the GenerationConfig and TextStreamer classes from the transformers module
from transformers import GenerationConfig, TextStreamer

# Import the Accelerator class from the accelerate module
from accelerate import Accelerator

# Import the torch library
import torch

# Import the nn module from the torch library
#from torch import nn

# Import tensorflow module
import tensorflow as tf

import nltk

from nltk.corpus import stopwords

# import torch_xla.core.xla_model as xm

In [None]:
# Fetch the NLTK stop-word corpus that is_complex() reads via stopwords.words().
nltk.download("stopwords")

# CUDA caching-allocator option; presumably meant to limit fragmentation by
# capping the size of splittable blocks at 50 MB - TODO confirm it is set early
# enough (before the first CUDA allocation) to take effect.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:50"

# Shared Accelerator instance used to prepare the model and to free memory.
accelerator = Accelerator()
# System message placed at the start of every chat sent to the model.
CONTENT = "You are an assistant that follow user instructions precisely"
# NOTE(review): not referenced anywhere in the visible code; the name is
# misspelled ("LENGHT") but is left unchanged so external references keep working.
TOKENIZER_MODEL_MAX_LENGHT = 256
# NOTE(review): not referenced anywhere in the visible code; "~" is a literal
# character here and would need os.path.expanduser() before use as a path.
SAVE_DIR = "~/.cache/huggingface/transformers"


def test_is_tpu_available():
    """
    Report whether TensorFlow lists at least one logical TPU device.

    Returns:
        bool: True when any device returned by
        ``tf.config.list_logical_devices()`` has ``device_type == "TPU"``,
        False otherwise (including when no devices are listed).
    """
    return any(
        logical_device.device_type == "TPU"
        for logical_device in tf.config.list_logical_devices()
    )


# Pick the compute device: a CUDA GPU first, then a TPU, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
    print("GPU Available:", torch.cuda.get_device_name(torch.cuda.current_device()))
elif test_is_tpu_available():
    try:
        # Attempt to initialize the TPU
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        print(f"Running on TPU: {tpu.cluster_spec().as_dict()['worker']}")
        strategy = tf.distribute.experimental.TPUStrategy(tpu)
        DEVICE = "tpu"
    except tf.errors.AlreadyExistsError:
        # An already-initialized TPU is still a usable TPU. DEVICE must be
        # assigned on this path too, otherwise the print below raises NameError.
        print("TPU already initialized")
        DEVICE = "tpu"
    except tf.errors.FailedPreconditionError as e:
        print(f"Failed to initialize TPU: {e}")
        DEVICE = "cpu"
else:
    DEVICE = "cpu"
print("Device:", DEVICE)

# Accelerated devices get the 7B chat model; the CPU falls back to the smaller phi-2.
if DEVICE in ["cuda", "tpu"]:
    MODEL = "HuggingFaceH4/zephyr-7b-beta"
else:
    MODEL = "microsoft/phi-2"

# On CUDA prefer bfloat16 when the GPU supports it, else float16; every other
# device uses float32.
if DEVICE == "cuda":
    DTYPE = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
else:
    DTYPE = torch.float32
print(DTYPE)


def clear_all():
    """
    Free accelerator memory and delete every public module-level name.

    Every global whose name does not start with an underscore is removed from
    this module's namespace (imports, constants, functions, the model and the
    tokenizer alike). Garbage collection and, on CUDA, the cache release run
    *after* the deletion so the objects that were just unreferenced can
    actually be reclaimed.

    Parameters:
        None

    Returns:
        None
    """
    # Bind everything needed after the purge to locals: once the globals are
    # deleted, looking up names such as `gc` or `torch` would raise NameError.
    collect_garbage = gc.collect
    release_cuda_cache = torch.cuda.empty_cache if DEVICE == "cuda" else None

    flush()

    # Snapshot the names first; deleting from a dict while iterating it fails.
    module_globals = globals()
    public_names = [name for name in module_globals if not name.startswith("_")]
    for name in public_names:
        del module_globals[name]

    collect_garbage()
    if release_cuda_cache is not None:
        release_cuda_cache()


def flush():
    """
    Release memory held through the accelerator and, on CUDA, the GPU cache.

    Calls ``accelerator.free_memory()`` on every device. Only when the device
    is exactly "cuda" does it also empty the CUDA cache and reset the CUDA
    peak-memory statistics; those ``torch.cuda`` calls do not apply to a TPU
    or the CPU.

    Parameters:
        None

    Returns:
        None
    """
    accelerator.free_memory()
    # Exact comparison: `DEVICE in "cuda"` was a substring test, and the
    # torch.cuda calls below are meaningless when DEVICE is "tpu".
    if DEVICE == "cuda":
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()


def print_memory_usage():
    """
    Report current and peak CUDA memory, then flush.

    Does nothing unless DEVICE is "cuda". On CUDA it prints the allocated and
    maximum-allocated memory in GB (bytes / 1e9) and then calls flush(), which
    also resets the peak statistics that were just reported.

    Parameters:
        None

    Returns:
        None
    """
    if DEVICE != "cuda":
        return

    bytes_per_gb = 1e9
    allocated = torch.cuda.memory_allocated() / bytes_per_gb
    max_allocated = torch.cuda.max_memory_allocated() / bytes_per_gb
    print(f"Memory Allocated: {allocated} GB, Max Allocated: {max_allocated} GB")
    flush()


def check_if_quantized(model):
    """
    Check if any parameter in the given model is quantized.

    A parameter counts as quantized when its dtype is one of torch's quantized
    dtypes (``torch.qint8``, ``torch.quint8``, ...) or when its class is one of
    the bitsandbytes parameter wrappers (matched by class name so bitsandbytes
    does not have to be imported here). The first match is printed.

    Parameters:
        model (torch.nn.Module): The model to check; only its
            ``named_parameters()`` method is used.

    Returns:
        bool: True if any parameter in the model is quantized, False otherwise.
    """
    # str(torch.qint8) is "torch.qint8": the word "quantized" never appears in
    # a dtype's string form, so compare against the dtype objects themselves.
    quantized_dtypes = {
        getattr(torch, dtype_name)
        for dtype_name in ("qint8", "quint8", "qint32", "quint4x2", "quint2x4")
        if hasattr(torch, dtype_name)
    }
    # NOTE(review): assumed bitsandbytes parameter class names - confirm
    # against the installed bitsandbytes version.
    quantized_param_classes = {"Params4bit", "Int8Params"}

    for name, param in model.named_parameters():
        if (
            param.dtype in quantized_dtypes
            or type(param).__name__ in quantized_param_classes
        ):
            print(f"Layer {name} is quantized with data type: {param.dtype}")
            return True
    return False


def setup_model_and_tokenizer():
    """
    Load the model and tokenizer named by MODEL and build the generation config.

    Gradient tracking is disabled globally, the model is loaded through
    attention_sinks in DTYPE, the tokenizer's special-token ids are copied into
    both the model config and the generation config, and the model is handed
    to the shared accelerator and switched to eval mode.

    Returns:
        model (AutoModelForCausalLM): The pretrained language model for text generation.
        tokenizer (AutoTokenizer): The tokenizer for the language model.
        generation_config (GenerationConfig): The configuration for text generation.
    """
    torch.set_grad_enabled(False)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL,
        torch_dtype=DTYPE,
        attention_sink_size=4,
        attention_sink_window_size=252,
        # Exact comparison; `DEVICE in "cpu"` was a substring test.
        trust_remote_code=DEVICE == "cpu",
    )

    tokenizer = AutoTokenizer.from_pretrained(MODEL, use_fast=True)

    # Evaluate once: the check walks the parameters and prints on a match.
    is_quantized = check_if_quantized(model)

    model.config.update(
        {
            "load_in_4bit": is_quantized,
            "pad_token_id": tokenizer.pad_token_id,
            "eos_token_id": tokenizer.eos_token_id,
            "bos_token_id": tokenizer.bos_token_id,
            "unk_token_id": tokenizer.unk_token_id,
            "sep_token_id": tokenizer.sep_token_id,
            "cls_token_id": tokenizer.cls_token_id,
            "mask_token_id": tokenizer.mask_token_id,
        }
    )

    # if torch.cuda.device_count() > 1:
    #     print(
    #         f"{torch.cuda.device_count()} GPUs detected, initializing Data Parallel..."
    #     )
    #     model = nn.DataParallel(model)
    #     model.to(DTYPE)

    #     underlying_model = (
    #         model.module if isinstance(model, torch.nn.DataParallel) else model
    #     )

    #     if underlying_model.config.vocab_size != len(tokenizer):
    #         underlying_model.resize_token_embeddings(len(tokenizer))

    #     model = underlying_model

    # elif torch.cuda.device_count() < 1:
    #     model = torch.quantization.quantize_dynamic(
    #         model, {torch.nn.Linear}, dtype=DTYPE
    #     )

    generation_config = GenerationConfig(
        min_length=25,
        max_length=250,
        # NOTE(review): looks like a quantization option rather than a
        # generation option - confirm it has any effect here.
        bnb_4bit_compute_dtype=DTYPE,
        penalty_alpha=0.6,
        repetition_penalty=1.1,
        top_k=20,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        bos_token_id=tokenizer.bos_token_id,
        unk_token_id=tokenizer.unk_token_id,
        sep_token_id=tokenizer.sep_token_id,
        cls_token_id=tokenizer.cls_token_id,
    )

    if DEVICE == "cuda" and not is_quantized:
        # Cast to the dtype selected for this GPU. The previous `.half()` call
        # forced float16 even when DTYPE had been chosen as bfloat16.
        model = model.to(DTYPE)

    model = accelerator.prepare(model)

    model.eval()

    print_memory_usage()

    return model, tokenizer, generation_config


# Load everything once at module level; determine_max_tokens(), call_llm() and
# the other helpers below read these globals.
model, tokenizer, generation_config = setup_model_and_tokenizer()


def determine_max_tokens(prompt, base_multiplier=5):
    """
    Derive a token budget for text generation from the prompt.

    The prompt is tokenized with the module-level tokenizer; its token count
    selects a (minimum, maximum) band, the maximum is raised by 200 for prompts
    that is_complex() flags, and the final budget is the token count times the
    multiplier (1.5x larger when the prompt ends with "?"), clamped to the band.

    :param prompt: The user's input prompt.
    :param base_multiplier: Base multiplier to adjust the response length.
    :return: A tuple ``(max_tokens, min_tokens)`` of integers.
    """
    token_count = len(tokenizer.encode(prompt))

    # (exclusive upper bound on prompt tokens, minimum budget, maximum budget)
    bands = (
        (50, 50, 500),  # Short prompt
        (100, 100, 750),  # Medium prompt
        (float("inf"), 150, 1000),  # Long prompt
    )
    for upper_bound, floor, ceiling in bands:
        if token_count < upper_bound:
            break

    # Allow longer responses for complex prompts
    if is_complex(prompt):
        ceiling += 200

    asks_question = prompt.strip().endswith("?")
    scale = base_multiplier * 1.5 if asks_question else base_multiplier

    # Clamp the scaled estimate into [floor, ceiling]
    estimate = int(token_count * scale)
    budget = max(floor, min(estimate, ceiling))

    return budget, floor


def is_complex(prompt):
    """
    Determine if a prompt is complex based on simple word statistics.

    A prompt is complex when it has more than 12 whitespace-separated words,
    more than 10 distinct words, or more than 7 words that are at least
    7 characters long.

    NOTE: earlier versions also counted long sentences and non-stop-words, but
    both checks ran only after the word-count (<= 12) and unique-word (<= 10)
    checks had already passed, so neither could ever return True. They were
    removed, which also avoids loading the NLTK stop-word corpus on every
    call; results are unchanged.

    :param prompt: A string representing the prompt to be evaluated.
    :return: A boolean value indicating whether the prompt is complex.
    """
    # Define thresholds for complexity
    word_count_threshold = 12
    unique_word_threshold = 10
    long_word_length = 7  # minimum characters for a word to count as "long"
    long_word_count_threshold = 7  # more long words than this => complex

    # Tokenize the prompt into words
    words = prompt.split()

    # Check for word count
    if len(words) > word_count_threshold:
        return True

    # Check for unique word count
    if len(set(words)) > unique_word_threshold:
        return True

    # Check for long words (indicative of advanced vocabulary)
    long_word_count = sum(1 for word in words if len(word) >= long_word_length)
    return long_word_count > long_word_count_threshold


def stream_text(user_prompt, model, tokenizer, generation_config):
    """
    Generates a stream of text based on the user prompt.

    The prompt is wrapped in a chat (system message CONTENT + user message),
    rendered with the tokenizer's chat template, and passed to
    ``model.generate``; tokens are echoed through a TextStreamer while they
    are produced.

    Parameters:
        user_prompt (str): The user prompt to generate text from.
        model: The language model used for text generation.
        tokenizer: The tokenizer used to encode the user prompt.
        generation_config: The configuration for text generation.

    Returns:
        str: The decoded output sequence (special tokens skipped).
    """
    # If the model is wrapped by DataParallel, access the original model
    if isinstance(model, torch.nn.DataParallel):
        model = model.module

    messages = [
        {
            "role": "system",
            "content": CONTENT,
        },
        {"role": "user", "content": user_prompt},
    ]
    # add_generation_prompt appends the assistant-turn header so the model
    # answers instead of continuing the user's message. `return_tensors` was
    # dropped because it has no meaning when tokenize=False.
    chat_text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    # The rendered template already carries its special tokens; letting the
    # tokenizer add them again would duplicate them.
    input_ids = tokenizer.encode(
        chat_text, return_tensors="pt", add_special_tokens=False
    ).to(model.device)

    # NOTE(review): the budget is computed from the templated text and applied
    # as total-length limits (prompt included), although determine_max_tokens
    # describes it as a new-token budget - confirm which is intended.
    max_tokens, min_tokens = determine_max_tokens(chat_text)

    streamer = TextStreamer(tokenizer)
    generated_tokens = model.generate(
        input_ids,
        min_length=min_tokens,
        max_length=max_tokens,
        generation_config=generation_config,
        streamer=streamer,
    )

    return tokenizer.decode(generated_tokens[0], skip_special_tokens=True)


def call_llm():
    """
    Run one prompt/response round with the large language model (LLM).

    Asks the user for a prompt and streams the model's answer to stdout.
    Errors raised during generation are caught and printed, not propagated.
    After every round memory is reported/flushed (CUDA) or freed through the
    accelerator (other devices).

    Parameters:
        None

    Returns:
        str: The prompt entered by the user, or "exit" when the user typed
        'exit' or no prompt could be read from stdin.
    """
    try:
        user_prompt = input("Enter your prompt (or type 'exit' to quit): ")
    except (EOFError, RuntimeError, NotImplementedError) as ex:
        # stdin is closed or unavailable (e.g. a non-interactive kernel).
        # Previously this path fell through to `return user_prompt` with the
        # variable unbound; treat it as a request to stop instead.
        print(f"Could not read a prompt: {ex}")
        return "exit"

    try:
        # We use the tokenizer's chat template to format each message -
        # see https://huggingface.co/docs/transformers/main/en/chat_templating
        if user_prompt.strip().lower() == "exit":
            return "exit"
        # inference_mode is device-agnostic, so one code path serves all devices.
        with torch.inference_mode():
            stream_text(user_prompt, model, tokenizer, generation_config)
    except Exception as ex:
        # Deliberate best-effort: report the failure and keep the chat loop alive.
        print(f"An error occurred during text generation: {ex}")
    finally:
        if DEVICE == "cuda":
            print_memory_usage()
        else:
            accelerator.free_memory()
    return user_prompt


def loop_llm():
    """
    Run the interactive chat loop until the user quits.

    Calls `call_llm()` repeatedly until it returns "exit" or the user presses
    Ctrl-C (`KeyboardInterrupt`). In both cases the same cleanup then runs:
    memory usage is printed and `clear_all()` drops the module's globals
    (including `model` and `tokenizer`) and triggers garbage collection.
    Previously the cleanup ran only on Ctrl-C, so a normal "exit" left the
    model resident in memory.

    Parameters:
        None

    Return:
        None
    """
    try:
        while call_llm() != "exit":
            pass
    except KeyboardInterrupt:
        print("\nExiting the program.")

    print_memory_usage()
    clear_all()


# Start the interactive chat loop as soon as this cell/module is executed.
loop_llm()
