In [51]:
# Colab setup cell: installs the third-party packages that the import cell below relies on.
# NOTE(review): versions are unpinned, so a fresh runtime may resolve different releases — consider pinning.
# NOTE(review): pypandoc presumably also needs a pandoc binary on the runtime — confirm before relying on DOCX export.
print("⏳ Installing required libraries...")
!pip install google-genai pydantic rich python-docx pypandoc --quiet
print("✅ Installation complete.")


⏳ Installing required libraries...
✅ Installation complete.


In [52]:
import asyncio
import json
import traceback
import os
import re
import requests
from google.colab import files

import datetime

import pypandoc



import google.genai as genai

from google.genai.types import Tool
from google.genai import types


from typing import Any, List, Dict, Optional

from pydantic import BaseModel, Field
from rich import print as rich_print

from google.colab import userdata

from typing import Any, Dict, Optional
from pydantic import BaseModel
from rich import print as rich_print

from IPython.display import display, Markdown




In [53]:
class PlanStep(BaseModel):
    """One entry of an execution plan: which agent runs and what it is meant to do."""
    # Position of the step within the plan (PlanningAgent numbers them 1, 2, 3).
    step_id: int
    # Class name of the agent responsible for the step, stored as plain text.
    agent_name: str
    # Human-readable explanation of the step.
    description: str

class ExecutionPlan(BaseModel):
    """The user query together with the ordered steps planned to answer it."""
    # The query exactly as it was passed to PlanningAgent.create_plan.
    query: str
    # Steps in the order they were defined by the planner.
    steps: List[PlanStep]

class PlanningAgent(BaseModel):
    """Builds the fixed three-stage research plan.

    The plan is static: neither ``client`` nor ``model_name`` is used while
    planning; they are carried only so every agent shares the same fields.
    """
    client: Any
    model_name: str
    debug: bool = False

    class Config:
        arbitrary_types_allowed = True

    def create_plan(self, query: str) -> ExecutionPlan:
        """Return the static plan (query analysis -> web search -> report) for ``query``.

        Args:
            query: The user's research request, stored verbatim on the plan.

        Returns:
            An ``ExecutionPlan`` with three consecutively numbered steps.
        """
        verbose = self.debug
        if verbose:
            rich_print("🤔 [bold cyan]Planning Phase:[/bold cyan] Creating execution plan...")

        # (agent name, description) pairs; step ids are derived from the position.
        stage_specs = (
            ("QueryAnalysisAgent", "Analyze the user query to identify key research topics."),
            ("DataGatherAgent", "Conduct web searches on the identified topics using Google Search."),
            ("ReportAgent", "Synthesize search results into a comprehensive analysis report with citations."),
        )
        planned_steps = [
            PlanStep(step_id=position, agent_name=agent, description=text)
            for position, (agent, text) in enumerate(stage_specs, start=1)
        ]

        execution_plan = ExecutionPlan(query=query, steps=planned_steps)

        if verbose:
            rich_print(f"✅ [bold cyan]Plan Created:[/bold cyan] {len(planned_steps)} steps defined.")

        return execution_plan

In [54]:
class QueryEntities(BaseModel):
    """Schema used to validate the JSON object the LLM returns for query analysis."""
    # Topics that are later searched one by one by the data-gathering stage.
    research_topics: List[str] = Field(description="A list of key technical topics and concepts to research based on the user's query.")
    # Overall subject of the request, as identified by the LLM.
    main_subject: str = Field(description="The primary subject of the analysis, e.g., 'Aircraft Routers'.")

class QueryAnalysisAgent(BaseModel):
    """Asks the LLM to break a user query into research topics and a main subject."""
    client: Any
    model_name: str
    debug: bool = False

    class Config:
        arbitrary_types_allowed = True

    def _fallback(self, query: str) -> Dict:
        """Result used whenever analysis fails: the raw query becomes the only topic."""
        return {"status": "error", "entities": {"research_topics": [query], "main_subject": "general analysis"}}

    def analyze(self, query: str) -> Dict:
        """Extracts key research topics from the user's query using the LLM.

        Args:
            query: The user's free-text research request.

        Returns:
            A dict with ``"status"`` (``"success"`` or ``"error"``) and
            ``"entities"`` holding ``research_topics`` and ``main_subject``.
            This method never raises; every failure yields the fallback result
            in which the query itself is the single research topic.
        """
        if self.debug:
            rich_print(f"🔎 [bold yellow]Query Analysis Phase:[/bold yellow] Analyzing query: '{query}'")

        prompt = f"""
        From the following user query, extract the key technical topics that need to be researched to provide a comprehensive answer.
        Also, identify the main subject or application area.

        User Query: "{query}"

        Provide the output as a valid JSON object with two keys: "research_topics" (a list of strings) and "main_subject" (a string).
        """

        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt,
            )

            # response.text can be None (e.g. blocked or empty answer); treat that as "no JSON".
            llm_output = response.text or ""

            # The model may wrap the object in prose or code fences, so pull out
            # the outermost {...} span instead of parsing the whole reply.
            match = re.search(r"\{.*\}", llm_output, re.DOTALL)
            if not match:
                rich_print("💥 [bold red]No JSON found in LLM output.[/bold red]")
                return self._fallback(query)

            json_string = match.group(0)
            if self.debug:
                # Diagnostic output only; previously printed even with debug disabled.
                print("Extracted JSON:", json_string)

            try:
                entities = QueryEntities.model_validate_json(json_string)
            except Exception as e:
                rich_print(f"💥 [bold red]Error after Regex:[/bold red] {e}")
                return self._fallback(query)

            if self.debug:
                rich_print(f"✅ [bold yellow]Query Analyzed. Topics:[/bold yellow] {entities.research_topics}")

            return {"status": "success", "entities": entities.model_dump()}

        except Exception as e:
            rich_print(f"💥 [bold red]Error in Query Analysis:[/bold red] {e}")
            # Fallback with a generic topic list
            return self._fallback(query)

In [55]:

class SearchResult(BaseModel):
    """Outcome of one grounded search: the topic, the model's summary, and its web sources."""
    # Topic string that was searched.
    topic: str
    # Text returned by the model for the topic (or a failure message).
    summary: str
    # One dict per source; DataGatherAgent fills the keys "uri" and "title".
    sources: List[Dict[str, str]]

class DataGatherAgent(BaseModel):
    """Runs one Google-Search-grounded LLM call per research topic, concurrently."""
    client: Any
    model_name: str
    debug: bool = False

    class Config:
        arbitrary_types_allowed = True

    def _extract_sources(self, response: Any) -> List[Dict[str, str]]:
        """Collect ``{"uri", "title"}`` dicts from the response's grounding chunks.

        Every level (candidates, grounding metadata, chunks, web) is optional,
        so each is read defensively; a response without grounding data simply
        yields an empty list instead of failing the whole search.
        """
        candidates = getattr(response, "candidates", None) or []
        if not candidates:
            return []
        metadata = getattr(candidates[0], "grounding_metadata", None)
        chunks = getattr(metadata, "grounding_chunks", None) or []

        sources = []
        for chunk in chunks:
            web = getattr(chunk, "web", None)
            uri = getattr(web, "uri", None)
            if not uri:
                continue
            # SearchResult requires string values, so fall back to the URI when no title is given.
            title = getattr(web, "title", None) or uri
            sources.append({"uri": uri, "title": title})
        return sources

    async def _search_topic(self, topic: str) -> SearchResult:
        """Performs a grounded search for a single topic.

        Returns:
            A ``SearchResult``. On any error the summary is a failure message
            and ``sources`` is empty; this coroutine never raises.
        """
        if self.debug:
            rich_print(f"🌐 [bold blue]Data Gathering:[/bold blue] Searching for '{topic}'...")

        prompt = f"Provide a detailed summary of the topic: {topic}. Include key technical details, recent developments, and comparisons where relevant. Base your answer entirely on the provided search results. Provide the original uri for the reference"

        try:
            search_tool = types.Tool(
                google_search=types.GoogleSearch()
            )

            # Configure generation settings
            config = types.GenerateContentConfig(
                tools=[search_tool]
            )

            response = await self.client.aio.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config=config
            )

            sources = self._extract_sources(response)

            # response.text may be None; keep whatever sources were found rather than failing validation.
            summary = response.text or "Failed to retrieve information for this topic."
            if self.debug:
                rich_print(f"✅ [bold blue]Found {len(sources)} sources for '{topic}'.[/bold blue]")

            return SearchResult(topic=topic, summary=summary, sources=sources)

        except Exception as e:
            rich_print(f"💥 [bold red]Search failed for topic '{topic}':[/bold red] {e}")
            traceback.print_exc()
            return SearchResult(topic=topic, summary="Failed to retrieve information for this topic.", sources=[])

    async def search(self, topics: List[str]) -> List[Dict[str, Any]]:
        """Concurrently searches for multiple topics.

        Returns:
            One plain dict per topic (``SearchResult.model_dump()``), in the
            same order as ``topics``.
        """
        tasks = [self._search_topic(topic) for topic in topics]
        results = await asyncio.gather(*tasks)
        return [result.model_dump() for result in results]


In [56]:
class Report(BaseModel):
    """Validated shape of the final report produced by ReportAgent."""
    # Report title taken from the LLM's JSON output.
    title: str
    # Executive summary taken from the LLM's JSON output.
    executive_summary: str
    # Section title -> section content, as returned by the LLM.
    sections: Dict[str, str]
    # One dict per cited source; ReportAgent fills "index", "title" and "uri" (all strings).
    citations: List[Dict[str, str]]
    # Complete report assembled as a single Markdown document.
    full_report_markdown: str

class ReportAgent(BaseModel):
    """Turns gathered search results into a cited Markdown report via the LLM."""
    client: Any
    model_name: str
    debug: bool = False

    class Config:
        arbitrary_types_allowed = True

    def _failure_report(self, reason: str, error: str) -> Dict[str, Any]:
        """Build the placeholder report returned when the LLM output cannot be used."""
        return {
            "title": "Report generation failure",
            "executive_summary": f"Report generation failure due to {reason}, check console output",
            "sections": {},
            "citations": [],
            "full_report_markdown": "",
            "error": error,
        }

    def _build_context(self, search_results: List[Dict]):
        """Render search results as prompt context and number each distinct source URI.

        Returns:
            ``(context, citations)`` where ``citations`` is a list of
            ``{"index", "title", "uri"}`` dicts in first-seen order. A URI that
            appears under several topics keeps the number it was first given.
        """
        context = "SEARCH RESULTS:\n\n"
        citations = []
        citation_map = {}
        for i, result in enumerate(search_results):
            context += f"--- Source Group {i+1} (Topic: {result['topic']}) ---\n"
            context += f"Summary: {result['summary']}\n"
            context += "Referenced Documents:\n"
            for source in result['sources']:
                uri = source['uri']
                if uri not in citation_map:
                    citation_map[uri] = len(citations) + 1
                    # The index is stored as a string because Report.citations is Dict[str, str].
                    citations.append({"index": str(citation_map[uri]), "title": source['title'], "uri": uri})
                context += f"- [{citation_map[uri]}] {source['title']} ({uri})\n"
            context += "\n"
        return context, citations

    def _render_markdown(self, report_data: Dict[str, Any], citations: List[Dict[str, str]]) -> str:
        """Assemble title, summary, sections and the source list into one Markdown string."""
        full_markdown = f"# {report_data['title']}\n\n"
        full_markdown += f"## Executive Summary\n\n{report_data['executive_summary']}\n\n"
        for title, content in report_data['sections'].items():
            full_markdown += f"## {title}\n\n{content}\n\n"

        full_markdown += "## Sources\n\n"
        # Sort numerically: the index is a string, and a plain string sort would put "10" before "2".
        for cit in sorted(citations, key=lambda c: int(c['index'])):
            full_markdown += f"[{cit['index']}] {cit['title']}: {cit['uri']}\n"
        return full_markdown

    async def generate_report(self, query: str, search_results: List[Dict]) -> Dict[str, Any]:
        """Generates a comprehensive report from search results.

        Args:
            query: The user's original request, quoted in the prompt.
            search_results: Dicts with ``topic``, ``summary`` and ``sources``
                (each source having ``uri`` and ``title``).

        Returns:
            A plain dict with the ``Report`` fields. When the LLM output has no
            parseable JSON or fails validation, a placeholder report with an
            extra ``"error"`` key and an empty ``full_report_markdown`` is
            returned instead.

        Raises:
            Exception: Errors raised while building the context or by the LLM
                call itself are logged and re-raised.
        """
        if self.debug:
            rich_print("✍️ [bold magenta]Report Generation Phase:[/bold magenta] Synthesizing search results...")

        try:
            # Prepare the context for the LLM
            context, citations = self._build_context(search_results)

            prompt = f"""
        You are a world-class technical and research analyst specializing in avionics, aerospace industry, .
        Your task is to write a detailed competitive analysis report based *only* on the provided search results.

        **User's Original Request:** "{query}"

        **Your Task:**
            1.  Write a comprehensive report that addresses the user's request.
            2.  Structure the report with a title, an executive summary, and multiple detailed sections (e.g.,
                "Introduction",
                "Technology Landscape Overview",
                "Research Papers in this Area",
                "Emerging and Macro Trends",
                "Market Trends and Benchmarking",
                "Competitive Landscape and Offerings",
                "Key Challenges",
                "Existing products in the market",
                "Competitive Differentiators",
                "Strategic Recommendations for Staying Ahead",
                "Conclusion and Future Outlook").
            3.  **Crucially, you must cite every claim you make.** Use the citation numbers provided in the search results (e.g., [1], [2], etc.).
            4.  Do not invent any information. All analysis must be directly supported by the provided context.
            5.  The tone should be professional, objective, and technical.
            6.  Restrict all the analysis to Avionics and Aerospace industry *only*.
            7.  Report format should be a valid JSON string with required escape encoding


            **Specifically for your analysis, you must:**
            -   **Understand Competitors:** Detail what key competitors do and what their specific product offerings are in the market.
            -   **Understand Market:** Analyze the current trends in the market and benchmark against the identified competitors.
            -   **Bullet out the Differences:** Clearly articulate the differences between technologies, products, or approaches in a concise, bulleted format.
            -   **Strategize to Stay Ahead:** Provide actionable strategies and recommendations to maintain a competitive edge.


        **Provided Context (Search Results):**
        {context}

        **Output Format:**
        Provide your response as a single, valid JSON object with the following keys:
        - "title": A string for the report title.
        - "executive_summary": A string for the summary.
        - "sections": A valid JSON object where each key is a section title (string) and the value is the section's content (string in Markdown format).
        """

            # Use the async client so this coroutine does not block the event loop.
            response = await self.client.aio.models.generate_content(
                model=self.model_name,
                contents=prompt,
            )
            # response.text can be None; an empty string falls through to the "no JSON" path.
            llm_output = response.text or ""
            if self.debug:
                print(llm_output)  # raw model output, for debugging only

            # 1. Remove Known Prefixes
            for known_prefix in (
                "Here's the JSON:",
                "Okay, I will provide the results in JSON format:",
                "As requested, here's the report in JSON format:",
            ):
                llm_output = llm_output.replace(known_prefix, "").strip()

            # 2. Extract the outermost {...} span (tolerates code fences and surrounding prose)
            match = re.search(r"\{[\s\S]*\}", llm_output)
            if not match:
                rich_print("💥 [bold red]No JSON found in LLM output.[/bold red]")
                return self._failure_report("no json found in llm output", "no json found in output")

            try:
                report_data = json.loads(match.group(0))

                report_data['full_report_markdown'] = self._render_markdown(report_data, citations)
                report_data['citations'] = citations

                final_report = Report.model_validate(report_data)

                if self.debug:
                    rich_print("✅ [bold magenta]Report Generated Successfully.[/bold magenta]")

                return final_report.model_dump()

            except json.JSONDecodeError as e:
                rich_print(f"💥 [bold red]Error Decoding JSON:[/bold red] {e}")
                return self._failure_report("llm json error", str(e))
            except Exception as e:
                # Missing keys, wrong value types, or schema validation failures end up here.
                rich_print(f"💥 [bold red]Error processing the report:[/bold red] {e}")
                return self._failure_report("other llm error", str(e))

        except Exception as e:
            rich_print(f"💥 [bold red]Error during report generation:[/bold red] {e}")
            raise


In [57]:
class ExecutionAgent(BaseModel):
    """Orchestrates the pipeline: plan -> query analysis -> web search -> report."""
    client: Any
    model_name: str
    debug: bool = False
    # When True, the raw output of every stage is printed as it completes.
    stage_output: bool = False

    class Config:
        arbitrary_types_allowed = True

    async def execute(self, query: str) -> Optional[Dict]:
        """Executes the full research-and-report pipeline for ``query``.

        Args:
            query: The user's research request.

        Returns:
            ``{"report": <report dict>}`` on completion, or ``None`` when any
            stage raises (the error and traceback are printed).
        """
        try:
            rich_print(f"▶️ [bold green]Starting Execution for query:[/bold green] '{query}'")

            # Every stage agent is built from the same client/model/debug settings.
            shared = dict(client=self.client, model_name=self.model_name, debug=self.debug)

            # 1. Planning Phase
            plan = PlanningAgent(**shared).create_plan(query)
            if self.stage_output: rich_print(plan)

            # 2. Query Analysis
            analyzed_query = QueryAnalysisAgent(**shared).analyze(query)
            if self.stage_output: rich_print(analyzed_query)

            # An empty topic list would skip searching entirely and produce a report
            # with no context, so fall back to researching the query itself.
            topics = (analyzed_query.get('entities') or {}).get('research_topics') or [query]

            # 3. Data Gathering (via Web Search)
            gathered_data = await DataGatherAgent(**shared).search(topics=topics)
            if self.stage_output: rich_print(gathered_data)

            # 4. Report Generation
            report = await ReportAgent(**shared).generate_report(
                query=query,
                search_results=gathered_data
            )
            if self.stage_output: rich_print(report)

            return {"report": report}

        except Exception as e:
            rich_print(f"💥 [bold red]Error during execution:[/bold red] {e}")
            traceback.print_exc()
            return None

    @classmethod
    def create(cls, client: Any, model_name: str, debug: bool = False, stage_output: bool = False) -> "ExecutionAgent":
        """Convenience constructor that mirrors the model's fields as keyword arguments."""
        return cls(
            client=client,
            model_name=model_name,
            debug=debug,
            stage_output=stage_output
        )

In [58]:
def _slugify(text: str) -> str:
    """Lowercase ``text`` and collapse everything except word characters into single hyphens."""
    # Replace any character that is not a word character, whitespace, or hyphen with a space
    slug = re.sub(r'[^\w\s-]', ' ', text.lower())
    # Replace one or more whitespace characters or hyphens with a single hyphen
    return re.sub(r'[\s-]+', '-', slug).strip('-')


def generate_filename(
    user_input: str,
    extension: str,
    add_timestamp: bool = True,
    max_length: int = 60,
    default_name: str = "document"
) -> str:
    """
    Generates a robust, safe, and unique filename from user input.

    - Handles empty, whitespace-only, ``None``, or punctuation-only input by
      falling back to ``default_name`` (and finally to "document").
    - Converts to lowercase.
    - Replaces spaces and invalid characters with hyphens (a "slugify" approach).
    - Truncates the name to a safe length.
    - Optionally adds a timestamp to ensure uniqueness.

    Args:
        user_input: The string to convert into a filename.
        extension: The file extension (e.g., "txt", "md"); leading dots are ignored.
            An empty extension produces a name without a trailing dot.
        add_timestamp: If True, appends a YYYYMMDD-HHMMSS timestamp.
        max_length: The maximum character length for the base name part.
        default_name: The name to use if user_input yields no usable characters.

    Returns:
        A robust, safe, and unique filename string.
    """
    # 1. Slugify first, THEN check for emptiness: input such as "???" only
    #    becomes empty after the invalid characters have been removed.
    base_name = _slugify(user_input or "")
    if not base_name:
        base_name = _slugify(default_name or "") or "document"

    # 2. Truncate to a max length (and drop a hyphen left dangling by the cut)
    base_name = base_name[:max_length].strip('-')

    # 3. Get timestamp if requested
    timestamp_str = ""
    if add_timestamp:
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        timestamp_str = f"_{timestamp}"

    # 4. Clean up extension and assemble the final name
    clean_extension = extension.lstrip('.')
    if not clean_extension:
        return f"{base_name}{timestamp_str}"

    return f"{base_name}{timestamp_str}.{clean_extension}"


In [59]:
    def save_as_docx_1(markdown_string: str, filename: str):
        """Converts Markdown string to DOCX and saves it using python-docx.

        Only a small Markdown subset is understood:
        - ``#`` .. ``######`` headings become real Word headings of that level;
        - ``- [n] title (uri)`` lines become italic reference paragraphs;
        - consecutive plain lines are joined into one paragraph, and a blank
          line ends the paragraph.
        Inline Markdown (bold, links, lists, tables) is written as literal text.

        Args:
            markdown_string: The Markdown source to convert.
            filename: Path of the DOCX file to write.

        Returns:
            ``filename`` on success; ``None`` if python-docx is missing or
            writing fails (a message is printed in both cases).
        """
        try:
            from docx import Document
        except ImportError:
            rich_print("   [red]Skipping DOCX generation: python-docx library not installed.[/red]")
            return None

        try:
            doc = Document()
            # Paragraph currently collecting wrapped text lines; None means the
            # next text line starts a new paragraph.
            open_paragraph = None

            for line in markdown_string.split('\n'):
                heading = re.match(r'(#{1,6}) (.*)', line)
                if heading:
                    hashes, text = heading.groups()
                    doc.add_heading(text.strip(), level=len(hashes))
                    open_paragraph = None
                elif line.startswith('- ['):
                    match = re.match(r'- \[(.*?)\] (.*?)\((.*?)\)', line)
                    if match:
                        index, title, uri = match.groups()
                        p = doc.add_paragraph()
                        p.add_run(f"[{index}] ").italic = True
                        p.add_run(f"{title.strip()}: {uri}").italic = True
                    else:
                        doc.add_paragraph(line)
                    open_paragraph = None
                elif line.strip() == "":
                    # A blank line is a paragraph break.
                    open_paragraph = None
                elif open_paragraph is None:
                    open_paragraph = doc.add_paragraph(line)
                else:
                    # Soft-wrapped continuation of the current paragraph.
                    open_paragraph.add_run(" " + line)

            doc.save(filename)
            rich_print(f"   [green]Successfully saved DOCX: {filename}[/green]")
            return filename
        except Exception as e:
            rich_print(f"   [red]Error saving DOCX file:[/red] {e}")
            return None


In [60]:
def save_as_docx(markdown_string: str, filename: str):
    """Write ``markdown_string`` to ``filename`` as a DOCX file via pypandoc.

    Args:
        markdown_string: Markdown source handed to pandoc.
        filename: Destination path of the DOCX file.

    Returns:
        ``filename`` when the conversion succeeds, otherwise ``None`` (the
        reason is printed rather than raised).
    """
    if pypandoc is None:
        rich_print("   [red]Skipping DOCX generation: pypandoc library not installed or pandoc not found.[/red]")
        return None

    # Input is Markdown; pandoc writes the result straight to the target file.
    conversion_options = {"format": 'md', "outputfile": filename}
    saved_path = None
    try:
        pypandoc.convert_text(markdown_string, 'docx', **conversion_options)
        rich_print(f"   [green]Successfully saved DOCX: {filename}[/green]")
        saved_path = filename
    except Exception as e:
        rich_print(f"   [red]Error saving DOCX file with pypandoc:[/red] {e}")
        saved_path = None
    return saved_path


In [61]:
def _load_api_key() -> str:
    """Fetch the Gemini API key from Colab secrets or the environment.

    The key must never be written into the notebook itself. It is looked up
    under the name ``GOOGLE_API_KEY``, first in Colab's secret store and then
    in the process environment.

    Raises:
        RuntimeError: If no key is configured in either place.
    """
    try:
        api_key = userdata.get("GOOGLE_API_KEY")
    except Exception:
        # Secret not defined, notebook access not granted, or not running in Colab.
        api_key = None

    api_key = api_key or os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise RuntimeError(
            "No Gemini API key found. Add a Colab secret named GOOGLE_API_KEY "
            "or set the GOOGLE_API_KEY environment variable."
        )
    return api_key


async def main(model, query):
    """The main function to run the agentic workflow.

    Args:
        model: Name of the Gemini model to use (e.g. "gemini-2.5-flash").
        query: The user's research request.

    Runs the pipeline, shows the report inline and, when the DOCX export
    succeeds, offers the file for download. Errors are printed, not raised.
    """
    display(Markdown(f"## 🚀 Kicking off research for: '{query}'"))

    try:
        # SECURITY: the key is loaded at run time. The key that used to be
        # hard-coded here has been exposed and should be revoked.
        client = genai.Client(api_key=_load_api_key())
        agent = ExecutionAgent.create(
            client=client,
            model_name=model,
            debug=True,
            stage_output=False
        )
        result = await agent.execute(query=query)

        display(Markdown("--- \n ## 📄 Final Report"))

        # A failed run yields either None or a placeholder report whose markdown
        # is empty; neither should be exported or downloaded.
        report = (result or {}).get("report") or {}
        final_report = report.get("full_report_markdown")
        if not final_report:
            display(Markdown("### ❌ Report generation failed. Please check the logs above for errors."))
            return

        file_name = generate_filename(query, "docx", True, 40, "Untitled")
        doc_file_name = save_as_docx(final_report, file_name)

        # save_as_docx returns None on failure; still show the report in that case.
        if doc_file_name:
            files.download(doc_file_name)
        else:
            rich_print("[yellow]DOCX export failed; showing the report inline only.[/yellow]")

        display(Markdown(final_report))

    except Exception as e:
        rich_print(f"💥 [bold red]A critical error occurred in the main execution block:[/bold red] {e}")
        traceback.print_exc()



In [None]:
    user_query_from_user = input("Enter your query for the report: ").strip()
    if not user_query_from_user:
        user_query_from_user = "Provide the competitive analysis of avionics router for cabin"

    try:
        await main("gemini-2.5-flash", user_query_from_user)

    except NameError:
        print("\n--- Failed to perform the requested operation ---")


