In [57]:
import json
import os
import random
import uuid
from pathlib import Path
from typing import List, Optional

import nest_asyncio
import yaml
from crewai import Agent, Task, Crew, Process, LLM
from crewai.flow.flow import Flow, start, listen, router, or_, and_
from crewai_tools import DirectoryReadTool, FileReadTool
from firecrawl import FirecrawlApp
from pydantic import BaseModel

In [5]:
class SimpleFlow(Flow):
    """Smallest possible flow: one entry-point step and nothing else."""

    @start()
    def initialize(self):
        """Entry point of the flow; prints a start-up banner."""
        banner = "Flow started"
        print(banner)

# Build the single-step flow and run it. The bare top-level `await` relies on
# the notebook kernel supporting it (it is not valid in a plain .py script).
flow = SimpleFlow()
await flow.kickoff_async()

Output()

In [6]:
class SequentialFlow(Flow):
    """Two chained steps: the second listens for the first and gets its result."""

    @start()
    def first_task(self):
        """Entry point; reports progress and returns a status token."""
        print("Step 1: Fetching data")
        status = "data_fetched"
        return status

    @listen(first_task)
    def second_task(self, result):
        """Runs after ``first_task``; ``result`` is what that step returned."""
        message = f"Step 2: Processing {result}"
        print(message)

# Run the two-step flow; `flow` is rebound here, replacing the previous demo's instance.
flow = SequentialFlow()
await flow.kickoff_async()

Output()

In [8]:
class OrFlow(Flow):
    """Two independent entry points feeding one listener joined with ``or_``."""

    @start()
    def fetch_from_api(self):
        """First entry point; returns a stand-in API payload."""
        api_payload = "API data"
        return api_payload

    @start()
    def read_from_db(self):
        """Second entry point; returns a stand-in database row."""
        db_record = "Database record"
        return db_record

    @listen(or_(fetch_from_api, read_from_db))
    def process_data(self, result):
        """Listener registered on either source; prints the value it receives."""
        report = f"Processing: {result}"
        print(report)

# Execute the or_-joined demo flow asynchronously from the notebook cell.
flow = OrFlow()
await flow.kickoff_async()

Output()

In [10]:
class AndFlow(Flow):
    """Two entry points whose listener is joined with ``and_``."""

    @start()
    def step_one(self):
        """First entry point; logs and returns placeholder user data."""
        print("Step 1: Collecting user input")
        collected = "User data"
        return collected

    @start()
    def step_two(self):
        """Second entry point; logs and returns placeholder validated data."""
        print("Step 2: Validating input")
        validated = "Validated data"
        return validated

    @listen(and_(step_one, step_two))
    def final_step(self):
        """Listener registered on the combination of both entry points."""
        summary = "All conditions met. Proceeding with final step."
        print(summary)

# Execute the and_-joined demo flow; both @start steps belong to this one run.
flow = AndFlow()
await flow.kickoff_async()

Output()

In [13]:
class RouterFlow(Flow):
    """Branching demo: a router step returns the label that listeners wait on."""

    @start()
    def classify_request(self):
        """Pick a request class at random, print it and return it."""
        categories = ["urgent", "normal"]
        request_type = random.choice(categories)
        print(f"Request classified as: {request_type}")
        return request_type

    @router(classify_request)
    def handle_request(self, classification):
        """Map the classification onto the label of the handler to trigger."""
        # Guard clause for the urgent case; anything else is treated as normal.
        if classification == "urgent":
            return "handle_urgent"
        return "handle_normal"

    @listen("handle_urgent")
    def urgent_handler(self):
        """Listener for the "handle_urgent" label."""
        notice = "Handling urgent request"
        print(notice)

    @listen("handle_normal")
    def normal_handler(self):
        """Listener for the "handle_normal" label."""
        notice = "Handling normal request"
        print(notice)

# Run the router demo. The branch taken differs between runs because
# classify_request uses random.choice and no seed is set in this notebook.
flow = RouterFlow()
await flow.kickoff_async()

Output()

In [14]:
from crewai.flow.flow import Flow, listen, start

class StateFlow(Flow):
    """Unstructured-state demo: steps share data through dict-style ``self.state``."""

    @start()
    def initialize_state(self):
        """Entry point; seeds the counter in the shared state and prints it."""
        state = self.state
        state["count"] = 1
        print(f"Initial count: {state['count']}")

    @listen(initialize_state)
    def increment_count(self):
        """Runs after initialisation; bumps the counter by one and prints it."""
        state = self.state
        state["count"] = state["count"] + 1
        print(f"Updated count: {state['count']}")

# Run the dict-state demo flow from the notebook cell.
flow = StateFlow()
await flow.kickoff_async()

Output()

In [16]:
class CounterState(BaseModel):
    # Typed state model used as the type parameter of StructuredStateFlow.
    # Integer counter; starts at 0 when the state is created with defaults.
    count: int = 0

class StructuredStateFlow(Flow[CounterState]):
    """Structured-state demo: ``self.state`` is accessed via ``CounterState`` attributes."""

    @start()
    def initialize_state(self):
        """Entry point; prints the counter as found, then sets it to 1."""
        state = self.state
        print(f"Initial count: {state.count}")
        state.count = 1

    @listen(initialize_state)
    def increment_count(self):
        """Runs after initialisation; adds one to the counter and prints it."""
        state = self.state
        state.count = state.count + 1
        print(f"Updated count: {state.count}")

# Run the typed-state demo flow from the notebook cell.
flow = StructuredStateFlow()
await flow.kickoff_async()

Output()

In [51]:
# Load the agent and task definitions from YAML. Paths are relative to the
# current working directory. The encoding is pinned to UTF-8 so the result does
# not depend on the platform's default locale encoding.
with open("config/planner_agents.yaml", "r", encoding="utf-8") as f:
    agents_config = yaml.safe_load(f)

with open("config/planner_tasks.yaml", "r", encoding="utf-8") as f:
    tasks_config = yaml.safe_load(f)


class Tweet(BaseModel):
    """Represents an individual tweet in a thread"""
    # NOTE(review): the docstring above is left untouched on purpose; this model
    # is nested in Thread, which is used as a task's output_pydantic, so its
    # schema text may be visible to the LLM - confirm before rewording.
    # Text of the tweet (required).
    content: str
    is_hook: bool = False  # Identifies if this is the opening/hook tweet
    # Optional also admits None; the default is an empty list.
    media_urls: Optional[list[str]] = []  # Optional media attachments (images, code snippets)


class Thread(BaseModel):
    """Represents a Twitter thread"""
    # Passed as output_pydantic to the create_twitter_thread_plan task, so that
    # task's result is expected in this shape. Both fields are required.
    topic: str  # Main topic/subject of the thread
    tweets: list[Tweet]  # List of tweets in the thread


class LinkedInPost(BaseModel):
    """Represents a LinkedIn post"""
    # Passed as output_pydantic to the create_linkedin_post_plan task.
    # Body text of the post (required).
    content: str
    # Required as declared: there is no default, so a post without an image
    # URL does not fit this model.
    media_url: str # Main image url for the post


# Shared building blocks: every agent below uses the same LLM handle and the
# same list of file-system tools.
llm = LLM(model="openai/gpt-4o")
all_tools = [DirectoryReadTool(), FileReadTool()]


# --- Agents (definitions come from config/planner_agents.yaml) ---------------
draft_analyzer = Agent(
    config=agents_config["draft_analyzer"],
    tools=all_tools,
    llm=llm,
)
twitter_thread_planner = Agent(
    config=agents_config["twitter_thread_planner"],
    tools=all_tools,
    llm=llm,
)
linkedin_post_planner = Agent(
    config=agents_config["linkedin_post_planner"],
    tools=all_tools,
    llm=llm,
)


# --- Tasks (definitions come from config/planner_tasks.yaml) -----------------
analyze_draft = Task(
    config=tasks_config["analyze_draft"],
    agent=draft_analyzer,
)
create_twitter_thread_plan = Task(
    config=tasks_config["create_twitter_thread_plan"],
    agent=twitter_thread_planner,
    output_pydantic=Thread,
)
create_linkedin_post_plan = Task(
    config=tasks_config["create_linkedin_post_plan"],
    agent=linkedin_post_planner,
    output_pydantic=LinkedInPost,
)


# --- Crews: each lists the shared analysis task first, then its own planner ---
twitter_planning_crew = Crew(
    agents=[draft_analyzer, twitter_thread_planner],
    tasks=[analyze_draft, create_twitter_thread_plan],
    verbose=False,
)
linkedin_planning_crew = Crew(
    agents=[draft_analyzer, linkedin_post_planner],
    tasks=[analyze_draft, create_linkedin_post_plan],
    verbose=False,
)

In [58]:
class ContentPlanningState(BaseModel):
    """State shared by the steps of ``CreateContentPlanningFlow``."""

    # URL of the blog post that is scraped as the source draft.
    blog_post_url: str = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"
    # Where the scraped markdown draft lives; the flow overwrites this after
    # scraping. The default is a real Path so it matches the annotation:
    # pydantic does not validate or coerce default values unless
    # validate_default is enabled, so a str default would stay a str.
    draft_path: Path = Path("assets/")
    # Target platform; the flow's router branches on "twitter" or "linkedin".
    post_type: str = "twitter"
    # Example files handed to the planning crews as inputs.
    path_to_example_threads: str = "assets/example_threads.txt"
    path_to_example_linkedin: str = "assets/example_linkedin.txt"


class CreateContentPlanningFlow(Flow[ContentPlanningState]):
    """Scrape a blog post and plan a Twitter thread or LinkedIn post from it.

    Steps: ``scrape_blog_post`` saves the post as markdown under ``assets/``,
    ``select_platform`` routes on ``state.post_type``, one of the two draft
    steps runs its planning crew, and ``save_plan`` writes the structured
    result to ``output/draft.json``.
    """

    @start()
    def scrape_blog_post(self):
        """Fetch ``state.blog_post_url`` and store it as a markdown file.

        The file is written to ``assets/<title>.md`` (UTF-8) and its path is
        recorded in ``state.draft_path``.

        Returns:
            The flow state.
        """
        print(f"# Fetching draft from: {self.state.blog_post_url}")

        app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
        scrape_result = app.scrape_url(self.state.blog_post_url, formats=['markdown', 'html'])

        # Only a missing or oddly shaped metadata object should trigger the
        # fallback name; unrelated errors are no longer swallowed.
        try:
            title = scrape_result.metadata['title']
        except (AttributeError, LookupError, TypeError):
            title = None

        # The page title is untrusted text. Keep only filename-safe characters
        # so that '/', '..' and similar cannot break or escape the assets path.
        safe_title = "".join(
            ch if ch.isalnum() or ch in " -_." else "_"
            for ch in str(title or "")
        ).strip(" .")
        if not safe_title:
            # No usable title: fall back to a random unique name.
            safe_title = str(uuid.uuid4())

        draft_file = Path("assets") / f"{safe_title}.md"
        # A fresh checkout may not have the assets directory yet.
        draft_file.parent.mkdir(parents=True, exist_ok=True)
        with open(draft_file, 'w', encoding='utf-8') as f:
            f.write(scrape_result.markdown)

        self.state.draft_path = draft_file
        return self.state

    @router(scrape_blog_post)
    def select_platform(self):
        """Choose the planning branch from ``state.post_type``.

        Returns:
            ``"twitter"`` or ``"linkedin"``.

        Raises:
            ValueError: If ``post_type`` is neither value. Without this the
                router would return ``None`` and no draft step would run.
        """
        platform = str(self.state.post_type).strip().lower()
        if platform == "twitter":
            return "twitter"
        if platform == "linkedin":
            return "linkedin"
        raise ValueError(
            f"Unsupported post_type {self.state.post_type!r}; "
            "expected 'twitter' or 'linkedin'"
        )

    @listen("twitter")
    def twitter_draft(self):
        """Run ``twitter_planning_crew`` on the saved draft and return its result."""
        print(f"# Planning content for: {self.state.draft_path}")

        result = twitter_planning_crew.kickoff(inputs={
            # Always hand the crew a plain string, whether the state holds a
            # str or a Path.
            'draft_path': Path(self.state.draft_path).as_posix(),
            'path_to_example_threads': self.state.path_to_example_threads
        })

        print(f"# Planned content for {self.state.draft_path}:")

        return result

    @listen("linkedin")
    def linkedin_draft(self):
        """Run ``linkedin_planning_crew`` on the saved draft, print and return its result."""
        print(f"# Planning content for: {self.state.draft_path}")

        result = linkedin_planning_crew.kickoff(inputs={
            # Always hand the crew a plain string, whether the state holds a
            # str or a Path.
            'draft_path': Path(self.state.draft_path).as_posix(),
            'path_to_example_linkedin': self.state.path_to_example_linkedin
        })

        print(f"# Planned content for {self.state.draft_path}:")
        print(f"{result.pydantic.content}")

        return result

    @listen(or_(twitter_draft, linkedin_draft))
    def save_plan(self, plan):
        """Write the structured plan to ``output/draft.json``.

        Args:
            plan: Result returned by whichever draft step ran; its
                ``pydantic`` attribute is dumped as JSON.
        """
        output_file = Path("output") / "draft.json"
        # Create the output directory on first use instead of failing.
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(plan.pydantic.model_dump(), f, indent=2)

# Patch asyncio so event loops can be nested - presumably needed because the
# synchronous flow.kickoff() below is called from inside the notebook's
# already-running loop (confirm against the CrewAI version in use).
nest_asyncio.apply()
flow = CreateContentPlanningFlow()
# Render the flow graph; this does not run any step. The cell output reports
# the file as crewai_flow.html.
flow.plot()

Plot saved as crewai_flow.html


In [59]:
# Run the content-planning flow synchronously. This scrapes the blog post via
# Firecrawl and runs the planning crew for the configured post_type, so it
# needs network access and FIRECRAWL_API_KEY in the environment.
result = flow.kickoff()

Output()