In [2]:
import os
# Move the working directory one level up before the `src.*` imports in the next cell.
# NOTE(review): presumably this notebook lives in a sub-folder of the project root — confirm.
# The path is relative, so re-running this cell moves up one more level each time.
os.chdir("..")

In [3]:
# Import libraries
import os
import logging
from enum import Enum

from pydantic import BaseModel, Field
from typing import Annotated, Dict, List, Sequence, Literal, Optional

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
from langchain_core.prompts import PromptTemplate
from langgraph.graph import StateGraph, START, END, add_messages
from langchain_core.messages import ToolMessage, SystemMessage, BaseMessage

from src.config.settings import settings
from src.agent.utils import get_date_string

2025-10-15 22:55:43,257 - root - INFO - ad-automation-agent settings loaded successfully for development environment.


In [5]:
# Configure LangSmith tracing for this session through environment variables
os.environ.update(
    {
        "LANGSMITH_API_KEY": settings.langsmith_api_key.get_secret_value(),
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": settings.app_name,
    }
)

# Silence chatty third-party loggers: only WARNING and above are emitted.
# Each logger is listed once; add new noisy libraries to this tuple.
for _noisy_logger_name in (
    "urllib3",
    "langsmith",
    "openai._base_client",
    "httpcore",
    "httpx",
    "websockets.client",
    "pyppeteer",
):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)

---
## State

In [6]:
# Campaign schema
class CampaignGoal(str, Enum):
    """Objective an advertising campaign is optimised for.

    Members are also plain strings, so they compare equal to their values.
    """

    AWARENESS = "awareness"
    TRAFFIC = "traffic"
    ENGAGEMENT = "engagement"
    LEADS = "leads"
    APP_INSTALLS = "app installs"  # value uses a space, not an underscore


class AdPlatform(str, Enum):
    """Ad placement (platform plus format) a campaign can run on."""

    # Instagram
    INSTAGRAM_REELS = "instagram reels"
    # NOTE(review): this value contains "video" while FACEBOOK_STORY does not
    INSTAGRAM_STORY = "instagram video story"
    # Facebook
    FACEBOOK_FEEDS = "facebook feed"
    FACEBOOK_STORY = "facebook story"
    # Other short-form video placements
    YOUTUBE_SHORT = "youtube short"
    TIKTOK_REELS = "tiktok reels"


class Campaign(BaseModel):
    """Campaign definition: what the campaign should achieve and where it runs.

    Both fields are required (no default is supplied).
    """

    goal: CampaignGoal = Field(description="Objective for the marketing campaign")
    platform: AdPlatform = Field(
        description="Specific ad placement for the marketing campaign"
    )

In [7]:
# Target audience schema
class Gender(str, Enum):
    """Gender segment the campaign targets."""

    MALE = "male"
    FEMALE = "female"
    ALL = "male and female"  # both segments together


class Location(str, Enum):
    """Country the target audience lives in."""

    USA = "united states"
    UK = "united kingdom"
    CANADA = "canada"
    AUSTRALIA = "australia"


class IncomeRange(str, Enum):
    """Income bracket used to segment the audience.

    NOTE(review): currency and period are not stated in the values —
    presumably annual income in USD; confirm.
    """

    LOW = "below $30k"
    LOWER_MIDDLE = "$30k to $60k"
    MIDDLE = "$60k to $100k"
    UPPER_MIDDLE = "$100k to $200k"
    HIGH = "above $200k"


class AgeRange(BaseModel):
    """Age bounds of the target audience.

    Both bounds are required integers. No ordering check is performed here,
    so callers are responsible for passing min_age <= max_age.
    """

    min_age: int = Field(description="Minimum age for the target audience")
    max_age: int = Field(description="Maximum age for the target audience")


class TargetAudience(BaseModel):
    """Demographic description of the audience a campaign is aimed at.

    Every field is required.
    """

    gender: Gender = Field(description="Gender identity for the target audience")
    location: Location = Field(description="Location for the target audience")
    income_range: IncomeRange = Field(
        description="Income range for the target audience"
    )
    age_range: AgeRange = Field(description="Age range for the target audience")

In [8]:
# Product schema
class ProductPlatform(str, Enum):
    """Platforms the product can run on (two mobile platforms plus web)."""

    # Member names are lowercase, unlike the other enums in this notebook;
    # they are referenced by name elsewhere, so they are kept as-is.
    ios = "ios"
    android = "android"
    web = "web"


class Product(BaseModel):
    """Description of the product being advertised.

    Every field is required.
    """

    name: str = Field(description="Name of the product")
    description: str = Field(description="A brief overview of the product")
    # feature name -> human-readable description of that feature
    features: Dict[str, str] = Field(
        description="Dictionary mapping feature names to their description"
    )
    supported_platforms: List[ProductPlatform] = Field(
        description="Supported platforms for the application"
    )

In [9]:
# Research findings
class ResearchNote(BaseModel):
    """Cleaned-up result of a single web search: the query, the findings and their sources.

    This model is the target of `model.with_structured_output(...)` in
    `compress_research_node`, so the docstring and the field descriptions are
    part of the schema the model is asked to fill in.
    """

    query: str = Field(
        ...,
        description="Query for the websearch"
    )
    context: str = Field(
        ...,
        description="Compressed findings"
    )
    sources: List[str] = Field(
        ...,
        description="List of all resources"
    )

In [10]:
def _merge_research_notes(existing: Optional[List[ResearchNote]], new) -> List[ResearchNote]:
    """Reducer for `research_notes`: append newly produced notes to the stored ones.

    `add_messages` cannot be used for this field because it converts every item
    to a chat message and ResearchNote is not one.

    Args:
        existing: Notes already in the state (None is treated as empty).
        new: A single ResearchNote, a list of them, or None (no update).

    Returns:
        A new list with the existing notes followed by the new ones.
    """
    merged = list(existing or [])
    if new is None:
        return merged
    if isinstance(new, ResearchNote):
        # Allow a node to return one note without wrapping it in a list
        return merged + [new]
    return merged + list(new)


class AudienceResearchState(BaseModel):
    """State shared by the nodes of the audience research graph.

    `campaign`, `target_audience` and `product` are required inputs. The three
    history fields start empty and are accumulated through their reducers.
    """

    supervisor_messages: Annotated[Sequence[BaseMessage], add_messages] = Field(
        default_factory=list,
        description="History of messages from the supervisor"
    )
    campaign: Campaign = Field(
        ...,
        description="Campaign information"
    )
    target_audience: TargetAudience = Field(
        ...,
        description="Target audience information"
    )
    product: Product = Field(
        ...,
        description="Product information"
    )
    # Default to an empty list (not None) so the value always matches the
    # declared sequence type and can be indexed or iterated safely.
    raw_research_notes: Annotated[Sequence[ToolMessage], add_messages] = Field(
        default_factory=list,
        description="List of conduct_research tool results"
    )
    research_notes: Annotated[List[ResearchNote], _merge_research_notes] = Field(
        default_factory=list,
        description="List of processed and cleaned conduct_research results"
    )

---
## Prompts

In [33]:
# System prompt for the researcher agent. Placeholders: {date}, {campaign},
# {target_audience}, {product}. The text is sent to the model verbatim, so
# wording changes here are behavior changes.
research_instructions_prompt = PromptTemplate(
    input_variables=["date", "campaign", "target_audience", "product"],
    template="""You are a psychological researcher. Your job is to coordinate and perform research for a marketing campaign. For context, today's date is {date}.

<campaign_info>
campaign: {campaign}
target_audience: {target_audience}
product: {product}
</campaign_info>

<task>
Your primary task is to gather information through research that can transform provided generic target audience demographics into actionable deep psychological profiles which enable precise, high-impact advertising campaigns.
You have a conduct_research tool that searches the web for answers to your queries and brings the answers back to you. You are responsible for coordinating and performing the research using this tool so that at the end of your research you can answer the following questions:

- **Core Values: What fundamental beliefs shape this audience’s preferences and loyalties?**
- **Daily routines: What are the regular habits and daily schedules of this audience?**
- **Behavioral patterns: What are the typical online and offline behaviors and purchasing habits of this audience?**
- **Emotional triggers: What emotionally motivates the audience to pay attention, react, or take action?**
- **Decision-making process: How are decisions made, including factors like price sensitivity, reliance on peer reviews, or influencers’ authority?**
- **Pain points/challenges: What unmet needs or problems does this audience face that are relevant to our campaign or product?**
</task>

<available_tools>
1. **conduct_research**: For finding answers to your queries on the web
2. **think_tool**: For reflection and strategic planning during research

**CRITICAL: Use think_tool before calling conduct_research to plan your approach, and after each conduct_research to assess progress**
</available_tools>

<instructions>
Adopt the mindset of a professional researcher with limited time and resources. Follow these steps strictly to maximize research effectiveness and depth:

1. **Read all details of the campaign, audience, and product carefully before taking any action.**
2. **Before starting each research cycle:** Use think_tool to reflect and plan your next query. Define exactly what information you seek, based on the current research goal. If the research topic or question is too broad to answer directly, break it down into specific, focused sub-questions. Prioritize which sub-question to resolve first.
3. **Use conduct_research to search for answers about the targeted query/sub-question.**
4. **After each conduct_research step:** Use think_tool to process and analyze the results, assess whether the answer is complete or needs elaboration, and refine subsequent queries as needed. If the information found is still too generic, continue narrowing/clarifying your sub-question until actionable, detailed insight is obtained. If enough information has been collected to meaningfully address the current question, move on to the next question or subtopic.
</instructions>

<constraints>
**Tool Call Budgets:**
- Hard maximum: 10 conduct_research calls per research session.
- Limit: 3 conduct_research calls per main profiling question (Core Values, Daily routines, etc.).

**Stopping Rules:**
- If your last 2 searches return similar information, or if you can confidently answer the current question, stop further research for that question and move on.
- If you reach 10 conduct_research calls in total without sufficient results, stop and summarize findings.
</constraints>

<final_step>
At the end of your research, confirm that you have gathered enough information to create an effective audience profile.
</final_step>
"""
)

# Prompt used to turn one raw web-search tool result into a cited report.
# Placeholders: {date}, {query}, {results}. The text is sent to the model
# verbatim, so wording changes here are behavior changes.
compress_research_instructions_prompt = PromptTemplate(
    input_variables=["date", "query", "results"],
    template="""You are a research assistant that has conducted research on a topic by calling a web search tool. Your job is now to clean up the findings, but preserve all of the relevant statements and information that the researcher has gathered. For context, today's date is {date}.

<task>
Your main task is to process the raw tool call containing results from web search and produce a structured report that:
- Aggregates every relevant fact and statement from results, in a clean, well-organized format
- Assigns in-text numeric citations (e.g., [1], [2]) for each unique source or URL found in the tool results
- Concludes with a full "Sources" section—mapping all citation numbers to their corresponding URLs and titles, as found in the tool outputs
</task>

<guidelines>
1. Your report should restate all factual findings and information from the tool results verbatim — DO NOT paraphrase, summarize, or alter any relevant detail or data.
2. If the same fact or statement occurs in multiple sources, you may group them but cite all sources that claimed it (e.g., "...as reported in [1][2][3]").
3. Assign a unique sequential citation number to each URL/source that appears in the raw tool call results, in the order you first reference them.
4. Your output should be comprehensive. DO NOT omit or exclude any statement, number, name, or detail that could be relevant to the research question.
5. The final "Sources" section must list all citations, formatted: ["[1] Source Title: URL", "[2] Source Title: URL", ...]
</guidelines>

<research_result>
query: {query}
results: {results}
</research_result>
"""
)

---
## Tools

In [17]:
# Tool functions
def deduplicate_search_results(search_results: dict) -> dict:
    """
    Deduplicate search results by URL to avoid processing duplicate content.

    Only the first result seen for a URL is kept; later results with the same
    URL are ignored.

    Args:
        search_results: Search response dictionary whose "results" entry is a
            list of result dictionaries, each expected to carry "url" and
            "content" keys. A missing or empty "results" entry is treated as
            "no results".

    Returns:
        Dictionary mapping each unique URL to the content of its first result,
        in first-seen order. Entries without a URL are skipped; a missing
        "content" is stored as an empty string.
    """
    unique_results = {}

    # `or []` also covers an explicit `"results": None`
    for result in search_results.get("results") or []:
        url = result.get("url")
        if not url:
            # Without a URL the entry can be neither keyed nor cited
            continue
        # setdefault keeps the first content seen for this URL
        unique_results.setdefault(url, result.get("content", ""))

    return unique_results

In [23]:
# Tool implementation
# Shared Tavily client used by conduct_research: at most 3 results per query,
# requested with the "advanced" search depth.
tavily_search = TavilySearch(
    tavily_api_key=settings.tavily_api_key.get_secret_value(),
    max_results=3,
    search_depth="advanced",
)

# NOTE(review): with `@tool`, the docstring below is presumably what the model
# sees as the tool description, so it is kept word for word — confirm before editing.
@tool
def conduct_research(query: str) -> dict:
    """
    Searches for information about a given query using Tavily search engine.

    Args:
        query(str): The query to search for.

    Returns:
        dict: A dictionary containing the search results and sources.
    """
    # Run the web search, then collapse results that share a URL
    return deduplicate_search_results(tavily_search.invoke(query))


# NOTE(review): with `@tool`, the docstring below is presumably what the model
# sees as the tool description — treat wording changes as prompt changes.
# NOTE(review): research_instructions_prompt also asks for think_tool *before*
# conduct_research, while this docstring only mentions using it after a search.
@tool
def think_tool(reflection: str) -> str:
    """
    Tool for strategic reflection on research progress and decision-making.

    Use this tool after each search to analyze results and plan next steps systematically.
    This creates a deliberate pause in the research workflow for quality decision-making.

    When to use:
    - After receiving search results: What key information did I find?
    - Before deciding next steps: Do I have enough to answer comprehensively?
    - When assessing research gaps: What specific information am I still missing?
    - Before concluding research: Can I provide a complete answer now?

    Reflection should address:
    1. Analysis of current findings - What concrete information have I gathered?
    2. Gap assessment - What crucial information is still missing?
    3. Quality evaluation - Do I have sufficient evidence/examples for a good answer?
    4. Strategic decision - Should I continue searching or provide my answer?

    Args:
        reflection: Your detailed reflection on research progress, findings, gaps, and next steps

    Returns:
        Confirmation that reflection was recorded for decision-making
    """
    # No side effects: the reflection is only echoed back in the tool result
    return f"Reflection recorded: {reflection}"

# Tools offered to the model, plus a name -> tool lookup for executing tool calls.
# The loop variable is not called `tool` to avoid confusion with the decorator.
tools = [conduct_research, think_tool]
tools_by_name = {registered.name: registered for registered in tools}

---
## Nodes

In [30]:
# Create the model and bind tools
# temperature=0 is used for the chat model; `model` stays available unbound
# (it is reused for structured output), `model_with_tools` can call the tools.
model = ChatOpenAI(
    model="gpt-4.1",
    temperature=0,
    api_key=settings.open_ai_api_key.get_secret_value(),
)
model_with_tools = model.bind_tools(tools)

In [31]:
def researcher_node(state: AudienceResearchState):
    """Agent node responsible for conducting research.

    Builds the system prompt from the campaign, audience and product stored in
    the state, prepends it to the message history and asks the tool-enabled
    model for its next step.

    Args:
        state: Current graph state.

    Returns:
        State update appending the model's response to `supervisor_messages`.
    """
    # Extract state variables
    campaign = state.campaign.model_dump_json()
    target_audience = state.target_audience.model_dump_json()
    # Only name and description are passed to the prompt. Both must be read from
    # `state`: the node must not depend on a notebook-level test variable.
    product = {"name": state.product.name, "description": state.product.description}

    # Inject state variables into the researcher prompt
    prompt = research_instructions_prompt.format(
        date=get_date_string(),
        campaign=campaign,
        target_audience=target_audience,
        product=product
    )

    # The system prompt is rebuilt on every call and is not stored in the history
    messages = [SystemMessage(content=prompt), *state.supervisor_messages]
    response = model_with_tools.invoke(messages)
    return {"supervisor_messages": [response]}

In [32]:
def tool_node(state: AudienceResearchState):
    """
    Execute all tool calls from the previous LLM response.

    Each call is answered with one ToolMessage. The messages are stored in
    `raw_research_notes` and also appended to `supervisor_messages`: the graph
    routes straight back to the researcher, and the model's tool calls must be
    answered in the history it receives next.

    Args:
        state: Current graph state; the last supervisor message is expected to
            carry the tool calls.

    Returns:
        State update with the tool results for both message fields.
    """
    messages = state.supervisor_messages
    tool_calls = (getattr(messages[-1], "tool_calls", None) or []) if messages else []

    tool_outputs = []
    for tool_call in tool_calls:
        tool_name = tool_call["name"]
        selected_tool = tools_by_name.get(tool_name)
        if selected_tool is None:
            # Report the problem to the model instead of crashing the graph
            observation = (
                f"Error: unknown tool '{tool_name}'. "
                f"Available tools: {', '.join(tools_by_name)}"
            )
        else:
            observation = selected_tool.invoke(tool_call["args"])

        tool_outputs.append(
            ToolMessage(
                # conduct_research returns a dict; the saved notebook output shows
                # it ends up as its string form in the message content
                content=observation,
                name=tool_name,
                tool_call_id=tool_call["id"],
                # Extra field: the raw tool arguments, read by compress_research_node
                query=tool_call["args"]
            )
        )

    return {"raw_research_notes": tool_outputs, "supervisor_messages": tool_outputs}

In [None]:
def compress_research_node(state: AudienceResearchState):
    """
    Compress the most recent raw tool result into a structured ResearchNote.

    Args:
        state: Current graph state; `raw_research_notes` must contain at least
            one tool message.

    Returns:
        State update with the new note for `research_notes` and a ToolMessage
        carrying the compressed findings for `supervisor_messages`.

    Raises:
        ValueError: If there is no raw research note to compress.
    """
    if not state.raw_research_notes:
        raise ValueError("compress_research_node requires at least one raw research note")
    last_research = state.raw_research_notes[-1]

    # Add structure output to the model
    model_with_structured_output = model.with_structured_output(ResearchNote)

    # Create prompt
    # NOTE(review): `query` is the extra field set by tool_node and holds the
    # tool-call arguments — confirm it survives state (de)serialization.
    prompt = compress_research_instructions_prompt.format(
        date=get_date_string(),
        query=last_research.query,
        results=last_research.content
    )

    compressed_research = model_with_structured_output.invoke(prompt)

    # Create Tool message
    tool_output = [ToolMessage(
        content=compressed_research.context,
        name=last_research.name,
        tool_call_id=last_research.tool_call_id,
        query=last_research.query,
        sources=compressed_research.sources,
        # Reuse the raw message id: if that message is already present in
        # supervisor_messages, add_messages replaces it instead of adding a
        # second answer to the same tool call.
        id=last_research.id
    )]

    # research_notes is a list field, so the single note is wrapped in a list
    return {"research_notes": [compressed_research], "supervisor_messages": tool_output}

In [13]:
# Routing functions
def should_continue(state: AudienceResearchState) -> Literal["tool_node", "__end__"]:
    """
    Decide whether the research loop continues or the graph ends.

    The decision is based only on the last supervisor message: if it carries
    tool calls, they are executed next; otherwise the run is finished.

    Args:
        state: Current graph state.

    Returns:
        "tool_node": The last message requested tools; continue to tool execution.
        "__end__": No tool calls (or no messages at all); stop.
    """
    messages = state.supervisor_messages
    if not messages:
        return "__end__"

    # Not every message type has `tool_calls`; treat a missing attribute as "none"
    if getattr(messages[-1], "tool_calls", None):
        return "tool_node"

    return "__end__"

In [14]:
# Assemble the research graph: the researcher loops through tool_node until
# it stops requesting tools.
# NOTE(review): compress_research_node is defined in this notebook but is not
# registered here — confirm whether it should sit between tool_node and researcher.
builder = StateGraph(AudienceResearchState)

builder.add_node("researcher", researcher_node)
builder.add_node("tool_node", tool_node)

builder.add_edge(START, "researcher")
# should_continue is the only exit from "researcher"; a separate static
# researcher -> END edge would duplicate (and visually contradict) this routing.
builder.add_conditional_edges(
    "researcher",
    should_continue,
    {
        "tool_node": "tool_node",
        "__end__": END
    }
)
builder.add_edge("tool_node", "researcher")

graph = builder.compile()

In [15]:
# Test state
# Sample input for a manual end-to-end run of the graph: an awareness campaign
# on Instagram Reels aimed at US adults aged 25-35 in the $30k-$60k income band.
test_state = AudienceResearchState(
    supervisor_messages=[],  # start from an empty message history
    campaign=Campaign(
        goal=CampaignGoal.AWARENESS,
        platform=AdPlatform.INSTAGRAM_REELS,
    ),
    target_audience=TargetAudience(
        gender=Gender.ALL,
        location=Location.USA,
        income_range=IncomeRange.LOWER_MIDDLE,
        age_range=AgeRange(
            min_age=25,
            max_age=35
        )
    ),
    product=Product(
        name="Delisio - Your Personal Chef",
        description="Delisio is a personal chef and nutrition assistant. When a user signs up, they enter their age, weight, height, diet (e.g., vegan, keto), nutritional goal (e.g., weight loss, muscle gain), allergies, and equipment. Delisio then provides personalized recipes based on preferences and uses AI to personalize the user's food experience.",
        # feature name -> description
        features={
            "Photo to Recipe": "User uploads a dish photo, specifies equipment and calorie preference, and Delisio generates a tailored recipe.",
            "Surprise Me": "User selects meal type, cuisine, and equipment, defines calorie goal; Delisio creates a personalized recipe (e.g., vegetarian Chinese breakfast).",
            "Nutrition Scanner": "User scans food; Delisio analyzes and reports nutrients, vitamins, and minerals."
        },
        supported_platforms=[ProductPlatform.ios, ProductPlatform.android]
    )
)

In [17]:
# Run the graph on the sample state; the recursion limit is raised to 100
# so the researcher/tool loop has room to iterate.
state = graph.invoke(test_state, config={"recursion_limit": 100})

In [25]:
# Inspect the fourth message (index 3) of the resulting history; in the saved
# output below it is a ToolMessage holding raw search results keyed by URL.
state["supervisor_messages"][3]

ToolMessage(content='{\'https://pmc.ncbi.nlm.nih.gov/articles/PMC9185183/\': \'This cross-sectional study examines socioeconomic and geographic factors associated with diet quality in US adults.\', \'https://www.sciencedirect.com/science/article/pii/S2161831322007062\': \'Themes identified in the foods chosen by young adults include inadequate fruit and vegetable consumption, choosing international flavors and food formats,\', \'https://www.sciencedirect.com/science/article/pii/S0022316622002917\': \'Participants were recruited between November 2017 and December 2017 into the Primary REasons For Eating Research (PREFER) study, an Internet-based survey that included a DCE, to understand meal preferences in young adults. Individuals were excluded for following a vegan or vegetarian diet because of _1_) potentially greater complexity of influences on food choices; _2_) the low prevalence of vegetarianism and veganism in Australia, as culturally meat-based meals are still the predominant m