In [1]:
from urllib.request import urlopen
from bs4 import BeautifulSoup
from gdeltdoc import GdeltDoc, Filters
import concurrent.futures
from typing import Dict, List
import logging
import urllib.request
import ssl
from bs4 import BeautifulSoup
import time
import random

In [2]:
def _rearrange_and_dedup(doc_lists: List[List[Dict[str, str]]]) -> List[Dict[str, str]]:
    doc_list = []
    snippet_set = set()
    # print([len(i) for i in doc_lists])
    for i in range(50):
        for ds in doc_lists:
            if i < len(ds):
                if 'snippet' in ds[i]:
                    signature = ds[i]['snippet'].replace(' ', '')[:200]
                else:
                    signature = ds[i]['content'].replace(' ', '')[:200]
                if signature not in snippet_set:
                    doc_list.append(ds[i])
                    snippet_set.add(signature)
    return doc_list

def search(query_list: List[str], n_max_doc: int = 20, search_engine: str = 'gdelt', freshness: str = '') -> List[Dict[str, str]]:
    """Run several queries concurrently and merge their results into one list.

    Each query is passed to ``search_single`` on a thread pool. A query that
    fails is logged and skipped so the remaining queries still contribute.
    The per-query result lists are then merged by ``_rearrange_and_dedup`` and
    truncated to ``n_max_doc`` entries.

    Args:
        query_list: The query strings to run.
        n_max_doc: Maximum number of documents to return.
        search_engine: Engine name forwarded to ``search_single``.
        freshness: Freshness value forwarded to ``search_single``.

    Returns:
        At most ``n_max_doc`` merged result dicts; empty if there are no
        queries or every query failed.
    """
    queries = list(query_list)
    if not queries:
        return []

    doc_lists = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(search_single, query, search_engine, freshness) for query in queries]
        # Collect in submission order rather than completion order so the
        # merged ranking does not depend on which request finished first.
        for query, future in zip(queries, futures):
            try:
                doc_lists.append(future.result())
            except Exception as exc:
                # Best effort: one failing query must not discard the others,
                # but the failure should be visible.
                logging.warning('Skipping query %r: %s', query, exc)

    doc_list = _rearrange_and_dedup([docs for docs in doc_lists if docs])
    return doc_list[:n_max_doc]

def get_gdelt_data(query, start_date: str = "2024-09-27", end_date: str = "2025-03-27"):
    """Query GDELT for articles matching a keyword within a date window.

    Args:
        query: Keyword passed to the GDELT ``Filters`` object.
        start_date: Start of the search window. The default keeps the window
            this notebook was originally written against.
        end_date: End of the search window.

    Returns:
        Whatever ``GdeltDoc.article_search`` returns. ``gdelt_format_results``
        in this file consumes it via ``to_dict('records')``, so it is
        presumably a pandas DataFrame - confirm against the gdeltdoc docs.
    """
    filters = Filters(
        keyword=query,
        start_date=start_date,
        end_date=end_date,
    )
    return GdeltDoc().article_search(filters)


def search_single(query: str, search_engine: str, freshness: str = '') -> List[Dict[str, str]]:
    """Run one query against the chosen search engine and return formatted results.

    Only ``'gdelt'`` is supported. Any failure, including an unsupported engine
    name, is logged and re-raised as ``ValueError`` with a ``Search failed:``
    prefix.

    Args:
        query: The query string.
        search_engine: Name of the engine to use.
        freshness: Accepted for interface compatibility; not used by this
            function's body.

    Returns:
        The result dicts produced by ``gdelt_format_results``.

    Raises:
        ValueError: If the engine is unsupported or the search fails.
    """
    try:
        if search_engine != 'gdelt':
            raise ValueError(f'Unsupported Search Engine: {search_engine}')
        raw_results = get_gdelt_data(query)
        return gdelt_format_results(raw_results)
    except Exception as exc:
        message = f'Search failed: {str(exc)}'
        logging.error(message)
        raise ValueError(message)

def get_snippet_from_url(url, timeout: float = 5):
    """Download a web page and return its visible text as a single string.

    ``<script>`` and ``<style>`` elements are removed, every line is stripped,
    lines are further split on double spaces, and the non-empty pieces are
    joined with single spaces.

    This is best effort: any failure is logged and yields ``""``, so one bad
    page never aborts a batch of results.

    Args:
        url: Address of the page. Only ``http://`` and ``https://`` URLs are
            fetched; anything else (empty, non-string, ``file://``, ...) returns
            ``""`` without touching the network or the local filesystem.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The page text, or ``""`` if the URL is rejected or the fetch/parse fails.
    """
    if not isinstance(url, str) or not url.strip():
        return ""
    url = url.strip()
    # urlopen also understands file:// and other schemes; URLs here come from a
    # remote API, so restrict fetching to the web.
    if not url.lower().startswith(('http://', 'https://')):
        logging.warning("Skipping snippet fetch for unsupported URL: %s", url)
        return ""

    # Browser-like headers; some sites reject the default urllib User-Agent.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0'
    }
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=timeout) as response:
            html = response.read()

        soup = BeautifulSoup(html, 'html.parser')

        # Drop non-visible content before extracting text.
        for tag in soup(["script", "style"]):
            tag.extract()

        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        return ' '.join(chunk for chunk in chunks if chunk)
    except Exception as e:
        logging.warning("Error fetching snippet from %s: %s", url, e)
        return ""

def gdelt_format_results(df):
    """Convert GDELT article rows into this notebook's result dicts.

    Each row becomes a dict with the keys ``id`` (1-based rank as a string),
    ``title``, ``snippet`` (page text obtained from ``get_snippet_from_url``),
    ``url`` and ``timestamp`` (the first 11 characters of ``seendate``).
    Missing fields default to the empty string.

    Args:
        df: Object exposing ``to_dict('records')``; presumably the pandas
            DataFrame returned by the GDELT article search.

    Returns:
        A list of result dicts in the same order as the input rows.
    """
    formatted_results = []
    for position, record in enumerate(df.to_dict('records'), start=1):
        link = record.get('url', '')
        formatted_results.append({
            'id': str(position),
            'title': str(record.get('title', '')),
            # One page download per row.
            'snippet': get_snippet_from_url(link),
            'url': str(link),
            'timestamp': str(record.get('seendate', ''))[:11],
        })
    return formatted_results

In [3]:
# Example run: a single query with the defaults (GDELT engine, up to 20 documents).
# This performs live network requests; as the cell's last expression, the
# returned list of result dicts is displayed as the cell output.
search(["figure ai"])