## Implementing Flows



**Aim**: Build a task management systemm 
1. Generates a task for a software engineer.
2. Updates the task’s status to "In Progress."
3. Marks the task as "Completed."


In [None]:
# !pip install crewai
# !pip install crewai-tools

In [2]:
from dotenv import load_dotenv
load_dotenv()

from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, start, listen

import openai
import os

openai_client = openai.OpenAI(api_key = os.getenv("OPENAI_API_KEY"))

### Using Unstructured State

In [3]:
class TaskManagementFlow(Flow):

    @start()
    def generate_task(self):
        print(f"Flow started. State ID: {self.state['id']}")
        
        # Step 1: Generate a new task
        self.state["task"] = "Fix a critical bug in the payment system"
        self.state["status"] = "Pending"
        print(f"Task generated: {self.state['task']} (Status: {self.state['status']})")

    @listen(generate_task)
    def start_task(self):
        # Step 2: Update task status to 'In Progress'
        self.state["status"] = "In Progress"
        print(f"Task status updated: {self.state['status']}")

    @listen(start_task)
    def complete_task(self):
        # Step 3: Mark task as 'Completed'
        self.state["status"] = "Completed"
        print(f"Task status updated: {self.state['status']}")
        print(f"Final Task State: {self.state}")

In [4]:
# Execute the flow
flow = TaskManagementFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 5cdd8dfc-d2a3-4a96-90ad-ce2d87a6d0bc[00m


Flow started. State ID: 5cdd8dfc-d2a3-4a96-90ad-ce2d87a6d0bc
Task generated: Fix a critical bug in the payment system (Status: Pending)


Task status updated: In Progress


Task status updated: Completed
Final Task State: {'id': '5cdd8dfc-d2a3-4a96-90ad-ce2d87a6d0bc', 'task': 'Fix a critical bug in the payment system', 'status': 'Completed'}


In [6]:
flow.state

{'id': '5cdd8dfc-d2a3-4a96-90ad-ce2d87a6d0bc',
 'task': 'Fix a critical bug in the payment system',
 'status': 'Completed'}

### Structured State

In [10]:
from pydantic import BaseModel

class TaskState(BaseModel):
    task: str = None
    status: str = None

In [13]:
class TaskManagementFlow(Flow[TaskState]):

    @start()
    def generate_task(self):
        print(f"Flow started. State ID: {self.state.id}")
        
        # Step 1: Generate a new task
        self.state.task = "Fix a critical bug in the payment system"
        self.state.status = "Pending"
        print(f"Task generated: {self.state.task} (Status: {self.state.status})")

    @listen(generate_task)
    def start_task(self):
        # Step 2: Update task status to 'In Progress'
        self.state.status = "In Progress"
        print(f"Task status updated: {self.state.status}")

    @listen(start_task)
    def complete_task(self):
        # Step 3: Mark task as 'Completed'
        self.state.status = "Completed"
        print(f"Task status updated: {self.state.status}")
        print(f"Final Task State: {self.state}")

In [14]:
# Execute the flow
flow = TaskManagementFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: b1136ed9-ddf3-4617-91c0-94078d10bdb8[00m


Flow started. State ID: b1136ed9-ddf3-4617-91c0-94078d10bdb8
Task generated: Fix a critical bug in the payment system (Status: Pending)


Task status updated: In Progress


Task status updated: Completed
Final Task State: id='b1136ed9-ddf3-4617-91c0-94078d10bdb8' task='Fix a critical bug in the payment system' status='Completed'


In [15]:
flow.state

StateWithId(id='b1136ed9-ddf3-4617-91c0-94078d10bdb8', task='Fix a critical bug in the payment system', status='Completed')

### Conditional Flow

In [18]:
# OR Conditional Flow Execution

from crewai.flow.flow import Flow, listen, or_, start

class SupportFlow(Flow):
    @start()
    def live_chat_request(self):
        return "Support request received via live chat"

    @start()
    def email_ticket_request(self):
        return "Support request received via email ticket"

    @listen(or_(live_chat_request, email_ticket_request))
    def log_request(self, request_source):
        print(f"Logging request: {request_source}")


In [25]:
# Execute the flow
flow = SupportFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 74b4dc83-5896-4e9d-8b8e-8c6acebee756[00m


Logging request: Support request received via live chat


Logging request: Support request received via email ticket


In [26]:
## AND Conditional Flow Execution
from crewai.flow.flow import Flow, and_, listen, start

class TicketEscalationFlow(Flow):

    @start()
    def user_confirms_issue(self):
        self.state["user_confirmation"] = True
        print("User confirmed they still need assistance.")

    @listen(user_confirms_issue)
    def agent_reviews_ticket(self):
        self.state["agent_review"] = True
        print("Support agent has reviewed the ticket.")

    @listen(and_(user_confirms_issue, agent_reviews_ticket))
    def escalate_ticket(self):
        print("Escalating ticket to Level 2 support!")
    

In [27]:
# Execute the flow
flow = TicketEscalationFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: ec1e63e1-ef7f-4751-8366-92ecdaf4bf46[00m


User confirmed they still need assistance.


Support agent has reviewed the ticket.


Escalating ticket to Level 2 support!


### Router Logic

Aim : Assign Urgent ticker to live customer support

In [29]:
from pydantic import BaseModel
import random

class TickerState(BaseModel):
    priority: str = "low"

In [35]:
from crewai.flow.flow import Flow, listen, router, start


class TickerRouting(Flow[TickerState]):

    @start()
    def classify_ticker(self):
        print("Classifying ticket priority...")
        self.state.priority = random.choice(["low", "high"])
        print(f"Ticket classified as {self.state.priority} priority.")

    @router(classify_ticker)
    def route_ticker(self):
        if self.state.priority == "high":
            return "Urgent Support"
        else:
            return "Email Support"
        
    @listen("urgent_support")
    def assign_to_agent(self):
        print("Assigning high priority ticket to an agent.")

    @listen("email_support")
    def send_to_email_queue(self):
        print("Sending low priority ticket to email queue.")

In [36]:
# Execute the flow
flow = TickerRouting()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: cb55fd21-d340-4de3-9580-c854cac57b89[00m


Classifying ticket priority...
Ticket classified as high priority.


In [37]:
flow.plot()

Plot saved as crewai_flow.html
