This section defines a new Flow called `MovieRecommendationFlow`. It contains two methods:

1. `generate_genre` — generates a random movie genre.
2. `recommend_movie` — recommends a popular movie based on the generated genre.

In [7]:
import os
from dotenv import load_dotenv
import onnxruntime
import openai

# Load variables from the local .env file into the process environment.
load_dotenv()

# Resolve the API key once and build the client from that same value, so the
# variable and the client can never disagree. "OPENAI_API_KEY" is preferred;
# "OPENAI_API_KEY2" is accepted as a fallback name.
# NOTE(review): if neither variable is set, api_key is None and the OpenAI
# client is expected to raise at construction time — confirm against the SDK.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY2")
openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)


In [10]:
from crewai.flow.flow import Flow, start, listen

class MovieRecommendationFlow(Flow):
    """Two-step flow: pick a random movie genre, then recommend a movie in it.

    Both steps query the module-level ``openai_client`` and record their
    results in the flow state under the keys ``"genre"`` and
    ``"recommendation"``.
    """

    def _ask_model(self, prompt, model="gpt-4o-mini"):
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text of the user message.
            model: Name of the chat model to query.

        Returns:
            The first choice's message content, stripped of surrounding
            whitespace.

        Raises:
            ValueError: If the first choice carries no text content.
        """
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.choices[0].message.content
        if content is None:
            # The SDK types ``content`` as optional; fail with a clear message
            # instead of an AttributeError from calling ``.strip()`` on None.
            raise ValueError(f"Model returned no text content for prompt: {prompt!r}")
        return content.strip()

    @start()
    def generate_genre(self):
        """Ask the model for a random genre, store it in state, and return it."""
        random_genre = self._ask_model("Give me a random movie genre.")
        self.state["genre"] = random_genre
        return random_genre

    @listen(generate_genre)
    def recommend_movie(self, random_genre):
        """Ask the model for a movie in ``random_genre`` and store the answer.

        Args:
            random_genre: Genre text returned by ``generate_genre``.

        Returns:
            The recommendation text, also saved under ``"recommendation"``.
        """
        movie_recommendation = self._ask_model(
            f"Recommend a movie in {random_genre} genre."
        )
        self.state["recommendation"] = movie_recommendation
        return movie_recommendation

In [11]:
# Create the flow and run it asynchronously; the cell awaits the coroutine
# directly at top level.
flow = MovieRecommendationFlow()
final_result = await flow.kickoff_async()

# Show the value returned by the run (the recorded output below is the
# movie recommendation text).
print(f"\nMovie Recommendation: {final_result}")

[1m[35m Flow started with ID: 314f9e93-e7ee-430b-b134-dc13781dc82a[00m



Movie Recommendation: I recommend "Arrival" (2016), directed by Denis Villeneuve. The film explores themes of communication, time, and the nature of humanity through the story of a linguist, played by Amy Adams, who is recruited to communicate with extraterrestrial beings that have landed on Earth. The film is not just visually stunning but also thought-provoking, raising questions about language, perception, and the way we understand our world. It's a compelling blend of science fiction and emotional depth that leaves a lasting impression.


In [12]:
from pprint import pprint
# Pretty-print the state left behind by the previous run; the recorded output
# shows the "genre", "id" and "recommendation" keys.
pprint(flow.state)

{'genre': 'Science Fiction.',
 'id': '314f9e93-e7ee-430b-b134-dc13781dc82a',
 'recommendation': 'I recommend "Arrival" (2016), directed by Denis '
                   'Villeneuve. The film explores themes of communication, '
                   'time, and the nature of humanity through the story of a '
                   'linguist, played by Amy Adams, who is recruited to '
                   'communicate with extraterrestrial beings that have landed '
                   'on Earth. The film is not just visually stunning but also '
                   'thought-provoking, raising questions about language, '
                   "perception, and the way we understand our world. It's a "
                   'compelling blend of science fiction and emotional depth '
                   'that leaves a lasting impression.'}


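Using state to store intermediate results. A flow's state can be managed in two ways:
- Unstructured: the state is a plain dictionary, accessed by key (e.g. `self.state["status"]`).
- Structured: the state is a Pydantic model, accessed by attribute (e.g. `self.state.status`).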

In [13]:
from crewai.flow.flow import Flow, listen, start

class TaskManagementFlow(Flow):
    """Moves one fixed task through Pending -> In Progress -> Completed.

    The state is unstructured: values are read and written as dictionary keys
    on ``self.state``.
    """

    def _set_status(self, new_status):
        """Write ``new_status`` to the "status" key and print the new value."""
        self.state["status"] = new_status
        print(f"Task status updated: {self.state['status']}")

    @start()
    def generate_task(self):
        """Step 1: report the state ID, then create the task as Pending."""
        print(f"Flow started. State ID: {self.state['id']}")

        # Seed the state in a fixed key order: task first, then status.
        initial_values = {
            "task": "Fix a critical bug in the payment system",
            "status": "Pending",
        }
        for key, value in initial_values.items():
            self.state[key] = value
        print(f"Task generated: {self.state['task']} (Status: {self.state['status']})")

    @listen(generate_task)
    def start_task(self):
        """Step 2: mark the task as In Progress."""
        self._set_status("In Progress")

    @listen(start_task)
    def complete_task(self):
        """Step 3: mark the task as Completed and print the whole state."""
        self._set_status("Completed")
        print(f"Final Task State: {self.state}")

In [14]:
# Run the unstructured-state flow; each step prints its own progress, so the
# returned value is only stored, not displayed.
flow = TaskManagementFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 2857b6ca-95b1-4971-8954-ef5303d13f38[00m


Flow started. State ID: 2857b6ca-95b1-4971-8954-ef5303d13f38
Task generated: Fix a critical bug in the payment system (Status: Pending)


Task status updated: In Progress


Task status updated: Completed
Final Task State: {'id': '2857b6ca-95b1-4971-8954-ef5303d13f38', 'task': 'Fix a critical bug in the payment system', 'status': 'Completed'}


### Structured

In [15]:
from pydantic import BaseModel

# Structured state model: declares the fields a flow's state carries.
class TaskState(BaseModel):
    """State schema for the structured task flow.

    Both fields default to the literal string "None", not Python's ``None``.
    """
    task: str = "None"  # description of the task being tracked
    status: str = "None"  # current status label of the task

In [16]:
from crewai.flow.flow import Flow, listen, start

class StructuredTaskFlow(Flow[TaskState]):
    """Moves one fixed task through Pending -> In Progress -> Completed.

    The state is structured: it is a ``TaskState`` instance, so values are
    read and written as attributes on ``self.state``.
    """

    def _set_status(self, new_status):
        """Assign ``new_status`` to the state's status and print the new value."""
        self.state.status = new_status
        print(f"Task status updated: {self.state.status}")

    @start()
    def generate_task(self):
        """Step 1: report the state ID, then create the task as Pending."""
        print(f"Flow started. State ID: {self.state.id}")
        self.state.task = "Develop a new API endpoint"
        self.state.status = "Pending"
        print(f"Task generated: {self.state.task} (Status: {self.state.status})")

    @listen(generate_task)
    def start_task(self):
        """Step 2: mark the task as In Progress."""
        self._set_status("In Progress")

    @listen(start_task)
    def complete_task(self):
        """Step 3: mark the task as Completed and print the whole state."""
        self._set_status("Completed")
        print(f"Final Task State: {self.state}")

In [17]:
# Run the structured-state flow; each step prints its own progress, so the
# returned value is only stored, not displayed.
flow = StructuredTaskFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 0956f378-1cb9-4b1c-bd3b-b2954baeab9e[00m


Flow started. State ID: 0956f378-1cb9-4b1c-bd3b-b2954baeab9e
Task generated: Develop a new API endpoint (Status: Pending)


Task status updated: In Progress


Task status updated: Completed
Final Task State: id='0956f378-1cb9-4b1c-bd3b-b2954baeab9e' task='Develop a new API endpoint' status='Completed'


### Conditional triggers (`or_` and `and_`)


In [18]:
from crewai.flow.flow import Flow, listen, or_, start

class SupportFlow(Flow):
    """Logs support requests arriving from either of two entry points.

    Both start methods return a message describing their channel, and
    ``log_request`` listens to either of them through ``or_``.
    """

    @start()
    def live_chat_request(self):
        """Entry point for a request that arrives over live chat."""
        message = "Support request received via live chat"
        return message

    @start()
    def email_ticket_request(self):
        """Entry point for a request that arrives as an email ticket."""
        message = "Support request received via email ticket"
        return message

    @listen(or_(live_chat_request, email_ticket_request))
    def log_request(self, request_source):
        """Print the message passed in as ``request_source``."""
        print(f"Logging request: {request_source}")

# Run the flow. The recorded output shows log_request printing once for each
# of the two start methods.
flow = SupportFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 865c4631-aba4-45e6-933a-78c0d56da60b[00m


Logging request: Support request received via live chat


Logging request: Support request received via email ticket


In [19]:
from crewai.flow.flow import Flow, and_, listen, start

class TicketEscalationFlow(Flow):
    """Escalates a ticket after the user confirms and an agent reviews it.

    ``escalate_ticket`` listens to both earlier steps through ``and_``.
    """

    @start()
    def user_confirms_issue(self):
        """Record the user's confirmation in state and announce it."""
        self.state["user_confirmation"] = True
        notice = "User confirmed they still need assistance."
        print(notice)

    @listen(user_confirms_issue)
    def agent_reviews_ticket(self):
        """Record the agent's review in state and announce it."""
        self.state["agent_review"] = True
        notice = "Support agent has reviewed the ticket."
        print(notice)

    @listen(and_(user_confirms_issue, agent_reviews_ticket))
    def escalate_ticket(self):
        """Announce the escalation to Level 2 support."""
        notice = "Escalating ticket to Level 2 support!"
        print(notice)

# Run the flow. The recorded output shows the escalation message printed
# after both the confirmation and the review messages.
flow = TicketEscalationFlow()
final_result = await flow.kickoff_async()

[1m[35m Flow started with ID: 9ae0d355-da55-4987-905c-677907e9d40b[00m


User confirmed they still need assistance.


Support agent has reviewed the ticket.


Escalating ticket to Level 2 support!
