# Report Generation Workflow

Adapted directly from [this tutorial](https://github.com/run-llama/llamacloud-demo/blob/main/examples/report_generation/rfp_response/generate_rfp.ipynb) from LlamaIndex.

In [4]:
import nest_asyncio

nest_asyncio.apply()

In [5]:
from llama_index.core.vector_stores import (
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)
from llama_index.core.tools import FunctionTool
from llama_index.core.schema import NodeWithScore
from pathlib import Path
from typing import Optional, List

In [6]:
data_out_dir = "data_out_dopeness"

In [7]:
DOC_RETRIEVE_PREFIX = """\
Synthesizes an answer to your question by feeding in in the entire relevant document as context. Best used for higher-level summarization options.
Do NOT use if answer can be found in a specific chunk of a given document. Use the chunk_query_engine instead for that purpose.

Document: {file_name}
"""

In [8]:
CHUNK_RETRIEVE_PREFIX = """\
Synthesizes an answer to your question by feeding in relevant chunks of a document as context. Best used for questions that are more pointed in nature.
Do NOT use if the question asks seems to require a general summary of any given document. Use the doc_query_engine instead for that purpose.

Document: {file_name}
"""

In [9]:
from llama_index.core import SimpleDirectoryReader, Document, VectorStoreIndex, StorageContext, Settings
from llama_index.core.indices.vector_store.base import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.node_parser import SimpleNodeParser
from qdrant_client import QdrantClient
from tqdm.asyncio import tqdm_asyncio
import requests
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
from typing import List
import time
from urllib.parse import urlparse
import asyncio
import aiohttp
from aiohttp import ClientTimeout
from typing import List, Dict
import time

def fetch_sitemap_urls(sitemap_url: str) -> List[str]:
    """
    Fetch all URLs from a sitemap XML file.
    
    Args:
        sitemap_url (str): URL of the sitemap
        
    Returns:
        List[str]: List of URLs found in the sitemap
    """
    try:
        response = requests.get(sitemap_url)
        response.raise_for_status()
        
        # Parse the XML content
        root = ET.fromstring(response.content)
        
        # Extract URLs (handles both regular sitemaps and sitemap indexes)
        urls = []
        
        # Look for both standard sitemap URLs and sitemap index URLs
        namespaces = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
        
        # Get URLs from standard sitemap
        for url in root.findall('.//ns:loc', namespaces):
            urls.append(url.text)
            
        return urls
        
    except Exception as e:
        print(f"Error fetching sitemap: {e}")
        return []

In [10]:
async def scrape_page_async(url: str, session: aiohttp.ClientSession) -> Dict[str, str]:
    """Async version of page scraping."""
    try:
        async with session.get(url) as response:
            content = await response.text()
            soup = BeautifulSoup(content, 'html.parser')
            for script in soup(['script', 'style']):
                script.decompose()
            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            return {"url": url, "content": ' '.join(chunk for chunk in chunks if chunk)}
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        return {"url": url, "content": ""}

In [11]:
async def process_urls(urls: List[str], domain: str, rate_limit: float) -> List[Document]:
    """Process URLs concurrently with rate limiting and progress bar."""
    timeout = ClientTimeout(total=30)
    documents = []
    semaphore = asyncio.Semaphore(50)
    
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async def process_with_rate_limit(url):
            async with semaphore:
                await asyncio.sleep(rate_limit)
                result = await scrape_page_async(url, session)
                if result["content"]:
                    return Document(
                        text=result["content"],
                        metadata={"source": url, "domain": domain}
                    )
                return None
        
        tasks = [process_with_rate_limit(url) for url in urls]
        results = await tqdm_asyncio.gather(*tasks, desc="Scraping pages")
        documents = [doc for doc in results if doc is not None]
    
    return documents

In [12]:
def create_index_from_sitemap(sitemap_url: str, collection_name: str = "current_docs", rate_limit: float = 0.10) -> VectorStoreIndex:
    """Create a LlamaIndex index from a sitemap using QDrant backend."""
    # Initialize QDrant client
    client = QdrantClient(host="localhost", port=6333)
    vector_store = QdrantVectorStore(client=client, collection_name=collection_name)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    
    urls = fetch_sitemap_urls(sitemap_url)
    if not urls:
        raise ValueError("No URLs found in sitemap")
    
    documents = []
    domain = urlparse(sitemap_url).netloc
    
    documents = asyncio.run(process_urls(urls, domain, rate_limit))
    
    parser = SimpleNodeParser.from_defaults()
    nodes = parser.get_nodes_from_documents(documents)
    
    # Create index with QDrant vector store
    return VectorStoreIndex(nodes, storage_context=storage_context)

In [None]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

embed_model = HuggingFaceEmbedding(model_name="Snowflake/snowflake-arctic-embed-s")
Settings.embed_model = embed_model

sitemap_url = "https://docs.llamaindex.ai/en/stable/sitemap.xml"
index = create_index_from_sitemap(sitemap_url)

In [None]:
def generate_tool(
    file: str, 
    file_description: Optional[str] = None,
    retrieve_document: bool = False
):
    """Return a function that retrieves information from the sitemap index."""

    def chunk_retriever_fn(query: str) -> str:
        retriever = index.as_retriever(similarity_top_k=5)
        nodes = retriever.retrieve(query)

        full_text = "\n\n========================\n\n".join(
            [n.get_content(metadata_mode="all") for n in nodes]
        )

        return full_text

    # define name as a function of the file
    fn_name = Path(file).stem + "_retrieve"

    tool_description_tmpl = DOC_RETRIEVE_PREFIX if retrieve_document else CHUNK_RETRIEVE_PREFIX
    tool_description = tool_description_tmpl.format(file_name=file)
    if file_description is not None:
        tool_description += f"\n\nFile Description: {file_description}"

    tool = FunctionTool.from_defaults(
        fn=chunk_retriever_fn, name=fn_name, description=tool_description
    )

    return tool

In [67]:
tools = []
file_name = "currentdocs"
summary = "A collection of all the current documentation available on the LlamaIndex website."
tools.append(generate_tool(file_name, file_description=summary))
# document-level tool
tools.append(
    generate_tool(
        file_name, 
        file_description=summary,
        retrieve_document=True
    )
)

In [68]:
tools[0].metadata

ToolMetadata(description='Synthesizes an answer to your question by feeding in relevant chunks of a document as context. Best used for questions that are more pointed in nature.\nDo NOT use if the question asks seems to require a general summary of any given document. Use the doc_query_engine instead for that purpose.\n\nDocument: currentdocs\n\n\nFile Description: A collection of all the current documentation available on the LlamaIndex website.', name='currentdocs_retrieve', fn_schema=<class 'llama_index.core.tools.utils.currentdocs_retrieve'>, return_direct=False)

In [69]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Context,
    Workflow,
    step,
)
from llama_index.core.llms import LLM
from typing import Optional
from pydantic import BaseModel
from llama_index.core.schema import Document
from llama_index.core.agent import FunctionCallingAgentWorker
from llama_index.core.prompts import PromptTemplate
from llama_index.core.llms import ChatMessage, MessageRole
import logging
import json
import os

In [4]:
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)

In [6]:
AGENT_SYSTEM_PROMPT = """\
You are a research agent tasked with filling out a specific form key/question with the appropriate value, given a bank of context.
You are given a specific form key/question. Think step-by-step and use the existing set of tools to help answer the question.

You MUST always use at least one tool to answer each question. Only after you've determined that existing tools do not \
answer the question should you try to reason from first principles and prior knowledge to answer the question.

You MUST try to answer the question instead of only saying 'I dont know'.

"""

In [None]:
EXTRACT_KEYS_PROMPT = """\
You are provided Dopeness Report.

We wish to generate a response to the Dopeness Report in a way that adheres to the instructions within the Dopeness Report, \
including the specific sections that an Dopeness Report response should contain, and the content that would need to go \
into each section.

Your task is to extract out a list of "questions", where each question corresponds to a specific section that is required in the Dopeness Report response.
Put another way, after we extract out the questions we will go through each question and answer each one \
with our downstream research assistant, and the combined
question:answer pairs will constitute the full Dopeness Report response.

You must TRY to extract out questions that can be answered by the provided knowledge base. We provide the list of file metadata below. 

Additional requirements:
- Try to make the questions SPECIFIC given your knowledge of the Dopeness Report and the knowledge base. Instead of asking a question like \
"How do we ensure security" ask a question that actually addresses a security requirement in the Dopeness Report and can be addressed by the knowledge base.
- Make sure the questions are comprehensive and addresses all the Dopeness Report requirements.
- Make sure each question is descriptive - this gives our downstream assistant context to fill out the value for that question 
- Extract out all the questions as a list of strings.

Knowledge Base Files:
{file_metadata}

Dopeness Report Full Template:
{dopeness_text}

"""

In [104]:
GENERATE_OUTPUT_PROMPT = """\
You are an expert analyst.
Your task is to generate a Dopeness Report response according to the given Dopeness Report and question/answer pairs.

You are given the following Dopeness Report and qa pairs:

<dopeness_document>
{output_template}
</dopeness_document>

<question_answer_pairs>
{answers}
</question_answer_pairs>

You cannot provide a 'Not specified' response. You must judge based on the *vibes* of the documentation - just make it up if you have to!

Not every question has an appropriate answer. This is because the agent tasked with answering the question did not have the right context to answer it.
If this is the case, you MUST come up with an answer that is reasonable. You CANNOT say that you are unsure in any area of the Dopeness Report response. 

Please generate the output according to the template and the answers, in markdown format.
Directly output the generated markdown content, do not add any additional text, such as "```markdown" or "Here is the output:".
Follow the original format of the template as closely as possible, and fill in the answers into the appropriate sections.
"""

In [105]:
from pathlib import Path
from typing import Optional, List

class OutputQuestions(BaseModel):
    """List of keys that make up the sections of the Dopeness Report response."""

    questions: List[str]


class OutputTemplateEvent(Event):
    docs: List[Document]


class QuestionsExtractedEvent(Event):
    questions: List[str]


class HandleQuestionEvent(Event):
    question: str


class QuestionAnsweredEvent(Event):
    question: str
    answer: str


class CollectedAnswersEvent(Event):
    combined_answers: str


class LogEvent(Event):
    msg: str
    delta: bool = False
    # clear_previous: bool = False

In [None]:
from llama_index.llms.ollama import Ollama
from llama_index.readers.file import FlatReader
from llama_index.llms.openai import OpenAI

class DopenessReport(Workflow):
    """Dopeness workflow."""

    def __init__(
        self,
        tools,
        parser: SimpleNodeParser,
        llm: LLM | None = None,
        similarity_top_k: int = 20,
        output_dir: str = data_out_dir,
        agent_system_prompt: str = AGENT_SYSTEM_PROMPT,
        generate_output_prompt: str = GENERATE_OUTPUT_PROMPT,
        extract_keys_prompt: str = EXTRACT_KEYS_PROMPT,
        **kwargs,
    ) -> None:
        """Init params."""
        super().__init__(**kwargs)
        self.tools = tools

        self.parser = parser

        self.llm = llm
        self.similarity_top_k = similarity_top_k

        self.output_dir = output_dir

        self.agent_system_prompt = agent_system_prompt
        self.extract_keys_prompt = extract_keys_prompt

        # if not exists, create
        out_path = Path(self.output_dir) / "demo_workflow_output"
        if not out_path.exists():
            out_path.mkdir(parents=True, exist_ok=True)
            os.chmod(str(out_path), 0o0777)

        self.generate_output_prompt = PromptTemplate(generate_output_prompt)

    @step
    async def parse_output_template(
        self, ctx: Context, ev: StartEvent
    ) -> OutputTemplateEvent:
        # load output template file
        out_template_path = Path(
            f"{self.output_dir}/workflow_output/output_template.jsonl"
        )
        if out_template_path.exists():
            with open(out_template_path, "r") as f:
                docs = [Document.model_validate_json(line) for line in f]
        else:
            reader = FlatReader()
            docs = reader.load_data(Path(ev.dopeness_report_path))
            # save output template to file
            with open(out_template_path, "w") as f:
                for doc in docs:
                    f.write(doc.model_dump_json())
                    f.write("\n")

        await ctx.set("output_template", docs)
        return OutputTemplateEvent(docs=docs)

    @step
    async def extract_questions(
        self, ctx: Context, ev: OutputTemplateEvent
    ) -> HandleQuestionEvent:
        docs = ev.docs

        # save all_questions to file
        out_keys_path = Path(f"{self.output_dir}/workflow_output/all_keys.txt")
        if out_keys_path.exists():
            with open(out_keys_path, "r") as f:
                output_qs = [q.strip() for q in f.readlines()]
        else:
            # try stuffing all text into the prompt
            all_text = "\n\n".join([d.get_content(metadata_mode="all") for d in docs])
            prompt = PromptTemplate(template=self.extract_keys_prompt)

            file_metadata = "currentdocs"
            try:
                if self._verbose:
                    ctx.write_event_to_stream(LogEvent(msg=">> Extracting questions from LLM"))
                
                output_qs = self.llm.structured_predict(
                    OutputQuestions, 
                    prompt, 
                    file_metadata=file_metadata,
                    dopeness_text=all_text,
                ).questions

                if self._verbose:
                    qs_text = "\n".join([f"* {q}" for q in output_qs])
                    ctx.write_event_to_stream(LogEvent(msg=f">> Questions:\n{qs_text}"))
            
            except Exception as e:
                _logger.error(f"Error extracting questions from page: {all_text}")
                _logger.error(e)

            with open(out_keys_path, "w") as f:
                f.write("\n".join(output_qs))

        await ctx.set("num_to_collect", len(output_qs))

        for question in output_qs:
            ctx.send_event(HandleQuestionEvent(question=question))

        return None

    @step
    async def handle_question(
        self, ctx: Context, ev: HandleQuestionEvent
    ) -> QuestionAnsweredEvent:
        question = ev.question

        # initialize a Function Calling "research" agent where given a task, it can pull responses from relevant tools and synthesize over it
        research_agent = FunctionCallingAgentWorker.from_tools(
            tools, llm=OpenAI(model="gpt-4o"), verbose=False, system_prompt=self.agent_system_prompt
        ).as_agent()

        # ensure the agent's memory is cleared
        response = await research_agent.aquery(question)

        if self._verbose:
            # instead of printing the message directly, write the event to stream!
            msg = f">> Asked question: {question}\n>> Got response: {str(response)}"
            ctx.write_event_to_stream(LogEvent(msg=msg))

        return QuestionAnsweredEvent(question=question, answer=str(response))

    @step
    async def combine_answers(
        self, ctx: Context, ev: QuestionAnsweredEvent
    ) -> CollectedAnswersEvent:
        num_to_collect = await ctx.get("num_to_collect")
        results = ctx.collect_events(ev, [QuestionAnsweredEvent] * num_to_collect)
        if results is None:
            return None

        combined_answers = "\n".join([result.model_dump_json() for result in results])
        # save combined_answers to file
        with open(
            f"{self.output_dir}/workflow_output/combined_answers.jsonl", "w"
        ) as f:
            f.write(combined_answers)

        return CollectedAnswersEvent(combined_answers=combined_answers)

    @step
    async def generate_output(
        self, ctx: Context, ev: CollectedAnswersEvent
    ) -> StopEvent:
        output_template = await ctx.get("output_template")
        output_template = "\n".join(
            [doc.get_content("none") for doc in output_template]
        )

        if self._verbose:
            ctx.write_event_to_stream(LogEvent(msg=">> GENERATING FINAL OUTPUT"))

        resp = await self.llm.astream(
            self.generate_output_prompt,
            output_template=output_template,
            answers=ev.combined_answers,
        )

        final_output = ""
        async for r in resp:
            ctx.write_event_to_stream(LogEvent(msg=r, delta=True))
            final_output += r

        # save final_output to file
        with open(f"{self.output_dir}/workflow_output/final_output.md", "w") as f:
            f.write(final_output)

        return StopEvent(result=final_output)

In [2]:
from llama_index.llms.openai import OpenAI

llm = OpenAI(
    model="gpt-4o"
)

In [3]:
workflow = DopenessReport(
    tools,
    parser=SimpleNodeParser.from_defaults(),
    llm=llm,
    verbose=True,
    timeout=None,  # don't worry about timeout to make sure it completes
)

NameError: name 'DopenessReport' is not defined

In [86]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(DopenessReport, filename="DopenessReport.html")

<class 'NoneType'>
<class '__main__.CollectedAnswersEvent'>
<class '__main__.HandleQuestionEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.QuestionAnsweredEvent'>
<class '__main__.OutputTemplateEvent'>
DopenessReport.html


In [110]:
from IPython.display import clear_output

handler = workflow.run(dopeness_report_path="data/dopeness_report.md")
async for event in handler.stream_events():
    if isinstance(event, LogEvent):
        if event.delta:
            print(event.msg, end="")
        else:
            print(event.msg)

response = await handler
print(str(response))

Running step parse_output_template
Step parse_output_template produced event OutputTemplateEvent
Running step extract_questions
Step extract_questions produced no event
Running step handle_question
>> Extracting questions from LLM
>> Questions:
* What is the document version and reference number for the Dopeness Compliance TPS Report?
* What are the percentages for Fresh Factor, Coolness Quotient, Innovation Index, and Style Coefficient in the Vibrational Analysis section?
* What is the level of Zeitgeist Alignment, Trend Correlation, and Meme Potential in the Cultural Resonance section?
* What are the grades for Implementation Quality, Performance Rating, and Scalability Score in the Technical Excellence section?
* What is the percentage of Cringe Factor, the count of Outdated Elements detected, and the number of Compliance Violations in the Risk Assessment section?
* What is the final determination status of the report, and is the certification valid for 90 days from the issue date?
