In [10]:
import asyncio
import time
from datetime import datetime

from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context
)
from llama_index.utils.workflow import draw_all_possible_flows
from openai import OpenAI as OpenAI_from_openai

from dummy_data import product_information, user_information, product_reviews

In [None]:
class TriggerExecutionEvent(Event):
    """Payload-less signal returned by `setup` and consumed by `trigger_execution`."""

class FetchUserDetailsEvent(Event):
    """Payload-less signal sent by `trigger_execution`; handled by `fetch_user_details`."""

class FetchUserDetailsCompletedEvent(Event):
    """Returned by `fetch_user_details`; one of the three events `start_customizations` collects."""

class FetchProductDetailsEvent(Event):
    """Payload-less signal sent by `trigger_execution`; handled by `fetch_product_details`."""

class FetchProductDetailsCompletedEvent(Event):
    """Returned by `fetch_product_details`; one of the three events `start_customizations` collects."""

class FetchProductReviewsEvent(Event):
    """Payload-less signal sent by `trigger_execution`; handled by `fetch_product_reviews`."""

class FetchProductReviewsCompletedEvent(Event):
    """Returned by `fetch_product_reviews`; one of the three events `start_customizations` collects."""

class StartCustomizationsEvent(Event):
    """Payload-less event.

    NOTE(review): no step in the workflow defined in this notebook section
    emits or consumes this event -- confirm whether it is still needed.
    """

class SuggestFeaturesEvent(Event):
    """Payload-less signal sent by `start_customizations`; handled by `suggest_features`."""

class SuggestFeaturesEventCompleted(Event):
    """Returned by `suggest_features`; one of the three events `cleanup` collects."""

class SortReviewsEvent(Event):
    """Payload-less signal sent by `start_customizations`; handled by `sort_reviews`."""

class SortReviewsEventCompleted(Event):
    """Returned by `sort_reviews`; one of the three events `cleanup` collects."""

class CustomProductDescriptionEvent(Event):
    """Payload-less signal sent by `start_customizations`; handled by `custom_product_description`."""

class CustomProductDescriptionEventCompleted(Event):
    """Returned by `custom_product_description`; one of the three events `cleanup` collects."""

# -----------------------------------------------------------------------------------------------------------------------------------------------

class SolutionAcceleratorWorkflowInParallel(Workflow):
    """Two-stage fan-out / fan-in workflow that personalizes a product page.

    Stage 1 (`trigger_execution`) dispatches three fetch steps that load the
    user profile, product information and product reviews into the shared
    ``"state"`` dict held by the workflow ``Context``.

    Stage 2 (`start_customizations`) waits for all three fetches, then
    dispatches three LLM-backed steps that write ``suggested_features``,
    ``sorted_reviews`` and ``custom_description`` into the same state dict.

    `cleanup` waits for all three LLM steps, prints the results and ends the
    run, returning the aggregated state as the workflow result.

    The OpenAI client is created without an explicit key, so the SDK reads it
    from the ``OPENAI_API_KEY`` environment variable.
    """

    # Model and sampling settings shared by every LLM-backed step.
    LLM_MODEL = "gpt-4o"
    LLM_TEMPERATURE = 0.1
    LLM_TOP_P = 0.1
    # Artificial delay (seconds) awaited at the start of each fetch/LLM step.
    SIMULATED_LATENCY_SECONDS = 4
    # Key into `user_information` selecting the profile to load.
    USER_ID = "1"

    # ------------------------------------------------------------------
    # Private helpers (not workflow steps)
    # ------------------------------------------------------------------

    async def _store_result(self, ctx: Context, key: str, value) -> None:
        """Write ``value`` under ``key`` in the shared ``"state"`` dict.

        The state is re-read immediately before the write so that a step which
        awaited a slow operation never writes back a stale snapshot.

        NOTE(review): this read-modify-write is only safe as long as no other
        step is scheduled between `ctx.get` and `ctx.set` -- confirm against
        the installed `Context` implementation, or guard it with a lock.
        """
        state = await ctx.get("state", {})
        state[key] = value
        await ctx.set("state", state)

    async def _ask_llm(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the stripped reply.

        The OpenAI client used here is synchronous; running it in a worker
        thread keeps the event loop free so that sibling steps can issue their
        own requests at the same time instead of queueing behind this one.
        """
        # No api_key argument: the SDK falls back to the OPENAI_API_KEY env var.
        client = OpenAI_from_openai()
        completion = await asyncio.to_thread(
            client.chat.completions.create,
            model=self.LLM_MODEL,
            temperature=self.LLM_TEMPERATURE,
            top_p=self.LLM_TOP_P,
            messages=[{"role": "user", "content": prompt}],
        )
        # `content` may be None (e.g. a refusal); treat that as an empty reply.
        return (completion.choices[0].message.content or "").strip()

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> TriggerExecutionEvent:
        """Entry step: announce start-up and hand over to `trigger_execution`."""
        print("In SETUP")
        return TriggerExecutionEvent()

    @step
    async def trigger_execution(self, ctx: Context, ev: TriggerExecutionEvent) -> FetchUserDetailsEvent | FetchProductDetailsEvent | FetchProductReviewsEvent:
        """Fan out: emit one event per fetch step.

        The events are sent through ``ctx.send_event`` rather than returned,
        so this step itself returns ``None``.
        """
        print(f"[{datetime.now()}] START trigger_execution")
        ctx.send_event(FetchUserDetailsEvent())
        ctx.send_event(FetchProductDetailsEvent())
        ctx.send_event(FetchProductReviewsEvent())
        print(f"[{datetime.now()}] END trigger_execution")

    # ------------------------------------------------------------------
    # Stage 1: data fetches
    # ------------------------------------------------------------------

    @step
    async def fetch_user_details(self, ctx: Context, ev: FetchUserDetailsEvent) -> FetchUserDetailsCompletedEvent:
        """Store the profile for `USER_ID` under ``state["user_profile"]``.

        Falls back to an empty dict when the id is not in `user_information`.
        """
        print(f"[{datetime.now()}] START fetch_user_details")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        await self._store_result(ctx, "user_profile", user_information.get(self.USER_ID, {}))
        print(f"[{datetime.now()}] END fetch_user_details")
        return FetchUserDetailsCompletedEvent()

    @step
    async def fetch_product_details(self, ctx: Context, ev: FetchProductDetailsEvent) -> FetchProductDetailsCompletedEvent:
        """Store `product_information` under ``state["product_information"]``."""
        print(f"[{datetime.now()}] START fetch_product_details")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        await self._store_result(ctx, "product_information", product_information)
        print(f"[{datetime.now()}] END fetch_product_details")
        return FetchProductDetailsCompletedEvent()

    @step
    async def fetch_product_reviews(self, ctx: Context, ev: FetchProductReviewsEvent) -> FetchProductReviewsCompletedEvent:
        """Store `product_reviews` under ``state["product_reviews"]``."""
        print(f"[{datetime.now()}] START fetch_product_reviews")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        await self._store_result(ctx, "product_reviews", product_reviews)
        print(f"[{datetime.now()}] END fetch_product_reviews")
        return FetchProductReviewsCompletedEvent()

    # ------------------------------------------------------------------
    # Stage 2: LLM customizations
    # ------------------------------------------------------------------

    @step
    async def start_customizations(self, ctx: Context, ev: FetchUserDetailsCompletedEvent | FetchProductDetailsCompletedEvent | FetchProductReviewsCompletedEvent) -> SuggestFeaturesEvent | SortReviewsEvent | CustomProductDescriptionEvent:
        """Fan in on the three fetches, then fan out to the three LLM steps.

        Runs once per completed fetch; `collect_events` returns ``None`` until
        all three completion events have arrived, and only then are the
        customization events sent.
        """
        events = ctx.collect_events(ev, [FetchUserDetailsCompletedEvent, FetchProductDetailsCompletedEvent, FetchProductReviewsCompletedEvent])
        if events is None:
            # Still waiting for at least one fetch to finish.
            return None

        print(f"[{datetime.now()}] START start_customizations")
        ctx.send_event(SuggestFeaturesEvent())
        ctx.send_event(SortReviewsEvent())
        ctx.send_event(CustomProductDescriptionEvent())
        print(f"[{datetime.now()}] END start_customizations")

    @step
    async def suggest_features(self, ctx: Context, ev: SuggestFeaturesEvent) -> SuggestFeaturesEventCompleted:
        """Ask the LLM which product features suit the user.

        Reads ``user_profile`` and ``product_information`` from the state and
        stores the raw model reply under ``state["suggested_features"]``.
        """
        print(f"[{datetime.now()}] START suggest_features")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        current_state = await ctx.get("state", {})
        user_profile = current_state.get("user_profile", {})
        product_info = current_state.get("product_information", {})
        prompt_template = f"""
        You are given a user profile and product information. Analyze the details of both and suggest which features of the product will be most appealing to the user. As your final output, return only a list of features. Do not put a variable name before the list. Your response should only contain the list and nothing else:
        
        [
            **List of features selected**
        ]
        
        Perform the task for the following information:
        User Profile: {user_profile}
        Product Information: {product_info}
        """
        response = await self._ask_llm(prompt_template)
        await self._store_result(ctx, "suggested_features", response)
        print(f"[{datetime.now()}] END suggest_features")
        return SuggestFeaturesEventCompleted()

    @step
    async def sort_reviews(self, ctx:Context, ev: SortReviewsEvent) -> SortReviewsEventCompleted:
        """Ask the LLM to order the reviews by relevance to the user.

        Reads ``user_profile`` and ``product_reviews`` from the state and
        stores the raw model reply under ``state["sorted_reviews"]``.
        """
        print(f"[{datetime.now()}] START sort_reviews")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        current_state = await ctx.get("state", {})
        user_profile = current_state.get("user_profile", {})
        # Named `reviews` so it does not shadow the module-level `product_reviews`.
        reviews = current_state.get("product_reviews", [])
        prompt_template = f"""
        You are given a user profile and a list of product reviews. Analyze the details of both and sort the reviews based on the user profile information. The reviews that match the users profile the most and are the ones that the user will most likely be interested in should come first. As your final output, return only a list of reviews sorted based on the user profile information. Do not put a variable name before the list. Your response should only contain the list and nothing else:
        
        [
            **List of reviews sorted**
        ]
        User Profile: {user_profile}
        Product Reviews: {reviews}
        """
        response = await self._ask_llm(prompt_template)
        await self._store_result(ctx, "sorted_reviews", response)
        print(f"[{datetime.now()}] END sort_reviews")
        return SortReviewsEventCompleted()

    @step
    async def custom_product_description(self, ctx:Context, ev: CustomProductDescriptionEvent) -> CustomProductDescriptionEventCompleted:
        """Ask the LLM for a one-sentence product description tailored to the user.

        Reads ``user_profile`` and ``product_information`` from the state and
        stores the raw model reply under ``state["custom_description"]``.
        """
        print(f"[{datetime.now()}] START custom_product_description")
        await asyncio.sleep(self.SIMULATED_LATENCY_SECONDS)
        current_state = await ctx.get("state", {})
        user_profile = current_state.get("user_profile", {})
        product_info = current_state.get("product_information", {})
        prompt_template = f"""
        You are given a user profile and product information. You have to create a custom description of the product based on the user profile information. The description should be most appealing to the user and should not be longer than 1 sentence.  As your final output, return a json object ONLY, like the following. Do not put a variable name before the json object. Your response should only contain the json object and nothing else:
        {{
            "custom_description": **The custom description that you create**
        }}
        User Profile: {user_profile}
        Product Information: {product_info}
        """
        response = await self._ask_llm(prompt_template)
        await self._store_result(ctx, "custom_description", response)
        print(f"[{datetime.now()}] END custom_product_description")
        return CustomProductDescriptionEventCompleted()

    # ------------------------------------------------------------------
    # Fan-in and finish
    # ------------------------------------------------------------------

    @step
    async def cleanup(self, ctx: Context, ev: SuggestFeaturesEventCompleted | SortReviewsEventCompleted | CustomProductDescriptionEventCompleted) -> StopEvent:
        """Fan in on the three LLM steps, print the results and stop the run.

        Returns ``None`` until all three completion events have been
        collected; the final `StopEvent` carries the state dict as its result
        so that the caller of ``run()`` receives the outputs.
        """
        events = ctx.collect_events(ev, [SuggestFeaturesEventCompleted, SortReviewsEventCompleted, CustomProductDescriptionEventCompleted])
        if events is None:
            # Still waiting for at least one customization to finish.
            return None

        print(f"[{datetime.now()}] START cleanup")
        current_state = await ctx.get("state", {})
        print("Keys: ", current_state.keys())
        print("Suggested Features: ", current_state.get("suggested_features", ""))
        print("Sorted Reviews: ", current_state.get("sorted_reviews", ""))
        print("Custom Description: ", current_state.get("custom_description", ""))
        print("Workflow completed")
        print(f"[{datetime.now()}] END cleanup")
        return StopEvent(result=current_state)



In [12]:
W = SolutionAcceleratorWorkflowInParallel(timeout=200, verbose=False)

from llama_index.utils.workflow import draw_all_possible_flows

# draw_all_possible_flows(W, filename="function_calling_agent_workflow.html")       

start = time.time()
result = await W.run()
end = time.time()
print("Time taken: ", end-start)

In SETUP
[2025-02-18 15:39:37.734392] START trigger_execution
[2025-02-18 15:39:37.734392] END trigger_execution
[2025-02-18 15:39:37.734392] START fetch_product_details
[2025-02-18 15:39:37.735390] START fetch_product_reviews
[2025-02-18 15:39:37.735390] START fetch_user_details
[2025-02-18 15:39:41.747218] END fetch_product_details
[2025-02-18 15:39:41.747218] END fetch_product_reviews
[2025-02-18 15:39:41.747218] END fetch_user_details
[2025-02-18 15:39:41.747218] START start_customizations
[2025-02-18 15:39:41.747218] END start_customizations
[2025-02-18 15:39:41.748256] START suggest_features
[2025-02-18 15:39:41.748256] START custom_product_description
[2025-02-18 15:39:41.748256] START sort_reviews
[2025-02-18 15:39:47.370075] END suggest_features
[2025-02-18 15:39:48.810707] END custom_product_description
[2025-02-18 15:39:54.228915] END sort_reviews
[2025-02-18 15:39:54.228915] START cleanup
Keys:  dict_keys(['product_information', 'product_reviews', 'user_profile', 'suggested