In [None]:
!pip install langgraph langchain-openai==0.3.12 langchain_core langchain-community==0.3.21 tavily-python==0.5.4 youtube_search

In [None]:
import os
from dotenv import load_dotenv
from datetime import datetime
from langchain_openai import AzureChatOpenAI
from tavily import TavilyClient
load_dotenv()

# Initialize Azure OpenAI client
# Connection settings are read from the process environment.
openai_api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION")
openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
# NOTE(review): the deployment name is read from AZURE_OPENAI_API_TYPE, which
# looks like an unusual variable name for a deployment — confirm against .env.
deployment_model_name = os.getenv("AZURE_OPENAI_API_TYPE")
azure_model_name = os.getenv("MODEL_NAME")
openai_api_type = os.getenv("OPENAI_API_TYPE")

# Forward only the settings that are actually set. Previously these values
# were read but never handed to the client, so e.g. AZURE_OPENAI_API_VERSION
# had no effect. Unset values are left out so the client's own
# environment-variable defaults still apply.
azure_connection_kwargs = {
    name: value
    for name, value in (
        ("azure_endpoint", openai_api_base),
        ("api_version", openai_api_version),
        ("api_key", openai_api_key),
    )
    if value
}

# llm Initialization
llm = AzureChatOpenAI(
    deployment_name=deployment_model_name,
    model_name=azure_model_name,
    temperature=0.1,
    max_tokens=4000,
    timeout=None,
    max_retries=5,
    **azure_connection_kwargs,
)

# Initialize Tavily Web Search
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

In [None]:
from pathlib import Path
from typing import TypedDict, Literal, Dict, Any
import json
import ast
from datetime import datetime
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools import YouTubeSearchTool


# === Constants ===
# Number of research passes: the ds_agent -> strategy_agent loop repeats until
# the iteration counter (seeded with 1 in the runner) exceeds this value.
LEVEL_OF_DETAIL = 3
# Subject of the report; the runner stores it in the state as "topic".
TOPIC = "Agentic RAG"

# === Prompts ===
# System message placed first in the data-science agent's LLM call.
ds_prompt = SystemMessage(content="You are a Senior Data Scientist.")

# System message placed first in the strategy agent's LLM call.
strategy_prompt = SystemMessage(content=(
    "You are a Strategy expert. Focus on market trends, business implications, "
    "and strategic insights including some business KPIs. Use McKinsey style reports."
))

# Plain str template (not a message object) with {topic} and {report}
# placeholders; call_author_agent fills it with str.format.
author_prompt = (
    "You are a senior consultant at McKinsey. Your task is to rewrite the following content into a polished, high-quality English report focused on the topic: '{topic}'. "
    "The report is structured into two main sections: 'Market Landscape and Trends' and 'Technical Architecture and Capabilities'. "
    "You must strictly preserve this structure and keep all information belonging to each section separate. Do not merge or shift content between sections. "
    "Enhance clarity, sentence structure, grammar, and logical flow *within each section only*. "
    "Ensure the language is professional, executive-level, and precise. Do not remove any content. If you find incomplete or unclear sentences, rewrite them while preserving their meaning. "
    "Format references that include URLs using Markdown like this: [source name](https://example.com). Do not invent references or add URLs unless they are already included. "
    "If any YouTube links are present, make sure to list them at the end of the report as 'Video 1', 'Video 2', etc., and hyperlink them accordingly. You shouldn't lose any reference while rewriting. "
    "\n\nHere is the content to refine:\n{report}"
)

# === State ===
class State(TypedDict):
    """Shared state dictionary passed between the graph nodes in this notebook."""
    # Subject of the report; interpolated into search queries and prompts.
    topic: str
    # Pass counter; selects the search domains and is incremented by
    # call_strategy_agent at the end of each pass.
    iteration: int
    # Threshold compared against `iteration` by route_by_iteration.
    level_of_detail: int
    # Stringified search results from the most recent call_ds_agent run.
    ds_result: str
    # LLM-written paragraphs from call_ds_agent, appended on every pass.
    ds_result_markdown: str
    # Stringified search results from the most recent call_strategy_agent run.
    strategy_result: str
    # LLM-written paragraphs from call_strategy_agent, appended on every pass.
    strategy_result_markdown: str
    # Path of the Markdown file written by call_author_agent.
    report_path_md: str
    # Draft report assembled by generate_fancy_markdown_report.
    report: str

# === Helper Functions ===
def stringify_response(response: Any) -> str:
    """Render a search response as Markdown-style ``**key**: value`` text.

    Args:
        response: A list of result items, a single dict, or any other value.

    Returns:
        For a dict, one ``**key**: value`` line per entry. For a list, each
        item rendered on its own and separated by a blank line. For anything
        else, ``str(response)``.

    List items that are not dicts are rendered with ``str()`` instead of
    being dropped, and nested dict/list values are pretty-printed as JSON in
    both the list and the dict case.
    """
    def format_item(item: Any) -> str:
        # Anything that is not a mapping has no keys to label; keep its text.
        if not isinstance(item, dict):
            return str(item)
        return "\n".join(
            # default=str keeps non-JSON-serializable nested values from raising.
            f"**{key}**: {json.dumps(value, indent=2, default=str) if isinstance(value, (dict, list)) else value}"
            for key, value in item.items()
        )

    if isinstance(response, list):
        return "\n\n".join(format_item(item) for item in response)
    return format_item(response)

def search_and_format(query: str, domains: list[str]) -> str:
    """Run an advanced Tavily search restricted to `domains` and format the hits.

    Args:
        query: Search query text.
        domains: Domains passed to Tavily as `include_domains`.

    Returns:
        The search results rendered by `stringify_response`. An empty string
        when the response dict carries no results.
    """
    response = tavily.search(
        query=query,
        search_depth="advanced",
        # Tavily's flag is `include_answer`; the former `include_answers`
        # spelling is not a documented search parameter.
        # NOTE(review): only "results" is consumed below — drop this flag if
        # the generated answer is not needed.
        include_answer=True,
        max_results=5,
        include_domains=domains,
    )
    if isinstance(response, dict):
        # A missing or None "results" entry becomes an empty list so the
        # literal text "None" is never handed on as search content.
        results = response.get("results") or []
    else:
        results = response
    return stringify_response(results)

# === Agent: Data Science ===
def _ds_domains_for_iteration(iteration: int) -> list[str]:
    """Return the domain filter the data-science agent uses for a given pass."""
    if iteration == 1:
        return ["arxiv.org", "researchgate.net"]
    if iteration == 2:
        return ["medium.com", "youtube.com"]
    return [
        "anthropic.com", "microsoft.com", "llama.com",
        "langchain.com", "openai.com", "aws.amazon.com"
    ]


def _youtube_results(query: str) -> list:
    """Search YouTube and shape the hits like the other search result dicts."""
    # YouTubeSearchTool splits its input on "," and parses the second field as
    # the number of results, so a comma inside the query (for example from the
    # topic) would make it raise. Replace commas before invoking the tool.
    raw = YouTubeSearchTool().invoke(query.replace(",", " "))

    if not isinstance(raw, str):
        # Non-string output is only usable if it is already a list.
        return raw if isinstance(raw, list) else []

    try:
        urls = ast.literal_eval(raw)
    except Exception as e:
        print("❌ Failed to parse YouTube result:", e)
        urls = []
    if not isinstance(urls, (list, tuple)):
        # The parsed literal was not a sequence of URLs; nothing to add.
        urls = []

    return [
        {
            "title": f"YouTube Video {i + 1}",
            "url": url,
            "content": "Relevant video content from YouTube (no transcript available)"
        }
        for i, url in enumerate(urls)
        if isinstance(url, str) and url.startswith("http")
    ]


def call_ds_agent(state: State) -> State:
    """Research the topic from a technical angle and append a paragraph to the state.

    Runs a Tavily search whose domain filter depends on ``state["iteration"]``
    (with extra YouTube hits on pass 2), stores the stringified results in
    ``state["ds_result"]``, asks the LLM to turn them into one paragraph, and
    appends that paragraph to ``state["ds_result_markdown"]``.

    Args:
        state: Graph state; reads "topic" and "iteration".

    Returns:
        The same state object, updated in place.
    """
    search_query = f"What are the recent scientific disciplines advancements on the topic : {state['topic']}"
    domains = _ds_domains_for_iteration(state["iteration"])
    youtube_hits = _youtube_results(search_query) if state["iteration"] == 2 else []

    response = tavily.search(
        query=search_query,
        search_depth="advanced",
        # Tavily's flag is `include_answer`; the former `include_answers`
        # spelling is not a documented search parameter.
        include_answer=True,
        max_results=5,
        include_domains=domains
    )

    # Merge the YouTube hits into whichever list shape the search returned;
    # any other response shape is stringified as-is.
    if isinstance(response, dict) and isinstance(response.get("results"), list):
        results = response["results"] + youtube_hits
    elif isinstance(response, list):
        results = response + youtube_hits
    else:
        results = response

    state["ds_result"] = stringify_response(results)

    messages = [
        ds_prompt,
        HumanMessage(
            content=(
                f"You are an advanced scientist writing a high-quality paragraph on the topic: '{state['topic']}'. "
                f"Your task is to synthesize the following bullet points into a single, cohesive, and professionally written paragraph. "
                f"Use a formal, analytical tone as found in scientific publications. Do not omit any information from the bullets — all points must be included. "
                f"The final output should be a continuous paragraph, without line breaks, bullet points, or lists — structured like a newspaper article. "
                f"If any references include valid URLs, format them as clickable Markdown hyperlinks like this: [source name](https://example.com). "
                f"Do not invent references or add any without a provided URL. If there are YouTube links, list them at the end of the paragraph as 'Video 1', 'Video 2', etc., and link each accordingly. "
                f"\n\nHere are the bullet points to rewrite into a paragraph:\n{state['ds_result']}"
            )
        ),
    ]

    result = llm.invoke(messages)
    # Paragraphs from successive passes accumulate, separated by blank lines.
    state["ds_result_markdown"] += getattr(result, "content", "") + "\n\n\n"
    return state

# === Agent: Strategy ===
def call_strategy_agent(state: State) -> State:
    """Research the topic from a strategy angle and append a paragraph to the state.

    Searches consulting/analyst domains chosen by ``state["iteration"]``,
    stores the formatted hits in ``state["strategy_result"]``, has the LLM
    write one paragraph from them, appends it to
    ``state["strategy_result_markdown"]`` and advances the iteration counter.

    Args:
        state: Graph state; reads "topic" and "iteration".

    Returns:
        The same state object, updated in place.
    """
    search_text = f"What are the latest strategy reports on the topic: {state['topic']}"

    # Each pass looks at a different set of sources.
    pass_number = state["iteration"]
    if pass_number == 1:
        allowed_domains = ["pwc.com", "mckinsey.com"]
    elif pass_number == 2:
        allowed_domains = ["gartner.com"]
    else:
        allowed_domains = ["deloitte.com", "bcg.com"]

    findings = search_and_format(search_text, allowed_domains)
    state["strategy_result"] = findings

    instructions = (
        f"You focus on the topic: '{state['topic']}'. Generate a well-written long paragraph in the style of a McKinsey senior consultant "
        f"using the following content. Dont put url in text referencing hyperlinks should be strictly like this [deloitte](https://www2.deloitte.com/us/en/insights/topics/digital-transformation/data-integrity-in-ai-engineering.html)"
        f"Please include all these bullets:\n{findings}\n\n"
        f"Do not use new lines. Write it like a continuous paragraph, in the style of a newspaper report."
    )
    reply = llm.invoke([strategy_prompt, HumanMessage(content=instructions)])

    # Paragraphs from successive passes accumulate, separated by blank lines.
    state["strategy_result_markdown"] += getattr(reply, "content", "") + "\n\n\n"
    # This node closes a pass, so it is the one that bumps the counter.
    state["iteration"] += 1
    return state

# === Agent: Author ===
def call_author_agent(state: State) -> State:
    """Have the LLM polish the draft report and write it to ``output/report.md``.

    Args:
        state: Graph state; reads "topic" and "report".

    Returns:
        The same state object with "report_path_md" set to the written file.
    """
    report_dir = Path("output")
    report_dir.mkdir(exist_ok=True)

    prompt_text = author_prompt.format(topic=state['topic'], report=state['report'])
    reply = llm.invoke([HumanMessage(content=prompt_text)])

    destination = report_dir / "report.md"
    destination.write_text(getattr(reply, "content", ""), encoding="utf-8")

    state["report_path_md"] = str(destination)
    return state

# === Edge Router ===
def route_by_iteration(state: State) -> Literal["ds_agent", "generate_report"]:
    """Pick the next node: another research pass, or report generation.

    Returns "generate_report" once ``state["iteration"]`` is greater than
    ``state["level_of_detail"]``; otherwise "ds_agent".
    """
    if state["iteration"] > state["level_of_detail"]:
        return "generate_report"
    return "ds_agent"

# === Report Generator ===
def generate_fancy_markdown_report(state: dict) -> dict:
    """Assemble the draft Markdown report and store it in ``state["report"]``.

    The report consists of an HTML title banner, a metadata header (topic and
    generation timestamp), the accumulated strategy paragraphs and the
    accumulated data-science paragraphs.

    Args:
        state: Graph state; reads "topic", "strategy_result_markdown" and
            "ds_result_markdown".

    Returns:
        The same state object with "report" set.
    """
    logo_and_title_html = """
<div style="display: flex; align-items: center; justify-content: flex-start; gap: 16px; padding: 12px 0 32px 0; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
  <h1 style="margin: 0; font-size: 48px; font-weight: 600; color: #fff;">
    Agents in Action: A Daily Brief
  </h1>
</div>
"""

    generated_on = datetime.now().strftime("%Y-%m-%d %H:%M")
    # The bold markers must hug the label ("**Topic:**"): a space after the
    # opening "**" stops Markdown from treating it as emphasis. The timestamp
    # gets a balanced pair of "*" like the topic. The two spaces before the
    # newline are a Markdown hard line break.
    metadata = (
        f"**Topic:** *{state['topic']}*  \n"
        f"**Generated on:** *{generated_on}*"
    )

    report = f"""{logo_and_title_html}

{metadata}

---

## Market Landscape and Trends:

{state['strategy_result_markdown'].strip()}

---

## Technical Architecture and Capabilities

{state['ds_result_markdown'].strip()}

---
"""
    state["report"] = report
    return state

# === LangGraph Builder ===
builder = StateGraph(State)

# Nodes, registered in the order the pipeline visits them.
for node_name, node_fn in (
    ("ds_agent", call_ds_agent),
    ("strategy_agent", call_strategy_agent),
    ("generate_report", generate_fancy_markdown_report),
    ("correct_report", call_author_agent),
):
    builder.add_node(node_name, node_fn)

# Wiring: ds_agent -> strategy_agent, then route_by_iteration decides between
# another pass (back to ds_agent) and generate_report -> correct_report -> END.
builder.set_entry_point("ds_agent")
builder.add_edge("ds_agent", "strategy_agent")
builder.add_conditional_edges("strategy_agent", route_by_iteration)
builder.add_edge("generate_report", "correct_report")
builder.add_edge("correct_report", END)

graph = builder.compile()

# === Runner ===
if __name__ == "__main__":
    # Seed state: the counter starts at 1 and every text field starts empty so
    # the agents can append to the *_markdown accumulators.
    initial_state: State = dict(
        topic=TOPIC,
        iteration=1,
        level_of_detail=LEVEL_OF_DETAIL,
        ds_result="",
        ds_result_markdown="",
        strategy_result="",
        strategy_result_markdown="",
        report_path_md="",
        report="",
    )
    final_state = graph.invoke(initial_state)
    print("✅ Markdown report generated at:", final_state["report_path_md"])