In [None]:
from mytools import login_huggingface, best_dtype, best_device
from dotenv import load_dotenv
from datetime import datetime
from typing import List
import streamlit as sl
import uuid
import json
import os

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_huggingface import HuggingFacePipeline, HuggingFaceEndpoint, ChatHuggingFace
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage, ToolMessage

In [None]:
# Load key/value pairs from a local .env file into the process environment
# before any code that may need credentials runs.
load_dotenv()

In [None]:
# Hugging Face Hub id of the checkpoint that backs the chatbot.
# The commented-out line is an alternative checkpoint kept for quick switching.
#model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

model_id= "ContactDoctor/Bio-Medical-Llama-3-8B"

In [None]:
# Project-local helper imported from mytools; presumably authenticates with the
# Hugging Face Hub so the checkpoint can be downloaded — TODO confirm in mytools.
login_huggingface() 

In [None]:
# tokenizer = AutoTokenizer.from_pretrained(model_id)

# model = AutoModelForCausalLM.from_pretrained(
#     model_id,
#     dtype = best_dtype(),
#     device_map={"":best_device()}, 
#     low_cpu_mem_usage=True     
# )
# print("Load tokenizer and base model done!")

# original_pipeline = pipeline(
#     "text-generation", 
#     model=model, 
#     tokenizer=tokenizer,
#     return_full_text=False,   
# )

In [None]:
# Build the text-generation pipeline through the from_model_id factory.
# Calling HuggingFacePipeline(model_id=...) directly only stores the id and
# leaves the underlying transformers pipeline unset, so nothing would be loaded
# and the first generation call would fail.
hug_pipeline = HuggingFacePipeline.from_model_id(
    model_id=model_id,
    # The task selects which pipeline to build; it is a factory argument and
    # must not be mixed into the generation kwargs below.
    task="text-generation",
    pipeline_kwargs={
        "max_new_tokens": 1024,
        "top_k": 50,
        # NOTE(review): top_k/temperature only take effect when sampling is
        # enabled (do_sample) — confirm the checkpoint's generation config.
        "temperature": 0.2,
    },
)

# Alternative: wrap a manually built transformers pipeline with HuggingFacePipeline
#hug_pipeline = HuggingFacePipeline(pipeline=original_pipeline)

master_agent = ChatHuggingFace(llm=hug_pipeline) # It is the brain of the whole system

In [None]:
def create_asana_task(task_name: str, due_on: str = "today") -> str:
    """
    Create an Asana task given the name of the task and when it is due
    Example Call:

    create_asana_task("test task", "2024-06-24")
    Args:
        task_name (str): The name of the task in Asana
        due_on (str): The date the task is due in the format: yyyy-MM-dd. If not given, "today" is used
    Returns:
        str: The identifier of the newly created task

    """
    # The docstring above is read through __doc__ and pasted into the LLM
    # prompt as this tool's description, so it is written for the model.
    # Placeholder implementation: no Asana API request is made and due_on is
    # not used yet; a random UUID stands in for the task identifier.
    print(f"Task: {task_name} has been created!")
    return str(uuid.uuid4())


def add(a: int, b: int) -> int:
    """
    Add two numbers
    Example Call:
    add(3, 5)
    Args:
        a (int): the first number
        b (int): the second number
    Returns:
    int: The result of a add b

    """
    # The docstring is kept verbatim on purpose: it is read through __doc__
    # to build the tool description that is sent to the model.
    total = a + b
    return total

In [None]:
# Registry of the tools the model may invoke, keyed by the exact (lowercase)
# name the model has to put in a tool call. Each key matches the function name
# used in the docstring's "Example Call", because the model is asked for the
# "Name of the function to run" and the reply is looked up here verbatim.
available_tool = {
    "create_asana_task": create_asana_task,
    "add": add,
}

In [None]:
# One "<tool name>:\n <docstring>\n\n" entry per registered tool, in
# registration order; the entries are later joined into the system prompt.
tool_descriptions = [
    f"{tool_name}:\n {tool_func.__doc__}\n\n"
    for tool_name, tool_func in available_tool.items()
]

In [None]:
class ToolCall(BaseModel):
    # Schema for one tool invocation requested by the model.
    # (Plain comments are used instead of a class docstring so the generated
    # schema stays exactly as before.)
    name: str = Field(
        description="Name of the function to run",
    )
    args: str = Field(
        description="Arguments for the function call (empty if no arguments are needed for the tool call)",
    )

class ToolCallOrResponse(BaseModel):
    # Schema for a complete model reply: the tools to run (possibly none) and
    # the text meant for the user.
    # (Plain comments are used instead of a class docstring so the generated
    # schema stays exactly as before.)
    tool_calls: List[ToolCall] = Field(
        description="List of tool calls, empty array if you don't need to invoke a tool",
    )
    content: str = Field(
        description="Response to the user if a tool doesn't need to be invoked",
    )

In [None]:
# System-prompt fragment that tells the model to answer with a JSON object
# holding "tool_calls" and "content", lists the available tools (the joined
# tool_descriptions), and explains the "Thought:" convention used to feed tool
# results back to the model.
# The text is sent to the model verbatim, so any wording change alters behaviour.
# NOTE(review): the prompt describes `args` as a str, while prompt_ai expands
# `args` as keyword arguments — confirm which shape the model should produce.
tool_text = f"""
You always response with a JSON object that has two required keys:
    tool_calls: List[ToolCall] = Field(description="List of tool calls, empty array if you don't need to invoke a tool")
    content: str = Field(description="Response to the user if a tool doesn't need to be invoked")

Here is the type for ToolCall (object with two keys):
    name: str = Field(description="Name of the function to run")
    args: str = Field(description="Arguments for the function call (empty if no arguments are needed for the tool call)")

Don't start your answer with "Here is the JSON response", just give the JSON object.

The tools you have access to are:
{"".join(tool_descriptions)}

Any message that starts with "Thought:" is you thinking to yourself. This isn't told to the user so you still need to communicate what you did with those tools.
Don't repeat an action. If a thought tells you that you already took an action for a user, don't do it again.
"""

In [None]:
def prompt_ai(message, nested_calls=0, invoked_tools=None):
    """
    Ask the master agent for a JSON reply and run any tools it requests.

    The reply is parsed into a dict with "tool_calls" and "content". Every
    requested tool call that has not been executed yet is run, its result is
    appended to ``message`` as an AIMessage starting with "Thought:", and the
    model is prompted again so it can report the outcome to the user.

    Args:
        message (list): Conversation so far. Mutated in place: one "Thought:"
            AIMessage is appended for each executed tool call.
        nested_calls (int): Attempts already used. Retries after a failed
            invocation and follow-up prompts after tool calls share this budget.
        invoked_tools (list | None): String forms of the tool calls already
            executed during this turn. A fresh list is created when omitted.
    Returns:
        dict: The parsed model reply that ends the turn.
    Raises:
        Exception: When more than three nested attempts have been used.
        KeyError: When the model asks for a tool that is not registered in
            ``available_tool``.
    """
    if nested_calls > 3:
        raise Exception("Failsafe - AI is failing too much!")

    # A list literal as default would be created once and shared by every
    # call, so tools invoked in one turn would be skipped in later turns.
    if invoked_tools is None:
        invoked_tools = []

    parser = JsonOutputParser(pydantic_object=ToolCallOrResponse)
    asana_chatbot_chain = master_agent | parser

    try:
        ai_response = asana_chatbot_chain.invoke(message)
    except Exception:
        # Generation or JSON parsing failed: try again, keeping the record of
        # tools that already ran. KeyboardInterrupt/SystemExit are not caught.
        return prompt_ai(message, nested_calls + 1, invoked_tools)

    if not isinstance(ai_response, dict):
        # Valid JSON but not the requested object shape; treat it like a parse failure.
        return prompt_ai(message, nested_calls + 1, invoked_tools)

    requested_calls = ai_response.get("tool_calls") or []
    if not requested_calls:
        return ai_response

    for tool_call in requested_calls:
        call_key = str(tool_call)
        if call_key in invoked_tools:
            # The model repeated an action it already took; stop here and hand
            # back its reply instead of running the tool a second time.
            return ai_response

        tool_name = tool_call["name"].lower()
        if tool_name not in available_tool:
            raise KeyError(
                f"Unknown tool '{tool_name}'; available tools: {sorted(available_tool)}"
            )

        # The schema shown to the model declares args as a string, so accept a
        # JSON-encoded object as well as an already parsed mapping; an empty
        # value means the tool takes no arguments.
        raw_args = tool_call.get("args")
        tool_kwargs = raw_args or {}
        if isinstance(tool_kwargs, str):
            tool_kwargs = json.loads(tool_kwargs)

        tool_output = available_tool[tool_name](**tool_kwargs)

        message.append(AIMessage(content=f"""Thought: I called {tool_name} with args {raw_args} and got back: {tool_output}."""))
        invoked_tools.append(call_key)

    # Let the model see the tool results and produce the user-facing reply.
    return prompt_ai(message, nested_calls + 1, invoked_tools)


In [None]:
sl.title("Asana Chatbot")

# Seed the conversation once per session with the system prompt that carries
# the tool-calling instructions.
if "messages" not in sl.session_state:
    sl.session_state.messages = [
        SystemMessage(content=f"Help me create a task in Asana.\n {tool_text}")
    ]

# Replay the stored conversation. Messages starting with "Thought:" are the
# model's internal tool-result notes and are not shown.
for message in sl.session_state.messages:
    # message.json() returns a JSON string, so it has to be parsed with
    # json.loads; json.load expects a file-like object and would fail here.
    message_json = json.loads(message.json())
    message_type = message_json["type"]
    message_content = message_json["content"]
    if message_type in ["human","ai","system"] and not message_content.startswith("Thought:"):
        with sl.chat_message(message_type):
            sl.markdown(message_content)

# Handle a newly submitted user prompt: echo it, store it, ask the model, then
# show and store the model's answer.
if prompt := sl.chat_input("what would you like to do today?"):
    sl.chat_message("user").markdown(prompt)

    sl.session_state.messages.append(HumanMessage(content=prompt))

    with sl.chat_message("assistant"):
        # prompt_ai receives the session's message list itself, so anything it
        # appends to that list is kept in the session history.
        ai_response = prompt_ai(sl.session_state.messages)
        sl.markdown(ai_response["content"])

    sl.session_state.messages.append(AIMessage(content=ai_response["content"]))
