In [24]:
from textwrap import dedent
from typing import AsyncGenerator, List, Optional

# Import our agent team
from app.agents.a_problem_definition.problem_validator import ProblemValidatorFeedback, problem_validator_prompt_instructions
from app.agents.b_initial_research import create_market_research, create_competitor_analysis, create_customer_insights, create_online_trends
from app.agents.c_research_reviewer import create_research_reviewer
from app.agents.d_feasibility_research import create_finance_feasibility, create_operations_feasibility, create_tech_feasibility
from app.agents.e_strategy_research import create_go_to_market, create_monetization, create_risk_analysis
from app.agents.f_output_production import create_landing_page_poc, create_podcaster, create_summarizer

from app.workflows.single import AgentRunEvent, AgentRunResult, FunctionCallingAgent
from llama_index.core.chat_engine.types import ChatMessage
from llama_index.core.prompts import PromptTemplate
from llama_index.core.settings import Settings
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

In [25]:
### High Level Events ###
class QnaWorkflowEvent(Event):
    '''
    Fired when the user has a question for the AI
    '''
    input: str

class ResearchWorkflowEvent(Event):
    '''
    Fired when the user has a research query
    '''
    input: str


In [26]:
### Initial Research Events ###
class StartResearchPipelineEvent(Event):
    '''
    Fired when the user has provided enough information to start the research pipeline
    '''
    input: str
    
class InitialResearchCompleteEvent(Event):
    '''
    Fired when the initial research is complete
    '''
    input: str

class CompetitorAnalysisFeedbackEvent(Event):
    '''
    Fired when the competitor analysis critique gives feedback
    '''
    input: str

class GetCompetitorAnalysisCritiqueEvent(Event):
    '''
    Fired when the competitor analysis agent needs feedback
    '''
    input: str

class GetCustomerInsightsCritiqueEvent(Event):
    '''
    Fired when the customer insights agent needs feedback
    '''
    input: str

class CustomerInsightsFeedbackEvent(Event):
    '''
    Fired when the customer insights critique gives feedback
    '''
    input: str

class GetOnlineTrendsCritiqueEvent(Event):
    '''
    Fired when the online trends agent needs feedback
    '''
    input: str

class OnlineTrendsFeedbackEvent(Event):
    '''
    Fired when the online trends critique gives feedback
    '''
    input: str

class MarketResearchFeedbackEvent(Event):
    '''
    Fired when the market research critique gives feedback
    '''
    input: str

class GetMarketResearchCritiqueEvent(Event):
    '''
    Fired when the market research agent needs feedback
    '''
    input: str

In [27]:
### Feasibility Research Events ###
class StartFeasibilityResearchEvent(Event):
    '''
    Fired when the user has provided enough information to start the feasibility research pipeline
    '''
    input: str

class FeasibilityCompleteEvent(Event):
    '''
    Fired when the feasibility research is complete
    '''
    input: str

class TechFeasibilityFeedbackEvent(Event):
    '''
    Fired when the tech feasibility critique gives feedback
    '''
    input: str

class GetTechFeasibilityCritiqueEvent(Event):
    '''
    Fired when the tech feasibility research agent needs feedback
    '''
    input: str

class FinanceFeasibilityFeedbackEvent(Event):
    '''
    Fired when the finance feasibility critique gives feedback
    '''
    input: str

class GetFinanceFeasibilityCritiqueEvent(Event):
    '''
    Fired when the finance feasibility research agent needs feedback
    '''
    input: str  

class OperationsFeasibilityFeedbackEvent(Event):
    '''
    Fired when the operations feasibility critique gives feedback
    '''
    input: str

class GetOperationsFeasibilityCritiqueEvent(Event):
    '''
    Fired when the operations feasibility research agent needs feedback
    '''
    input: str

In [28]:
class RiskAnalysisCompleteEvent(Event):
    '''
    Fired when the risk analysis is complete
    '''
    input: str

class GetRiskAnalysisCritiqueEvent(Event):
    '''
    Fired when the risk analysis agent needs feedback
    '''
    input: str



In [29]:
class SummarizeEverythingEvent(Event):
    '''
    Fired when all research is complete and that its time to summarize everything
    '''
    input: str


In [30]:
import random


class IdeaResearchWorkflow(Workflow):
    def __init__(
        self, timeout: int = 600, chat_history: Optional[List[ChatMessage]] = None
    ):
        super().__init__(timeout=timeout)
        self.chat_history = chat_history or []

    @step()
    async def start(self, ctx: Context, ev: StartEvent) -> QnaWorkflowEvent | ResearchWorkflowEvent:
        # set streaming
        ctx.data["streaming"] = getattr(ev, "streaming", False)
        # start the workflow with researching about a topic
        ctx.data["task"] = ev.input
        ctx.data["user_input"] = ev.input

        # Decision-making process
        prompt_template = PromptTemplate(
            dedent(
                """
                ### High Level Context
                You are part of a workflow to help users do autonomous research and idea validation for their business ideas.

                ### Your Role
                You are an expert in decision-making, given the chat history and the new user request, decide whether the user request is a research query (including providing more information for an existing research query) or a question for the AI.

                Here is the chat history:
                {chat_history}

                The current user request is:
                {input}

                Decision (respond with either 'research' or 'qna'):
            """
            )
        )

        chat_history_str = "\n".join(
            [f"{msg.role}: {msg.content}" for msg in self.chat_history]
        )
        prompt = prompt_template.format(chat_history=chat_history_str, input=ev.input)

        output = await Settings.llm.acomplete(prompt)
        decision = output.text.strip().lower()

        if decision == "research":
            return ResearchWorkflowEvent(input=f"User input: {ev.input}")
        else:
            return QnaWorkflowEvent(input=f"User input: {ev.input}")

    @step()
    async def validate_problem_statement(self, ctx: Context, ev: ResearchWorkflowEvent) -> StartResearchPipelineEvent | StopEvent:
        chat_history_str = "\n".join(
            [f"{msg.role}: {msg.content}" for msg in self.chat_history]
        )
        prompt = problem_validator_prompt_instructions.format(chat_history=chat_history_str, input=ev.input)

        output = await Settings.llm.as_structured_llm(output_cls=ProblemValidatorFeedback).acomplete(prompt)
        result: ProblemValidatorFeedback = output.raw
        if result.enough_information:
            return StartResearchPipelineEvent(input=result.refined_problem_statement)
        else:
            return StopEvent(input=result.feedback)

    @step()
    async def qna(self, ctx: Context, ev: QnaWorkflowEvent) -> StopEvent:
        messages = self.chat_history + [ChatMessage(role="user", content=ev.input)]
        result = await Settings.llm.achat(messages)
        return StopEvent(result=result.content)
    
    ### Initial Research Analysts Team 1 ###

    ### Market Research ###
    @step()
    async def market_research(self, ctx: Context, ev: StartResearchPipelineEvent | MarketResearchFeedbackEvent) -> GetMarketResearchCritiqueEvent:
        return GetMarketResearchCritiqueEvent(input=ev.input)

    @step()
    async def critique_market_research(self, ctx: Context, ev: GetMarketResearchCritiqueEvent) -> InitialResearchCompleteEvent | MarketResearchFeedbackEvent:
        if random.random() > 0.5:
            return InitialResearchCompleteEvent(input=ev.input)
        else:
            return MarketResearchFeedbackEvent(input=ev.input)

    ### Customer Insights ###
    @step()
    async def customer_insights(self, ctx: Context, ev: StartResearchPipelineEvent | CustomerInsightsFeedbackEvent) -> GetCustomerInsightsCritiqueEvent:
        return GetCustomerInsightsCritiqueEvent(input=ev.input)

    @step()
    async def critique_customer_insights(self, ctx: Context, ev: GetCustomerInsightsCritiqueEvent) -> InitialResearchCompleteEvent | CustomerInsightsFeedbackEvent:
        if random.random() > 0.5:
            return InitialResearchCompleteEvent(input=ev.input)
        else:
            return CustomerInsightsFeedbackEvent(input=ev.input)    

    ### Online Trends ###
    @step()
    async def online_trends(self, ctx: Context, ev: StartResearchPipelineEvent | OnlineTrendsFeedbackEvent) -> GetOnlineTrendsCritiqueEvent:
        return GetOnlineTrendsCritiqueEvent(input=ev.input)

    @step()
    async def critique_online_trends(self, ctx: Context, ev: GetOnlineTrendsCritiqueEvent) -> InitialResearchCompleteEvent | OnlineTrendsFeedbackEvent:
        if random.random() > 0.5:
            return InitialResearchCompleteEvent(input=ev.input)
        else:
            return OnlineTrendsFeedbackEvent(input=ev.input)

    ### Competitor Analysis ###
    @step()
    async def competitor_analysis(self, ctx: Context, ev: StartResearchPipelineEvent | CompetitorAnalysisFeedbackEvent) -> GetCompetitorAnalysisCritiqueEvent:
        return GetCompetitorAnalysisCritiqueEvent(input=ev.input)

    @step()
    async def critique_competitor_analysis(self, ctx: Context, ev: GetCompetitorAnalysisCritiqueEvent) -> InitialResearchCompleteEvent | CompetitorAnalysisFeedbackEvent:
        if random.random() > 0.5:
            return InitialResearchCompleteEvent(input=ev.input)
        else:
            return CompetitorAnalysisFeedbackEvent(input=ev.input)

    ### Collect Initial Research Feedback ###
    @step()
    async def review_initial_research(self, ctx: Context, ev: InitialResearchCompleteEvent) -> SummarizeEverythingEvent:
        print("Received event ", ev.result)

        # wait until we receive all 4 prior events
        if (
            ctx.collect_events(
                ev,
                [InitialResearchCompleteEvent] * 4,
            )
            is None
        ):
            return None
        return SummarizeEverythingEvent(input=ev.input)

    ### Output Production ###
    @step()
    async def summarize_everything(self, ctx: Context, ev: SummarizeEverythingEvent) -> StopEvent:
        return StopEvent(result=ev.input)

    # ### Feasibility Research ###
    # @step()
    # async def tech_feasibility(self, ctx: Context, ev: StartFeasibilityResearchEvent) -> StopEvent:
    #     return StopEvent(result=ev.input)

    async def run_agent(
        self,
        ctx: Context,
        agent: FunctionCallingAgent,
        input: str,
        streaming: bool = False,
    ) -> AgentRunResult | AsyncGenerator:
        handler = agent.run(input=input, streaming=streaming)
        # bubble all events while running the executor to the planner
        async for event in handler.stream_events():
            # Don't write the StopEvent from sub task to the stream
            if type(event) is not StopEvent:
                ctx.write_event_to_stream(event)
        return await handler


In [31]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(IdeaResearchWorkflow)

<class 'NoneType'>
<class '__main__.GetCompetitorAnalysisCritiqueEvent'>
<class '__main__.InitialResearchCompleteEvent'>
<class '__main__.CompetitorAnalysisFeedbackEvent'>
<class '__main__.InitialResearchCompleteEvent'>
<class '__main__.CustomerInsightsFeedbackEvent'>
<class '__main__.InitialResearchCompleteEvent'>
<class '__main__.MarketResearchFeedbackEvent'>
<class '__main__.InitialResearchCompleteEvent'>
<class '__main__.OnlineTrendsFeedbackEvent'>
<class '__main__.GetCustomerInsightsCritiqueEvent'>
<class '__main__.GetMarketResearchCritiqueEvent'>
<class '__main__.GetOnlineTrendsCritiqueEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.SummarizeEverythingEvent'>
<class '__main__.QnaWorkflowEvent'>
<class '__main__.ResearchWorkflowEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.StartResearchPipelineEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>


workflow_all_flows.html


In [7]:
workflow = IdeaResearchWorkflow(timeout=360)
result = await workflow.run(input="What's LlamaIndex?")
result

WorkflowValidationError: The following events are produced but never consumed: StartFeasibilityResearchEvent, MarketResearchFeedbackEvent

In [6]:
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    InputRequiredEvent,
    HumanResponseEvent,
    StopEvent,
    Workflow,
    step,
)

In [28]:
API_KEY = ""
from llama_index.llms.openai import OpenAI
from typing import List, Optional
from llama_index.core.settings import Settings
from app.agents.new_workflow import StartResearchPipelineEvent
from llama_cloud import ChatMessage
from app.agents.a_problem_definition.problem_validator import problem_definer_prompt_instructions, ProblemDefinerFeedback

class TestEvent(Event):
    input: str

class OpenAIGenerator(Workflow):
    def __init__(
        self, timeout: int = 600, chat_history: Optional[List[ChatMessage]] = None
    ):
        super().__init__(timeout=timeout)
        self.chat_history = chat_history or []
        
    @step
    async def generate(self, ev: StartEvent) -> TestEvent | StartResearchPipelineEvent:
        # Validate user problem statement
        prompt_template = problem_definer_prompt_instructions
        chat_history_str = "\n".join(
            [f"{msg.role}: {msg.content}" for msg in self.chat_history]
        )
        prompt = prompt_template.format(chat_history=chat_history_str, input="TEST")

        output = await OpenAI(model="gpt-4o-mini", api_key=API_KEY).as_structured_llm(output_cls=ProblemDefinerFeedback).acomplete(prompt)
        res: ProblemDefinerFeedback = output.raw
        
        if res.enough_information:
            return StartResearchPipelineEvent(input=res.refined_problem_statement)
        else:
            return TestEvent(input=res.feedback)
        
    @step
    async def step2(self, ev: TestEvent) -> StopEvent:
        return StopEvent(result=ev.input)
    
    @step
    async def step3(self, ev: StartResearchPipelineEvent) -> StopEvent:
        return StopEvent(result=ev.input)


w = OpenAIGenerator(timeout=10)
handler = w.run(query="What's LlamaIndex?")
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        # here, we can handle human input however you want
        # this means using input(), websockets, accessing async state, etc.
        # here, we just use input()
        response = input(event.prefix)
        handler.ctx.send_event(HumanResponseEvent(response=response))

await handler

In [42]:
from typing import List, Optional
from llama_index.core.settings import Settings
from app.agents.new_workflow import StartResearchPipelineEvent
from llama_cloud import ChatMessage
from app.agents.a_problem_definition.problem_validator import problem_definer_prompt_instructions, ProblemDefinerFeedback

class IdeaResearcdWorkflow(Workflow):
    def __init__(
        self, timeout: int = 600, chat_history: Optional[List[ChatMessage]] = None
    ):
        super().__init__(timeout=timeout)
        self.chat_history = chat_history or []
        
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> TestEvent | StartResearchPipelineEvent:
        # Validate user problem statement
        prompt_template = problem_definer_prompt_instructions
        chat_history_str = "\n".join(
            [f"{msg.role}: {msg.content}" for msg in self.chat_history]
        )
        prompt = prompt_template.format(chat_history=chat_history_str, input="TEST")

        output = await OpenAI(model="gpt-4o-mini", api_key=API_KEY).as_structured_llm(output_cls=ProblemDefinerFeedback).acomplete(prompt)
        res: ProblemDefinerFeedback = output.raw
        
        if res.enough_information:
            return StartResearchPipelineEvent(input=res.refined_problem_statement)
        else:
            return TestEvent(input=res.feedback)
        
    @step
    async def step2(self, ev: TestEvent) -> StopEvent:
        return StopEvent(result=ev.input)
    
    @step
    async def step3(self, ev: StartResearchPipelineEvent) -> StopEvent:
        return StopEvent(result=ev.input)

    
workflow = OpenAIGenerator(timeout=360)
result = await workflow.run(query="What's LlamaIndex?")
result



"The input 'TEST' is too vague and does not provide any specific information about a problem, the affected individuals, or the impact on their lives. Please provide more detailed information about the business idea or problem you want to address."