# 🚀 **CrewAI Flow: Dynamic Content Creation Router**

## Project Overview

This notebook demonstrates a **CrewAI Flow** that dynamically routes content creation tasks to specialized crews of agents based on user input. The workflow showcases CrewAI's Flow architecture with **state persistence**, **event-driven routing**, and **multi-crew orchestration**.

### 🎯 What you'll learn

- **Flow Architecture**: Event-driven workflows with `@start`, `@router`, and `@listen` decorators
- **State Management**: Persistent state across workflow execution with `@persist`
- **Dynamic Routing**: Intelligent crew selection based on content type
- **Multi-Crew Orchestration**: Coordinating specialized teams for different content formats

### 🌊 Flow Steps:
1. **Input Collection**: Gather URL and desired content type from user
2. **Dynamic Routing**: Route to appropriate specialized crew based on content type
3. **Content Processing**: Execute specialized content creation with dedicated crews
4. **Output Finalization**: Return polished, ready-to-use content

First, install the required packages.

In [None]:
%pip install -U --quiet crewai==0.177.0 crewai_tools

Set the API keys you need:

- `OPENAI_API_KEY`, or the key for whichever LLM provider you prefer
- `SERPER_API_KEY`: Serper has a generous free tier. Get your key at https://serper.dev

In [None]:
import getpass
import os

# Get keys from environment first
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
SERPER_API_KEY = os.environ.get('SERPER_API_KEY')

# Prompt user for keys only if not already set
if not OPENAI_API_KEY:
    OPENAI_API_KEY = getpass.getpass("OpenAI API key (hidden): ")
if not SERPER_API_KEY:
    SERPER_API_KEY = getpass.getpass("Serper API key (hidden): ")

if OPENAI_API_KEY and SERPER_API_KEY:
    os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY
    os.environ['SERPER_API_KEY'] = SERPER_API_KEY
    print("✅ API keys set")
else:
    print("🚨 API keys not set")

✅ API keys set


### Define the State Model for the Flow

The state model is like a shared notebook that all agents can read and write to.

It persists data throughout the entire workflow execution.

In [None]:
from typing import Dict, Any
from pydantic import BaseModel

class ContentState(BaseModel):
    """
    State model that tracks information throughout the content creation workflow.

    Think of this as a form that gets filled out as the workflow progresses:
    - Start: URL and content type are collected from the user
    - After routing: The matching crew is selected based on content type
    - After processing: Final content is ready
    - Throughout: Metadata can be added by any step
    """
    url: str = ""  # The source website we're analyzing
    content_type: str = ""  # "blog", "newsletter", or "linkedin"
    final_content: str = ""  # The final content after processing
    metadata: Dict[str, Any] = {}  # Arbitrary metadata

**Why use state?**
1. **Data Persistence**: Information flows between different workflow steps
2. **Type Safety**: Pydantic ensures data integrity
3. **Flexibility**: Easy to add new fields as workflow evolves
4. **Debugging**: Clear view of what data exists at each step

### Create Specialized Agent Teams for Each Content Type

Each content type gets its own pair of agents, built by a factory function:
- Each factory function creates one specialized agent pair
- Every pair is a **researcher + writer** team
- This allows for highly specialized expertise and output formats

1.   **Blog content creation agents.**




In [None]:
from crewai import Agent, LLM
from crewai_tools import SerperDevTool

def create_blog_agents():
    """Create agents specialized for blog content"""

    blog_researcher = Agent(
        role="Blog Content Researcher",
        goal="Extract and analyze web content to identify key insights for blog posts",
        backstory="""You are an expert content researcher who specializes in analyzing
        web content and identifying the most valuable insights for creating engaging blog posts.
        You excel at understanding complex topics and breaking them down into digestible content.""",
        verbose=True,
        tools=[SerperDevTool()],
        llm=LLM(model="gpt-4o-mini"),
        max_iter=5
    )

    blog_writer = Agent(
        role="Blog Content Writer",
        goal="Transform research into engaging, well-structured blog posts",
        backstory="""You are a skilled blog writer with expertise in creating compelling content
        that engages readers and drives meaningful discussions. You excel at taking complex
        information and making it accessible and interesting.""",
        verbose=True,
        llm=LLM(model="gpt-4o-mini")
    )

    return blog_researcher, blog_writer

2.   **Newsletter content creation agents.**

In [None]:
def create_newsletter_agents():
    """Create agents specialized for newsletter content"""

    newsletter_researcher = Agent(
        role="Newsletter Content Researcher",
        goal="Extract key insights from web content for newsletter format",
        backstory="""You are an expert at identifying the most newsworthy and actionable
        insights from web content. You understand what makes content valuable for newsletter
        subscribers and how to present information concisely.""",
        verbose=True,
        tools=[SerperDevTool()],
        llm=LLM(model="gpt-4o-mini"),
        max_iter=5
    )

    newsletter_writer = Agent(
        role="Newsletter Writer",
        goal="Create engaging newsletter content that provides immediate value",
        backstory="""You are a newsletter specialist who knows how to craft content that
        busy professionals want to read. You excel at creating scannable, actionable content
        with clear takeaways.""",
        verbose=True,
        llm=LLM(model="gpt-4o-mini")
    )

    return newsletter_researcher, newsletter_writer

3.   **LinkedIn content creation agents.**

In [None]:
def create_linkedin_agents():
    """Create agents specialized for LinkedIn content"""

    linkedin_researcher = Agent(
        role="LinkedIn Content Researcher",
        goal="Extract professional insights suitable for LinkedIn audience",
        backstory="""You are an expert at identifying professional insights and industry
        trends that resonate with LinkedIn's professional audience. You understand what
        content drives engagement on professional networks.""",
        verbose=True,
        tools=[SerperDevTool()],
        llm=LLM(model="gpt-4o-mini"),
        max_iter=5
    )

    linkedin_writer = Agent(
        role="LinkedIn Content Writer",
        goal="Create engaging LinkedIn posts that drive professional engagement",
        backstory="""You are a LinkedIn content specialist who knows how to craft posts
        that get noticed in the professional feed. You excel at creating content that
        sparks meaningful professional discussions.""",
        verbose=True,
        llm=LLM(model="gpt-4o-mini")
    )

    return linkedin_researcher, linkedin_writer
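
The three factories above share the same shape: a researcher with a search tool and an iteration cap, plus a writer. As a rough sketch of how that repetition could be driven by data instead, the block below builds the keyword arguments for each pair from a lookup table. It deliberately stops short of constructing CrewAI objects so it runs anywhere. `AGENT_ROLES`, `build_agent_kwargs`, and the `needs_search_tool` flag are hypothetical names introduced for illustration; only the role strings and settings come from the cells above.

```python
from typing import Any, Dict, Tuple

# Role names copied from the factory functions above; the table itself is hypothetical.
AGENT_ROLES: Dict[str, Tuple[str, str]] = {
    "blog": ("Blog Content Researcher", "Blog Content Writer"),
    "newsletter": ("Newsletter Content Researcher", "Newsletter Writer"),
    "linkedin": ("LinkedIn Content Researcher", "LinkedIn Content Writer"),
}


def build_agent_kwargs(
    content_type: str, model: str = "gpt-4o-mini"
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (researcher_kwargs, writer_kwargs) for one content type."""
    researcher_role, writer_role = AGENT_ROLES[content_type]  # KeyError if unknown
    common = {"verbose": True, "model": model}
    # Only the researcher searches the web and has its iterations capped.
    researcher = {**common, "role": researcher_role, "max_iter": 5, "needs_search_tool": True}
    writer = {**common, "role": writer_role}
    return researcher, writer


researcher_kwargs, writer_kwargs = build_agent_kwargs("linkedin")
print(researcher_kwargs["role"], "+", writer_kwargs["role"])
```

The explicit factories in this notebook are easier to read and to customize per content type; a table like this mainly pays off once the number of content types grows.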

### Define task creation functions for each content type

Task design principles:
1. Clear, specific instructions improve output quality
2. One-shot examples of the final output help the agent produce better results than instructions alone
3. Task guardrails are another advanced CrewAI feature: they validate a task's output before it is passed on, which makes results more predictable. More on that [here](https://docs.crewai.com/en/concepts/tasks#task-guardrails).
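
To give the guardrail idea some shape, here is a minimal sketch of a word-count check. It assumes the contract described in the linked docs: a guardrail is a callable that receives the task output (an object with a `.raw` string) and returns a `(success, data_or_error_message)` tuple. `make_word_count_guardrail` is a hypothetical helper, and the output object is faked with `SimpleNamespace` so the block runs without CrewAI.

```python
from types import SimpleNamespace
from typing import Any, Callable, Tuple


def make_word_count_guardrail(
    min_words: int, max_words: int
) -> Callable[[Any], Tuple[bool, Any]]:
    """Build a guardrail that accepts output only within a word-count range."""

    def guardrail(output: Any) -> Tuple[bool, Any]:
        # Assumption: the task output exposes its text as `.raw`.
        text = str(getattr(output, "raw", output))
        count = len(text.split())
        if count < min_words:
            return False, f"Too short: {count} words, need at least {min_words}"
        if count > max_words:
            return False, f"Too long: {count} words, limit is {max_words}"
        return True, text

    return guardrail


# Example limits: a 150-300 word range, as one might use for a short social post.
short_post_guardrail = make_word_count_guardrail(150, 300)

fake_output = SimpleNamespace(raw="word " * 200)  # stand-in for a real task output
passed, payload = short_post_guardrail(fake_output)
rejected, reason = short_post_guardrail(SimpleNamespace(raw="far too brief"))
```

Attaching it would presumably look like `Task(..., guardrail=short_post_guardrail)`; confirm the parameter name and return contract against the linked docs for your CrewAI version.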

1.   **Blog writing Task.**


In [None]:
from crewai import Task

def create_blog_tasks(researcher, writer, url):
    """Create tasks for blog content generation"""

    research_task = Task(
        description=f"""
        Analyze the content from {url} and extract key insights for a blog post.
        Your analysis should identify:
        1. Main themes and key points
        2. Interesting insights or data points
        3. Potential angles for blog content
        4. Target audience considerations
        5. SEO-worthy topics and keywords

        Provide a comprehensive research summary that will guide blog writing.
        """,
        expected_output="A detailed research summary with key insights, themes, and recommendations for blog content",
        agent=researcher
    )

    writing_task = Task(
        description="""
        Create an engaging blog post based on the research findings.

        Requirements:
        - 800-1200 words
        - Engaging headline
        - Clear introduction with hook
        - Well-structured body with subheadings
        - Actionable insights or takeaways
        - Strong conclusion
        - SEO-optimized content
        - Professional yet accessible tone

        Format the output in markdown.
        """,
        expected_output="A complete, well-structured blog post in markdown format",
        agent=writer,
        context=[research_task]
    )

    return [research_task, writing_task]

2.   **Newsletter writing Task.**

In [None]:
def create_newsletter_tasks(researcher, writer, url):
    """Create tasks for newsletter content generation"""

    research_task = Task(
        description=f"""
        Analyze the content from {url} and extract the most newsworthy insights for a newsletter.
        Focus on:
        1. Most important news or updates
        2. Actionable insights subscribers can use immediately
        3. Key statistics or data points
        4. Industry implications
        5. Quick takeaways for busy professionals

        Prioritize information that provides immediate value.
        """,
        expected_output="A focused research summary highlighting the most valuable and actionable insights",
        agent=researcher
    )

    writing_task = Task(
        description="""
        Create a compelling newsletter section based on the research.

        Requirements:
        - 400-600 words
        - Catchy subject line
        - Scannable format with bullet points
        - Clear action items or takeaways
        - Conversational yet professional tone
        - Include relevant links or resources
        - End with a clear call-to-action

        Format for easy reading in email.
        """,
        expected_output="A complete newsletter section with subject line and formatted content",
        agent=writer,
        context=[research_task]
    )

    return [research_task, writing_task]

3.   **LinkedIn post writing Task.**

In [None]:
def create_linkedin_tasks(researcher, writer, url):
    """Create tasks for LinkedIn content generation"""

    research_task = Task(
        description=f"""
        Analyze the content from {url} and extract insights suitable for LinkedIn audience.
        Consider what would engage LinkedIn's professional audience based on the content.
        """,
        expected_output="Research summary focused on professional insights and engagement opportunities",
        agent=researcher
    )

    writing_task = Task(
        description="""
        Create an engaging LinkedIn post based on the research.

        Requirements:
        - 150-300 words (optimal LinkedIn length)
        - Professional yet conversational tone
        - Include relevant hashtags (3-5)
        - Pose a question to encourage engagement
        - Share a key insight or lesson learned from the content
        - Use line breaks for readability
        - Include a call-to-action for comments

        Make it shareable and discussion-worthy.
        """,
        expected_output="A complete LinkedIn post with hashtags and engagement elements",
        agent=writer,
        context=[research_task]
    )

    return [research_task, writing_task]

### **ContentRouterFlow Class Analysis**

This class orchestrates the entire content creation pipeline.

**Class Overview**

- `ContentRouterFlow` is a **subclass of `Flow[ContentState]`** that implements an event-driven workflow for dynamic content creation routing. It demonstrates CrewAI's Flow architecture with state management and specialized crew routing.

### Key Decorators & Their Roles

**💾 `@persist()`**

- **Purpose**: Enables automatic state persistence across workflow execution
- **Behavior**: Saves flow state to SQLite database by default
- **Benefits**: Workflow recovery, state tracking, and debugging capabilities
- **Scope**: Applied at class level to persist all method states
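
To make the persistence idea concrete, here is a minimal stdlib sketch of saving and restoring flow state in SQLite. It is not CrewAI's implementation: the `flow_states` table, the helpers `open_store`, `save_state`, and `load_state`, and the JSON encoding are all assumptions chosen for illustration.

```python
import json
import sqlite3
from typing import Any, Dict, Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open a SQLite database and ensure the (hypothetical) state table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS flow_states ("
        "flow_id TEXT PRIMARY KEY, state_json TEXT NOT NULL)"
    )
    return conn


def save_state(conn: sqlite3.Connection, flow_id: str, state: Dict[str, Any]) -> None:
    """Insert or overwrite the serialized state for one flow run."""
    conn.execute(
        "INSERT OR REPLACE INTO flow_states (flow_id, state_json) VALUES (?, ?)",
        (flow_id, json.dumps(state)),
    )
    conn.commit()


def load_state(conn: sqlite3.Connection, flow_id: str) -> Optional[Dict[str, Any]]:
    """Return the saved state for a flow run, or None if nothing was saved."""
    row = conn.execute(
        "SELECT state_json FROM flow_states WHERE flow_id = ?", (flow_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


store = open_store()  # in-memory database for the demo
save_state(store, "demo-flow", {"url": "https://example.com", "content_type": "blog"})
restored = load_state(store, "demo-flow")
```

Keying the row by a flow ID is what lets a later run look up and resume from an earlier state.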

**🚀 `@start()`**
- **Purpose**: Marks the workflow entry point
- **Function**: Collects user input (URL + content type) and initializes state
- **Triggers**: Automatic when flow begins

**🧭 `@router(get_user_input)`**
- **Purpose**: Creates decision point for workflow routing
- **Logic**: Returns `self.state.content_type` to determine next path
- **Output**: Routes to "blog", "newsletter", or "linkedin" listeners

**👂 `@listen("event_name")`**
- **Purpose**: Creates event listeners for specific triggers
- **Behavior**: Methods execute when their specified event occurs
- **Examples**: `@listen("blog")`, `@listen("newsletter")`, `@listen("linkedin")`

## Workflow:

1. The workflow starts with **`@start()`** on **`get_user_input()`**.
2. The result of **`get_user_input()`** is routed by **`@router()`** to one of the `process_*_content` methods based on `content_type`.
3. Whichever `process_*_content` method runs (e.g., `process_blog_content`), it will eventually complete and save the result to the flow's state.
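
The three steps above boil down to "run the start method, ask the router for a label, call whichever method listens for that label". The plain-Python sketch below mimics that dispatch so the control flow is easy to trace. `MiniFlow` is a hypothetical toy, not how CrewAI implements its decorators.

```python
from typing import Callable, Dict, List, Optional


class MiniFlow:
    """Toy dispatcher: start -> router -> the listener registered for the label."""

    def __init__(self) -> None:
        self.listeners: Dict[str, Callable[[], str]] = {}
        self.log: List[str] = []

    def listen(self, label: str) -> Callable[[Callable[[], str]], Callable[[], str]]:
        """Decorator that registers a handler under a routing label."""
        def register(fn: Callable[[], str]) -> Callable[[], str]:
            self.listeners[label] = fn
            return fn
        return register

    def run(self, start: Callable[[], str], router: Callable[[], str]) -> Optional[str]:
        self.log.append(start())            # step 1: entry point
        label = router()                    # step 2: router picks a label
        handler = self.listeners.get(label)
        if handler is None:                 # no listener: nothing else runs
            self.log.append(f"no listener for {label!r}")
            return None
        result = handler()                  # step 3: the matching listener runs
        self.log.append(result)
        return result


mini = MiniFlow()
state = {"content_type": ""}


@mini.listen("blog")
def process_blog() -> str:
    return "Blog content created"


@mini.listen("linkedin")
def process_linkedin() -> str:
    return "LinkedIn content created"


def get_user_input() -> str:
    state["content_type"] = "linkedin"      # hard-coded in place of input()
    return "Input collected"


outcome = mini.run(get_user_input, lambda: state["content_type"])
```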

In [None]:
from crewai.flow.flow import Flow, listen, router, start
from crewai.flow.persistence import persist
from crewai import Crew

@persist(verbose=True)
class ContentRouterFlow(Flow[ContentState]):
    """
    A dynamic workflow that routes content creation to specialized crews.

    Flow Overview:
    1. START: Get user input (URL + content type)
    2. ROUTE: Direct to appropriate content crew
    3. PROCESS: Execute specialized content creation
    4. FINISH: Return the final content

    This flow demonstrates:
    - Event-driven architecture with decorators
    - State management across workflow steps
    - Dynamic routing based on user input
    - Multi-crew orchestration, one crew per content type
    """

    @start()
    def get_user_input(self):
        """Get URL and desired content type from user"""
        # Enter your target URL here
        url = input("Enter your target URL: ") # example: https://blog.crewai.com/crewai-on-2025-ia-enablers-list-with-openai-and-anthropic/

        # Enter your content type: blog, newsletter, or linkedin
        # Normalize case and whitespace so the value matches the @listen labels
        content_type = input("Enter your desired content type (blog, newsletter, or linkedin): ").strip().lower()

        # Store in state
        self.state.url = url
        self.state.content_type = content_type

        return "Input collected"

    @router(get_user_input)
    def route_to_crew(self, previous_result):
        """Route to appropriate crew based on content type"""
        return self.state.content_type

    @listen("blog")
    def process_blog_content(self):
        """Process content using blog crew"""
        # Create blog agents
        researcher, writer = create_blog_agents()

        # Create blog tasks
        tasks = create_blog_tasks(researcher, writer, self.state.url)

        # Create and run crew
        blog_crew = Crew(
            agents=[researcher, writer],
            tasks=tasks,
            verbose=True
        )

        result = blog_crew.kickoff()
        self.state.final_content = result.raw

        return "Blog content created"

    @listen("newsletter")
    def process_newsletter_content(self):
        """Process content using newsletter crew"""

        # Create newsletter agents
        researcher, writer = create_newsletter_agents()

        # Create newsletter tasks
        tasks = create_newsletter_tasks(researcher, writer, self.state.url)

        # Create and run crew
        newsletter_crew = Crew(
            agents=[researcher, writer],
            tasks=tasks,
            verbose=True
        )

        result = newsletter_crew.kickoff()
        self.state.final_content = result.raw

        return "Newsletter content created"

    @listen("linkedin")
    def process_linkedin_content(self):
        """Process content using LinkedIn crew"""

        # Create LinkedIn agents
        researcher, writer = create_linkedin_agents()

        # Create LinkedIn tasks
        tasks = create_linkedin_tasks(researcher, writer, self.state.url)

        # Create and run crew
        linkedin_crew = Crew(
            agents=[researcher, writer],
            tasks=tasks,
            verbose=True
        )

        result = linkedin_crew.kickoff()
        self.state.final_content = result.raw

        return "LinkedIn content created"
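
The router returns whatever string ends up in `content_type`, and only three labels have listeners, so an unexpected value such as `podcast` would leave the flow with nothing to run. One way to guard against that, sketched here with a hypothetical `normalize_content_type` helper, is to validate the value before storing it in state:

```python
from typing import Optional

# The three labels that have @listen handlers in the flow above.
VALID_CONTENT_TYPES = ("blog", "newsletter", "linkedin")


def normalize_content_type(raw: str) -> Optional[str]:
    """Return the canonical label, or None if the input matches no listener."""
    cleaned = raw.strip().lower()
    return cleaned if cleaned in VALID_CONTENT_TYPES else None


for candidate in ("LinkedIn", "  blog ", "podcast"):
    print(repr(candidate), "->", normalize_content_type(candidate))
```

Inside `get_user_input`, a `None` result could be handled by asking again or by falling back to a default content type; which is better depends on how the flow is used.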

### Run the Flow

Now we're ready to bring everything together and run the flow. You should see a printout of the agent process so you can keep track of what the agents are doing.

In [None]:
# Create and run the flow.
# Top-level `await` works in a notebook; in a plain script, call flow.kickoff() instead.
flow = ContentRouterFlow()
result = await flow.kickoff_async()

Enter your target URL: https://www.yahoo.com/news/articles/ftc-launches-inquiry-ai-chatbots-153035090.html?guccounter=1&guce_referrer=aHR0cHM6Ly93d3cuZ29vZ2xlLmNvbS8&guce_referrer_sig=AQAAAHyaxWe9vR_wMDQVV_SHygSZCWuY2g3DZVG5zsA7FG57L7IcRS4pXEpPARguD-jFVynVi1wSiTp1Lq567Ye02Zv00d0BAlzNGGs7QwFkRoosBwYK_wMPSJ9fcafgYQVvSi8YVSRwZl_jJ3Xz-6H0CuTBSN6aOFeNtlaWfp_Mutdz
Enter your desired content type (blog, newsletter, or linkedin): linkedin


Saving flow state to memory for ID: 8d81f08b-afd3-4713-be92-1327d8702d84


Now that the run has completed, let's access the final content saved in our state via **`flow.state.final_content`**.

In [None]:
from IPython.display import display, Markdown

# Display the actual content
display(Markdown("---"))

# Format the content nicely for notebook
content = str(flow.state.final_content)
display(Markdown(content))

---

🌟 **Navigating the AI Landscape: Compliance, Ethics, and Innovation** 🌟  

The ongoing inquiry by the Federal Trade Commission (FTC) into AI chatbot practices is a pivotal moment for all of us in the technology, marketing, and ethics sectors. This is not just a regulatory check—it's a wake-up call for responsible innovation.  

Here are key insights:  

1. **Evolving Regulatory Landscape**: Stay updated on regulatory changes; they will shape how we develop and implement AI solutions.  
2. **Ethical AI Usage**: Prioritizing privacy and accuracy in AI responses is vital for building trust with our users—let’s discuss how we can uphold these standards.  
3. **Market Competitiveness**: Those who can balance innovation with regulatory compliance will lead the market. Are you ready to adapt?  
4. **Collaboration is Key**: Engaging with legal experts and ethicists can foster a balanced approach to AI deployment.  

As the landscape shifts, we must focus on collaborating, learning, and evolving our skill sets. Let's embrace ethical considerations and enhance our understanding of regulations.  

🔍 **Question for You**: How is your organization preparing for the impending regulatory changes surrounding AI?  

Let's spark this conversation! Comment below and share your insights. 👇  

#ArtificialIntelligence #EthicalAI #MarketInnovation #FTCInquiry #Collaboration

## **Next Steps**

- Explore CrewAI's advanced features like **`plot()`**, **`and_`**, and **`or_`** for complex logic. Access the [docs here](https://docs.crewai.com).
- Build human-in-the-loop workflows for interactive decision making
- Add guardrails + observability to your crews
- Scale to production with [CrewAI's enterprise](https://app.crewai.com) orchestration capabilities
- Join the [CrewAI community](https://community.crewai.com) and contribute to the ecosystem

**You're now equipped to build sophisticated, production-ready multi-agent AI systems that can handle complex, real-world automation challenges!**
