# Imports

In [None]:
import numpy as np
import pandas as pd
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# Reading Data

In [None]:
# Load the HumanEval test split from the Hugging Face hub and keep only its
# first three columns; later cells read task_id, prompt and canonical_solution.
HUMANEVAL_PARQUET = "hf://datasets/openai/openai_humaneval/openai_humaneval/test-00000-of-00001.parquet"
df = pd.read_parquet(HUMANEVAL_PARQUET).iloc[:, :3]

# PART 1 : RAG SYSTEM

# Setting up VectorDB using Chroma

In [None]:
# Setting up embeddings
embeddings = HuggingFaceEmbeddings(model_name = "all-mpnet-base-v2")

# Setting up data for compatibility with Chroma.from_texts.
# Every argument is converted to a plain Python list: the API is typed for
# lists, and handing it a pandas Series (as `ids` was before) only works by
# accident on some library versions.
prompts = df["prompt"].tolist()
metadata = [{"solution": sol} for sol in df["canonical_solution"]]
task_ids = df["task_id"].tolist()

# Creating Vectorstore (each prompt is stored under its task_id, with the
# canonical solution attached as metadata)
vectorstore = Chroma.from_texts(
    texts = prompts,
    embedding = embeddings,
    metadatas = metadata,
    ids = task_ids,
    persist_directory="./chroma_store",
)

# Retrieval Engine

In [None]:
from typing import List, Tuple
from langchain_core.documents import Document

def retrieveDocsFromQuery(query : str, k : int = 4, printResults : bool = False) -> Tuple[List, List]:
  """Look up the k stored tasks most similar to `query`.

  Args:
    query: Free-text query passed to the similarity retriever built on the
      module-level `vectorstore`.
    k: Number of documents requested from the retriever.
    printResults: When True, print each task and its solution as it is collected.

  Returns:
    A `(tasks, solutions)` pair of parallel lists: each document's page
    content and the "solution" entry of its metadata, in retriever order.
  """
  hits = vectorstore.as_retriever(
    search_type = "similarity",
    search_kwargs = {"k" : k},
  ).invoke(query)

  tasks, solutions = [], []
  for rank, hit in enumerate(hits, start = 1):
    task_text = hit.page_content
    tasks.append(task_text)
    solution_text = hit.metadata["solution"]
    solutions.append(solution_text)

    if not printResults:
      continue
    print(f"\n==========\nTASK {rank} IN DATABASE\n==========\n{task_text}", '\n')
    print(f"\n==========\nSOLUTION {rank} IN DATABASE\n==========\n{solution_text}")

  return tasks, solutions


# PART 2 : LANGGRAPH AGENT

# Model Initialization

In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoProcessor

# Both the processor and the model are loaded from the same checkpoint id.
PHI_MODEL_ID = "microsoft/Phi-3.5-mini-instruct"

processor = AutoProcessor.from_pretrained(PHI_MODEL_ID)

# float16 weights, with device placement left to device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
    PHI_MODEL_ID,
    torch_dtype=torch.float16,
    device_map="auto",
)

# LangGraph Python Assistant

In [None]:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Union, List, Dict
from langchain_core.messages import HumanMessage, AIMessage, FunctionMessage, SystemMessage
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.messages.utils import convert_to_openai_messages
import json

In [None]:
def generateCodeFromMsg(msg, max_new_tokens = 500):
  """Send a list of LangChain messages to the model and return the reply text.

  Args:
    msg: Messages accepted by `convert_to_openai_messages`.
    max_new_tokens: Generation budget forwarded to `model.generate`.

  Returns:
    The decoded continuation only (prompt tokens removed, special tokens skipped).
  """
  chat_inputs = processor.apply_chat_template(
        convert_to_openai_messages(msg),
        add_generation_prompt=True,   # end the prompt with the assistant-turn marker
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    ).to(model.device)

  # Length of the prompt, used below to slice it off the generated sequence.
  prompt_len = chat_inputs["input_ids"].shape[-1]

  generated = model.generate(**chat_inputs, max_new_tokens=max_new_tokens)

  return processor.decode(generated[0][prompt_len:], skip_special_tokens=True)

In [None]:
class AgentState(TypedDict):
  """State dictionary handed to every node of the LangGraph agent."""
  # Conversation transcript; the node functions append to this list in place.
  messages : List[Union[HumanMessage, AIMessage, SystemMessage]]
  # Routing key read by `router`: "chat_normally", "explain_code" or "generate_code".
  chat_state : str

def chat_node(state : AgentState) -> AgentState:
  """Plain-chat branch: answer the transcript as it stands.

  The reply is appended to state["messages"] as an AIMessage and printed;
  the same (mutated) state dict is returned.
  """
  history = state["messages"]
  reply = generateCodeFromMsg(history)

  history.append(AIMessage(content = reply))
  print(f"\nAI: '{reply}'")

  return state

def explain_node(state : AgentState) -> AgentState:
  """Explain-code branch: add an explainer system prompt, generate a reply, record it.

  The system prompt is appended to the end of state["messages"] (after
  whatever the caller already put there), and the whole list is sent to
  generateCodeFromMsg. The reply is appended as an AIMessage and printed;
  the same (mutated) state dict is returned.
  """

  # NOTE: this is a triple-quoted literal, so the leading indentation of its
  # second line is part of the prompt text.
  state["messages"] += [SystemMessage("""You are a helpful assistant that explains code in simple terms.
                                        However, if no code is provided, you may respond normally""")]

  response = generateCodeFromMsg(state["messages"])

  state["messages"].append(AIMessage(content = response))
  print(f"\nAI: '{response}'")

  return state

def generate_node(state : AgentState) -> AgentState:
  """Generate-code branch: add a coding-assistant system prompt and answer.

  Both the system prompt and the model's reply are appended to
  state["messages"] in place; the reply is also printed. Returns the same
  state dict.
  """
  history = state["messages"]
  history.append(SystemMessage("You are a coding assistant. Generate code that fulfills the user's request."))

  reply = generateCodeFromMsg(history)

  history.append(AIMessage(content = reply))
  print(f"\nAI: {reply}")
  return state


def router(state : AgentState):
  """Map state["chat_state"] to the name of the conditional edge to follow.

  Returns "chat_edge", "explain_edge" or "generate_edge" for the three known
  states, and None for anything else.
  """
  routes = (
    ("chat_normally", "chat_edge"),
    ("explain_code", "explain_edge"),
    ("generate_code", "generate_edge"),
  )
  for chat_state, edge_name in routes:
    if state["chat_state"] == chat_state:
      return edge_name
  return None


# Graph layout: START -> router -> one of the three worker nodes -> END.
graph = StateGraph(AgentState)

# The "router" node is a pass-through; the branching itself is done by the
# `router` function on the conditional edges below.
for node_name, node_fn in (
    ("chat_node", chat_node),
    ("explain_node", explain_node),
    ("generate_node", generate_node),
    ("router", lambda state : state),
):
  graph.add_node(node_name, node_fn)

graph.add_conditional_edges(
    source = "router",
    path = router,
    path_map = {
        "chat_edge" : "chat_node",
        "explain_edge" : "explain_node",
        "generate_edge" : "generate_node",
    },
)

graph.add_edge(START, "router")
for terminal_node in ("chat_node", "explain_node", "generate_node"):
  graph.add_edge(terminal_node, END)

agent = graph.compile()


In [None]:
# Interactive console loop: classify each user message, add the matching
# system prompt (with retrieved examples for the code intents), and run the agent.
conversation_history = []
operation = "chat"
print("WELCOME TO PHI 3 CODING ASSISTANT: Type 'new' at any time to start a new Chat. Type 'exit' to exit program.")

while True:
  user_input = input("Enter: ")

  # Commands are matched case-insensitively and ignoring surrounding whitespace.
  command = user_input.strip().lower()
  if command == "exit": break
  if not command:
    # Nothing to classify or answer; ask again.
    continue

  print(f"\n\nUser: {user_input}")
  if command == "new":
    conversation_history = []
    continue

  # SMART ROUTING IMPLEMENTATION
  intent_classifier_prompt = f"""
    You are an intent‑classifier. Read the user’s message and respond *only* with JSON in this exact schema:
    {{
      "task": <one of: "explain_code", "generate_code", "chat_normally">,
      "user_input": <the original user text, verbatim>
    }}

    Examples:
    User: "Can you walk me through what this function does line by line?"
    ➞ {{"task":"explain_code","user_input":"Can you walk me through what this function does line by line?"}}

    User: "Write me a Python script that parses a CSV and prints the average of column A."
    ➞ {{"task":"generate_code","user_input":"Write me a Python script that parses a CSV and prints the average of column A."}}

    User: "Hey, how’s your day going?"
    ➞ {{"task":"chat_normally","user_input":"Hey, how’s your day going?"}}

    Now classify:
    User: "{user_input}"
    """
  # Same tokenize -> generate -> decode pipeline as the agent nodes use.
  response = generateCodeFromMsg([SystemMessage(intent_classifier_prompt)], max_new_tokens = 200)
  print(f"\nRESPONSE OF INTENT CLASIFIER IS {response}\n")

  # The model is only asked to return JSON; it is not guaranteed to. Fall back
  # to normal chat (as gradio_step does) instead of crashing the loop.
  try:
    parsed_response = json.loads(response)
  except json.JSONDecodeError:
    print(f"[Warning] Failed to parse intent JSON: {repr(response)}")
    parsed_response = {}

  if isinstance(parsed_response, dict):
    operation = parsed_response.get("task", "chat_normally")
  else:
    # Valid JSON but not an object (e.g. a bare string or list).
    operation = "chat_normally"

  if operation == "explain_code" or operation == "generate_code":

    tasks,solutions = retrieveDocsFromQuery(user_input, k = 3, printResults = False)

    context = "\n\n".join(
        f"Task:\n{task}\n\nSolution:\n{solution}"
        for task,solution in zip(tasks,solutions)
        )
    system_msg = (
        "Use the following examples for context (do not just repeat them word‑for‑word):\n\n"
        f"{context}"
        )
    conversation_history.append(SystemMessage(system_msg))
    conversation_history.append(HumanMessage(user_input))
  elif operation == "chat_normally":
    conversation_history.append(SystemMessage("You are a helpful assistant"))
    conversation_history.append(HumanMessage(user_input))
  else:
    # The classifier produced a task name the graph has no route for.
    print("\nINCORRECT OPERATION\n")
    continue

  print(f"Conversation History: {conversation_history}")
  result = agent.invoke({"messages" : conversation_history,
                         "chat_state" : operation})

  conversation_history = result["messages"]
  operation = result["chat_state"]


WELCOME TO PHI 3 CODING ASSISTANT: Type 'new' at any time to start a new Chat. Type 'exit' to exit program.
Enter: exit


# 2. DEPLOYMENT ON GRADIO

In [None]:
!pip install --quiet gradio

import gradio as gr
import json

In [None]:
# Task labels the graph's router knows how to dispatch.
_VALID_TASKS = ("explain_code", "generate_code", "chat_normally")


def _classify_intent(user_input):
    """Ask the model to label `user_input`; return "chat_normally" if the reply is unusable."""
    intent_prompt = f"""
    You are an intent‑classifier. Read the user’s message and respond *only* with JSON in this exact schema:
    {{
      "task": <one of: "explain_code", "generate_code", "chat_normally">,
      "user_input": <the original user text, verbatim>
    }}

    Examples:
    User: "Can you walk me through what this function does line by line?"
    ➞ {{"task":"explain_code","user_input":"Can you walk me through what this function does line by line?"}}

    User: "Write me a Python script that parses a CSV and prints the average of column A."
    ➞ {{"task":"generate_code","user_input":"Write me a Python script that parses a CSV and prints the average of column A."}}

    User: "Hey, how’s your day going?"
    ➞ {{"task":"chat_normally","user_input":"Hey, how’s your day going?"}}

    Now classify:
    User: "{user_input}"
    """
    cls_inputs = processor.apply_chat_template(
        convert_to_openai_messages([SystemMessage(intent_prompt)]),
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    ).to(model.device)
    cls_out = model.generate(**cls_inputs, max_new_tokens=200)
    start = cls_inputs["input_ids"].shape[-1]
    cls_resp = processor.decode(cls_out[0][start:], skip_special_tokens=True)

    # Parsing JSON safely: the model is asked for JSON but may return anything.
    try:
        parsed = json.loads(cls_resp)
    except json.JSONDecodeError:
        print(f"[Warning] Failed to parse intent JSON: {repr(cls_resp)}")
        return "chat_normally"

    # Valid JSON can still be a non-object, or carry a task the router has no
    # edge for; either would break agent.invoke further down.
    task = parsed.get("task") if isinstance(parsed, dict) else None
    if task not in _VALID_TASKS:
        print(f"[Warning] Unexpected intent task: {repr(task)}")
        return "chat_normally"
    return task


def gradio_step(user_input, chat_history, conv_history, operation):
    """
    Handle one submitted message in the Gradio UI.

    - user_input: str, the new user message
    - chat_history: list of (user, bot) tuples for display
    - conv_history: List[HumanMessage|SystemMessage|AIMessage], your raw transcript
    - operation: str, one of "chat_normally", "explain_code", "generate_code"

    Returns (chat_history, conv_history, operation, "") in the order of the
    outputs wired up in the UI; the trailing empty string clears the textbox.
    """
    # 0) Blank submissions change nothing
    if not user_input.strip():
        return chat_history or [], conv_history, operation, ""

    # 1) Handle “new” to reset (operation goes back to its initial string
    #    value rather than an empty list)
    if user_input.lower().strip() == "new":
        return [], [], "chat_normally", ""

    # 2) Intent classification
    operation = _classify_intent(user_input)

    # 3) Build up conv_history for RAG
    if operation in ("explain_code","generate_code"):
        tasks, sols = retrieveDocsFromQuery(user_input, k=3, printResults=False)

        ctx = "\n\n".join(f"Task:\n{t}\n\nSolution:\n{s}" for t,s in zip(tasks,sols))
        conv_history += [
            SystemMessage(f"Use the following examples for context...\n\n{ctx}"),
            HumanMessage(user_input)
        ]
    else:
        conv_history += [
            SystemMessage("You are a helpful assistant"),
            HumanMessage(user_input)
        ]

    # 4) Invoke your LangGraph agent
    result = agent.invoke({
        "messages": conv_history,
        "chat_state": operation
    })
    conv_history = result["messages"]
    operation    = result["chat_state"]

    # 5) Extract and append bot reply to chat_history
    bot_msg     = conv_history[-1].content
    chat_history = chat_history or []
    chat_history.append((user_input, bot_msg))

    return chat_history, conv_history, operation, ""


In [None]:
# Gradio front end: a chat display, a textbox, and two hidden state slots
# that carry the transcript and the last routing decision between calls.
with gr.Blocks() as demo:
    gr.Markdown("## PHI3 Coding Assistant")
    chatbot       = gr.Chatbot()
    user_in       = gr.Textbox(placeholder="Type here and press Enter…")
    conv_history  = gr.State([])               # your raw message list
    operation     = gr.State("chat_normally")  # initial routing state

    # On Enter, run gradio_step. The outputs list must match the order of the
    # tuple gradio_step returns: (chat display, transcript, operation, textbox).
    user_in.submit(
        fn=gradio_step,
        inputs=[user_in, chatbot, conv_history, operation],
        outputs=[chatbot, conv_history, operation, user_in],
    )

# demo.launch(share=True, debug=True)


  chatbot       = gr.Chatbot()


# RAG Evaluation on MBPP

In [None]:
# Draw a fixed 10-row sample (seed 42) from the sanitized MBPP file, keep the
# columns used by the evaluation, and index the rows by task_id.
MBPP_JSON_PATH = "/content/drive/MyDrive/Colab Notebooks/Cellula-Task5/sanitized-mbpp.json"
MBPP_COLUMNS = ["prompt","task_id", "code", "test_list"]

mbpp_df = (
    pd.read_json(MBPP_JSON_PATH)
    .sample(10, random_state=42)
    .loc[:, MBPP_COLUMNS]
    .set_index("task_id", drop=True)
)

In [None]:
# Show full cell contents (no truncation) when DataFrames are displayed.
pd.set_option('display.max_colwidth', None)

In [None]:
# Preview the first two sampled MBPP rows.
mbpp_df.head(2)

Unnamed: 0_level_0,prompt,code,test_list
task_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
802,Write a python function to count the number of rotations required to generate a sorted array. https://www.geeksforgeeks.org/count-of-rotations-required-to-generate-a-sorted-array/,"def count_rotation(arr): \n for i in range (1,len(arr)): \n if (arr[i] < arr[i - 1]): \n return i \n return 0","[assert count_rotation([3,2,1]) == 1, assert count_rotation([4,5,1,2,3]) == 2, assert count_rotation([7,8,9,1,2,3]) == 3, assert count_rotation([1,2,3]) == 0, assert count_rotation([1,3,2]) == 2]"
127,Write a function to multiply two integers.,"def multiply_int(x, y):\n if y < 0:\n return -multiply_int(x, -y)\n elif y == 0:\n return 0\n elif y == 1:\n return x\n else:\n return x + multiply_int(x, y - 1)","[assert multiply_int(10,20)==200, assert multiply_int(5,10)==50, assert multiply_int(4,8)==32]"


In [None]:
# Build a second vector store from the sampled MBPP prompts, with each
# reference solution attached as metadata.
prompts_mbpp = mbpp_df["prompt"].tolist()
metadatas_mbpp = [{"solution" : code} for code in mbpp_df["code"]]

# Stable ids taken from the task_id index. Without explicit ids every re-run
# of this cell adds the same documents again to the persisted collection
# under fresh random ids, and the retriever then returns duplicates.
ids_mbpp = [str(task_id) for task_id in mbpp_df.index]

vectorstore2 = Chroma.from_texts(
    texts = prompts_mbpp,
    embedding = embeddings,
    metadatas = metadatas_mbpp,
    ids = ids_mbpp,
    persist_directory = "./chroma_store2"
)

retriever2 = vectorstore2.as_retriever()

In [None]:
# Smoke test: query the store with the first stored prompt and print the
# solution attached to the top hit.
mbpp_query = prompts_mbpp[0]

top_hit = retriever2.invoke(mbpp_query)[0]
print(top_hit.metadata["solution"])


def count_rotation(arr):   
    for i in range (1,len(arr)): 
        if (arr[i] < arr[i - 1]): 
            return i  
    return 0


In [None]:
def evaluateRAG(query : str):
  """Generate a solution for `query` using retrieved MBPP examples as context.

  Documents are fetched with `retriever2` (its default search settings), turned
  into a "Task / Solution" context block that is sent as the system message,
  and followed by a human message instructing the model to answer with only
  the Python function.

  Args:
    query: The task description to solve.

  Returns:
    The model's decoded reply, as returned by `generateCodeFromMsg`.
  """
  docs = retriever2.invoke(query)

  context = "\n\n".join(
        f"Task:\n{doc.page_content}\n\nSolution:\n{doc.metadata['solution']}"
        for doc in docs
        )

  context_msg = (
        "Use the following examples for context:\n\n"
        f"{context}"
        )

  instruction_msg = (
      f"""You are an expert python programmer. Here is your task: \n{query}\n Respond *only* with the python function.No more. Do NOT add any additional text after the function's end:
      """
  )

  msg = [SystemMessage(context_msg), HumanMessage(instruction_msg)]

  # Reuse the shared tokenize -> generate -> decode helper rather than repeating
  # it here (the previous copy also shadowed the builtin `input`).
  return generateCodeFromMsg(msg, max_new_tokens=500)



In [None]:
# Manual check on the first sampled task: show the reference solution, then
# what the model generates for the same prompt.
first_task = mbpp_df['prompt'].iloc[0]
first_solution = mbpp_df['code'].iloc[0]
print(f"\nThe Task is \n{first_task}\n. The given solution in docs is \n{first_solution}\n.")

generated = evaluateRAG(first_task)
print(f"The output generated is: \n{generated}")


The Task is 
Write a python function to count the number of rotations required to generate a sorted array. https://www.geeksforgeeks.org/count-of-rotations-required-to-generate-a-sorted-array/
. The given solution in docs is 
def count_rotation(arr):   
    for i in range (1,len(arr)): 
        if (arr[i] < arr[i - 1]): 
            return i  
    return 0
.
You are an expert python programmer. Here is your task: 
Write a python function to count the number of rotations required to generate a sorted array. https://www.geeksforgeeks.org/count-of-rotations-required-to-generate-a-sorted-array/
 Respond *only* with the python function.No more. Do NOT add any additional text after the function's end:
      
The output generated is: 
```python
def count_rotation(arr):   
    for i in range (1,len(arr)): 
        if (arr[i] < arr[i - 1]): 
            return i  
    return 0
```


## Generate model outputs

In [None]:
# Run the RAG pipeline once per sampled prompt and keep the replies,
# in row order, as a Series.
model_outputs = pd.Series([evaluateRAG(task_prompt) for task_prompt in mbpp_df["prompt"]])

### Pass/Fail Tests

In [None]:
# Print one generated answer (position 9) together with that task's tests so
# they can be checked by hand.
sample_pos = 9
print(model_outputs.iloc[sample_pos])
for test_case in mbpp_df["test_list"].iloc[sample_pos]:
  print(test_case)

```python
def large_product(nums1, nums2, N):
    result = sorted([x*y for x in nums1 for y in nums2], reverse=True)[:N]
    return result
```
assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],3)==[60, 54, 50]
assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],4)==[60, 54, 50, 48]
assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],5)==[60, 54, 50, 48, 45]


In [None]:
# Manual pass/fail check: the string below is the generated function plus the
# task's asserts, pasted in by hand. If no assert fails, the final print runs.
# NOTE(review): exec runs model-produced code in this process with full
# privileges; only do this with output that has been read first, and use an
# isolated sandbox if this is ever automated.
exec("""
def large_product(nums1, nums2, N):
    result = sorted([x*y for x in nums1 for y in nums2], reverse=True)[:N]
    return result

assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],3)==[60, 54, 50]
assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],4)==[60, 54, 50, 48]
assert large_product([1, 2, 3, 4, 5, 6],[3, 6, 8, 9, 10, 6],5)==[60, 54, 50, 48, 45]

print("No Assertion Error. All tests Passed")
""")

No Assertion Error. All tests Passed
