<a href="https://colab.research.google.com/github/Harooniqbal4879/AgenticAI/blob/main/ai_content_marketing_assistant.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install streamlit

Collecting streamlit
  Downloading streamlit-1.49.1-py3-none-any.whl.metadata (9.5 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.49.1-py3-none-any.whl (10.0 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 10.0/10.0 MB 62.0 MB/s eta 0:00:00
Downloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.9/6.9 MB 106.2 MB/s eta 0:00:00
Installing collected packages: pydeck, streamlit
Successfully installed pydeck-0.9.1 streamlit-1.49.1


In [1]:
import os
import streamlit as st
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from enum import Enum
import json
import requests
from openai import OpenAI
import time
from datetime import datetime
import re
from urllib.parse import urlparse

# Core Dependencies for Multi-Agent System
try:
    from langgraph.graph import StateGraph, END
    from langgraph.graph.state import CompiledStateGraph
    from typing_extensions import TypedDict
    print("LangGraph imported successfully")
except ImportError:
    # Fallback implementation if LangGraph not available
    print("LangGraph not available - using fallback implementation")

    # Terminal marker for the fallback graph. LangGraph exports a constant of
    # the same name, and the orchestrator references END whichever branch ran,
    # so it has to exist here too.
    END = "END"

    class TypedDict(dict):
        """Minimal stand-in so state schemas can still subclass ``TypedDict``."""

    class StateGraph:
        """Linear graph builder covering the subset of the LangGraph API used in this file."""

        def __init__(self, state_schema):
            self.nodes = {}
            self.edges = {}
            self.state_schema = state_schema
            # Default keeps the previous behaviour, which always started here.
            self.entry_point = "query_handler"

        def add_node(self, name, func):
            """Register ``func`` (state -> state) under ``name``."""
            self.nodes[name] = func

        def add_edge(self, from_node, to_node):
            """Record the single successor of ``from_node``."""
            self.edges[from_node] = to_node

        def set_entry_point(self, name):
            """Choose the node that ``invoke`` starts from."""
            self.entry_point = name

        def compile(self):
            """Return a runnable graph over the registered nodes and edges."""
            return CompiledStateGraph(self.nodes, self.edges, self.entry_point)

    class CompiledStateGraph:
        """Runs the registered nodes one after another, following the edges."""

        def __init__(self, nodes, edges, entry_point="query_handler"):
            self.nodes = nodes
            self.edges = edges
            self.entry_point = entry_point

        def invoke(self, state):
            """Thread ``state`` through the nodes from the entry point until END.

            Raises:
                ValueError: if the walk reaches a name that was never
                    registered as a node (previously this looped forever).
            """
            # Simple sequential execution for fallback
            current = self.entry_point
            while current and current != END:
                if current not in self.nodes:
                    raise ValueError(f"Unknown workflow node: {current!r}")
                state = self.nodes[current](state)
                # A node without an outgoing edge ends the run.
                current = self.edges.get(current, END)
            return state

# Configuration Management
@dataclass
class Config:
    """Settings shared by every agent in the workflow."""

    # Passed to the OpenAI client constructor in BaseAgent.
    openai_api_key: str
    # Not read by any code visible in this file.
    serp_api_key: Optional[str] = None
    # Completion cap used by the research, blog and strategy agents.
    max_tokens: int = 2000
    # Sampling temperature for the research, blog, LinkedIn and strategy calls.
    temperature: float = 0.7
    # Chat model name sent with each chat-completion request.
    model: str = "gpt-4"

class ContentType(Enum):
    """Content categories a user query can be routed to."""

    # The values are the lowercase labels the routing prompt asks the model
    # to answer with.
    RESEARCH = "research"
    BLOG = "blog"
    LINKEDIN = "linkedin"
    IMAGE = "image"
    STRATEGY = "strategy"

# State Management for Multi-Agent System
class WorkflowState(TypedDict):
    """Shared state dict that every agent receives, mutates and returns."""

    # Raw request text; interpolated into the agents' prompts.
    user_query: str
    # Label written by QueryHandlerAgent ('research' when routing fails).
    content_type: str
    # Written by DeepResearchAgent: 'content', 'search_terms', 'timestamp',
    # 'sources' on success; 'content' and 'error' on failure.
    research_data: Dict[str, Any]
    # Agent outputs keyed by 'blog', 'linkedin', 'strategy', plus
    # 'blog_metadata' / 'linkedin_metadata' (dicts) and 'image_prompts' (list),
    # hence the value type is Any rather than str.
    generated_content: Dict[str, Any]
    # Image description text, optionally followed by a generated image URL.
    images: List[str]
    # Bookkeeping such as 'start_time', 'end_time', 'success',
    # 'routing_decision' and 'quality_score'.
    metadata: Dict[str, Any]
    # Stored by the orchestrator; no agent visible in this file reads it.
    conversation_history: List[Dict[str, str]]
    # One message appended per agent failure.
    error_log: List[str]

# Agent Base Class
class BaseAgent:
    """Common base for workflow agents: keeps the config and an OpenAI client."""

    def __init__(self, config: Config) -> None:
        self.config = config
        # Every agent instance builds its own client from the configured key.
        self.client = OpenAI(api_key=config.openai_api_key)

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Process ``state`` and return it; concrete agents must override this."""
        raise NotImplementedError

# 1. Query Handler Agent
class QueryHandlerAgent(BaseAgent):
    """Classifies the user's query into one of the supported content types."""

    # Used when the model call fails or its reply names no known content type.
    DEFAULT_CONTENT_TYPE = 'research'

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Routes requests to appropriate specialized agents

        Writes the chosen label to ``state['content_type']`` and
        ``state['metadata']['routing_decision']``. The label is always one of
        the ``ContentType`` values. API failures are appended to
        ``state['error_log']`` and the default label is used instead.
        """
        try:
            prompt = f"""
            Analyze the following user query and determine the primary content type needed.
            Respond with ONLY one of: research, blog, linkedin, image, strategy

            Query: {state['user_query']}
            """

            response = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=50,
                temperature=0.1
            )

            content_type = self._normalize_content_type(
                response.choices[0].message.content
            )

        except Exception as e:
            state['error_log'].append(f"Query Handler Error: {str(e)}")
            content_type = self.DEFAULT_CONTENT_TYPE  # Default fallback

        # Record the decision on both paths so downstream readers always see
        # the same keys.
        state['content_type'] = content_type
        state.setdefault('metadata', {})['routing_decision'] = content_type

        return state

    def _normalize_content_type(self, raw_reply: Optional[str]) -> str:
        """Return the first known content-type label found in ``raw_reply``.

        Models often decorate the answer ("Blog.", "Content type: blog"), so
        the reply is scanned word by word instead of being trusted verbatim.
        ``None`` or a reply with no known label yields the default.
        """
        known_labels = {member.value for member in ContentType}
        for word in re.findall(r'[a-z]+', (raw_reply or '').lower()):
            if word in known_labels:
                return word
        return self.DEFAULT_CONTENT_TYPE

# 2. Deep Research Agent
class DeepResearchAgent(BaseAgent):
    """Produces an LLM-written research report for the user's query."""

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Conducts comprehensive web research and analysis

        On success ``state['research_data']`` holds 'content', 'search_terms',
        'timestamp' and 'sources'. On failure the same keys are present with a
        placeholder 'content', plus an 'error' message, and the failure is
        appended to ``state['error_log']``.
        """
        # Bound before the try so the failure branch can still report the
        # terms if extraction succeeded but the API call did not.
        search_query: List[str] = []
        try:
            # Simulate web research (in production, use SERP API)
            search_query = self._extract_search_terms(state['user_query'])

            # Generate research-based content using LLM
            research_prompt = f"""
            Conduct comprehensive research analysis for: {state['user_query']}

            Provide:
            1. Key findings and insights
            2. Current trends and statistics
            3. Expert opinions and quotes
            4. Relevant case studies
            5. Source recommendations

            Format as structured research report.
            """

            response = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": research_prompt}],
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature
            )

            research_content = response.choices[0].message.content
            if not research_content:
                # The SDK types message content as optional; storing None here
                # would break the writers that slice this text later.
                raise ValueError("model returned an empty response")

            state['research_data'] = {
                'content': research_content,
                'search_terms': search_query,
                'timestamp': datetime.now().isoformat(),
                'sources': self._generate_mock_sources(search_query)
            }

        except Exception as e:
            state['error_log'].append(f"Research Agent Error: {str(e)}")
            # Same keys as the success branch so consumers need no special case.
            state['research_data'] = {
                'content': 'Research unavailable',
                'error': str(e),
                'search_terms': search_query,
                'timestamp': datetime.now().isoformat(),
                'sources': []
            }

        return state

    def _extract_search_terms(self, query: str) -> List[str]:
        """Return the distinct lowercase words of ``query`` longer than 3 characters.

        First-occurrence order is kept; repeats are dropped so they do not
        produce duplicate sources.
        """
        # Simple keyword extraction
        words = re.findall(r'\b\w+\b', query.lower())
        return list(dict.fromkeys(word for word in words if len(word) > 3))

    def _generate_mock_sources(self, terms: List[str]) -> List[str]:
        """Build placeholder URLs for up to the first three terms.

        These are illustrative only: nothing is fetched and the URLs are not
        real citations.
        """
        return [
            f"https://example-research.com/{term}-study-2024"
            for term in terms[:3]
        ]

# 3. SEO Blog Writer Agent
class SEOBlogWriterAgent(BaseAgent):
    """Writes a long-form blog post from the query and any research text."""

    # Words-per-minute figure used for the read-time estimate.
    WORDS_PER_MINUTE = 200

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Creates search-optimized long-form content

        Stores the post under ``state['generated_content']['blog']`` and its
        statistics under ``'blog_metadata'``. On failure the error is appended
        to ``state['error_log']`` and ``'blog'`` is set to a failure message.
        """
        try:
            # `or ''` also covers a stored None, which .get()'s default would not.
            research_context = state.get('research_data', {}).get('content') or ''

            blog_prompt = f"""
            Write a comprehensive, SEO-optimized blog post based on:
            Query: {state['user_query']}
            Research Context: {research_context[:1000]}...

            Requirements:
            - 1500+ words
            - Clear H1, H2, H3 structure
            - Meta description
            - Focus keywords
            - Internal linking suggestions
            - Call-to-action
            - Engaging introduction and conclusion
            """

            response = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": blog_prompt}],
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature
            )

            blog_content = response.choices[0].message.content
            if not blog_content:
                # Fail with a readable message rather than an AttributeError
                # raised from calling .split() on None.
                raise ValueError("model returned an empty response")

            word_count = len(blog_content.split())
            state['generated_content']['blog'] = blog_content
            state['generated_content']['blog_metadata'] = {
                'word_count': word_count,
                # Floor division alone reports 0 minutes for short posts.
                'estimated_read_time': max(1, word_count // self.WORDS_PER_MINUTE),
                'seo_keywords': self._extract_keywords(state['user_query'])
            }

        except Exception as e:
            state['error_log'].append(f"Blog Writer Error: {str(e)}")
            state['generated_content']['blog'] = f"Blog generation failed: {str(e)}"

        return state

    def _extract_keywords(self, query: str) -> List[str]:
        """Return the first five lowercase words of ``query``."""
        return re.findall(r'\b\w+\b', query.lower())[:5]

# 4. LinkedIn Post Writer Agent
class LinkedInPostWriterAgent(BaseAgent):
    """Writes a short LinkedIn post from the query and any research text."""

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Generates engaging professional social content

        Stores the post under ``state['generated_content']['linkedin']`` and
        its statistics under ``'linkedin_metadata'``. On failure the error is
        appended to ``state['error_log']`` and ``'linkedin'`` is set to a
        failure message.
        """
        try:
            # `or ''` also covers a stored None, which .get()'s default would not.
            research_context = state.get('research_data', {}).get('content') or ''

            linkedin_prompt = f"""
            Create an engaging LinkedIn post based on:
            Query: {state['user_query']}
            Research: {research_context[:500]}...

            Requirements:
            - Professional tone
            - Hook in first line
            - 3-5 bullet points or key insights
            - Call-to-action
            - 5-8 relevant hashtags
            - Emoji usage for engagement
            - 150-300 words optimal length
            """

            response = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": linkedin_prompt}],
                max_tokens=800,
                temperature=self.config.temperature
            )

            linkedin_content = response.choices[0].message.content
            if not linkedin_content:
                # Fail with a readable message rather than a TypeError raised
                # from calling len() on None.
                raise ValueError("model returned an empty response")

            state['generated_content']['linkedin'] = linkedin_content
            state['generated_content']['linkedin_metadata'] = {
                'character_count': len(linkedin_content),
                'hashtags': self._extract_hashtags(linkedin_content),
                'engagement_score': self._calculate_engagement_score(linkedin_content)
            }

        except Exception as e:
            state['error_log'].append(f"LinkedIn Writer Error: {str(e)}")
            state['generated_content']['linkedin'] = f"LinkedIn post generation failed: {str(e)}"

        return state

    def _extract_hashtags(self, content: str) -> List[str]:
        """Return every ``#word`` token in ``content``, in order of appearance."""
        return re.findall(r'#\w+', content)

    def _calculate_engagement_score(self, content: str) -> int:
        """Heuristic 0-100 score built from punctuation, hashtags and emojis."""
        # Simple engagement scoring
        score = 0
        score += len(re.findall(r'[?!]', content)) * 2  # Questions and exclamations
        score += len(self._extract_hashtags(content))   # Hashtags
        score += len(re.findall(r'[📊📈💡🚀🎯]', content)) * 3  # Engagement emojis
        return min(score, 100)

# 5. Image Generation Agent
class ImageGenerationAgent(BaseAgent):
    """Creates an image brief and, when the API allows it, an actual image."""

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Produces custom visuals with prompt optimization

        ``state['images']`` always receives a text description and, if image
        generation succeeds, the image URL as a second entry. The prompt used
        is stored under ``state['generated_content']['image_prompts']``.
        A failed image call is best-effort: its reason is recorded under
        ``state['metadata']['image_generation_warning']`` and does not count
        as a workflow error.
        """
        try:
            # Optimize image prompt based on content
            image_prompt = self._optimize_image_prompt(state['user_query'])

            # For demo purposes, generate image descriptions instead of actual images
            # In production, you'd use DALL-E 3 API
            image_description = f"""
            Image Concept: {image_prompt}
            Style: Professional, modern, high-quality
            Format: 16:9 landscape for blog headers, 1:1 square for social media
            Elements: Clean typography, brand-appropriate colors, engaging visuals
            """

            state['images'] = [image_description]
            state['generated_content']['image_prompts'] = [image_prompt]

            # If OpenAI API key is available and DALL-E is accessible, generate actual image
            try:
                if hasattr(self.client, 'images'):
                    image_response = self.client.images.generate(
                        model="dall-e-3",
                        prompt=image_prompt,
                        size="1024x1024",
                        quality="standard",
                        n=1
                    )
                    image_url = image_response.data[0].url
                    # Skip a missing URL so `images` never contains None.
                    if image_url:
                        state['images'].append(image_url)
            except Exception as image_error:
                # Continue with description only, but keep the reason visible.
                # It goes to metadata rather than error_log so an unavailable
                # image API still does not mark the whole run as failed.
                state.setdefault('metadata', {})['image_generation_warning'] = str(image_error)

        except Exception as e:
            state['error_log'].append(f"Image Generation Error: {str(e)}")
            state['images'] = [f"Image generation failed: {str(e)}"]

        return state

    def _optimize_image_prompt(self, query: str) -> str:
        """Ask the chat model for an image prompt; fall back to a generic one.

        The generic prompt is returned when the call raises or when the model
        replies with nothing usable.
        """
        fallback_prompt = f"Professional illustration about {query}, modern design, high quality"

        prompt_optimization = f"""
        Create a DALL-E optimized prompt for: {query}

        Make it:
        - Visually descriptive
        - Professional and modern
        - Suitable for content marketing
        - High quality and engaging
        """

        try:
            response = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": prompt_optimization}],
                max_tokens=150,
                temperature=0.8
            )
            optimized = (response.choices[0].message.content or '').strip()
        except Exception:
            # Was a bare `except:`, which also trapped KeyboardInterrupt and
            # SystemExit.
            return fallback_prompt

        return optimized or fallback_prompt

# 6. Content Strategist Agent
class ContentStrategistAgent(BaseAgent):
    """Final agent: asks the model for strategy advice and scores the run."""

    def execute(self, state: WorkflowState) -> WorkflowState:
        """Formats and organizes research into readable content

        Stores the model's recommendations under
        ``state['generated_content']['strategy']`` and a 0-100 score under
        ``state['metadata']['quality_score']``. On failure the error is
        appended to ``state['error_log']`` and 'strategy' is set to a failure
        message.
        """
        try:
            # Gather the figures quoted in the prompt, in the order the
            # prompt lists them.
            original_query = state['user_query']
            routed_type = state['content_type']
            blog_chars = len(state['generated_content'].get('blog', ''))
            linkedin_chars = len(state['generated_content'].get('linkedin', ''))
            image_count = len(state['images'])

            strategy_prompt = f"""
            As a content strategist, analyze and provide strategic recommendations for:

            Original Query: {original_query}
            Content Type: {routed_type}

            Generated Content Summary:
            - Blog: {blog_chars} characters
            - LinkedIn: {linkedin_chars} characters
            - Images: {image_count} generated

            Provide:
            1. Content performance predictions
            2. Distribution strategy
            3. Optimization recommendations
            4. Next content suggestions
            5. KPI tracking recommendations
            """

            completion = self.client.chat.completions.create(
                model=self.config.model,
                messages=[{"role": "user", "content": strategy_prompt}],
                max_tokens=self.config.max_tokens,
                temperature=self.config.temperature
            )

            state['generated_content']['strategy'] = completion.choices[0].message.content

            # Score last so the strategy text is stored even if scoring fails.
            state['metadata']['quality_score'] = self._calculate_quality_score(state)

        except Exception as e:
            state['error_log'].append(f"Content Strategist Error: {str(e)}")
            state['generated_content']['strategy'] = f"Strategy generation failed: {str(e)}"

        return state

    def _calculate_quality_score(self, state: WorkflowState) -> int:
        """Return a 0-100 score: 50 base, bonuses per artifact, minus 5 per logged error."""
        produced = state['generated_content']

        # (artifact, bonus) pairs; a truthy artifact earns its bonus.
        bonuses = (
            (produced.get('blog'), 15),
            (produced.get('linkedin'), 15),
            (state['research_data'].get('content'), 10),
            (state['images'], 10),
        )
        earned = sum(points for artifact, points in bonuses if artifact)
        penalty = 5 * len(state['error_log'])

        return max(0, min(100, 50 + earned - penalty))

# Multi-Agent Orchestrator
class ContentMarketingOrchestrator:
    """Wires the agents into one linear workflow and runs requests through it."""

    # Execution order; each name is both the agent key and the graph node name.
    PIPELINE = (
        'query_handler',
        'research',
        'blog_writer',
        'linkedin_writer',
        'image_generator',
        'content_strategist',
    )

    def __init__(self, config: Config):
        self.config = config
        self.agents = {
            'query_handler': QueryHandlerAgent(config),
            'research': DeepResearchAgent(config),
            'blog_writer': SEOBlogWriterAgent(config),
            'linkedin_writer': LinkedInPostWriterAgent(config),
            'image_generator': ImageGenerationAgent(config),
            'content_strategist': ContentStrategistAgent(config)
        }
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build LangGraph workflow"""
        workflow = StateGraph(WorkflowState)

        # Add all agent nodes
        for node_name in self.PIPELINE:
            workflow.add_node(node_name, self.agents[node_name].execute)

        # LangGraph refuses to compile a graph with no declared entry point.
        # The guard keeps this working with a stand-in graph class that has
        # no such method and starts at a fixed node instead.
        if hasattr(workflow, "set_entry_point"):
            workflow.set_entry_point(self.PIPELINE[0])

        # Chain the nodes in pipeline order; the last one ends the run.
        for from_node, to_node in zip(self.PIPELINE, self.PIPELINE[1:]):
            workflow.add_edge(from_node, to_node)
        workflow.add_edge(self.PIPELINE[-1], END)

        return workflow.compile()

    def process_request(self, user_query: str, conversation_history: Optional[List] = None) -> Dict[str, Any]:
        """Process user request through multi-agent workflow

        Args:
            user_query: the request text handed to every agent.
            conversation_history: earlier messages, stored in the state as-is;
                ``None`` becomes an empty list.

        Returns:
            The final workflow state. ``metadata['success']`` is True only
            when no agent logged an error. If the workflow itself raises, the
            initial state is returned with the failure added to ``error_log``
            and described in ``metadata``.
        """
        initial_state: WorkflowState = {
            'user_query': user_query,
            'content_type': '',
            'research_data': {},
            'generated_content': {},
            'images': [],
            'metadata': {'start_time': datetime.now().isoformat()},
            'conversation_history': conversation_history or [],
            'error_log': []
        }

        try:
            final_state = self.workflow.invoke(initial_state)
            final_state['metadata']['end_time'] = datetime.now().isoformat()
            final_state['metadata']['success'] = len(final_state['error_log']) == 0

            return final_state
        except Exception as e:
            return {
                **initial_state,
                # Keep errors already logged before the crash.
                'error_log': [*initial_state['error_log'], f"Workflow Error: {str(e)}"],
                'metadata': {
                    # Preserve start_time and anything else recorded so far.
                    **initial_state['metadata'],
                    'end_time': datetime.now().isoformat(),
                    'success': False,
                    'error': str(e)
                }
            }



ModuleNotFoundError: No module named 'streamlit'