# Dynamic Travel Advisor - CrewAI Flows with Validation & Routing

## üìö Learning Objectives

In this notebook, you will learn how to:
- **Build CrewAI Flows** - Advanced workflow orchestration beyond simple crews
- **Implement validation loops** - Validate outputs and regenerate if needed
- **Use routing logic** - Conditionally route execution based on state
- **Manage state** - Track and update information across workflow steps
- **Create dynamic workflows** - Build intelligent, self-correcting AI systems

## üéØ Project Overview

This notebook demonstrates building a **Dynamic Travel Advisor** using CrewAI Flows. Unlike simple crews, Flows provide:

- **State Management**: Track vacation plans, validation status, and retry attempts
- **Event Listeners**: Trigger actions based on previous step completions
- **Conditional Routing**: Route workflow based on validation results
- **Auto-Regeneration**: Automatically retry if plans don't meet requirements

### Workflow Architecture:
1. **Generate Plan** ‚Üí Travel Planner creates vacation plan
2. **Validate Plan** ‚Üí Validator checks if plan meets constraints
3. **Route Decision**:
   - ‚úÖ **Valid** ‚Üí Save plan to file
   - ‚ùå **Invalid** ‚Üí Regenerate (max 2 attempts)
   - üö´ **Not Feasible** ‚Üí Notify user

## üîë Key Concepts

- **Flow**: A stateful workflow that coordinates multiple steps
- **State**: Shared data structure that persists across workflow steps
- **Listeners**: Methods that execute when specific events occur
- **Router**: Decision logic that determines next step based on state
- **@start()**: Decorator marking the entry point of the flow
- **@listen()**: Decorator for methods triggered by events
- **@router()**: Decorator for conditional routing logic

## üöÄ Let's Build It!

In [None]:
# ================================================================================
# SETUP: Import Required Libraries and Initialize LLM
# ================================================================================
# This section imports all necessary libraries and sets up the LLM
# ================================================================================

# CrewAI core components
from crewai import Agent, Task, LLM, Crew
from crewai.flow.flow import Flow, start, listen, router

# Data modeling and type hints
from pydantic import BaseModel, Field
from typing import List

# System utilities
import os

# Load environment variables (if using .env file)
from dotenv import load_dotenv
load_dotenv()

# ================================================================================
# Configure LLM
# ================================================================================

# Set up API key (uncomment and replace with your key if not using .env)
# os.environ['OPENAI_API_KEY'] = "YOUR_OPENAI_API_KEY"

# Initialize GPT-4 for advanced reasoning
llm = LLM(model="gpt-4")

## STEP 1: Define AI Agents

Agents are the "workers" in our workflow. Each agent has specialized expertise.

In [None]:
# This cell has been merged with the setup cell above
# Uncomment the line below if you need to set the API key directly
# os.environ['OPENAI_API_KEY'] = "YOUR_OPENAI_API_KEY"

## STEP 2: Create Specialized Agents

We'll create two agents with distinct responsibilities:
- **Travel Planner**: Creates detailed vacation plans
- **Travel Validator**: Ensures plans meet requirements

In [None]:
# ================================================================================
# Agent 1: Travel Planner
# ================================================================================
# This agent specializes in creating comprehensive vacation plans including
# activities, accommodation, and cost estimates
# ================================================================================

planner_agent = Agent(
    llm=llm,
    role="Travel Planner",
    backstory="An expert in planning group vacations, considering budgets, destinations, and activities.",
    goal="Create a vacation plan based on traveler names, departure city, and destination.",
    verbose=True,  # Set to False to hide detailed reasoning
)

# ================================================================================
# Agent 2: Travel Plan Validator
# ================================================================================
# This agent reviews plans to ensure they meet quality standards:
# - Budget-friendly
# - Include at least 2 activities
# - Clear accommodation details
# ================================================================================

validator_agent = Agent(
    llm=llm,
    role="Travel Plan Validator",
    backstory="An experienced travel advisor who ensures all plans meet safety, budget, and feasibility constraints.",
    goal="Evaluate vacation plans and validate if they meet predefined constraints.",
    verbose=True,  # Set to False to hide validation reasoning
)

## STEP 3: Define Task Factory Functions

Instead of creating tasks upfront, we define functions that create tasks dynamically.
This allows us to generate new tasks with different parameters during workflow execution.

In [None]:
# ================================================================================
# Task Factory 1: Create Vacation Plan Task
# ================================================================================
# This function creates a task for generating vacation plans.
# It's called dynamically during workflow execution.
# 
# Parameters:
#   - names: List of traveler names
#   - city: Departure city
#   - destination: Destination city
# ================================================================================

def create_vacation_plan_task(names, city, destination) -> Task:
    """Factory function to create a vacation planning task"""
    return Task(
        description=(
            f"Create a detailed vacation plan for the following travelers: {', '.join(names)}, "
            f"departing from {city} to {destination}. Ensure the plan includes "
            f"activities, accommodation, and estimated costs."
        ),
        expected_output="A detailed vacation plan including activities, accommodation, and costs.",
        agent=planner_agent,
    )


# ================================================================================
# Task Factory 2: Validate Vacation Plan Task
# ================================================================================
# This function creates a task for validating vacation plans.
# 
# Validation Criteria:
#   ‚úì Budget-friendly pricing
#   ‚úì At least 2 activities included
#   ‚úì Clear accommodation details
# 
# Returns: Task that outputs "Valid" or "Invalid"
# ================================================================================

def validate_vacation_plan_task(vacation_plan) -> Task:
    """Factory function to create a validation task"""
    return Task(
        description=(
            f"Evaluate the following vacation plan: {vacation_plan}. "
            f"Ensure it meets the following constraints: budget-friendly, includes at least two activities, "
            f"and has clear accommodation details. Reply with 'Valid' if it meets the constraints, "
            f"otherwise reply with 'Invalid'."
        ),
        expected_output="Valid or Invalid.",
        agent=validator_agent,
    )

## STEP 4: Define State and Create the Flow

**State Management**: The Flow uses a Pydantic model to track workflow state across steps.

**Flow Decorators**:
- `@start()`: Entry point of the workflow
- `@listen(method_name)`: Triggered when specified method completes
- `@router(method_name)`: Makes routing decisions based on state

In [None]:
# ================================================================================
# Define Workflow State
# ================================================================================
# The state object persists across all workflow steps, allowing different
# methods to read and update shared information.
# ================================================================================

class TravelState(BaseModel):
    """State model for tracking travel planning workflow"""
    vacation_plan: str = ""                    # Stores the generated vacation plan
    is_plan_valid: bool = False                # Tracks validation result
    generation_attempts_left: int = 2          # Limits regeneration attempts


# ================================================================================
# Define the Travel Advisor Flow
# ================================================================================
# This Flow coordinates the entire vacation planning workflow with validation
# and automatic regeneration logic.
# 
# Workflow Steps:
# 1. generate_vacation_plan (@start)      - Creates initial plan
# 2. validate_vacation_plan (@listen)     - Validates the plan
# 3. route_vacation_plan (@router)        - Routes based on validation
# 4. finalize_vacation_plan (@listen)     - Saves valid plan
# 5. regenerate_vacation_plan (@listen)   - Retries if invalid
# 6. notify_user (@listen)                - Handles failure case
# ================================================================================

class TravelAdvisorFlow(Flow[TravelState]):
    """Flow for dynamic travel planning with validation and regeneration"""
    
    # Flow input parameters
    names: List[str] = Field(description="List of travelers")
    city: str = Field(description="Departure city")
    destination: str = Field(description="Destination city")

    def __init__(self, names: List[str], city: str, destination: str):
        """Initialize the flow with travel parameters"""
        super().__init__()
        self.names = names
        self.city = city
        self.destination = destination

    # ============================================================================
    # Step 1: Generate Vacation Plan (@start)
    # ============================================================================
    # Entry point of the workflow. Creates initial vacation plan.
    # ============================================================================
    
    @start()
    def generate_vacation_plan(self):
        """Generate a vacation plan using the Travel Planner agent"""
        print("Generating vacation plan")
        
        # Create task dynamically with current parameters
        task = create_vacation_plan_task(self.names, self.city, self.destination)
        
        # Execute task with a single-agent crew
        crew = Crew(agents=[planner_agent], tasks=[task])
        result = crew.kickoff()
        
        # Store result in shared state
        self.state.vacation_plan = result.raw
        print("Vacation plan generated!")

    # ============================================================================
    # Step 2: Validate Vacation Plan (@listen)
    # ============================================================================
    # Automatically triggered after generate_vacation_plan completes.
    # Checks if plan meets requirements.
    # ============================================================================
    
    @listen(generate_vacation_plan)
    def validate_vacation_plan(self):
        """Validate the generated plan using the Validator agent"""
        print("Start validation of the plan...")
        
        # Create validation task with the current plan
        task = validate_vacation_plan_task(self.state.vacation_plan)
        
        # Execute validation
        crew = Crew(agents=[validator_agent], tasks=[task])
        result = crew.kickoff()
        
        # Update state based on validation result
        self.state.is_plan_valid = "Valid" in result.raw
        print("Validation complete", "Valid" if self.state.is_plan_valid else "Invalid")

    # ============================================================================
    # Step 3: Route Based on Validation (@router)
    # ============================================================================
    # Conditional routing logic that decides the next step:
    # - "valid": Plan is good ‚Üí finalize
    # - "regenerate": Plan is bad but retries left ‚Üí regenerate
    # - "not_feasible": Plan is bad and no retries left ‚Üí notify user
    # ============================================================================
    
    @router(validate_vacation_plan)
    def route_vacation_plan(self):
        """Route workflow based on validation result and retry attempts"""
        if self.state.is_plan_valid:
            return "valid"
        elif self.state.generation_attempts_left == 0:
            return "not_feasible"
        else:
            return "regenerate"

    # ============================================================================
    # Step 4a: Finalize Valid Plan (@listen "valid")
    # ============================================================================
    # Triggered when router returns "valid". Saves plan to file.
    # ============================================================================
    
    @listen("valid")
    def finalize_vacation_plan(self):
        """Save the validated plan to a file"""
        with open("vacation_plan.txt", "w") as file:
            file.write(self.state.vacation_plan)
        print("Vacation plan saved in file")

    # ============================================================================
    # Step 4b: Regenerate Plan (@listen "regenerate")
    # ============================================================================
    # Triggered when router returns "regenerate". Decrements attempts and
    # regenerates plan.
    # ============================================================================
    
    @listen("regenerate")
    def regenerate_vacation_plan(self):
        """Regenerate plan if validation failed but attempts remain"""
        self.state.generation_attempts_left -= 1
        print(f"Regenerating plan... (Attempts left: {self.state.generation_attempts_left})")
        self.generate_vacation_plan()

    # ============================================================================
    # Step 4c: Notify User of Failure (@listen "not_feasible")
    # ============================================================================
    # Triggered when router returns "not_feasible". Max retries exceeded.
    # ============================================================================
    
    @listen("not_feasible")
    def notify_user(self):
        """Notify user that plan couldn't be created within constraints"""
        print("Plan is not feasible, I'm sorry :(")
        print(f"Tried {3 - self.state.generation_attempts_left} times but couldn't meet all constraints.")

In [None]:
# ================================================================================
# This cell has been merged with the cell above
# ================================================================================
# The complete TravelAdvisorFlow class is defined in the previous cell.
# Scroll up to see the full implementation with detailed comments.
# ================================================================================


## STEP 5: Execute the Flow

Now we'll run the complete workflow with sample travel data.

**Note**: `nest_asyncio` allows `asyncio.run` to work in Jupyter Notebooks by enabling nested event loops. This is required for CrewAI Flows in notebook environments.

In [None]:
# ================================================================================
# Execute the Travel Advisor Flow
# ================================================================================
# This cell runs the complete workflow with sample data.
# 
# Expected Flow Execution:
# 1. Generate vacation plan for Alice, Robert, Charlie
# 2. Validate the plan
# 3. If valid ‚Üí Save to vacation_plan.txt
#    If invalid ‚Üí Regenerate (up to 2 more times)
#    If still invalid ‚Üí Notify failure
# ================================================================================

# Required for running async code in Jupyter Notebooks
import nest_asyncio
nest_asyncio.apply()

def kickoff():
    """Main execution function"""
    
    # ============================================================================
    # Define Travel Parameters
    # ============================================================================
    # Customize these parameters to plan different trips
    # ============================================================================
    
    names = ["Alice", "Robert", "Charlie"]  # Travelers
    city = "Berlin"                         # Departure city
    destination = "Rome"                    # Destination city
    
    # ============================================================================
    # Initialize and Run the Flow
    # ============================================================================
    
    print("="*70)
    print("üåç DYNAMIC TRAVEL ADVISOR - STARTING WORKFLOW")
    print("="*70)
    print(f"Travelers: {', '.join(names)}")
    print(f"Route: {city} ‚Üí {destination}")
    print("="*70)
    print()
    
    # Create flow instance with travel parameters
    travel_flow = TravelAdvisorFlow(names, city, destination)
    
    # Start the workflow
    # The flow will automatically:
    # - Generate plan (@start)
    # - Validate plan (@listen)
    # - Route based on validation (@router)
    # - Take appropriate action (@listen on routes)
    travel_flow.kickoff()
    
    print()
    print("="*70)
    print("‚úÖ WORKFLOW COMPLETED")
    print("="*70)

# Execute the workflow
kickoff()

# ================================================================================
# Expected Output
# ================================================================================
# You should see:
# 1. "Generating vacation plan" ‚Üí Agent creates plan
# 2. "Vacation plan generated!" ‚Üí Plan created
# 3. "Start validation of the plan..." ‚Üí Validation begins
# 4. "Validation complete Valid/Invalid" ‚Üí Validation result
# 5. Either:
#    - "Vacation plan saved in file" (success)
#    - "Regenerating plan..." (retry)
#    - "Plan is not feasible" (failure after retries)
# ================================================================================

Generating vacation plan
[1m[95m# Agent:[00m [1m[92mTravel Planner[00m
[95m## Task:[00m [92mCreate a detailed vacation plan for the following travelers: Alice, Robert, Charlie, departing from Berlin to Rome. Ensure the plan includes activities, accommodation, and estimated costs.[00m


Overriding of current TracerProvider is not allowed




[1m[95m# Agent:[00m [1m[92mTravel Planner[00m
[95m## Final Answer:[00m [92m
Day 1: Departure and Arrival

- Depart from Berlin via Lufthansa, flight cost approx. ‚Ç¨150 per person.
- Arrive in Rome, after the airport transfer, check into Hotel Artemide for ‚Ç¨200 per night for a triple occupancy room.
- Have a welcome dinner at La Piazzetta restaurant, estimated cost ‚Ç¨25 per person.

Day 2: Exploring Rome

- Breakfast at the hotel, included in the room price.
- Take a tour of the Colosseum ($25 per person) and Roman Forum.
- Lunch at Trattoria da Valentino, estimated cost ‚Ç¨30 per person.
- Visit the Pantheon and Trevi Fountain, free admission.
- Dinner at Ristorante Il Gabriello, estimated cost ‚Ç¨40 per person.

Day 3: Vatican City

- Breakfast at the hotel.
- Visit Vatican City, entrance to the Vatican Museums and Sistine Chapel is ‚Ç¨26 per person.
- Lunch at Pizzarium Bonci, estimated cost ‚Ç¨20 per person.
- Visit St. Peter's Basilica, free entry. 
- Have dinner at 