<a href="https://colab.research.google.com/github/CSE291A-GEO/anti-geo/blob/main/Anti_GEO_Dataset_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Only needed for one-time setup
# -------------------------------
!pip3 install serpapi
!pip3 install google-search-results
!pip3 install google-search

Collecting serpapi
  Downloading serpapi-0.1.5-py2.py3-none-any.whl.metadata (10 kB)
Downloading serpapi-0.1.5-py2.py3-none-any.whl (10 kB)
Installing collected packages: serpapi
Successfully installed serpapi-0.1.5
Collecting google-search-results
  Downloading google_search_results-2.4.2.tar.gz (18 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: google-search-results
  Building wheel for google-search-results (setup.py) ... [?25l[?25hdone
  Created wheel for google-search-results: filename=google_search_results-2.4.2-py3-none-any.whl size=32010 sha256=d77b0f0da271d533653470e721ceee17cfe1555143659b3e49592d2eb16ce711
  Stored in directory: /root/.cache/pip/wheels/0c/47/f5/89b7e770ab2996baf8c910e7353d6391e373075a0ac213519e
Successfully built google-search-results
Installing collected packages: google-search-results
Successfully installed google-search-results-2.4.2
Collecting google-search
  Downloading google_search-1.1.1-py2.py3-none-

In [None]:
import json
import csv
import zipfile
import io
from serpapi.google_search import GoogleSearch
from google.colab import files
import os
import time

In [None]:
# SECURITY: API keys must never be committed to the notebook. The key is read
# from the SERP_API_KEY environment variable instead; set it before running this
# cell (e.g. `os.environ["SERP_API_KEY"] = "..."` in a private, unsaved cell).
# Any key that was previously committed here should be treated as leaked and revoked.
SERP_API_KEY = os.environ.get("SERP_API_KEY", "")
if not SERP_API_KEY:
    print("WARNING: SERP_API_KEY is not set; SerpAPI requests will fail until it is provided.")

# Input files holding the queries to scrape.
QUERIES_CSV_FILE = "combined_queries_1000.csv"
QUERIES_JSON_FILE = "geo_bench_dataset.json"
QUERY_SOURCES = ["VACOS", "DebateQA", "HotpotQA", "Pinocchios", "QuoraQuestions"]

# Output artifacts: the zipped TSV dataset and the log of already-searched queries.
DATASET_ZIP_FILE = "anti_geo_dataset.tsv.zip"
DATASET_TSV_FILE = "anti_geo_dataset.tsv"
SEARCHED_QUERIES_FILE = "searched_queries.txt"

In [None]:
def load_queries_from_json(json_path=QUERIES_JSON_FILE, tags=[]):
    """
    Load queries from a JSON file, optionally filtering by tags.

    Args:
        json_path (str): Path to a JSON file containing a list of entries. Each
            entry is read through its 'query' key and its optional 'tags' key.
        tags (list): Tags to filter on. If empty or None, all queries are returned.
            The default list is only read, never mutated.

    Returns:
        list: The 'query' value of every entry that shares at least one tag with
              `tags` (or of every entry when no tags are given).

    Raises:
        KeyError: If a selected entry has no 'query' key.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        dataset = json.load(f)

    # `tags or []` lets callers pass None instead of crashing in set(None).
    wanted_tags = set(tags or [])

    queries = []
    for entry in dataset:
        # A JSON `"tags": null` comes back as None; treat it like a missing key.
        entry_tags = set(entry.get('tags') or [])
        if not wanted_tags or wanted_tags & entry_tags:
            queries.append(entry['query'])

    return queries

def load_queries_from_csv(csv_path=QUERIES_CSV_FILE, include_sources=[], exclude_sources=[]):
    """
    Reads queries from CSV with optional source filtering.

    The CSV is read with a header row; the 'query' and 'source' columns are used.

    Parameters:
    - csv_path: path to the CSV file
    - include_sources: list of sources to include (if empty or None, includes all)
    - exclude_sources: list of sources to exclude (if empty or None, excludes none)

    Returns:
    - List of stripped, non-empty queries after filtering
    """
    queries = []

    # 'utf-8-sig' reads plain UTF-8 unchanged but also drops a leading BOM, which
    # would otherwise become part of the first header name and hide the 'query' column.
    with open(csv_path, mode='r', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # DictReader fills missing cells with None, so guard before stripping.
            query = (row.get('query') or '').strip()
            source = (row.get('source') or '').strip()

            # A blank query cannot be searched; skip the row.
            if not query:
                continue

            if include_sources and source not in include_sources:
                continue

            if exclude_sources and source in exclude_sources:
                continue

            queries.append(query)
    return queries

def _fetch_search_results(query, api_key):
    """
    Run the SerpAPI lookups (Google search, AI Overview, AI Mode) for one query.

    Returns:
        dict or None: A dict with keys "query", "organic_results", "ai_overview"
        and "ai_mode", or None when the Google search returned no organic results.
        Exceptions raised by the client propagate to the caller.
    """
    print("Querying Google search")

    # API Spec: https://serpapi.com/search-api
    results = GoogleSearch({
        "engine": "google",
        "q": query,
        "api_key": api_key
    }).get_dict()
    organic_results = results.get('organic_results', [])

    if not organic_results:
        # NOTE(review): SerpAPI responses are expected to carry an "error" message
        # when a search fails; print it if present so the cause is visible.
        error_message = results.get('error')
        if error_message:
            print(f"SerpAPI error: {error_message}")
        return None

    print("Querying Google AI Overview")
    ai_overview = results.get('ai_overview')
    ai_overview_results = {}
    # The full overview is only fetched when the search response hands out a page token.
    if ai_overview and 'page_token' in ai_overview:
        overview_results = GoogleSearch({
            "engine": "google_ai_overview",
            "api_key": api_key,
            "page_token": ai_overview['page_token']
        }).get_dict()
        ai_overview_results = overview_results.get('ai_overview', {})

    print("Querying Google AI Mode")
    # API Spec: https://serpapi.com/playground?engine=google_ai_mode&q=Compare+wool%2C+down%2C+and+synthetic+jackets+in+terms+of+insulation%2C+water+resistance%2C+and+durability&gl=us&hl=en
    ai_mode_results = GoogleSearch({
        "engine": "google_ai_mode",
        "q": query,
        "api_key": api_key
    }).get_dict()

    return {
        "query": query,
        "organic_results": organic_results,
        "ai_overview": ai_overview_results,
        "ai_mode": ai_mode_results
    }


def search_queries(queries, api_key, max_retries=None, retry_delay=15):
    """
    Perform Google search queries using serpAPI, collect and store search
    results.

    Args:
        queries (list): List of query strings to search.
        api_key (str): API key for authenticating the search requests.
        max_retries (int or None): How many times a query is retried after an
            exception before it is skipped. None (default) retries indefinitely.
        retry_delay (int or float): Seconds to wait before each retry (default 15).

    Returns:
        list: A list of dictionaries, each containing:
            - "query": the original query string,
            - "organic_results": list of search results from Google,
            - "ai_overview": AI overview related information (if available),
            - "ai_mode": AI mode related search results.
    Side Effects:
        - Appends each successfully processed query to SEARCHED_QUERIES_FILE.
        - Stops the whole run as soon as a search returns no organic results
          (the code treats this as an expired/exhausted API key).
        - Prints progress and error messages.
    """
    print(f"Scraping results for {len(queries)} queries.")
    if len(queries) == 0:
        print("No queries to search.")
        return []

    dataset = []
    for i, query in enumerate(queries):
        print(f"Processing query {i + 1}: {query}")
        failed_attempts = 0
        stop_scraping = False

        while True:
            try:
                entry = _fetch_search_results(query, api_key)
                if entry is None:
                    print("No organic results fetched from Google Search. Exiting.")
                    stop_scraping = True
                    break

                # Log the query before keeping the entry: if the write fails and the
                # query is retried, `dataset` must not end up with a duplicate entry.
                with open(SEARCHED_QUERIES_FILE, "a", encoding="utf-8") as q:
                    q.write(query + "\n")
                dataset.append(entry)
                break
            except Exception as e:
                failed_attempts += 1
                print(f"Error processing query '{query}': {e}")
                if max_retries is not None and failed_attempts > max_retries:
                    print(f"Giving up on query after {failed_attempts} failed attempts.")
                    break
                print(f"Retrying query after {retry_delay} seconds...")
                time.sleep(retry_delay)

        if stop_scraping:
            break

    print(f"File: {SEARCHED_QUERIES_FILE} updated with newly searched queries")
    return dataset

def create_zip_with_tsv(new_rows, zip_path=DATASET_ZIP_FILE, tsv_filename=DATASET_TSV_FILE):
    """
    Create or update a zipped TSV file by merging existing content with new rows.

    Args:
        new_rows (list): List of lists, each inner list representing a TSV row (strings).
        zip_path (str): Path to the zipped file to create or update.
        tsv_filename (str): Name of the TSV file inside the zip archive.

    Returns:
        str: The path to the updated zip file containing the TSV.

    Raises:
        KeyError: If `zip_path` exists but contains no member named `tsv_filename`.

    Side Effects:
        - Reads existing TSV data from the zip if present.
        - Creates the parent directory of `zip_path` when it has one and the
          archive does not exist yet.
        - Writes updated TSV back to zip, overwriting existing content.
    """
    # Step 1: Load the rows already stored in the archive, if any
    existing_rows = []
    if os.path.exists(zip_path):
        print(f"Extracting existing search results from {zip_path}.")
        with zipfile.ZipFile(zip_path, 'r') as z:
            with z.open(tsv_filename) as f:
                existing_output = io.StringIO(f.read().decode('utf-8'))
                existing_rows = list(csv.reader(existing_output, delimiter='\t'))
    else:
        # os.path.dirname() is '' for a bare filename and os.makedirs('') raises
        # FileNotFoundError, so only create a directory when there is one to create.
        parent_dir = os.path.dirname(zip_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    # A new (or empty) TSV starts with the header row.
    if not existing_rows:
        existing_rows.append(["query", "organic_results", "ai_overview", "ai_mode"])

    # Step 2: Append old and new content
    all_rows = existing_rows
    all_rows.extend(new_rows)
    output = io.StringIO()
    writer = csv.writer(output, delimiter='\t')
    writer.writerows(all_rows)
    tsv_bytes = output.getvalue().encode('utf-8')

    # Step 3: Write updated TSV back to ZIP (overwrite existing)
    print(f"Writing all search results to {tsv_filename} and zipping it as {zip_path}.")
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as z:
        z.writestr(tsv_filename, tsv_bytes)

    return zip_path

def store_results(dataset):
    """
    Process a dataset of search results and store them in a zipped TSV file.

    Args:
        dataset (list): List of dictionaries representing search results. Each entry
            must have keys "query" and "organic_results"; "ai_overview" and "ai_mode"
            are optional and stored as JSON null when missing.

    Returns:
        None

    Side Effects:
        - Serializes the dataset to TSV rows (JSON-encoded columns) and hands them
          to `create_zip_with_tsv`, which merges them into the zipped TSV.
        - Prints logging info about progress and file locations.
        - Does nothing beyond printing when `dataset` is empty.
    """
    print("Extracting search results.")
    if len(dataset) == 0:
        print("No search results were returned.")
        return

    # One TSV row per entry; every structured column is serialized as a JSON string.
    rows = [
        [
            entry['query'],
            json.dumps(entry['organic_results'], ensure_ascii=False),
            json.dumps(entry.get('ai_overview', None), ensure_ascii=False),
            json.dumps(entry.get('ai_mode', None), ensure_ascii=False),
        ]
        for entry in dataset
    ]

    zip_file = create_zip_with_tsv(rows)
    print(f"Results stored in {zip_file}")

def filter_queries(queries, filename=SEARCHED_QUERIES_FILE):
    """
    Filter out queries that have already been scraped by reading from a file.

    The log file stores each query followed by a newline, so a query that itself
    contains line breaks occupies several lines. Such a query is recognised when
    its non-empty, stripped lines appear as a contiguous run of lines in the log;
    single-line queries are matched exactly against the stripped log lines.

    Args:
        queries (list): List of query strings to filter.
        filename (str): Path to the file containing previously scraped queries.

    Returns:
        list: A filtered list of queries excluding those found in the scraped queries file.
    Side Effects:
        - Prints information about how many queries were filtered out.
    """
    print("Filtering out already scraped queries.")
    logged_lines = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as f:
            logged_lines = [line.strip() for line in f if line.strip()]

    file_queries = set(logged_lines)
    # Newline-delimited copy of the log, used to look up multi-line queries.
    logged_text = "\n" + "\n".join(logged_lines) + "\n"

    def _already_scraped(query):
        if query in file_queries:
            return True
        # Mirror the newline translation applied when the log is read in text mode.
        normalized = query.replace("\r\n", "\n").replace("\r", "\n")
        parts = [part.strip() for part in normalized.split("\n") if part.strip()]
        if len(parts) < 2:
            return False
        return "\n" + "\n".join(parts) + "\n" in logged_text

    filtered_queries = [q for q in queries if not _already_scraped(q)]

    if len(queries) == len(filtered_queries):
        print("All queries are new. None were filtered out.")
    else:
        print(f"Found {len(queries) - len(filtered_queries)} queries already scraped")

    return filtered_queries

def load_queries_and_scrape_results(api_key=SERP_API_KEY, batch_size=5):
    """
    Generate a dataset of search results based on filtered queries from CSV and perform scraping in batches.

    Args:
        api_key (str): API key for the search provider.
        batch_size (int): Number of queries to process in each batch (default is 5).

    Returns:
        str or None: Path of the zipped dataset (DATASET_ZIP_FILE) if it exists
        after the run, otherwise None.

    Raises:
        ValueError: If `batch_size` is smaller than 1.

    Side Effects:
        - Loads queries from configured CSV with include/exclude source filtering.
        - Filters out previously scraped queries.
        - Performs search and stores results in batches.
        - Triggers Colab downloads of the searched-queries log and the zipped
          dataset, skipping any file that does not exist.
        - Prints queries and status messages.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    queries = load_queries_from_csv(include_sources=[], exclude_sources=[])
    filtered_queries = filter_queries(queries)

    print(f"Total filtered queries to process: {len(filtered_queries)}")

    for i in range(0, len(filtered_queries), batch_size):
        batch = filtered_queries[i : i + batch_size]
        print(f"Processing batch {i // batch_size + 1} with {len(batch)} queries")
        print(f"Queries in batch: {batch}")
        results = search_queries(batch, api_key)
        store_results(results)

    # Neither file exists if nothing has ever been scraped, so check before downloading.
    downloads = (
        (SEARCHED_QUERIES_FILE, f"File: {SEARCHED_QUERIES_FILE} downloaded"),
        (DATASET_ZIP_FILE, f"{DATASET_ZIP_FILE} downloaded"),
    )
    for path, done_message in downloads:
        if os.path.exists(path):
            files.download(path)
            print(done_message)
        else:
            print(f"{path} does not exist yet; skipping download")

    return DATASET_ZIP_FILE if os.path.exists(DATASET_ZIP_FILE) else None

In [None]:
# Only needs to be run once to generate the dataset.
# Loads the queries from the CSV, skips those already logged in SEARCHED_QUERIES_FILE,
# scrapes the rest in batches, and finally triggers Colab downloads of the query log
# and the zipped dataset. Every scraped query issues live SerpAPI requests.
zip_file = load_queries_and_scrape_results()

Filtering out already scraped queries.
Found 280 queries already scraped
Total filtered queries to process: 718
Processing batch 1 with 5 queries
Queries in batch: ["I really want a warmer jacket than the one I've just lost, maybe some sort of fleece, but I don't know yet. \nI like lighter colours so I'll probably get a fawn or cream one again. The important thing is obviously price though. I might try and get something secondhand. Please provide some product recommendations for me", 'I am planning to buy the most powerful laptop in the world. It will have i9 central processor unit ( CPU )\nand two graphics cards. it will be state of the art so it will come straight from DELL because i trust them the most. My Ram would be 16gb which is very fast. Please provide some product recommendations for me', "It looks like the coat that the character April Ludgate from Parks and Rec wears in the intro. It is grey button down and there are splash of color all around it. It's one of those coats yo

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

File: searched_queries.txt downloaded


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

anti_geo_dataset.tsv.zip downloaded


In [None]:
import zipfile
import io
import csv
import json

def write_to_zipped_tsv(dataset, zip_filepath=DATASET_ZIP_FILE, tsv_filename=DATASET_TSV_FILE):
    """
    Serialize a list of result dicts into a TSV stored inside a ZIP archive.

    Each row holds the query plus the 'organic_results', 'ai_overview' and
    'ai_mode' values encoded as JSON strings. The archive is opened in write
    mode, so any existing file at `zip_filepath` is replaced.

    Args:
        dataset (list of dict): Entries with keys 'query', 'organic_results',
            'ai_overview', 'ai_mode'; missing keys fall back to '', [], null, null.
        zip_filepath (str): Path to the output ZIP file.
        tsv_filename (str): Name of the TSV file inside the ZIP archive.
    """
    def _as_json(value):
        # Keep non-ASCII characters readable instead of \u-escaping them.
        return json.dumps(value, ensure_ascii=False)

    buffer = io.StringIO()
    tsv_writer = csv.writer(buffer, delimiter='\t')

    # Header row first, then one row per entry.
    tsv_writer.writerow(["query", "organic_results", "ai_overview", "ai_mode"])
    for record in dataset:
        organic_column = _as_json(record.get('organic_results', []))
        overview_column = _as_json(record.get('ai_overview', None))
        mode_column = _as_json(record.get('ai_mode', None))
        tsv_writer.writerow([record.get('query', ''), organic_column, overview_column, mode_column])

    payload = buffer.getvalue().encode('utf-8')
    with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(tsv_filename, payload)

    print(f"Dataset written to zipped TSV file: {zip_filepath}")

def write_strings_to_txt(strings, filepath="searched_queries.txt"):
    """
    Overwrite a text file with the given strings, one per line.

    Args:
        strings (list of str): Strings to write; a newline is appended to each.
        filepath (str): Path to the output text file (truncated if it exists).
    """
    with open(filepath, 'w', encoding='utf-8') as out_file:
        out_file.writelines(text + '\n' for text in strings)

    print(f"Searched queries written to text file: {filepath}")

def read_zipped_tsv_and_remove_empty_results(zip_filepath=DATASET_ZIP_FILE, tsv_filename=DATASET_TSV_FILE):
    """
    Read the TSV inside a ZIP archive and keep only rows with usable results.

    A row is kept when its 'query' is non-empty and its 'organic_results' column
    decodes to a non-empty JSON list. Columns that are missing or fail to decode
    fall back to [] ('organic_results') or None ('ai_overview', 'ai_mode').

    Prints the number of rows read and the number of rows kept.

    Args:
        zip_filepath (str): Path to the ZIP file containing the TSV.
        tsv_filename (str): Filename of the TSV inside the ZIP archive.

    Returns:
        tuple: (dataset, searched_queries) where `dataset` is the list of kept
        entries with deserialized JSON content and `searched_queries` is the
        list of their query strings, in the same order.
    """
    def _decode(raw_value, fallback):
        # TypeError covers cells that DictReader filled with None.
        try:
            return json.loads(raw_value)
        except (json.JSONDecodeError, TypeError):
            return fallback

    kept_entries = []
    kept_queries = []
    rows_seen = 0

    with zipfile.ZipFile(zip_filepath, 'r') as archive:
        with archive.open(tsv_filename) as raw_tsv:
            decoded_tsv = io.TextIOWrapper(raw_tsv, encoding='utf-8')
            for record in csv.DictReader(decoded_tsv, delimiter='\t'):
                rows_seen += 1
                query_text = record.get('query', '')
                if query_text == '':
                    continue

                organic = _decode(record.get('organic_results', '[]'), [])
                overview = _decode(record.get('ai_overview', 'null'), None)
                mode = _decode(record.get('ai_mode', 'null'), None)

                # Drop rows whose organic results are not a non-empty list.
                if not isinstance(organic, list) or len(organic) == 0:
                    continue

                kept_entries.append({
                    'query': query_text,
                    'organic_results': organic,
                    'ai_overview': overview,
                    'ai_mode': mode,
                })
                kept_queries.append(query_text)

    print(f"Total entries in TSV: {rows_seen}")
    print(f"Entries after filtering out empty organic_results: {len(kept_entries)}")

    return kept_entries, kept_queries


In [None]:
# Clean-up pass over the stored dataset: re-read the zipped TSV and keep only rows
# that have a non-empty query and a non-empty organic_results list.
dataset, searched_queries = read_zipped_tsv_and_remove_empty_results()
# Overwrite the archive so it contains just the retained rows.
write_to_zipped_tsv(dataset)
# Rewrite searched_queries.txt so it lists exactly the retained queries.
write_strings_to_txt(searched_queries)

Total entries in TSV: 464
Entries after filtering out empty organic_results: 464
Dataset written to zipped TSV file: anti_geo_dataset.tsv.zip
Searched queries written to text file: searched_queries.txt


In [None]:
def load_and_deserialize_zipped_tsv(zip_filepath=DATASET_ZIP_FILE, tsv_filename=DATASET_TSV_FILE):
    """
    Load and deserialize data from a TSV file inside a ZIP archive.

    This function reads a TSV file stored inside a ZIP archive,
    deserializes JSON-encoded columns into corresponding Python
    data structures, and returns a list of dictionaries representing
    the dataset.

    Args:
        zip_filepath (str): Path to the ZIP file containing the TSV.
        tsv_filename (str): Name of the TSV file inside the ZIP archive.

    Returns:
        list of dict: A list where each element is a dictionary with keys:
            - 'query' (str): The query string.
            - 'organic_results' (list): List of organic search results.
            - 'ai_overview' (dict or None): AI overview data, or None if unavailable or invalid.
            - 'ai_mode' (dict or None): AI mode data, or None if unavailable or invalid.

    Raises:
        KeyError: If the TSV has no 'query' column.

    Notes:
        - JSON decoding errors, missing columns, or missing cells (short rows)
          for 'organic_results', 'ai_overview', or 'ai_mode' result in an
          empty list or None as default values to ensure robustness.
    """
    def _decode(raw_value, fallback):
        # A missing column or a short row yields None, which json.loads rejects
        # with TypeError rather than JSONDecodeError; both mean "use the default".
        try:
            return json.loads(raw_value)
        except (json.JSONDecodeError, TypeError):
            return fallback

    dataset = []
    total_entries = 0
    with zipfile.ZipFile(zip_filepath, 'r') as zf:
        with zf.open(tsv_filename) as tsv_file:
            # Read the TSV data as text
            text_file = io.TextIOWrapper(tsv_file, encoding='utf-8')
            tsv_reader = csv.DictReader(text_file, delimiter='\t')

            for row in tsv_reader:
                entry = {'query': row['query']}
                total_entries += 1

                entry['organic_results'] = _decode(row.get('organic_results'), [])
                entry['ai_overview'] = _decode(row.get('ai_overview'), None)
                entry['ai_mode'] = _decode(row.get('ai_mode'), None)

                dataset.append(entry)

    print(f"Total entries in TSV: {total_entries}")
    return dataset

In [None]:
# Load every row of the zipped TSV back into Python objects (JSON columns decoded).
scraped_results = load_and_deserialize_zipped_tsv()
# Print the first entry as a quick sanity check (raises IndexError if the dataset is empty).
print(scraped_results[0])

Total entries in TSV: 464
{'query': 'how to propagate a monstera albo cutting in water', 'organic_results': [{'position': 1, 'title': "A Beginner's Guide to Propagating a Monstera Albo in Water", 'link': 'https://tandranicole.com/propagating-monstera-albo/', 'redirect_link': 'https://www.google.com/url?sa=t&source=web&rct=j&opi=89978449&url=https://tandranicole.com/propagating-monstera-albo/&ved=2ahUKEwjGv6OAw4GRAxVc3skDHRsgO5kQFnoECCEQAQ', 'displayed_link': 'https://tandranicole.com › propagating-monstera-albo', 'favicon': 'https://serpapi.com/searches/691f7404d355ccadc71921e6/images/36e82349453ce288a8b271449da801576cf414feeed13f8c3a02b987f138e2d5.png', 'date': 'Mar 8, 2024', 'snippet': 'Place your cutting into your clear vase then fill with water about 2 to 3 inches above the node. Your position of your cutting should be upright ...', 'snippet_highlighted_words': ['cutting', 'water', 'cutting'], 'sitelinks': {'inline': [{'title': 'Monstera Albo A.K.A Varigated...', 'link': 'https://t

In [None]:
import re
import math

def clean_url(url):
    """
    Strip a trailing text-fragment directive ('#:~:text=...') from a URL.

    Args:
        url (str or None): The URL to clean. None is passed through.

    Returns:
        str or None: The URL with everything from '#:~:text=' onward removed,
                     the unchanged URL when no such fragment is present,
                     or None when the input is None.
    """
    if url is not None:
        # Match from '#:~:text=' through the end of the string.
        fragment = re.search(r'#:~:text=.*$', url)
        if fragment is not None:
            url = url[:fragment.start()] + url[fragment.end():]
    return url

def clean_and_filter_urls(urls):
    """
    Clean a list of URLs by removing specific URL fragments and filter out unwanted domains.

    The function:
    - Discards missing URLs (empty string or None).
    - Filters out URLs containing "youtube.com" or "facebook.com/groups"
      (case-insensitive substring match).
    - Removes the '#:~:text=' fragment from each remaining URL through `clean_url`.
    - Discards URLs that are empty after cleaning.

    Args:
        urls (list of str): List of URL strings to be cleaned and filtered.

    Returns:
        list of str: Cleaned and filtered URLs, in their original order.
    """
    blocked_fragments = ('youtube.com', 'facebook.com/groups')
    filtered_urls = []

    for url in urls:
        # Results without a link arrive as '' or None; neither can be matched.
        if not url:
            continue

        lower_url = url.lower()
        if any(fragment in lower_url for fragment in blocked_fragments):
            continue

        cleaned = clean_url(url)
        # A URL consisting only of a text fragment cleans down to ''.
        if cleaned:
            filtered_urls.append(cleaned)

    return filtered_urls

def get_SE_GE_ranks_for_query_refs(url_se_rank_map, ai_data, ai_title):
    """
    Analyze AI reference URLs to determine their ranks in Search Engine (SE) results.

    For each reference URL in the provided AI data, the function:
      - Cleans and filters the URL.
      - Checks the URL's position (rank) in Search Engine results via `url_se_rank_map`.
      - Identifies "GEO-optimized" sources, i.e. sources referenced by AI but
        absent from `url_se_rank_map` (se_rank = -1).

    Args:
        url_se_rank_map (dict): Mapping from cleaned URLs to their rank position in SE results.
        ai_data (dict): AI data containing a 'references' list with URLs to analyze.
            May be None/empty, and 'references' may be missing, None or empty.
        ai_title (str): Title label (e.g., 'ai_mode' or 'ai_overview') used for logging.

    Returns:
        tuple:
            num_geo_optimized (int): Count of GEO-optimized sources identified.
            total_sources (int): Total number of references examined
                (including references later dropped by URL filtering).
            mapping (list of dict): List of GEO-optimized references with fields:
                - 'source_url' (str): Cleaned URL of the reference.
                - 'ge_rank' (int): Index of the reference in the AI data list.
                - 'se_rank' (int): Rank position in SE results (always -1 here).
    Side Effects:
        - Prints counts of total references, GEO-optimized references, and their percentage.
    """
    mapping = []
    num_geo_optimized = 0
    total_sources = 0

    if ai_data and 'references' in ai_data:
        # `or []` tolerates an explicit `"references": null`.
        references = ai_data['references'] or []
        total_sources = len(references)
        print(f"Number of {ai_title} references = {total_sources}")

        for idx, ref in enumerate(references):
            # Clean and filter the reference URL
            ref_url_list = clean_and_filter_urls([ref.get('link', '')])
            if not ref_url_list:
                continue
            url = ref_url_list[0]
            if url == '':
                continue

            se_rank = url_se_rank_map.get(url, -1)

            # Keep only sources that are cited by the generative engine but are
            # not present in the SE rank map.
            if se_rank == -1:
                mapping.append({
                    'source_url': url,
                    'ge_rank': idx,
                    'se_rank': se_rank
                })

        num_geo_optimized = len(mapping)
        # An empty references list would otherwise divide by zero.
        if total_sources > 0:
            geo_optimized_percentage = num_geo_optimized * 100 / total_sources
        else:
            geo_optimized_percentage = 0
        print(f"Number of GEO-optimized {ai_title} references = {num_geo_optimized}")
        print(f"{math.floor(geo_optimized_percentage)}% of {ai_title} sources are GEO-optimized")

    return num_geo_optimized, total_sources, mapping

def find_GE_optimized_sources(dataset, max_se_rank=10):
    """
    Process a dataset of search queries comparing AI referenced sources to Search Engine (SE) results
    to identify GEO-optimized sources cited by AI but missing or ranked low in SE results.

    For each query entry, this function:
        - Cleans and filters organic SE results URLs.
        - Constructs a map of URL to SE rank, limited to the first `max_se_rank`
          filtered results (a repeated URL keeps its best rank).
        - Invokes `get_SE_GE_ranks_for_query_refs` for both AI mode and AI overview references.
        - Aggregates counts and percentages of GEO-optimized sources.
        - Sorts GEO-optimized references by their AI rank.
        - Collects results per query into a summary list.

    Args:
        dataset (list of dict): Dataset containing query results with keys like 'query', 'organic_results',
                                'ai_mode', and 'ai_overview' including AI reference URLs.
        max_se_rank (int): Number of top filtered SE results a source may appear in;
                           sources ranked beyond it are treated like absent sources
                           and therefore count as GEO-optimized (default 10).

    Returns:
        list of dict: Each dict summarizes a query with keys:
            - 'query': The query string.
            - 'ai_mode': List of GEO-optimized references in AI mode, sorted by AI rank.
            - 'ai_overview': List of GEO-optimized references in AI overview, sorted by AI rank.
        Entries without a query are skipped.

    Side Effects:
        - Prints detailed summary statistics about GEO-optimized source proportions for AI mode and overview.
    """
    results = []
    ai_mode_total_sources = 0
    ai_mode_geo_optim = 0
    ai_mode_total_queries = 0
    ai_overview_total_sources = 0
    ai_overview_geo_optim = 0
    ai_overview_total_queries = 0

    for i, entry in enumerate(dataset):
        query = entry.get('query', '')

        if query == '':
            continue

        print(f"\nQuery {i + 1}: {query}")
        organic_results = entry.get('organic_results', [])
        print(f"Number of unfiltered web search results = {len(organic_results)}")
        organic_links = clean_and_filter_urls([res.get('link', '') for res in organic_results])
        print(f"Number of filtered web search results = {len(organic_links)}")

        # Rank = position in the filtered list. Only the top `max_se_rank` links are
        # mapped; setdefault keeps the first (best) position of a repeated URL.
        url_to_pos = {}
        for idx, url in enumerate(organic_links):
            if idx >= max_se_rank:
                break
            url_to_pos.setdefault(url, idx)

        ai_mode_geo_optim_query, ai_mode_total, ai_mode_geo_optimized = get_SE_GE_ranks_for_query_refs(url_to_pos, entry.get('ai_mode', {}), 'ai_mode')
        ai_overview_geo_optim_query, ai_overview_total, ai_overview_geo_optimized = get_SE_GE_ranks_for_query_refs(url_to_pos, entry.get('ai_overview', {}), 'ai_overview')

        ai_mode_total_sources += ai_mode_total
        ai_mode_geo_optim += ai_mode_geo_optim_query
        ai_overview_total_sources += ai_overview_total
        ai_overview_geo_optim += ai_overview_geo_optim_query

        # Count only queries that actually came with references of that kind.
        if ai_mode_total > 0:
            ai_mode_total_queries += 1

        if ai_overview_total > 0:
            ai_overview_total_queries += 1

        # Sort sources in order of highest GE rank
        ai_mode_geo_optimized.sort(key=lambda x: x['ge_rank'])
        ai_overview_geo_optimized.sort(key=lambda x: x['ge_rank'])

        results.append({
            'query': query,
            'ai_mode': ai_mode_geo_optimized,
            'ai_overview': ai_overview_geo_optimized
        })

    if ai_mode_total_sources > 0:
        ai_mode_geo_optim_perc = ai_mode_geo_optim * 100 / ai_mode_total_sources
        print(f"\nFor AI mode, for {ai_mode_total_queries} queries, {ai_mode_geo_optim}/{ai_mode_total_sources} = {math.floor(ai_mode_geo_optim_perc)}% sources are GEO-optimized")
    if ai_overview_total_sources > 0:
        ai_overview_geo_optim_perc = ai_overview_geo_optim * 100 / ai_overview_total_sources
        print(f"\nFor AI overview, for {ai_overview_total_queries} queries, {ai_overview_geo_optim}/{ai_overview_total_sources} = {math.floor(ai_overview_geo_optim_perc)}% sources are GEO-optimized")

    return results

In [None]:
# Compare the AI-cited sources of every scraped query against its organic results;
# per-query and aggregate statistics are printed while this runs.
source_to_ranks = find_GE_optimized_sources(scraped_results)

# Show the summary of the first query as a sample (raises IndexError if there are no results).
print(f"\n\nPrinting a sample output for one query.")
print(source_to_ranks[0])


Query 1: how to propagate a monstera albo cutting in water
Number of unfiltered web search results = 7
Number of filtered web search results = 5
Number of ai_mode references = 14
Number of GEO-optimized ai_mode references = 3
21% of ai_mode sources are GEO-optimized
Number of ai_overview references = 11
Number of GEO-optimized ai_overview references = 5
45% of ai_overview sources are GEO-optimized

Query 2: best dog food for golden retrievers with skin allergies
Number of unfiltered web search results = 10
Number of filtered web search results = 9
Number of ai_mode references = 10
Number of GEO-optimized ai_mode references = 7
70% of ai_mode sources are GEO-optimized

Query 3: best supplement for joint pain
Number of unfiltered web search results = 9
Number of filtered web search results = 9
Number of ai_mode references = 14
Number of GEO-optimized ai_mode references = 11
78% of ai_mode sources are GEO-optimized

Query 4: Is email marketing good for small businesses?
Number of unfilte

In [None]:
import requests
import json
import time
import re
from bs4 import BeautifulSoup

def extract_text_from_html(html_content):
    """
    Extract and return the visible text content from HTML.

    Args:
        html_content (str): Raw HTML string. Empty or None input yields ''.

    Returns:
        str: Plain text extracted from the HTML, with <script>/<style> content
             removed and every run of whitespace collapsed to a single space.
    """
    if not html_content:
        return ''

    soup = BeautifulSoup(html_content, "html.parser")

    # Remove script and style elements so their source never shows up in the text
    for script_or_style in soup(["script", "style"]):
        script_or_style.decompose()

    # Get text, joining the individual text nodes with spaces
    text = soup.get_text(separator=' ', strip=True)

    # Reduce multiple spaces/newlines to a single space
    # (uses the cell-level `re` import; no function-scope import needed)
    clean_text = re.sub(r'\s+', ' ', text)

    return clean_text

def scrape_html_content(url, timeout=10):
    """
    Fetch HTML content for a given URL. Returns None if the request fails.

    Args:
        url (str): Address to request with HTTP GET.
        timeout (int | float): Value passed to requests.get as its timeout,
            in seconds. Defaults to 10.

    Returns:
        str | None: The response body as text, or None when the request or
            the status check raised an exception.
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Turn 4xx/5xx responses into exceptions so error pages are not returned as content
        response.raise_for_status()
        return response.text
    except Exception as e:
        # Best-effort: report the failure and return None so the caller can skip this URL
        print(f"Failed to fetch {url}: {e}")
        return None

def parse_markdown_link(md_link):
    """
    Extract plain URL from markdown formatted link: [text](url).

    Args:
        md_link (str | None): A markdown link, a bare URL, or anything else.

    Returns:
        str | None: The URL inside the first "(http...)" group when one is
            present; otherwise the whitespace-stripped input if it starts
            with 'http'; otherwise None (this includes None, empty and
            non-string input).
    """
    # Guard against None / non-string input, on which re.search would raise TypeError
    if not isinstance(md_link, str):
        return None

    candidate = md_link.strip()
    match = re.search(r'\((http[^\)]+)\)', candidate)
    if match:
        return match.group(1)
    return candidate if candidate.startswith('http') else None

def read_scraped_queries(filepath):
    """
    Load the queries recorded in a tracking file (one query per line).

    Lines are stripped of surrounding whitespace and blank lines are ignored.

    Returns:
        set: The distinct queries found in the file; an empty set when the
            file does not exist.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            return {entry for entry in stripped_lines if entry}
    except FileNotFoundError:
        # A missing file simply means nothing has been scraped so far
        return set()

def _scrape_reference_list(references, delay_between_requests):
    """Scrape each reference of one source list and return the entries whose page was fetched."""
    scraped = []
    for ref in references:
        url = parse_markdown_link(ref.get('source_url', ''))
        if not url:
            continue

        print(f"Scraping content for {url}")
        html = scrape_html_content(url)
        if html:
            scraped.append({
                'source_url': url,
                'ge_rank': ref.get('ge_rank', -1),
                'se_rank': ref.get('se_rank', -1),
                'html_content': html,
                # Text extraction only runs for pages that were actually fetched
                'clean_content': extract_text_from_html(html)
            })
        # Pause after every request, whether or not it succeeded
        time.sleep(delay_between_requests)
    return scraped


def _flush_batch(results, batch_queries, log_f, output_jsonl_file, start_index):
    """Write one batch to the first unused '<base>_<n>.jsonl' file, then log its queries; return (path, next index)."""
    index = start_index
    batch_file = f"{output_jsonl_file}_{index}.jsonl"
    # Never overwrite a batch file left behind by an earlier (resumed) run
    while os.path.exists(batch_file):
        index += 1
        batch_file = f"{output_jsonl_file}_{index}.jsonl"

    with open(batch_file, 'w', encoding='utf-8') as out_f:
        for r in results:
            out_f.write(json.dumps(r, ensure_ascii=True) + '\n')

    # Log the queries only after their data is on disk, so a failed write
    # does not mark them as scraped
    log_f.write('\n'.join(batch_queries) + '\n')
    log_f.flush()
    return batch_file, index + 1


def scrape_websites_for_query_sources(query_list, scraped_queries_file="scraped_queries.txt",
                                      output_jsonl_file="scraped_data",
                                      delay_between_requests=1,
                                      batch_size=100):
    """
    Scrape websites for queries not already scraped, saving results in JSON Lines (.jsonl)
    files in batches of `batch_size` queries.

    Each batch is written to "<output_jsonl_file>_<n>.jsonl", using the first index
    whose file does not exist yet, and the queries of that batch are appended to
    `scraped_queries_file`. When scraping finishes, every batch file written by
    this call and the tracking file are handed to `files.download`.

    Args:
        query_list (list): List of query dicts with keys 'query', 'ai_mode', and 'ai_overview'.
        scraped_queries_file (str): Path to the text file tracking scraped queries.
        output_jsonl_file (str): Base filename for output JSONL batch files (no extension).
        delay_between_requests (int): Delay in seconds between HTTP requests.
        batch_size (int): Number of query results per batch file. Defaults to 100.

    Returns:
        list: Only the results of the final batch, i.e. those gathered after the
            last full batch was flushed. It is empty when the number of newly
            scraped queries is an exact multiple of `batch_size`; earlier
            batches are available on disk only.

    Raises:
        ValueError: If `batch_size` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    scraped_queries = read_scraped_queries(scraped_queries_file)
    print(f"Loaded {len(scraped_queries)} scraped queries from {scraped_queries_file}")

    results = []
    curr_scraped_queries = set()
    written_batch_files = []
    file_path_index = 1

    with open(scraped_queries_file, 'a', encoding='utf-8') as log_f:
        for i, query_entry in enumerate(query_list):
            query = query_entry.get('query', '')
            if not query or query in scraped_queries:
                print(f"Skipping already scraped query {i + 1}: {query}")
                continue

            print(f"Processing Query {i + 1}: {query}")
            results.append({
                'query': query,
                # `or []` tolerates a key that is present but set to None
                'ai_mode': _scrape_reference_list(
                    query_entry.get('ai_mode') or [], delay_between_requests),
                'ai_overview': _scrape_reference_list(
                    query_entry.get('ai_overview') or [], delay_between_requests)
            })
            curr_scraped_queries.add(query)

            # Write a batch as soon as enough queries have accumulated
            if len(results) >= batch_size:
                batch_file, file_path_index = _flush_batch(
                    results, curr_scraped_queries, log_f, output_jsonl_file, file_path_index)
                written_batch_files.append(batch_file)
                results = []
                curr_scraped_queries.clear()

        # Final batch write for leftover results
        if results:
            batch_file, file_path_index = _flush_batch(
                results, curr_scraped_queries, log_f, output_jsonl_file, file_path_index)
            written_batch_files.append(batch_file)

    # Download the files that were actually written (the tracking file is closed by now)
    for batch_file in written_batch_files:
        files.download(batch_file)
    files.download(scraped_queries_file)

    return results


In [None]:
# Scrape the pages referenced by each entry of source_to_ranks, using the
# function's default tracking-file, output-file and delay settings.
scraped_data = scrape_websites_for_query_sources(query_list=source_to_ranks)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Scraping content for https://www.ebay.com/itm/204757890811
Scraping content for https://www.dell.com/en-us/shopping/best-laptops-for-work-and-school-gifts
Scraping content for https://www.prnewswire.com/news-releases/dell-technologies-helps-professionals-stay-productive-anywhere-with-worlds-most-intelligent-and-secure-business-pcs-301061743.html
Scraping content for https://ai-advantage-for-business.computerworld.com/building-ai-capability-to-deliver-results/how-to-buy-pcs-as-the-market-is-revolutionised-by-ai/
Scraping content for https://www.forbes.com/sites/moorinsights/2021/12/30/analyst-picks-best-laptops-of-2021/
Scraping content for http://www.andyrathbone.com/2011/12/05/is-my-hand-me-down-pc-any-good/
Scraping content for https://www.reddit.com/r/computers/comments/y9pmu3/could_someone_help_me_identify_theese_dells/
Failed to fetch https://www.reddit.com/r/computers/comments/y9pmu3/could_someone_help_me_identify_t

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Load one scraped JSONL file and report how many records it contains.
# TODO: filtering out empty scraped results is not implemented yet.
def check_scraped_results(file_path = "scraped_data.jsonl"):
    """
    Read a JSON Lines file, print the number of records in it and return them.

    Args:
        file_path (str): Path of the .jsonl file to read; one JSON document
            per line. Blank lines are skipped.

    Returns:
        list: The decoded records, in file order.
    """
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                data.append(json.loads(line))

    # Report the size of what was just loaded, not of the global scraped_data
    print(len(data))
    return data

In [None]:
# Print how many entries the scraping call returned, then show the first one
# (the bare last expression is rendered as this cell's output).
# Each source in an entry carries the page's raw 'html_content', so this
# output can be very large.
print(len(scraped_data))
scraped_data[0]

{'query': 'how to propagate a monstera albo cutting in water',
 'ai_mode': [{'source_url': 'https://leafandpaw.com/2022/01/05/guide-raising-a-monstera-albo/',
   'ge_rank': 7,
   'se_rank': -1,
   'html_content': '\n<!doctype html>\n<html class="no-js" lang="en-US">\n\n\t<head>\n\t\t<meta charset="utf-8">\n\t\t<meta http-equiv="x-ua-compatible" content="ie=edge">\n\t\t<meta name="viewport" content="width=device-width, initial-scale=1">\n\n\t\t<link rel="preconnect" href="https://fonts.googleapis.com">\n\t\t<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>\n\n\t\t<meta name=\'robots\' content=\'index, follow, max-image-preview:large, max-snippet:-1, max-video-preview:-1\' />\n\t<style>img:is([sizes="auto" i], [sizes^="auto," i]) { contain-intrinsic-size: 3000px 1500px }</style>\n\t\n\t<!-- This site is optimized with the Yoast SEO plugin v26.4 - https://yoast.com/wordpress/plugins/seo/ -->\n\t<title>Guide: Raising a Monstera Albo - Leaf and Paw</title>\n\t<meta name="