In [7]:
import requests
from langchain.vectorstores import Chroma
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.base import Embeddings
from typing import List
import os

In [8]:
# --- 1. Read the .txt file ---
# Load the entire source document into memory as one UTF-8 decoded string.
# (Read-only text mode is open()'s default, so no explicit mode is passed.)
with open("transportation_test_text.txt", encoding="utf-8") as file:
    raw_text = file.read()

In [9]:
# Sanity check: show the type of the object returned by file.read().
type(raw_text)

str

In [10]:
# --- 2. Chunking ---
# Split the raw text into overlapping pieces; consecutive chunks share up to
# `chunk_overlap` units so context that straddles a boundary is not lost.
text_splitter = RecursiveCharacterTextSplitter(chunk_overlap=100, chunk_size=500)

# create_documents takes a list of texts; the whole file is passed as one entry.
chunks = text_splitter.create_documents([raw_text])

In [11]:
# Number of chunk Documents produced by the splitter.
len(chunks)

376

In [12]:

# --- 3. Custom Embedding class using mxbai-embed-large ---
class MXBAIEmbeddings(Embeddings):
    """LangChain embedding adapter that POSTs text to an HTTP embedding endpoint.

    The defaults point at a local server's ``/api/embed`` route and the
    ``mxbai-embed-large:latest`` model. Each request sends a JSON body with
    ``"model"`` and ``"input"`` fields and expects a JSON reply containing an
    ``"embeddings"`` list with one vector per input text.

    Args:
        endpoint: URL that receives the POST requests.
        model: Model name sent in the ``"model"`` field.
        batch_size: Maximum number of texts sent in a single request. Use 1 to
            send one text per request.
        timeout: Seconds to wait for each HTTP response before ``requests``
            raises a timeout error (prevents the kernel from hanging forever
            on a stalled server).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(
        self,
        endpoint="http://localhost:11434/api/embed",
        model="mxbai-embed-large:latest",
        batch_size: int = 32,
        timeout: float = 300.0,
    ):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.endpoint = endpoint
        self.model = model
        self.batch_size = batch_size
        self.timeout = timeout

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts`` and return one vector per text, in input order.

        Texts are sent in batches of at most ``self.batch_size`` so that a
        large corpus does not need one HTTP round trip per chunk.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the reply has no ``"embeddings"`` key or returns a
                different number of vectors than texts were sent.
        """
        texts = list(texts)
        embeddings: List[List[float]] = []
        for start in range(0, len(texts), self.batch_size):
            batch = texts[start:start + self.batch_size]
            response = requests.post(
                self.endpoint,
                json={"model": self.model, "input": batch},
                timeout=self.timeout,
            )
            response.raise_for_status()
            data = response.json()

            if "embeddings" not in data:
                raise ValueError(f"Missing 'embeddings' in response: {data}")

            vectors = data["embeddings"]
            # Guard against a short or empty reply; silently accepting it would
            # misalign vectors with their source texts.
            if len(vectors) != len(batch):
                raise ValueError(
                    f"Expected {len(batch)} embeddings but received {len(vectors)}"
                )
            embeddings.extend(vectors)
        return embeddings

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and return its vector."""
        return self.embed_documents([text])[0]



In [13]:
# --- 4. Initialize embedding model ---
# No arguments are passed, so the defaults declared in MXBAIEmbeddings.__init__
# (endpoint URL and model name) are used.
embedding_model = MXBAIEmbeddings()

In [14]:
# --- 5. Store into Chroma DB ---
persist_directory = "chroma_store"

# Extract plain text content from each Document
documents_as_text = [doc.page_content for doc in chunks]
metadatas = [doc.metadata for doc in chunks]

# Open the persisted collection (it is created on first use) instead of calling
# Chroma.from_texts unconditionally: from_texts adds to whatever is already
# stored, so re-running this cell would re-embed the whole corpus and store
# every chunk a second time.
vectordb = Chroma(
    persist_directory=persist_directory,
    embedding_function=embedding_model,
)

# Embed and insert only when the store is empty (first run, or an earlier run
# that was interrupted before anything was written). To force a rebuild, e.g.
# after the source text changes, delete the `persist_directory` folder first.
if not vectordb.get()["ids"]:
    vectordb.add_texts(texts=documents_as_text, metadatas=metadatas)
    vectordb.persist()

KeyboardInterrupt: 

In [1]:
# --- Function to call DeepSeek model ---
def query_deepseek(prompt: str, endpoint="http://localhost:11434/api/generate", model="deepseek-r1:1.5b", timeout=300):
    """Send ``prompt`` to a text-generation HTTP endpoint and return its answer.

    Args:
        prompt: Full prompt text to send.
        endpoint: URL that receives the POST request.
        model: Model name sent in the ``"model"`` field.
        timeout: Seconds to wait for the HTTP response before ``requests``
            raises a timeout error.

    Returns:
        The generated text when the reply contains a ``"response"`` key (the
        field used by Ollama's ``/api/generate``). Otherwise the first truthy
        value among ``"text"``, ``"result"`` and ``"generation"``, or the whole
        decoded JSON payload if none of those is present.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.post(
        endpoint,
        json={
            "model": model,
            "prompt": prompt,
            # Ask for one complete JSON object rather than a stream of chunks.
            "stream": False,
        },
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()

    # The generated text lives under "response"; without this check the
    # function always fell through to returning the entire payload dict.
    if "response" in data:
        return data["response"]

    # Fallbacks kept for endpoints that use a different response schema.
    return data.get("text") or data.get("result") or data.get("generation") or data


In [5]:
# --- User query interaction ---
def answer_query(user_query: str, k=3):
    """Answer ``user_query`` using chunks retrieved from the vector store.

    Runs a similarity search on the module-level ``vectordb`` for the ``k``
    closest chunks, prints a preview of each (at most 300 characters), joins
    their full text into a context section, and passes the resulting prompt
    to ``query_deepseek``.

    Args:
        user_query: The question to answer.
        k: Number of chunks to request from the vector store.

    Returns:
        Whatever ``query_deepseek`` returns for the assembled prompt.
    """
    preview_limit = 300

    # Step 1: fetch the closest chunks and keep only their text.
    hits = vectordb.similarity_search(user_query, k=k)
    passages = []
    for hit in hits:
        passages.append(hit.page_content)

    # Show what was retrieved, truncating long chunks for readability.
    print(f"\n🔍 Retrieved {len(passages)} chunk(s) from the vector DB.")
    for number, passage in enumerate(passages, start=1):
        ellipsis = '...' if len(passage) > preview_limit else ''
        print(f"\n📄 Chunk {number}:\n{passage[:preview_limit]}{ellipsis}")

    # Step 2: assemble the prompt: context first, then the question.
    joined_context = "\n\n".join(passages)
    header = f"Use the following context to answer the question:\n\n{joined_context}\n\n"
    footer = f"Question: {user_query}\nAnswer:"

    # Step 3: delegate generation to the language-model helper.
    return query_deepseek(header + footer)