##Installing Libraries


In [None]:
!pip install langchain langgraph tavily-python openai


In [None]:
!pip install -U langchain-community langchain


In [None]:
import os

# Tavily API key (sign up at https://app.tavily.com/)
os.environ["TAVILY_API_KEY"] = "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"


## Research Agent


In [None]:
from tavily import TavilyClient

# Initialize the client with your API Key
tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

def research_agent(query):
    # Perform a search
    search_results = tavily.search(
        query=query,
        search_depth="advanced",   # you can set it to "basic" for faster response
        include_raw_content=True,
        max_results=5
    )

    # Extract content
    collected_data = []
    for result in search_results['results']:
        collected_data.append(result['content'])

    return " ".join(collected_data)


##Drafting an Answer


In [None]:
def drafting_agent(research_content, user_query):
    # Very basic draft logic
    answer = f"Based on the research conducted for: '{user_query}', the answer is summarized as:\n\n{research_content[:1000]}..."
    return answer


## Connect using LangGraph

In [None]:
!pip install langgraph


In [None]:
import langgraph
dir(langgraph)


In [None]:
from langgraph import graph
dir(graph)


In [None]:
import datetime
from langgraph.graph import StateGraph
from typing import TypedDict, Optional

# Define the state schema for the Research Agent
class ResearchState(TypedDict):
    query: str
    research_content: Optional[str]
    final_answer: Optional[str]

# Initialize the graph - Assuming StateGraph is used
graph = StateGraph(state_schema=ResearchState)

# Define the research agent function with date handling
def research_agent(query: str) -> str:
    # If the query is related to the date, return the current date
    if "today's date" in query.lower():
        return f"Today's date is: {datetime.datetime.now().strftime('%Y-%m-%d')}"
    # If the query is not recognized, return a generic message
    return f"Research results for query: {query}"

# Define the drafting agent function (drafts the final answer)
def drafting_agent(research_content: Optional[str], query: str) -> str:
    # If research content exists, create the final answer
    if research_content:
        return f"Final answer for '{query}': {research_content}"
    return "No research content available"

# Define the search node function
def search_node(state: ResearchState) -> ResearchState:
    research_content = research_agent(state["query"])  # Perform research
    state["research_content"] = research_content  # Add research to state
    return state

# Define the draft node function
def draft_node(state: ResearchState) -> ResearchState:
    final_answer = drafting_agent(state["research_content"], state["query"])  # Generate final answer
    state["final_answer"] = final_answer  # Add final answer to state
    return state

# Add the search and draft nodes to the graph using unique identifiers
graph.add_node("search", search_node)  # Add search node with identifier
graph.add_node("draft", draft_node)    # Add draft node with identifier

# Manually set the nodes and add edges between them using identifiers
graph.set_entry_point("search")  # Set search node as entry point
graph.add_edge("search", "draft")  # Connect search and draft nodes using their identifiers

# Compile the system into an executable process
deep_research_system = graph.compile()

# Test the system with user input
user_query = input("Enter your research query: ")
state = {"query": user_query, "research_content": None, "final_answer": None}
final_state = deep_research_system.invoke(state)

# Output the final drafted answer
print("\n\nFinal Answer:\n")
print(final_state["final_answer"])


##Research Agent

In [None]:
from tavily import TavilyClient

# Initialize Tavily client
tavily = TavilyClient(api_key="YOUR_TAVILY_API_KEY")  # Replace YOUR_TAVILY_API_KEY

def research_agent(query: str) -> str:
    # Search the web using Tavily
    response = tavily.search(query=query, max_results=3)  # You can choose max_results
    results = response.get('results', [])

    if not results:
        return "No research content found."

    # Combine the top results into a research content string
    research_summary = "\n\n".join([f"{res['title']}: {res['url']}" for res in results])
    return research_summary


##Drafting Agent

In [None]:
def drafting_agent(research_content: Optional[str], query: str) -> str:
    if research_content:
        return f"Here is the research for '{query}':\n\n{research_content}"
    return "No research content available."


##Final Deep Research System

In [None]:
# Install Tavily if you haven't
!pip install tavily-python

# Imports
from langgraph.graph import StateGraph
from typing import TypedDict, Optional
from tavily import TavilyClient

# Define your state schema
class ResearchState(TypedDict):
    query: str
    research_content: Optional[str]
    final_answer: Optional[str]

# Initialize the StateGraph
graph = StateGraph(state_schema=ResearchState)

# Initialize Tavily client
import os
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


# Define the research agent function using Tavily
def research_agent(query: str) -> str:
    response = tavily.search(query=query, max_results=3)
    results = response.get('results', [])

    if not results:
        return "No research content found."

    research_summary = "\n\n".join([f"{res['title']}: {res['url']}" for res in results])
    return research_summary

# Define the drafting agent function
def drafting_agent(research_content: Optional[str], query: str) -> str:
    if research_content:
        return f"Here is the research for '{query}':\n\n{research_content}"
    return "No research content available."

# Define the search node
def search_node(state: ResearchState) -> ResearchState:
    research_content = research_agent(state["query"])
    state["research_content"] = research_content
    return state

# Define the draft node
def draft_node(state: ResearchState) -> ResearchState:
    final_answer = drafting_agent(state["research_content"], state["query"])
    state["final_answer"] = final_answer
    return state

# Add nodes to the graph
graph.add_node("search", search_node)
graph.add_node("draft", draft_node)

# Set entry point and edges
graph.set_entry_point("search")
graph.add_edge("search", "draft")

# Compile the system
deep_research_system = graph.compile()

# Run it!
previous_queries = []  # Initialize empty memory
user_query = input("Enter your research query: ")
state = {"query": user_query, "research_content": None, "final_answer": None}

try:
    final_state = deep_research_system.invoke(state)
    previous_queries.append(user_query)  # Save query into memory

except Exception as e:
    print(f"Error occurred: {e}")
    exit()  # or ask user to try again

print("\n\nFinal Answer:\n")
if final_state["final_answer"]:
    print(final_state["final_answer"])
else:
    print("No result found.")



##Experimenting by using different Techniques

##Not the Main Code, from here on (Optional)

In [None]:
!pip install -q transformers


In [None]:
from transformers import pipeline
from langchain_community.llms import HuggingFacePipeline

# Load a pipeline
pipe = pipeline("text2text-generation", model="google/flan-t5-large", max_length=512)

# Use LangChain wrapper
llm = HuggingFacePipeline(pipeline=pipe)


## Connect Both Agents Using LangGraph



In [None]:
import langgraph

# Define the steps
def deep_research_system(user_query):
    research_info = researcher_agent(user_query)
    final_answer = answer_drafter_agent(research_info)
    return final_answer


## User Interaction

In [None]:
user_question = input("Enter your research query: ")

answer = deep_research_system(user_question)

print("\n\nFinal Answer:\n")
print(answer)


In [None]:
def deep_research_system(query):
    # Fetch research data from search agent
    search_data = search_agent.run(query)

    # If search_data is a list, join it into a string
    if isinstance(search_data, list):
        search_data = " ".join([str(item) for item in search_data])

    # If it's a dictionary, extract the relevant content (assumes 'text' is the key holding the data)
    if isinstance(search_data, dict):
        search_data = search_data.get("text", "")

    # Now, clean and trim the research data
    trimmed_data = trim_research_data(search_data)

    # Prepare the final prompt
    final_prompt = f"Based on the following research data, draft a professional and concise answer:\n\n{trimmed_data}\n\nAnswer:"

    # Use the model to generate the answer
    response = llm.invoke(final_prompt)

    return response

def trim_research_data(text, max_words=450):
    words = text.split()
    if len(words) > max_words:
        words = words[:max_words]
    return " ".join(words)

# Example query to test the system
user_question = input("Enter your research query: ")
answer = deep_research_system(user_question)

print("\n\nFinal Answer:\n")
print(answer)


In [None]:
def deep_research_system(query):
    try:
        # Perform the research task and fetch data
        research_data = search_agent.run(query)

        # Check if the result is a list, and join it into a string if necessary
        if isinstance(research_data, list):
            research_data = " ".join([str(item) for item in research_data])

        # Trim research data to prevent overloading the model with too much information
        trimmed_data = trim_research_data(research_data)

        # Prepare the final prompt for the model
        final_prompt = f"Based on the following research data, draft a professional and concise answer:\n\n{trimmed_data}\n\nAnswer:"

        # Query the model for an answer
        response = llm.invoke(final_prompt)

        return response

    except Exception as e:
        return f"Error occurred: {str(e)}"

# Running the system with a query
user_query = input("Enter your research query: ")
answer = deep_research_system(user_query)

print("\n\nFinal Answer:\n")
print(answer)


In [None]:
def deep_research_system(query):
    try:
        # Perform the research task and fetch data
        research_data = search_agent.run(query)

        # If the result is a list, we will attempt to extract the most relevant information
        if isinstance(research_data, list):
            relevant_data = ""
            for item in research_data:
                if isinstance(item, dict):
                    # Extract relevant content from 'content' or similar fields
                    if 'content' in item:
                        relevant_data += item['content'] + "\n"
                else:
                    relevant_data += str(item) + "\n"
            research_data = relevant_data

        # Trim the research data to avoid overloading the model
        trimmed_data = trim_research_data(research_data)

        # Prepare the final prompt to query the model
        final_prompt = f"Based on the following research data, provide a concise and relevant answer:\n\n{trimmed_data}\n\nAnswer (only date and day of the week):"

        # Query the model for a concise answer
        response = llm.invoke(final_prompt)

        return response.strip()

    except Exception as e:
        return f"Error occurred: {str(e)}"

# Running the system with the query
user_query = input("Enter your research query: ")
answer = deep_research_system(user_query)

print("\n\nFinal Answer:\n")
print(answer)


In [None]:
from datetime import datetime
import pytz

def deep_research_system(query):
    try:
        # Get the current date and time in UTC, then convert to IST
        utc_now = datetime.now(pytz.utc)
        ist = pytz.timezone('Asia/Kolkata')
        ist_now = utc_now.astimezone(ist)

        # Format the response to only include the day and date in IST
        day_of_week = ist_now.strftime('%A')  # Get the day of the week (e.g., Monday, Tuesday)
        date_today = ist_now.strftime('%d %B %Y')  # Format the date (e.g., 25 April 2025)

        # Construct the final, concise answer
        final_answer = f"Today is {day_of_week}, {date_today} in IST."

        return final_answer

    except Exception as e:
        return f"Error occurred: {str(e)}"

# Running the system with the query
user_query = input("Enter your research query: ")
answer = deep_research_system(user_query)

print("\n\nFinal Answer:\n")
print(answer)


##Mini Research System using Facts + Wikipedia + DuckDuckGo (Optional)

In [None]:
!pip install wikipedia


In [None]:
# Install necessary package
# pip install wikipedia

import wikipedia

def basic_nlp_preprocess(query):
    # Very simple preprocessing: make lowercase, strip spaces
    return query.lower().strip()

def answer_query_from_wikipedia(query):
    try:
        processed_query = basic_nlp_preprocess(query)
        summary = wikipedia.summary(processed_query, sentences=3)
        return summary
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Your question is a bit broad. Try being more specific. Some options: {e.options}"
    except wikipedia.exceptions.PageError:
        return "Sorry, I couldn't find any information about that topic."
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"

def main():
    print("✨ Welcome to the Smart Research System ✨\n")
    user_query = input("Enter your research query: ")
    answer = answer_query_from_wikipedia(user_query)
    print("\n🔎 Final Answer:\n")
    print(answer)

if __name__ == "__main__":
    main()


In [None]:
!pip install wikipedia duckduckgo-search


In [None]:
import wikipedia
from duckduckgo_search import DDGS

# Simple hardcoded facts
FACTS = {
    "how many days are there in a week": "There are 7 days in a week.",
    "what is the color of sunflower": "The color of a sunflower is bright yellow.",
    "what is the capital of india": "The capital of India is New Delhi.",
    "how many minutes are there in an hour": "There are 60 minutes in an hour.",
    # Add more if you want
}

def basic_nlp_preprocess(query):
    return query.lower().strip()

def check_facts_database(query):
    return FACTS.get(query)

def fetch_from_wikipedia(query):
    try:
        summary = wikipedia.summary(query, sentences=2)
        return summary
    except:
        return None

def fetch_from_duckduckgo(query):
    try:
        ddgs = DDGS()
        results = ddgs.text(query, max_results=3)
        if results:
            return results[0]['body']
        else:
            return None
    except:
        return None

def deep_research_system(query):
    query = basic_nlp_preprocess(query)

    # 1. Check hardcoded facts
    answer = check_facts_database(query)
    if answer:
        return answer

    # 2. Try Wikipedia
    answer = fetch_from_wikipedia(query)
    if answer:
        return answer

    # 3. Finally, try DuckDuckGo
    answer = fetch_from_duckduckgo(query)
    if answer:
        return answer

    return "Sorry, I couldn't find an answer to your question."

# ----------- Test it -------------
user_question = input("Enter your research query: ")
answer = deep_research_system(user_question)
print("\n\nFinal Answer:\n")
print(answer)


In [None]:
import nbformat

# Load the notebook
with open('AI_Agent.ipynb', 'r') as file:
    notebook = nbformat.read(file, as_version=4)

# Add missing 'state' in widgets metadata if it doesn't exist
if 'widgets' in notebook.metadata:
    if 'state' not in notebook.metadata['widgets']:
        notebook.metadata['widgets']['state'] = {}

# Save the modified notebook
with open('fixed_notebook.ipynb', 'w') as file:
    nbformat.write(notebook, file)


In [None]:
import os
print("Current working directory:", os.getcwd())


In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Now you can access the file from your Google Drive path
with open('/content/drive/MyDrive/your_folder_name/AI_Agent.ipynb', 'r') as file:
    notebook = nbformat.read(file, as_version=4)


In [None]:
from google.colab import drive
drive.mount('/content/drive')
