# Content Planning and Publishing Crew

This notebook demonstrates how to build an AI crew for planning and publishing content using CrewAI Flows.
The crew takes a link to a blog post, downloads its content as markdown using Firecrawl, analyzes it, generates a Twitter thread, and schedules the thread on Typefully.

### Initialization and Setup
Import the CrewAI Flow and Crew components and set up the environment.

In [1]:
# Importing necessary libraries
import getpass
import os
import datetime
import uuid
import yaml
import json
import subprocess
from pathlib import Path
from pydantic import BaseModel
from typing import Optional

# Firecrawl SDK
from firecrawl import FirecrawlApp

# Typefully scheduler
from scheduler import schedule

# Importing Crew related components
from crewai import Agent, Task, Crew

# Importing CrewAI Flow related components
from crewai.flow.flow import Flow, listen, start

from dotenv import load_dotenv
load_dotenv()
# Apply a patch to allow nested asyncio loops in Jupyter
import nest_asyncio
nest_asyncio.apply()
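The flow later reads `FIRECRAWL_API_KEY` from the environment, so it can help to fail early when a key is missing. A minimal sketch, assuming the keys come from the `.env` file loaded by `load_dotenv()`; `TYPEFULLY_API_KEY` is a hypothetical name used only for illustration, since the variable the `scheduler` module reads is not shown in this notebook.

```python
import os

def missing_env(names, env=None):
    """Return the names that are unset or empty in the environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

# FIRECRAWL_API_KEY is read later in the flow.
# TYPEFULLY_API_KEY is a hypothetical name, assumed for this sketch.
required = ["FIRECRAWL_API_KEY", "TYPEFULLY_API_KEY"]
missing = missing_env(required)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

Passing a plain dict as `env` makes the helper easy to check without touching the real environment.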



# Blog Post URL

In [2]:
blog_post_url = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"

## Plan for our Flow

1. Scrape the blog post with Firecrawl and save it as markdown
2. Analyze the draft and plan the Twitter thread **[Crew of Agents]**
3. Save the plan and schedule the thread on Typefully

![CrewAIFlow.png](crewai_flow.png)

# Create Planning Crew

These models capture the output of the planning crew, which is then used to create the Twitter thread and schedule it on Typefully.

In [3]:
class Tweet(BaseModel):
    """Represents an individual tweet in a thread"""
    content: str
    is_hook: bool = False  # Identifies if this is the opening/hook tweet
    media_urls: Optional[list[str]] = []  # Optional media attachments (images, code snippets)

class Thread(BaseModel):
    """Represents a Twitter thread"""
    topic: str  # Main topic/subject of the thread
    tweets: list[Tweet]  # List of tweets in the thread
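Before a plan is scheduled, it may be worth sanity-checking it. The sketch below works on a plain dict shaped like `Thread.model_dump()` and flags overlong tweets and a misplaced hook. The function name `check_thread` and the 280-character default are assumptions for illustration, and `len()` counts code points, which is only an approximation of how the platform counts characters.

```python
def check_thread(plan: dict, limit: int = 280) -> list[str]:
    """Return human-readable problems found in a thread plan."""
    problems = []
    tweets = plan.get("tweets", [])
    if not tweets:
        problems.append("thread has no tweets")
    for i, tweet in enumerate(tweets, start=1):
        length = len(tweet["content"])
        if length > limit:
            problems.append(f"tweet {i} is {length} characters (limit {limit})")
        # The hook is expected to open the thread
        if tweet.get("is_hook") and i != 1:
            problems.append(f"tweet {i} is marked as hook but is not first")
    return problems

# Sample data in the shape of Thread.model_dump(); contents are made up
sample = {
    "topic": "Chunking strategies for RAG",
    "tweets": [
        {"content": "Chunking is key to RAG success!", "is_hook": True, "media_urls": []},
        {"content": "x" * 300, "is_hook": False, "media_urls": []},
    ],
}
print(check_thread(sample))
```

An empty list means no problems were found; the sample above reports the second tweet as too long.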

In [4]:
from crewai_tools import (
    DirectoryReadTool,
    FileReadTool,
)

# Load agent and task configurations from YAML files
with open('config/planner_agents.yaml', 'r') as f:
    agents_config = yaml.safe_load(f)

with open('config/planner_tasks.yaml', 'r') as f:
    tasks_config = yaml.safe_load(f)

draft_analyzer = Agent(config=agents_config['draft_analyzer'], tools=[
    DirectoryReadTool(),
    FileReadTool()
])
twitter_thread_planner = Agent(config=agents_config['twitter_thread_planner'], tools=[
    DirectoryReadTool(),
    FileReadTool()
])

analyze_draft = Task(
  config=tasks_config['analyze_draft'],
  agent=draft_analyzer
)
create_twitter_thread_plan = Task(
  config=tasks_config['create_twitter_thread_plan'],
  agent=twitter_thread_planner,
  output_pydantic=Thread
)

planning_crew = Crew(
    agents=[draft_analyzer, twitter_thread_planner],
    tasks=[analyze_draft, create_twitter_thread_plan],
    verbose=False
)



In [5]:
# result = planning_crew.kickoff(inputs={'draft_path': "/Users/akshaypachaar/Eigen/ai-engineering/content_planner_flow/workdir/5_chunking_strategies_rag.md"})
# result.pydantic.model_dump()

In [6]:
# import json
# with open('output.json', 'w') as f:
#     json.dump(result.pydantic.model_dump(), f, indent=2)

# Create Content Planning Flow

A Flow that scrapes the blog post, uses the planning crew to plan the Twitter thread, then saves the plan and schedules the thread on Typefully.
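The chaining idea behind such a flow can be sketched in plain Python. This toy dispatcher is an illustration only, not how CrewAI implements `@start` and `@listen`: `ToyFlow` and the four step functions are hypothetical, and listeners run one after another here, whereas a real flow engine may schedule them differently.

```python
from collections import defaultdict

class ToyFlow:
    """Toy stand-in for an event-driven flow (not CrewAI's implementation)."""

    def __init__(self):
        self._listeners = defaultdict(list)  # upstream step name -> downstream steps
        self.log = []                        # order in which steps ran

    def listen(self, upstream):
        """Register the decorated function to run after `upstream` finishes."""
        def register(func):
            self._listeners[upstream.__name__].append(func)
            return func
        return register

    def run(self, step, *args):
        """Run a step, then pass its return value to each of its listeners."""
        result = step(*args)
        self.log.append(step.__name__)
        for listener in self._listeners[step.__name__]:
            self.run(listener, result)
        return result

flow = ToyFlow()

def scrape(url):
    return f"draft of {url}"

@flow.listen(scrape)
def plan(draft):
    return {"tweets": [draft]}

@flow.listen(plan)
def save(plan_result):
    return len(plan_result["tweets"])

@flow.listen(plan)
def publish(plan_result):
    return "scheduled"

flow.run(scrape, "https://example.com/post")
print(flow.log)
```

Two listeners on the same step, like `save` and `publish` here, both receive the return value of `plan`.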

In [10]:
class ContentPlanningState(BaseModel):
  """
  State for the content planning flow
  """
  blog_post_url: str = blog_post_url
  draft_path: str = "workdir/"  # kept as str: later steps build it with an f-string and call .split("/")

class CreateContentPlanningFlow(Flow[ContentPlanningState]):
  # Scrape the blog post  
  # No need for AI Agents on this step, so we just use regular Python code
  @start()
  def scrape_blog_post(self):
    print(f"# fetching draft from: {self.state.blog_post_url}")
    app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
    scrape_result = app.scrape_url(self.state.blog_post_url, params={'formats': ['markdown', 'html']})
    try:
      title = scrape_result['metadata']['title']
    except Exception:
      # Fall back to a random filename when no title is available
      title = str(uuid.uuid4())
    self.state.draft_path = f'workdir/{title}.md'
    with open(self.state.draft_path, 'w') as f:
      f.write(scrape_result['markdown'])
    return self.state

  @listen(scrape_blog_post)
  def plan_content(self):
    print(f"# Planning content for: {self.state.draft_path}")
    result = planning_crew.kickoff(inputs={'draft_path': self.state.draft_path})
    print(f"# Planned content for {self.state.draft_path}:")
    for tweet in result.pydantic.tweets:
        print(f"    - {tweet.content}")
    return result

  @listen(plan_content)
  def save_plan(self, plan):
    with open(f'thread/{self.state.draft_path.split("/")[-1]}.json', 'w') as f:
        json.dump(plan.pydantic.model_dump(), f, indent=2)

  @listen(plan_content)
  def publish_thread(self, plan):
    print(f"# Publishing thread for: {self.state.draft_path}")
    ## Schedule for 1 hour from now    
    response = schedule(
        thread_json=plan
    )
    print(f"# Thread scheduled for: {self.state.draft_path}")
    print(f"Here's the link to scheduled draft: {response['url']}")
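The flow above uses the page title directly as a filename, so a title containing `/` or other awkward characters could produce an unexpected path, and the saved plan ends up with a `.md.json` suffix. A minimal sketch of one way to derive both paths from a cleaned title; `safe_stem` and `output_paths` are hypothetical helpers, not part of the notebook.

```python
import re
from pathlib import Path

def safe_stem(title: str, fallback: str = "untitled") -> str:
    """Turn a page title into a filename stem with no path separators."""
    # Collapse every run of characters other than word characters, "." and "-" into "_"
    stem = re.sub(r"[^\w.-]+", "_", title).strip("._")
    return stem or fallback

def output_paths(title: str, workdir: str = "workdir", thread_dir: str = "thread"):
    """Return (draft markdown path, thread plan JSON path) for a title."""
    stem = safe_stem(title)
    return Path(workdir) / f"{stem}.md", Path(thread_dir) / f"{stem}.json"

draft_path, plan_path = output_paths("5 Chunking Strategies For RAG - by Avi Chawla")
print(draft_path.as_posix())
print(plan_path.as_posix())
```

Calling `plan_path.parent.mkdir(parents=True, exist_ok=True)` before writing would also cover the case where the `thread/` directory does not exist yet.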



Plot and execute the flow in a Jupyter notebook.

In [11]:
# Plot the flow
flow = CreateContentPlanningFlow()
flow.plot()

# Display the flow visualization using IFrame
from IPython.display import IFrame

# IFrame(src='./crewai_flow.html', width='100%', height=400)

Plot saved as crewai_flow.html


In [12]:
flow = CreateContentPlanningFlow()
flow.kickoff()

# fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag
# Planning content for: workdir/5 Chunking Strategies For RAG - by Avi Chawla.md
# Planned content for workdir/5 Chunking Strategies For RAG - by Avi Chawla.md:
    - Chunking is key to RAG success!
    - Retrieval-Augmented Generation (RAG) enhances the quality of AI-generated content by using external information effectively. Chunking large texts is essential for optimal performance in RAG workflows.
    - 1️⃣ **Fixed-Size Chunking**: This basic method splits texts into uniform segments. While simple to implement, it often disrupts sentences and ideas due to fixed lengths. Consider overlapping sections to preserve context.
    - 2️⃣ **Semantic Chunking**: Chunks are formed based on the semantic meaning of text. This way, language flow is maintained, improving retrieval accuracy. Implement embeddings and cosine similarity thresholds to form meaningful chunks.
    - 3️⃣ **Recursive Chunking**: Start w

Traceback (most recent call last):
  File "/Users/akshaypachaar/miniconda3/envs/env_crewai/lib/python3.10/site-packages/crewai/flow/flow.py", line 363, in _execute_single_listener
    listener_result = await self._execute_method(
  File "/Users/akshaypachaar/miniconda3/envs/env_crewai/lib/python3.10/site-packages/crewai/flow/flow.py", line 306, in _execute_method
    else method(*args, **kwargs)
  File "/var/folders/4r/7f58988s6cs3d64773nhcl1w0000gn/T/ipykernel_75586/3192203743.py", line 45, in publish_thread
    response = schedule(
TypeError: schedule() got an unexpected keyword argument 'content'
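The traceback names a `content` keyword, while the cell above passes `thread_json`, which suggests the output was captured from an earlier version of the cell or of `scheduler.schedule`. One way to diagnose this kind of mismatch is to compare the keywords being passed against the function's signature. In the sketch below, `unexpected_kwargs` is a hypothetical helper and `schedule_stub` is a stand-in with an assumed signature, since the real `schedule` function is not shown in this notebook.

```python
import inspect

def unexpected_kwargs(func, kwargs):
    """Return the keyword names in `kwargs` that `func` would reject."""
    params = inspect.signature(func).parameters
    # A **kwargs parameter accepts any keyword, so nothing is unexpected
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    accepted = {
        name for name, p in params.items()
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD,
                      inspect.Parameter.KEYWORD_ONLY)
    }
    return [name for name in kwargs if name not in accepted]

# Stand-in with an assumed signature; the real scheduler.schedule may differ
def schedule_stub(thread_json):
    return {"url": "https://example.invalid/draft"}

print(unexpected_kwargs(schedule_stub, {"content": "hello"}))
print(unexpected_kwargs(schedule_stub, {"thread_json": {}}))
```

Running `inspect.signature(schedule)` on the real function in the notebook would show which keywords it accepts.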
