In [None]:
from typing import Optional

import google.generativeai as genai
import instructor
import openai
from burr.core import Application, ApplicationBuilder, State, action, expr, when
from burr.tracking import LocalTrackingClient
from pydantic import BaseModel, Field

from dreamai.ai import ModelName, assistant_message, system_message, user_message

# Instructor-wrapped OpenAI client built from a default `openai.OpenAI()`.
# It is not referenced by any other code visible in this notebook.
ask_oai = instructor.from_openai(openai.OpenAI())
# Instructor-wrapped Gemini client (GEMINI_JSON mode, ModelName.GEMINI_FLASH).
# The actions below call `ask_gemini.create(...)` to get their LLM responses.
ask_gemini = instructor.from_gemini(
    client=genai.GenerativeModel(model_name=ModelName.GEMINI_FLASH),
    mode=instructor.Mode.GEMINI_JSON,
)

In [None]:
# Upper bound on clarification questions: quoted in the prompt and also used
# to truncate the model's answer in `ask_for_clarifications`.
NUM_CLARIFICATION_QUESTIONS = 3

In [None]:
# Structured reply requested from the LLM in `ask_for_clarifications`.
# NOTE: documented with comments instead of a class docstring on purpose,
# since pydantic copies a model docstring into the generated schema.
class ClarificationQuestions(BaseModel):
    # Questions addressed to the user; an omitted field becomes an empty list.
    questions: list[str] = Field(
        default_factory=list,
        description="Clarification Questions",
    )

In [None]:
@action(reads=[], writes=["incoming_email", "response_instructions"])
def process_input(
    state: State, email_to_respond: str, response_instructions: str
) -> State:
    """Store the email to answer and the user's instructions in state.

    Args:
        state: Current application state.
        email_to_respond: Text of the email that needs a reply; written to
            the ``incoming_email`` key.
        response_instructions: The user's guidance for the reply; written to
            the ``response_instructions`` key.

    Returns:
        The state produced by ``state.update`` with both keys set.
    """
    seeded = {
        "incoming_email": email_to_respond,
        "response_instructions": response_instructions,
    }
    return state.update(**seeded)


@action(
    reads=["response_instructions", "incoming_email"],
    writes=["clarification_questions"],
)
def ask_for_clarifications(state: State) -> State:
    """Ask the LLM which clarifying questions (if any) it has for the user.

    Builds a prompt from ``incoming_email`` and ``response_instructions`` and
    requests a ``ClarificationQuestions`` object from ``ask_gemini``. Any
    exception raised while doing so is printed and treated as "no questions".

    Returns:
        The state with ``clarification_questions`` set to at most
        ``NUM_CLARIFICATION_QUESTIONS`` questions (possibly an empty list).
    """
    incoming_email, response_instructions = (
        state["incoming_email"],
        state["response_instructions"],
    )
    system = "You are a chatbot that has the task of generating responses to an email on behalf of a user."
    task = f"""\
The email you are to respond to is: {incoming_email}.
Your instructions are: {response_instructions}.
Your first task is to ask any clarifying questions for the user who is asking you to respond to this email.
These clarifying questions are for the user, *not* for the original sender of the email.
Please generate a list of at MOST {NUM_CLARIFICATION_QUESTIONS} questions (and you really can do less, or even none are OK!)
But only if you feel that you could leverage that clarification (my time is valuable).
If you do not need clarification, return an empty list.
"""
    # Stays None unless the model hands back a well-typed answer.
    questions = None
    try:
        reply = ask_gemini.create(
            response_model=ClarificationQuestions,
            messages=[system_message(system), user_message(task.strip())],  # type: ignore
        )
        if isinstance(reply, ClarificationQuestions):
            questions = reply
    except Exception as e:
        # Best effort: a failed LLM call must not stop the workflow.
        print(e)
    if questions is None:
        questions = ClarificationQuestions()
    # Enforce the cap even if the model returned more than it was asked for.
    capped = questions.questions[:NUM_CLARIFICATION_QUESTIONS]
    return state.update(clarification_questions=capped)


@action(reads=[], writes=["clarification_answers"])
def answer_clarifications(state: State, clarification_answers: list[str]) -> State:
    """Record the user's answers to the clarification questions.

    Args:
        state: Current application state.
        clarification_answers: Answers supplied by the user; stored as given
            under the ``clarification_answers`` key.

    Returns:
        The state produced by ``state.update`` with the answers written.
    """
    answered = state.update(clarification_answers=clarification_answers)
    return answered


@action(
    reads=[
        "incoming_email",
        "response_instructions",
        "clarification_answers",
        "clarification_questions",
        "draft_history",
        "feedback",
    ],
    writes=["current_draft", "draft_history"],
)
def formulate_draft(state: State) -> tuple[dict, State]:
    """Ask the LLM for a draft reply and record it in state.

    The prompt contains the incoming email and the user's instructions. The
    clarification Q&A section is added only when at least one question/answer
    pair exists, so the model is never told it "already asked" questions when
    it did not. When ``draft_history`` is non-empty, the latest draft and the
    latest feedback are appended so the model can revise.

    Returns:
        A ``(result, state)`` tuple. ``result`` holds the ``prompt`` that was
        built and the ``current_draft``; the state has ``current_draft`` set
        and the draft appended to ``draft_history``. If the LLM call raises,
        the exception is printed and the draft is the empty string.
    """
    incoming_email = state["incoming_email"]
    response_instructions = state["response_instructions"]
    # zip() stops at the shorter sequence, so unanswered questions are dropped
    # and a missing `clarification_answers` key yields no pairs at all.
    clarification_answers_formatted_q_a = "\n".join(
        f"Q: {q}\nA: {a}"
        for q, a in zip(
            state["clarification_questions"], state.get("clarification_answers", [])
        )
    )
    system = "You are a chatbot that has the task of generating responses to an email."
    task = f"""\
The email you are to respond to is: {incoming_email}
Your instructions are: {response_instructions}
"""
    if clarification_answers_formatted_q_a:
        # Only mention clarifications when there actually are some.
        task += f"""\
You have already asked the following questions and received the following answers:
{clarification_answers_formatted_q_a}
"""
    if state["draft_history"]:
        task += "\nYour previous draft was: " + state["draft_history"][-1]
        # Tolerate a history without a `feedback` key instead of raising KeyError.
        task += "\nFeedback received: " + state.get("feedback", "")
        task += "\nPlease incorporate this feedback into your response."
    task += "\nPlease generate a draft response using all this information!"
    messages = [system_message(system), user_message(task.strip())]
    try:
        draft = ask_gemini.create(
            response_model=str,
            messages=messages,  # type: ignore
        )
    except Exception as e:
        # Best effort: surface the error and continue with an empty draft.
        print(e)
        draft = ""
    return {"prompt": task, "current_draft": draft}, state.update(
        current_draft=draft
    ).append(draft_history=draft)


@action(reads=[], writes=["feedback", "feedback_history"])
def process_feedback(state: State, feedback: str) -> tuple[dict, State]:
    """Store the user's latest feedback and add it to the feedback history.

    Args:
        state: Current application state.
        feedback: Feedback text from the user.

    Returns:
        A ``(result, state)`` tuple: ``result`` echoes the feedback, and the
        state has ``feedback`` set and the text appended to
        ``feedback_history``.
    """
    with_latest = state.update(feedback=feedback)
    return {"feedback": feedback}, with_latest.append(feedback_history=feedback)


@action(reads=["current_draft", "feedback"], writes=["final_draft"])
def final_result(state: State) -> tuple[dict, State]:
    """Promote the current draft to the final draft.

    Returns:
        A ``(result, state)`` tuple where both ``result["final_draft"]`` and
        the state's ``final_draft`` key hold the value of ``current_draft``.
    """
    accepted = state["current_draft"]
    return {"final_draft": accepted}, state.update(final_draft=accepted)


def application(
    app_id: Optional[str] = "dai_email",
    username: str = "hamza_email",
    project: str = "email_assistant",
) -> Application:
    """Build the email-assistant Burr application.

    Flow: ``process_input`` -> ``ask_for_clarifications`` ->
    (``answer_clarifications`` when there are questions) ->
    ``formulate_draft`` -> ``process_feedback`` -> back to
    ``formulate_draft`` while feedback is non-empty, otherwise
    ``final_result``.

    Args:
        app_id: Application identifier passed to ``with_identifiers``. This
            notebook also passes ``None``; presumably Burr then assigns an id
            itself -- confirm against the Burr docs.
        username: Used as the ``partition_key``.
        project: Tracking project name for the local tracker.

    Returns:
        The built ``Application``.
    """
    # One tracker instance is used both to record runs and to load prior state.
    tracker = LocalTrackingClient(project=project)
    builder = (
        ApplicationBuilder()
        .with_actions(
            process_input,
            ask_for_clarifications,
            answer_clarifications,
            formulate_draft,
            process_feedback,
            final_result,
        )
        .with_transitions(
            ("process_input", "ask_for_clarifications"),
            # Conditional edges are listed before the unconditional fallback
            # that leaves the same action.
            (
                "ask_for_clarifications",
                "answer_clarifications",
                expr("len(clarification_questions) > 0"),  # type: ignore
            ),
            ("ask_for_clarifications", "formulate_draft"),
            ("answer_clarifications", "formulate_draft"),
            ("formulate_draft", "process_feedback"),
            ("process_feedback", "formulate_draft", expr("len(feedback) > 0")),  # type: ignore
            ("process_feedback", "final_result"),
        )
        # Pass the existing client rather than "local", which would build a
        # second LocalTrackingClient for the same project.
        .with_tracker(tracker)
        .with_identifiers(app_id=app_id, partition_key=username)
        .initialize_from(
            tracker,
            resume_at_next_action=True,
            default_state={"draft_history": [], "feedback_history": []},
            default_entrypoint="process_input",
        )
    )
    return builder.build()

In [None]:
# Sample incoming email used as the `email_to_respond` input for the demo run.
EMAIL = """\
Hi Hamza,

I hope you are well.

I have made a cool tool called DreamAI for AI practitioners. I am reaching out to you to see if you would be interested in trying it out.

Please inform me of your availability for a quick chat. I look forward to your kind response.

Regards,
Rafay"""

# Sample user guidance used as the `response_instructions` input for the demo run.
INSTRUCTIONS = """\
I get a bunch of these emails and usually ignore them.
I don't want to be rude, but I also don't want to waste my time. But also, I don't want to miss out on something cool.
Can you help me craft a response that is polite and respectful, but also sets the right expectations?
And you never know, maybe I will actually try out the tool if it is cool enough. I need some more details to make that decision.
"""

In [None]:
# Build the application without a fixed id (overrides the "dai_email" default).
# NOTE(review): presumably this makes each notebook run start a fresh
# application instead of resuming a tracked one -- confirm against Burr docs.
app = application(app_id=None)
# Render the state machine, including transition conditions and state, as a
# PNG using the output path "statemachine".
app.visualize(
    output_file_path="statemachine",
    include_conditions=True,
    include_state=True,
    format="png",
)

In [None]:
def request_answers(questions: list[str]) -> list[str]:
    """Prompt the user for an answer to each LLM question, in order.

    Args:
        questions: Questions to show; each one is used as the ``input`` prompt.

    Returns:
        The user's answers, one per question, in the same order.
    """
    print("The email assistant wants more information:\n")
    return [input(prompt) for prompt in questions]


def request_feedback(draft: str) -> str:
    """Show a draft to the user and read their feedback.

    Args:
        draft: Draft text to display.

    Returns:
        Whatever the user typed; an empty string means "no feedback".
    """
    shown_draft = f"here's a draft!: \n {draft} \n \n What feedback do you have?"
    finish_hint = "If you have no feedback then we'll finish it up."
    print(shown_draft, finish_hint)
    feedback_prompt = "Write feedback or leave blank to continue (if you're happy)"
    return input(feedback_prompt)


# Drive the application interactively: run until it halts before an action
# that needs user input (or after the final action), collect that input, and
# feed it to the next `app.run` call.
inputs = {"email_to_respond": EMAIL, "response_instructions": INSTRUCTIONS}
while True:
    # Bound to `halted_action`, not `action`: rebinding `action` here would
    # overwrite the `burr.core.action` decorator imported at the top of the
    # notebook and break re-running the cell that defines the actions.
    halted_action, result, state = app.run(
        halt_before=["answer_clarifications", "process_feedback"],
        halt_after=["final_result"],
        inputs=inputs,
    )
    # The three halt points are mutually exclusive, hence if/elif.
    if halted_action.name == "answer_clarifications":
        questions = state["clarification_questions"]
        answers = request_answers(questions)
        inputs = {"clarification_answers": answers}
    elif halted_action.name == "process_feedback":
        feedback = request_feedback(state["current_draft"])
        inputs = {"feedback": feedback}
    elif halted_action.name == "final_result":
        print("final result is:", state["current_draft"])
        break

In [None]:
# `result` comes from the last `app.run` call in the loop above, which halts
# after `final_result`; that action's result dict carries "final_draft".
print(result["final_draft"])