In [4]:
%pip install watercrawl-py

Defaulting to user installation because normal site-packages is not writeable
Collecting watercrawl-py
  Downloading watercrawl_py-0.9.2-py3-none-any.whl.metadata (11 kB)
Downloading watercrawl_py-0.9.2-py3-none-any.whl (12 kB)
Installing collected packages: watercrawl-py
Successfully installed watercrawl-py-0.9.2
Note: you may need to restart the kernel to use updated packages.


In [12]:
import base64
import datetime
import mimetypes
import os

import requests
from langchain_openai import OpenAI as OI
from langchain_tavily import TavilySearch
from langgraph.graph import StateGraph, END
from openai import OpenAI
from typing_extensions import TypedDict

# API keys are read from the environment so no secret is stored in the notebook.
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Tavily search tool used by search_news_node.
# No `include_domains` filter is set: that option takes host names such as
# "reuters.com" (a label like "Finance" matches no site), and topic="finance"
# already scopes the search to financial news.
tool = TavilySearch(
    max_results=1,
    topic="finance",
    include_answer=True,
    include_raw_content=True,
    include_images=True,
    search_depth="advanced",
    tavily_api_key=TAVILY_API_KEY
)

# OpenAI client used by the prompt-analysis, image-generation and captioning nodes.
client = OpenAI(api_key=OPENAI_API_KEY)

# State
class AgentState(TypedDict):
    """Shared state dictionary handed from node to node in the workflow.

    The workflow is started with only ``query`` and ``reference_folder``; the
    remaining keys are written by the nodes as they run, which is why nodes
    read keys with ``state.get(...)`` and a default.
    """
    # Search query sent to the Tavily tool.
    query: str
    # URL, title and content snippet of the first search result.
    news_url: str
    news_title: str
    news_content: str
    # URL of the first image associated with the article ('' when none).
    news_image_url: str
    # Local path of the downloaded article image.
    article_image_path: str
    # Folder scanned for reference post images and for logo.png.
    reference_folder: str
    # Image-generation prompt produced by the reference-analysis node.
    post_prompt: str
    # Path of the generated post image.
    image_path: str
    # Path of the text file summarising the post.
    txt_path: str
    # Caption, hashtags and source attribution parsed from the image analysis.
    caption: str
    hashtags: str
    source: str
    # Message describing the most recent node failure.
    error: str
    # Name of the last step that finished (e.g. "logo_check", "news_search").
    step_completed: str
    # Path to logo.png inside the reference folder, or '' when it is missing.
    logo_path: str

# Helpers
def encode_image(image_path: str) -> str:
    """Return the file at ``image_path`` as a base64 string.

    Any failure (missing file, unreadable file, ...) is reported on stdout and
    an empty string is returned instead of raising.
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode('utf-8')
    except Exception as exc:
        print(f"Error encoding image {image_path}: {exc}")
        return ""

def download_image(url: str, save_path: str) -> str:
    """Download ``url`` and write the response body to ``save_path``.

    Args:
        url: Address of the image to fetch.
        save_path: File path the bytes are written to.

    Returns:
        ``save_path`` on success, or ``""`` when the request fails, the server
        answers with a non-200 status, or the body is empty. Every failure is
        reported on stdout; nothing is raised.
    """
    try:
        response = requests.get(url, timeout=10)

        # Report HTTP failures explicitly instead of returning "" silently.
        if response.status_code != 200:
            print(f"⚠️ Failed to download image: HTTP {response.status_code} for {url}")
            return ""

        # An empty body would leave a zero-byte file that no later step can use.
        if not response.content:
            print(f"⚠️ Failed to download image: empty response body for {url}")
            return ""

        with open(save_path, "wb") as f:
            f.write(response.content)
        return save_path
    except Exception as e:
        print(f"⚠️ Failed to download image: {e}")
    return ""

# Nodes
def check_logo_node(state: AgentState) -> AgentState:
    """Node 1: record whether ``logo.png`` exists in the reference folder.

    Sets ``state['logo_path']`` to the logo's path when the file exists and to
    ``""`` otherwise, then marks the step as ``"logo_check"``.
    """
    print("\n🏷️ NODE 1: Checking for logo...")
    folder = state.get('reference_folder', 'reference_posts')
    candidate = os.path.join(folder, 'logo.png')
    logo_found = os.path.exists(candidate)

    if not logo_found:
        print(f"⚠️ Logo not found at: {candidate}")
    else:
        print(f"✅ Logo found: {candidate}")
    state['logo_path'] = candidate if logo_found else ""

    state['step_completed'] = "logo_check"
    return state

def search_news_node(state: AgentState) -> AgentState:
    """Node 2: look up one news article for ``state['query']``.

    On success fills ``news_url``, ``news_title``, ``news_content`` and
    ``news_image_url`` from the search response and marks the step as
    ``"news_search"``. When the search returns nothing or raises,
    ``state['error']`` is set instead; the node never raises.
    """
    print("\n🔍 NODE 2: Searching for news...")
    query = state.get('query', 'latest finance news')

    try:
        search_results = tool.invoke(query)
        results = search_results.get('results') or []

        if results:
            first_result = results[0]
            state['news_url'] = first_result.get('url', '')
            state['news_title'] = first_result.get('title', '')
            state['news_content'] = first_result.get('content', '')

            # Tavily reports images in a top-level "images" list of the
            # response; an "images" list on the result entry is still
            # honoured first in case one is present.
            images = first_result.get('images') or search_results.get('images') or []
            first_image = images[0] if images else ''
            # With image descriptions enabled each entry is a dict with a "url" key.
            if isinstance(first_image, dict):
                first_image = first_image.get('url', '')
            state['news_image_url'] = first_image or ''

            print(f"✅ Found article: {state['news_title']}")
            print(f"📰 URL: {state['news_url']}")
            if state['news_image_url']:
                print(f"🖼️ Article image found: {state['news_image_url']}")
            state['step_completed'] = "news_search"
        else:
            print("❌ No results found for the query")
            state['error'] = "No news results found"

    except Exception as e:
        print(f"❌ Error in news search: {e}")
        state['error'] = f"News search error: {str(e)}"

    return state

def _reference_image_part(image_path: str):
    """Build a chat ``image_url`` content part for ``image_path``.

    The data URL carries the MIME type guessed from the file extension
    (falling back to ``image/jpeg``). Returns ``None`` when the file cannot
    be encoded.
    """
    encoded = encode_image(image_path)
    if not encoded:
        return None
    mime_type = mimetypes.guess_type(image_path)[0] or "image/jpeg"
    return {
        "type": "image_url",
        "image_url": {"url": f"data:{mime_type};base64,{encoded}"}
    }


def analyze_references_node(state: AgentState) -> AgentState:
    """Node 3: derive an image-generation prompt from reference posts.

    Up to five images from ``state['reference_folder']`` (excluding
    ``logo.png``), plus the logo when available, are sent to the chat model
    together with the article title and content. The model's reply becomes
    ``state['post_prompt']``.

    Without reference images a generic prompt is used and no API call is made.
    If anything fails, ``state['error']`` is set and ``post_prompt`` falls back
    to the generic prompt; the node never raises.
    """
    print("\n📸 NODE 3: Analyzing reference images...")
    reference_folder = state.get('reference_folder', 'reference_posts')
    logo_path = state.get('logo_path', '')

    # Built before the try block so the except handler can always fall back to it.
    base_prompt = f"""Create a professional Instagram post design based on this news:
Title: {state.get('news_title', 'N/A')}
Content: {state.get('news_content', 'N/A')}

IMPORTANT: The post MUST include the company logo prominently placed (top corner, bottom corner, or as a watermark)."""

    try:
        image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
        reference_images = []

        if os.path.isdir(reference_folder):
            # Sorted so the "first five" selection is the same on every run.
            reference_images = [
                os.path.join(reference_folder, file)
                for file in sorted(os.listdir(reference_folder))
                if file.lower().endswith(image_extensions) and file != 'logo.png'
            ][:5]

        if not reference_images:
            state['post_prompt'] = base_prompt + " Use a modern design with clean typography."
            state['step_completed'] = "reference_analysis"
            return state

        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"""Analyze these {len(reference_images)} Instagram posts for visual style, colors, and branding. 
Then create a detailed prompt for a new post based on this article:

Title: {state.get('news_title', 'N/A')}
Content: {state.get('news_content', 'N/A')}

Requirements:
- Channel name: Signl (finance news channel)
- Instagram format
- Must include company logo (corner or watermark)
- Keep branding style consistent"""
                    }
                ]
            }
        ]

        # The logo (when present) goes first, followed by the reference posts.
        attachments = []
        if logo_path and os.path.exists(logo_path):
            attachments.append(logo_path)
        attachments.extend(reference_images)

        for img_path in attachments:
            image_part = _reference_image_part(img_path)
            if image_part:
                messages[0]["content"].append(image_part)

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            max_tokens=800,
            temperature=0.3
        )

        state['post_prompt'] = response.choices[0].message.content
        state['step_completed'] = "reference_analysis"

    except Exception as e:
        print(f"❌ Error in reference analysis: {e}")
        state['error'] = f"Reference analysis error: {str(e)}"
        state['post_prompt'] = base_prompt

    return state

def generate_image_node(state: AgentState) -> AgentState:
    """Node 4: render the post image and save it under ``posts/``.

    When the article has an image it is downloaded and sent to the model
    through the image-edit endpoint, so the "attached article image" the
    prompt refers to really is attached. If there is no article image, or the
    edit request fails, the node uses plain text-to-image generation with a
    prompt that does not mention an attachment.

    On success sets ``state['image_path']`` (and ``article_image_path`` when an
    article image was downloaded) and marks the step as ``"image_generation"``.
    On failure sets ``state['error']``; the node never raises.
    """
    print("\n🎨 NODE 4: Generating image...")

    try:
        posts_dir = "posts"
        os.makedirs(posts_dir, exist_ok=True)
        prompt = state.get('post_prompt', 'Create a social media post')

        # Download article image if available
        article_img_path = ""
        if state.get('news_image_url'):
            target_path = os.path.join(posts_dir, "article_image.png")
            downloaded_path = download_image(state['news_image_url'], target_path)
            if downloaded_path:
                state['article_image_path'] = downloaded_path
                article_img_path = downloaded_path
                print("✅ Article image downloaded for integration")

        enhanced_prompt = f"""{prompt}

IMPORTANT:
- Company name: **Signl**, bold green (#008037).
- Proper typography hierarchy (Title > Subheadline > Body).
- Use fonts: Poppins, Inter, or SF Pro display
- Include company logo clearly but unobtrusively.
- Professional finance news layout."""

        image_response = None
        if article_img_path:
            # images.generate accepts no input image, so the edit endpoint is
            # used to actually hand the article image to the model.
            try:
                with open(article_img_path, "rb") as article_file:
                    image_response = client.images.edit(
                        model="gpt-image-1",
                        image=article_file,
                        prompt=enhanced_prompt + "\nIncorporate the attached article image naturally in the design.",
                        size="1024x1024",
                        quality="high",
                        n=1,
                    )
            except Exception as edit_error:
                # Best effort: e.g. an unsupported image format must not stop the post.
                print(f"⚠️ Could not use article image, generating without it: {edit_error}")

        if image_response is None:
            image_response = client.images.generate(
                model="gpt-image-1",
                prompt=enhanced_prompt,
                size="1024x1024",
                quality="high",
                n=1,
            )

        image_data = image_response.data[0].b64_json
        image_bytes = base64.b64decode(image_data)

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        image_filename = f"post_{timestamp}.png"
        image_path = os.path.join(posts_dir, image_filename)

        with open(image_path, 'wb') as f:
            f.write(image_bytes)

        state['image_path'] = image_path
        print(f"✅ Image generated and saved: {image_path}")
        state['step_completed'] = "image_generation"

    except Exception as e:
        print(f"❌ Error generating image: {e}")
        state['error'] = f"Image generation error: {str(e)}"

    return state
def _split_analysis_sections(text: str) -> dict:
    """Split the model's reply into caption / hashtags / source / logo_check.

    A section starts on a line beginning with ``CAPTION:``, ``HASHTAGS:``,
    ``SOURCE:`` or ``LOGO_CHECK:``. Markdown decoration around the header
    (``**CAPTION:**``, ``### CAPTION:``) is tolerated. Text after the header
    and every following non-empty line belong to that section, joined with
    single spaces.
    """
    headers = {
        'CAPTION:': 'caption',
        'HASHTAGS:': 'hashtags',
        'SOURCE:': 'source',
        'LOGO_CHECK:': 'logo_check',
    }
    sections = {name: "" for name in headers.values()}
    current_section = None

    for raw_line in text.split('\n'):
        line = raw_line.strip()
        # Ignore leading markdown markers only for header detection, so a
        # hashtag line such as "#finance" keeps its '#'.
        probe = line.lstrip('*# ')
        header = next((h for h in headers if probe.startswith(h)), None)
        if header:
            current_section = headers[header]
            # Drop a closing "**" left over from a bold header.
            sections[current_section] = probe[len(header):].lstrip('*').strip()
        elif line and current_section:
            sections[current_section] += " " + line

    return {name: value.strip() for name, value in sections.items()}


def analyze_image_node(state: AgentState) -> AgentState:
    """Node 5: ask the chat model for caption, hashtags and source text.

    The generated image at ``state['image_path']`` is sent to the model and
    the reply is parsed into ``state['caption']``, ``state['hashtags']`` and
    ``state['source']``; the logo-visibility note is only printed.

    If the image is missing or cannot be encoded, or the request or parsing
    fails, ``state['error']`` is set and generic fallback values are stored
    instead. The node never raises.
    """
    print("\n🔍 NODE 5: Analyzing generated image...")

    try:
        image_path = state.get('image_path')
        if not image_path or not os.path.exists(image_path):
            raise Exception("Generated image not found")

        base64_image = encode_image(image_path)
        if not base64_image:
            raise Exception("Could not encode image")

        # Label the data URL with the file's real type (the generated post is a PNG).
        mime_type = mimetypes.guess_type(image_path)[0] or "image/png"

        analysis_prompt = f"""Analyze this generated social media post image and create:

1. A catchy caption (2-3 sentences) related to this news:
   Title: {state.get('news_title', 'N/A')}
   
2. 8-10 relevant hashtags for Instagram
3. Source attribution text

Also verify that the company logo is visible in the image. If the logo is present, mention it briefly.

Format your response as:
CAPTION:
[your caption here]

HASHTAGS:
[your hashtags here]

SOURCE:
[source info here]

LOGO_CHECK:
[brief note about logo visibility]"""

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": analysis_prompt
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=400,
            temperature=0.5
        )

        analysis_result = response.choices[0].message.content
        sections = _split_analysis_sections(analysis_result)

        state['caption'] = sections['caption']
        state['hashtags'] = sections['hashtags']
        state['source'] = sections['source']

        # Log logo check result
        if sections['logo_check']:
            print(f"🏷️ Logo check: {sections['logo_check']}")

        print("✅ Image analysis completed successfully")
        state['step_completed'] = "image_analysis"

    except Exception as e:
        print(f"❌ Error analyzing image: {e}")
        state['error'] = f"Image analysis error: {str(e)}"
        state['caption'] = f"Latest update on {state.get('news_title', 'breaking news')}"
        state['hashtags'] = "#news #update #finance #business #trending #instagram #post #viral"
        state['source'] = f"Source: {state.get('news_url', 'N/A')}"

    return state

def save_output_node(state: AgentState) -> AgentState:
    """Node 6: write a text summary of the post to ``posts/post_<timestamp>.txt``.

    The summary lists the news source, caption, hashtags, source attribution
    and technical details taken from ``state`` (missing keys appear as
    ``N/A``). On success sets ``state['txt_path']`` and marks the step as
    ``"output_saved"``; on failure sets ``state['error']``. Never raises.
    """
    print("\n💾 NODE 6: Saving final output...")

    try:
        # Create the folder here as well so this node does not rely on an
        # earlier node having created it.
        posts_dir = "posts"
        os.makedirs(posts_dir, exist_ok=True)

        # One clock reading, so the file name and the "Generated" line agree.
        now = datetime.datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        generated_at = now.strftime("%Y-%m-%d %H:%M:%S")

        txt_filename = f"post_{timestamp}.txt"
        txt_path = os.path.join(posts_dir, txt_filename)

        text_content = f"""SOCIAL MEDIA POST DETAILS
Generated: {generated_at}

NEWS SOURCE:
Title: {state.get('news_title', 'N/A')}
URL: {state.get('news_url', 'N/A')}

POST CONTENT:
Caption: {state.get('caption', 'N/A')}

Hashtags: {state.get('hashtags', 'N/A')}

Source: {state.get('source', 'N/A')}

TECHNICAL DETAILS:
Image Path: {state.get('image_path', 'N/A')}
Logo Path: {state.get('logo_path', 'N/A')}
Prompt Used: {state.get('post_prompt', 'N/A')}
Reference Folder: {state.get('reference_folder', 'N/A')}
"""

        with open(txt_path, 'w', encoding='utf-8') as f:
            f.write(text_content)

        state['txt_path'] = txt_path
        print(f"✅ Final output saved: {txt_path}")
        print(f"📱 Caption: {state.get('caption', 'N/A')}")
        print(f"🏷️ Hashtags: {state.get('hashtags', 'N/A')}")
        state['step_completed'] = "output_saved"

    except Exception as e:
        print(f"❌ Error saving output: {e}")
        state['error'] = f"Save output error: {str(e)}"

    return state

class SocialMediaStateGraph:
    """Builds, compiles and runs the six-node post-generation workflow.

    Typical use: ``add_nodes()``, ``add_edges()``, ``compile_graph()`` and
    then ``run_workflow(query)``.
    """

    def __init__(self):
        """Create an empty ``StateGraph`` over ``AgentState``; nothing is compiled yet."""
        self.workflow = StateGraph(AgentState)
        self.compiled_graph = None

    def add_nodes(self):
        """Register the six node functions, announcing each one on stdout."""
        print("🔧 Adding nodes to the workflow...")

        node_table = (
            ("check_logo", check_logo_node),
            ("search_news", search_news_node),
            ("analyze_references", analyze_references_node),
            ("generate_image", generate_image_node),
            ("analyze_image", analyze_image_node),
            ("save_output", save_output_node),
        )
        for position, (node_name, node_fn) in enumerate(node_table, start=1):
            self.workflow.add_node(node_name, node_fn)
            print(f"✅ Added Node {position}: {node_name}")

    def add_edges(self):
        """Wire the nodes into a single linear chain that ends at ``END``."""
        print("\n🔗 Adding edges to connect nodes...")

        self.workflow.set_entry_point("check_logo")
        print("✅ Set entry point: check_logo")

        chain = [
            "check_logo",
            "search_news",
            "analyze_references",
            "generate_image",
            "analyze_image",
            "save_output",
        ]
        # Connect every node to the one that follows it.
        for upstream, downstream in zip(chain, chain[1:]):
            self.workflow.add_edge(upstream, downstream)
            print(f"✅ Added edge: {upstream} → {downstream}")

        self.workflow.add_edge(chain[-1], END)
        print("✅ Added edge: save_output → END")

    def compile_graph(self):
        """Compile the workflow; return ``True`` on success, ``False`` on error."""
        print("\n⚙️ Compiling the workflow graph...")
        try:
            self.compiled_graph = self.workflow.compile()
        except Exception as exc:
            print(f"❌ Error compiling graph: {exc}")
            return False
        else:
            print("✅ Graph compiled successfully!")
            return True

    def run_workflow(self, query: str, reference_folder: str = "reference_posts"):
        """Run the compiled graph and return its final state.

        Returns ``None`` when the graph has not been compiled or when the
        execution raises; both cases are reported on stdout.
        """
        if not self.compiled_graph:
            print("❌ Graph not compiled yet. Please compile first.")
            return None

        print("\n🚀 Starting workflow execution...")
        print(f"📝 Query: {query}")
        print(f"📁 Reference folder: {reference_folder}")

        initial_state = dict(query=query, reference_folder=reference_folder)

        try:
            final_state = self.compiled_graph.invoke(initial_state)
        except Exception as exc:
            print(f"❌ Workflow execution error: {exc}")
            return None

        print("\n🎉 Workflow completed successfully!")
        return final_state

def main():
    """Build the workflow, run it for a stock-market query and report the outcome."""
    sm_graph = SocialMediaStateGraph()
    sm_graph.add_nodes()
    sm_graph.add_edges()

    # Nothing can run without a compiled graph.
    if not sm_graph.compile_graph():
        print("❌ Failed to compile graph")
        return

    result = sm_graph.run_workflow(
        query="latest stock market news",
        reference_folder="reference_posts"
    )

    # A missing result or a recorded error both count as failure.
    if not result or result.get('error'):
        failure = result.get('error') if result else 'Unknown error'
        print(f"\n❌ Workflow failed: {failure}")
        return

    print("\n✅ SUCCESS! Files created:")
    print(f"🖼️ Image: {result.get('image_path')}")
    print(f"📄 Details: {result.get('txt_path')}")
    logo_used = result.get('logo_path')
    if logo_used:
        print(f"🏷️ Logo used: {logo_used}")

# Run the whole pipeline when this code is executed directly (running the
# notebook cell counts, as the captured output below shows), but not when
# the code is imported as a module.
if __name__ == "__main__":
    main()

🔧 Adding nodes to the workflow...
✅ Added Node 1: check_logo
✅ Added Node 2: search_news
✅ Added Node 3: analyze_references
✅ Added Node 4: generate_image
✅ Added Node 5: analyze_image
✅ Added Node 6: save_output

🔗 Adding edges to connect nodes...
✅ Set entry point: check_logo
✅ Added edge: check_logo → search_news
✅ Added edge: search_news → analyze_references
✅ Added edge: analyze_references → generate_image
✅ Added edge: generate_image → analyze_image
✅ Added edge: analyze_image → save_output
✅ Added edge: save_output → END

⚙️ Compiling the workflow graph...
✅ Graph compiled successfully!

🚀 Starting workflow execution...
📝 Query: latest stock market news
📁 Reference folder: reference_posts

🏷️ NODE 1: Checking for logo...
✅ Logo found: reference_posts\logo.png

🔍 NODE 2: Searching for news...


KeyboardInterrupt: 