<a href="https://colab.research.google.com/github/KK-Singh333/IITI_BOT/blob/main/Pipeline_IITI_BOT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Radhey Radhey
!pip install pathway pathway[xpack-llm] litellm tavily-python

In [None]:
import pathway as pw
from pathway.xpacks.llm import llms, embedders, rerankers
import os
from pathway.udfs import ExponentialBackoffRetryStrategy
import numpy as np
import ast
from pathway.stdlib.ml.index import KNNIndex

from langchain_community.tools.tavily_search import TavilySearchResults
# from pathway.xpacks.llm import rerankers
# from pathway.xpacks.llm import embedders

from getpass import getpass

# Credentials are read from the environment and, when missing, requested
# interactively, so that no secret is ever stored in the notebook source.
# An existing environment value is never overwritten.
# NOTE: keys that were previously committed in this cell are public and should
# be revoked and re-issued.
for _key_name in ("GROQ_API_KEY", "TAVILY_API_KEY"):
    if not os.getenv(_key_name):
        os.environ[_key_name] = getpass(f"Enter {_key_name}: ")

# LiteLLM model identifier; used as the default model of the retriever.
model: str = "groq/meta-llama/llama-4-scout-17b-16e-instruct"

# System prompt for the query-expansion step: the LLM is asked to rewrite the
# user question into several alternatives, one per line.
system_prompt_retriever: str = """
You are an AI language model assistant.
Your task is to generate five different versions of the given user question to retrieve relevant documents from a vector database.
By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search.
Provide these alternative questions separated by newlines.
Output only the generated queries not including any other text.
"""

# Location of the CSV document store (chunks with their stored embeddings).
path: str = "result.csv"

# Column layout of the document-store CSV that `Retriever` reads with
# `pw.io.csv.read`. All three columns are read as plain strings.
class InputCSVDataSchema(pw.Schema):
    row_id: str  # document identifier; exposed as `doc_id` in the vector store
    chunk: str  # text of the document chunk
    embedding: str  # stringified vector, parsed later with `ast.literal_eval`
    # url: str

@pw.udf
def split_lines(text: str) -> list[str]:
    """Split newline-separated LLM output into individual queries.

    Each line is stripped of surrounding whitespace, and lines that are empty
    after stripping are dropped. Without this, blank separator lines in the
    model output would be embedded and searched as empty queries.
    """
    stripped_lines = (line.strip() for line in text.splitlines())
    return [line for line in stripped_lines if line]

@pw.reducers.stateful_many
def unique_docs(state: list | None, rows) -> list:
    """Collect the distinct documents seen for one group.

    Args:
        state: Result of the previous call for this group, or ``None`` on the
            first call. It is a pair ``[doc_ids, docs]`` of parallel sequences.
        rows: Batch of ``(row, count)`` pairs, where each ``row`` unpacks into
            ``(doc_ids, docs)``: parallel sequences of ids and their texts.

    Returns:
        ``[doc_ids, docs]``: two parallel lists holding every distinct document
        id in first-seen order and the matching document text.
    """
    if state is None:
        seen_ids, kept_docs = [], []
    else:
        # The previous state may come back as immutable tuples, so copy it
        # into fresh lists before appending to it.
        seen_ids, kept_docs = list(state[0]), list(state[1])

    known_ids = set(seen_ids)  # O(1) membership instead of scanning the list
    # NOTE(review): the count is ignored, so retractions (negative counts) are
    # not undone; this reducer is only correct for append-only input.
    for row, _count in rows:
        doc_ids, docs = row
        for doc_id, doc in zip(doc_ids, docs):
            if doc_id not in known_ids:
                known_ids.add(doc_id)
                seen_ids.append(doc_id)
                kept_docs.append(doc)
    return [seen_ids, kept_docs]

class Retriever():
  """Multi-query retrieval-augmented answering pipeline built on Pathway.

  Calling an instance with a table of user questions:
    1. asks the LLM to rewrite each question into several alternatives,
    2. embeds every alternative and looks up its nearest chunks in the CSV store,
    3. de-duplicates the chunks per user and reranks them against the question,
    4. adds web-search snippets when fewer than three chunks survive reranking,
    5. asks the LLM for the final answer using the collected context.
  """

  def __init__(self, model:str = model, system_prompt:str = system_prompt_retriever, path_csv:str = path):
    """Create the models and load the document store.

    Args:
      model: LiteLLM model identifier used for both the chat LLM and the reranker.
      system_prompt: System prompt used for the query-expansion step.
      path_csv: Path of the CSV file matching `InputCSVDataSchema`.
    """
    self.llm = llms.LiteLLMChat(
      model=model,
      retry_strategy=ExponentialBackoffRetryStrategy(max_retries=2),
    )
    self.system_prompt = system_prompt
    self.embedder = embedders.SentenceTransformerEmbedder(model="all-MiniLM-L6-v2")
    # The reranker gets its own chat instance because it needs JSON-formatted replies.
    self.reranker = rerankers.LLMReranker(
      llm=llms.LiteLLMChat(
        model=model,
        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=2),
        response_format={'type': 'json_object'},
      )
    )
    # `max_results` is the field that limits the number of Tavily hits; the
    # previous `k=2` keyword is not a field of the tool.
    self.web_search_tool = TavilySearchResults(max_results=2)

    self.csv_data = pw.io.csv.read(
      path_csv,
      schema=InputCSVDataSchema,
      mode="static",
    )

    def parse_nested_embedding(embedding_str):
      """Parse a stringified vector into a float32 array; return None if it is malformed."""
      try:
        return np.array(ast.literal_eval(embedding_str), dtype=np.float32)
      except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        # Best effort: a bad row is reported and then filtered out below.
        print(f"Error parsing embedding: {e}")
        return None

    # Rows whose embedding could not be parsed are dropped from the store.
    self.vector_store = self.csv_data.select(
      doc_id=pw.this.row_id,
      chunks=pw.this.chunk,
      embedding=pw.apply(parse_nested_embedding, self.csv_data.embedding),
    ).filter(pw.this.embedding.is_not_none())

  def web_content(self, question:str) -> list[str]:
    """Run a web search for `question` and return the text content of each hit.

    Returns an empty list when the search tool does not return a list of
    results (for example when it hands back an error message instead), so a
    failed search degrades to "no extra context" rather than crashing the UDF.
    """
    results = self.web_search_tool.invoke(question)
    if not isinstance(results, list):
      print(f"Web search returned no usable results: {results!r}")
      return []
    return [
      result['content']
      for result in results
      if isinstance(result, dict) and 'content' in result
    ]

  @pw.table_transformer
  def __call__(self, queries:pw.Table):
    """Answer every question in `queries`.

    Args:
      queries: Table with columns `user_id` and `queries` (the question text).

    Returns:
      Table with columns `user_id`, `queries` and `answer`.
    """

    @pw.udf
    def query_parser(args) -> list[dict]:
      """Build the chat messages for the query-expansion request."""
      return [{"role": "system", "content": self.system_prompt}, {"role": "user", "content": f"{args}"}]

    # Step 1: expand each question into several alternative phrasings.
    query_table = queries.select(user_id = pw.this.user_id, questions = query_parser(pw.this.queries))
    responses = query_table.select(user_id = pw.this.user_id, result = self.llm(pw.this.questions, temperature=0.0))

    split_table = responses.select(
      user_id=pw.this.user_id,
      questions = split_lines(pw.this.result),  # one list of alternatives per user
    )
    # One row per alternative question.
    response = split_table.flatten(pw.this.questions)

    # Step 2: embed the alternatives and fetch their nearest chunks.
    response += response.select(embedding=self.embedder(pw.this.questions))

    doc_index = KNNIndex(
      self.vector_store.embedding,
      self.vector_store,
      n_dimensions=self.embedder.get_embedding_dimension(),
      distance_type="cosine",
    )

    results = doc_index.get_nearest_items(
      response.embedding,
      k=3,  # top 3 most similar documents per alternative question
    ).select(user_id = response.user_id, doc_id = pw.this.doc_id, chunks = pw.this.chunks)

    # Step 3: merge the hits of all alternatives per user, keeping each chunk once,
    # then score every chunk against the user's original question.
    results = results.groupby(pw.this.user_id).reduce(pw.this.user_id, chunks = unique_docs(pw.this.doc_id, pw.this.chunks)[1])
    results = results.join(queries, pw.this.user_id == queries.user_id).select(user_id = results.user_id, queries = queries.queries, chunks = results.chunks)
    results_flatten = results.flatten(pw.this.chunks)
    results_flatten += results_flatten.select(rank=self.reranker(pw.this.chunks, pw.this.queries))
    # NOTE(review): a question whose chunks are all filtered out here has no
    # rows left to group, so it appears to be dropped without an answer or a
    # web search - confirm whether that is intended.
    results = results_flatten.filter(pw.this.rank > 2)
    results = results.groupby(pw.this.user_id, pw.this.queries).reduce(pw.this.user_id, pw.this.queries, chunks = pw.reducers.tuple(pw.this.chunks), rank = pw.reducers.tuple(pw.this.rank))

    @pw.udf
    def check_web_search(query:str, docs:tuple, rank:tuple) -> tuple:
      """Append web-search snippets when fewer than three chunks passed reranking."""
      if len(rank) < 3:
        print("Web Searching...")
        docs = docs + tuple(self.web_content(query))
      return docs

    # Step 4: top up thin context with web results.
    results = results.select(user_id = pw.this.user_id, queries = pw.this.queries, chunks = check_web_search(pw.this.queries, pw.this.chunks, pw.this.rank))

    @pw.udf
    def multiple_queries_parser(query:str, docs:tuple[str]) -> list[dict]:
      """Build the chat messages for the final answer: instructions, context, question."""
      system_prompt = """
      You are a helpful and intelligent assistant. Use the provided context to help you better understand and answer the user's question.
      Prefer answers that are informed by the context.
      If there are relevant details in the documents, only include knowledge of documents in your answer.
      Always aim for clarity, helpfulness, and accuracy in your answer.
      Output just the answer to user query.
      """
      return [{"role": "system", "content": system_prompt},{'role':"assistant", "content": "\n".join(docs)} ,{"role": "user", "content": query}]

    # Step 5: generate the answer from the collected context.
    questions = results.select(
      user_id=pw.this.user_id,
      queries = pw.this.queries,
      prompt=multiple_queries_parser(pw.this.queries, pw.this.chunks))
    answers = questions.select(user_id = pw.this.user_id,
                               queries = pw.this.queries,
                               answer = self.llm(pw.this.prompt, temperature=0.0))

    return answers

In [None]:
# Sample input: four users, one question each, as a static Pathway table.
query = pw.debug.table_from_rows(
    schema=pw.schema_from_types(user_id=str, queries=str),
    rows=[
        ("1", "who is the director of iit indore?"),
        ("2", "who are you and who made you?"),
        ("3", "what are the major events of iit indore?"),
        ("4", "what is the nirf ranking of iit indore?"),
    ],
)

In [None]:
# Build the retriever with its default model, prompt and CSV path, then run the
# whole pipeline on the sample questions and print the resulting table.
custom_retriever = Retriever()

pw.debug.compute_and_print(custom_retriever(queries=query))
# custom_retriever(queries=query).typehints()





<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
Web Searching...

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.


[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m


[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.


[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m

            | user_id | queries                                  | answer
^51STC36... | 1       | who is the director of iit indore

In [None]:
import os
from getpass import getpass

import pathway as pw
from pathway.udfs import ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import llms

# The Groq key comes from the environment or an interactive prompt; it is never
# stored in the notebook source.
# NOTE: the key that was previously committed in this cell is public and should
# be revoked and re-issued.
if not os.getenv("GROQ_API_KEY"):
    os.environ["GROQ_API_KEY"] = getpass("Enter GROQ_API_KEY: ")

# Chat model configured for deterministic, JSON-formatted replies; the reranker
# experiment below uses it.
llm = llms.LiteLLMChat(
    model="groq/meta-llama/llama-4-scout-17b-16e-instruct",
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=2),
    temperature=0.0,
    response_format={"type": "json_object"},
)

In [None]:
from pathway.xpacks.llm import rerankers
import pandas as pd

# Toy documents for trying out the LLM reranker.
docs = [
    "John drinks coffee",
    "Someone drinks tea",
    "Nobody drinks coca-cola",
]

# Deliberately NOT called `query`: that name already holds the Pathway table of
# user questions, which the CSV-export cell at the end of the notebook still
# passes to the retriever. Rebinding it to a string here broke that cell.
rerank_query = "What does John drink?"

# The scalar prompt is broadcast, so every document row gets the same prompt.
df = pd.DataFrame({"docs": docs, "prompt": rerank_query})

# NOTE: `input` shadows the builtin; the name is kept because later cells use it.
input = pw.debug.table_from_pandas(df)



In [None]:
# Inspect the toy data three ways: the pandas frame, the Pathway table built
# from it, and the column types of that table.
print(df)
pw.debug.compute_and_print(input)
input.typehints()

In [None]:
# chat = llms.OpenAIChat(model="gpt-4o-mini", api_key=API_KEY, response_format="{'type': 'json_object'}")
# Wrap the chat model defined in the earlier cell (`llm`) in an LLM reranker.
reranker = rerankers.LLMReranker(llm=llm)

# Rank every document row against its prompt and print the result.
res = input.select(rank=reranker(pw.this.docs, pw.this.prompt))
pw.debug.compute_and_print(res)

In [None]:
import pathway as pw
import pandas as pd

# Toy retrieval results for exercising the de-duplication reducer: three result
# rows for query "1" and two for query "2", with overlapping document ids.
query_id = ["1"] * 3 + ["2"] * 2

doc_id = [
    ("1", "2", "3"),
    ("2", "4", "5"),
    ("6", "3", "2"),
    ("1", "2", "3"),
    ("2", "4", "5"),
]

# The chunk text mirrors the id, so each row's chunks are derived from its ids.
chunks = [tuple(f"This is chunk {single_id}" for single_id in ids) for ids in doc_id]

df = pd.DataFrame({"query_id": query_id, "doc_id": doc_id, "chunks": chunks})
print(df)

input = pw.debug.table_from_pandas(df)
pw.debug.compute_and_print(input)
input.typehints()

  query_id     doc_id                                             chunks
0        1  (1, 2, 3)  (This is chunk 1, This is chunk 2, This is chu...
1        1  (2, 4, 5)  (This is chunk 2, This is chunk 4, This is chu...
2        1  (6, 3, 2)  (This is chunk 6, This is chunk 3, This is chu...
3        2  (1, 2, 3)  (This is chunk 1, This is chunk 2, This is chu...
4        2  (2, 4, 5)  (This is chunk 2, This is chunk 4, This is chu...
            | query_id | doc_id          | chunks
^X1MXHYY... | 1        | ('1', '2', '3') | ('This is chunk 1', 'This is chunk 2', 'This is chunk 3')
^YYY4HAB... | 1        | ('2', '4', '5') | ('This is chunk 2', 'This is chunk 4', 'This is chunk 5')
^Z3QWT29... | 1        | ('6', '3', '2') | ('This is chunk 6', 'This is chunk 3', 'This is chunk 2')
^3CZ78B4... | 2        | ('1', '2', '3') | ('This is chunk 1', 'This is chunk 2', 'This is chunk 3')
^3HN31E1... | 2        | ('2', '4', '5') | ('This is chunk 2', 'This is chunk 4', 'This is chunk 5')


mappingproxy({'query_id': str,
              'doc_id': tuple[str, str, str],
              'chunks': tuple[str, str, str]})

In [None]:
@pw.reducers.stateful_many
def unique_docs(state: list | None, rows) -> list:
    """Collect the distinct documents seen for one group.

    Args:
        state: Result of the previous call for this group, or ``None`` on the
            first call. It is a pair ``[doc_ids, docs]`` of parallel sequences.
        rows: Batch of ``(row, count)`` pairs, where each ``row`` unpacks into
            ``(doc_ids, docs)``: parallel sequences of ids and their texts.

    Returns:
        ``[doc_ids, docs]``: two parallel lists holding every distinct document
        id in first-seen order and the matching document text.
    """
    if state is None:
        seen_ids, kept_docs = [], []
    else:
        # The previous state may come back as immutable tuples, so copy it
        # into fresh lists before appending to it.
        seen_ids, kept_docs = list(state[0]), list(state[1])

    known_ids = set(seen_ids)  # O(1) membership instead of scanning the list
    # NOTE(review): the count is ignored, so retractions (negative counts) are
    # not undone; this reducer is only correct for append-only input.
    for row, _count in rows:
        doc_ids, docs = row
        for doc_id, doc in zip(doc_ids, docs):
            if doc_id not in known_ids:
                known_ids.add(doc_id)
                seen_ids.append(doc_id)
                kept_docs.append(doc)
    return [seen_ids, kept_docs]

In [None]:
# Group the toy rows per query and gather each group's id tuples and chunk
# tuples with the built-in tuple reducer (the custom-reducer variant is kept
# commented out on the next line).
# new = input.groupby(pw.this.query_id).reduce(pw.this.query_id, chunks = unique_docs(pw.this.doc_id, pw.this.chunks)[1])
new = input.groupby(pw.this.query_id).reduce(pw.this.query_id, doc_id = pw.reducers.tuple(pw.this.doc_id), chunks = pw.reducers.tuple(pw.this.chunks))
# new
pw.debug.compute_and_print(new)

            | query_id | doc_id                                              | chunks
^4VGSC8C... | 1        | (('2', '4', '5'), ('6', '3', '2'), ('1', '2', '3')) | (('This is chunk 2', 'This is chunk 4', 'This is chunk 5'), ('This is chunk 6', 'This is chunk 3', 'This is chunk 2'), ('This is chunk 1', 'This is chunk 2', 'This is chunk 3'))
^7MPQCWB... | 2        | (('1', '2', '3'), ('2', '4', '5'))                  | (('This is chunk 1', 'This is chunk 2', 'This is chunk 3'), ('This is chunk 2', 'This is chunk 4', 'This is chunk 5'))


In [None]:
# Register a CSV sink for the retriever's answers, then start the Pathway
# computation so the sink is written.
# NOTE: `query` must be the Pathway table of user questions (columns `user_id`
# and `queries`); re-run the cell that builds it if the name has been rebound.
pw.io.csv.write(
    table=custom_retriever(queries=query),
    filename="answers.csv"
)

pw.run()