In [1]:
# Authenticate this runtime with the Hugging Face Hub; the CLI prompts interactively for an access token.
! huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) Y
Token is valid (permission: fineGrained).
The token `caaapooo` has been saved to /root/.cache/huggingface/stored_tokens
[1m[31mCannot authenticate through git-credential as no helper is defined on your machine.
You might have to re-aut

In [1]:
! pip install transformers sentence-transformers faiss-cpu accelerate bitsandbytes langdetect


Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.45.3-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (fr

In [7]:
import json
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch
import time
from langdetect import detect
import re

In [3]:
def _join_answer(answer):
    """Return ``answer`` as a single string; list answers are joined with single spaces."""
    if isinstance(answer, list):
        # str() every part so one non-string item cannot raise TypeError and
        # abort the rest of the category being processed.
        return " ".join(str(part) for part in answer)
    return answer


def _extract_qa(record, question_key="question", answer_key="answer"):
    """Return ``(question, answer)`` when ``record`` is a dict holding both keys, else ``None``."""
    if isinstance(record, dict) and question_key in record and answer_key in record:
        return record[question_key], _join_answer(record[answer_key])
    return None


# Load the raw knowledge base: a JSON object mapping category name -> list or dict of entries.
try:
    with open("/content/DATA_RQ.json", "r", encoding="utf-8") as f:
        data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
    print(f"Error loading JSON data: {e}")
    # `exit()` is the interactive-shell helper and is not a reliable way to stop a
    # notebook cell; raising SystemExit aborts here, before `data` is used undefined.
    raise SystemExit(1) from e


# qa_pairs: (question, answer) tuples; instructions: free-text entries that are not Q&A.
qa_pairs = []
instructions = []
for category, content in data.items():
    try:
        if isinstance(content, list):
            for entry in content:
                # Entries use either the short ("Q"/"A") or long ("question"/"answer") keys.
                pair = _extract_qa(entry, "Q", "A")
                if pair is None:
                    pair = _extract_qa(entry)

                if pair is not None:
                    qa_pairs.append(pair)
                elif isinstance(entry, list):
                    # Nested list: keep the Q&A dicts, silently skip everything else.
                    for sub_entry in entry:
                        sub_pair = _extract_qa(sub_entry)
                        if sub_pair is not None:
                            qa_pairs.append(sub_pair)
                else:
                    instructions.append(str(entry))
        elif isinstance(content, dict):
            for value in content.values():
                pair = _extract_qa(value)
                if pair is not None:
                    qa_pairs.append(pair)
                elif isinstance(value, list):
                    # Stringify each item so `instructions` only ever holds text,
                    # matching what every other branch stores.
                    instructions.extend(str(item) for item in value)
                else:
                    instructions.append(str(value))
    except Exception as e:
        # Best effort: report the bad category and continue with the next one.
        print(f"Error processing data in category {category}: {e}")

print(f"Loaded {len(qa_pairs)} Q&A pairs and {len(instructions)} instructions.")

Loaded 3263 Q&A pairs and 502 instructions.


In [75]:
# Load the multilingual sentence-embedding model used for retrieval.
try:
    embedder = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
except Exception as e:
    print(f"Error loading embedding model: {e}")
    # `exit()` is the interactive-shell helper and is not a reliable way to stop a
    # notebook cell; raise SystemExit so nothing below runs without `embedder`.
    raise SystemExit(1) from e

# Embed the question side of every Q&A pair and every free-text instruction.
try:
    question_embeddings = embedder.encode([q[0] for q in qa_pairs])
    instruction_embeddings = embedder.encode(instructions)
except Exception as e:
    print(f"Error generating embeddings: {e}")
    raise SystemExit(1) from e

# Build one flat L2 index. Vectors are added questions-first, then instructions,
# so index position i corresponds to all_data[i].
try:
    dimension = question_embeddings.shape[1]
    faiss_index = faiss.IndexFlatL2(dimension)
    faiss_index.add(np.array(question_embeddings))
    if len(instructions) > 0:
        # Skip the add when there is nothing to index instead of handing
        # FAISS an empty array.
        faiss_index.add(np.array(instruction_embeddings))
    # Instructions are stored as (text, text) so lookups can always read element [1].
    all_data = qa_pairs + [(inst, inst) for inst in instructions]
    print(f"FAISS index built with {len(all_data)} entries.")
except Exception as e:
    print(f"Error building FAISS index: {e}")
    raise SystemExit(1) from e


FAISS index built with 3765 entries.


In [None]:
# Load the Aya Expanse 8B tokenizer and model (4-bit quantized, devices chosen by
# device_map="auto") and wrap them in a text-generation pipeline.
try:
    tokenizer = AutoTokenizer.from_pretrained("CohereForAI/aya-expanse-8b", timeout=600)
    model = AutoModelForCausalLM.from_pretrained(
        "CohereForAI/aya-expanse-8b", load_in_4bit=True, device_map="auto"
    )
    text_gen_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer)
    print("Model and pipeline loaded successfully!")
except Exception as e:
    print(f"Error loading model: {e}")
    # `exit()` is the interactive-shell helper and is not a reliable way to stop a
    # notebook cell; raise SystemExit so later cells are not run without a model.
    raise SystemExit(1) from e

In [None]:
# In-memory chat transcripts: user id -> ordered list of "User: ..." / "Chatbot: ..." lines.
user_conversations = dict()

def get_dynamic_word_range(user_input, retrieved_answers):
    """Compute ``(minimum, maximum)`` length bounds for a generated response.

    The bounds grow with the number of whitespace-separated words in the user
    input and in the retrieved answers, scaled by a per-language factor.

    Args:
        user_input: The user's message.
        retrieved_answers: Retrieved answer strings; may be empty or ``None``.

    Returns:
        A ``(lower, upper)`` tuple of ints with ``200 <= lower <= upper`` and
        ``500 <= upper <= 600``.
    """
    try:
        language = detect(user_input)
    except Exception:
        # Detection can fail (e.g. text with no detectable features); treat as English.
        language = "en"

    # langdetect reports regional variants such as "zh-cn" / "zh-tw"; reduce the
    # code to its base language so the "zh" factor below actually applies.
    language = language.split("-")[0]

    retrieved_word_count = sum(len(ans.split()) for ans in retrieved_answers or ())
    total_words = retrieved_word_count + len(user_input.split())

    language_factors = {
        "en": 1.0, "ar": 0.8, "fr": 1.1, "zh": 0.5, "es": 1.05, "de": 1.0,
        "ru": 1.1, "it": 1.0, "pt": 1.05, "ja": 0.6, "ko": 0.7, "tr": 0.9
    }
    factor = language_factors.get(language, 1.0)

    scaled_len = int((total_words + 30) * factor)

    upper = max(500, min(600, scaled_len + 200))
    # Clamp to `upper`: for long contexts `scaled_len - 100` exceeds 600 and the
    # unclamped result would be an infeasible range with minimum > maximum.
    lower = min(max(200, scaled_len - 100), upper)

    return lower, upper

In [None]:
def format_response(text):
    """Format response to improve readability."""
    text = re.sub(r"<\|im_end\|>|<\|endoftext\|>", "", text, flags=re.IGNORECASE).strip()
    text = re.sub(r"\(\d+\s*(characters|chars)\)", "", text).strip()

    formatted_text = []
    paragraphs = text.split("\n")

    for paragraph in paragraphs:
        words = paragraph.split()
        line = ""
        for word in words:
            if len(line) + len(word) > 90:
                formatted_text.append(line.strip())
                line = ""
            line += f"{word} "
        if line:
            formatted_text.append(line.strip())

    return "\n".join(formatted_text)

In [None]:
def add_bullets(text):
    """Normalise list-like lines so that each starts with ``"- "``.

    A line is treated as a list item when it starts with a number followed by
    a dot (``"1."``) or with a dash. Leading dashes and the whitespace after
    them are replaced by a single ``"- "``; a numeric prefix is kept, so
    ``"1. foo"`` becomes ``"- 1. foo"``. All other lines pass through unchanged.

    Lines that merely begin with a decimal (``"3.14 is pi"``) or a negative
    number (``"-5 degrees"``) are not list items and are left untouched.

    Args:
        text: Newline-separated text.

    Returns:
        The text with list-item lines rewritten, joined by ``"\\n"``.
    """
    # "N." not followed by another digit (excludes decimals), or "-" not
    # followed by a digit (excludes negative numbers).
    list_item = r"^(?:\d+\.(?!\d)|-(?!\d))"

    bulleted = []
    for line in text.split("\n"):
        if re.match(list_item, line):
            bulleted.append(f"- {line.lstrip('-').lstrip()}")
        else:
            bulleted.append(line)

    return "\n".join(bulleted)

In [None]:
def clear_conclusion(text):
    """Drop a leading conclusion label from the final line of ``text``.

    Only the last newline-separated line is inspected. If it starts (case
    sensitively) with ``"Conclusion:"``, ``"Summary:"`` or ``"Final Thoughts:"``,
    that label is removed and the remainder of the line is stripped of
    surrounding whitespace. Everything else is returned untouched.
    """
    # Split off just the final line; `head`/`newline` are empty for single-line text.
    head, newline, tail = text.rpartition("\n")

    label = next(
        (p for p in ("Conclusion:", "Summary:", "Final Thoughts:") if tail.startswith(p)),
        None,
    )
    if label is not None:
        tail = tail[len(label):].strip()

    return head + newline + tail

In [None]:
def retrieve_answer(user_question, top_k=3, threshold=0.7):
    """Look up the answers whose indexed text is closest to ``user_question``.

    The question is embedded with ``embedder`` and searched against
    ``faiss_index`` for ``top_k`` neighbours. A hit is kept when its distance is
    below ``threshold`` and its index is a valid position in ``all_data``; the
    answer stored at element ``[1]`` of that entry is returned.

    Returns:
        A list of answer strings in search order (possibly empty). Any error
        is printed and results in an empty list.
    """
    try:
        query_vector = np.array(embedder.encode([user_question]))
        distances, indices = faiss_index.search(query_vector, top_k)

        return [
            all_data[idx][1]
            for dist, idx in zip(distances[0], indices[0])
            # the range check also rejects out-of-range / negative positions
            if dist < threshold and 0 <= idx < len(all_data)
        ]
    except Exception as e:
        print(f"Error retrieving answer: {e}")
        return []

In [87]:
def generate_response(user_id, user_input):
    """Generate a chatbot reply from chat history, retrieved knowledge and the language model.

    Flow:
      1. Record the user message in ``user_conversations[user_id]``.
      2. Retrieve candidate answers; if the best one is shorter than 50 words,
         return it verbatim without calling the model.
      3. Otherwise build a prompt from up to two retrieved answers and the last
         five transcript lines, generate a reply, and post-process it with
         ``format_response``, ``add_bullets`` and ``clear_conclusion``.
      4. Record the reply in the transcript and return it.

    Args:
        user_id: Key identifying the conversation in ``user_conversations``.
        user_input: The user's message.

    Returns:
        The reply text. If generation fails, the error is printed and a fixed
        apology message is returned (and recorded) instead.
    """
    history = user_conversations.setdefault(user_id, [])
    history.append(f"User: {user_input}")

    retrieved_answers = retrieve_answer(user_input)

    # A short direct hit is already a complete answer; skip generation.
    if retrieved_answers and len(retrieved_answers[0].split()) < 50:
        response_text = retrieved_answers[0]
        history.append(f"Chatbot: {response_text}")
        return response_text

    context = " ".join(retrieved_answers[:2]) if retrieved_answers else None

    min_chars, max_chars = get_dynamic_word_range(user_input, retrieved_answers)

    # Only the five most recent transcript lines (including the current message) are sent.
    conversation_history = "\n".join(history[-5:])

    prompt = f"""
    You are an AI assistant specialized in answering questions.
    - Use provided context when available.
    - Ensure responses are **detailed but concise**.
    - Use **bullet points** when listing details.
    - Do not repeat information unnecessarily.
    - Maintain a **conversational tone** for follow-up questions.

    {f'Context: {context}' if context else 'No specific context available.'}

    Conversation history:
    {conversation_history}

    User question: {user_input}

    Answer:
    """

    try:
        response = text_gen_pipeline(
            prompt,
            # Bound the *newly generated* tokens. `max_length` / `min_length`
            # also count the prompt, so a long prompt would consume the whole
            # budget and leave no room for an answer.
            max_new_tokens=max_chars,
            # Never ask for a minimum above the maximum.
            min_new_tokens=min(min_chars, max_chars),
            do_sample=True,
            top_p=0.85,
            temperature=0.7,
            eos_token_id=tokenizer.eos_token_id,
            # Return only the completion instead of prompt + completion.
            return_full_text=False,
        )

        generated_text = response[0]['generated_text']
        # Defensive: strip the prompt if it was echoed back anyway.
        if generated_text.startswith(prompt):
            generated_text = generated_text[len(prompt):]
        response_text = generated_text.strip()

        response_text = format_response(response_text)
        response_text = add_bullets(response_text)
        response_text = clear_conclusion(response_text)

        # Pad very short replies with an invitation to follow up.
        if len(response_text) < min_chars:
            response_text += "\n- Let me know if you need more details."

    except Exception as e:
        print(f"Error generating response: {e}")
        response_text = "Sorry, I can't generate a response at the moment. Please try again later."

    # Make sure the reply ends with sentence punctuation.
    if response_text and response_text[-1] not in ".!?":
        response_text += "."

    history.append(f"Chatbot: {response_text}")

    return response_text


In [None]:
# Interactive console chat: read a message, answer it, repeat until the user
# types "exit"/"quit", interrupts, or input ends.
user_id = "user_123"
while True:
    try:
        user_input = input("You: ").strip()
        if not user_input:
            print("Chatbot: Please enter a valid message.")
            continue
        if user_input.lower() in ["exit", "quit"]:
            print("Chatbot: Goodbye!")
            break

        start_time = time.time()
        response = generate_response(user_id, user_input)
        elapsed_time = time.time() - start_time

        # Acknowledge slow generations before showing the reply.
        if elapsed_time > 60:
            print("Chatbot: Sorry for the delay. Here’s your response:")

        print(f"Chatbot: {response}")
    except (KeyboardInterrupt, EOFError):
        # EOFError means stdin is closed/exhausted; without handling it here the
        # generic handler below would catch it on every iteration and loop forever.
        print("\nChatbot: Session ended. Goodbye!")
        break
    except Exception as e:
        # Keep the session alive on errors that are unrelated to reading input.
        print(f"Unexpected error: {e}")