In [61]:
from dotenv import load_dotenv
from typing import Annotated,TypedDict
from langgraph.graph import StateGraph,START,END
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.tools import tool
import os
from langgraph.graph.message import add_messages
from pydantic import BaseModel,Field
import time
load_dotenv()

True

In [2]:
def get_time():
    return time.strftime('%H:%M:%S', time.localtime())

In [3]:
get_time()

'16:34:16'

In [63]:
import datetime
from typing import Optional

def now_iso():
    """Return current datetime in ISO format including time and timezone if available."""
    return datetime.datetime.now(datetime.timezone.utc).isoformat()

class AnalysisSource(BaseModel):
    source_name:  Optional[str] = Field(..., description="Name of the data source, e.g., Google, Bing, Reddit, Google Finance")
    analysis: str = Field(..., description="Analysis result or summary from this source")
    source_link: Optional[str] = Field(..., description="Direct link to the information or post")


class LLMAnalysisResult(BaseModel):
    sources: list[AnalysisSource] = Field(..., description="List of sources with analysis and URLs")
    synthesized_answer: str = Field(..., description="LLM's synthesized answer using all sources")

In [64]:
llm=ChatGoogleGenerativeAI(model='gemini-2.5-flash',api_key=os.getenv('GEMINI_API_KEY'),async_client_running=True,verbose=True)
analysing_llm=llm.with_structured_output(AnalysisSource)

In [6]:
await llm.ainvoke('give me aa para about ethics in ai')

AIMessage(content="Ethics in Artificial Intelligence is a paramount concern, moving beyond mere technical functionality to address the profound societal implications of intelligent systems. It encompasses critical considerations like algorithmic bias, ensuring fairness and equity in decision-making, safeguarding data privacy, establishing clear accountability for AI systems' actions, and demanding transparency and explainability in their operation. Ignoring these ethical dimensions risks eroding public trust, perpetuating discrimination, and creating systems that could have profound, unintended negative consequences on individuals and society at large. Therefore, embedding ethical principles from design to deployment is essential, demanding a proactive, human-centered approach that prioritizes transparency, fairness, and human oversight to ensure AI truly serves humanity as a beneficial and trustworthy tool.", additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason':

In [None]:
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.tools import GoogleSearchRun
from langchain.tools import tool
from pydantic import BaseModel, Field
import re

class Ticker(BaseModel):
    ticker: str = Field(description="ticker symbol", examples=["MSFT", "AAPL"])

@tool(description="Run a web search and extract ticker symbol", response_format="content")
async def duckduckgo_search(query: str) -> str:
    """
    Runs a DuckDuckGo web search with the given query to extract a ticker symbol.

    Args:
        query (str): The search query that describes the company, stock, or concept for which a ticker symbol is desired.

    Returns:
        Ticker: A Pydantic model containing the extracted ticker symbol.

    Workflow:
        - Uses the DuckDuckGoSearchRun tool to perform a search on DuckDuckGo asynchronously.
        - Parses the search results to find and return the most relevant ticker symbol wrapped in the Ticker model.
        - Suitable for use within an agent or workflow where automatic, up-to-date retrieval of ticker symbols is required.

    Example:
        response = await duckduckgo_search("Apple Inc NASDAQ")
        print(response.ticker)
    """
    tool_instance = DuckDuckGoSearchRun()
    response = await tool_instance.arun(query)
    print(response)

from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

@tool(description="Search Wikipedia for a given query and return a summary", response_format="content")
async def wikipedia_search(query: str) -> str:
    """
    Asynchronously search Wikipedia for a given query and return the summary content as a string.

    This tool uses the WikipediaQueryRun class (with WikipediaAPIWrapper) to perform a search
    on Wikipedia given a user query string, and then returns the resulting summary for that query.
    It is designed to be used within an agent workflow for rapidly retrieving encyclopedia summaries.

    Args:
        query (str): The search query or phrase to look up on Wikipedia.

    Returns:
        str: The summary content for the queried topic from Wikipedia.

    Example:
        summary = await wikipedia_search("Tesla")
        print(summary) # Provides a short summary about Tesla from Wikipedia.

    Usage Notes:
        - Suitable for agents or automated pipelines that require factual encyclopedic information.
        - Requires asynchronous context to invoke.
    """
    wiki = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    return await wiki.arun(query)

from deepagents import create_deep_agent
from langchain.agents import create_agent

deepagent1 = create_agent(
    model=llm,
    system_prompt=(
        "Your job is to accurately identify the official stock ticker symbol (such as 'AAPL' for Apple Inc.) corresponding to the provided query. "
        "You can utilize the available tools, including DuckDuckGo and Wikipedia search, to assist you in this task. "
        "Carefully verify the result to ensure the ticker symbol is correct and represents the most relevant publicly traded entity in the context of the query. "
        "If no valid ticker symbol can be found or if the query does not pertain to a publicly traded company or instrument, respond clearly with 'Ticker not available'. "
        "Otherwise, provide only the correct ticker symbol."
    ),
    response_format=Ticker,
    tools=[duckduckgo_search, wikipedia_search]
)

deepagent = create_deep_agent(
    model=llm,
    system_prompt=(
        "Your job is to accurately identify the official stock ticker symbol (such as 'AAPL' for Apple Inc.) corresponding to the provided query. "
        "You can utilize the available tools, including DuckDuckGo and Wikipedia search, to assist you in this task. "
        "Carefully verify the result to ensure the ticker symbol is correct and represents the most relevant publicly traded entity in the context of the query. "
        "If no valid ticker symbol can be found or if the query does not pertain to a publicly traded company or instrument, respond clearly with 'Ticker not available'. "
        "Otherwise, provide only the correct ticker symbol."
    ),
    response_format=Ticker,
    tools=[duckduckgo_search, wikipedia_search]
)

In [87]:
response=await deepagent.ainvoke({'messages':{'role':'user',"content":'find me the ticker symbol for tesla '}})



  lis = BeautifulSoup(html).find_all('li')


In [89]:
response

{'messages': [HumanMessage(content='find me the ticker symbol for tesla ', additional_kwargs={}, response_metadata={}, id='e1463427-b4fd-43c3-898b-19982f6abad1'),
  AIMessage(content='', additional_kwargs={'function_call': {'name': 'duckduckgo_search', 'arguments': '{"query": "tesla ticker symbol"}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': [], 'grounding_metadata': {}, 'model_provider': 'google_genai'}, id='lc_run--ac1f6e31-75bd-4c20-818c-2dfb81907100-0', tool_calls=[{'name': 'duckduckgo_search', 'args': {'query': 'tesla ticker symbol'}, 'id': '3f0ba78f-91f7-4ea8-a269-947718482b8a', 'type': 'tool_call'}], usage_metadata={'input_tokens': 5171, 'output_tokens': 72, 'total_tokens': 5243, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 53}}),
  ToolMessage(content="ticker='Ticker not available'", name='duckduckgo_search', id='d7e437a8-a5f9-

In [85]:
import re

output = response['messages'][-1].content
# Try to extract the ticker symbol using regex (handles e.g. "ticker='MSFT'", "ticker: MSFT", etc.)
match = re.search(r"[\'\"]?ticker[\'\"]?\s*[=:]\s*[\'\"]?([A-Z.-]{1,10})[\'\"]?", output)
if match:
    ticker_symbol = match.group(1)
    print(ticker_symbol)
else:
    print("Ticker symbol not found in output.")

T


In [52]:
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

@tool(description="Search Wikipedia for a given query and return a summary", response_format="content")
async def wikipedia_search(query: str) -> str:
    wiki = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    return await wiki.arun(query)
   

In [54]:
await wikipedia_search.ainvoke('Tesla')

'Page: Tesla, Inc.\nSummary: Tesla, Inc. ( TEZ-lə or   TESS-lə), is an American multinational automotive and clean energy company. Headquartered in Austin, Texas, it designs, manufactures, and sells battery electric vehicles (BEVs), stationary battery energy storage devices from home to grid-scale, solar panels and solar shingles, and related products and services.\nTesla was incorporated in July 2003 by Martin Eberhard and Marc Tarpenning as Tesla Motors. Its name is a tribute to the inventor and electrical engineer Nikola Tesla. In February 2004, Elon Musk led Tesla\'s first funding round and became the company\'s chairman; in 2008, he was named chief executive officer. In 2008, the company began production of its first car model, the Roadster sports car. This was followed by the Model S sedan in 2012, the Model X SUV in 2015, the Model 3 sedan in 2017, the Model Y crossover in 2020, the Tesla Semi truck in 2022, and the Cybertruck pickup truck in 2023.\nTesla is one of the world\'s 

In [58]:
await duckduckgo_search.ainvoke('Tesla')

Ticker(ticker='IEEE')

In [63]:
from pathlib import Path

for _ in Path().iterdir():
    print(_.suffix)

.ipynb



In [71]:
dir=Path('main.ipynb')

if dir.exists():
    print('this directory exists')
else:
    print('fuck u bitch')

dir.parent

this directory exists


PosixPath('.')

In [None]:
parent_dir=Path('..').resolve()
log_folder = parent_dir / 'logs'
log_folder.mkdir(parents=True, exist_ok=True)
log_file=log_folder / 'app.log'
log_file.touch(exist_ok=True)


NameError: name 'file_path' is not defined

In [None]:
import logging

In [None]:
class logging:
    def __init__(self):
        

In [91]:
dir.absolute().parent.absolute().parent.absolute()


new=Path('..')


Path.home() /'advaced-research-agent/src/'


PosixPath('/Users/divyyadav/advaced-research-agent/src')

In [96]:
Path('..').resolve() 

PosixPath('/Users/divyyadav/Desktop/Advanced-Research-Agent')

In [117]:
parent = Path('..').resolve()
src_dir = parent / 'src'

# Ensure src_dir exists
src_dir.mkdir(exist_ok=True)

dipu_file = 'dipu.txt'
new_structure = src_dir / dipu_file

# Create the file if it doesn't exist
if not new_structure.exists():
    new_structure.touch()


60


In [11]:
class ResearchState(TypedDict):
    messages:Annotated[str,add_messages]
    user_question:str|None
    google_search_results:str|None
    google_finance_results:str|None
    bing_search_results:str|None
    reddit_search_results:str|None
    selected_reddit_urls:list[str]|None
    reddit_posts:str|None
    google_analysis:str
    bing_analysis:str
    reddit_analysis:str
    google_finance_analysis:str
    synthesized_answer:str

# NODES

In [33]:

def google_search(state:ResearchState)->ResearchState:
    return state


def bing_search(state:ResearchState)->ResearchState:
    return state


def reddit_search(state:ResearchState)->ResearchState:
    return state

def twitter_search(state: ResearchState) -> ResearchState:
    return state

def google_finance_search(state:ResearchState)->ResearchState:
    return state

def analyze_google_finance_results(state:ResearchState)->ResearchState:
    return state
    
def analysis_google_results(state:ResearchState)->ResearchState:
    return state


def analysis_bing_results(state:ResearchState)->ResearchState:
    return state


def analysis_reddit_results(state:ResearchState)->ResearchState:
    return state


def analyze_results_results(state:ResearchState)->ResearchState:
    return state
    

def synthesize_results(state:ResearchState)->ResearchState:
    return state


def final_results(state:ResearchState)->ResearchState:
    return state

def search_report(state:ResearchState)->ResearchState:
    return state

In [47]:
import aiohttp
import os
from aiohttp.web import HTTPError
from typing import Optional


class GoogleFinanceData:

  BASE_URL="https://serpapi.com/search"

  def __init__(self,symbol,trend: Optional[str]=None,index_market: Optional[str]=None):
    self._api_key=os.getenv('SERP_API')
    if not self._api_key:
        raise EnvironmentError("SERP_API key missing")
    self.symbol=symbol
    self.trend=trend
    self.index=index_market

  def build_serpapi_params(self) -> dict:
      """
      Build parameters for SerpAPI Google Finance query.

      Args:
          symbol (str): The stock symbol to query.

      Returns:
          dict: Parameters for the API call.
      """
      if not self.trend and not self.index:
        return {
          "engine": "google_finance",
          "q": self.symbol,
          "api_key": self._api_key,
          'window':'MAX',
          'async':'true',
          'no_cache':'false',
      }
      elif self.trend and self.index:
        return {
          "engine": "google_finance",
          "q": self.symbol,
          "api_key": self._api_key,
          'window': '5Y',
          'async': 'true',
          'no_cache': 'false',
          'trend': self.trend,
          'index_market': self.index
        }

      elif self.trend:
        return {
          "engine": "google_finance",
             "q": self.symbol,
          "api_key": self._api_key,
          'window': 'MAX',
          'async': 'true',
          'no_cache': 'false',
          'trend': self.trend
        }

      elif self.index:
        return {
          "engine": "google_finance",
          "api_key": self._api_key,
          'window': '5Y',
          'async': 'true',
          'no_cache': 'false',
          'index_market': self.index
        }

  async def fetch_google_finance_data(self) -> dict:
      """
      Asynchronously fetch financial data from SerpAPI's Google Finance engine.

      Args:
          symbol (str): The stock symbol to retrieve data for.

      Returns:
          dict: Parsed JSON response containing financial data.
      Raises:
          aiohttp.ClientResponseError: If the request fails or returns an error status.
      """
      params = self.build_serpapi_params()
      try:
          async with aiohttp.ClientSession() as session:
              async with session.get(url=self.BASE_URL, params=params) as response:
                            if response.status != 200:
                               response.raise_for_status()
                               raise  HTTPError(response.status)
                            data = await response.json()
                            return data
      except aiohttp.ClientResponseError as cre:
          logger.error(f"HTTP error occurred while retrieving financial data: {cre}")
          raise
      except aiohttp.ClientError as ce:
          logger.error(f"Client error occurred during financial data fetch: {ce}")
          raise
      except Exception as exc:
          logger.exception(f"Unexpected error during SerpAPI fetch: {exc}")
          raise

  async def fetch_financial_markets_data(self) -> dict:
      """
      Asynchronously fetch broader financial markets data.

      Returns:
          dict: Parsed JSON response containing the requested markets data.

      Raises:
          aiohttp.ClientResponseError: If the HTTP request fails.
          aiohttp.ClientError: For general client-side errors.
          Exception: For any unexpected errors during data retrieval.
      """
      params=self.build_serpapi_params()
      try:
        async with aiohttp.ClientSession() as session:
          async with session.get(url=self.BASE_URL, params=params) as response:
            if response.status != 200:
              response.raise_for_status()
              raise HTTPError(response.status)
            data= await response.json()
            if isinstance(data, dict) and 'search_parameters' in data and 'graph' in data:
                search_params = data['search_parameters']
                graph_data = data['graph']
                return {'search_parameters are ': search_params, 'graph is' : graph_data}

      except aiohttp.ClientResponseError as cre:
          print(f"HTTP error occurred while retrieving financial data: {cre}")
          raise
      


In [44]:
response =GoogleFinanceData(symbol='MSFT:NASDAQ',trend='gainers')

In [45]:
await  response.fetch_financial_markets_data()

{'search_parameters are ': {'engine': 'google_finance',
  'q': 'MSFT:NASDAQ',
  'hl': 'en',
  'window': 'MAX'},
 'graph is': [{'price': 0.1,
   'currency': 'USD',
   'date': 'Mar 14 1986, 04:00 PM UTC-04:00',
   'volume': 4652600},
  {'price': 0.09,
   'currency': 'USD',
   'date': 'Mar 21 1986, 04:00 PM UTC-04:00',
   'volume': 1270700},
  {'price': 0.1,
   'currency': 'USD',
   'date': 'Mar 27 1986, 04:00 PM UTC-04:00',
   'volume': 475600},
  {'price': 0.1,
   'currency': 'USD',
   'date': 'Apr 04 1986, 04:00 PM UTC-04:00',
   'volume': 349300},
  {'price': 0.1,
   'currency': 'USD',
   'date': 'Apr 11 1986, 04:00 PM UTC-04:00',
   'volume': 243300},
  {'price': 0.1,
   'currency': 'USD',
   'date': 'Apr 18 1986, 04:00 PM UTC-04:00',
   'volume': 336800},
  {'price': 0.12,
   'currency': 'USD',
   'date': 'Apr 25 1986, 04:00 PM UTC-04:00',
   'volume': 700600},
  {'price': 0.11,
   'currency': 'USD',
   'date': 'May 02 1986, 04:00 PM UTC-04:00',
   'volume': 568900},
  {'price': 0.1

In [None]:

import yfinance as yf

class YahooFinance:
    def __init__(self,symbol) -> None:
         self.symbol=symbol
         self.ticker=yf.Ticker(self.symbol)
    def get_history(self, start=None, end=None, interval="1d"):
        return self.ticker.history(start=start, end=end, interval=interval).to_json()

    def get_fundamentals(self):
        return {
            "info": self.ticker.info,
            "balance_sheet": self.ticker.balance_sheet,
            "income_statement": self.ticker.income_stmt,
            "cashflow": self.ticker.cashflow,
        }

    def get_corporate_actions(self):
        return {
            "dividends": self.ticker.dividends,
            "splits": self.ticker.splits,
        }



In [25]:
import yfinance as yf

In [33]:
group = yf.Tickers("Apple Tesla Amazon")

group.history().to_json()

[*********************100%***********************]  2 of 3 completed

3 Failed downloads:
['TESLA', 'APPLE', 'AMAZON']: YFPricesMissingError('possibly delisted; no price data found  (period=1mo) (Yahoo error = "No data found, symbol may be delisted")')


'{"(\'Adj Close\', \'AMAZON\')":{},"(\'Adj Close\', \'APPLE\')":{},"(\'Adj Close\', \'TESLA\')":{},"(\'Close\', \'AMAZON\')":{},"(\'Close\', \'APPLE\')":{},"(\'Close\', \'TESLA\')":{},"(\'High\', \'AMAZON\')":{},"(\'High\', \'APPLE\')":{},"(\'High\', \'TESLA\')":{},"(\'Low\', \'AMAZON\')":{},"(\'Low\', \'APPLE\')":{},"(\'Low\', \'TESLA\')":{},"(\'Open\', \'AMAZON\')":{},"(\'Open\', \'APPLE\')":{},"(\'Open\', \'TESLA\')":{},"(\'Volume\', \'AMAZON\')":{},"(\'Volume\', \'APPLE\')":{},"(\'Volume\', \'TESLA\')":{}}'

{'AAPL': [{'id': '035a1404-14a9-4292-96b8-08cca27526ee',
   'content': {'id': '035a1404-14a9-4292-96b8-08cca27526ee',
    'contentType': 'STORY',
    'title': 'Earnings live: Berkshire Hathaway profits rise, Beyond Meat postpones earnings, with Palantir results on deck',
    'description': '',
    'summary': 'Third quarter earnings season is in full swing, and analysts expect S&P 500 companies grew their profits by 8% during the quarter.',
    'pubDate': '2025-11-03T13:06:31Z',
    'displayTime': '2025-11-03T14:20:36Z',
    'isHosted': True,
    'bypassModal': False,
    'previewUrl': None,
    'thumbnail': {'originalUrl': 'https://s.yimg.com/os/creatr-uploaded-images/2024-03/a650e0f0-e147-11ee-aee7-58f12ae54fb2',
     'originalWidth': 6124,
     'originalHeight': 4083,
     'caption': '',
     'resolutions': [{'url': 'https://s.yimg.com/uu/api/res/1.2/wy81zZIhScnw.zJ2nq3zLg--~B/aD00MDgzO3c9NjEyNDthcHBpZD15dGFjaHlvbg--/https://s.yimg.com/os/creatr-uploaded-images/2024-03/a650e0f0-e147-

In [12]:
import yfinance as yf

a = yf.Ticker("TSLA")        # ✅ Valid
sus=a.sustainability.to_json()

if sus is None:
    print("No sustainability data available for this stock.")
else:
    print(sus)

HTTP Error 404: {"quoteSummary":{"result":null,"error":{"code":"Not Found","description":"No fundamentals data found for symbol: TSLA"}}}


{}


In [34]:
import os 
import aiohttp
from aiohttp.web import HTTPException

class GoogleSearch:
    BASE_URL = "https://serpapi.com/search"
    def __init__(self, user_query):
        self.query = user_query
        self._api_key = os.getenv('SERP_API')
        if not self._api_key:
            raise EnvironmentError("SERP_API key missing")
    
    def build_params(self, engine: str = 'google_light_fast'):

        return {
            "q": self.query,
            'no_cache': 'false',
            'api_key': self._api_key,
            'engine': engine
        }

    async def googld_ligh_fast_search(self):
        try:
            params = self.build_params(engine='google_light_fast')
            async with aiohttp.ClientSession() as session:
                async with session.get(self.BASE_URL, params=params) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    else:
                        raise HTTPException('ERROR OCCURED IN GOOGLE SEARCH', status_code=resp.status)
        except Exception as e:
            print(e)
            return None

    async def google_news(self):
        try:
            params = self.build_params(engine='google_news')
            async with aiohttp.ClientSession() as session:
                async with session.get(self.BASE_URL, params=params) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    else:
                        raise HTTPException('ERROR OCCURED IN GOOGLE SEARCH', status_code=resp.status)
        except Exception as e:
            print(e)
            return None

    async def google_search(self):
        try:
            params = self.build_params(engine='google')
            async with aiohttp.ClientSession() as session:
                async with session.get(self.BASE_URL, params=params) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    else:
                        raise HTTPException('ERROR OCCURED IN GOOGLE SEARCH', status_code=resp.status)
        except Exception as e:
            print(e)
            return None

# GRAPH

In [37]:
graph=StateGraph(ResearchState)

In [38]:
graph.add_node("google_search", google_search)
graph.add_node("bing_search", bing_search)
graph.add_node("reddit_search", reddit_search)
graph.add_node('financial_search',google_finance_search)
graph.add_node("analysis_google_results", analysis_google_results)
graph.add_node("analysis_google_finance_results", analyze_google_finance_results)
graph.add_node("analysis_bing_results", analysis_bing_results)
graph.add_node("analysis_reddit_results", analysis_reddit_results)
graph.add_node("synthesize_results", synthesize_results)
graph.add_node("search_report", search_report)
graph.add_node("final_results", final_results)

<langgraph.graph.state.StateGraph at 0x11d5b7b10>

In [39]:
graph.add_edge(START, "google_search")
graph.add_edge(START, "bing_search")
graph.add_edge(START, "reddit_search")
graph.add_edge(START, 'financial_search')

graph.add_edge('google_search', 'analysis_google_results')
graph.add_edge('bing_search', 'analysis_bing_results')
graph.add_edge('reddit_search', 'analysis_reddit_results')
graph.add_edge('financial_search', 'analysis_google_finance_results')

graph.add_edge('analysis_google_results', 'synthesize_results')
graph.add_edge('analysis_bing_results', 'synthesize_results')
graph.add_edge('analysis_reddit_results', 'synthesize_results')
graph.add_edge('analysis_google_finance_results', 'synthesize_results')

graph.add_edge('synthesized_result','search_report')
graph.add_edge('search_report','final_results')
graph.add_edge('final_results',END)


<langgraph.graph.state.StateGraph at 0x11d5b7b10>

In [40]:
graph.compile()

ValueError: Found edge starting at unknown node 'synthesized_result'

In [68]:
from typing import Any, List, Dict, Optional
import asyncpraw
import asyncio
import os
from aiocache.backends.redis import RedisCache
from aiocache import caches, cached


class RedditCacheClient:
    def __init__(self, subreddit: str):
        # Load Reddit API credentials from environment variables (with default fallback)
        self._client_id = os.getenv('REDDIT_CLIENT_ID')
        self._client_secret = os.getenv('REDDIT_CLIENT_SECRET')
        self._user_agent = 'dp by /u/Temporary_Version105'
        self.subreddit_name = subreddit
        self.reddit=self.get_reddit()
        self._init_cache()
    
    def _init_cache(self):
        # Reset aiocache state (helpful in Jupyter notebooks)
        caches._caches.clear()
        caches._config.clear()
        caches.set_config({
            'default': {
                'cache': 'aiocache.backends.redis.RedisCache', 
                'endpoint': '127.0.0.1',
                'port': 6379,
                'serializer': {'class': 'aiocache.serializers.PickleSerializer'},
                'ttl': 4800 
            }})

    def get_reddit(self):
        return asyncpraw.Reddit(
            client_id=self._client_id,
            client_secret=self._client_secret,
            user_agent=self._user_agent
        )
    
    @staticmethod
    @cached(
        alias='default', 
        key_builder=lambda f, post: f'post_data:{post.id}', 
        fail_safe=True
    )
    async def fetch_post_data(post) -> Optional[Dict[str, Any]]:
        """
        This function will automatically check the cache first.
        If there is a MISS, the code below runs (slow part).
        If there is a HIT, the decorator returns the cached result instantly.
        """
        print(f"--- Cache MISS: Loading post and comments for {post.id}")
        try:
            await post.load()
            await post.comments.replace_more()
            comments = []
            for com in post.comments.list()[:20]:
                if hasattr(com, 'body') and com.body.strip():
                    comments.append(
                        {
                            "id": com.id,
                            "body": com.body,
                            "author": str(getattr(com, "author", "N/A")),
                            "score": getattr(com, "score", 0),
                            "depth": com.depth,
                            "controversiality": com.controversiality,
                            "gilded": com.gilded,
                            "total_awards_received": com.total_awards_received,
                        }
                    )
            return {
                "title": getattr(post, "title", "N/A"),
                "score": getattr(post, "score", 0),
                "comments": comments
            }
        except Exception as e:
            print(f"Error loading post {post.id}: {e}")
            return None  # Do not cache failures

    async def fetch_reddit_posts(self) -> None:
        """
        Fetch 'hot' posts and top-level comments from specified subreddit and store in cache.
        Skips posts if already cached. Stores post IDs for easy querying later.
        """
        cache = caches.get('default')
        print("Connecting to Reddit...")

        reddit = self.get_reddit()
        try:
            subreddit = await reddit.subreddit(self.subreddit_name)
            post_processed = 0
            post_ids = []
            async for post in subreddit.hot(limit=50):
                post_processed += 1
                post_id = post.id
                post_ids.append(post_id)
                post_cache_key = f'posts{post_id}'
                cached_post = await cache.get(post_cache_key)
                if cached_post:
                    continue
                else:
                    try:
                        post_data = await self.fetch_post_data(post)
                        if post_data:
                            post_ids.append(post.id)
                    except asyncio.TimeoutError:
                        print(f"  > Timeout fetching post: {post.title[:100]}")
            await cache.set('all_post_ids', post_ids, ttl=4800)
            print(f'Session post ids set with length {len(post_ids)}')
        except asyncio.TimeoutError:
            print("Operation timed out.")
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            await reddit.close()

    async def get_post( self,post_id: str) -> Optional[Dict[str, Any]]:
        cache = caches.get('default')
        cache_key = f"post:{post_id}"

        cached_post = await cache.get(cache_key)
        if cached_post:
            print(f"--- Cache HIT: {post_id}")
            return cached_post

        print(f"--- Cache MISS: Loading post and comments for {post_id}")

        reddit = self.get_reddit()
        try:
            submission = await reddit.submission(id=post_id)
            post_data = await self.fetch_post_data(submission)
            await cache.set(cache_key, post_data, ttl=4800)
            return post_data
        except Exception as e:
            print(f"Error getting posts for postid {post_id}: {e}")
            return None
        finally:
            await reddit.close()

    async def get_all_comments(self) -> List[Dict[str, Any]]:
        """
        Retrieve all cached comments from all posts currently stored in cache.
        Returns a flat list of comments (good for analysis or language models).
        """
        cache = caches.get('default')
        post_ids = await cache.get('all_post_ids') or []
        if not post_ids:
            print("⚠️  No cached posts found. Run fetch_reddit_posts() first!")
            return []
        all_comments = []
        print(f"📥 Collecting comments from {len(post_ids)} posts...\n")
        for post_id in post_ids:
            post_data = await self.get_post(post_id)
            if post_data and 'comments' in post_data:
                for comment in post_data['comments']:
                    comment['post_id'] = post_id  # Track origin post
                    all_comments.append(comment)
        print(f"✅ Total comments collected: {len(all_comments)}")
        return all_comments

    async def get_all_posts(self) -> List[Optional[Dict[str, Any]]]:
        """
        Retrieve all the cached posts.
        """
        cache = caches.get('default')
        post_ids = await cache.get('all_post_ids') or []
        if not post_ids:
            print("⚠️  No cached posts found. Run fetch_reddit_posts() first!")
            return []
        posts = [await self.get_post(i) for i in post_ids]
        print(f'the posts are {posts}')
        return posts


# FUNCTIONS

In [55]:
import os
from typing import Literal
from tavily import TavilyClient
from deepagents import create_deep_agent

api=os.environ["TAVILY_API_KEY"]='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'

tavily_client = TavilyClient(api_key=api)

# Web search tool
def internet_search(
    query: str,
    max_results: int = 5,
    topic: Literal["general", "news", "finance"] = "general",
    include_raw_content: bool = False,
):
    """Run a web search"""
    return tavily_client.search(
        query,
        max_results=max_results,
        include_raw_content=include_raw_content,
        topic=topic,
    )


# System prompt to steer the agent to be an expert researcher
research_instructions = """You are an expert researcher. Your job is to conduct thorough research, and then write a polished report.

You have access to an internet search tool as your primary means of gathering information.

## `internet_search`

Use this to run an internet search for a given query. You can specify the max number of results to return, the topic, and whether raw content should be included.
"""

# Create the deep agent
agent = create_deep_agent(
    tools=[internet_search],
    system_prompt=research_instructions,
    model=llm,
    
)


In [56]:
result = agent.invoke({"messages": [{"role": "user", "content": "how to write the code of deep agents by langgraph ause websearch to find ?"}]})

In [53]:
for results in result['messages']:
    print(results)

content='how to write the code of deep agents by langgraph ause websearch to find ?' additional_kwargs={} response_metadata={} id='42d534bd-4398-4593-9a27-3df3096ad341'
content='' additional_kwargs={} response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'MALFORMED_FUNCTION_CALL', 'model_name': 'gemini-2.5-flash', 'safety_ratings': [], 'grounding_metadata': {}, 'model_provider': 'google_genai'} id='lc_run--db920744-9b51-4171-8151-145cbba07437-0' usage_metadata={'input_tokens': 5128, 'output_tokens': 0, 'total_tokens': 5128, 'input_token_details': {'cache_read': 0}}


In [47]:
import asyncio
import time 


async def fetch(time):
    print(f'doing task with{time}')
    asyncio.sleep(time)
    print('done with the task')

async def main():
    start=time.perf_counter()
    task1=asyncio.create_task(fetch(100000000000000000000))
    task2=asyncio.create_task(fetch(3))
    asyncio.sleep(5)
    resul1=await task1
    resul2=await task2
    print(resul1,resul2)
    end=time.perf_counter()
    print(end-start)


In [48]:
await main()

doing task with100000000000000000000
done with the task
doing task with3
done with the task
None None
0.0005715830047847703


In [None]:
async with asyncio.TaskGroup() as tg:
    tg,create_tas