In [2]:
import os
import re
from typing import List, Dict
import numpy as np
from llama_parse import LlamaParse
from dateutil import parser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain.schema.document import Document
from langchain_core.messages import HumanMessage
from sentence_transformers import SentenceTransformer
from pathlib import Path
from langchain_openai.chat_models import ChatOpenAI
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login
from dotenv import load_dotenv
from langchain_community.llms import Ollama
from datetime import datetime, timezone
from typing import List, Dict
from llama_parse import LlamaParse
from llama_index.core import Document
from llama_index.core.embeddings import BaseEmbedding
from pydantic import PrivateAttr
from langchain.chains import LLMChain
from pathlib import Path
from langchain.memory import ConversationKGMemory
import pyttsx3
import sounddevice as sd
import numpy as np
import openai
import tempfile
import scipy.io.wavfile as wav
import streamlit as st
load_dotenv()


True

In [None]:
from huggingface_hub import login
from sentence_transformers import SentenceTransformer

my_token = os.getenv("HF_TOKEN")

login(token=my_token)

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

sentences = [
    "That is a happy person",
    "That is a happy dog",
    "That is a very happy person",
    "Today is a sunny day"
]
embeddings = model.encode(sentences)
similarities = model.similarity(embeddings, embeddings)
print(similarities.shape)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


torch.Size([4, 4])


In [4]:
embeddings.shape

(4, 384)

In [5]:
class LocalEmbedding(BaseEmbedding):
    _model: SentenceTransformer = PrivateAttr()
    _dim: int = PrivateAttr()

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2", **kwargs):
        super().__init__(**kwargs)  # important for BaseEmbedding init
        self._model = SentenceTransformer(model_name)
        self._dim = self._model.get_sentence_embedding_dimension()

    @property
    def dim(self):
        return self._dim

    # --- sync methods ---
    def _get_text_embedding(self, text: str):
        return self._model.encode(text, convert_to_numpy=True).tolist()

    def _get_query_embedding(self, query: str):
        return self._model.encode(query, convert_to_numpy=True).tolist()

    def _get_text_embeddings(self, texts: List[str]):
        return [self._get_text_embedding(t) for t in texts]

    def _get_query_embeddings(self, queries: List[str]):
        return [self._get_query_embedding(q) for q in queries]

    # --- async fallbacks ---
    async def _aget_text_embedding(self, text: str):
        return self._get_text_embedding(text)

    async def _aget_query_embedding(self, query: str):
        return self._get_query_embedding(query)

    async def _aget_text_embeddings(self, texts: List[str]):
        return self._get_text_embeddings(texts)

    async def _aget_query_embeddings(self, queries: List[str]):
        return self._get_query_embeddings(queries)


def extract_page_texts_from_documents(documents: List[Document]) -> List[str]:
    """Extracts page-wise text content from parsed documents."""
    return [doc.text_resource.text.strip() for doc in documents if doc.text_resource.text]


def split_document_page_level(pages: List[str], filename: str) -> List[Dict]:
    """Creates page-level chunks only (no text/table separation)."""
    final_chunks = []

    for page_num, page in enumerate(pages, start=1):
        page_text = page.strip()
        if not page_text:
            continue

        final_chunks.append({
            "type": "page",
            "content": page_text,
            "file_name": filename,
            "metadata": {
                "page_num": page_num,
                "uploaded_at": datetime.now().isoformat(),
            }
        })

    return final_chunks


def doc_parser(file_path: str, filename: str) -> List[Dict]:
    """Parses a PDF and returns page-level chunks."""
    if file_path.endswith('.pdf'):
        document = LlamaParse(
            result_type='markdown',
            api_key=os.getenv("LLAMA_PARSE_API_KEY"),
            parse_mode="parse_page_with_lvm"
        ).load_data(file_path)

        page_texts = extract_page_texts_from_documents(document)
        chunks = split_document_page_level(page_texts, filename)
        return chunks
    else:
        print(f"Unsupported file format: {file_path}")
        return []


import faiss
import pickle

def store_in_faiss(chunks, faiss_index_path="faiss_storage"):
    os.makedirs(faiss_index_path, exist_ok=True)
    embed_model = LocalEmbedding()
    embeddings = [embed_model.get_text_embedding(c["content"]) for c in chunks]

    dim = embed_model.dim
    index = faiss.IndexFlatL2(dim)
    index.add(np.array(embeddings).astype("float32"))

    # Save FAISS index
    faiss.write_index(index, f"{faiss_index_path}/faiss.index")

    # Save metadata separately
    with open(f"{faiss_index_path}/metadata.pkl", "wb") as f:
        pickle.dump(chunks, f)

    print(f" Stored {len(chunks)} chunks in {faiss_index_path}")

In [6]:
file_path = r"E:\Backup d\Pranto Coding Files\RAG using DeepSeek Innovative Skills\bots\document_store\pdfs\8 The Tragedy of Julius Caesar author William Shakespeare.pdf"

In [7]:
filename = Path(file_path).stem.split("\\")[0]
filename

'8 The Tragedy of Julius Caesar author William Shakespeare'

In [None]:
# Example usage
if __name__ == "__main__":
    chunks = doc_parser(file_path, filename)

In [9]:
chunks

[]

In [None]:
store_in_faiss(chunks, faiss_index_path="faiss_storage")

In [14]:
def clean_context_block(text):
    if not text:
        return None
    lowered = text.lower()
    if "insufficient information" in lowered or "i can't find" in lowered or "no data" in lowered or "there is no" in lowered:
        return ""
    return text.strip()

In [15]:
def load_faiss_index(faiss_index_path="faiss_storage"):
    index = faiss.read_index(f"{faiss_index_path}/faiss.index")
    with open(f"{faiss_index_path}/metadata.pkl", "rb") as f:
        chunks = pickle.load(f)
    return index, chunks


def get_context_from_faiss(query, top_k=3, faiss_index_path="faiss_storage"):
    embed_model = LocalEmbedding()
    query_vec = np.array([embed_model.get_text_embedding(query)]).astype("float32")

    index, chunks = load_faiss_index(faiss_index_path)
    _ , I = index.search(query_vec, top_k)

    results = []
    for idx in I[0]:
        if idx < len(chunks):
            c = chunks[idx]
            file_info = c["metadata"].get("file_name", "unknown")
            page_info = c["metadata"].get("page_num", "N/A")
            results.append(f"{c['content']}\n(file: {file_info}, page: {page_info})")

    return "\n\n".join(results)

In [16]:
HISTORY_FILE = "E:\Backup d\Pranto Coding Files\RAG using DeepSeek Innovative Skills\FOI\conversation_history.txt"
def save_to_history(user_query, answer, history_file=HISTORY_FILE):
    os.makedirs(os.path.dirname(history_file), exist_ok=True)

    with open(history_file, "a", encoding="utf-8") as f:
        f.write(f"User: {user_query}\n")
        f.write(f"Assistant: {answer}\n\n")

  HISTORY_FILE = "E:\Backup d\Pranto Coding Files\RAG using DeepSeek Innovative Skills\FOI\conversation_history.txt"


In [None]:
llm = Ollama(model="llama3:8b")
kg_memory = ConversationKGMemory(llm=llm, memory_key="chat_memory", input_key="input")

In [18]:
kg_memory

ConversationKGMemory(chat_memory=InMemoryChatMessageHistory(messages=[]), input_key='input', kg=<langchain_community.graphs.networkx_graph.NetworkxEntityGraph object at 0x0000018B3C67AFF0>, llm=ChatOpenAI(client=<openai.resources.chat.completions.completions.Completions object at 0x0000018B3DF4FDA0>, async_client=<openai.resources.chat.completions.completions.AsyncCompletions object at 0x0000018B3E2752E0>, root_client=<openai.OpenAI object at 0x0000018B3C55D400>, root_async_client=<openai.AsyncOpenAI object at 0x0000018B3C67AC30>, temperature=1.0, model_kwargs={}, openai_api_key=SecretStr('**********')), memory_key='chat_memory')

In [19]:
from langchain.prompts import PromptTemplate

In [20]:
def generate_answer_with_memory(user_query, doc):
    print("Generating answer...")

    global kg_memory
    
    conversation_prompt = PromptTemplate(
        input_variables=["chat_memory", "input", "doc"],
        template="""You are a smart assistant. Answer the user's query based on the provided context and conversation facts.
        The context will be include text, tables. Please answer the question describing breifly and concisely.
        Remember you answer must be contain context and relations that user can understand without any ambiguity or query for extra information.
        In response must give the info about the page number which can be found in the metadata if only the result comes from those 
        specific files.

Conversation Facts (from memory):
{chat_memory}

Document Context:
{doc}

Rules:
- If the document doesn't have the answer, reply: Sorry, I can only answer questions related to Julius Caesar.
- If the document has the answer, include page_num along with the answer.
- Answer the question breilfy.

User Query: {input}

Answer:"""
    )

    conversation_chain = LLMChain(
        llm=llm,
        prompt=conversation_prompt,
        memory=kg_memory,
    )

    print("KG Triples:", kg_memory.kg.get_triples())
    print("Chat Memory Messages:", kg_memory.chat_memory.messages)
    print("Memory Variables:", kg_memory.memory_variables)

    result = conversation_chain.predict(
        input=user_query,
        doc=doc,
    )

    save_to_history(user_query, result)

    return result

In [21]:
kg_memory

ConversationKGMemory(chat_memory=InMemoryChatMessageHistory(messages=[]), input_key='input', kg=<langchain_community.graphs.networkx_graph.NetworkxEntityGraph object at 0x0000018B3C67AFF0>, llm=ChatOpenAI(client=<openai.resources.chat.completions.completions.Completions object at 0x0000018B3DF4FDA0>, async_client=<openai.resources.chat.completions.completions.AsyncCompletions object at 0x0000018B3E2752E0>, root_client=<openai.OpenAI object at 0x0000018B3C55D400>, root_async_client=<openai.AsyncOpenAI object at 0x0000018B3C67AC30>, temperature=1.0, model_kwargs={}, openai_api_key=SecretStr('**********')), memory_key='chat_memory')

In [22]:
def record_and_transcribe(duration=10, samplerate=16000):
    print(" Recording... Speak now!")
    audio = sd.rec(int(duration * samplerate), samplerate=samplerate, channels=1, dtype=np.int16)
    sd.wait()
    print(" Recording finished")

    # Save temporary WAV
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmpfile:
        wav.write(tmpfile.name, samplerate, audio)
        audio_path = tmpfile.name

    # Transcribe with Whisper (using OpenAI API)
    with open(audio_path, "rb") as f:
        transcript = openai.audio.transcriptions.create(
            model="whisper-1",
            file=f
        )
    os.remove(audio_path) 
    return transcript.text


def speak(text):
    engine = pyttsx3.init()
    engine.say(text)
    engine.runAndWait()

In [23]:
text = record_and_transcribe()
text

 Recording... Speak now!
 Recording finished


'今日もご視聴ありがとうございました'

In [33]:
speak(text)

In [48]:
user = input("")
if user == "y":
    text = record_and_transcribe()
else:
    text = input("")

In [49]:
text

'who is thomas?'

In [50]:
if __name__ == "__main__":
    context = get_context_from_faiss(text, top_k=3, faiss_index_path="faiss_storage")

    answer = generate_answer_with_memory(text,context)
    
    speak(answer)
    print(answer)

Generating answer...
KG Triples: [('Caesar', 'a group of conspirators led by Brutus', 'was killed by'), ('Caesar', 'noble Roman', 'was a'), ('Caesar', 'group of conspirators', 'was killed by'), ('group of conspirators', 'Brutus', 'led by'), ('Act 2', "Caesar's ambition", 'Brutus ponders')]
Chat Memory Messages: [HumanMessage(content='who killed caesar?', additional_kwargs={}, response_metadata={}), AIMessage(content='Caesar was killed by a group of conspirators led by Brutus. (page: unknown)', additional_kwargs={}, response_metadata={}), HumanMessage(content='کیا ہوتا ہے ایکٹ ٹو؟', additional_kwargs={}, response_metadata={}), AIMessage(content='Sorry, I can only answer questions related to Julius Caesar.', additional_kwargs={}, response_metadata={}), HumanMessage(content='What happened in Act 2?', additional_kwargs={}, response_metadata={}), AIMessage(content='In Act 2 of "The Tragedy of Julius Caesar," Brutus contemplates Caesar\'s death, considering it necessary for the greater good.