In [1]:
import phoenix as px
from llama_index.core import set_global_handler

px.launch_app()
set_global_handler("arize_phoenix")

  from .autonotebook import tqdm as notebook_tqdm


🌍 To view the Phoenix app in your browser, visit http://localhost:6006/
📖 For more information on how to use Phoenix, check out https://docs.arize.com/phoenix


In [51]:
import asyncio

from llama_index.core.workflow import (step, StartEvent, StopEvent, Workflow, Event, Context)
from llama_index.core.agent import FunctionCallingAgentWorker 
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.llms.openai import OpenAI
from colorama import Fore, Style
from typing import (Optional, List, Callable)
from dotenv import load_dotenv

load_dotenv()

import prompts
import os

# Custom Events that runs custom functions

class InitializeEvent(Event):
	pass

class ConciergeEvent(Event):
    request: Optional[str] = None
    just_completed: Optional[str] = None
    need_help: Optional[bool] = None

class OrchestratorEvent(Event):
	request: str

class GraphGenerateEvent(Event):
	request: str

class DataFrameLookupEvent(Event):
	request: str

class MainWorkflow(Workflow):
	"""Main Workflow

	The main workflow that runs the whole application's steps, calling all the Events from StartEvent -> StopEvent.
	"""
	@step(pass_context=True)
	async def initialiation(self, ctx: Context, ev: InitializeEvent) -> ConciergeEvent:
		ctx.data["user"] = {
			"username": None,
			"access_token": None,
			"session_token": None,
			"fb_access_key": None,
		}
		ctx.data["success"] = None
		ctx.data["redirecting"] = None
		ctx.data["overall_request"] = None
		ctx.data["llm"] = OpenAI(model="gpt-3.5-turbo")

		return ConciergeEvent()
  
	@step(pass_context=True)
	async def concierge(self, ctx: Context, ev: ConciergeEvent | StartEvent) -> InitializeEvent | OrchestratorEvent | StopEvent:
		# initialize user if not already
		# if not ctx.data["user"]:
		if ("user" not in ctx.data):
			return InitializeEvent()
		
		# initialize concierge if not already
		if ("concierge" not in ctx.data):
			system_prompt = prompts.initialiation
			
			agent_worker = FunctionCallingAgentWorker.from_tools(
				tools = [],
				llm = ctx.data["llm"],
				allow_parallel_tool_calls = False,
				system_prompt = system_prompt,
			)
			ctx.data["concierge"] = agent_worker.as_agent()

		concierge = ctx.data["concierge"]
		if ctx.data["overall_request"] is not None:
			print("In Progress - 'overall_request':", ctx.data["overall_request"])
			last_request = ctx.data["overall_request"]
			ctx.data["overall_request"] = None
			return OrchestratorEvent(request = last_request)
		elif (ev.just_completed is not None):
			response = concierge.chat(f"User Just Completed a Task: {ev.just_completed}")
		elif (ev.need_help):
			print("The previous post needs help with", ev.request)
			return OrchestratorEvent(request = ev.request)
		elif (ev.request):
			response = concierge.chat(ev.request)
		else:
			# first time experience
			response = concierge.chat("Hello!")
		
		print(
			Fore.MAGENTA
			+ f"SYSTEM >> {response}"
			+ Style.RESET_ALL
			)

		user_msg_str = input("USER >> ").strip()
		return OrchestratorEvent(request = user_msg_str)
	
	@step(pass_context=True)
	async def orchestrator(self, ctx: Context, ev: OrchestratorEvent) -> ConciergeEvent | GraphGenerateEvent | DataFrameLookupEvent:
		
		print(f"Orchestrator received a request: {ev.request}")

		# tools are listed below as functions
		def emit_dataframe_lookup() -> bool:
			"""Call this function if a dataframe needs to be accessed."""
			print("__emitted: dataframe lookup")
			self.send_event(DataFrameLookupEvent(request=ev.request))
			return True
		
		def emit_graph_generator() -> bool:
			"""Call this function if a graph needs to be generated."""
			print("__emitted: graph lookup")
			self.send_event(GraphGenerateEvent(request=ev.request))
			return True

		# functions are then converted into tools
		tools = [
			FunctionTool.from_defaults(fn=emit_dataframe_lookup),
			FunctionTool.from_defaults(fn=emit_graph_generator),
		]

		system_prompt = prompts.orchestrator

		agent_worker = FunctionCallingAgentWorker.from_tools(
				tools = tools,
				llm = ctx.data["llm"],
				allow_parallel_tool_calls = False,
				system_prompt = system_prompt,
		)

		ctx.data["orchestrator"] = agent_worker.as_agent()
		orchestrator = ctx.data["orchestrator"]
		response = orchestrator.chat(ev.request)

		if str(response) == "FAILED":
			print("Orchestrator agent failed to return any tools; try again")
			return ConciergeEvent(request = ev.request)
		
	###

	@step(pass_context=True)
	async def get_dataframe(self, ctx: Context, ev: DataFrameLookupEvent) -> ConciergeEvent:

		print(f"Dateframe lookup received request: {ev.request}")

		if ("get_dataframe_agent" not in ctx.data): 
			# tools are listed below as functions
			def get_campaign_df_data() -> str:
				"""Call this if the "campaign" dataframe is required"""
				print(f"Generating graph for ")
				return "Here's the generated Graph"

			def get_ad_account_df_data() -> str:
				"""Call this if the "ad account" dataframe is required"""
				print(f"Generating graph for ")
				return "Here is an ad account df"

			# functions are then converted into tools
			tools = [
				FunctionTool.from_defaults(fn=get_campaign_df_data),
				FunctionTool.from_defaults(fn=get_ad_account_df_data),
			]

			system_prompt = ""

			ctx.data["get_dataframe_agent"] = ConciergeAgent(
				name="Get Dataframe Agent",
				parent= Workflow,
				tools= tools,
				system_prompt= system_prompt ,
				context= ctx,
				# current_event= DataFrameLookupEvent,
				trigger_event= DataFrameLookupEvent
			)
		
		return ctx.data["get_dataframe_agent"].handle_event(ev)

	
	@step(pass_context=True)
	async def graph_generator(self, ctx: Context, ev: GraphGenerateEvent) -> OrchestratorEvent:

		print(f"Graph generator lookup received request: {ev.request}")

		if ("generate_graph_agent" not in ctx.data): 
			# tools are listed below as functions
			def get_bargraph() -> str:
				"""Call this if user requests a bar graph is required."""
				print(f"Generating graph for bar")
				return "Here's the generated Graph"

			def anyy() -> str:
				"""Call this if any other graphs are required."""
				print(f"Generating graph for any")
				return "Here's the generated Graph"

			# functions are then converted into tools
			tools = [
				FunctionTool.from_defaults(fn=get_bargraph),
				FunctionTool.from_defaults(fn=anyy),
			]

			system_prompt = prompts.graph_generator

			ctx.data["generate_graph_agent"] = ConciergeAgent(
				name="Generate Graph Agent",
				parent= Workflow,
				tools= tools,
				system_prompt= system_prompt ,
				context= ctx,
				# current_event= DataFrameLookupEvent,
				trigger_event= DataFrameLookupEvent
			)
		
		return ctx.data["generate_graph_agent"].handle_event(ev)


# building up from Workflow to handle Events
class ConciergeAgent():
	name: str
	parent: Workflow
	tools: list[FunctionTool]
	system_prompt: str
	context: Context
	current_event: Event
	trigger_event: Event
	
	def __init__(
			self,
			name: str,
			parent: Workflow,
			tools: List[Callable],
			system_prompt: str,
			context: Context,
			trigger_event: Event,
			):
		self.name = name
		self.parent = parent
		self.context = context
		self.system_prompt = system_prompt
		self.context.data["redirecting"] = False
		self.trigger_event = trigger_event

		# tools that are needed for everyone
		def done() -> None:
			"""When you complete a task, call this."""
			print(f"{self.name} is commplete")
			self.context.data["redirecting"] = True
			parent.send_event(ConciergeEvent(just_completed=self.name))

		# tools that are needed for everyone
		def need_help() -> None:
			"""If the user asks you do something, call this."""
			print(f"{self.name} is commplete")
			self.context.data["redirecting"] = True
			parent.send_event(ConciergeEvent(request = self.current_event.request, need_help=True))

		self.tools = [
			FunctionTool.from_defaults(fn=done),
			FunctionTool.from_defaults(fn=need_help),
		]
		
		# adding the Event's tools to the default tools
		for t in tools:
			self.tools.append(FunctionTool.from_defaults(fn=t))
		
		agent_worker = FunctionCallingAgentWorker.from_tools(
			tools = self.tools,
			llm = self.context.data["llm"],
			allow_parallel_tool_calls = False,
			system_prompt = self.system_prompt,
		)
		
		self.agent = agent_worker.as_agent()
	
	def handle_event(self, ev: Event):
		self.current_event = ev

		response = str(self.agent.chat(ev.request))
		print(
			Fore.MAGENTA
			+ "SYSTEM >> {response}"
			+ Style.RESET_ALL
			)

		# if they're sending us elsewhere, we're done here
		if self.context.data["redirecting"]:
			self.context.data["redirecting"] = False
			return None

		# otherwise, get some user input & then loop
		user_msg_str = input("> ").strip()
		return self.trigger_event(request=user_msg_str)


In [52]:
wf = MainWorkflow(timeout=10, verbose=True)
# draw_all_possible_flows(wf, filename="workflow.html")
result = await wf.run()

Running step concierge
Step concierge produced event InitializeEvent
Running step initialiation
Step initialiation produced event ConciergeEvent
Running step concierge
[35mSYSTEM >> Hello! How can I assist you today?[0m
Step concierge produced event OrchestratorEvent
Running step orchestrator
Orchestrator received a request: generate a graph for my campaigns
__emitted: graph lookup




Step orchestrator produced no event
Running step graph_generator
Graph generator lookup received request: generate a graph for my campaigns


WorkflowRuntimeError: Error in step 'graph_generator': 'FunctionTool' object has no attribute '__name__'