<a href="https://colab.research.google.com/github/Mubashar-Bashir/AgenticAI/blob/main/03_conditional_logic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Conditional Logic
In CrewAI Flows, conditional logic is implemented with the **`or_`** and **`and_`** functions, which let a method respond to any one of several events or to a combination of events.

##Using **`or_`** for Conditional Execution

The **or_** function lets a method run when any one of the specified events occurs. This is useful when a task should proceed once **any** of several preceding tasks completes.

**Example:**

In [23]:
!pip install -Uq crewai  'crewai[tools]' crewai-tools

In [24]:
import warnings
warnings.filterwarnings('ignore')

In [25]:
# In Jupyter notebooks, running asyncio code directly can cause errors
# because an event loop is already active.
# nest_asyncio patches asyncio so the running loop can be re-entered,
# letting asynchronous code run without conflict.
import nest_asyncio
nest_asyncio.apply()
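The error that `nest_asyncio` works around can be reproduced with the standard library alone. The following is a minimal sketch, with `inner` and `outer` as hypothetical names chosen for illustration: calling `asyncio.run()` while a loop is already running raises a `RuntimeError`, which is the situation inside a notebook cell.

```python
import asyncio

async def inner() -> str:
    return "done"

async def outer() -> str:
    # A loop is already running here, as it is inside a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)      # tries to start a second loop inside the first
    except RuntimeError as exc:
        coro.close()           # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(outer())
print(message)
```

With `nest_asyncio.apply()` in effect, the nested call is allowed instead of raising.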

In [26]:
from crewai.flow.flow import Flow, listen, start, or_

class OrExampleFlow(Flow): # Inherited from Flow
    @start() # Start point of the flow
    def task_a(self): # Method 1
        return "Output from task A"  # Return output

    @start() # Second start point; runs alongside task_a
    def task_b(self): # Method 2
        return "Output from task B"  # Return output

    @listen(or_(task_a, task_b)) # Listen for the completion of either task_a or task_b
    def task_c(self, result):    # Method 3, receives the result of whichever task triggered it
        print(f"Task C received: {result}") # Print the received result
        return "Output from task C"  # Return output

    @listen(task_c) # Listen to the completion of task_c
    def task_d(self, result): # Method 4, receives result from task_c
        print(f"Task D received: {result}") # Print the received result

flow = OrExampleFlow() # Create an instance of the flow
flow.kickoff() # Start the flow

Task C received: Output from task A
Task C received: Output from task B
Task D received: Output from task C
Task D received: Output from task C


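In this example, **task_c** runs whenever either **task_a** or **task_b** completes. In the run above both start methods completed, so **task_c** ran once per trigger, each time receiving the result of the task that triggered it, and **task_d** ran after each of those calls.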
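The any-of behaviour seen in that output can be modelled without CrewAI. The block below is a minimal pure-Python sketch, not CrewAI's implementation; `run_any_of` and its parameters are hypothetical names used only for illustration.

```python
from typing import Callable, List

def run_any_of(triggers: List[Callable[[], str]],
               listener: Callable[[str], str]) -> List[str]:
    """Call `listener` once for every trigger that completes (any-of semantics)."""
    outputs = []
    for trigger in triggers:
        result = trigger()                 # a trigger "completes" when it returns
        outputs.append(listener(result))   # the listener fires for that completion
    return outputs

def task_a() -> str:
    return "Output from task A"

def task_b() -> str:
    return "Output from task B"

def task_c(result: str) -> str:
    return f"Task C received: {result}"

received = run_any_of([task_a, task_b], task_c)
for line in received:
    print(line)
```

Each trigger produces its own listener call, which matches the two `Task C received` lines in the notebook output.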

##Using **`and_`** for Conditional Execution

The **and_** function allows a method to execute only after all specified events have occurred. This is beneficial when a task depends on the completion of multiple preceding tasks.

**Example:**

In [53]:
from crewai.flow.flow import Flow, listen, start, and_

class AndExampleFlow(Flow): # Inherited from Flow

    @start() # Start point of the flow
    def task_x(self): # Method 1, writes to shared state and returns nothing
        print("Executing task X")
        self.state["greeting"] = "Hello from the start method"

    @start() # Second start point of the flow
    def task_y(self): # Method 2, writes to shared state and returns nothing
        print("Executing task Y")
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(task_x, task_y)) # Runs only after both task_x and task_y have completed
    def task_z(self, results):
        # task_x and task_y return nothing, so `results` is None here
        print(f"Results received by Task Z: {results}")
        print(f"Results type: {type(results)}")

flow = AndExampleFlow()
flow.kickoff()

Executing task X
Executing task Y
Results received by Task Z: None
Results type: <class 'NoneType'>


In [58]:
from crewai.flow.flow import Flow, listen, start, and_

class AndExampleFlow(Flow):
    @start()
    def task_x(self):
        return "Output from task X"

    @start()
    def task_y(self):
        return "Output from task Y"

    @listen(and_(task_x, task_y))
    def task_z(self, results):
        print("Task_z",results)
        result_x, result_y = results
        print(f"Task Z received: {result_x} and {result_y}")
        return "Output from task Z"

flow = AndExampleFlow()
flow.kickoff()

Task_z Output from task Y
[Flow._execute_single_listener] Error in method task_z: too many values to unpack (expected 2)


Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/crewai/flow/flow.py", line 363, in _execute_single_listener
    listener_result = await self._execute_method(
  File "/usr/local/lib/python3.10/dist-packages/crewai/flow/flow.py", line 306, in _execute_method
    else method(*args, **kwargs)
  File "<ipython-input-58-65fee5f27dc2>", line 15, in task_z
    result_x, result_y = results
ValueError: too many values to unpack (expected 2)


'Output from task Y'
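The `ValueError` in this traceback is ordinary Python behaviour: the printed value of `results` is the single string `'Output from task Y'`, and unpacking a string iterates over its characters. The sketch below reproduces the error and shows one defensive way to handle the argument. `split_results` is a hypothetical helper, not part of CrewAI.

```python
from typing import Any, Tuple

def split_results(results: Any) -> Tuple[Any, Any]:
    """Return a pair only when `results` really is a two-item tuple or list."""
    if isinstance(results, (tuple, list)) and len(results) == 2:
        return results[0], results[1]
    # Anything else (a single string, None, ...) is passed through as the first item.
    return results, None

# Reproduce the error: a multi-character string cannot be unpacked into two names.
try:
    result_x, result_y = "Output from task Y"
    error_message = ""
except ValueError as exc:
    error_message = str(exc)

print(error_message)
print(split_results("Output from task Y"))
print(split_results(("Output from task X", "Output from task Y")))
```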

In [55]:
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.kickoff()


---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}


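In the last cell, **logger** runs only after both **start_method** and **second_method** have completed, and it reads their output from **self.state**. The earlier cells show what an **and_** listener actually received as its argument: when the start methods returned nothing, **task_z** received `None`, and when they returned strings, it received the single string `'Output from task Y'` rather than a tuple, which is why unpacking it into two names raised the `ValueError`.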
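The shared-state pattern, where each method writes to a common dictionary and the listener runs once every trigger has finished, can be sketched in plain Python. This is an illustration of all-of semantics under stated assumptions, not CrewAI's implementation; `run_all_of`, `State`, and `logger_calls` are hypothetical names.

```python
from typing import Callable, Dict, List

State = Dict[str, str]

def run_all_of(triggers: List[Callable[[State], None]],
               listener: Callable[[State], None]) -> State:
    """Run every trigger, then call `listener` once after the last one finishes."""
    state: State = {}
    pending = {trigger.__name__ for trigger in triggers}
    for trigger in triggers:
        trigger(state)                      # each trigger writes into shared state
        pending.discard(trigger.__name__)   # mark this trigger as completed
        if not pending:                     # all-of gate: fire only when none remain
            listener(state)
    return state

def start_method(state: State) -> None:
    state["greeting"] = "Hello from the start method"

def second_method(state: State) -> None:
    state["joke"] = "What do computers eat? Microchips."

logger_calls: List[State] = []

def logger(state: State) -> None:
    logger_calls.append(dict(state))        # snapshot of the state the listener saw
    print("---- Logger ----")
    print(state)

final_state = run_all_of([start_method, second_method], logger)
```

Because the listener reads from the shared dictionary, it does not depend on what is passed as its argument.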

##Important Considerations

**Execution Timing:** With **or_**, the listening method runs when any one of the specified tasks completes; in the run above it ran once for each completed task. With **and_**, the method waits until all specified tasks have completed.

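**Result Handling:** For **or_**, the result passed to the listening method is the output of the task that triggered it. For **and_**, do not assume a **tuple** of results: in the runs above, the listener received a single value (`None`, or one task's output). Sharing data through **self.state**, as in the logger example, avoids depending on that argument.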

**Potential Issues:** Be aware of known issues with these functions. For instance, there have been reports of the **or_** function executing only once when listening to multiple methods. Similarly, issues with the **and_** function have been documented. Consult the latest CrewAI documentation and community forums for updates and best practices.

By using **or_** and **and_** effectively, you can implement complex conditional logic within your CrewAI Flows, allowing for dynamic and responsive workflows.