<a href="https://colab.research.google.com/github/shixiao-coder/website/blob/upload-eval-colab/nl_evals_script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## See the [Evaluation Documentation](https://docs.google.com/document/d/1ajUTCgdGFlInAqdWJB18STaG-TX_uoN1lQFgVPSd8pg/edit?resourcekey=0-czNd7QblghzoQarG5RRMqQ&tab=t.0) for detailed instructions and background.

## (Required) Run the Setup Code

**Required: [Create a Colab secret](https://drlee.io/how-to-use-secrets-in-google-colab-for-api-key-protection-a-guide-for-openai-huggingface-and-c1ec9e1277e0#:~:text=Step%2Dby%2DStep%20Guide%20to%20Using%20Colab%20Secrets) called `DC_API_KEY` with your [Data Commons API Key](https://docs.datacommons.org/api/#get-key), by clicking on the key icon in the left navigation bar and "Add new secret".**

This section includes imports, globals, utilities, Pydantic model definitions, and DataFrame <-> GCS <-> Sheets <-> model conversion utilities.

It also includes the Scrape API code and the scoring code.

### General Inputs
This table describes the input required to define the runtime.

| Parameter | Type | Default Value | Description |
| :--- | :--- | :--- | :--- |
| `runtime_type` | `string` | `--` | Required. This defines the runtime/environment to use. |
| `host_website` | `string` | `--` | Optional. The base URL (HTTP or HTTPS) of the website to query for the given `runtime_type`. If left empty, it is derived from `runtime_type`. |

### Local Runtime
This part describes the setup required for local runtime.

1.   `mkdir eval && cd eval`
2.   Create `requirements.txt` with the packages listed below.
3.   `python3 -m venv evals_runtime`
4.   `source evals_runtime/bin/activate`
5.   `gcloud auth application-default login --scopes="https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform"`
6.   `pip3 install -q -r requirements.txt`
7.   `jupyter notebook --NotebookApp.allow_origin='https://colab.research.google.com' --port=8888 --NotebookApp.port_retries=0 --NotebookApp.allow_credentials=True`
8.   In Colab, connect to the local runtime using the HTTP URL that Jupyter prints on startup.

`requirements.txt` packages:

    requests
    numpy
    python-dateutil
    pydantic
    scikit-learn
    pandas
    jupyter
    notebook
    google-auth
    fsspec
    gcsfs
    gspread
    google-api-python-client
    google-auth-httplib2
    google-auth-oauthlib
    oauth2client







Utils:

* `models_to_gcs_csv`(csv_path: str, models: List[BaseModel])
  * Uploads a list of Pydantic objects to a GCS bucket as a CSV.

* `gcs_csv_to_sheet`(csv_path, gcs_folder)
  * Copies a CSV from GCS to Google Sheets (Nl Goldens Viewer by default).

* `gcs_csv_to_df`(csv_path, pydantic_model: BaseModel = NlQueryResponse) -> pd.DataFrame
  * Reads a CSV from GCS into a pandas DataFrame, converting the dates, places, and variables columns to their respective Pydantic classes.

* `gcs_csv_to_pydantic_models`(csv_path: str, pydantic_model: BaseModel) -> List[BaseModel]
  * Reads a CSV from GCS into a list of `pydantic_model` objects.

* `gcs_csv_to_variable_scrapes`(csv_path: str, pydantic_model: NlQueryResponse = NlGolden) -> List[VariableResponse]
  * Reads a CSV from GCS into a flattened list of VariableResponse objects.
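
The CSV helpers above depend on nested columns surviving a round trip through text. The following is a minimal sketch of that idea using only the standard library. The `Place` dataclass and the in-memory buffer are hypothetical stand-ins for the notebook's Pydantic models and GCS paths, not the actual implementation.

```python
import ast
import csv
import io
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class Place:  # hypothetical stand-in for a Pydantic model such as DetectedPlace
  dcid: str
  sub_place_type: Optional[str] = None


def rows_to_csv(rows: List[dict]) -> str:
  """Writes rows to CSV text; the nested list is stored as its Python repr."""
  buffer = io.StringIO()
  writer = csv.DictWriter(buffer, fieldnames=['id', 'places'])
  writer.writeheader()
  for row in rows:
    places = [asdict(place) for place in row['places']]
    writer.writerow({'id': row['id'], 'places': repr(places)})
  return buffer.getvalue()


def csv_to_rows(text: str) -> List[dict]:
  """Reads the CSV back, parsing the nested column with ast.literal_eval."""
  rows = []
  for record in csv.DictReader(io.StringIO(text)):
    places = [Place(**p) for p in ast.literal_eval(record['places'])]
    rows.append({'id': int(record['id']), 'places': places})
  return rows


original = [
    {'id': 1, 'places': [Place('geoId/06', 'County')]},
    {'id': 2, 'places': []},
]
restored = csv_to_rows(rows_to_csv(original))
print(restored == original)
```

A nested cell is plain text once it is in the CSV, so the reader has to parse it back explicitly. This is the role the `ast.literal_eval` converters play in `gcs_csv_to_df`.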

In [None]:
# @markdown #### Runtime Related
runtime_type = "local" # @param ["prod","dev","local"]
host_website = "" # @param {"type":"string"}

if not host_website:
  if runtime_type == "prod":
    host_website = "https://datacommons.org"
  elif runtime_type == "dev":
    host_website = "https://dev.datacommons.org"
  elif runtime_type == "local":
    host_website = "http://localhost:8080"


In [None]:
# @title ### Imports
import json
import logging  # used by the utility and scrape cells below
import re
import statistics
from datetime import datetime
from enum import Enum, auto
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from pydantic import BaseModel, ConfigDict, ValidationError, field_serializer, model_validator

In [None]:
# @title ### Utils + Globals

from datetime import datetime, date as dt_date

GCS_EVAL_DIR = 'gs://datcom-nl-evals/evals'
GCS_GOLDENS_DIR = 'gs://datcom-nl-evals/goldens'

def now():
  return datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

def today():
  return dt_date.today().strftime("%Y-%m-%d")

def scrapes_path(epoch):
  return f'{GCS_EVAL_DIR}/{epoch}/scrape.csv'

def goldens_path(epoch):
  return f'{GCS_GOLDENS_DIR}/{epoch}.csv'

def scores_path(scrape_epoch, score_epoch):
  return f'{GCS_EVAL_DIR}/{scrape_epoch}/score_{score_epoch}.csv'

def summary_path(scrape_epoch, score_epoch):
  return f'{GCS_EVAL_DIR}/{scrape_epoch}/summary_{score_epoch}.csv'
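
To make the resulting bucket layout concrete, here is a small sketch that repeats the path helpers and prints the paths for one scrape and one scoring run. The two epoch strings are hypothetical examples in the formats produced by `now()` and `today()`.

```python
GCS_EVAL_DIR = 'gs://datcom-nl-evals/evals'
GCS_GOLDENS_DIR = 'gs://datcom-nl-evals/goldens'


def scrapes_path(epoch):
  return f'{GCS_EVAL_DIR}/{epoch}/scrape.csv'


def goldens_path(epoch):
  return f'{GCS_GOLDENS_DIR}/{epoch}.csv'


def scores_path(scrape_epoch, score_epoch):
  return f'{GCS_EVAL_DIR}/{scrape_epoch}/score_{score_epoch}.csv'


def summary_path(scrape_epoch, score_epoch):
  return f'{GCS_EVAL_DIR}/{scrape_epoch}/summary_{score_epoch}.csv'


scrape_epoch = '2025-10-09-10-49-10'  # hypothetical value in the now() format
score_epoch = '2025-10-10'            # hypothetical value in the today() format

paths = {
    'goldens': goldens_path(score_epoch),
    'scrape': scrapes_path(scrape_epoch),
    'scores': scores_path(scrape_epoch, score_epoch),
    'summary': summary_path(scrape_epoch, score_epoch),
}
for name, path in paths.items():
  print(f'{name:8s} {path}')
```

Scores and summaries live in the directory of the scrape they were computed from, so one scrape can be rescored several times without overwriting earlier results.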

In [None]:
# @title Pydantic Models
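# -O overwrites any earlier download. Without it, wget saves repeat downloads as
# eval_models.py.1, .2, ... and the import below keeps loading the first copy.
!wget -O eval_models.py https://raw.githubusercontent.com/clincoln8/datcom-website/refs/heads/nl-pyd/tools/nl/detection_evals/eval_models.py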

from eval_models import *



In [None]:
# @title ### DF <-> GCS <-> Sheets <-> Pydantic Utils
import ast
if runtime_type != "local":
  from google.colab import auth
  auth.authenticate_user()
!pip install google-cloud-storage --quiet
from google.cloud import storage
import os
import numpy as np

GCS_GOLDENS_DIR = "gs://datcom-nl-evals/goldens"  # same value as in "Utils + Globals"

def models_to_gcs_csv(csv_path:str, models: List[BaseModel]):
  '''
  This function takes a list of Pydantic models and saves them to a CSV file.

  csv_path: The path where the CSV file will be saved.
  models: A list of Pydantic model objects to be converted to CSV.
  '''
  golden_df = pd.DataFrame([m.model_dump(mode='json') for m in models])
  golden_df.to_csv(csv_path, index=False)

def df_to_gcs_csv(csv_path, df, pydantic_model):
  validated_models = df_to_pydantic_models(df, pydantic_model)
  models_to_gcs_csv(csv_path, validated_models)

def df_to_pydantic_models(df, pydantic_model: BaseModel) -> List[BaseModel]:
  '''This function converts each row of a DataFrame into a Pydantic model object.

  df: The DataFrame whose rows are validated.
  pydantic_model: The Model Class each row represents.
  Returns: A list of pydantic_model objects.
  '''
  return [pydantic_model.model_validate(record) for record in df.to_dict(orient='records')]


def df_to_sheet_in_folder(df, sheet_name, scrape_epoch, folder_id='1-yFZ-HrhP1e-JhJ3JRbCjTmeXp78wOwn'):
  '''This function reads a DataFrame and uploads its content to a Google Sheet,
  handling folder and spreadsheet creation, and sheet naming based on scrape_epoch.

  df: The DataFrame to upload.
  sheet_name: The desired name for the worksheet.
  scrape_epoch: A string representing the scrape epoch, used for spreadsheet naming.
  folder_id: The ID of the Google Drive folder to check/create the spreadsheet in.
  '''
  if runtime_type != "local":
    from google.colab import auth
    auth.authenticate_user()

  import gspread
  from gspread.exceptions import SpreadsheetNotFound, WorksheetNotFound

  from google.auth import default

  creds, _ = default()
  gc = gspread.client.Client(auth=creds)

  from googleapiclient.discovery import build
  drive_service = build('drive', 'v3', credentials=creds)

  spreadsheet_name = scrape_epoch

  # Check if spreadsheet already exists in folder.
  results = drive_service.files().list(
      q=(
        f"mimeType='application/vnd.google-apps.spreadsheet' and "
        f"name='{spreadsheet_name}' and "
        f"'{folder_id}' in parents and "
        f"trashed=false"
    ),
      spaces='drive',
      driveId='0AL_se0fAJ2R9Uk9PVA',
      includeItemsFromAllDrives=True,
      supportsAllDrives=True,
      corpora='drive',
      fields='files(id, name)'
  ).execute()

  items = results.get('files', [])
  if items:
      spreadsheet = gc.open_by_key(items[0]['id'])
      logging.info(f"Spreadsheet '{spreadsheet_name}' found with ID: {spreadsheet.id}")

  else:
    # Create a new spreadsheet in Google Drive
    spreadsheet = gc.create(spreadsheet_name)
    logging.info(f"Successfully created Google Sheet: {spreadsheet.url}")

    # Place new spreadsheet into the folder
    file_id = spreadsheet.id
    drive_service.files().update(fileId=file_id,
                                  addParents=folder_id,
                                  removeParents='root', # Remove from root
                                supportsAllDrives = True,
                                  fields='id, parents').execute()
    logging.info(f"Moved spreadsheet to folder ID: {folder_id}")

  # Add or overwrite sheet within spreadsheet
  try:
    worksheet = spreadsheet.worksheet(sheet_name)
    logging.info(f"✅ Found worksheet: '{sheet_name}'. Clearing existing data...")
    worksheet.clear() # Clear all data
  except WorksheetNotFound:
    logging.info(f"➕ Worksheet '{sheet_name}' not found. Creating new worksheet...")
    # Reserve one extra row for the header row written below.
    worksheet = spreadsheet.add_worksheet(title=sheet_name, rows=len(df) + 1, cols=len(df.columns))

  worksheet.update([df.columns.values.tolist()] + df.values.tolist())
  logging.info(f"DataFrame successfully uploaded to sheet '{sheet_name}' in spreadsheet '{spreadsheet.title}'.")
  return f"{spreadsheet.url}#gid={worksheet.id}"

def gcs_csv_to_sheet(csv_path, gcs_folder):
  '''This function reads a CSV file and uploads its content to a Google Sheet.
  It requires authentication and uses the gspread library.

  csv_path: The path to the CSV file.
  gcs_folder: The name of the folder in gcs.
  '''
  df = pd.read_csv(csv_path)
  df.replace(np.nan, None, inplace=True)
  sheet_name = csv_path.split('/')[-1].removesuffix(".csv")
  return df_to_sheet_in_folder(df, sheet_name, gcs_folder)


def gcs_csv_to_df(csv_path, pydantic_model: BaseModel = NlQueryResponse) -> pd.DataFrame:
  '''
  This function reads a CSV file containing golden queries and converts it into
  a pandas DataFrame. It uses converters to properly parse the nested data
  structures within the 'dates', 'places', and 'variables' columns into their
  respective Pydantic models.

  csv_path: The path to the CSV file.
  pydantic_model: This value determines the appropriate converters to use for each column.
  Returns: A pandas DataFrame.
  '''

  converters = {}
  if issubclass(pydantic_model, NlQueryResponse):
    converters= {
        'dates': lambda dates: [DetectedDate.model_validate(x) for x in ast.literal_eval(dates)],
        'places': lambda places: [DetectedPlace.model_validate(x) for x in ast.literal_eval(places)],
        'variables': lambda vars: [VariableResponse.model_validate(x) for x in ast.literal_eval(vars)]
        }

  elif pydantic_model is NlQueryEvalScore:
    converters= {
        'golden_response': lambda x: NlGolden(**ast.literal_eval(x)),
        'scraped_response': lambda x: NlApiScrape(**ast.literal_eval(x)),
        'golden_type': GoldenType
        }

  return pd.read_csv(csv_path, converters=converters)


def gcs_csv_to_pydantic_models(csv_path:str, pydantic_model: BaseModel) -> List[BaseModel]:
  '''This function reads a CSV file and converts each row into a class object.

  csv_path: The path to the CSV file.
  pydantic_model: The Model Class the csv represents.
  Returns: A list of pydantic_model objects.
  '''
  df = gcs_csv_to_df(csv_path, pydantic_model)
  return df_to_pydantic_models(df, pydantic_model)

def gcs_csv_to_variable_scrapes(csv_path: str, pydantic_model: NlQueryResponse = NlGolden) -> List[VariableResponse]:
  '''This function reads a CSV file, extracts the 'variables' column, and combines all VariableResponse objects from that column into a single list.

  csv_path: The path to the CSV file.
  pydantic_model: The Model Class the csv represents.
  Returns: A list of VariableResponse objects.
  '''

  df = pd.read_csv(csv_path, usecols=['variables'])
  combined_variable_scrapes = [VariableResponse(**var) for vars in df['variables'] for var in ast.literal_eval(vars)]

  return combined_variable_scrapes

### Scrape API Code

In [None]:
# @title ### parse_dates
from dataclasses import dataclass
from abc import ABC

# DateClassificationAttributes taken from
# https://github.com/datacommonsorg/website/blob/b92b3f643e2b9116fcda9d0f631991b3c5a88fc7/server/lib/nl/detection/types.py#L338C1-L358C34
@dataclass
class Date:
  """Represents a detected date: its preposition (e.g. 'before'), year, optional month, and optional year span."""
  prep: str
  year: int
  month: Optional[int] = 0
  year_span: Optional[int] = 0

  def __str__(self):
    return f'{self.year} - {self.month} | {self.year_span}'

@dataclass
class DateClassificationAttributes(ABC):
  dates: List[Date]
  is_single_date: bool
  date_trigger_strings: List[str]

# Taken from
# https://github.com/datacommonsorg/website/blob/b92b3f643e2b9116fcda9d0f631991b3c5a88fc7/server/lib/nl/detection/date.py#L182C1-L201C30
def get_date_range_strings(date: Date) -> (str, str):
  _LAST_YEARS_PREP = 'last_years'
  _START_DATE_PREPS = ['after', 'since', 'from', _LAST_YEARS_PREP]
  _END_DATE_PREPS = ['before', 'by', 'until']
  _MIN_MONTH = 1
  _EXCLUSIVE_DATE_PREPS = ['before', 'after']
  _MIN_DOUBLE_DIGIT_MONTH = 10


  # Gets the year and month to use as the base date when calculating a date range
  def _get_base_year_month(date: Date) -> (int, int):

    base_year = date.year
    base_month = date.month
    # if date range excludes the specified date, need to do some calculations to
    # get the base date.
    if date.prep in _EXCLUSIVE_DATE_PREPS:
      # if specified date is an end date, base date should be earlier than
      # specified date
      if date.prep in _END_DATE_PREPS:
        # if date is monthly, use date that is one month before the specified date
        if base_month >= _MIN_MONTH:
          base_date = dt_date(base_year, base_month,
                                    1) - relativedelta(months=1)
          base_year = base_date.year
          base_month = base_date.month
        # otherwise, use date that is one year before the specified date
        else:
          base_year = base_year - 1
      # if specified date is a start date, base date should be later than
      # specified date
      elif date.prep in _START_DATE_PREPS:
        # if date is monthly, use date that is one month after the specified date
        if base_month >= _MIN_MONTH:
          base_date = dt_date(base_year, base_month,
                                    1) + relativedelta(months=1)
          base_year = base_date.year
          base_month = base_date.month
        # otherwise, use date that is one year after the specified date
        else:
          base_year = base_year + 1

    return base_year, base_month


  def _get_month_string(month: int) -> str:
    month_string = ''
    if month >= _MIN_DOUBLE_DIGIT_MONTH:
      month_string = f'-{month}'
    elif month >= _MIN_MONTH:
      month_string = f'-0{month}'
    return month_string

  start_date = ''
  end_date = ''
  if not date or not date.year:
    return start_date, end_date
  base_year, base_month = _get_base_year_month(date)
  year_string = str(base_year)
  month_string = _get_month_string(base_month)
  base_date = year_string + month_string
  if date.prep in _START_DATE_PREPS:
    start_date = base_date
    if date.year_span > 0:
      end_year = base_year + date.year_span
      end_date = str(end_year) + month_string
  elif date.prep in _END_DATE_PREPS:
    end_date = base_date
    if date.year_span > 0:
      start_year = base_year - date.year_span
      start_date = str(start_year) + month_string
  return start_date, end_date


def parse_dates(dbg_info):
  date_classification = dbg_info.get('date_classification', '')

  if not date_classification or date_classification == '<None>':
    return []

  attributes = eval(date_classification)

  if not attributes.dates or len(attributes.dates) > 1 :
    print('dates list length != 1')
    return []

  if attributes.is_single_date:
    start_date = str(attributes.dates[0].year)
    if attributes.dates[0].month:
      start_date = f'{start_date}-{attributes.dates[0].month:02d}'
    return [DetectedDate(base_date=start_date)]

  detected_date = attributes.dates[0]
  date_range = get_date_range_strings(detected_date)
  if date_range == ('',''):
    return []  # `empty_dates` was never defined; an empty list means no dates
  start_date = date_range[0] if date_range[0] else today()
  end_date = date_range[1] if date_range[1] else today()
  return [DetectedDate(base_date=start_date, end_date=end_date)]
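
The preposition handling above is easier to follow with a few concrete inputs. Below is a simplified, standard-library-only sketch of the same range logic. It is not the upstream code: `date_range` and `shift_month` are hypothetical names, month arithmetic replaces `relativedelta`, and the empty-year check is omitted.

```python
START_PREPS = {'after', 'since', 'from', 'last_years'}
END_PREPS = {'before', 'by', 'until'}
EXCLUSIVE_PREPS = {'before', 'after'}


def shift_month(year, month, delta):
  """Shifts (year, month) by delta months; month is 1-12."""
  index = year * 12 + (month - 1) + delta
  return index // 12, index % 12 + 1


def date_range(prep, year, month=0, year_span=0):
  """Returns (start_date, end_date) strings; '' means open-ended."""
  # 'before' and 'after' exclude the named date, so step one unit past it.
  if prep in EXCLUSIVE_PREPS:
    step = -1 if prep in END_PREPS else 1
    if month >= 1:
      year, month = shift_month(year, month, step)
    else:
      year += step

  suffix = f'-{month:02d}' if month >= 1 else ''
  base = f'{year}{suffix}'
  if prep in START_PREPS:
    end = f'{year + year_span}{suffix}' if year_span > 0 else ''
    return base, end
  if prep in END_PREPS:
    start = f'{year - year_span}{suffix}' if year_span > 0 else ''
    return start, base
  return '', ''


examples = [
    ('since', 2015, 0),   # inclusive start, yearly
    ('before', 2020, 0),  # exclusive end, yearly
    ('before', 2020, 1),  # exclusive end, monthly: rolls back into 2019
    ('after', 2020, 12),  # exclusive start, monthly: rolls over into 2021
    ('until', 2020, 3),   # inclusive end, monthly
]
for prep, year, month in examples:
  print(prep, year, month, '->', date_range(prep, year, month))
```

An empty string on either side marks an open end. `parse_dates` then fills such an open end with `today()`.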

In [None]:
# @title ### parse_places

def parse_places(id, query, dbg_info):
  # 1. Identify sub_place_type such as "County" or "State"
  contained_in_classification = dbg_info.get('contained_in_classification', '<None>') # "<None>"" matches the 'default' value returned by debug_info
  sub_place_type = contained_in_classification.split('.')[-1] if contained_in_classification != '<None>' else None
  if sub_place_type and 'DEFAULT_TYPE' in sub_place_type:
    sub_place_type = None

  # 2. Identify the detected place dcids
  places = []

  detection_logs =  dbg_info.get('query_detection_debug_logs', {})
  # print(detection_logs)
  # print(contained_in_classification)

  try:
    detected_places = detection_logs.get('place_resolution', {}).get('dc_resolved_places', {})
    for detected_place in detected_places:
      places.append(DetectedPlace(dcid=detected_place['dcid'], sub_place_type=sub_place_type))
  except AttributeError as e:
    logging.info(f'[places] no dc_resolved_places; {id} ("{query}")')
    if sub_place_type:
      logging.debug(f'[places] detected only sub_place_type; {id} ("{query}")')
      places.append(DetectedPlace(dcid='', sub_place_type=sub_place_type))
      return places

    llm_response = dbg_info.get('llm_response', {})
    if llm_response:
      # TODO: verify if skipping llm_response when sub_place_type is present is correct
      logging.info(f'[places] llm_response: {llm_response}')

      llm_sub_place = llm_response.get('SUB_PLACE_TYPE', '')
      if 'DEFAULT_TYPE' in llm_sub_place:
        llm_sub_place = None
      if llm_sub_place:
        places.append(DetectedPlace(dcid='', sub_place_type=llm_sub_place))
    else:
      logging.debug(f'[places] no place detected; {id} ("{query}")')

  return places

In [None]:
# @title ### parse_variables

def unfurl_group(topic_dcid, processed_topics):
  dcids = []
  processed_topic = processed_topics.get(topic_dcid, {})

  for peer_group in processed_topic.get('peer_groups', []):
    dcids.extend(peer_group[1]) # peer group structure is [peer_group_dcid, [stat_vars]]

  for sub_topic in processed_topic.get('sub_topics', []):
    dcids.extend(unfurl_group(sub_topic, processed_topics))

  dcids.extend(processed_topic.get('svs', []))

  return dcids

def unfurl_dcids(dcids, processed_topics, id, query):
  flat_vars = []
  for dcid in dcids:
    if dcid.startswith('dc/topic'):
      if dcid not in processed_topics:
        logging.info(f'[vars] topic not processed: {dcid}; {id} ("{query}")')
        continue
      flat_vars.extend(unfurl_group(dcid, processed_topics))
    else:
      flat_vars.append(dcid)

  return flat_vars

def parse_variables(id, query, dbg_info) -> list[VariableResponse]:

  sv_matching = dbg_info.get('sv_matching', {})
  detection_type = dbg_info.get('detection_type')
  query_detection_logs = dbg_info.get('query_detection_debug_logs')
  info_logs = dbg_info.get('counters', {}).get('INFO', {})

  topics_processed = {}
  for topic in info_logs.get('topics_processed', []):
    topics_processed.update(topic)


  single_sv_best_score = (sv_matching.get('CosineScore', [0]) + [0])[0]

  multi_sv_candidate = None
  if 'MultiSV' in sv_matching:
    for candidate in sv_matching.get('MultiSV', {}).get('Candidates', []):
      if candidate['DelimBased'] and len(candidate['Parts']) == 2:
        # 0.05 matches the logic in
        # https://github.com/datacommonsorg/website/blob/12f305f6525bd5d34d45d564503f827dcad2a9ee/shared/lib/constants.py#L458
        if candidate['AggCosineScore'] + 0.05 >= single_sv_best_score:
          variables = []
          multi_sv_candidate = candidate
          for part in multi_sv_candidate['Parts']:
            variables.append(VariableResponse(search_label=part['QueryPart'], dcids=unfurl_dcids(part['SV'], topics_processed, id, query)))
          return variables


  ranking = None
  if 'ranking_classification' in dbg_info:
    ranking_classification = dbg_info.get('ranking_classification')
    if 'HIGH' in ranking_classification:
      ranking = Ranking.HIGH
    elif 'LOW' in ranking_classification:
      ranking = Ranking.LOW


  if 'RICH' in dbg_info.get('superlative_classification', ''):
    search_label = "RICH"
    var_dcids = unfurl_dcids(info_logs.get('filtered_svs', [])[0], topics_processed, id, query)

  elif 'POOR' in dbg_info.get('superlative_classification', ''):
    search_label = "POOR"
    var_dcids = unfurl_dcids(topics_processed.keys(), topics_processed, id, query)

  elif detection_type == 'Hybrid - Heuristic Based' or detection_type=='Heuristic Based':
    search_label=query_detection_logs.get('query_transformations', {}).get('sv_detection_query_stop_words_removal', '')
    if not search_label:
      logging.warning(f'[vars] no sv_detection_query_stop_words_removal, using empty str; {id} ("{query}") {query_detection_logs} {sv_matching}')
    var_dcids = unfurl_dcids(info_logs.get('filtered_svs', [])[0], topics_processed, id, query)


  elif detection_type == 'Hybrid - LLM Fallback' or detection_type=='LLM Based':
    variable_strs = query_detection_logs['llm_response']['METRICS']
    if len(variable_strs) > 1:
      logging.warning('[vars] multiple llm detected statvars')

    search_label=variable_strs[0]
    var_dcids = unfurl_dcids(info_logs.get('filtered_svs', [])[0], topics_processed, id, query)

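  else:
    # Unrecognized detection mode: search_label and var_dcids were never
    # assigned, so return early instead of failing at the final return.
    logging.warning(f'[vars] different detection mode; {id} ("{query}")')
    return []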


  return [VariableResponse(search_label=search_label, dcids=var_dcids, rank=ranking)]
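
To illustrate how a topic is flattened into stat var dcids, here is a self-contained sketch that repeats `unfurl_group` and runs it on a small input. The `processed_topics` dictionary and every dcid in it are made up for illustration and only follow the structure the code above expects.

```python
def unfurl_group(topic_dcid, processed_topics):
  """Recursively collects all stat var dcids reachable from a topic."""
  dcids = []
  topic = processed_topics.get(topic_dcid, {})

  # Each peer group is [peer_group_dcid, [stat_vars]].
  for peer_group in topic.get('peer_groups', []):
    dcids.extend(peer_group[1])

  # Sub-topics are themselves keys into processed_topics.
  for sub_topic in topic.get('sub_topics', []):
    dcids.extend(unfurl_group(sub_topic, processed_topics))

  dcids.extend(topic.get('svs', []))
  return dcids


# Hypothetical data: dcids are invented for this example.
processed_topics = {
    'dc/topic/Example': {
        'peer_groups': [['dc/svpg/ExampleGroup', ['sv_a', 'sv_b']]],
        'sub_topics': ['dc/topic/ExampleChild'],
        'svs': ['sv_c'],
    },
    'dc/topic/ExampleChild': {
        'svs': ['sv_d'],
    },
}

flat = unfurl_group('dc/topic/Example', processed_topics)
print(flat)
```

The result lists peer-group members first, then everything under the sub-topics, then the topic's own stat vars. A topic missing from `processed_topics` contributes nothing.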

In [None]:
# @title scrape query (main)

import requests
import logging


def scrape_query(id, query, detector_type='hybrid'):
  response = None
  query_response = None
  scrape_date = today()

  try:
    # Pass the query through `params` so that requests URL-encodes it.
    response = requests.post(f'{host_website}/api/explore/detect-and-fulfill', params={'q': query, 'detector': detector_type}, json={}, timeout=None)

    if response.status_code != 200:
      logging.warning(f'[api]  NL API request failed with status code {response.status_code}; {id} ("{query}")')
      return NlApiScrape(id=id, query=query, dates=[], places=[], variables=[], api_response_status=ResponseStatus.ERROR, scrape_date=scrape_date)
    query_response = response.json()

  except json.JSONDecodeError as e:
    # json.JSONDecodeError requires (msg, doc, pos), so re-raise as ValueError.
    raise ValueError(f'[api] NL API response is not valid JSON; {id} ("{query}"); {e}') from e

  logging.debug(query_response)

  dbg_info = query_response.get('debug', {})

  if dbg_info.get('blocked', False):
    logging.warning(f'[api] NL API blocked request; {id} ("{query}")')
    return NlApiScrape(id=id, query=query, dates=[], places=[], variables=[], api_blocked=True, scrape_date=scrape_date)

  dates = parse_dates(dbg_info)

  places = parse_places(id, query, dbg_info)

  variables = parse_variables(id, query, dbg_info)

  return NlApiScrape(id=id, query=query, dates=dates, places=places, variables=variables, scrape_date=scrape_date)

# -- Sanity Test --
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(levelname)s - %(message)s',
#     force=True
# )
# scrape_query(0, 'What is the demographics of students in Sunnyvale')

### Scoring Implementation

In [None]:
!pip install python-dateutil --quiet

In [None]:
# @title ### score_date

def calculate_date_scores(golden_dates: pd.Series, scrape_dates: pd.Series, date_of_scrape: pd.Series) -> pd.Series:
  """
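  Scores each row's scraped dates against its golden dates.

  Returns one score per row: NaN when neither side has dates, 0.0 when only one
  side has dates, and otherwise the mean over scraped dates of the best match
  against any golden date (base_date and end_date each count for half), with a
  0.0 added for every date by which the two lists differ in length.
  $TODAY placeholders in the goldens are resolved against date_of_scrape.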
  """

  def score_dates(golden_dates, scraped_dates, date_of_scrape):

    def replace_placeholder(date_str):
      if date_str == DATE_PLACEHOLDER:
        return date_of_scrape

      placeholder_pattern = r"^\$TODAY([+-])(\d+)([YM])$"
      placeholders = re.fullmatch(placeholder_pattern, date_str)

      if not placeholders:
        logging.error(f'Unable to parse date placeholder {date_str}')
        return date_str

      sign, adjustment, unit = placeholders.groups()
      adjustment = int(adjustment)

      date_obj = datetime.strptime(date_of_scrape, DATE_OF_SCRAPE_FORMAT)

      if unit == 'Y' and sign == '+':
          return (date_obj + relativedelta(years=adjustment)).strftime('%Y')
      elif unit == 'Y' and sign == '-':
        return (date_obj - relativedelta(years=adjustment)).strftime('%Y')
      elif unit == 'M' and sign == '+':
        return (date_obj + relativedelta(months=adjustment)).strftime('%Y-%m')
      elif unit == 'M' and sign == '-':
        return (date_obj - relativedelta(months=adjustment)).strftime('%Y-%m')
      else:
        logging.error(f'Unable to parse date placeholder {date_str}')
        return date_str

    # If there wasn't a date in the query and the scrape did not hallucinate one,
    # return "empty" so that this does not positively or negatively impact total
    # scoring.
    if not golden_dates and not scraped_dates:
      return np.nan

    # If either golden or scraped is present without the other, then automatic 0.
    # (Either we detected dates when goldens say there's none to detect or we
    # failed to detect dates when goldens say they are present in the query.)
    if bool(golden_dates) ^ bool(scraped_dates):
      return 0.0

    # Populate any $TODAY based placeholders in the goldens with values
    for golden in golden_dates:
      if DATE_PLACEHOLDER in golden.base_date:
        golden.base_date = replace_placeholder(golden.base_date)

      # end_date may be unset for single dates, so guard before the `in` test.
      if golden.end_date and DATE_PLACEHOLDER in golden.end_date:
        golden.end_date = replace_placeholder(golden.end_date)

    individual_date_scores = []
    for scrape in scraped_dates:
      best_score = 0
      for golden in golden_dates:

        base_date_score = 1.0 if scrape.base_date == golden.base_date else 0.0
        end_date_score = 1.0 if scrape.end_date == golden.end_date else 0.0
        score = statistics.mean([base_date_score, end_date_score])

        if score > best_score:
          best_score = score

      individual_date_scores.append(best_score)

    individual_date_scores.extend([0.0] * abs(len(golden_dates) - len(scraped_dates)))

    return statistics.mean(individual_date_scores)

  return [score_dates(g, s, d) for g, s, d in zip(golden_dates, scrape_dates, date_of_scrape)]
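
The `$TODAY` placeholders let goldens express dates relative to the day of the scrape. Below is a minimal standard-library sketch of how such a placeholder resolves. `resolve_placeholder` is a hypothetical name, and plain month arithmetic stands in for `relativedelta`.

```python
import re
from datetime import datetime

DATE_PLACEHOLDER = '$TODAY'
DATE_OF_SCRAPE_FORMAT = '%Y-%m-%d'
PLACEHOLDER_PATTERN = re.compile(r'^\$TODAY([+-])(\d+)([YM])$')


def resolve_placeholder(date_str, date_of_scrape):
  """Resolves '$TODAY', '$TODAY-2Y', '$TODAY+3M', ... against a scrape date."""
  if date_str == DATE_PLACEHOLDER:
    return date_of_scrape

  match = PLACEHOLDER_PATTERN.fullmatch(date_str)
  if not match:
    return date_str  # not a recognized placeholder; leave unchanged

  sign, amount, unit = match.groups()
  delta = int(amount) if sign == '+' else -int(amount)
  scraped = datetime.strptime(date_of_scrape, DATE_OF_SCRAPE_FORMAT)

  if unit == 'Y':
    return f'{scraped.year + delta:04d}'  # yearly granularity: YYYY

  # Monthly granularity: count months from year 0, then split back out.
  index = scraped.year * 12 + (scraped.month - 1) + delta
  return f'{index // 12:04d}-{index % 12 + 1:02d}'


for placeholder in ['$TODAY', '$TODAY-2Y', '$TODAY+3M', '$TODAY-11M']:
  print(placeholder, '->', resolve_placeholder(placeholder, '2025-11-15'))
```

The unit also sets the granularity of the result: `Y` yields a year and `M` a year-month, so a golden must use the granularity that the scrape is expected to return.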

In [None]:
# @title calculate_fbeta (used by places + variables)
from sklearn.metrics import fbeta_score
from sklearn.preprocessing import MultiLabelBinarizer

def calculate_score(y_true_sets, y_pred_sets, beta=1.0):
  def per_sample_fbeta_score(y_true, y_pred, beta=1.0):
    """
    Calculates the F-beta score for each sample in a multilabel setting using
    fast, vectorized NumPy operations.

    Args:
        y_true (np.ndarray): A (n_samples, n_classes) binary matrix of true labels.
        y_pred (np.ndarray): A (n_samples, n_classes) binary matrix of predicted labels.
        beta (float): The beta value for the F-beta score.

    Returns:
        A (n_samples,) array of F-beta scores for each sample.
    """
    y_true = y_true.astype(bool)
    y_pred = y_pred.astype(bool)

    tp = (y_true & y_pred).sum(axis=1)
    fp = (y_pred & ~y_true).sum(axis=1)
    fn = (y_true & ~y_pred).sum(axis=1)

    beta_sq = beta**2

    precision = np.divide(tp, tp + fp, out=np.zeros_like(tp, dtype=float), where=(tp + fp) > 0)
    recall = np.divide(tp, tp + fn, out=np.zeros_like(tp, dtype=float), where=(tp + fn) > 0)

    fbeta = np.divide(
        (1 + beta_sq) * precision * recall,
        (beta_sq * precision) + recall,
        out=np.zeros_like(tp, dtype=float),
        where=((beta_sq * precision) + recall) > 0
    )

    fbeta[(tp + fp + fn) == 0] = 1.0

    return np.round(fbeta, decimals=3), np.round(precision, decimals=3), np.round(recall, decimals=3)

  mlb = MultiLabelBinarizer()
  all_labels = y_true_sets.tolist() + y_pred_sets.tolist()

  mlb.fit(all_labels)

  y_true_matrix = mlb.transform(y_true_sets)
  y_pred_matrix = mlb.transform(y_pred_sets)

  return per_sample_fbeta_score(y_true_matrix, y_pred_matrix, beta=beta)
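
The effect of `beta` is easiest to see on a single sample. This sketch computes the same quantity, F-beta = (1 + β²)·P·R / (β²·P + R), directly on Python sets. `fbeta_for_sets` is a hypothetical helper, not part of the notebook.

```python
def fbeta_for_sets(y_true: set, y_pred: set, beta: float = 1.0):
  """Returns (fbeta, precision, recall) for one sample of label sets."""
  tp = len(y_true & y_pred)
  fp = len(y_pred - y_true)
  fn = len(y_true - y_pred)

  # Nothing expected and nothing predicted counts as a perfect match.
  if tp + fp + fn == 0:
    return 1.0, 0.0, 0.0

  precision = tp / (tp + fp) if tp + fp else 0.0
  recall = tp / (tp + fn) if tp + fn else 0.0
  denominator = beta**2 * precision + recall
  fbeta = (1 + beta**2) * precision * recall / denominator if denominator else 0.0
  return fbeta, precision, recall


# The prediction finds half of the expected labels and nothing spurious:
# precision = 1.0, recall = 0.5.
y_true = {'a', 'b', 'c', 'd'}
y_pred = {'a', 'b'}

for beta in (0.5, 1.0, 2.0):
  score, precision, recall = fbeta_for_sets(y_true, y_pred, beta)
  print(f'beta={beta}: fbeta={score:.3f} precision={precision} recall={recall}')
```

With perfect precision but only half the recall, a higher `beta` lowers the score (0.833, 0.667, 0.556 for beta 0.5, 1, 2), because `beta` above 1 weights recall more heavily.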

In [None]:
# @title ### score_place

def calculate_place_scores(golden_places_col: pd.Series, scrape_places_col: pd.Series) -> pd.Series:

  def get_place_dcid_set(places: list[DetectedPlace]) -> set[str]:
    return {place.dcid for place in places}

  def get_sub_type_set(places: list[DetectedPlace]) -> set[str]:
    labels = set()
    for place in places:
      if place.sub_place_type:
        labels.add(place.sub_place_type)
    return labels

  def get_place_labels_set(places: list[DetectedPlace]) -> set[str]:
    labels = set()
    for place in places:
      sub_place_type = place.sub_place_type if place.sub_place_type else ''
      labels.add(f"{place.dcid}:{sub_place_type}")
    return labels


  # phase 1 - place_dcid scoring; this is to give "partial credit" for when a
  # parent place is properly detected even when the child place is not
  y_true_place_dcids = golden_places_col.apply(get_place_dcid_set)
  y_pred_place_dcids = scrape_places_col.apply(get_place_dcid_set)

  place_dcids_scores, _, _ = calculate_score(y_true_place_dcids, y_pred_place_dcids, beta=0.8)

  # phase 2 - child place type accuracy
  y_true_sub_place_types = golden_places_col.apply(get_sub_type_set)
  y_pred_sub_place_types = scrape_places_col.apply(get_sub_type_set)

  sub_place_type_scores,_,_ = calculate_score(y_true_sub_place_types, y_pred_sub_place_types)

  # phase 3 - full place_dcid +/- child type pairs
  y_true_full_place = golden_places_col.apply(get_place_labels_set)
  y_pred_full_place =  scrape_places_col.apply(get_place_labels_set)

  full_place_scores,_,_ = calculate_score(y_true_full_place, y_pred_full_place, beta=0.5)

  places_weight = 0.4
  child_types_weight = 0.2
  full_weight = 0.4

  combined_score = pd.Series((places_weight * place_dcids_scores) +
                    (child_types_weight * sub_place_type_scores) +
                    (full_weight * full_place_scores))

  # When neither the goldens nor the scrape contain places, there is nothing to
  # score; populate with NaN so the row does not affect the total.
  do_not_score = (golden_places_col.str.len() == 0) & (scrape_places_col.str.len() == 0)
  combined_score.loc[do_not_score] = np.nan

  return combined_score
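
As a worked example of the three-phase weighting, the sketch below scores a single query in plain Python. Places are hypothetical `(dcid, sub_place_type)` tuples standing in for `DetectedPlace`, and `fbeta` is a per-sample helper written for this example.

```python
def fbeta(y_true: set, y_pred: set, beta: float = 1.0) -> float:
  """Per-sample F-beta on label sets; two empty sets score 1.0."""
  tp = len(y_true & y_pred)
  fp = len(y_pred - y_true)
  fn = len(y_true - y_pred)
  if tp + fp + fn == 0:
    return 1.0
  precision = tp / (tp + fp) if tp + fp else 0.0
  recall = tp / (tp + fn) if tp + fn else 0.0
  denominator = beta**2 * precision + recall
  return (1 + beta**2) * precision * recall / denominator if denominator else 0.0


def dcid_set(places):
  return {dcid for dcid, _ in places}


def sub_type_set(places):
  return {sub_type for _, sub_type in places if sub_type}


def label_set(places):
  return {f"{dcid}:{sub_type or ''}" for dcid, sub_type in places}


# Golden expects counties in California; the scrape found the place
# but missed the sub place type.
golden = [('geoId/06', 'County')]
scraped = [('geoId/06', None)]

dcid_score = fbeta(dcid_set(golden), dcid_set(scraped), beta=0.8)
sub_type_score = fbeta(sub_type_set(golden), sub_type_set(scraped))
full_score = fbeta(label_set(golden), label_set(scraped), beta=0.5)

combined = 0.4 * dcid_score + 0.2 * sub_type_score + 0.4 * full_score
print(dcid_score, sub_type_score, full_score, round(combined, 3))
```

Here the place dcid earns full credit in phase 1, while the missing sub place type zeroes phases 2 and 3, giving a combined score of 0.4. This is the partial credit the phased design is meant to provide.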

In [None]:
# @title ### score_variables

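def calculate_variable_scores(golden_vars_col: pd.Series, scrape_vars_col: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
  """
  Scores the scraped variable dcids against the golden dcids, row by row.

  Returns: A tuple of (F-beta, precision, recall) Series. Rows whose golden has
  no variables are set to NaN.
  """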

  def get_var_dcid_set(vars: list[VariableResponse]) -> set[str]:
    return {dcid for var in vars for dcid in var.dcids}

  # phase 1 - variable dcid scoring

  y_true_var_dcids = golden_vars_col.apply(get_var_dcid_set)
  y_pred_var_dcids = scrape_vars_col.apply(get_var_dcid_set)

  # Use high beta score to favor recall over precision - care more about finding
  # right statvars than excluding non-required ones.
  var_dcid_fbeta, precision, recall = calculate_score(y_true_var_dcids, y_pred_var_dcids, beta=2)

  combined_score = pd.Series(var_dcid_fbeta)
  precision = pd.Series(precision)
  recall = pd.Series(recall)

  # TODO: once all goldens have variables, uncomment the following line and delete the next
  # do_not_score = (golden_vars_col.str.len() == 0) & (scrape_vars_col.str.len() == 0)
  do_not_score = (golden_vars_col.str.len() == 0)
  combined_score.loc[do_not_score] = np.nan
  precision.loc[do_not_score] = np.nan
  recall.loc[do_not_score] = np.nan

  return combined_score, precision, recall

In [None]:
# @title Score Epoch

import statistics
import re
from dateutil.relativedelta import relativedelta
from datetime import datetime
import numpy as np
from pydantic import model_validator
from typing import Optional, Any
import pandas as pd


DATE_OF_SCRAPE_FORMAT = '%Y-%m-%d'
DATE_PLACEHOLDER = '$TODAY'


def calculate_total_scores(date_scores: pd.Series, place_scores: pd.Series, variable_scores: pd.Series) -> pd.Series:

    # 1. Define the weights for each component
    weights = {
        'date': 0.2,
        'place': 0.4,
        'variable': 0.4
    }

    # 2. Create a DataFrame from the input Series for easier operations
    tmp_df = pd.DataFrame({
        'date': date_scores,
        'place': place_scores,
        'variable': variable_scores
    })

    # 3. Calculate the Numerator (the weighted sum of scores)
    # We replace NaNs with 0 before multiplying by the weight. This ensures that
    # missing components contribute nothing to the sum, which is correct.
    numerator = (tmp_df['date'].fillna(0) * weights['date']) + \
                (tmp_df['place'].fillna(0) * weights['place']) + \
                (tmp_df['variable'].fillna(0) * weights['variable'])

    # 4. Calculate the Dynamic Denominator (the sum of weights for non-NaN scores)
    # First, create a boolean DataFrame (True where scores exist)
    not_na_df = tmp_df.notna()

    # Multiply the boolean DataFrame by the weights. True becomes 1, False becomes 0.
    # This gives us the weight of each component IF it had a score, otherwise 0.
    applicable_weights_df = not_na_df * pd.Series(weights)

    # Sum these weights horizontally (axis=1) to get the total weight for each row
    denominator = applicable_weights_df.sum(axis=1)

    # 5. Calculate the Final Score
    # We use np.divide for safe division, which correctly produces NaN
    # if the denominator is 0 (i.e., all scores for a row were NaN).
    final_scores = np.divide(numerator, denominator, out=np.full_like(denominator, np.nan), where=denominator!=0)

    return pd.Series(final_scores)


def compute_scores(goldens_df, scrapes_df):
  goldens_df = goldens_df.rename(columns=lambda c: f"{c}_golden" if c != 'id' else c)
  scrapes_df = scrapes_df.rename(columns=lambda c: f"{c}_scraped" if c != 'id' else c)
  merged_df = pd.merge(goldens_df, scrapes_df, on='id')

  # Drop rows where the golden and scraped queries differ for the same id.
  mismatched_query_mask = merged_df['query_golden'] != merged_df['query_scraped']
  if mismatched_query_mask.any():
    mismatched_ids = merged_df.loc[mismatched_query_mask, 'id'].tolist()
    logging.error(f'id collision: same id, different query for {mismatched_ids}; dropping from eval')
    # Reset the index so positionally indexed score Series still line up with the remaining rows.
    merged_df = merged_df[~mismatched_query_mask].reset_index(drop=True)

  score_df = pd.DataFrame()
  score_df['id'] = merged_df['id']
  score_df['query'] = merged_df['query_golden']
  score_df['golden_type'] = merged_df['golden_type_golden']

  # Calculate all scores using vectorized operations; this is more efficient than iterating rows.
  score_df['date_score'] = calculate_date_scores(merged_df['dates_golden'], merged_df['dates_scraped'], merged_df['scrape_date_scraped'])
  score_df['place_score'] = calculate_place_scores(merged_df['places_golden'], merged_df['places_scraped'])
  score_df['variable_score'], score_df['variable_precision'], score_df['variable_recall'] = calculate_variable_scores(merged_df['variables_golden'], merged_df['variables_scraped'])

  score_df['total_score'] = calculate_total_scores(score_df['date_score'], score_df['place_score'], score_df['variable_score'])

  golden_cols = ['id'] + [col for col in merged_df.columns if col.endswith('_golden')]
  merged_goldens_df = merged_df[golden_cols].rename(
      columns=lambda c: c.removesuffix('_golden')
  )
  score_df['golden_response'] = df_to_pydantic_models(merged_goldens_df, NlGolden)

  scraped_cols = ['id'] + [col for col in merged_df.columns if col.endswith('_scraped')]
  merged_scrapes_df = merged_df[scraped_cols].rename(
      columns=lambda c: c.removesuffix('_scraped')
  )
  score_df['scraped_response'] = df_to_pydantic_models(merged_scrapes_df, NlApiScrape)

  return score_df

def run_epoch_score(golden_epoch, scrape_epoch, score_epoch, description, change_log):
  golden_path = goldens_path(golden_epoch)
  scrape_path = scrapes_path(scrape_epoch)
  score_output_path = scores_path(scrape_epoch, score_epoch)
  print(score_output_path)

  scrape_df = gcs_csv_to_df(scrape_path, NlApiScrape)
  golden_df = gcs_csv_to_df(golden_path, NlGolden)

  score_df = compute_scores(golden_df, scrape_df)

  df_to_gcs_csv(score_output_path, score_df, NlQueryEvalScore)

  summary = pd.DataFrame()
  cols = ['total_score', 'date_score', 'place_score', 'variable_score', 'variable_precision', 'variable_recall']
  summary['overall'] = score_df[cols].mean().round(3)
  summary['stable'] = score_df[score_df['golden_type'] == 'STABLE'][cols].mean().round(3)
  summary['aspirational'] = score_df[score_df['golden_type'] == 'ASPIRATIONAL'][cols].mean().round(3)

  summary.to_csv('local_summary.csv')

  gcs_summary_path = summary_path(scrape_epoch, score_epoch)

  metadata = EvalMetadata(golden_epoch=golden_epoch, scrape_epoch=scrape_epoch, score_epoch=score_epoch, description=description, change_log=change_log)

  # Append the run metadata to the summary CSV.
  with open('local_summary.csv', 'a') as f:  # 'a' opens the file in append mode
      f.write(",\n")  # Two empty rows separate the scores from the metadata
      f.write(",\n")
      f.write("# --- METADATA ---\n")  # Separator line
      for field_name, value in metadata.model_dump().items():  # model_dump() requires Pydantic v2+
          if value is not None:  # Only write fields that have a value
              # One "key,value" row per field keeps the file CSV-compatible
              f.write(f"{field_name},{value}\n")

  !gsutil cp local_summary.csv {gcs_summary_path} > /dev/null 2>&1

  return score_df
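
As a worked example of the re-normalization in `calculate_total_scores`: when a component score is NaN, its weight is dropped from the denominator, so the remaining components are averaged over the weights that actually apply. The sketch below is a plain-Python illustration with a hypothetical helper name (`renormalized_total`), not the vectorized implementation above.

```python
import math

WEIGHTS = {"date": 0.2, "place": 0.4, "variable": 0.4}  # same weights as calculate_total_scores


def renormalized_total(scores: dict) -> float:
    """Weighted average over the components that have a score (hypothetical helper)."""
    present = {name: s for name, s in scores.items() if not math.isnan(s)}
    weight_sum = sum(WEIGHTS[name] for name in present)
    if weight_sum == 0:
        return math.nan  # every component was NaN, so there is nothing to average
    return sum(WEIGHTS[name] * s for name, s in present.items()) / weight_sum


# (0.2*1.0 + 0.4*0.8 + 0.4*0.9) / 1.0 = 0.88
all_present = renormalized_total({"date": 1.0, "place": 0.8, "variable": 0.9})

# The date weight is dropped: (0.4*0.8 + 0.4*0.9) / 0.8 = 0.85
no_date = renormalized_total({"date": math.nan, "place": 0.8, "variable": 0.9})

# No component has a score, so the total is NaN as well.
nothing = renormalized_total({"date": math.nan, "place": math.nan, "variable": math.nan})
```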

## Scrape and/or Score an epoch

### General Inputs
This table describes the main inputs that define the queryset for the evaluation.

* Valid Golden File Names: [gcs link](https://pantheon.corp.google.com/storage/browser/datcom-nl-evals/goldens?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22,%22s%22:%5B(%22i%22:%22displayName%22,%22s%22:%221%22)%5D))&authuser=0&e=13803378&inv=1&invt=Ab3YaQ&mods=-monitoring_api_staging&prefix=&forceOnObjectsSortingFiltering=true)
* Previous scrapes: [gcs link](https://pantheon.corp.google.com/storage/browser/datcom-nl-evals/evals?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22,%22s%22:%5B(%22i%22:%22displayName%22,%22s%22:%221%22)%5D))&authuser=0&e=13803378&inv=1&invt=Ab3YaQ&mods=-monitoring_api_staging)

| Parameter | Type | Default Value | Description |
| :--- | :--- | :--- | :--- |
| `golden_epoch` | `string` | `--` | Required. The file name of the golden queryset (without extension). This defines the queries to be scraped and serves as the expected values for scoring. |
| `scrape_epoch` | `string` | `--` | Required for scoring-only runs, optional when scraping. The unique identifier for a scrape run. If left empty, one is generated from the current date and time. **Caution:** Re-using an existing name will overwrite the previous scrape. |

---

### Scrape Related Inputs
These inputs control the scraping process, which queries the API and saves the responses.

| Parameter | Type | Default Value | Description |
| :--- | :--- | :--- | :--- |
| `run_scrape` | `boolean` | `True` | Set to `True` to query the detect-and-fulfill endpoint and save the responses as `NlApiScrape` objects. |
| `detector_type` | `string` | `"hybrid"` | Specifies the detector to use: `"hybrid"`, `"heuristic"`, or `"llm"`. Ignored if `run_scrape` is `False`. |


---

### Scoring Related Inputs
These inputs control the scoring process, which compares the scraped results against the golden set and generates reports.

| Parameter | Type | Default Value | Description |
| :--- | :--- | :--- | :--- |
| `run_scoring` | `boolean` | `True` | Set to `True` to score the scraped results against the golden set and generate a score report and summary. |
| `eval_description` | `string` | `""` | A brief description of the evaluation run. |
| `eval_change_log` | `string` | `""` | A log of any changes relevant to this evaluation run. |
| `score_epoch` | `string` | `""` | An optional, unique identifier for the scoring run. If left empty, one will be generated. **Caution:** Re-using an existing name will overwrite previous score reports. |
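
The rules in the tables above (which epochs are required, and which are generated when left empty) can be sketched as a small validation step. This is an illustration only: `resolve_epochs` is a hypothetical helper, and the timestamp format is an assumption, since the notebook's own `now()` helper is defined in the setup code and may format the value differently.

```python
from datetime import datetime


def resolve_epochs(run_scrape: bool, run_scoring: bool,
                   scrape_epoch: str = "", score_epoch: str = ""):
    """Apply the input rules from the tables and return (scrape_epoch, score_epoch)."""
    if not run_scrape and not run_scoring:
        raise ValueError("Select at least one of run_scrape or run_scoring")
    # Assumed timestamp format; the notebook's now() helper may differ.
    generated = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    if not scrape_epoch:
        if not run_scrape:
            # Nothing was scraped in this run, so an existing scrape must be named.
            raise ValueError("scrape_epoch is required for a scoring-only run")
        scrape_epoch = generated
    if run_scoring and not score_epoch:
        score_epoch = generated
    return scrape_epoch, score_epoch


# Scoring-only run against a previously saved scrape (hypothetical epoch names).
epochs = resolve_epochs(False, True, scrape_epoch="my_scrape", score_epoch="my_score")
```

Failing early on a missing `scrape_epoch` avoids building output paths from an empty identifier.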




In [None]:
# @markdown
# @markdown ### Inputs:
golden_epoch = "2025-06-23" # @param {"type":"string"}
scrape_epoch = "" # @param {"type":"string", "placeholder":"Optional for when running scrape, required for a score-only run. Created based on current date+time if empty"}


# @markdown #### Scrape Related Inputs
run_scrape = True # @param {"type":"boolean"}
detector_type = "hybrid" # @param ["hybrid","heuristic","llm"]

# @markdown #### Scoring Related Inputs
run_scoring = True # @param {"type":"boolean"}
eval_description = "" # @param {"type":"string"}
eval_change_log = "" # @param {"type":"string"}
score_epoch = "" # @param {"type":"string", "placeholder":"(optional, this will be generated based on current date+time if empty)"}


import concurrent.futures
import logging

logging.basicConfig(
    level=logging.WARNING,
    format='%(levelname)s - %(message)s',
    force=True
)

def scrape_queryset(queryset):
  results_threading = []

  with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
      # Submit one scrape per query; detector_type comes from the form inputs above.
      future_to_query = {executor.submit(scrape_query, query_id, query, detector_type): (query_id, query) for query_id, query in queryset.items()}

      # Collect results as they finish; failed queries are printed and skipped.
      for future in concurrent.futures.as_completed(future_to_query):
          query_id, query = future_to_query[future]
          try:
              result = future.result()
              results_threading.append(result)
          except Exception as exc:
              print(f'{query_id} ("{query}"): generated an exception: {exc}')
  return results_threading

#
# ========================== MAIN ==========================
#

if not run_scrape and not run_scoring:
  print('Nothing to do! Select at least one operation to perform')

if run_scrape:

  goldens_file_path = goldens_path(golden_epoch)
  goldens_df = gcs_csv_to_df(goldens_file_path)
  queryset = dict(zip(goldens_df['id'].astype(int), goldens_df['query'].astype(str)))

  if not scrape_epoch:
    scrape_epoch = now()

  print(f'scrape_epoch: {scrape_epoch}')

  full_scrape = scrape_queryset(queryset)
  print(f'num queries scraped: {len(full_scrape)}')


  scrapes_gcs = scrapes_path(scrape_epoch)
  print(f'scrape output path: {scrapes_gcs}')

  # Write to GCS and Sheets
  models_to_gcs_csv(scrapes_gcs, full_scrape)
  gcs_csv_to_sheet(scrapes_gcs, scrape_epoch)

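if run_scoring:
  # A score-only run needs an existing scrape to score against.
  if not scrape_epoch:
    raise ValueError('scrape_epoch is required when run_scrape is False')
  if not score_epoch:
    score_epoch = now()
  print(f'score output path: {scores_path(scrape_epoch, score_epoch)}')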

  # Conduct Eval
  score_df = run_epoch_score(golden_epoch, scrape_epoch, score_epoch, eval_description, eval_change_log)

  # Transfer scores from GCS to Sheet
  gcs_csv_to_sheet(scores_path(scrape_epoch, score_epoch), scrape_epoch)
  summary_url = gcs_csv_to_sheet(summary_path(scrape_epoch, score_epoch), scrape_epoch)

  print('------------------------------------------------------------------')
  #print('>> View results at', summary_url)
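
The thread-pool pattern used by `scrape_queryset` has two properties worth keeping in mind: results arrive in completion order rather than submission order, and a query that raises is left out of the results. The sketch below shows both with a stand-in scraper; `fake_scrape` and `scrape_all` are hypothetical names, and no API call is made.

```python
import concurrent.futures


def fake_scrape(query_id: int, query: str) -> dict:
    """Stand-in for scrape_query (hypothetical); fails on empty queries."""
    if not query:
        raise ValueError("empty query")
    return {"id": query_id, "query": query}


def scrape_all(queryset: dict, max_workers: int = 5):
    results, failures = [], {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_id = {
            executor.submit(fake_scrape, query_id, query): query_id
            for query_id, query in queryset.items()
        }
        # as_completed yields futures in completion order, not submission order.
        for future in concurrent.futures.as_completed(future_to_id):
            query_id = future_to_id[future]
            try:
                results.append(future.result())
            except Exception as exc:
                failures[query_id] = str(exc)  # record the failure instead of losing it
    results.sort(key=lambda r: r["id"])  # restore a deterministic order
    return results, failures


results, failures = scrape_all({
    1: "Population of the us",
    2: "",
    3: "Areas with the highest crime rate",
})
```

Returning the failures alongside the results makes it easy to compare the number of scraped queries with the size of the golden set before scoring.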

## Misc

In [None]:
# Copy goldens from GCS to sheet!
gcs_csv_to_sheet(goldens_path('2025-06-23'), 'goldens')

## Appendix

### Testing

In [None]:
import unittest

class TestCalculateDateScores(unittest.TestCase):

  def setUp(self):
      """Set up a DetectedDate factory and a fixed scrape date for use in tests."""
      # The lambda keeps test-object construction short.
      self.Date = lambda base, end: DetectedDate(base_date=base, end_date=end)
      self.date_of_scrape = '2025-06-11'

  def test_no_golden_no_scraped(self):
      """Test score when both golden and scraped are empty; should be NaN."""
      golden_series = pd.Series([[]])
      scraped_series = pd.Series([[]])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertTrue(np.isnan(result[0]))

  def test_no_golden_with_scraped(self):
      """Test score when scrape hallucinates a date; should be 0.0."""
      golden_series = pd.Series([[]])
      scraped_dates = [self.Date('2025-01-01', '2025-01-02')]
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 0.0)

  def test_perfect_match(self):
      """Test score for a perfect match; should be 1.0."""
      dates = [self.Date('2025-01-01', '2025-01-02')]
      golden_series = pd.Series([dates])
      scraped_series = pd.Series([dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 1.0)

  def test_partial_match_base_date_only(self):
      """Test score when only the base date matches; should be 0.5."""
      golden_dates = [self.Date('2025-01-01', '2025-01-02')]
      scraped_dates = [self.Date('2025-01-01', '2025-01-03')] # Different end_date
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 0.5)

  def test_no_match(self):
      """Test score when dates do not match at all; should be 0.0."""
      golden_dates = [self.Date('2025-01-01', '2025-01-02')]
      scraped_dates = [self.Date('2026-01-01', '2026-01-02')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 0.0)

  def test_multiple_golden_finds_best_match(self):
      """Test that a single scrape finds its best score among multiple goldens."""
      golden_dates = [
          self.Date('2025-01-01', '2025-01-02'), # No match
          self.Date('2025-02-01', '2025-02-02')  # Perfect match
      ]
      scraped_dates = [self.Date('2025-02-01', '2025-02-02')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 1.0) # Score is 1.0 because it found the best match

  def test_multiple_scraped_averages_scores(self):
      """Test that scores from multiple scraped dates are averaged."""
      golden_dates = [self.Date('2025-01-01', '2025-01-02')]
      scraped_dates = [
          self.Date('2025-01-01', '2025-01-02'), # Perfect match (score = 1.0)
          self.Date('2026-01-01', '2026-01-02')  # No match (score = 0.0)
      ]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 0.5) # Average of 1.0 and 0.0

  def test_golden_with_no_scraped(self):
      """Test score when scraper finds no dates but should have; should be 0.0."""
      golden_dates = [self.Date('2025-01-01', '2025-01-02')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([[]])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 0.0)

  def test_placeholder_today(self):
      """Test replacement of the $TODAY placeholder."""
      golden_dates = [self.Date('$TODAY', '$TODAY')]
      scraped_dates = [self.Date('2025-06-11', '2025-06-11')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 1.0)

  def test_placeholder_relative_add_year(self):
      """Test replacement of relative year placeholders ($TODAY+1Y, $TODAY-1Y)."""
      golden_dates = [self.Date('$TODAY+1Y', '$TODAY-1Y')]
      scraped_dates = [self.Date('2026', '2024')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 1.0)

  def test_placeholder_relative_subtract_month(self):
      """Test replacement of relative month placeholders ($TODAY+2M, $TODAY-2M)."""
      golden_dates = [self.Date('$TODAY+2M', '$TODAY-2M')]
      scraped_dates = [self.Date('2025-08', '2025-04')]
      golden_series = pd.Series([golden_dates])
      scraped_series = pd.Series([scraped_dates])
      scrape_date_series = pd.Series([self.date_of_scrape])

      result = calculate_date_scores(golden_series, scraped_series, scrape_date_series)
      self.assertEqual(result[0], 1.0)

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
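
The placeholder tests above imply a resolution rule: `$TODAY` becomes the scrape date, a year offset resolves to a `YYYY` string, and a month offset resolves to a `YYYY-MM` string. The sketch below is one way to implement that rule, inferred only from the test expectations; `resolve_placeholder` is a hypothetical name and the notebook's actual date handling (defined in the setup code) may differ.

```python
import re
from datetime import date

# Matches "$TODAY" optionally followed by a signed offset in years (Y) or months (M).
PLACEHOLDER_RE = re.compile(r'^\$TODAY(?:([+-])(\d+)([YM]))?$')


def resolve_placeholder(value: str, scrape_date: str) -> str:
    """Resolve a $TODAY placeholder relative to scrape_date (YYYY-MM-DD)."""
    match = PLACEHOLDER_RE.match(value)
    if not match:
        return value  # already a concrete date string
    today = date.fromisoformat(scrape_date)
    sign, amount, unit = match.groups()
    if sign is None:
        return today.isoformat()
    delta = int(amount) * (1 if sign == '+' else -1)
    if unit == 'Y':
        return str(today.year + delta)
    # Work on a zero-based month index so year boundaries roll over correctly.
    month_index = today.year * 12 + (today.month - 1) + delta
    year, month = divmod(month_index, 12)
    return f'{year:04d}-{month + 1:02d}'


resolved = [resolve_placeholder(p, '2025-06-11')
            for p in ('$TODAY', '$TODAY+1Y', '$TODAY-1Y', '$TODAY+2M', '$TODAY-2M')]
```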

In [None]:
import unittest

class TestPlaceScoreCalculation(unittest.TestCase):

    def setUp(self):
        """Set up weights and the lambda function to create DetectedPlace objects."""
        self.parent_weight = 0.4
        self.child_weight = 0.2
        self.structure_weight = 0.4
        # This lambda function makes creating test objects clean and easy
        self.Place = lambda dcid='', sub_place_type=None: DetectedPlace(dcid=dcid, sub_place_type=sub_place_type)

    def test_perfect_match(self):
        """Score should be 1.0 for a perfect match."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], 1.0)

    def test_partial_credit_parent_only(self):
        """Score should be parent_weight if only the parent matches."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[self.Place(dcid='geo_ca')]]) # Missing sub_place_type
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], self.parent_weight)

    def test_partial_credit_child_only(self):
        """Score should be child_weight if only the child matches."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[self.Place(sub_place_type='STATE')]]) # Missing dcid
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], self.child_weight)

    def test_partial_credit_wrong_parent_correct_child(self):
        """Score should be child_weight if child is right but parent is wrong."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[self.Place(dcid='geo_us', sub_place_type='STATE')]])
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], self.child_weight)

    def test_total_miss(self):
        """Score should be 0.0 for a complete mismatch."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[self.Place(dcid='geo_us', sub_place_type='COUNTY')]])
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], 0.0)

    def test_not_applicable_case(self):
        """Score should be NaN if both golden and scraped are empty."""
        golden = pd.Series([[]])
        scraped = pd.Series([[]])
        result = calculate_place_scores(golden, scraped)
        self.assertTrue(np.isnan(result[0]))

    def test_golden_expected_scrape_empty(self):
        """Score should be 0.0 if scrape misses an expected place."""
        golden = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        scraped = pd.Series([[]])
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], 0.0)

    def test_scrape_hallucinated_golden_empty(self):
        """Score should be 0.0 if scrape finds a place that wasn't expected."""
        golden = pd.Series([[]])
        scraped = pd.Series([[self.Place(dcid='geo_ca', sub_place_type='STATE')]])
        result = calculate_place_scores(golden, scraped)
        self.assertAlmostEqual(result[0], 0.0)

    def test_multiple_labels_and_rows(self):
        """Test with a multi-row Series and multiple labels per row."""
        golden = pd.Series([
            [self.Place('geo_ca', 'STATE'), self.Place('geo_us', 'COUNTRY')], # Row 0
            [self.Place('geo_de')]                                           # Row 1
        ])
        scraped = pd.Series([
            [self.Place('geo_ca', 'STATE'), self.Place('geo_us')], # Row 0: Partial match
            [self.Place('geo_de', 'STATE')]                      # Row 1: Partial match
        ])

        result = calculate_place_scores(golden, scraped)

        # Expected Score for Row 0
        parent_score_0 = 1.0; child_score_0 = 0.666666; structure_score_0 = 0.5
        expected_score_0 = (self.parent_weight * parent_score_0) + (self.child_weight * child_score_0) + (self.structure_weight * structure_score_0)

        # Expected Score for Row 1
        parent_score_1 = 1.0; child_score_1 = 0.0; structure_score_1 = 0.0
        expected_score_1 = (self.parent_weight * parent_score_1) + (self.child_weight * child_score_1) + (self.structure_weight * structure_score_1)

        self.assertEqual(len(result), 2)
        self.assertAlmostEqual(result[0], expected_score_0, places=3)
        self.assertAlmostEqual(result[1], expected_score_1, places=3)


# --- Run the tests in the notebook ---
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

In [None]:
import unittest

class TestTotalScoreCalculation(unittest.TestCase):

    def setUp(self):
        """Define weights in ONE place. All tests will use this."""
        self.weights = {
            'date': 0.2,
            'place': 0.4,
            'variable': 0.4
        }

    def test_all_scores_present(self):
        """Test calculation when all component scores are valid."""
        scores_dict = {'date': 1.0, 'place': 0.8, 'variable': 0.9}

        # The test now calculates the expected value automatically
        expected_score = 0
        for component, weight in self.weights.items():
            expected_score += scores_dict[component] * weight

        result = calculate_total_scores(
            pd.Series([scores_dict['date']]),
            pd.Series([scores_dict['place']]),
            pd.Series([scores_dict['variable']]),
        )
        self.assertAlmostEqual(result[0], expected_score)

    def test_one_score_is_nan(self):
        """Test re-normalization when one score is NaN."""
        scores_dict = {'date': np.nan, 'place': 0.8, 'variable': 0.9}

        expected_score, total_weight = 0, 0
        for component, score in scores_dict.items():
          if pd.notna(score):
              expected_score += score * self.weights[component]
              total_weight += self.weights[component]
        expected_score /= total_weight

        result = calculate_total_scores(
            pd.Series([scores_dict['date']]),
            pd.Series([scores_dict['place']]),
            pd.Series([scores_dict['variable']]),
        )
        self.assertAlmostEqual(result[0], expected_score)

    def test_two_scores_are_nan(self):
        """Test calculation when only one score is valid."""
        scores_dict = {'date': np.nan, 'place': 0.8, 'variable': np.nan}

        expected_score, total_weight = 0, 0
        for component, score in scores_dict.items():
          if pd.notna(score):
              expected_score += score * self.weights[component]
              total_weight += self.weights[component]
        expected_score /= total_weight

        result = calculate_total_scores(
            pd.Series([scores_dict['date']]),
            pd.Series([scores_dict['place']]),
            pd.Series([scores_dict['variable']]),
        )
        self.assertAlmostEqual(result[0], expected_score)

    def test_all_scores_are_nan(self):
        """Test that the result is NaN when all inputs are NaN."""
        scores_dict = {'date': np.nan, 'place': np.nan, 'variable': np.nan}

        result = calculate_total_scores(
            pd.Series([scores_dict['date']]),
            pd.Series([scores_dict['place']]),
            pd.Series([scores_dict['variable']]),
        )
        self.assertTrue(np.isnan(result[0]))

    def test_vectorized_multi_row(self):
        """Test a multi-row Series to ensure vectorization works correctly."""
        # Define scores for multiple rows
        scores_data = [
            {'date': 1.0, 'place': 0.8, 'variable': 0.9},    # Row 0
            {'date': np.nan, 'place': 0.8, 'variable': 0.9}, # Row 1
            {'date': 0.5, 'place': 0.7, 'variable': np.nan},  # Row 2
            {'date': np.nan, 'place': np.nan, 'variable': np.nan} # Row 3
        ]

        # Create the input Series
        date_s = pd.Series([d['date'] for d in scores_data])
        place_s = pd.Series([d['place'] for d in scores_data])
        variable_s = pd.Series([d['variable'] for d in scores_data])

        # Calculate all expected scores in a loop using the simple oracle
        def calculate_weighted_average_for_test(scores, weights):
          total_score, total_weight = 0, 0
          for component, score in scores.items():
            if pd.notna(score):
                total_score += score * weights[component]
                total_weight += weights[component]
          if not total_weight:
            return np.nan
          return total_score / total_weight

        expected_scores = [calculate_weighted_average_for_test(d, self.weights) for d in scores_data]

        # Calculate all actual scores at once using the vectorized function
        result = calculate_total_scores(date_s, place_s, variable_s)

        # Assert each result matches its expectation
        self.assertEqual(len(result), 4)
        self.assertAlmostEqual(result[0], expected_scores[0])
        self.assertAlmostEqual(result[1], expected_scores[1])
        self.assertAlmostEqual(result[2], expected_scores[2])
        self.assertTrue(np.isnan(result[3]))

# --- Run the tests in the notebook ---
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

In [None]:
# Inspect how many variables each golden row has.
df = gcs_csv_to_df(goldens_path(golden_epoch), NlGolden)
df['variables'].str.len()

### sample set for scraping

In [None]:
watch_list = [
    "Which cities have the most affordable housing",
    "incarceration rate by race in US",
    "How much have gas prices increased in the US since 2000",
    "List out counties with the worst AQI",
    "I want to see stats about farming in Sonoma County",
    "Most common allergies in the US",
    "Which countries have universal health care",
    "What is the impact of pollution on ocean life in Tulum",
    "Areas with the highest crime rate",
    "Population of the us",
]
scrapes = []
for query in watch_list:
  s = scrape_query(0, query)
  scrapes.append(s)
scrapes