# Content Planning and Publishing Crew

This notebook demonstrates how to create an AI crew for planning and publishing content using CrewAI Flows.
The crew takes a link to a blog post, downloads its content as markdown using Firecrawl, analyzes it, generates a Twitter thread, and schedules it on Typefully.

### Initialization and Setup
Initial imports for the CrewAI Flow and Crew, plus environment setup.

In [55]:
!pip install firecrawl






In [56]:
!pip install crewai[tools]






In [57]:
# Importing necessary libraries
import getpass
import os
import datetime
import uuid
import yaml
import json
import re  # used by scrape_blog_post to sanitize the draft filename
import subprocess
from pathlib import Path
import pydantic
from pydantic import BaseModel
from typing import Optional

# Firecrawl SDK
from firecrawl import FirecrawlApp

# Typefully scheduler
import scheduler

# Importing Crew related components
from crewai import Agent, Task, Crew, LLM

# Importing CrewAI Flow related components
from crewai.flow.flow import Flow, listen, start, router, or_

from dotenv import load_dotenv
load_dotenv()
# Apply a patch to allow nested asyncio loops in Jupyter
import nest_asyncio
nest_asyncio.apply()

## Setup LLM

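Make sure `OPENAI_API_KEY` is set in your environment or in a `.env` file. The cell below uses OpenAI's `gpt-4o` through `ChatOpenAI` and sends a short test prompt to confirm the connection works.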
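A quick way to catch a missing key before any client is created is to check the environment up front. The sketch below is only an illustration: the helper name `missing_env_keys` and the `REQUIRED_KEYS` tuple are hypothetical, and the two key names are simply the ones this notebook reads with `os.getenv`.

```python
import os

# Key names taken from the os.getenv calls in this notebook (assumed to be the full set).
REQUIRED_KEYS = ("OPENAI_API_KEY", "FIRECRAWL_API_KEY")


def missing_env_keys(keys=REQUIRED_KEYS, env=None):
    """Return the names in `keys` that are unset or blank in `env`."""
    env = os.environ if env is None else env
    return [k for k in keys if not str(env.get(k, "")).strip()]


if __name__ == "__main__":
    missing = missing_env_keys()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required keys are set")
```

Running this after `load_dotenv()` also covers keys that come from a `.env` file, since `load_dotenv()` copies them into `os.environ`.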

In [58]:
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()

# Initialize ChatOpenAI
llm = ChatOpenAI(
    model_name="gpt-4o",
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# Test cell to verify LLM is working
try:
    response = llm.invoke("Say 'Hello, I am working!' if you can read this.")
    print("LLM Test Response:", response)
    print("✅ LLM is working correctly")
except ValueError as e:
    print("❌ Invalid API key or model name:", str(e))
except Exception as e:
    print("❌ Error testing LLM:", str(e))
    print("Please check your OpenAI API key and internet connection")

LLM Test Response: content='Hello, I am working!' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 7, 'prompt_tokens': 21, 'total_tokens': 28, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_f785eb5f47', 'finish_reason': 'stop', 'logprobs': None} id='run-6f9de170-432a-4544-b4c8-a4ffc8bff61a-0' usage_metadata={'input_tokens': 21, 'output_tokens': 7, 'total_tokens': 28, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}
✅ LLM is working correctly



In [138]:

blog_post_url = "https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag"




## Plan for our Flow

1. Scrape the blog post
2. Decide where to post using a router
3. Kick off the right **[Crew of Agents]** to prepare a draft ready to publish
4. Publish it using Typefully
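The four steps can be pictured as a plain-Python pipeline with a dictionary acting as the router. This is an analogy only, not CrewAI's API: every function name here (`scrape`, `plan_twitter`, `plan_linkedin`, `run_pipeline`) is a hypothetical stand-in, and the bodies return placeholder data instead of calling Firecrawl, a crew, or Typefully.

```python
from typing import Callable, Dict


def scrape(url: str) -> str:
    # Stand-in for the scraping step; returns placeholder markdown.
    return f"# Draft scraped from {url}"


def plan_twitter(draft: str) -> dict:
    # Stand-in for the Twitter planning crew.
    return {"platform": "twitter", "draft": draft}


def plan_linkedin(draft: str) -> dict:
    # Stand-in for the LinkedIn planning crew.
    return {"platform": "linkedin", "draft": draft}


# The "router": maps a post type to the planner that handles it.
PLANNERS: Dict[str, Callable[[str], dict]] = {
    "twitter": plan_twitter,
    "linkedin": plan_linkedin,
}


def run_pipeline(url: str, post_type: str) -> dict:
    draft = scrape(url)                      # step 1: scrape
    try:
        planner = PLANNERS[post_type]        # step 2: route
    except KeyError:
        raise ValueError(f"Unsupported post_type: {post_type!r}") from None
    plan = planner(draft)                    # step 3: plan with the chosen crew
    plan["status"] = "scheduled"             # step 4: stand-in for publishing
    return plan
```

Raising on an unknown post type is a design choice of this sketch: it makes a typo in `post_type` fail loudly instead of silently skipping the planning step.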

In [139]:
from IPython.display import HTML
HTML('<img src="content_writing_flow.png" width="1000" height="750"/>')

# Twitter Thread Planning Crew

This structure captures the output of the planning crew, which is then used to create the Twitter thread and schedule it on Typefully.

In [140]:
class Tweet(BaseModel):
    """Represents an individual tweet in a thread"""
    content: str
    is_hook: bool = False  # Identifies if this is the opening/hook tweet
    media_urls: Optional[list[str]] = []  # Optional media attachments (images, code snippets)

class Thread(BaseModel):
    """Represents a Twitter thread"""
    topic: str  # Main topic/subject of the thread
    tweets: list[Tweet]  # List of tweets in the thread
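Before scheduling, it can help to sanity-check a planned thread. The sketch below works on the plain dict that `Thread.model_dump()` would produce, so it runs without Pydantic. The helper name `check_thread`, the 280-character limit, and the rule that exactly one hook tweet comes first are all assumptions made for illustration, not requirements stated by this notebook.

```python
MAX_TWEET_CHARS = 280  # assumed per-tweet limit; adjust if your account allows more


def check_thread(thread: dict) -> list[str]:
    """Return a list of problems found in a thread dict (empty if none)."""
    problems = []
    tweets = thread.get("tweets", [])
    if not tweets:
        problems.append("thread has no tweets")
        return problems
    # Assumed convention: the hook is the first tweet, and there is only one.
    hooks = [i for i, t in enumerate(tweets) if t.get("is_hook")]
    if hooks != [0]:
        problems.append("exactly one hook tweet is expected, in first position")
    for i, t in enumerate(tweets):
        if len(t["content"]) > MAX_TWEET_CHARS:
            problems.append(f"tweet {i} exceeds {MAX_TWEET_CHARS} characters")
    return problems
```

For a crew result, a call such as `check_thread(result.pydantic.model_dump())` would apply the same checks to the structured output.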

In [141]:
from crewai_tools import (
    DirectoryReadTool,
    FileReadTool,
)

# Load agent and task configurations from YAML files
with open('config/planner_agents.yaml', 'r') as f:
    agents_config = yaml.safe_load(f)

with open('config/planner_tasks.yaml', 'r') as f:
    tasks_config = yaml.safe_load(f)

In [142]:
draft_analyzer = Agent(config=agents_config['draft_analyzer'], tools=[
    DirectoryReadTool(),
    FileReadTool()
], llm=llm)
twitter_thread_planner = Agent(config=agents_config['twitter_thread_planner'], tools=[
    DirectoryReadTool(),
    FileReadTool()
], llm=llm)

analyze_draft = Task(
  config=tasks_config['analyze_draft'],
  agent=draft_analyzer
)
create_twitter_thread_plan = Task(
  config=tasks_config['create_twitter_thread_plan'],
  agent=twitter_thread_planner,
  output_pydantic=Thread
)

planning_crew = Crew(
    agents=[draft_analyzer, twitter_thread_planner],
    tasks=[analyze_draft, create_twitter_thread_plan],
    verbose=False
)



# LinkedIn Post Planning Crew

In [143]:
class LinkedInPost(BaseModel):
    """Represents a LinkedIn post"""
    content: str
    media_url: str # Main image url for the post

In [144]:
linkedin_post_planner = Agent(config=agents_config['linkedin_post_planner'], llm=llm)

create_linkedin_post_plan = Task(
  config=tasks_config['create_linkedin_post_plan'],
  agent=linkedin_post_planner,
  output_pydantic=LinkedInPost
)

linkedin_planning_crew = Crew(
    agents=[draft_analyzer, linkedin_post_planner],
    tasks=[analyze_draft, create_linkedin_post_plan],
    verbose=False
)



# Create Content Planning Flow

A Flow that plans content for Twitter and LinkedIn, using a separate crew for each platform.

In [145]:
from IPython.display import HTML
HTML('<img src="content_writing_flow.png" width="1000" height="750"/>')

In [146]:

from crewai.flow.flow import Flow, listen, start, router, or_

class ContentPlanningState(BaseModel):
  """
  State for the content planning flow
  """
  blog_post_url: str = blog_post_url
  draft_path: str = "workdir/"  # replaced with the full draft file path after scraping
  post_type: str = "twitter"
  path_to_example_threads: str = "workdir/example_threads.txt"

class CreateContentPlanningFlow(Flow[ContentPlanningState]):
  # Scrape the blog post  
  # No need for AI Agents on this step, so we just use regular Python code
  @start()
  def scrape_blog_post(self):
    print(f"# fetching draft from: {self.state.blog_post_url}")
    app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
    scrape_result = app.scrape_url(self.state.blog_post_url, params={'formats': ['markdown', 'html']})
    
    try:
        title = scrape_result['metadata']['title']
        # Sanitize filename - remove special chars and limit length
        safe_title = re.sub(r'[\\/*?:"<>|]', '', title)
        safe_title = safe_title[:50]  # Limit length
    except Exception:
        # Fall back to a random name if the title is missing or cannot be sanitized
        safe_title = str(uuid.uuid4())

    # Ensure workdir exists
    os.makedirs('workdir', exist_ok=True)
    
    # Use simpler filename format
    self.state.draft_path = os.path.join('workdir', f'{safe_title}.md')
    
    with open(self.state.draft_path, 'w', encoding='utf-8') as f:
        f.write(scrape_result['markdown'])
    
    return self.state

  @router(scrape_blog_post)
  def select_platform(self):
    if self.state.post_type == "twitter":
      return "twitter"
    elif self.state.post_type == "linkedin":
      return "linkedin"

  @listen("twitter")
  def twitter_draft(self):
      print(f"# Planning content for: {self.state.draft_path}")
      with open(self.state.draft_path, 'r', encoding='utf-8') as f:
          draft_content = f.read()

      import re
      image_urls = re.findall(r'!\[.*?\]\((.*?)\)', draft_content)

      result = planning_crew.kickoff(inputs={
          'draft_path': self.state.draft_path,
          'path_to_example_threads': self.state.path_to_example_threads,
          'media_urls': image_urls
      })

      # Map image URLs to the tweets
      for i, tweet in enumerate(result.pydantic.tweets):
          # Add one or more URLs to each tweet as needed
          if i < len(image_urls):
              tweet.media_urls = [image_urls[i]]
          else:
              tweet.media_urls = []

      print(f"# Planned content for {self.state.draft_path}:")
      for tweet in result.pydantic.tweets:
          print(f"    - {tweet.content}, media: {tweet.media_urls}")
      return result
  
  @listen("linkedin")
  def linkedin_draft(self):
    print(f"# Planning content for: {self.state.draft_path}")
    result = linkedin_planning_crew.kickoff(inputs={'draft_path': self.state.draft_path})
    print(f"# Planned content for {self.state.draft_path}:")
    print(f"    - {result.pydantic.content}")
    return result

  @listen(or_(twitter_draft, linkedin_draft))    
  def save_plan(self, plan):
      # Create thread directory if it doesn't exist
      os.makedirs('thread', exist_ok=True)
      
      # Build the output name from the draft filename, without its .md extension
      filename = os.path.splitext(os.path.basename(self.state.draft_path))[0]
      output_path = os.path.join('thread', f'{filename}_{self.state.post_type}.json')
      
      try:
          with open(output_path, 'w', encoding='utf-8') as f:
              # Save the planned content (the crew's structured output), not only the flow state
              json.dump(plan.pydantic.model_dump(), f, indent=2)
          print(f"Thread saved to: {output_path}")
          return self.state
      except Exception as e:
          print(f"Error saving thread: {str(e)}")
          raise

  @listen(or_(twitter_draft, linkedin_draft))
  def publish(self, plan):
    print(f"# Publishing thread for: {self.state.draft_path}")
    ## Schedule for 1 hour from now    
    response = scheduler.schedule(
        thread_model=plan,
        post_type=self.state.post_type
    )
    print(f"# Thread scheduled for: {self.state.draft_path}")
    print(f"Here's the link to scheduled draft: {response['share_url']}")
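The filename handling in `scrape_blog_post` can be isolated into a small helper that is easy to test on its own. This is a sketch under assumptions: the name `safe_filename` is hypothetical, and unlike the flow above it also falls back to a UUID when the cleaned title ends up empty (for example a title made only of forbidden characters), which the original code would turn into a bare `.md` file.

```python
import re
import uuid

# Same character class as in scrape_blog_post: characters that are
# not allowed in Windows filenames.
_FORBIDDEN = re.compile(r'[\\/*?:"<>|]')


def safe_filename(title, max_len=50):
    """Turn a page title into a filesystem-friendly name, with a UUID fallback."""
    if not isinstance(title, str):
        return str(uuid.uuid4())
    cleaned = _FORBIDDEN.sub("", title).strip()[:max_len].rstrip()
    return cleaned or str(uuid.uuid4())
```

Inside the flow, `safe_filename(scrape_result['metadata'].get('title'))` could stand in for the `try`/`except` block, assuming the scrape result keeps the dict shape used above.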



Helper cells to plot and execute the flow in a Jupyter notebook.

In [147]:
# Plot the flow
flow = CreateContentPlanningFlow()
flow.plot()

# Display the flow visualization using IFrame
from IPython.display import IFrame

# Display the flow visualization
IFrame(src='./crewai_flow.html', width='100%', height=400)

Plot saved as crewai_flow.html


In [148]:
post_type = "twitter"
flow = CreateContentPlanningFlow()
flow.state.post_type = post_type
flow.state

ContentPlanningState(blog_post_url='https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag', draft_path='workdir/', post_type='twitter', path_to_example_threads='workdir/example_threads.txt')

In [153]:

flow.kickoff()


# fetching draft from: https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag
# Planning content for: workdir\5 Chunking Strategies For RAG - by Avi Chawla.md
Error parsing LLM output, agent will retry: I did it wrong. Invalid Format: I missed the 'Action:' after 'Thought:'. I will do right next, and don't use a tool I have already used.

If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfy the expect criteria, use the EXACT format below:

Thought: I now can give a great answer
Final Answer: my best complete final answer to the task.

# Planned content for workdir\5 Chunking Strategies For RAG - by Avi Chawla.md:
    - 5 Chunking Strategies For RAG, media: ['https://substackcdn.com/image/fetch/w_96,c_limit,f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fc5dc1fee-2d1e-4892-b219-4b96f6998ab5_288x288.png']
    - Struggling with data chunking in RAG? Di
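The output above shows each tweet paired with an image URL taken from the draft. The pairing is purely positional: the first image found in the markdown goes to the first tweet, the second to the second, and so on. A minimal sketch of that logic on plain dicts follows; `pair_images` is a hypothetical name, and the regex is the one used in `twitter_draft`.

```python
import re

# Same pattern as in twitter_draft: captures the URL of each markdown image.
IMAGE_PATTERN = re.compile(r'!\[.*?\]\((.*?)\)')


def pair_images(tweets, markdown):
    """Attach the i-th image URL in `markdown` to the i-th tweet dict."""
    urls = IMAGE_PATTERN.findall(markdown)
    for i, tweet in enumerate(tweets):
        # Tweets beyond the number of images get an empty media list.
        tweet["media_urls"] = [urls[i]] if i < len(urls) else []
    return tweets
```

Because the match is by position, whatever image happens to come first in the page ends up on the hook tweet, whether or not it is relevant to that tweet. Filtering the URL list before pairing may be worth considering if the first images are avatars or logos.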

In [150]:
flow = CreateContentPlanningFlow()


In [151]:
flow.state

ContentPlanningState(blog_post_url='https://blog.dailydoseofds.com/p/5-chunking-strategies-for-rag', draft_path='workdir/', post_type='twitter', path_to_example_threads='workdir/example_threads.txt')