<a href="https://colab.research.google.com/github/KaifAhmad1/code-test/blob/main/Search_Sources.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --quiet -U "langchain-community>=0.2.16" langchain-exa langchain-google-community tavily-python exa_py python-dotenv

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.2 MB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.2/1.2 MB[0m [31m18.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [34]:
import asyncio
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from datetime import datetime
import pytz
from dotenv import load_dotenv
import nest_asyncio
import os

from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_google_community import GoogleSearchAPIWrapper
from exa_py import Exa
from tavily import TavilyClient

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Load environment variables
load_dotenv()

# API Keys (hidden for security purposes)
SERPER_API_KEY = os.getenv("SERPER_API_KEY", "d8e815ef6caa94dbef7b977a0ea7d505b43a5a06")
EXA_API_KEY = os.getenv("EXA_API_KEY", "953b5801-11be-4b37-a313-f8df8f37027c")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "AIzaSyBIQo9X6acoBazBfte9jF9Pl0QEZ9oe8pk")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID", "63053004a7e2445c3")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "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")

# Set environment variables for Search Tools
os.environ["SERPER_API_KEY"] = SERPER_API_KEY
os.environ["EXA_API_KEY"] = EXA_API_KEY
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
os.environ["GOOGLE_CSE_ID"] = GOOGLE_CSE_ID
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY

In [35]:
# Initialize search tools
google_serper = GoogleSerperAPIWrapper()
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
google_search = GoogleSearchAPIWrapper()
exa = Exa(api_key=EXA_API_KEY)

In [36]:
class SearchResult(BaseModel):
    source: str
    title: str
    snippet: str
    url: str
    date: Optional[str] = None
    media: Optional[List[str]] = []
    media_content: Optional[List[Dict[str, str]]] = []

In [37]:
def google_serper_search(query: str) -> List[SearchResult]:
    results = google_serper.results(query)
    return [
        SearchResult(
            source="Google Serper",
            title=result.get("title", "No title"),
            snippet=result.get("snippet", "No snippet"),
            url=result.get("link", "No link"),
            date=result.get("date"),
        ) for result in results.get("organic", [])
    ]

def exa_search(query: str) -> List[SearchResult]:
    try:
        response = exa.search_and_contents(
            query,
            use_autoprompt=True,
            num_results=5,
            text=True,
            highlights=True
        )
        results = response.results  # Access the results directly
        print("Exa Search Response:", results)  # Debugging statement
        return [
            SearchResult(
                source="Exa Search",
                title=result.title,
                snippet=result.highlights[0] if result.highlights else "No snippet",
                url=result.url,
                date=result.publishedDate,
                media_content=[{"image_url": result.url}] if result.url else []
            ) for result in results
        ]
    except Exception as e:
        print(f"ERROR in Exa Search: {str(e)}")
        return []

def tavily_search(query: str) -> List[SearchResult]:
    try:
        response = tavily_client.search(
            query,
            search_depth="advanced",
            include_answer=True,
            include_raw_content=True,
            include_images=True,
            max_results=5
        )
        print("Tavily Search Response:", response)  # Debugging statement
        return [
            SearchResult(
                source="Tavily Search",
                title=result.get("title", "No title"),
                snippet=result.get("content", "No snippet"),
                url=result.get("url", "No link"),
                date=result.get("published_date"),
                media=[result.get("url", "No link")] if result.get("url") else [],
                media_content=[{"image_url": result.get("url", "No link")}] if result.get("url") else []
            ) for result in response.get("results", [])
        ]
    except Exception as e:
        print(f"ERROR in Tavily Search: {str(e)}")
        return []

def google_programmable_search(query: str) -> List[SearchResult]:
    try:
        results = google_search.results(query, num_results=5)
        return [
            SearchResult(
                source="Google Programmable Search",
                title=result.get("title", "No title"),
                snippet=result.get("snippet", "No snippet"),
                url=result.get("link", "No link"),
                date=result.get("date"),
            ) for result in results
        ]
    except Exception as e:
        print(f"ERROR in Google Programmable Search: {str(e)}")
        return []

def google_serper_image_search(query: str) -> List[SearchResult]:
    search_images = GoogleSerperAPIWrapper(type="images")
    results_images = search_images.results(query)
    return [
        SearchResult(
            source="Google Serper Image Search",
            title=result.get("title", "No title"),
            snippet=result.get("snippet", "No snippet"),
            url=result.get("imageUrl", "No link"),
            media=[result.get("imageUrl", "No link")],
            media_content=[{"image_url": result.get("imageUrl", "No link")}]
        ) for result in results_images.get("images", [])
    ]

def google_programmable_image_search(query: str) -> List[SearchResult]:
    try:
        results = google_search.results(query + " image", num_results=5)
        return [
            SearchResult(
                source="Google Programmable Image Search",
                title=result.get("title", "No title"),
                snippet=result.get("snippet", "No snippet"),
                url=result.get("link", "No link"),
                media=[result.get("link", "No link")],
                media_content=[{"image_url": result.get("link", "No link")}]
            ) for result in results
        ]
    except Exception as e:
        print(f"ERROR in Google Programmable Image Search: {str(e)}")
        return []

def aggregate_search_results(*args: List[SearchResult]) -> List[SearchResult]:
    all_results = []
    media_content = []

    for results in args:
        all_results.extend(results)
        media_content.extend([media for result in results for media in result.media_content])

    seen_urls = set()
    unique_results = []

    for result in all_results:
        if result.url not in seen_urls:
            seen_urls.add(result.url)
            unique_results.append(result)

    return unique_results, media_content

In [38]:
async def execute_searches(query: str) -> Dict[str, Any]:
    search_functions = [
        google_serper_search,
        google_programmable_search,
        exa_search,
        tavily_search,
        google_serper_image_search,
        google_programmable_image_search
    ]
    search_tasks = [asyncio.to_thread(func, query) for func in search_functions]
    search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

    successful_results = []
    for results in search_results:
        if isinstance(results, Exception):
            print(f"ERROR in search: {str(results)}")
        else:
            successful_results.append(results)

    combined_results, media_content = aggregate_search_results(*successful_results)
    urls = [result.url for result in combined_results]

    return {
        "results": combined_results,
        "urls": urls,
        "media_content": media_content
    }

In [39]:
if __name__ == "__main__":
    query = "Latest Cyber Incidents by Lockbit Ransomware Group?"
    results = asyncio.run(execute_searches(query))

    print("Search Results:")
    for result in results["results"]:
        print(f"Title: {result.title}, URL: {result.url}")
        if result.media_content:
            print(f"Media Content: {result.media_content}")

    print("URLs List:")
    print(results["urls"])

    print("Media Content List:")
    print(results["media_content"])

ERROR in Tavily Search: The provided API key is invalid.
Exa Search Response: [Result(url='https://techcrunch.com/2024/12/24/clop-ransomware-gang-takes-credit-for-latest-mass-hack-that-breached-dozens-of-companies/', id='https://techcrunch.com/2024/12/24/clop-ransomware-gang-takes-credit-for-latest-mass-hack-that-breached-dozens-of-companies/', title='Clop ransomware gang takes credit for latest mass hack that breached dozens of companies', score=0.15353845059871674, published_date='2024-12-24T00:00:00.000Z', author='Zack Whittaker', image='https://techcrunch.com/wp-content/uploads/2024/03/cash-ransomware-ransom-payments-hackers-getty.jpg?resize=1200,675', favicon='https://techcrunch.com/wp-content/uploads/2015/02/cropped-cropped-favicon-gradient.png?w=32', subpages=None, extras=None, text='Clop, a prolific ransomware gang, has taken credit for stealing data from at least 66 companies by exploiting a bug in widely used corporate file transfer tools made by Cleo Software. The cybercrimi