In [None]:
# Install the notebook's third-party dependencies (LangChain, the Groq chat
# integration, LangChain community tools, LangGraph, and the arXiv client).
# NOTE(review): versions are unpinned, so a fresh run may resolve different
# releases than the ones shown in the output below — consider pinning them.
# NOTE(review): confirm `tavily_tool` is the intended PyPI package for the
# TavilySearchResults tool used later in this notebook.
!pip install langchain
!pip install langchain_groq
!pip install langchain_community
!pip install langgraph
!pip install arxiv
!pip install tavily_tool

Collecting langchain_groq
  Downloading langchain_groq-0.3.1-py3-none-any.whl.metadata (2.6 kB)
Collecting groq<1,>=0.4.1 (from langchain_groq)
  Downloading groq-0.20.0-py3-none-any.whl.metadata (15 kB)
Downloading langchain_groq-0.3.1-py3-none-any.whl (15 kB)
Downloading groq-0.20.0-py3-none-any.whl (124 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 124.9/124.9 kB 2.8 MB/s eta 0:00:00
Installing collected packages: groq, langchain_groq
Successfully installed groq-0.20.0 langchain_groq-0.3.1
Collecting langchain_community
  Downloading langchain_community-0.3.20-py3-none-any.whl.metadata (2.4 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain_community)
  Downloading pydantic_settings-2.8.1-py3-none-any.whl.metadata (3.5 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain_community)
  Do

In [None]:
# Read the API keys from Colab's secret store when running in Colab; outside
# Colab (no `google.colab` module) fall back to environment variables so the
# notebook can also be run locally.
import os

try:
    from google.colab import userdata
    _read_secret = userdata.get
except ImportError:
    _read_secret = os.environ.get

groq_api_key = _read_secret('GROQ_API_KEY')
tavily_api_key = _read_secret('TAVILY_API_KEY')

In [None]:
import ast
import asyncio
import json
import re
from typing import List, Dict, Any, Optional

from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.language_models import BaseLanguageModel
from langchain_community.tools import ArxivQueryRun, TavilySearchResults
from langchain_core.tools import BaseTool
from langgraph.graph import StateGraph, END
from langchain_groq import ChatGroq

class ResearchTask(BaseModel):
    """
    Represents a single research task.

    Attributes:
        task_id: Identifier such as "T1"; used to reference the task in the
            dependency analysis and in the final report.
        description: What to research; also passed verbatim to the selected tool.
        priority: Execution group — lower values run first. Validated to 1-5
            when the model is constructed (this model does not enable
            ``validate_assignment``, so later reassignment is not checked).
        status: "pending" until executed, then "completed" or "failed".
        result: LLM summary of the tool output, set when execution succeeds.
        validation_notes: Error text captured when execution fails.
        tool_results: Raw tool output, stored under the "result" key.
        llm_prompt: Optional prompt text; not read by the agent in this notebook.
    """
    task_id: str
    description: str
    priority: int = Field(ge=1, le=5)
    status: str = "pending"
    result: Optional[str] = None
    validation_notes: Optional[str] = None
    # default_factory (as in ResearchState) rather than a shared `{}` literal.
    tool_results: Dict[str, str] = Field(default_factory=dict)
    llm_prompt: Optional[str] = None

class ResearchState(BaseModel):
    """
    State object threaded through every node of the research workflow.

    Attributes:
        query: The research question being answered.
        tasks: Tasks that have not been executed yet.
        completed_tasks: Tasks that have been executed; each task's ``status``
            records whether it succeeded or failed.
        current_results: Extra result storage; no node visible in this
            notebook writes to it.
        final_answer: The synthesized report, once one has been produced.
        messages: Log entries, each a dict with "role" and "content" keys.
    """
    query: str
    # Each list/dict default is built fresh per instance.
    tasks: List[ResearchTask] = Field(default_factory=lambda: [])
    completed_tasks: List[ResearchTask] = Field(default_factory=lambda: [])
    current_results: Dict[str, Any] = Field(default_factory=lambda: {})
    final_answer: Optional[str] = Field(default=None)
    messages: List[Dict[str, Any]] = Field(default_factory=lambda: [])

class TaskPlan(BaseModel):
    """
    Schema that the planner LLM's JSON output is validated against.

    Both fields are required: a plan without tasks or without reasoning is
    rejected by ``model_validate``.
    """
    tasks: List[ResearchTask] = Field(
        ...,
        description="List of research tasks to accomplish",
    )
    reasoning: str = Field(
        ...,
        description="Explanation of how these tasks will help answer the query",
    )

class ResearchLangGraphAgent:
    """
    Multi-step research agent orchestrated with LangGraph.

    Pipeline: generate tasks -> execute them (tool call + LLM summary, grouped
    by priority) -> synthesize a report -> validate the report, re-running the
    research a bounded number of times if the validator rejects it.
    """

    def __init__(self,
                 llm: Optional[BaseLanguageModel] = None,
                 max_validation_retries: int = 2
                 ):
        """
        Initialize the Research Agent with LangGraph

        Args:
            llm: Language model for generation. When None, a Groq chat model is
                created using the notebook-level ``groq_api_key``.
            max_validation_retries: How many times the research is re-executed
                after the validator rejects the report before giving up.
        """
        # Honour a caller-supplied model (the argument used to be ignored).
        self.llm = llm if llm is not None else ChatGroq(
            model='llama3-70b-8192',
            api_key=groq_api_key,
            temperature=0.3
        )
        self.max_validation_retries = max_validation_retries

        self.arxiv_tool = ArxivQueryRun()
        self.tavily_tool = TavilySearchResults(tavily_api_key=tavily_api_key)

        self.tools = [self.arxiv_tool, self.tavily_tool]

    def select_tool(self, task_description: str) -> BaseTool:
        """
        Select appropriate tool based on task description

        Args:
            task_description: Description of the research task

        Returns:
            The arXiv tool when the description mentions "literature review" or
            "arxiv" (case-insensitive); otherwise the Tavily web-search tool.
        """
        task_lower = task_description.lower()
        if "literature review" in task_lower or "arxiv" in task_lower:
            return self.arxiv_tool
        return self.tavily_tool

    def simple_llm_call(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """
        Synchronous LLM call returning the text content of the reply.

        Args:
            prompt: User message content.
            system_prompt: Optional system message sent before the prompt.

        Returns:
            ``response.content`` from ``self.llm.invoke``.
        """
        messages = []
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))

        messages.append(HumanMessage(content=prompt))
        response = self.llm.invoke(messages)
        return response.content

    @staticmethod
    def _extract_json(text: str) -> str:
        """
        Extract a JSON object string from an LLM response.

        Prefers the contents of a fenced code block (```json ... ``` or a bare
        ``` ... ```); otherwise takes the span from the first "{" to the last "}".

        Raises:
            ValueError: if no JSON-like structure is present.
        """
        # `(?:json)?` makes the whole language tag optional; the former
        # pattern `json?` only made the trailing "n" optional.
        fenced = re.search(r'```(?:json)?[ \t]*\n(.*?)```', text, re.DOTALL | re.IGNORECASE)
        if fenced:
            return fenced.group(1).strip()
        braced = re.search(r'\{.*\}', text, re.DOTALL)
        if braced:
            return braced.group(0).strip()
        raise ValueError("No JSON structure found in the response")

    @staticmethod
    def _normalize_task_groups(raw: Dict[Any, Any]) -> Dict[int, List[str]]:
        """
        Coerce an LLM-produced grouping into ``{int priority: [task_id, ...]}``.

        Keys like ``"1"`` become ``1`` so priorities stay mutually comparable
        when sorted; a bare string value is treated as a single task id.

        Raises:
            ValueError / TypeError: if a key is not an integer or a value is
                not a string or iterable of ids.
        """
        groups: Dict[int, List[str]] = {}
        for key, value in raw.items():
            level = int(str(key).strip())
            ids = [value] if isinstance(value, str) else list(value)
            groups.setdefault(level, []).extend(str(task_id) for task_id in ids)
        return groups

    def analyze_task_relationships_with_llm(self, tasks: List[ResearchTask]) -> Dict[int, List[str]]:
        """
        Analyze relationships between tasks using an LLM to identify potential dependencies and prioritize tasks.

        Args:
            tasks: List of research tasks

        Returns:
            Dictionary mapping integer priority levels (lower runs first) to
            lists of task_ids that may run in parallel. If the LLM call or the
            parsing fails, tasks are grouped by their existing ``priority``.
        """
        if not tasks:
            return {}

        task_descriptions = "\n".join([f"Task {task.task_id}: {task.description}" for task in tasks])
        prompt = (
            f"Here are some tasks:\n\n{task_descriptions}\n\n"
            "Analyze the relationships between the tasks to identify dependencies. "
            "Group tasks that can run in parallel into the same list. Order the groups such that tasks in a later group depend on tasks in earlier groups. "
            "Return a dictionary where the key is the priority level (starting from 1), and the value is a list of task IDs that can run in parallel."
        )

        try:
            response = self.simple_llm_call(
                prompt=prompt,
                system_prompt="You are an expert task dependency analyzer. Provide a structured output of task priorities and parallel execution groups."
            )

            start = response.find("{")
            end = response.rfind("}")
            # rfind returns -1 when absent; the former `+ 1` turned that into 0,
            # so the "not found" branch could never trigger.
            if start == -1 or end < start:
                raise ValueError("No dictionary found in the response.")

            # literal_eval only accepts Python literals, so LLM text is not executed.
            task_relationships = ast.literal_eval(response[start:end + 1])
            if not isinstance(task_relationships, dict):
                raise ValueError("Extracted data is not in the expected dictionary format.")
            return self._normalize_task_groups(task_relationships)
        except Exception as e:
            print(f"Task relationship analysis failed: {e}")
            return {priority: [task.task_id for task in tasks if task.priority == priority]
                    for priority in sorted(set(task.priority for task in tasks))}

    async def generate_research_tasks(self, state: ResearchState) -> ResearchState:
      """
      Generate research tasks based on the original query

      Asks the LLM for a JSON task plan, validates it against ``TaskPlan``, then
      overwrites each task's ``priority`` with its group from the dependency
      analysis (tasks the analysis does not mention keep their priority).

      Args:
          state: Current research state

      Returns:
          Updated research state with tasks (unchanged if the plan cannot be parsed)
      """
      task_generation_prompt = f"""
      Generate a list of research tasks for the following query:
      "{state.query}"

      Requirements:
      - Create 3-4 specific, actionable research tasks
      - Each task should contribute to answering the main query
      - Provide a task ID, clear description, and initial priority

      Output format:
      {{
          "tasks": [
              {{
                  "task_id": "T1",
                  "description": "...",
                  "priority": 1
              }},
              ...
          ],
          "reasoning": "Explanation of task selection"
      }}
      """

      response = self.simple_llm_call(
          prompt=task_generation_prompt,
          system_prompt="You are an expert research task generator. Always respond with a JSON block."
      )

      try:
          parsed_json = json.loads(self._extract_json(response))
          task_plan = TaskPlan.model_validate(parsed_json)
      except Exception as e:
          print(f"Task generation error: {e}")
          return state

      state.tasks = task_plan.tasks
      state.messages.append({
          "role": "system",
          "content": f"Task Generation Reasoning: {task_plan.reasoning}"
      })

      try:
          task_relationships = self.analyze_task_relationships_with_llm(state.tasks)
          # If an id is listed in several groups, the later group wins.
          priority_by_id = {
              task_id: priority
              for priority, task_ids in task_relationships.items()
              for task_id in task_ids
          }
          for task in state.tasks:
              task.priority = priority_by_id.get(task.task_id, task.priority)
      except Exception as e:
          print(f"Task relationship analysis failed: {e}")

      return state

    async def execute_research_task(self, state: ResearchState) -> ResearchState:
        """
        Execute research tasks in parallel within priority groups

        Groups run in ascending priority order; tasks inside one group run
        concurrently. Every executed task (succeeded or failed) is moved from
        ``state.tasks`` to ``state.completed_tasks``.

        Args:
            state: Current research state

        Returns:
            Updated research state with task results
        """
        priority_groups: Dict[int, List[ResearchTask]] = {}
        for task in state.tasks:
            priority_groups.setdefault(task.priority, []).append(task)

        for priority in sorted(priority_groups):
            task_results = await asyncio.gather(
                *(self._execute_single_task(task) for task in priority_groups[priority])
            )

            state.completed_tasks.extend(task_results)
            # _execute_single_task returns the same object it was given, so
            # identity tells us exactly which pending tasks are now done.
            finished = {id(task) for task in task_results}
            state.tasks = [task for task in state.tasks if id(task) not in finished]

        return state

    async def _execute_single_task(self, task: ResearchTask) -> ResearchTask:
        """
        Execute a single research task

        Runs the selected tool on the task description, then summarizes the
        output with the LLM. Errors are recorded on the task, not raised.

        Args:
            task: Research task to execute (mutated in place)

        Returns:
            The same task object, with status "completed" and ``result`` set,
            or status "failed" and the error in ``validation_notes``.
        """
        try:
            tool = self.select_tool(task.description)

            # tool.run and the LLM client block; running them in worker threads
            # lets asyncio.gather actually overlap tasks of the same group.
            tool_result = await asyncio.to_thread(tool.run, task.description)

            task.tool_results = {"result": str(tool_result)}

            summary_prompt = f"""
            Summarize the research findings for the task:
            "{task.description}"

            Research Data:
            {tool_result}

            Provide a concise summary with key insights and implications.
            """

            summary = await asyncio.to_thread(
                self.simple_llm_call,
                summary_prompt,
                "You are an expert research summarizer.",
            )

            task.result = summary
            task.status = "completed"
            # Clear an error left over from an earlier failed attempt.
            task.validation_notes = None

        except Exception as e:
            task.status = "failed"
            task.validation_notes = str(e)

        return task

    async def synthesize_final_report(self, state: ResearchState) -> ResearchState:
        """
        Synthesize final research report from completed tasks

        Only tasks that finished successfully with a result are included, so
        failed tasks no longer contribute "None" findings to the prompt.

        Args:
            state: Current research state

        Returns:
            Updated research state with final report
        """
        successful = [task for task in state.completed_tasks
                      if task.status == "completed" and task.result]
        if not successful:
            state.final_answer = "No research findings available."
            return state

        findings = "\n".join(f"Task {task.task_id}: {task.result}" for task in successful)

        synthesis_prompt = f"""
        Original Query: {state.query}

        Research Findings:
        {findings}

        Synthesize these findings into a comprehensive, well-structured report.
        Include key insights, potential implications, and a direct answer to the original query.
        """

        final_report = self.simple_llm_call(
            prompt=synthesis_prompt,
            system_prompt="You are an expert research synthesizer."
        )

        state.final_answer = final_report
        return state

    async def _run_validation(self, state: ResearchState) -> Optional[tuple]:
      """
      Ask the LLM once whether ``state.final_answer`` answers ``state.query``.

      Returns:
          ``(is_valid, notes)`` when the verdict could be parsed, otherwise None
          after appending a "Validation error" message to ``state.messages``.
      """
      validation_prompt = f"""
      Given the original research query and the synthesized report, validate whether the report accurately answers the query.

      Original Query: {state.query}

      Synthesized Report:
      {state.final_answer}

      Validation Criteria:
      - Ensure the report directly addresses the original query.
      - Verify that key insights are well-supported by research findings.
      - Identify and flag any hallucinations or unsupported claims.

      Output format (JSON):
      {{
          "valid": true/false,
          "validation_notes": "..."
      }}
      """

      response = self.simple_llm_call(
          prompt=validation_prompt,
          system_prompt="You are a research validator. Ensure factual accuracy and alignment with the original query. Always respond in JSON format."
      )

      try:
          validation_result = json.loads(self._extract_json(response))
          if not isinstance(validation_result, dict):
              raise ValueError("Validation response is not a JSON object")
      except Exception as e:
          state.messages.append({
              "role": "validator",
              "content": f"Validation error: {str(e)}"
          })
          return None

      is_valid = validation_result.get("valid", False)
      if isinstance(is_valid, str):
          # Models sometimes emit "false" as a string; bool("false") is True.
          is_valid = is_valid.strip().lower() == "true"
      return bool(is_valid), validation_result.get("validation_notes", "")

    async def validate_final_report(self, state: ResearchState) -> ResearchState:
      """
      Validate the synthesized final report against the original query to check for hallucinations.

      If validation fails, all tasks are re-executed and the report is
      re-synthesized, at most ``self.max_validation_retries`` times (previously
      this recursed without bound). When the limit is reached, the last report
      is kept and a "Validation Failed" message records that it is unvalidated.

      Args:
          state: Current research state

      Returns:
          Updated research state with validated final report
      """
      max_retries = max(0, self.max_validation_retries)

      for attempt in range(max_retries + 1):
          outcome = await self._run_validation(state)
          if outcome is None:
              # Unparseable verdict; the error has already been logged.
              return state

          is_valid, notes = outcome
          if is_valid:
              state.messages.append({
                  "role": "validator",
                  "content": f"Validation Successful: {notes}"
              })
              return state

          if attempt == max_retries:
              state.messages.append({
                  "role": "validator",
                  "content": f"Validation Failed: {notes}. Retry limit ({max_retries}) reached; keeping the last report unvalidated."
              })
              return state

          state.final_answer = None
          state.messages.append({
              "role": "validator",
              "content": f"Validation Failed: {notes}. Re-executing tasks..."
          })

          try:
              # Re-run every task, then rebuild the report before re-validating.
              state.tasks = state.completed_tasks.copy()
              state.completed_tasks.clear()
              state = await self.execute_research_task(state)
              state = await self.synthesize_final_report(state)
          except Exception as e:
              state.messages.append({
                  "role": "validator",
                  "content": f"Validation error: {str(e)}"
              })
              return state

      return state

    def create_research_workflow(self) -> StateGraph:
        """
        Create LangGraph workflow for research

        Nodes: generate_tasks -> execute_research (loops while tasks remain)
        -> synthesize_report -> validate_report -> END.

        Returns:
            The compiled graph (result of ``StateGraph.compile()``), which
            exposes ``ainvoke``.
        """
        workflow = StateGraph(ResearchState)

        # Add nodes
        workflow.add_node("generate_tasks", self.generate_research_tasks)
        workflow.add_node("execute_research", self.execute_research_task)
        workflow.add_node("synthesize_report", self.synthesize_final_report)
        workflow.add_node("validate_report", self.validate_final_report)

        # Define workflow edges
        workflow.set_entry_point("generate_tasks")
        workflow.add_edge("generate_tasks", "execute_research")

        # Keep executing while pending tasks remain, then synthesize.
        workflow.add_conditional_edges(
            "execute_research",
            lambda state: "end" if not state.tasks else "execute_research",
            {
                "end": "synthesize_report",
                "execute_research": "execute_research"
            }
        )

        workflow.add_edge("synthesize_report", "validate_report")
        workflow.add_edge("validate_report", END)

        return workflow.compile()

async def main():
    agent = ResearchLangGraphAgent()
    workflow = agent.create_research_workflow()

    initial_state = ResearchState(
        query="What are some recent advancements in Generative AI, and what are its use cases in Healthcare??"
    )

    result = await workflow.ainvoke(initial_state)

    print("\n=== FINAL RESEARCH REPORT ===")
    print(result["final_answer"])

    print("\n=== TASK DETAILS ===")
    for task in result['completed_tasks']:
        print(f"\nTask {task.task_id} (Priority: {task.priority}):")
        print(task.result)

if __name__ == "__main__":
    await main()


=== FINAL RESEARCH REPORT ===
**Comprehensive Report: Recent Advancements in Generative AI and its Applications in Healthcare**

**Introduction**

Generative AI, a subset of artificial intelligence, has witnessed significant advancements in recent years, particularly in the areas of hyperspectral imagery processing and EEG-based multimodal generation. This report provides an overview of the recent breakthroughs in Generative AI, its potential applications in healthcare, and the implications of its adoption in the healthcare industry.

**Recent Advancements in Generative AI**

Recent research has led to several key breakthroughs in Generative AI, including:

1. **Hyperspectral Imagery Processing:** Deep learning architectures like CNNs, Autoencoders, GANs, and RNNs have shown promise in processing hyperspectral data, with GANs being effective in addressing limited training data and computational constraints.
2. **EEG-based Multimodal Generation:** The integration of EEG signals with Ge