# Use your voice

In [None]:
import warnings
# Suppress all warnings for the rest of the notebook session so library
# deprecation chatter does not clutter the cell output.
warnings.filterwarnings('ignore')

In [None]:
import os, json
from llama_parse import LlamaParse
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage
)
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
    InputRequiredEvent,
    HumanResponseEvent
)
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.readers.whisper import WhisperReader
import gradio as gr
import asyncio
from queue import Queue

# The notebook awaits workflow coroutines at top level while the workflow
# steps themselves call synchronous client methods; nest_asyncio is applied
# up front so those nested event-loop uses are tolerated.
# NOTE(review): presumably required by the sync LlamaParse/LLM calls below — confirm.
import nest_asyncio
nest_asyncio.apply()

# Project-local helpers that return the API keys used throughout the notebook.
from helper import get_openai_api_key, get_llama_cloud_api_key

# Module-level credentials: the LlamaCloud key is passed to LlamaParse and the
# OpenAI key is passed to WhisperReader further down.
llama_cloud_api_key = get_llama_cloud_api_key()
openai_api_key = get_openai_api_key()

In [None]:
class ParseFormEvent(Event):
    """Emitted by ``set_up`` to ask ``parse_form`` to read the application form."""

    # Path of the application form to parse (forwarded from the StartEvent).
    application_form: str

class QueryEvent(Event):
    """One question to ask the resume query engine about a single form field."""

    # Name of the form field this question is about. It is always supplied
    # when the event is created and read back when the answer is produced, so
    # it is declared explicitly instead of relying on dynamic attributes.
    field: str
    # The natural-language question sent to the query engine.
    query: str

class ResponseEvent(Event):
    """The query engine's answer for a single form field."""

    # Name of the form field the answer belongs to. It is always supplied when
    # the event is created and read back when the form is assembled, so it is
    # declared explicitly instead of relying on dynamic attributes.
    field: str
    # Text of the answer.
    response: str

class FeedbackEvent(Event):
    """Carries human feedback back to question generation for another pass."""

    # The raw feedback text given by the human reviewer.
    feedback: str

class GenerateQuestionsEvent(Event):
    """Payload-free signal that the form fields are ready for question generation."""

class RAGWorkflow(Workflow):
    """Fill in a job application form using facts retrieved from a resume.

    Event flow:

    1. ``set_up`` builds (or reloads) a vector index over the resume.
    2. ``parse_form`` extracts the list of fields from the application form.
    3. ``generate_questions`` emits one ``QueryEvent`` per field.
    4. ``ask_question`` answers each question with the resume query engine.
    5. ``fill_in_application`` waits for every answer, drafts the filled form
       and asks a human for feedback.
    6. ``get_feedback`` either finishes the workflow or loops back to
       ``generate_questions`` with the feedback attached.
    """

    # Directory where the resume index is persisted between runs.
    storage_dir = "./storage"
    llm: OpenAI
    # NOTE(review): annotated as VectorStoreIndex, but ``set_up`` assigns the
    # object returned by ``index.as_query_engine(...)``.
    query_engine: VectorStoreIndex

    @step
    async def set_up(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:
        """Validate the inputs and prepare the LLM and the resume query engine.

        Raises:
            ValueError: If ``resume_file`` or ``application_form`` is missing
                or empty on the start event.
        """
        # getattr with a default so that an omitted keyword reaches the
        # intended ValueError instead of failing on attribute lookup.
        if not getattr(ev, "resume_file", None):
            raise ValueError("No resume file provided")

        if not getattr(ev, "application_form", None):
            raise ValueError("No application form provided")

        self.llm = OpenAI(model="gpt-4o-mini")

        # The same embedding model must be used to build the index and to
        # embed queries against it, so it is created once and handed to both
        # the "load from disk" and the "build from scratch" paths.
        embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")

        if os.path.exists(self.storage_dir):
            # NOTE: an existing storage directory is reused as-is, regardless
            # of which resume file was passed in for this run.
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context, embed_model=embed_model)
        else:
            documents = LlamaParse(
                api_key=llama_cloud_api_key,
                base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
                result_type="markdown",
                content_guideline_instruction="This is a resume, gather related facts together and format it as bullet points with headers"
            ).load_data(ev.resume_file)

            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=embed_model
            )
            index.storage_context.persist(persist_dir=self.storage_dir)

        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)

        return ParseFormEvent(application_form=ev.application_form)

    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionsEvent:
        """Parse the application form and store its field names in the context.

        The list of fields is saved under the ``"fields_to_fill"`` context key.

        Raises:
            ValueError: If the parser returns no documents, the LLM reply is
                not valid JSON, or the reply contains no fields.
            KeyError: If the JSON reply has no ``"fields"`` key.
        """
        parser = LlamaParse(
            api_key=llama_cloud_api_key,
            base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
            result_type="markdown",
            content_guideline_instruction="This is a job application form. Create a list of all the fields that need to be filled in.",
            formatting_instruction="Return a bulleted list of the fields ONLY."
        )

        parsed_documents = parser.load_data(ev.application_form)
        if not parsed_documents:
            raise ValueError(f"Could not parse application form: {ev.application_form}")
        result = parsed_documents[0]

        raw_json = self.llm.complete(
            f"This is a parsed form. Convert it into a JSON object containing only the list of fields to be filled in, in the form {{ fields: [...] }}. <form>{result.text}</form>. Return JSON ONLY, no markdown.")

        # Despite the instruction, the model may still wrap its reply in a
        # Markdown code fence (```json ... ```); peel that off before parsing.
        json_text = raw_json.text.strip()
        if json_text.startswith("```"):
            json_text = json_text.strip("`").strip()
            if json_text.lower().startswith("json"):
                json_text = json_text[len("json"):]

        # json.JSONDecodeError is a ValueError subclass, so a malformed reply
        # surfaces as ValueError.
        fields = json.loads(json_text)["fields"]
        if not fields:
            # With nothing to ask, no answers would ever arrive and the
            # workflow would just sit there until its timeout.
            raise ValueError("No fields to fill in were found in the application form")

        await ctx.set("fields_to_fill", fields)

        return GenerateQuestionsEvent()

    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionsEvent | FeedbackEvent) -> QueryEvent:
        """Emit one ``QueryEvent`` per form field.

        When triggered by a ``FeedbackEvent`` the human feedback is appended
        to every question. Events are emitted with ``ctx.send_event`` rather
        than returned, so the step itself returns nothing.
        """
        fields = await ctx.get("fields_to_fill")

        # Record how many answers to expect before any question is emitted,
        # so the collecting step can never observe a missing/stale count.
        await ctx.set("total_fields", len(fields))

        for field in fields:
            question = f"How would you answer this question about the candidate? <field>{field}</field>"

            if isinstance(ev, FeedbackEvent):
                question += f"""
                    \nWe previously got feedback about how we answered the questions.
                    It might not be relevant to this particular field, but here it is:
                    <feedback>{ev.feedback}</feedback>
                """

            ctx.send_event(QueryEvent(
                field=field,
                query=question
            ))

        return

    @step
    async def ask_question(self, ctx: Context, ev: QueryEvent) -> ResponseEvent:
        """Answer a single field question using the resume query engine."""
        print(f"Asking question: {ev.query}")

        response = self.query_engine.query(f"This is a question about the specific resume we have in our database: {ev.query}")

        print(f"Answer was: {str(response)}")

        return ResponseEvent(field=ev.field, response=response.response)

    @step
    async def fill_in_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent:
        """Combine all answers into a filled form and ask a human to review it.

        Returns ``None`` until ``total_fields`` responses have been collected;
        after that the drafted form is stored under the ``"filled_form"``
        context key and an ``InputRequiredEvent`` is returned.
        """
        total_fields = await ctx.get("total_fields")

        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if responses is None:
            # Still waiting for the remaining answers.
            return None

        response_list = "\n".join(
            f"Field: {r.field}\nResponse: {r.response}" for r in responses
        )

        result = self.llm.complete(f"""
            You are given a list of fields in an application form and responses to
            questions about those fields from a resume. Combine the two into a list of
            fields and succinct, factual answers to fill in those fields.

            <responses>
            {response_list}
            </responses>
        """)

        # Saved so get_feedback can return it as the final result.
        await ctx.set("filled_form", str(result))

        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=result
        )

    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:
        """Let the LLM decide whether the human feedback means "done" or "redo".

        Returns a ``StopEvent`` carrying the stored filled form when the
        verdict is OKAY, otherwise a ``FeedbackEvent`` with the human's text.
        """
        result = self.llm.complete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            <feedback>
            {ev.response}
            </feedback>
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()

        print(f"LLM says the verdict was {verdict}")

        # Tolerate harmless variations such as "Okay", "'OKAY'" or "OKAY."
        # so an approval is not mistaken for a request for another pass.
        normalized_verdict = verdict.strip("'\".!").upper()
        if normalized_verdict == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        return FeedbackEvent(feedback=ev.response)


In [None]:
# Where the rendered workflow diagram (HTML) is written.
WORKFLOW_FILE = "workflows/lesson_6.html"
# Create the target directory first so writing the diagram does not depend on
# the folder already existing in the working directory.
os.makedirs(os.path.dirname(WORKFLOW_FILE), exist_ok=True)
draw_all_possible_flows(RAGWorkflow, filename=WORKFLOW_FILE)

In [None]:
from IPython.display import display, HTML, DisplayHandle
from helper import extract_html_content

# Read back the diagram written to WORKFLOW_FILE and render it inline,
# passing the "isolated" display metadata flag along with it.
html_content = extract_html_content(WORKFLOW_FILE)
display(
    HTML(html_content),
    metadata={"isolated": True},
)

## Getting voice feedback

In [None]:
def transcribe_speech(filepath):
    """Transcribe a recorded audio file to text with the Whisper reader.

    Args:
        filepath: Path to the recorded audio file, or ``None`` when the
            microphone component captured nothing.

    Returns:
        The text of the first document produced by ``WhisperReader``.

    Raises:
        gr.Error: If ``filepath`` is ``None``. Raising (instead of only
            warning and carrying on) stops the handler before it tries to
            read a non-existent recording, and nothing gets stored as a
            transcription, so the user can simply record again.
    """
    if filepath is None:
        raise gr.Error("No audio found, please retry.")

    # The reader is given the path directly; no file handle needs to be
    # opened (and potentially leaked) here.
    reader = WhisperReader(
        model="whisper-1",
        api_key=openai_api_key,
    )
    documents = reader.load_data(filepath)
    return documents[0].text

In [None]:
def store_transcription(output):
    """Remember the latest transcription in a module-level variable.

    The value is kept in the global ``transcription_value`` so other notebook
    cells can read it, and is also returned unchanged so this function can be
    used as a pass-through in a Gradio callback.
    """
    global transcription_value
    transcription_value = output
    return transcription_value

In [None]:
# Microphone -> transcription demo: each recording is transcribed, stored via
# store_transcription(), and echoed into the "Transcription" textbox.
mic_transcribe = gr.Interface(
    fn=lambda audio_path: store_transcription(transcribe_speech(audio_path)),
    inputs=gr.Audio(
        sources="microphone",
        type="filepath",
    ),
    outputs=gr.Textbox(label="Transcription"),
)

In [None]:
# Wrap the microphone interface in a single-tab Blocks app.
with gr.Blocks() as test_interface:
    gr.TabbedInterface(
        [mic_transcribe],
        ["Transcribe Microphone"],
    )

# Serve it locally on port 8000 without blocking the notebook kernel.
test_interface.launch(share=False, server_port=8000, prevent_thread_lock=True)

In [None]:
# Show the most recent transcription captured by store_transcription().
# NOTE: `transcription_value` is only assigned inside store_transcription(),
# so this cell raises NameError if nothing has been recorded yet.
print(transcription_value)

In [None]:
# Shut the demo interface down; TranscriptionHandler below launches its own
# interface on the same port (8000).
test_interface.close()

In [None]:
class TranscriptionHandler:
    """Launch a microphone UI and hand back the first transcription recorded.

    Transcriptions produced by the Gradio callback are pushed onto a queue;
    ``get_transcription`` waits for the first one and then shuts the UI down.
    """

    def __init__(self):
        # Filled by the Gradio callback, drained by get_transcription().
        self.transcription_queue = Queue()
        # The Blocks app currently being served, if any.
        self.interface = None

    def store_transcription(self, output):
        """Queue a transcription and return it unchanged (for display in the UI)."""
        self.transcription_queue.put(output)
        return output

    def create_interface(self):
        """Build the single-tab microphone transcription app and return it.

        The created app is also stored on ``self.interface``.
        """
        mic_transcribe = gr.Interface(
            fn=lambda x: self.store_transcription(transcribe_speech(x)),
            inputs=gr.Audio(sources="microphone", type="filepath"),
            outputs=gr.Textbox(label="Transcription")
        )
        self.interface = gr.Blocks()
        with self.interface:
            gr.TabbedInterface(
                [mic_transcribe],
                ["Transcribe Microphone"]
            )
        return self.interface

    async def get_transcription(self):
        """Serve the microphone UI and wait for the first transcription.

        Returns:
            The first item placed on the transcription queue.

        The interface is closed on every exit path, including when the wait
        is cancelled or fails, so the fixed port is not left occupied for the
        next call.
        """
        self.interface = self.create_interface()
        self.interface.launch(
            share=False,
            server_port=8000,
            prevent_thread_lock=True
        )

        try:
            # Poll instead of blocking on Queue.get() so the event loop keeps
            # running while we wait for the user to record something.
            while self.transcription_queue.empty():
                await asyncio.sleep(1.5)
            return self.transcription_queue.get()
        finally:
            if self.interface is not None:
                self.interface.close()


In [None]:
w = RAGWorkflow(timeout=600, verbose=False)

handler = w.run(
    resume_file="./data/fake_resume.pdf",
    application_form="./data/fake_application_form.pdf"
)

async for event in handler.stream_events():
  if isinstance(event, InputRequiredEvent):
     
      transcription_handler = TranscriptionHandler()
      response = await transcription_handler.get_transcription()

      handler.ctx.send_event(
          HumanResponseEvent(
              response=response
          )
      )

response = await handler
print("Agent complete! Here's your final result:")
print(str(response))