# tiny-llama-1b-chat Chatbot using OpenVino
### By- Tejasvee Dwivedi

## Setting Up Environment and Installing Required Packages

In [6]:
import os

os.environ["GIT_CLONE_PROTECTION_ACTIVE"] = "false"

# %pip install -Uq pip
# %pip uninstall -q -y optimum optimum-intel
# %pip install --pre -Uq openvino openvino-tokenizers[transformers] --extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly
# %pip install -q --extra-index-url https://download.pytorch.org/whl/cpu\
# "git+https://github.com/huggingface/optimum-intel.git"\
# "git+https://github.com/openvinotoolkit/nncf.git"\
# "torch>=2.1"\
# "datasets" \
# "accelerate"\
# "gradio>=4.19"\
# "onnx" "einops" "transformers_stream_generator" "tiktoken" "transformers>=4.38.1" "bitsandbytes"

## Configuring and Fetching the LLM Configuration File

In [9]:
from pathlib import Path
import requests
import shutil

# Define paths for the shared config file and destination config file
config_shared_path = Path("../../utils/llm_config.py")
config_dst_path = Path("llm_config.py")

def fetch_and_write_config(url, path):
    # Fetch the config file from the given URL and write it to the specified path
    response = requests.get(url)
    with open(path, "w", encoding='utf-8') as f:
        f.write(response.text)

if not config_dst_path.exists():
    if config_shared_path.exists():
        try:
            os.symlink(config_shared_path, config_dst_path)  # Try creating a symlink
        except OSError:
            shutil.copy(config_shared_path, config_dst_path)  # Copy if symlink fails
    else:
        fetch_and_write_config("https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/llm_config.py", config_dst_path)
elif not os.path.islink(config_dst_path):
    if config_shared_path.exists():
        shutil.copy(config_shared_path, config_dst_path)  # Copy shared config if destination is not a symlink
    else:
        fetch_and_write_config("https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/llm_config.py", config_dst_path)


## Importing SUPPORTED_LLM_MODELS and Selecting Model Language

In [12]:
from llm_config import SUPPORTED_LLM_MODELS
import ipywidgets as widgets

# Get the list of supported LLM models
model_languages = list(SUPPORTED_LLM_MODELS)

# Create a dropdown widget for selecting model language
model_language = widgets.Dropdown(
    options=model_languages,
    value=model_languages[0],  # Set default value
    description="Model Language:",
    disabled=False,
)

model_language

Dropdown(description='Model Language:', options=('English', 'Chinese', 'Japanese'), value='English')

## Creating a Dropdown Widget for Selecting Model_IDs

In [15]:
# Get the list of model IDs for the selected model language
model_ids = list(SUPPORTED_LLM_MODELS[model_language.value])

# Create a dropdown widget for selecting model ID
model_id = widgets.Dropdown(
    options=model_ids,
    value=model_ids[2],  # Set default value
    description="Model:",
    disabled=False,
)

model_id

Dropdown(description='Model:', index=2, options=('qwen2-0.5b-instruct', 'tiny-llama-1b-chat', 'qwen2-1.5b-inst…

In [17]:
model_configuration = SUPPORTED_LLM_MODELS[model_language.value][model_id.value]
print(f"Selected model:- {model_id.value}")

Selected model:- tiny-llama-1b-chat


## Creating Toggle Buttons for Model Preparation Options

In [20]:
from IPython.display import Markdown, display

# Toggle buttons for preparing INT4 and INT8 models
prepare_int4_model = widgets.ToggleButton(
    value=True,  # Initial state for INT4 model preparation
    description="Prepare INT4 model",
    disabled=False,
)
prepare_int8_model = widgets.ToggleButton(
    value=False,  # Initial state for INT8 model preparation
    description="Prepare INT8 model",
    disabled=False,
)

# Display the toggle buttons
display(prepare_int4_model)
display(prepare_int8_model)

ToggleButton(value=True, description='Prepare INT4 model')

ToggleButton(value=False, description='Prepare INT8 model')

## Generating and Executing Commands for Model Conversion to INT8 and INT4

In [23]:
# Determine the model ID from the configuration
pt_model_id = model_configuration["model_id"]

# Extract the model name from the selected model ID
pt_model_name = model_id.value.split("-")[0]

# Define directories for INT8 and INT4 compressed models
int8_model_dir = Path(pt_model_name) / "INT8_compressed_weights"
int4_model_dir = Path(pt_model_name) / "INT4_compressed_weights"

# Function to generate export command for OpenVINO
def generate_export_command(model_id, task, weight_format, output_dir, remote_code=False, additional_args=""):
    base_command = f"optimum-cli export openvino --model {model_id} --task {task} --weight-format {weight_format} {additional_args}"
    if remote_code:
        base_command += " --trust-remote-code"
    return f"{base_command} {output_dir}"

# Function to convert model to INT8 format
def convert_to_int8():
    if (int8_model_dir / "openvino_model.xml").exists():
        return
    int8_model_dir.mkdir(parents=True, exist_ok=True)
    export_command = generate_export_command(pt_model_id, "text-generation-with-past", "int8", int8_model_dir, model_configuration.get("remote_code", False))
    display(Markdown(f"**Export command:**\n\n`{export_command}`"))
    os.system(export_command)

# Function to convert model to INT4 format
def convert_to_int4():
    compression_configs = {
        "default": {"sym": False, "group_size": 128, "ratio": 0.8},
    }
    params = compression_configs.get(model_id.value, compression_configs["default"])
    if (int4_model_dir / "openvino_model.xml").exists():
        return
    additional_args = f"--group-size {params['group_size']} --ratio {params['ratio']}"
    if params["sym"]:
        additional_args += " --sym"
    export_command = generate_export_command(pt_model_id, "text-generation-with-past", "int4", int4_model_dir, model_configuration.get("remote_code", False), additional_args)
    display(Markdown(f"**Export command:**\n\n`{export_command}`"))
    os.system(export_command)

# Check if INT8 model preparation is requested
if prepare_int8_model.value:
    convert_to_int8()

# Check if INT4 model preparation is requested
if prepare_int4_model.value:
    convert_to_int4()

In [25]:
int8_weights = int8_model_dir / "openvino_model.bin"
int4_weights = int4_model_dir / "openvino_model.bin"

for precision, compressed_weights in zip([8, 4], [int8_weights, int4_weights]):
    if compressed_weights.exists():
        print(f"Size of model with INT{precision} compressed weights is {compressed_weights.stat().st_size / 1024 / 1024:.2f} MB")

Size of model with INT8 compressed weights is 1050.66 MB
Size of model with INT4 compressed weights is 696.19 MB


## Listing Available OpenVINO Devices and Creating a Dropdown Widget for Selection

In [28]:
import openvino as ov
core = ov.Core()
devices = core.available_devices

# Loop through available devices and print their names
for x in devices:
    device_name = core.get_property(x, "FULL_DEVICE_NAME")
    print(f"{x}: {device_name}")
    
import ipywidgets as widgets

# Create a dropdown widget for selecting device
device = widgets.Dropdown(
    options=core.available_devices,
    value=core.available_devices[0], 
    description="Device:",
    disabled=False,
)

device

CPU: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
GPU: Intel(R) Iris(R) Xe Graphics (iGPU)


Dropdown(description='Device:', options=('CPU', 'GPU'), value='CPU')

## Choice of Precision model

In [31]:
available_models = []

# Check if INT4 model directory exists and add to available models if true
if int4_model_dir.exists():
    available_models.append("INT4")

# Check if INT8 model directory exists and add to available models if true
if int8_model_dir.exists():
    available_models.append("INT8")
import ipywidgets as widgets

# Create a dropdown widget for selecting model to run
model_to_run = widgets.Dropdown(
    options=available_models,
    value=available_models[0],
    description="Model to run:",
    disabled=False,
)

model_to_run

Dropdown(description='Model to run:', options=('INT4', 'INT8'), value='INT4')

## Loading and Configuring OpenVINO Model for Language Modeling

In [34]:
import time
from transformers import AutoConfig, AutoTokenizer
from optimum.intel.openvino import OVModelForCausalLM

# Determine model directory based on selected model type (INT4 or INT8)
model_dir = int4_model_dir if model_to_run.value == "INT4" else int8_model_dir
print(f"Loading model from {model_dir}")

# OpenVINO configuration settings
ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)

# Load OpenVINO model for causal language modeling
ov_model = OVModelForCausalLM.from_pretrained(
    model_dir,
    device=device.value,  # Specify device for inference
    ov_config=ov_config,  # OpenVINO configuration settings
    config=AutoConfig.from_pretrained(model_dir, trust_remote_code=True),  # Model configuration
    trust_remote_code=True,  # Trust remote code when loading
)

The argument `trust_remote_code` is to be used along with export=True. It will be ignored.


Loading model from tiny\INT4_compressed_weights


Compiling the model to CPU ...


## Generating Answer Using OpenVINO Model for Language Modeling

In [45]:
question = "What is photosynthesis?"
prompt = f"Question: {question}\nAnswer:"

# Measure the time taken for generation
start_time = time.time()
input_tokens = tokenizer(prompt, return_tensors="pt")
answer = ov_model.generate(input_ids=input_tokens['input_ids'], max_new_tokens=80, do_sample=True)
end_time = time.time()

# Decode the answer
generated_answer = tokenizer.decode(answer[0], skip_special_tokens=True)

print(generated_answer)
print(f"Time taken to generate the answer: {end_time - start_time:.2f} seconds")

Question: What is photosynthesis?
Answer: photosynthesis is a reaction in which plants and chlorophyll cells use sunlight to convert atmospheric carbon dioxide and water into organic compounds (glucose) as well as oxygen gas in the process. During photosynthesis, the plants use light energy to split an electron acceptor (ATP) and release a light-absorbing molecule
Time taken to generate the answer: 2.66 seconds


In [53]:
question = " 6 + 2 = "
prompt = f"Question: {question}\nAnswer:"

# Measure the time taken for generation
start_time = time.time()
input_tokens = tokenizer(prompt, return_tensors="pt")
answer = ov_model.generate(input_ids=input_tokens['input_ids'], max_new_tokens=10, do_sample=True)
end_time = time.time()

# Decode the answer
generated_answer = tokenizer.decode(answer[0], skip_special_tokens=True)

print(generated_answer)
print(f"Time taken to generate the answer: {end_time - start_time:.2f} seconds")

Question:  6 + 2 = 
Answer: 8
Time taken to generate the answer: 0.18 seconds


## Setting Up Conversational AI with OpenVINO Model Integration

In [56]:
import torch
from threading import Event, Thread
from uuid import uuid4
from typing import List, Tuple
import gradio as gr
from transformers import (
    AutoTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
    TextIteratorStreamer,
)

# Define constants and configuration
model_name = model_configuration["model_id"]
start_message = model_configuration["start_message"]
history_template = model_configuration.get("history_template")
current_message_template = model_configuration.get("current_message_template")
stop_tokens = model_configuration.get("stop_tokens")
tokenizer_kwargs = model_configuration.get("tokenizer_kwargs", {})

max_new_tokens = 180

# Define tokenizer
tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

# Define stop criteria
class StopOnTokens(StoppingCriteria):
    def __init__(self, token_ids):
        self.token_ids = token_ids

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        for stop_id in self.token_ids:
            if input_ids[0][-1] == stop_id:
                return True
        return False

stop_tokens = [StopOnTokens(tok.convert_tokens_to_ids(stop_tokens))] if stop_tokens else None

# Define text processing function
def default_partial_text_processor(partial_text: str, new_text: str):
    partial_text += new_text
    return partial_text

text_processor = model_configuration.get("partial_text_processor", default_partial_text_processor)

# Define history conversion function
def convert_history_to_token(history: List[Tuple[str, str]]):
    if history_template is None:
        messages = [{"role": "system", "content": start_message}]
        for idx, (user_msg, model_msg) in enumerate(history):
            if idx == len(history) - 1 and not model_msg:
                messages.append({"role": "user", "content": user_msg})
                break
            if user_msg:
                messages.append({"role": "user", "content": user_msg})
            if model_msg:
                messages.append({"role": "assistant", "content": model_msg})
        input_token = tok.apply_chat_template(messages, add_generation_prompt=True, tokenize=True, return_tensors="pt")
    else:
        text = start_message + "".join([history_template.format(num=round, user=item[0], assistant=item[1]) for round, item in enumerate(history[:-1])])
        text += "".join([current_message_template.format(num=len(history) + 1, user=history[-1][0], assistant=history[-1][1])])
        input_token = tok(text, return_tensors="pt", **tokenizer_kwargs).input_ids
    return input_token

# Define user callback function
def user(message, history):
    return "", history + [[message, ""]]

# Define bot callback function
def bot(history, temperature, top_p, top_k, repetition_penalty, conversation_id):
    input_ids = convert_history_to_token(history)
    streamer = TextIteratorStreamer(tok, timeout=30.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        do_sample=temperature > 0.0,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        streamer=streamer,
    )
    if stop_tokens:
        generate_kwargs["stopping_criteria"] = StoppingCriteriaList(stop_tokens)

    stream_complete = Event()

    def generate_and_signal_complete():
        ov_model.generate(**generate_kwargs)
        stream_complete.set()

    t1 = Thread(target=generate_and_signal_complete)
    t1.start()

    partial_text = ""
    for new_text in streamer:
        partial_text = text_processor(partial_text, new_text)
        history[-1][1] = partial_text
        yield history

def request_cancel():
    ov_model.request.cancel()

def get_uuid():
    return str(uuid4())
get_uuid()

'cf57ea9f-ceda-4284-8538-d8b2d6783fe0'

## Creating Gradio Interface for Chatbot

In [59]:
#Gradio interface
with gr.Blocks(
    theme='HaleyCH/HaleyCH_Theme',
    css=".gradio-container {background: linear-gradient(to bottom, #140225, #000000);",
) as demo:
    conversation_id = gr.State(get_uuid)
    gr.Markdown(f"""<h1><center>{model_id.value} Chatbot using OpenVino</center></h1>""")
    gr.Markdown(f"""<h2><center>Made by:- Tejasvee Dwivedi</center></h2>""")
    gr.Markdown(f"""<h5><center><a href="mailto:tejasvee.dwivedi@learner.manipal.edu">tejasvee.dwivedi@learner.manipal.edu</a></center></h5>""")
    gr.Markdown(f"""<h5><center>Manipal Institute of Technology</center></h5>""")
    chatbot = gr.Chatbot(height=450)
    with gr.Row():
        with gr.Column():
            msg = gr.Textbox(
                label="Chat Message Box",
                placeholder="Chat Message Box",
                show_label=False,
                container=False,
            )
        with gr.Column():
            with gr.Row():
                submit = gr.Button("Submit")
                stop = gr.Button("Stop")
                clear = gr.Button("Clear")
    with gr.Row():
        with gr.Accordion("Advanced Options:", open=False):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        temperature = gr.Slider(
                            label="Temperature",
                            value=0.1,
                            minimum=0.0,
                            maximum=1.0,
                            step=0.1,
                            interactive=True,
                            info="Higher values produce more diverse outputs",
                        )
                with gr.Column():
                    with gr.Row():
                        top_p = gr.Slider(
                            label="Top-p (nucleus sampling)",
                            value=1.0,
                            minimum=0.0,
                            maximum=1,
                            step=0.01,
                            interactive=True,
                            info=(
                                "Sample from the smallest possible set of tokens whose cumulative probability "
                                "exceeds top_p. Set to 1 to disable and sample from all tokens."
                            ),
                        )
                with gr.Column():
                    with gr.Row():
                        top_k = gr.Slider(
                            label="Top-k",
                            value=50,
                            minimum=0.0,
                            maximum=200,
                            step=1,
                            interactive=True,
                            info="Sample from a shortlist of top-k tokens — 0 to disable and sample from all tokens.",
                        )
                with gr.Column():
                    with gr.Row():
                        repetition_penalty = gr.Slider(
                            label="Repetition Penalty",
                            value=1.1,
                            minimum=1.0,
                            maximum=2.0,
                            step=0.1,
                            interactive=True,
                            info="Penalize repetition — 1.0 to disable.",
                        )

    submit_event = msg.submit(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,
        ],
        outputs=chatbot,
        queue=True,
    )
    submit_click_event = submit.click(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,
        ],
        outputs=chatbot,
        queue=True,
    )
    stop.click(
        fn=request_cancel,
        inputs=None,
        outputs=None,
        cancels=[submit_event, submit_click_event],
        queue=False,
    )
    clear.click(lambda: None, None, chatbot, queue=False)

demo.launch()

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




In [57]:
demo.close()

Closing server running on port: 7862


## Fine tuning on certain dataset

In [61]:
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from datasets import Dataset
import torch

In [63]:
from datasets import Dataset
file_path = "D:\Downloads\GG.txt"

# Read the text file
with open(file_path, 'r', encoding='utf-8') as file:
    texts = file.readlines()

# Remove newline characters and strip whitespace
texts = [text.strip() for text in texts]

# Create a Dataset object
dataset = Dataset.from_dict({
    'text': texts
})

print(dataset)

Dataset({
    features: ['text'],
    num_rows: 17
})


In [65]:
model_name = model_configuration["model_id"]
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [67]:
def tokenize_function(examples):
    return tokenizer(examples['text'], padding="max_length", truncation=True)

tokenized_dataset = dataset.map(tokenize_function, batched=True)

Map:   0%|          | 0/17 [00:00<?, ? examples/s]

In [69]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir='./results',                 # This is the directory where model checkpoints and outputs will be saved
    per_device_train_batch_size=2,
    num_train_epochs=2,
    logging_dir='./logs',
    logging_steps=50,
    save_steps=2000,
    evaluation_strategy="epoch",
)

In [71]:
model = AutoModelForCausalLM.from_pretrained(model_name)

In [73]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
)

In [75]:
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM

class YourModelClass(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        return loss

In [None]:
trainer.train()trainer.train()
model.save_pretrained('./fine-tuned-tinyllama-1b-chat')