# In [None]:
# utils/helpers.py
import os
import threading
from functools import wraps
from typing import Callable
import structlog
from structlog import get_logger
from structlog.stdlib import LoggerFactory
from structlog.dev import ConsoleRenderer
from structlog.processors import TimeStamper, StackInfoRenderer, format_exc_info, add_log_level
from structlog.processors import JSONRenderer

from tenacity import retry, stop_after_delay, wait_exponential
import os
from pathlib import Path
from typing import Optional
from cashews import cache
from crewai.tools import BaseTool  
from dotenv import load_dotenv
from llama_index.core import (
    Settings, 
    Document, 
    ServiceContext, 
    SimpleDirectoryReader, 
    StorageContext, 
    VectorStoreIndex,
    load_index_from_storage 
)
from llama_index.core.node_parser import SemanticSplitterNodeParser, SimpleNodeParser

from llama_index.vector_stores.postgres import PGVectorStore

import threading
from llama_index.embeddings.fastembed import FastEmbedEmbedding



# structlog.stdlib.LoggerFactory hands records to the stdlib `logging` module.
# Without a configured handler, the root logger's default WARNING level drops
# every info-level message, so configure stdlib logging first.
import logging

logging.basicConfig(
    format="%(message)s",
    level=getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO),
)

# Configure structlog once. A LOG_FORMAT containing "json" (any case) selects
# JSON output; anything else renders human-readable console lines.
structlog.configure(
    processors=[
        add_log_level,
        TimeStamper(fmt="iso"),
        StackInfoRenderer(),
        format_exc_info,
        JSONRenderer()
        if "json" in os.getenv("LOG_FORMAT", "console").lower()
        else ConsoleRenderer(),
    ],
    logger_factory=LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = get_logger("Marketing content creation bot")



# In [None]:
# llm/groq_client.py
import os
from dotenv import load_dotenv

from crewai import Agent, Task, Crew, LLM
from threading import Lock
from typing import Dict
load_dotenv()



# Module-wide registry holding one initialisation lock per business id, so all
# code that builds indexes for the same business serialises on the same lock.
_init_locks: Dict[str, Lock] = {}
_locks_lock = Lock()  # guards insertion into _init_locks itself


def get_init_lock(business_id):
    """Return the lock registered for *business_id*, creating it on first request."""
    with _locks_lock:
        existing = _init_locks.get(business_id)
        if existing is None:
            existing = Lock()
            _init_locks[business_id] = existing
        return existing



GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = "llama-3.3-70b-versatile"

if not GROQ_API_KEY:
    logger.error("GROQ_API_KEY not found")
    # Fail fast with a meaningful error (a bare `raise` here has no active
    # exception to re-raise).
    raise ValueError("GROQ_API_KEY not found")


def get_llm(temperature: float = 0.6):
    """Return a CrewAI ``LLM`` for the Groq-hosted ``GROQ_MODEL``.

    Args:
        temperature: Sampling temperature passed to the model (default 0.6).

    The API key is re-read from the environment on every call.
    """
    return LLM(
        model=f"groq/{GROQ_MODEL}",
        api_key=os.getenv("GROQ_API_KEY"),
        temperature=temperature,
    )


# In [None]:
from tenacity import retry, stop_after_attempt, wait_exponential

# Connection string for the pgvector store behind every knowledge base below.
# Nothing works without it, so refuse to continue at import time.
POSTGRES_URI = os.getenv("POSTGRES_URI")
if POSTGRES_URI is None or POSTGRES_URI == "":
    logger.error("POSTGRES_URI environment variable not set")
    raise ValueError("POSTGRES_URI environment variable not set")



class Marketing_Rag_System:
    """Build, or reconnect to, a pgvector-backed LlamaIndex index for one business.

    Each instance targets one Postgres table (``table_name``) and one source
    directory (``data_path``). ``content_type`` selects the chunking strategy
    used when the table is empty and documents have to be ingested.
    """

    def __init__(self, data_path: str = "./data/marketing", business_id=None,
                 content_type: str = "default", table_name: Optional[str] = None):
        """Store configuration; no database or model work happens here.

        Args:
            data_path: Directory loaded with ``SimpleDirectoryReader`` when the
                index has to be built from scratch.
            business_id: Tenant identifier. Selects the per-business init lock
                and, when ``table_name`` is not given, the table name.
            content_type: ``"blog_article"`` (semantic splitting),
                ``"social_post"`` / ``"ad_copy"`` / ``"email_subject"``
                (chunk_size=256) or anything else (chunk_size=512).
            table_name: Explicit Postgres table name. Defaults to
                ``rag_data_<business_id or 'default'>``.
        """
        self._embed_model_cache = None
        self._setup_lock = threading.Lock()
        self._index = None  # memoised result of build_marketing_index()
        self.data_path = data_path
        self.content_type = content_type
        self.table_name = table_name or f"rag_data_{business_id or 'default'}"
        self.business_id = business_id

        # Create the per-business lock eagerly so later builds find it ready.
        get_init_lock(business_id)

    @retry(stop=stop_after_attempt(2), reraise=True)
    def initialize_embedding_model(self):
        """Return this instance's FastEmbed model, creating it at most once.

        Double-checked locking on ``self._setup_lock`` ensures concurrent
        callers share a single model instance. Retried once on failure.
        """
        if self._embed_model_cache is not None:
            return self._embed_model_cache
        with self._setup_lock:
            if self._embed_model_cache is not None:
                return self._embed_model_cache

            logger.info("Setting up embedding model...")
            # bge-small-en-v1.5 yields 384-dim vectors, matching embed_dim below.
            self.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-small-en-v1.5")
            self._embed_model_cache = self.embed_model
            logger.info("embedding model ready!")
            return self.embed_model

    def _make_node_parser(self, embed_model):
        """Return the node parser matching ``self.content_type``."""
        if self.content_type == "blog_article":
            # Pass the local model explicitly; otherwise the semantic splitter
            # falls back to its own default embedding model.
            return SemanticSplitterNodeParser.from_defaults(
                embed_model=embed_model,
                buffer_size=1,
                breakpoint_percentile_threshold=95,
            )
        if self.content_type in ("social_post", "ad_copy", "email_subject"):
            return SimpleNodeParser.from_defaults(chunk_size=256, chunk_overlap=50)
        return SimpleNodeParser.from_defaults(chunk_size=512, chunk_overlap=100)

    @staticmethod
    def _has_existing_nodes(index) -> bool:
        """Return True when the index's vector store already holds at least one node.

        ``VectorStoreIndex.from_vector_store`` only wraps the store and does not
        fail on an empty table, so a one-result retrieval serves as the probe.
        """
        return bool(index.as_retriever(similarity_top_k=1).retrieve("brand voice"))

    # NOTE: the previous cashews ``@cache`` decorator was removed. cashews
    # decorators wrap async functions only, so on this sync method they
    # returned a coroutine instead of an index. Results are memoised per
    # instance in ``self._index`` instead.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True,
    )
    def build_marketing_index(self):
        """Return a ``VectorStoreIndex`` over ``self.table_name``, ingesting if empty.

        Work is serialised per business via ``get_init_lock``, and the first
        successful result is memoised on the instance. Up to 3 attempts are
        made with exponential backoff; the last exception is re-raised as-is.

        Raises:
            ValueError: e.g. when the table is empty and ``data_path`` yields
                no documents.
        """
        if self._index is not None:
            return self._index
        if not self.business_id:
            logger.warning("Business ID not given")

        with get_init_lock(self.business_id):
            if self._index is not None:
                return self._index
            try:
                embed_model = self.initialize_embedding_model()
                parser = self._make_node_parser(embed_model)

                vector_store = PGVectorStore.from_params(
                    connection_string=POSTGRES_URI,
                    table_name=self.table_name,
                    embed_dim=384,
                    hybrid_search=True,
                    perform_setup=True,
                    hnsw_kwargs={
                        "hnsw_m": 16,                      # Links per node (higher = better recall)
                        "hnsw_ef_construction": 64,        # Build-time search depth
                        "hnsw_ef_search": 40,              # Query-time search depth
                        "hnsw_dist_method": "vector_cosine_ops",  # Distance metric
                    },
                )

                # Global defaults are kept for code relying on them. The calls
                # below also pass the same objects explicitly, so concurrent
                # builds for other businesses cannot swap them mid-build.
                Settings.embed_model = embed_model
                Settings.llm = None
                Settings.node_parser = parser

                index = VectorStoreIndex.from_vector_store(
                    vector_store=vector_store,
                    embed_model=embed_model,
                )
                if self._has_existing_nodes(index):
                    logger.info(f"Connected to existing index: {self.table_name}")
                else:
                    logger.info(f"Building new index: {self.table_name}")
                    documents = SimpleDirectoryReader(self.data_path).load_data()
                    if not documents:
                        raise ValueError(f"No documents found in {self.data_path}")

                    storage_context = StorageContext.from_defaults(vector_store=vector_store)
                    index = VectorStoreIndex.from_documents(
                        documents,
                        storage_context=storage_context,
                        transformations=[parser],
                        embed_model=embed_model,
                        show_progress=True,
                    )

                logger.info(f"Index ready: {self.table_name} ({self.content_type})")
                self._index = index
                return index
            except Exception as e:
                logger.error("Index build failed", error=str(e), error_type=type(e).__name__)
                raise


VALID_CONTENT_TYPES = {"blog", "social", "ad"}


class BrandVoiceKnowledgeBase:
    """Manages brand voice knowledge bases for different content types.

    Each content type ("blog", "social", "ad") is backed by its own
    ``Marketing_Rag_System`` table and source directory. Indexes are loaded
    lazily on first use. A load failure is remembered in ``_load_errors`` so
    it is reported instead of retried on every call.
    """

    # content_type -> (cache attribute, data directory, RAG content_type,
    #                  table-name prefix, similarity_top_k used for queries)
    _KB_SPECS = {
        "blog": ("_blog_kb", "./brand_blogs", "blog_article", "brand_blog_posts", 3),
        "social": ("_social_kb", "./brand_social", "social_post", "brand_social_posts", 2),
        "ad": ("_ad_kb", "./brand_ads", "ad_copy", "brand_ad_copy", 2),
    }

    def __init__(self, postgres_uri: Optional[str] = None, llm=None, business_id="default"):
        """
        Args:
            postgres_uri: Stored on the instance (falls back to ``POSTGRES_URI``);
                not read elsewhere in this class.
            llm: LLM handed to query engines. When None, one is created lazily
                with ``get_llm()`` on first query.
            business_id: Tenant whose tables are queried.
        """
        self.postgres_uri = postgres_uri or POSTGRES_URI
        self.llm = llm
        self.business_id = business_id
        self._reset_kbs()

    def _reset_kbs(self):
        """Drop all cached indexes and remembered load errors."""
        self._blog_kb = None
        self._social_kb = None
        self._ad_kb = None
        self._load_errors = {}

    def _load_kb(self, content_type: str):
        """Return the cached index for *content_type*, building it on first use.

        Returns None if an earlier load of this type already failed. On a
        fresh failure the error is recorded and re-raised.
        """
        attr, data_path, rag_content_type, table_prefix, _ = self._KB_SPECS[content_type]
        if getattr(self, attr) is None and content_type not in self._load_errors:
            try:
                rag_system = Marketing_Rag_System(
                    data_path=data_path,
                    content_type=rag_content_type,
                    business_id=self.business_id,
                )
                # Namespaced per content type and business.
                rag_system.table_name = f"{table_prefix}_{self.business_id}"
                setattr(self, attr, rag_system.build_marketing_index())
            except Exception as e:
                logger.error(f"Failed to load {content_type} KB: {e}")
                self._load_errors[content_type] = str(e)
                raise
        return getattr(self, attr)

    @property
    def blog_kb(self):
        """Lazy-load blog knowledge base"""
        return self._load_kb("blog")

    @property
    def social_kb(self):
        """Lazy-load social knowledge base"""
        return self._load_kb("social")

    @property
    def ad_kb(self):
        """Lazy-load ad knowledge base"""
        return self._load_kb("ad")

    def _get_llm(self):
        """Get LLM, creating if needed"""
        if self.llm is None:
            self.llm = get_llm()
        return self.llm

    def get_style_guide(self, content_type: str, business_id):
        """Return a query engine over the knowledge base for *content_type*.

        Only the requested knowledge base is loaded. If *business_id* differs
        from the current one, cached indexes and load errors are discarded
        first so another tenant's data is never served.

        Raises:
            ValueError: unknown *content_type*, or that knowledge base
                previously failed to load.
        """
        if content_type not in VALID_CONTENT_TYPES:
            raise ValueError(
                f"Invalid content_type '{content_type}'. "
                f"Must be one of: {VALID_CONTENT_TYPES}"
            )

        if business_id is not None and business_id != self.business_id:
            self.business_id = business_id
            self._reset_kbs()

        # Check if this KB failed to load
        if content_type in self._load_errors:
            raise ValueError(
                f"Knowledge base for '{content_type}' failed to load: "
                f"{self._load_errors[content_type]}"
            )

        llm = self._get_llm()
        kb = self._load_kb(content_type)
        top_k = self._KB_SPECS[content_type][4]
        return kb.as_query_engine(
            similarity_top_k=top_k,
            response_mode="compact",
            llm=llm,
        )




from typing import Any

class KBQueryTool(BaseTool):
    """CrewAI tool that queries a ``BrandVoiceKnowledgeBase`` for style guidance.

    Falls back to static brand guidelines when the knowledge base cannot answer.
    """

    # BrandVoiceKnowledgeBase is a plain class; pydantic must be told to accept it.
    model_config = {"arbitrary_types_allowed": True}

    name: str = "kb_query"
    description: str = (
        "Query the brand voice knowledge base for tone, structure and vocabulary "
        "examples. Arguments: content_type (one of 'blog', 'social', 'ad') and "
        "query (what to look up)."
    )
    kb: BrandVoiceKnowledgeBase
    business_id: Any = "default"

    def __init__(self, kb: BrandVoiceKnowledgeBase, business_id, **kwargs: Any):
        # BaseTool is a pydantic model. Required fields must go through the
        # pydantic constructor, and undeclared attributes cannot be set afterwards.
        super().__init__(kb=kb, business_id=business_id, **kwargs)

    def _run(self, content_type: str = "", query: str = "") -> str:
        """Answer *query* from the *content_type* knowledge base.

        The explicit parameters let CrewAI derive a usable argument schema.
        Validation problems are returned as "Error: ..." strings. Query
        failures return fallback guidelines instead of raising.
        """
        # Validate
        if not content_type or not query:
            return "Error: Missing content_type or query"

        if content_type not in VALID_CONTENT_TYPES:
            return f"Error: content_type must be one of {VALID_CONTENT_TYPES}"

        try:
            style_guide = self.kb.get_style_guide(content_type, self.business_id)
            response = style_guide.query(query)
            return str(response)
        except Exception as e:
            logger.warning(f"KB query failed: {e}")
            return self._get_fallback_response(content_type)

    def _get_fallback_response(self, content_type: str) -> str:
        """Provide fallback guidelines when KB unavailable"""
        fallbacks = {
            "blog": """BRAND FALLBACK (KB unavailable):
- Formal tone, 18±2 word sentences
- Precise vocabulary (avoid 'leverage' → use 'use')
- Authoritative but approachable
- Structure: Problem → Solution → CTA""",

            "social": """BRAND FALLBACK (KB unavailable):
- Conversational, 10-15 words per post
- Emoji-friendly, engaging CTAs
- Brand hashtags, trending topics
- Authentic voice""",

            "ad": """BRAND FALLBACK (KB unavailable):
- Clear value prop, 5-10 words
- Action-oriented, benefit-focused
- Urgency without pressure
- Strong CTAs"""
        }
        return fallbacks.get(content_type, fallbacks["blog"])



# In [None]:
from tavily import TavilyClient

load_dotenv()
tavily_api_key = os.getenv("TAVILY_API_KEY")

if not tavily_api_key:
    # Logged at warning level so the missing key is visible by default.
    logger.warning("tavily_api_key not found")


class TavilySearchTool(BaseTool):
    """CrewAI tool wrapping ``TavilyClient.search`` for web research."""

    name: str = "tavily_search"
    description: str = "Search the web using Tavily API for research queries"

    def _run(self, query: str) -> str:
        """Execute a Tavily search for *query* and return the raw result as text.

        Without a configured API key, returns an "Error: ..." string for the
        agent instead of letting the client fail mid-run.
        """
        if not tavily_api_key:
            return "Error: TAVILY_API_KEY is not configured; web search unavailable"
        client = TavilyClient(api_key=tavily_api_key)
        return str(client.search(query))


tavily_search = TavilySearchTool()



# In [None]:
import json
from datetime import datetime
from pydantic import BaseModel, Field


from datetime import timezone

LEARNING_FILE = Path("data/reviewer_learning.json")
LEARNING_FILE.parent.mkdir(parents=True, exist_ok=True)

# kb_score at or above which the reviewer auto-approves (same cut-off as the reviewer task prompt).
AUTO_APPROVE_THRESHOLD = 8.5
# How many individual review records are kept in the learning file.
MAX_STORED_REVIEWS = 200
# Serialises read-modify-write cycles on LEARNING_FILE within this process.
_LEARNING_FILE_LOCK = threading.Lock()


def _load_learning_history() -> dict:
    """Read LEARNING_FILE, or return an empty history if it is missing or unreadable.

    An unparseable file is moved aside to ``<name>.corrupt`` instead of being
    silently overwritten.
    """
    empty = {"total": 0, "correct": 0, "reviews": []}
    if not LEARNING_FILE.exists():
        return empty
    try:
        history = json.loads(LEARNING_FILE.read_text())
        if not isinstance(history, dict):
            raise ValueError("learning file root is not a JSON object")
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        backup = LEARNING_FILE.with_name(LEARNING_FILE.name + ".corrupt")
        logger.error("Reviewer learning file unreadable; starting fresh", backup=str(backup))
        LEARNING_FILE.replace(backup)
        return empty
    for key, default in empty.items():
        history.setdefault(key, default)
    return history


def _write_learning_history(history: dict) -> None:
    """Replace LEARNING_FILE atomically so a crash never leaves a half-written file."""
    tmp = LEARNING_FILE.with_name(LEARNING_FILE.name + ".tmp")
    tmp.write_text(json.dumps(history, indent=2))
    os.replace(tmp, LEARNING_FILE)


# PROPER CrewAI BaseTool IMPLEMENTATION
class ReviewerAccuracyTool(BaseTool):
    """CrewAI tool recording whether the reviewer's auto-approve call matched the human."""

    name: str = "update_reviewer_accuracy"
    description: str = "Update reviewer learning from human feedback. Tracks accuracy over time."

    def _run(self, kb_score: float, confidence: float, human_approved: bool) -> str:
        """Persist one review outcome and return cumulative accuracy (e.g. ``"87.5%"``).

        Args:
            kb_score: Reviewer's brand-alignment score. The agent counts as
                auto-approving when it is >= AUTO_APPROVE_THRESHOLD.
            confidence: Reviewer's confidence; stored with the record only.
            human_approved: The human's final decision.
        """
        agent_auto_approved = kb_score >= AUTO_APPROVE_THRESHOLD
        was_correct = agent_auto_approved == human_approved

        with _LEARNING_FILE_LOCK:
            history = _load_learning_history()
            history["total"] += 1
            history["correct"] += 1 if was_correct else 0
            history["reviews"].append({
                "kb_score": kb_score,
                "confidence": confidence,
                "human_approved": human_approved,
                "agent_correct": was_correct,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            # Keep only the most recent records; totals still cover all reviews.
            history["reviews"] = history["reviews"][-MAX_STORED_REVIEWS:]
            _write_learning_history(history)

        accuracy = history["correct"] / history["total"] if history["total"] > 0 else 1.0
        return f"{accuracy:.1%}"


update_reviewer_accuracy = ReviewerAccuracyTool()



# In [None]:
# deployer.py
import argparse
import sys


from crewai import Crew, Process



class ContentCrewFactory:
    """Assemble the research -> write -> review crew for one business.

    The agent and task templates reference ``{topic}``, ``{format}`` and
    ``{brand_style}``, so ``Crew.kickoff(inputs=...)`` must supply all three.
    """

    def __init__(self, kb: BrandVoiceKnowledgeBase,
                 tavily_search: TavilySearchTool,
                 update_reviewer_accuracy: ReviewerAccuracyTool,
                 business_id):
        self.kb = kb
        self.tavily_search = tavily_search
        self.update_reviewer_accuracy = update_reviewer_accuracy
        self.business_id = business_id
        self.llm = None  # set by create_crew(); shared by every agent of that crew

    def create_crew(self) -> Crew:
        """Build a sequential crew: researcher, brand writer, then a reviewer with human input."""
        kb_tool = KBQueryTool(kb=self.kb, business_id=self.business_id)
        # One LLM instance shared by all three agents (previously each built its own).
        self.llm = get_llm()

        researcher_agent = Agent(
            role="Research Analyst",
            goal="Find and summarize specific topics",
            backstory="Experienced researcher with attention to detail.",
            tools=[self.tavily_search],
            llm=self.llm,
            verbose=True,
        )

        research_task = Task(
            description="""
                Research {topic} for {format}.
                Consider current trends in 2026.
            """,
            expected_output="10 bullet points of relevant information.",
            agent=researcher_agent,
        )

        writer_agent = Agent(
            role="Brand-Aligned Content Writer",
            goal="Write matching {brand_style} from KB",
            backstory="""
            Expert at replicating brand voice. ALWAYS query KB first.
            Extract: tone, style, structure, phrasing from examples.
            """,
            tools=[kb_tool],
            llm=self.llm,
            verbose=True,
            memory=True,
        )

        writer_task = Task(
            description="""
            Write {topic} in {format}:
            1. Query KB for 3-5 relevant examples
            2. Analyze brand patterns
            3. Match exactly
            4. End with brand alignment confidence
            """,
            agent=writer_agent,
            context=[research_task],
            expected_output="A piece of content that matches the brand style with confidence score"
        )

        reviewer_agent = Agent(
            role="Brand Style Enforcer",
            goal="Enforce brand alignment via KB + self-calibration",
            backstory="""
                Protocol:
                1. Query KB multiple times (tone/structure/vocab)
                2. Compute composite score
                3. HITL only if uncertain (score 5-7, conf <0.8)
                4. Learn from human feedback
                """,
            tools=[kb_tool, self.update_reviewer_accuracy],
            memory=True,
            verbose=True,
            llm=self.llm,
        )

        # NOTE(review): "ReviewOutput" is not defined in the visible code; the
        # reviewer only gets this text, not an enforced schema.
        reviewer_task = Task(
            description="""
                Review writer output:
                1. KB deep dive (tone, structure, vocab)
                2. Compute kb_score, confidence
                3. Decide HITL or auto-approve (>=8.5)
                4. Learn post-human input
                """,
            agent=reviewer_agent,
            context=[writer_task],
            human_input=True,
            expected_output="JSON matching ReviewOutput schema",
        )

        return Crew(
            agents=[researcher_agent, writer_agent, reviewer_agent],
            tasks=[research_task, writer_task, reviewer_task],
            process=Process.sequential,
            verbose=True,
        )


# deployer.py - Entry point (CLI/API)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate brand-aligned marketing content.")
    parser.add_argument("--topic", default="AI Agents")
    parser.add_argument("--format", default="LinkedIn Post")
    parser.add_argument("--voice", default="formal",
                        help="Brand style passed to the crew as the brand_style input.")
    parser.add_argument("--business_id", default="default")
    args = parser.parse_args()

    # Initialize per-request dependencies for the requested business
    # (previously a hard-coded "default" ignored --business_id).
    kb = BrandVoiceKnowledgeBase(business_id=args.business_id)
    factory = ContentCrewFactory(
        kb=kb,
        tavily_search=tavily_search,
        update_reviewer_accuracy=update_reviewer_accuracy,
        business_id=args.business_id,
    )
    crew = factory.create_crew()

    # Execute. The writer agent's goal interpolates {brand_style}, so it must
    # be supplied alongside topic and format.
    result = crew.kickoff(inputs={
        "topic": args.topic,
        "format": args.format,
        "brand_style": args.voice,
    })
    print(result)