In [None]:
import os

os.environ["GIT_CLONE_PROTECTION_ACTIVE"] = "false"

%pip install -Uq pip
%pip uninstall -q -y optimum optimum-intel
%pip install --pre -Uq openvino openvino-tokenizers[transformers] --extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly
%pip install -q --extra-index-url https://download.pytorch.org/whl/cpu\
"git+https://github.com/huggingface/optimum-intel.git"\
"git+https://github.com/openvinotoolkit/nncf.git"\
"torch>=2.1"\
"datasets" \
"accelerate"\
"gradio>=4.19"\
"onnx" "einops" "transformers_stream_generator" "tiktoken" "transformers>=4.40" "bitsandbytes"

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from pathlib import Path
import requests
import shutil

# fetch model configuration

config_shared_path = Path("../../utils/llm_config.py")
config_dst_path = Path("llm_config.py")

if not config_dst_path.exists():
    if config_shared_path.exists():
        try:
            os.symlink(config_shared_path, config_dst_path)
        except Exception:
            shutil.copy(config_shared_path, config_dst_path)
    else:
        r = requests.get(url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/llm_config.py")
        with open("llm_config.py", "w", encoding="utf-8") as f:
            f.write(r.text)
elif not os.path.islink(config_dst_path):
    print("LLM config will be updated")
    if config_shared_path.exists():
        shutil.copy(config_shared_path, config_dst_path)
    else:
        r = requests.get(url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/llm_config.py")
        with open("llm_config.py", "w", encoding="utf-8") as f:
            f.write(r.text)

LLM config will be updated


In [3]:
from huggingface_hub import notebook_login, whoami

try:
    whoami()
    print('Authorization token already provided')
except OSError:
    notebook_login()

Authorization token already provided


In [4]:
from llm_config import SUPPORTED_LLM_MODELS
import ipywidgets as widgets

In [5]:
model_languages = list(SUPPORTED_LLM_MODELS)

model_language = widgets.Dropdown(
    options=model_languages,
    value=model_languages[0],
    description="Model Language:",
    disabled=False,
)

model_language

Dropdown(description='Model Language:', options=('English', 'Chinese', 'Japanese'), value='English')

In [6]:
model_ids = list(SUPPORTED_LLM_MODELS[model_language.value])

model_id = widgets.Dropdown(
    options=model_ids,
    value=model_ids[0],
    description="Model:",
    disabled=False,
)

model_id

Dropdown(description='Model:', options=('qwen2-0.5b-instruct', 'tiny-llama-1b-chat', 'qwen2-1.5b-instruct', 'g…

In [7]:
model_configuration = SUPPORTED_LLM_MODELS[model_language.value][model_id.value]
print(f"Selected model {model_id.value}")

Selected model qwen2-0.5b-instruct


In [8]:
from IPython.display import Markdown, display

prepare_int4_model = widgets.Checkbox(
    value=True,
    description="Prepare INT4 model",
    disabled=False,
)
prepare_int8_model = widgets.Checkbox(
    value=False,
    description="Prepare INT8 model",
    disabled=False,
)
prepare_fp16_model = widgets.Checkbox(
    value=False,
    description="Prepare FP16 model",
    disabled=False,
)

display(prepare_int4_model)
display(prepare_int8_model)
display(prepare_fp16_model)

Checkbox(value=True, description='Prepare INT4 model')

Checkbox(value=False, description='Prepare INT8 model')

Checkbox(value=False, description='Prepare FP16 model')

In [9]:
enable_awq = widgets.Checkbox(
    value=False,
    description="Enable AWQ",
    disabled=not prepare_int4_model.value,
)
display(enable_awq)

Checkbox(value=False, description='Enable AWQ')

In [10]:
from pathlib import Path

pt_model_id = model_configuration["model_id"]
pt_model_name = model_id.value.split("-")[0]
fp16_model_dir = Path(model_id.value) / "FP16"
int8_model_dir = Path(model_id.value) / "INT8_compressed_weights"
int4_model_dir = Path(model_id.value) / "INT4_compressed_weights"


def convert_to_fp16():
    if (fp16_model_dir / "openvino_model.xml").exists():
        return
    remote_code = model_configuration.get("remote_code", False)
    export_command_base = "optimum-cli export openvino --model {} --task text-generation-with-past --weight-format fp16".format(pt_model_id)
    if remote_code:
        export_command_base += " --trust-remote-code"
    export_command = export_command_base + " " + str(fp16_model_dir)
    display(Markdown("**Export command:**"))
    display(Markdown(f"`{export_command}`"))
    ! $export_command


def convert_to_int8():
    if (int8_model_dir / "openvino_model.xml").exists():
        return
    int8_model_dir.mkdir(parents=True, exist_ok=True)
    remote_code = model_configuration.get("remote_code", False)
    export_command_base = "optimum-cli export openvino --model {} --task text-generation-with-past --weight-format int8".format(pt_model_id)
    if remote_code:
        export_command_base += " --trust-remote-code"
    export_command = export_command_base + " " + str(int8_model_dir)
    display(Markdown("**Export command:**"))
    display(Markdown(f"`{export_command}`"))
    ! $export_command


def convert_to_int4():
    compression_configs = {
        "zephyr-7b-beta": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "mistral-7b": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "minicpm-2b-dpo": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "gemma-2b-it": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "notus-7b-v1": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "neural-chat-7b-v3-1": {
            "sym": True,
            "group_size": 64,
            "ratio": 0.6,
        },
        "llama-2-chat-7b": {
            "sym": True,
            "group_size": 128,
            "ratio": 0.8,
        },
        "llama-3-8b-instruct": {
            "sym": True,
            "group_size": 128,
            "ratio": 0.8,
        },
        "gemma-7b-it": {
            "sym": True,
            "group_size": 128,
            "ratio": 0.8,
        },
        "chatglm2-6b": {
            "sym": True,
            "group_size": 128,
            "ratio": 0.72,
        },
        "qwen-7b-chat": {"sym": True, "group_size": 128, "ratio": 0.6},
        "red-pajama-3b-chat": {
            "sym": False,
            "group_size": 128,
            "ratio": 0.5,
        },
        "default": {
            "sym": False,
            "group_size": 128,
            "ratio": 0.8,
        },
    }

    model_compression_params = compression_configs.get(model_id.value, compression_configs["default"])
    if (int4_model_dir / "openvino_model.xml").exists():
        return
    remote_code = model_configuration.get("remote_code", False)
    export_command_base = "optimum-cli export openvino --model {} --task text-generation-with-past --weight-format int4".format(pt_model_id)
    int4_compression_args = " --group-size {} --ratio {}".format(model_compression_params["group_size"], model_compression_params["ratio"])
    if model_compression_params["sym"]:
        int4_compression_args += " --sym"
    if enable_awq.value:
        int4_compression_args += " --awq --dataset wikitext2 --num-samples 128"
    export_command_base += int4_compression_args
    if remote_code:
        export_command_base += " --trust-remote-code"
    export_command = export_command_base + " " + str(int4_model_dir)
    display(Markdown("**Export command:**"))
    display(Markdown(f"`{export_command}`"))
    ! $export_command


if prepare_fp16_model.value:
    convert_to_fp16()
if prepare_int8_model.value:
    convert_to_int8()
if prepare_int4_model.value:
    convert_to_int4()

In [11]:
fp16_weights = fp16_model_dir / "openvino_model.bin"
int8_weights = int8_model_dir / "openvino_model.bin"
int4_weights = int4_model_dir / "openvino_model.bin"

if fp16_weights.exists():
    print(f"Size of FP16 model is {fp16_weights.stat().st_size / 1024 / 1024:.2f} MB")
for precision, compressed_weights in zip([8, 4], [int8_weights, int4_weights]):
    if compressed_weights.exists():
        print(f"Size of model with INT{precision} compressed weights is {compressed_weights.stat().st_size / 1024 / 1024:.2f} MB")
    if compressed_weights.exists() and fp16_weights.exists():
        print(f"Compression rate for INT{precision} model: {fp16_weights.stat().st_size / compressed_weights.stat().st_size:.3f}")

Size of model with INT4 compressed weights is 358.86 MB


In [12]:
import openvino as ov

core = ov.Core()

support_devices = core.available_devices
if "NPU" in support_devices:
    support_devices.remove("NPU")

device = widgets.Dropdown(
    options=support_devices + ["AUTO"],
    value="CPU",
    description="Device:",
    disabled=False,
)

device

Dropdown(description='Device:', options=('CPU', 'GPU', 'AUTO'), value='CPU')

In [13]:
available_models = []
if int4_model_dir.exists():
    available_models.append("INT4")
if int8_model_dir.exists():
    available_models.append("INT8")
if fp16_model_dir.exists():
    available_models.append("FP16")

model_to_run = widgets.Dropdown(
    options=available_models,
    value=available_models[0],
    description="Model to run:",
    disabled=False,
)

model_to_run

Dropdown(description='Model to run:', options=('INT4',), value='INT4')

In [14]:
from transformers import AutoConfig, AutoTokenizer
from optimum.intel.openvino import OVModelForCausalLM

if model_to_run.value == "INT4":
    model_dir = int4_model_dir
elif model_to_run.value == "INT8":
    model_dir = int8_model_dir
else:
    model_dir = fp16_model_dir
print(f"Loading model from {model_dir}")

ov_config = {"PERFORMANCE_HINT": "LATENCY", "NUM_STREAMS": "1", "CACHE_DIR": ""}

if "GPU" in device.value and "qwen2-7b-instruct" in model_id.value:
    ov_config["GPU_ENABLE_SDPA_OPTIMIZATION"] = "NO"

# On a GPU device a model is executed in FP16 precision. For red-pajama-3b-chat model there known accuracy
# issues caused by this, which we avoid by setting precision hint to "f32".
if model_id.value == "red-pajama-3b-chat" and "GPU" in core.available_devices and device.value in ["GPU", "AUTO"]:
    ov_config["INFERENCE_PRECISION_HINT"] = "f32"

model_name = model_configuration["model_id"]
tok = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)

ov_model = OVModelForCausalLM.from_pretrained(
    model_dir,
    device=device.value,
    ov_config=ov_config,
    config=AutoConfig.from_pretrained(model_dir, trust_remote_code=True),
    trust_remote_code=True,
)

Loading model from qwen2-0.5b-instruct\INT4_compressed_weights


Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
The argument `trust_remote_code` is to be used along with export=True. It will be ignored.
Compiling the model to CPU ...


In [15]:
tokenizer_kwargs = model_configuration.get("tokenizer_kwargs", {})
test_string = "2 + 2 ="
input_tokens = tok(test_string, return_tensors="pt", **tokenizer_kwargs)
answer = ov_model.generate(**input_tokens, max_new_tokens=2)
print(tok.batch_decode(answer, skip_special_tokens=True)[0])

2 + 2 = 4


In [16]:
!pip install torch transformers datasets gradio optimum



In [17]:
import torch
from threading import Event, Thread
from uuid import uuid4
from typing import List, Tuple
import gradio as gr
from transformers import (
    AutoTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
    TextIteratorStreamer,
)

model_name = model_configuration["model_id"]
technical_support_prompt = """You are an AI technical support assistant for a computer hardware company. Your role is to help users troubleshoot issues with their computers, peripherals, and software. Provide clear, step-by-step instructions and ask clarifying questions when needed. If you can't resolve an issue, guide the user on how to contact human support. Always prioritize user safety and data security in your advice. Begin each interaction by asking for a brief description of the problem."""
history_template = model_configuration.get("history_template")
current_message_template = model_configuration.get("current_message_template")
stop_tokens = model_configuration.get("stop_tokens")
tokenizer_kwargs = model_configuration.get("tokenizer_kwargs", {})

examples = [
    ["My computer won't turn on. What should I do?"],
    ["I'm getting a blue screen error. How can I fix this?"],
    ["My printer isn't connecting to my computer. Can you help?"],
    ["How do I update my graphics card drivers?"],
    ["My laptop battery drains very quickly. What could be the issue?"],
    ["I think my computer has a virus. What steps should I take?"],
    ["How can I speed up my slow computer?"],
]

max_new_tokens = 256

class StopOnTokens(StoppingCriteria):
    def __init__(self, token_ids):
        self.token_ids = token_ids

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        for stop_id in self.token_ids:
            if input_ids[0][-1] == stop_id:
                return True
        return False

stop_tokens = None

def default_partial_text_processor(partial_text: str, new_text: str):
    partial_text += new_text
    return partial_text

text_processor = default_partial_text_processor

def convert_history_to_token(history: List[Tuple[str, str]]):
    messages = [{"role": "system", "content": technical_support_prompt}]
    for idx, (user_msg, model_msg) in enumerate(history):
        if idx == len(history) - 1 and not model_msg:
            messages.append({"role": "user", "content": user_msg})
            break
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if model_msg:
            messages.append({"role": "assistant", "content": model_msg})
    input_token = tok.apply_chat_template(messages, add_generation_prompt=True, tokenize=True, return_tensors="pt")
    return input_token

def user(message, history):
    print(f"User message received: {message}")  # Debugging line
    if history is None:
        history = []
    return "", history + [[message, ""]]

def bot(history, temperature, top_p, top_k, repetition_penalty, conversation_id):
    try:
        input_ids = convert_history_to_token(history)
        if input_ids.shape[1] > 2000:
            history = [history[-1]]
            input_ids = convert_history_to_token(history)
        streamer = TextIteratorStreamer(tok, timeout=30.0, skip_prompt=True, skip_special_tokens=True)
        generate_kwargs = dict(
            input_ids=input_ids,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            do_sample=temperature > 0.0,
            top_p=top_p,
            top_k=top_k,
            repetition_penalty=repetition_penalty,
            streamer=streamer,
        )
        if stop_tokens is not None:
            generate_kwargs["stopping_criteria"] = StoppingCriteriaList(stop_tokens)
        stream_complete = Event()
        def generate_and_signal_complete():
            ov_model.generate(**generate_kwargs)
            stream_complete.set()
        t1 = Thread(target=generate_and_signal_complete)
        t1.start()
        partial_text = ""
        for new_text in streamer:
            partial_text = text_processor(partial_text, new_text)
            history[-1][1] = partial_text
            yield history
    except Exception as e:
        print(f"Error in bot function: {e}")  # Debugging line

def request_cancel():
    ov_model.request.cancel()

def get_uuid():
    return str(uuid4())

with gr.Blocks(
    theme=gr.themes.Soft(),
    css=".disclaimer {font-variant-caps: all-small-caps;}",
) as demo:
    conversation_id = gr.State(get_uuid)
    gr.Markdown(f"""<h1><center>Technical Support Assistant</center></h1>""")
    gr.Markdown("""<p>Welcome to our Technical Support Assistant. Please describe your technical issue, and I'll do my best to help you resolve it.</p>""")
    chatbot = gr.Chatbot(height=500)
    with gr.Row():
        with gr.Column():
            msg = gr.Textbox(
                label="Describe your technical issue",
                placeholder="e.g., My computer won't start",
                show_label=False,
                container=False,
            )
        with gr.Column():
            with gr.Row():
                submit = gr.Button("Get Help")
                stop = gr.Button("Stop")
                clear = gr.Button("New Issue")
    with gr.Row():
        with gr.Accordion("Advanced Options:", open=False):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        temperature = gr.Slider(
                            label="Temperature",
                            value=0.1,
                            minimum=0.0,
                            maximum=1.0,
                            step=0.1,
                            interactive=True,
                            info="Higher values produce more diverse outputs",
                        )
                with gr.Column():
                    with gr.Row():
                        top_p = gr.Slider(
                            label="Top-p (nucleus sampling)",
                            value=1.0,
                            minimum=0.0,
                            maximum=1,
                            step=0.01,
                            interactive=True,
                            info=(
                                "Sample from the smallest possible set of tokens whose cumulative probability "
                                "exceeds top_p. Set to 1 to disable and sample from all tokens."
                            ),
                        )
                with gr.Column():
                    with gr.Row():
                        top_k = gr.Slider(
                            label="Top-k",
                            value=50,
                            minimum=0.0,
                            maximum=200,
                            step=1,
                            interactive=True,
                            info="Sample from a shortlist of top-k tokens — 0 to disable and sample from all tokens.",
                        )
                with gr.Column():
                    with gr.Row():
                        repetition_penalty = gr.Slider(
                            label="Repetition Penalty",
                            value=1.1,
                            minimum=1.0,
                            maximum=2.0,
                            step=0.1,
                            interactive=True,
                            info="Penalize repetition — 1.0 to disable.",
                        )
    gr.Examples(examples, inputs=msg, label="Common Technical Issues (Click on any example and press 'Get Help')")
    submit_event = msg.submit(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,
        ],
        outputs=chatbot,
        queue=True,
    )
    submit_click_event = submit.click(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,
        ],
        outputs=chatbot,
        queue=True,
    )
    stop.click(
        fn=request_cancel,
        inputs=None,
        outputs=None,
        cancels=[submit_event, submit_click_event],
        queue=False,
    )
    clear.click(lambda: None, None, chatbot, queue=False)

demo.launch()


Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.




The attention mask is not set and cannot be inferred from input because pad token is same as eos token.As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


User message received: My computer won't turn on. What should I do?
User message received: I'm getting a blue screen error. How can I fix this?
