# Multi-Agent Content Creation Pipeline with CrewAI and Amazon Bedrock

This notebook implements a functional multi-agent content creation pipeline using CrewAI and Amazon Bedrock. The system performs actual web searches and uses real data to generate high-quality blog content.

## Architecture Overview

```
                ┌────────────────┐
                │                │
                │  Amazon        │
                │  Bedrock       │
                │                │
                └────────────────┘
                        ▲
                        │
                        ▼
┌───────────────────────────────────────────────┐
│                  CrewAI                       │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│  │          │    │          │    │          │ │
│  │Researcher│───►│  Writer  │───►│  Editor  │ │
│  │  Agent   │    │  Agent   │    │  Agent   │ │
│  │          │    │          │    │          │ │
│  └──────────┘    └──────────┘    └──────────┘ │
│        │               │               │      │
│        ▼               ▼               ▼      │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│  │ Internet │    │ Content  │    │ SEO      │ │
│  │ Search   │    │ Creation │    │ Analysis │ │
│  │ Tool     │    │ Tool     │    │ Tool     │ │
│  └──────────┘    └──────────┘    └──────────┘ │
└───────────────────────────────────────────────┘
```

**Notes:**
- Make sure you are running this code using your AWS Credentials. This notebook assumes you are loading the credentials using an IAM role, however, you may use your access_key if you are not using IAM Roles. For more details about how to set temporaty AWS credentials please check [this link](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html).
- Before using Amazon Bedrock models in this notebook you need to enable them in your account in us-east-1, for more details about the steps required to enabled the model please check [this link](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access-modify.html).
- This notebook uses Amazon Nova Lite 2.0 as default, all [charges on-demand on request basis](https://aws.amazon.com/bedrock/pricing/).
- Amazon Nova Lite 2.0 will be used with the [Cross-Region Inference mode](https://docs.aws.amazon.com/bedrock/latest/userguide/cross-region-inference.html).
- While this notebook uses us-east-1, Amazon Nova Lite 2.0 is available in a variety of AWS Regions. Check our [document pages](https://docs.aws.amazon.com/bedrock/latest/userguide/models-regions.html) for more details about the regions available.

## Installation and Setup

In [None]:
# Install required packages
%pip install -r requirements.txt -Uq 

In [None]:
from pydantic import BaseModel, Field

# CrewAI imports
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool, tool

# Search tools imports
from langchain_community.tools import DuckDuckGoSearchRun

## Configure Amazon Bedrock

Let's set up Amazon Bedrock as our LLM provider. You need to have your AWS credentials properly configured. Here we use a [custom implementation](https://docs.crewai.com/en/learn/custom-llm) because liteLLM does not yet support the newly released Nova Lite 2.0

In [None]:
import os
from crewai import BaseLLM
from typing import Any, Dict, List, Optional, Union
import boto3
import json

class NovaLLM(BaseLLM):
    """Custom LLM implementation for Amazon Nova models with CrewAI."""
    
    def __init__(
        self,
        model: str = "us.amazon.nova-2-lite-v1:0",
        temperature: Optional[float] = 0.7,
        region_name: str = "us-east-1",
        max_tokens: int = 16384,
        reasoning_effort: Optional[str] = None,  # "low", "medium", "high"
        **kwargs
    ):
        """
        Initialize Nova LLM for CrewAI.
        
        Args:
            model: Nova model ID (e.g., "us.amazon.nova-2-lite-v1:0")
            temperature: Sampling temperature (0-1)
            region_name: AWS region
            max_tokens: Maximum tokens to generate
            reasoning_effort: Enable reasoning ("low", "medium", "high")
        """
        super().__init__(model=model, temperature=temperature)
        
        self.bedrock = boto3.client('bedrock-runtime', region_name=region_name)
        self.max_tokens = max_tokens
        self.reasoning_effort = reasoning_effort
        
    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> str:
        """
        Call the Nova LLM with the given messages.
        """
        # Convert string to message format if needed
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        
        # Convert to Nova format and call API
        try:
            nova_request = self._convert_to_nova_format(messages, tools)
            response = self.bedrock.converse(**nova_request)
            return self._extract_response(response, messages, tools, available_functions)
        except Exception as e:
            import traceback
            return f"Error calling Nova API: {str(e)}\n{traceback.format_exc()}"
    

    def _convert_to_nova_format(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[dict]] = None
    ) -> Dict[str, Any]:
        """Convert OpenAI/CrewAI format to Nova format."""
        nova_request = {
            "modelId": self.model,
            "inferenceConfig": {
                "temperature": self.temperature,
                "maxTokens": self.max_tokens,
            }
        }
        
        # Separate system and conversation messages
        system_messages = []
        conversation_messages = []
        has_assistant_prefill = False
    
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            
            if role == "system":
                system_messages.append({"text": str(content)})
            elif role in ["user", "assistant"]:
                # Check if there's assistant prefill
                if role == "assistant":
                    has_assistant_prefill = True
                    
                # Ensure content is in Nova format
                if isinstance(content, str):
                    conversation_messages.append({
                        "role": role,
                        "content": [{"text": content}]
                    })
                elif isinstance(content, list):
                    conversation_messages.append({
                        "role": role,
                        "content": content
                    })
            elif role == "tool":
                has_assistant_prefill = True  # Tool messages imply assistant prefill
                # Convert tool result to Nova format
                conversation_messages.append({
                    "role": "user",
                    "content": [{
                        "toolResult": {
                            "toolUseId": msg.get("tool_call_id", "unknown"),
                            "content": [{"text": str(content)}]
                        }
                    }]
                })
    
        # Add system messages
        if system_messages:
            nova_request["system"] = system_messages
        
        # Add conversation messages
        if conversation_messages:
            nova_request["messages"] = conversation_messages
        else:
            # Fallback for empty messages
            nova_request["messages"] = [
                {"role": "user", "content": [{"text": "Hello"}]}
            ]
        
        # Convert tools to Nova format
        if tools:
            nova_tools = self._convert_tools_to_nova_format(tools)
            if nova_tools:
                nova_request["toolConfig"] = {"tools": nova_tools}
        
        # Add reasoning config ONLY if no assistant prefill exists since reasoning is incompatible with assistant prefill
        if self.reasoning_effort and not has_assistant_prefill:
            nova_request["additionalModelRequestFields"] = {
                "reasoningConfig": {
                    "type": "enabled",
                    "maxReasoningEffort": self.reasoning_effort
                }
            }
        
        return nova_request    
    
    def _convert_tools_to_nova_format(self, tools: List[dict]) -> List[dict]:
        """Convert tool definitions to Nova toolSpec format."""
        nova_tools = []
        
        for tool in tools:
            try:
                # Handle OpenAI function format
                if isinstance(tool, dict) and tool.get("type") == "function":
                    function = tool.get("function", {})
                    nova_tools.append({
                        "toolSpec": {
                            "name": function.get("name", "unknown"),
                            "description": function.get("description", ""),
                            "inputSchema": {
                                "json": function.get("parameters", {
                                    "type": "object",
                                    "properties": {},
                                    "required": []
                                })
                            }
                        }
                    })
                # Handle direct tool spec format
                elif isinstance(tool, dict) and "name" in tool:
                    nova_tools.append({
                        "toolSpec": {
                            "name": tool.get("name", "unknown"),
                            "description": tool.get("description", ""),
                            "inputSchema": {
                                "json": tool.get("parameters", {
                                    "type": "object",
                                    "properties": {},
                                    "required": []
                                })
                            }
                        }
                    })
            except Exception as e:
                print(f"Warning: Failed to convert tool: {e}")
                continue
        
        return nova_tools
    
    def _extract_response(
        self,
        response: Dict[str, Any],
        messages: List[Dict[str, str]],
        tools: Optional[List[dict]],
        available_functions: Optional[Dict[str, Any]]
    ) -> str:
        """Extract text response or handle tool calls."""
        output = response.get('output', {})
        message = output.get('message', {})
        content_blocks = message.get('content', [])
        
        # Collect all text parts and tool uses
        text_parts = []
        tool_use_blocks = []
        
        for block in content_blocks:
            # Skip reasoning content blocks (Nova 2.0 feature)
            # These contain the model's thinking process but not the final answer
            if 'reasoningContent' in block:
                continue
            
            # Collect tool use blocks
            if 'toolUse' in block:
                tool_use_blocks.append(block['toolUse'])
                continue
            
            # Collect text content
            if 'text' in block:
                text_parts.append(block['text'])
        
        # If we have tool uses and available_functions, execute them
        if tool_use_blocks and available_functions:
            for tool_use in tool_use_blocks:
                tool_result = self._execute_tool(tool_use, available_functions)
                
                # Add tool use and result to message history
                messages.append({
                    "role": "assistant",
                    "content": [{"toolUse": tool_use}]
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_use.get('toolUseId', ''),
                    "content": tool_result
                })
            
            # Recursively call to get final answer
            return self.call(messages, tools, None, available_functions)
        
        # If we have tool uses but no available_functions, 
        # CrewAI will handle tool execution - return text if available
        if tool_use_blocks and not available_functions:
            # If there's also text, return it (model might have provided both)
            if text_parts:
                return '\n'.join(text_parts)
            
            # Otherwise, format tool call for CrewAI to parse
            # CrewAI expects a specific format for tool calls
            tool_use = tool_use_blocks[0]
            tool_name = tool_use.get('name', 'unknown')
            tool_input = tool_use.get('input', {})
            
            # Format as CrewAI expects
            return f"Action: {tool_name}\nAction Input: {json.dumps(tool_input)}"
        
        # Return combined text response
        if text_parts:
            return '\n'.join(text_parts)
        
        return "No response generated"
    
    def _execute_tool(
        self,
        tool_use: Dict[str, Any],
        available_functions: Dict[str, Any]
    ) -> str:
        """Execute a tool and return the result."""
        tool_name = tool_use.get('name', '')
        tool_input = tool_use.get('input', {})
        
        # Parse JSON string input
        if isinstance(tool_input, str):
            try:
                tool_input = json.loads(tool_input)
            except json.JSONDecodeError:
                return f"Error: Invalid tool input format"
        
        # Execute function
        if tool_name in available_functions:
            try:
                result = available_functions[tool_name](**tool_input)
                return str(result)
            except Exception as e:
                return f"Error executing {tool_name}: {str(e)}"
        
        return f"Error: Tool '{tool_name}' not found"
    
    def supports_function_calling(self) -> bool:
        """Nova models support function calling."""
        return True
    
    def get_context_window_size(self) -> int:
        """Return context window size based on model."""
        if "lite" in self.model.lower():
            return 200000
        elif "pro" in self.model.lower():
            return 300000
        return 128000

In [None]:
from crewai import LLM

# Setting model_id and AWS region
region = "us-east-1"
model_id = "us.amazon.nova-2-lite-v1:0"

# Create Nova LLM with custom class imnplementation
llm = NovaLLM(
    model=model_id,
    temperature=0.7,
    region_name = region,
    reasoning_effort="medium"
)

## Define Tools for Agents

Now, let's define the tools our agents will use. We'll implement actual search functionality with DuckDuckGo for comprehensive research and an SEO analysis tool.

In [None]:
# Create a DuckDuckGo search tool
@tool("Internet Search")
def search_internet(query: str) -> str:
    """Search the internet for information on a specific query or topic.
    
    Args:
        query: The search query string.
    
    Returns:
        str: Search results as text.
    """
    search_tool = DuckDuckGoSearchRun()
    results = search_tool.run(query)
    return f"Search results for '{query}':\n\n{results}"

# SEO Analysis Tool
class SEOAnalysisInput(BaseModel):
    """Input schema for SEO analysis tool."""
    content: str = Field(..., description="The content to analyze for SEO optimization")
    target_keywords: List[str] = Field(..., description="List of target keywords to check for in the content")

class SEOAnalysisTool(BaseTool):
    """Tool for analyzing content for SEO optimization."""
    name: str = "SEO Content Analyzer"
    description: str = "Analyzes content for SEO optimization and provides recommendations"
    args_schema: type[BaseModel] = SEOAnalysisInput
    
    def _run(self, content: str, target_keywords: List[str]) -> str:
        # Calculate basic metrics
        word_count = len(content.split())
        
        # Calculate keyword density
        keyword_counts = {}
        content_lower = content.lower()
        
        for keyword in target_keywords:
            count = content_lower.count(keyword.lower())
            density = round(count / word_count * 100, 2) if word_count > 0 else 0
            keyword_counts[keyword] = {
                "count": count,
                "density": density
            }
        
        # Calculate readability metrics
        sentences = max(1, content.count('.') + content.count('!') + content.count('?'))
        avg_words_per_sentence = word_count / sentences
        
        # Generate recommendations
        recommendations = []
        
        if word_count < 800:
            recommendations.append("Increase content length to at least 800 words for better SEO performance")
        
        for keyword, data in keyword_counts.items():
            if data["count"] == 0:
                recommendations.append(f"Add the keyword '{keyword}' to your content")
            elif data["density"] < 0.5:
                recommendations.append(f"Increase the usage of '{keyword}' (current density: {data['density']}%)")
            elif data["density"] > 2.5:
                recommendations.append(f"Reduce the usage of '{keyword}' to avoid keyword stuffing (current density: {data['density']}%)")
        
        # Format response
        response = f"# SEO Content Analysis\n\n"
        response += f"## Overview\n"
        response += f"- Word Count: {word_count} words ({word_count < 800 and 'too short' or word_count > 2000 and 'lengthy' or 'good length'})\n"
        response += f"- Average Sentence Length: {round(avg_words_per_sentence, 1)} words\n\n"
        
        response += f"## Keyword Analysis\n"
        for keyword, data in keyword_counts.items():
            response += f"- '{keyword}': {data['count']} occurrences ({data['density']}% density)\n"
        
        response += f"\n## Recommendations\n"
        if not recommendations:
            response += "- Content is well-optimized for the target keywords.\n"
        else:
            for rec in recommendations:
                response += f"- {rec}\n"
        
        return response

## Define Specialized Agents

Now let's create our three specialized agents: Researcher, Writer, and Editor.

In [None]:
# Create the Researcher Agent
researcher = Agent(
    role="Research Specialist",
    goal="Conduct thorough research on topics and provide comprehensive, accurate information",
    backstory="""You are an expert researcher with a talent for finding the most relevant and up-to-date 
    information on any topic. You're known for your ability to gather comprehensive data, verify sources, 
    and organize information logically. Your research always includes key statistics, expert opinions, 
    and real-world examples to provide a complete picture of the topic.""",
    verbose=True,
    allow_delegation=False,
    tools=[search_internet],
    llm=llm
)

# Create the Writer Agent
writer = Agent(
    role="Content Writer",
    goal="Create engaging, informative content based on research findings",
    backstory="""You are a talented content writer with years of experience creating engaging blog posts. 
    You excel at turning research into compelling narratives that educate and entertain. 
    Your writing is always clear, structured, and tailored to the target audience. 
    You know how to naturally incorporate keywords while maintaining a conversational tone.""",
    verbose=True,
    allow_delegation=False,  # Disable delegation to avoid errors
    tools=[],  # The writer uses the LLM's capabilities for content creation
    llm=llm
)

# Create the Editor Agent
editor = Agent(
    role="Content Editor & SEO Specialist",
    goal="Refine and optimize content for readability and SEO performance",
    backstory="""You are a meticulous editor and SEO specialist. You have a keen eye for detail 
    and can transform good content into excellent content. You understand SEO best practices 
    and know how to optimize content for search engines without sacrificing readability or user experience. 
    You ensure content is error-free, well-structured, and strategically optimized for its target keywords.""",
    verbose=True,
    allow_delegation=False,
    tools=[SEOAnalysisTool()],
    llm=llm
)

## Define Tasks for Each Agent

Now let's define the specific tasks for our content creation pipeline. We'll choose a topic and target keywords for our blog post.

In [None]:
# Define blog post topic and target keywords
blog_topic = "Sustainable Gardening Practices for Urban Homes"
target_keywords = ["sustainable gardening", "urban gardening", "eco-friendly gardening", "composting", "water conservation"]

# Task 1: Research the topic thoroughly
research_task = Task(
    description=f"""
    Research the topic of "{blog_topic}" comprehensively. Your research should include:
    
    1. Latest trends and innovations in {blog_topic}
    2. Best practices and techniques that are accessible for urban dwellers
    3. Expert opinions and data-backed information
    4. Common challenges and practical solutions
    5. Real-world examples and success stories
    6. Resources or tools that would be helpful to readers
    
    Use the Internet Search tool to gather diverse and current information.
    Focus on content that includes the target keywords: {', '.join(target_keywords)}
    
    Organize your research findings in a structured format that will be helpful for the content writer.
    Include specific data points, statistics, and quotable insights when available.
    
    Return a comprehensive, organized research document with clear sections.
    """,
    agent=researcher,
    expected_output="A comprehensive research brief with organized, factual information about sustainable gardening practices for urban homes, including trends, best practices, expert insights, and real-world examples."
)

# Task 2: Write engaging blog content based on research
writing_task = Task(
    description=f"""
    Using the research provided, create an engaging and informative blog post about "{blog_topic}".
    
    The research findings are as follows:
    {{research_task.output}}
    
    Your blog post should:
    1. Start with an engaging introduction that hooks the reader and establishes the importance of the topic
    2. Include a logical structure with clear headings and subheadings
    3. Provide practical, actionable advice that urban dwellers can implement
    4. Incorporate real examples, success stories, or case studies
    5. Naturally integrate the target keywords: {', '.join(target_keywords)}
    6. End with a compelling conclusion and call-to-action
    
    The tone should be conversational yet informative, accessible to beginners while still valuable to those with some gardening experience.
    
    Aim for approximately 1,200-1,500 words in total.
    
    Return a complete, well-structured blog post in markdown format.
    """,
    agent=writer,
    context=[research_task],
    expected_output="A well-structured, engaging blog post about sustainable gardening practices for urban homes that effectively utilizes the research findings and naturally incorporates the target keywords."
)

# Task 3: Edit and optimize the content for SEO
editing_task = Task(
    description=f"""
    Review, edit, and optimize the following blog post about "{blog_topic}".
    
    Content to edit:
    {{writing_task.output}}
    
    Target keywords for SEO: {target_keywords}
    
    Your editing tasks:
    
    1. Improve the overall structure and flow
    2. Enhance readability with appropriate formatting (headings, bullet points, short paragraphs)
    3. Optimize for SEO using the target keywords: {', '.join(target_keywords)}
    4. Check factual accuracy and ensure proper attribution of sources when needed
    5. Refine the writing style for clarity and engagement
    6. Create a compelling meta title and meta description
    
    First, use the SEO Content Analyzer tool to evaluate the content. 
    When calling the tool, you MUST provide:
    - content: the blog post text to analyze
    - target_keywords: {target_keywords}
    
    Then, apply your improvements based on the SEO analysis to create the final version.
    
    IMPORTANT: Return ONLY the final, optimized blog post in markdown format with the meta title 
    and description at the top. Do NOT include any explanatory text, comments, or descriptions 
    about what you did. Just return the pure markdown content.
    """,
    agent=editor,
    context=[writing_task],
    expected_output="Pure markdown blog post with meta title and description at the top, with no additional explanatory text or comments."
)

## Create and Run the Content Creation Crew

Now we'll create and run our content creation crew with the defined agents and tasks.

In [None]:
# Create the content creation crew
content_crew = Crew(
    agents=[researcher, writer, editor],
    tasks=[research_task, writing_task, editing_task],
    verbose=True,  # Set to True for detailed logs
    process=Process.sequential  # Execute tasks in sequence
)

# Run the crew
result = content_crew.kickoff()

## Display the Final Blog Post and Save Results

In [None]:
# Display the final edited blog post
print("\n" + "="*80)
print("FINAL BLOG POST CONTENT")
print("="*80 + "\n")
print(result)

## Display Individual Agent Outputs (Optional)

Let's also look at the output from each stage of the process.

In [None]:
# Display research findings
print("\n" + "="*80)
print("RESEARCH FINDINGS")
print("="*80 + "\n")
print(research_task.output)

In [None]:
# Display initial blog draft
print("\n" + "="*80)
print("INITIAL BLOG DRAFT")
print("="*80 + "\n")
print(writing_task.output)

## Conclusion

This CrewAI content pipeline demonstrates how multiple specialized AI agents can collaborate to create high-quality content. The process follows a logical workflow:

1. The Researcher agent gathers comprehensive information from the web
2. The Writer agent transforms that research into engaging content
3. The Editor agent optimizes the content for readability and SEO

The result is a polished, SEO-optimized blog post ready for publication, created entirely through the orchestrated collaboration of AI agents.