<a href="https://colab.research.google.com/github/VegasCryptoAgent/AI-Agents/blob/main/Refined_Airflow_DAG_Real_Estate_Lead_Gen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --- Imports ---
import pendulum
import os
import json
import logging
from typing import List, Dict, Any

# Airflow specific imports
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from airflow.exceptions import AirflowSkipException, AirflowFailException

# AI SDK imports - Ensure 'airflow-ai-sdk[openai,duckduckgo]' is installed
# Or install providers for other models you might use (e.g., anthropic, google-generativeai)
import airflow_ai_sdk as ai_sdk
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
# You might need to install tool dependencies: pip install duckduckgo-search

# --- Configuration ---
# IMPORTANT: Configure LLM API Keys via Airflow Connections!
# Example: Create an 'openai_default' connection in the Airflow UI (Admin -> Connections)
# The SDK will automatically look for relevant connections based on the model provider.
# Ensure necessary Python libraries for your chosen models are installed (e.g., 'openai')

# Output location for the generated report (adjust as needed for your environment).
# REPORT_OUTPUT_DIR is an absolute path, so it is resolved on whichever machine
# executes the reporting task; that task creates the directory if it is missing.
# For production, consider writing to cloud storage (S3, GCS) or a database.
REPORT_OUTPUT_DIR = "/opt/airflow/reports" # Example path; must be creatable/writable by the Airflow worker
REPORT_FILENAME = "las_vegas_real_estate_report.json" # Default file name, used when no per-run override is supplied

# --- Define Pydantic Models for Structured Output ---
class ContentIdeas(ai_sdk.BaseModel):
    """Content ideas derived from a market research summary.

    Used as the structured result type of the content-generation LLM task and
    re-validated from a plain dict in the reporting task. All three fields are
    required; the model itself does not enforce list lengths or string sizes.
    """
    # Proposed blog post titles.
    blog_post_titles: List[str]
    # Short social media posts related to the research findings.
    social_media_snippets: List[str]
    # Keywords to target with the generated content.
    target_keywords: List[str]

class ResearchReport(ai_sdk.BaseModel):
    """Final report combining the research input/output with content ideas.

    This is the object that the reporting task serializes to the JSON file.
    """
    # The search query that was handed to the research agent.
    research_query: str
    # The free-text market summary produced by the research agent.
    research_summary: str
    # Structured content ideas generated from the summary.
    content_ideas: ContentIdeas
    # Time the report was assembled, stored as an ISO 8601 string rather than a datetime.
    report_generated_at: str # ISO format timestamp

# --- Define AI Agents ---

# Market research agent.
# It is given a DuckDuckGo search tool so it can look up current web content.
# The model name must correspond to a provider whose client library is installed
# and whose credentials are configured for the Airflow environment.

# Instructions handed to the research agent on every run.
_RESEARCH_SYSTEM_PROMPT = """
    You are a real estate market research assistant specializing in Las Vegas, Nevada.
    Your goal is to find the latest (within the past month) news, articles, and reliable discussions
    about the Las Vegas housing market. Focus on trends, prices, inventory levels, popular neighborhoods
    (like Summerlin, Henderson, Downtown), and key reasons people are moving to/from the area.
    Use the provided search tool efficiently. Synthesize the findings into a concise,
    informative summary (approx. 200-300 words) suitable for informing content strategy.
    """

# Web search capability exposed to the agent.
_research_tools = [duckduckgo_search_tool()]

research_agent = Agent(
    model="gpt-4o-mini",  # Example: OpenAI; requires the 'openai' library and credentials.
    system_prompt=_RESEARCH_SYSTEM_PROMPT,
    tools=_research_tools,
    # Optional: model settings such as temperature can be added here if needed.
)

# --- Airflow DAG Definition ---

@dag(
    schedule="@weekly",  # Run once a week (adjust schedule as needed)
    start_date=pendulum.datetime(2025, 4, 1, tz="UTC"),  # Use a relevant start date
    catchup=False,  # Don't run for past missed schedules
    tags=["ai", "real_estate", "lead_gen", "las_vegas"],
    doc_md="""
    ### Las Vegas Real Estate Lead Generation DAG

    This DAG uses AI agents orchestrated by Airflow to:
    1.  **Research Market:** Uses an AI agent with web search capabilities (`@task.agent`)
        to gather current information about the Las Vegas real estate market.
    2.  **Generate Content Ideas:** Uses an LLM task (`@task.llm`) to brainstorm blog titles,
        social media snippets, and keywords based on the research.
    3.  **Report Findings:** Saves the combined research and content ideas into a JSON file.

    **Configuration:**
    * Requires Airflow Connections for the chosen LLM provider (e.g., `openai_default`).
    * Ensure necessary Python libraries (`airflow-ai-sdk`, LLM providers, tools) are installed.
    * The output report is saved locally by default; modify `report_findings` for production storage.
    """,
    params={  # Allow manual trigger with custom query / output file name
        "research_query_override": Param(
            type=["null", "string"],
            default=None,
            description="Optional: Specify a custom research query instead of the default.",
        ),
        "output_filename_override": Param(
            type=["null", "string"],
            default=None,
            description="Optional: Specify a custom output filename (e.g., 'report_YYYYMMDD.json').",
        ),
    },
)
def real_estate_lead_gen_dag_refined():
    """
    Airflow DAG for Las Vegas Real Estate Market Research and Content Idea Generation.

    Pipeline:
        1. ``resolve_research_query`` picks the query (trigger override or default).
        2. ``research_las_vegas_market`` hands that query to the research agent and
           yields the agent's summary.
        3. ``generate_content_ideas_from_research`` turns the summary into
           structured ``ContentIdeas``.
        4. ``report_findings`` writes query, summary and ideas to a JSON file.

    The query is resolved in its own task so that the *same* value can be fed to
    both the agent and the report; the agent task's output is the summary, not
    the query, so it cannot be reused for the ``research_query`` report field.
    """

    @task
    def resolve_research_query(dag_run: DagRun = None) -> str:
        """
        Return the research query for this run.

        Uses ``research_query_override`` from the trigger configuration when it
        is set to a non-empty value, otherwise the built-in default query.
        """
        # dag_run may be absent and its conf may be None; treat both as "no overrides".
        conf = (dag_run.conf or {}) if dag_run else {}
        query = conf.get("research_query_override")
        if not query:
            query = "Las Vegas real estate market trends, news, and moving statistics recent month"
        return query

    # The agent's output type is configured on the Agent itself (str by default),
    # so no result_type is passed to the decorator.
    # NOTE(review): confirm against the installed airflow-ai-sdk version.
    @task.agent(agent=research_agent)
    def research_las_vegas_market(query: str) -> str:
        """
        Task to perform web research on the Las Vegas real estate market.

        Args:
            query: The search query to hand to ``research_agent``.

        Returns:
            The agent input. The SDK runs the agent with this value and stores
            the agent's answer (the market summary) as the task result.
        """
        logging.info("Researching market with query: %s", query)
        # This function only supplies the agent's input; the SDK performs the
        # actual agent call, so an exception handler around this return could
        # never observe an agent failure. Such failures fail the task directly.
        return query

    @task.llm(
        model="gpt-4o-mini",  # Example: Use OpenAI. Requires 'openai' library and connection.
        result_type=ContentIdeas,  # Expect structured output defined by the Pydantic model
        system_prompt="""
        You are a creative content strategist for a Las Vegas real estate agent.
        Based on the provided market research summary, generate engaging and SEO-friendly content ideas
        to attract potential buyers and sellers interested in the Las Vegas area.

        Produce:
        - 3-5 catchy and relevant blog post titles.
        - 3-5 short, engaging social media snippets (max 280 chars each) related to the findings.
        - A list of 5-10 relevant keywords (including long-tail keywords) to target.

        Ensure the ideas are directly inspired by the research summary provided.
        Format the output strictly according to the 'ContentIdeas' structure.
        """,
    )
    def generate_content_ideas_from_research(research_summary: str) -> str:
        """
        Task to generate content ideas using an LLM based on the research summary.

        Args:
            research_summary: The market summary produced by the research task.

        Returns:
            The LLM input (the summary itself); the SDK performs the LLM call.

        Raises:
            AirflowSkipException: If the summary is empty or shorter than 50
                characters, in which case content generation is skipped.
        """
        if not research_summary or len(research_summary) < 50:  # Basic validation
            logging.warning("Research summary seems too short or empty. Skipping content generation.")
            raise AirflowSkipException("Input research summary is insufficient.")

        logging.info("Generating content ideas based on research summary...")
        # As with the agent task, the LLM call happens in the SDK, not here, so
        # there is nothing to guard with try/except around this return.
        return research_summary

    @task
    def report_findings(research_query: str, research_summary: str, ideas: Dict[str, Any], dag_run: DagRun = None):
        """
        Task to consolidate research and ideas into a structured report (JSON file).

        Args:
            research_query: The query used for the research task.
            research_summary: The summary text generated by the research agent.
            ideas: The dictionary representation of the ContentIdeas object from the LLM task.
            dag_run: The current DAG run object to access configuration.

        Raises:
            AirflowFailException: If the requested output file name is not a
                plain file name, or if building or writing the report fails.
        """
        logging.info("Consolidating research and content ideas into a report.")

        # Determine the output file name; conf may be missing or None.
        conf = (dag_run.conf or {}) if dag_run else {}
        base_filename = conf.get("output_filename_override") or REPORT_FILENAME

        # The override comes from the trigger payload. Accept only a bare file
        # name so the report can never be written outside REPORT_OUTPUT_DIR
        # (os.path.join would otherwise honour absolute paths and '..').
        if (
            not isinstance(base_filename, str)
            or base_filename in (".", "..")
            or os.path.basename(base_filename) != base_filename
        ):
            raise AirflowFailException(
                f"Invalid output filename {base_filename!r}: "
                "it must be a plain file name without directory components."
            )
        output_path = os.path.join(REPORT_OUTPUT_DIR, base_filename)

        try:
            # Re-construct the Pydantic model from the dictionary for validation/structure
            content_ideas_obj = ContentIdeas(**ideas)

            # Create the final report object, stamped with the current UTC time
            report_data = ResearchReport(
                research_query=research_query,
                research_summary=research_summary,
                content_ideas=content_ideas_obj,
                report_generated_at=pendulum.now(tz="UTC").isoformat(),
            )

            # Ensure output directory exists
            os.makedirs(REPORT_OUTPUT_DIR, exist_ok=True)
            logging.info("Ensured report directory exists: %s", REPORT_OUTPUT_DIR)

            # Write report data to JSON file
            # In production: Replace this with writing to a database, cloud storage (S3/GCS), or API.
            with open(output_path, "w", encoding="utf-8") as f:
                # model_dump(mode='json') (Pydantic v2) yields JSON-safe primitives for json.dump
                json.dump(report_data.model_dump(mode="json"), f, indent=4)

            logging.info("Successfully saved report to %s", output_path)

        except Exception as e:
            logging.error(f"Error during report generation or saving: {e}", exc_info=True)
            # Fail the task explicitly on error, keeping the original cause attached
            raise AirflowFailException(f"Report generation/saving failed: {e}") from e

    # --- Define Task Dependencies ---
    # The query is resolved once and shared by the research task and the report.
    research_query = resolve_research_query()

    # The output of the agent task is the agent's answer (the market summary).
    research_summary = research_las_vegas_market(query=research_query)

    # The LLM task output is the ContentIdeas result produced from that summary.
    content_ideas = generate_content_ideas_from_research(research_summary=research_summary)

    # The report receives the real query, the summary and the ideas as distinct values.
    report_findings(
        research_query=research_query,
        research_summary=research_summary,
        ideas=content_ideas,
    )

# Instantiate the DAG: the @dag-decorated function must be called at module
# level so that the DAG object is actually created when this file is loaded.
real_estate_lead_gen_dag_refined()