In [1]:
# %uv pip install -U -q crewai crewai-tools

In [6]:
%uv pip install -U -q crewai[google-genai]

Note: you may need to restart the kernel to use updated packages.


In [1]:
from crewai import Agent, Task, LLM, Crew
from typing import Tuple, Union, Dict, Any
from crewai import TaskOutput
from datetime import date
from crewai_tools import SerperDevTool

In [2]:
import os

# Read the credentials from the environment so no secret is stored in the notebook.
SERPER_API_KEY = os.getenv('SERPER_API_KEY')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')

# Unset and empty variables are both treated as missing. The values are not
# written back to os.environ: they were just read from it, so that would be a no-op.
missing_keys = [
    name
    for name, value in (
        ('SERPER_API_KEY', SERPER_API_KEY),
        ('GEMINI_API_KEY', GEMINI_API_KEY),
    )
    if not value
]
if missing_keys:
    # Name the exact variable(s) so the reader knows what to export before re-running.
    raise ValueError(
        'Missing required environment variable(s): ' + ', '.join(missing_keys)
    )

print('✅ API keys set successfully!')

✅ API keys set successfully!


In [3]:
def validate_blog_content(result: TaskOutput) -> Tuple[bool, Any]:
    """Guardrail check: accept a blog post only if it is non-empty and at most 50 words.

    Args:
        result: Task output to validate; only its ``raw`` text is inspected.

    Returns:
        ``(True, text)`` with the surrounding whitespace stripped when the post
        passes, otherwise ``(False, reason)`` where ``reason`` is a short message
        describing why the content was rejected. The function never raises for a
        malformed ``result``; the failure is reported through the tuple instead.
    """
    max_words = 50  # a post of exactly this many words is still accepted

    try:
        # Strip once up front: the stripped text is what gets returned on success,
        # and splitting it yields the same word count as splitting the raw text.
        content = result.raw.strip()
        word_count = len(content.split())
    except Exception as wc_error:
        # e.g. ``raw`` is None or not text-like; report it instead of crashing.
        print(f"Error during word count check: {wc_error}")
        return (False, f"Error during word count check: {wc_error}")

    print(f"Word count: {word_count}")

    # An empty or whitespace-only answer is not a valid blog post.
    if word_count == 0:
        return (False, "Blog content is empty")

    if word_count > max_words:
        return (False, f"Blog content exceeds {max_words} words")

    # Additional validation logic here
    return (True, content)

In [4]:
# LLM to be used by the agent(s)
llm = LLM(model="gemini/gemini-3-flash-preview", api_key=GEMINI_API_KEY)

# The single agent of this crew; SerperDevTool is the only tool it is given.
blog_agent = Agent(
    role="Blog Writer",
    goal="Write blog post",
    backstory="An expert blog writer",
    tools=[SerperDevTool()],
    llm=llm,
    verbose=True
)

# {prompt} and {year} are placeholders filled from the inputs passed to crew.kickoff().
blog_task = Task(
    description="Write a super DETAILED blog post about {prompt} for {year}",
    expected_output="""A properly structured blog post under 50 words.
    Blog format:
    # Title
    ## Subtitle
    Paragraphs...
    """,
    agent=blog_agent,
    markdown=True,
    guardrail=validate_blog_content,  # Add the guardrail function
    # `max_retries` is the deprecated spelling of this setting; use the current name.
    guardrail_max_retries=4  # Set the maximum number of retries
)

# One agent, one task.
crew = Crew(
    agents=[blog_agent],
    tasks=[blog_task],
    verbose=True
)

In [None]:
# Run the crew; the keys must match the {prompt} and {year} placeholders in the task description.
results = crew.kickoff(inputs=dict(prompt="CrewAI", year=date.today().year))



