# Plan-and-Execute

This notebook shows how to create a "plan-and-execute" style agent. This is heavily inspired by the Plan-and-Solve paper as well as the Baby-AGI project.

The core idea is to first come up with a multi-step plan, and then go through that plan one item at a time. After accomplishing a particular task, you can then revisit the plan and modify as appropriate.

FYI [LANGGRPAH](https://langchain-ai.github.io/langgraph/tutorials/plan-and-execute/plan-and-execute/) page of this type of agent.

This compares to a typical ReAct style agent where you think one step at a time. The advantages of this "plan-and-execute" style agent are:

- Explicit long term planning (which even really strong LLMs can struggle with)
- Ability to use smaller/weaker models for the execution step, only using larger/better models for the planning step.

The following walkthrough demonstrates how to do so in LangGraph. 

## Start with define Tools

In [9]:
from langchain_core.tools import Tool
import re
from langchain_core.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun
from pathlib import Path
from datetime import datetime
from typing import Dict, Any,List
from langchain_xai import ChatXAI
from openai import OpenAI
import os
import io
from bs4 import BeautifulSoup
from PIL import Image
os.environ['DYLD_LIBRARY_PATH'] = '/opt/homebrew/lib:' + os.environ.get('DYLD_LIBRARY_PATH', '')
from cairosvg import svg2png
XAI_API_KEY = os.getenv("XAI_API_KEY")
xai_client = OpenAI(base_url="https://api.x.ai/v1", 
                    api_key=XAI_API_KEY)

In [10]:
# import os

# from openai import OpenAI

# XAI_API_KEY = os.getenv("XAI_API_KEY")
# client = OpenAI(base_url="https://api.x.ai/v1", api_key=XAI_API_KEY)

# response = client.images.generate(
#   model="grok-2-image",
#   prompt="A cat in Taiwan 101 backflipping",
#   response_format="b64_json"
# )

# base64_string = response.data[0].b64_json


In [11]:
# # Read the SVG template
# with open('static/content_withimage.svg', "r", encoding="utf-8") as f:
#     svg_content = f.read()

# # Replace the placeholder with the actual base64 string
# svg_content = svg_content.replace("{{BASE64_STRING}}", base64_string)

# # Write the modified SVG to a new file
# with open('static/content_withimage_test.svg', "w", encoding="utf-8") as f:
#     f.write(svg_content)

In [12]:
search_tool = DuckDuckGoSearchRun(max_results=3)


XAI_API_KEY = os.getenv("XAI_API_KEY")
xai_client = OpenAI(base_url="https://api.x.ai/v1", 
                    api_key=XAI_API_KEY)

def svg_to_jpg(bytestring, write_to, scale=1):
    # Convert SVG to PNG in memory
    png_data = io.BytesIO()
    svg2png(bytestring=bytestring, write_to=png_data, scale=scale)
    
    # Convert PNG to JPG
    png_data.seek(0)
    img = Image.open(png_data)
    img = img.convert("RGB")  # Remove alpha channel if present
    img.save(write_to, "JPEG", quality=95)

@tool
def generate_graph(prompt: str) -> dict:
    """
    Generate artwork using xAI's image generation model and return as base64 string.
    
    Args:
        prompt (str): Description of the artwork to generate.
    
    Returns:
        dict: Contains 'b64_image' key with base64-encoded image string or 'error' key on failure.
    """
    image_model = 'grok-2-image'
    try:
        response = xai_client.images.generate(
            model=image_model,
            prompt=prompt,
            response_format="b64_json"
        )
        # Extract base64 string from response (assuming response.data[0].b64_json)
        b64_image = response.data[0].b64_json
        return {"b64_image": b64_image}
    except Exception as e:
        return {"error": str(e)}

@tool
def generate_collections(templates_dir:Path|str,
                         content_json: Dict[str, Any],
                         output_dir:Path|str = f'outputs/{datetime.today().strftime('%Y-%b-%d')}',
                         use_ai_genereate_image: str = None
                         ):
    """
    Generate images from content JSON
    
    Args:
        content_json: Content generated by content_agent
        output_dir: Directory to save generated images (overrides init value)
        
    Returns:
        list: Metadata about generated images
    """
    os.makedirs(output_dir,exist_ok=True)

    def genereate_cover():
        cover_path = templates_dir / "cover.svg"
        output_path = output_dir / "cover.png"
        # Read the SVG template
        with open(cover_path, "r", encoding="utf-8") as f:
            svg_content = f.read()

        cover_data = content_json.get("cover", {})
        # Replace template placeholders with content
        svg_content = svg_content.replace("#{Hashtag}", f"#{cover_data['hashtag']}")
        svg_content = svg_content.replace("{Main Heading First Line}", f"{cover_data['heading_line1']}")
        svg_content = svg_content.replace("{Main Heading Second Line}", f"{cover_data['heading_line2']}")
        svg_content = svg_content.replace("{Grey Box with Text}", f"{cover_data['grey_box_text']}")
        
        svg2png(bytestring=svg_content,write_to=output_path)
        return output_path
    def gernerate_content():
        return

    

@tool
def send_email():
    """Send notification email"""

@tool
def read_email():
    """read email"""

@tool
def update_historical_data():
    """Update historical post data with current input"""
@tool
def read_historical_data():
    """Read historical post in last 10 days"""
    return

@tool
def login_instagram():
    """Login to instagram"""

@tool
def post_collection():
    """Post collection of images to instagram"""

@tool
def read_reply_message():
    """Read and reply message on instagram"""

@tool
def read_reply_comment():
    """Read and reply message on instagram post"""

In [13]:

def generate_collections(templates_dir: Path | str,
                         content_json: Dict[str, Any],
                         output_dir: Path | str = None,
                         use_ai_generate_image: str = None
                         ) -> Path:
    """
    Generate images from content JSON
    
    Args:
        templates_dir: Directory stores the templates of SVG
        content_json: Content generated by content_agent
        output_dir: Directory to save generated images (overrides init value)
        use_ai_generate_image: base64 encoded string of the image
        
    Returns:
        list: Metadata about generated images
    """
    # Convert string paths to Path objects
    templates_dir = Path(templates_dir)
    
    # Set default output directory if not provided
    if output_dir is None:
        output_dir = Path(f'outputs/{datetime.today().strftime("%Y-%b-%d-%H%M%S")}')
    else:
        output_dir = Path(output_dir)
    
    os.makedirs(output_dir, exist_ok=True)

    def generate_cover():
        cover_path = templates_dir / "cover.svg"
        output_path = output_dir / "cover.png"
        # Read the SVG template
        with open(cover_path, "r", encoding="utf-8") as f:
            svg_content = f.read()

        cover_data = content_json.get("cover", {})
        # Replace template placeholders with content
        svg_content = svg_content.replace("#{Hashtag}", f"#{cover_data['hashtag']}")
        svg_content = svg_content.replace("{Main Heading First Line}", f"{cover_data['heading_line1']}")
        svg_content = svg_content.replace("{Main Heading Second Line}", f"{cover_data['heading_line2']}")
        svg_content = svg_content.replace("{Grey Box with Text}", f"{cover_data['grey_box_text']}")
        
        svg_to_jpg(bytestring=svg_content, write_to=str(output_path))
        return Path(output_path)

    def generate_content_page() -> List[Path]:
        content_paths = []
        if use_ai_generate_image is None:
            content_path = templates_dir / "content_withoutimage.svg"
        else:
            content_path = templates_dir / "content_withimage.svg"
        
        for i, page in enumerate(content_json["content_pages"]):
            try:
                with open(content_path, "r", encoding="utf-8") as f:
                    svg_content = f.read()
            except FileNotFoundError:
                # Try alternative extension
                alt_path = Path(str(content_path).replace('.svg', '.csv'))
                with open(alt_path, "r", encoding="utf-8") as f:
                    svg_content = f.read()
            
            page_number = i + 1
            output_path = output_dir / f"content_0{page_number}.jpg"
            
            # Regular replacements for simple text
            svg_content = svg_content.replace("{page_number}", str(page_number))
            svg_content = svg_content.replace("{Content Title}", page['content_title'])
            # After loading the SVG content but before writing it out:
            if '<foreignObject' in svg_content:
                # Extract foreignObject position attributes
                foreign_obj_match = re.search(r'<foreignObject\s+x="([^"]+)"\s+y="([^"]+)"\s+width="([^"]+)"\s+height="([^"]+)"', svg_content)
                
                if foreign_obj_match:
                    # Get the positioning from the original template
                    x = foreign_obj_match.group(1)
                    y = foreign_obj_match.group(2)
                    width = foreign_obj_match.group(3)
                    height = foreign_obj_match.group(4)
                    
                    # Calculate text position (add padding)
                    text_x = float(x) + 20
                    text_y = float(y) + 50  # Add padding from top
                    
                    # Replace the content in the main_point div first
                    div_pattern = r'<div id="main_point">.*?</div>'
                    replacement = f'<div id="main_point">{page["main_point"]}</div>'
                    svg_content = re.sub(div_pattern, replacement, svg_content, flags=re.DOTALL)
                    
                    # Then replace the entire foreignObject with SVG text
                    foreign_obj_pattern = r'<foreignObject.*?</foreignObject>'
                    
                    # Get the main text
                    main_text = page["main_point"]
                    
                    # Mixed language text wrapping with fixed length of ~17 chars per line
                    chars_per_line = 17  # Fixed line length as requested
                    
                    # Split text into segments (words for English, characters for Chinese)
                    # We'll treat continuous Chinese characters and continuous English words as segments
                    segments = []
                    current_segment = ""
                    is_english_segment = False
                    
                    for char in main_text:
                        # Check if the character is English (ASCII range)
                        is_english_char = ord(char) < 128
                        
                        # If we're starting a new segment or switching between English/Chinese
                        if not current_segment or is_english_char != is_english_segment:
                            if current_segment:
                                segments.append(current_segment)
                            current_segment = char
                            is_english_segment = is_english_char
                        else:
                            # Continue the current segment
                            current_segment += char
                    
                    # Add the last segment
                    if current_segment:
                        segments.append(current_segment)
                    
                    # Now wrap the text by segments
                    lines = []
                    current_line = ""
                    
                    for segment in segments:
                        # For English words, keep them intact
                        # For Chinese, we can split at any character if needed
                        if ord(segment[0]) < 128:  # English segment
                            # If adding this word would exceed the line length
                            if len(current_line) + len(segment) + (1 if current_line else 0) > chars_per_line:
                                # If the line is already approaching the limit, start a new line
                                lines.append(current_line)
                                current_line = segment
                            else:
                                # Add the English word with a space if the line isn't empty
                                if current_line:
                                    current_line += " " + segment
                                else:
                                    current_line = segment
                        else:  # Chinese segment
                            # Process character by character
                            for char in segment:
                                if len(current_line) + 1 > chars_per_line:
                                    lines.append(current_line)
                                    current_line = char
                                else:
                                    current_line += char
                    
                    # Add the last line if it's not empty
                    if current_line:
                        lines.append(current_line)
                    
                    # Create tspan elements for each line
                    tspans = ""
                    for i, line in enumerate(lines[:10]):  # Limit to 10 lines or adjust based on height
                        dy = "0" if i == 0 else "45"
                        tspans += f'<tspan x="{text_x}" dy="{dy}">{line}</tspan>\n      '
                    
                    # Build the SVG text element
                    text_element = f'''<text x="{text_x}" y="{text_y}" font-family="Songti SC, PingFang SC, Heiti SC, STKaiti" 
                    font-weight="bold" font-size="38" fill="#000000">
                    {tspans}
                    </text>'''
                    
                    # Replace the foreignObject with the text element
                    svg_content = re.sub(foreign_obj_pattern, text_element, svg_content, flags=re.DOTALL)
            # Add the image if needed
            if use_ai_generate_image is not None:
                svg_content = svg_content.replace("{{BASE64_STRING}}", use_ai_generate_image)

            svg_to_jpg(bytestring=svg_content, write_to=str(output_path))
            content_paths.append(output_path)
        
        return content_paths
        
    cover_path = generate_cover()
    content_path = generate_content_page()

    return 


In [14]:
with open('base64image.txt','r') as f:
    base64image = f.read()

In [15]:
content_json = {'cover':
                    {'hashtag':'標籤',
                    'heading_line1':'頭條1',
                    'heading_line2':'頭條2',
                    'grey_box_text':'灰字'
                    },
                'content_pages':[
                    {'content_title':'這是標題',
                     'main_point':'主要內容主要內this is i do not know what this is 容主要內容主要內容主要內要內容主要內容主要內容主要內容主要內容主要內容主要內主要內容主要內容主要內容主要內容主要內容主要內容主要內主要內容主要內容主要內容主要內容主要內容主要內容主要內容'}
                ]
                
                }

In [23]:
generate_collections(templates_dir= 'static',
                     content_json = content_json)

In [25]:
content_json = {'cover':
                    {'hashtag':'標籤',
                    'heading_line1':'頭條1',
                    'heading_line2':'頭條2',
                    'grey_box_text':'灰字'
                    },
                'content_pages':[
                    {'content_title':'這是標題',
                     'main_point':'主要內容主要內this is i do not know what this is'}
                ]
                
                }

In [28]:
generate_collections(templates_dir= 'static',
                     content_json = content_json,
                     use_ai_generate_image=base64image)

In [11]:



tools = [search_tool]