## Creating episodes metadata (formerly numbers, names, urls; now raw title, slug, guest name, url)

### Original section:

print(
            "\n--- STAGE 2: Processing Episode Metadata & Generating Transcript URLs ---"
        )
        processed_metadata_filepath_for_saving = os.path.join(
            PROCESSED_DATA_DIR, "num_name_url_df.parquet"
        )

        # Step 2.1 Create numbers and names dictionary from html

        numbers_names_dict = create_numbers_names_dict_from_html(episodes_html_filepath)

        # Step 2.2 Create numbers and names dataframe from numbers and names dictionary

        numbers_names_df = create_numbers_names_df_from_dict(numbers_names_dict)

        # Step 2.3 Create URL's and add to the datframe, save dataframe

        num_name_url_df = create_urls_and_save_to_numbers_names_df(
            numbers_names_df, processed_metadata_filepath_for_saving
        )

        print("STAGE 2 Complete: numbers, names, urls dataframe saved.")

In [103]:
# --- 1. Project Root and Imports ---

import os
import sys
import os.path 
from pathlib import Path
import re
import unicodedata
from typing import List, Dict, Optional, Tuple, Any
from bs4 import BeautifulSoup
import pandas as pd
import random
import time
import numpy as np
from fuzzywuzzy import fuzz, process # <--- THE CRITICAL FIX

# Get the path of the directory containing this notebook (e.g., /project/notebooks)
# os.getcwd() typically works well in notebooks for this purpose.
notebook_dir = os.getcwd() 

# Go UP one directory level to find the Project Root (e.g., /project)
# NOTE: If your notebook is deeper, you might need another '../'
PROJECT_ROOT = os.path.abspath(os.path.join(notebook_dir, '..'))

# Add the Project Root to Python's search path (sys.path)
# This allows Python to find and import modules like 'utils' and 'off_menu'
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

# Now, imports should work
from off_menu.utils import try_read_html_string_from_filepath, try_read_parquet, extract_html, save_text_to_file
from off_menu.config import episodes_list_url, transcript_base_url, restaurants_url
from off_menu.data_extraction import extract_and_save_html
from off_menu.data_processing import create_mentions_by_res_name_dict, create_return_exploded_res_mentions_df, _clean_transcript_str_from_html

# --- 2. Define Data Paths ---
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
RAW_DATA_DIR = os.path.join(DATA_DIR, "raw")
PROCESSED_DATA_DIR = os.path.join(DATA_DIR, "processed")
ANALYTICS_DATA_DIR = os.path.join(DATA_DIR, "analytics")

# --- 3. Define and Create Test Temp Directory (V2_tests) ---
Test_data_dir = os.path.join(DATA_DIR, "test_temp")
new_test_folder = "V2_tests"
V2_tests_dir = os.path.join(Test_data_dir, new_test_folder)

# Create the directory structure, avoiding errors if it already exists
os.makedirs(V2_tests_dir, exist_ok=True)

print(f"Project Root Set to: {PROJECT_ROOT}")
print(f"V2 Test Directory Set to: {V2_tests_dir}")


Project Root Set to: c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project
V2 Test Directory Set to: c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests


In [None]:
# --- Access and save test data ---
    
test_episodes_html_filepath = os.path.join(V2_tests_dir, "episodes.html")
test_restaurants_html_filepath = os.path.join(V2_tests_dir, "restaurants.html")

extract_and_save_html(episodes_list_url, test_episodes_html_filepath)
extract_and_save_html(restaurants_url, test_restaurants_html_filepath)

print("Test HTML data downloaded.")


In [None]:
# Turning "create_numbers_names_dict_from_html" into => create_ep_names_slugs_list_from_html
# The function calls _create_epnumber_epname_dict, which will need editing

# -------------------------
# 1 slugify helper
# -------------------------
def slugify(text: str) -> str:
    """
    Convert text to a simple dash-separated, lowercase slug.
    Example: "Richard Herring (Bonus Episode)" -> "richard-herring-bonus-episode"
    """
    s = unicodedata.normalize("NFKD", text or "")
    # remove parentheses but keep their content separated by space
    s = s.replace("(", " ").replace(")", " ")
    # remove all characters except word chars, whitespace and hyphen
    s = re.sub(r"[^\w\s-]", "", s)
    # collapse whitespace to single dash and strip leading/trailing dashes
    s = re.sub(r"\s+", "-", s).strip("-")
    return s.lower()

# -------------------------
# 2 extract guest name
# -------------------------
def extract_guest_name(raw_title: str) -> str:
    """
    Extract guest name using the simple rule:
      - split on first colon ':'
      - take the right hand side if a separator exists
      - remove any trailing parenthetical content e.g. ' (Bonus Episode)'
      - strip whitespace
    """
    if not raw_title:
        return ""

    s = raw_title.strip()

    # Split on the first recognized separator in the remaining string.
    # We prefer colon first as your original method did; then hyphens or em-dash.
    if ":" in s:
        parts = s.split(":", 1)
        candidate = parts[1].strip()
    else:
        # no separator found: either the whole string *is* the guest (as for new episodes)
        candidate = s

    # remove any parenthetical content at end or inside e.g "Name (Live) extra"
    candidate = re.sub(r"\(.*?\)", "", candidate).strip()

    # final clean: collapse multiple spaces
    candidate = re.sub(r"\s+", " ", candidate).strip()

    return candidate


def create_tuple_inc_ep_slugs_and_guests_list_from_html(html_string: str) -> Tuple[List[Dict[str, Any]], List[str]]:
    """
    Parse episodes HTML and return a tuple:
      (
        [list of valid episode records],
        [list of raw_titles for excluded 'Best of' episodes]
      )
    """
    
    soup = BeautifulSoup(html_string, "html.parser")
    episode_divs = soup.find_all("div", class_="image-slide-title")

    # 1. Initialize two separate lists
    records: List[Dict[str, Any]] = []
    exceptions: List[str] = [] 

    for div in episode_divs:
        raw_title = div.get_text(separator=" ", strip=True)
        
        # 2. Check the condition using the string method
        if raw_title.startswith("Best of"):
            # 3. If it is a "Best of" episode, append the title to the exceptions list
            exceptions.append(raw_title)
            # Skip the rest of the loop for this title and move to the next 'div'
            continue
        # menus to be buried with exception?
        # christmas dinner party exception?
            
        # If the 'if' condition was false (i.e., it's a regular episode), the code continues here:
        
        guest_name = extract_guest_name(raw_title)
        slug_full = slugify(raw_title)

        records.append({
            "raw_title": raw_title,
            "slug": slug_full,
            "guest_name": guest_name
        })

    # 4. Return both lists as a tuple
    return records, exceptions

# --- Running test

episodes_html_str = try_read_html_string_from_filepath(test_episodes_html_filepath)

test_episodes_list = create_tuple_inc_ep_slugs_and_guests_list_from_html(episodes_html_str)

print("Exceptions list")
print(test_episodes_list[1])

print("Names List")
print(test_episodes_list[0][:10])
print(test_episodes_list[0][100:110])

Exceptions list
['Best of 2024: Live', 'Best of 2024: Part 2', 'Best of 2024: Part 1', 'Best of 2023: Part 2', 'Best of 2023: Part 1', 'Best of 2022: Part 2', 'Best of 2022: Part 1', 'Best of 2021: Part 2', 'Best of 2021: Part 1', 'Best of 2020', 'Best of 2019']
Names List
[{'raw_title': 'Kunal Nayyar', 'slug': 'kunal-nayyar', 'guest_name': 'Kunal Nayyar'}, {'raw_title': 'Joy Crookes', 'slug': 'joy-crookes', 'guest_name': 'Joy Crookes'}, {'raw_title': 'Elle Fanning', 'slug': 'elle-fanning', 'guest_name': 'Elle Fanning'}, {'raw_title': 'Lucia Keskin', 'slug': 'lucia-keskin', 'guest_name': 'Lucia Keskin'}, {'raw_title': 'Ian Smith', 'slug': 'ian-smith', 'guest_name': 'Ian Smith'}, {'raw_title': 'Jen Brister (Tasting Menu)', 'slug': 'jen-brister-tasting-menu', 'guest_name': 'Jen Brister'}, {'raw_title': 'Gillian Anderson', 'slug': 'gillian-anderson', 'guest_name': 'Gillian Anderson'}, {'raw_title': 'Greg James', 'slug': 'greg-james', 'guest_name': 'Greg James'}, {'raw_title': 'Rhys James'

In [None]:
def create_slugs_guests_df_from_list_of_dict(titles_list: Dict) -> pd.DataFrame:
    """
    Takes the list of dicts of raw titles, slugs and guest names and returns a dataframe
    """
    df_episodes_metadata = pd.DataFrame(titles_list)
    return df_episodes_metadata

test_eps_metadata_df = create_slugs_guests_df_from_dict(test_episodes_list[0])
test_eps_metadata_df

Unnamed: 0,raw_title,slug,guest_name
0,Kunal Nayyar,kunal-nayyar,Kunal Nayyar
1,Joy Crookes,joy-crookes,Joy Crookes
2,Elle Fanning,elle-fanning,Elle Fanning
3,Lucia Keskin,lucia-keskin,Lucia Keskin
4,Ian Smith,ian-smith,Ian Smith
...,...,...,...
316,Ep 5: Aisling Bea,ep-5-aisling-bea,Aisling Bea
317,Ep 4: Nish Kumar,ep-4-nish-kumar,Nish Kumar
318,Ep 3: Richard Osman,ep-3-richard-osman,Richard Osman
319,Ep 2: Grace Dent,ep-2-grace-dent,Grace Dent


### Checking for duplicate names (e.g. Ed and James have multiple eps; 100, 200, 300)

In [34]:
filter_condition = test_eps_metadata_df['guest_name'] == "Ed Gamble and James Acaster"

# 2. Apply the Filter to the DataFrame
# When you pass the filter_condition to the DataFrame, 
# Pandas only returns the rows where the condition is True.
specific_guest_rows = test_eps_metadata_df[filter_condition]

# 3. View the results
print(specific_guest_rows)

                                             raw_title  ...                                                url
18   Ep 300: Ed Gamble and James Acaster (with spec...  ...  https://podscripts.co/podcasts/off-menu-with-e...
219  Ep 100: Ed Gamble and James Acaster (with Gues...  ...  https://podscripts.co/podcasts/off-menu-with-e...

[2 rows x 4 columns]


In [15]:
def _create_url_from_row(row: pd.Series) -> str:
    """Creates a podscripts transcript URL from an episode's metadata."""
    slug = row["slug"]
    url = f"{transcript_base_url}{slug}"
    return url

def create_urls_and_save_to_slugs_guests_df(
    input_dataframe: pd.DataFrame, output_filepath: str
) -> None:
    """
    Generates transcript URLs for a DataFrame of episode metadata and saves it.

    This function adds a new column 'url' to the input DataFrame by applying
    a helper function to each row. The modified DataFrame is then saved as a
    Parquet file to the specified path.

    Args:
        input_dataframe (pd.DataFrame): The DataFrame containing episode metadata
                                        with 'episode_number' and 'guest_name' columns.
        output_filepath (str): The full file path where the resulting DataFrame
                               will be saved in Parquet format.

    Returns:
        None: The function modifies the input DataFrame and saves a file to disk,
              but does not return a value.
    """
    df = input_dataframe
    df["url"] = df.apply(_create_url_from_row, axis=1)
    df.to_parquet(output_filepath)

In [16]:
test_processed_metadata_filepath_for_saving = os.path.join(
            V2_tests_dir, "test_metadata.parquet")

create_urls_and_save_to_slugs_guests_df(test_eps_metadata_df, test_processed_metadata_filepath_for_saving)

In [18]:
test_eps_metadata_urls = try_read_parquet(test_processed_metadata_filepath_for_saving)
test_eps_metadata_urls

Unnamed: 0,raw_title,slug,guest_name,url
0,Kunal Nayyar,kunal-nayyar,Kunal Nayyar,https://podscripts.co/podcasts/off-menu-with-e...
1,Joy Crookes,joy-crookes,Joy Crookes,https://podscripts.co/podcasts/off-menu-with-e...
2,Elle Fanning,elle-fanning,Elle Fanning,https://podscripts.co/podcasts/off-menu-with-e...
3,Lucia Keskin,lucia-keskin,Lucia Keskin,https://podscripts.co/podcasts/off-menu-with-e...
4,Ian Smith,ian-smith,Ian Smith,https://podscripts.co/podcasts/off-menu-with-e...
...,...,...,...,...
316,Ep 5: Aisling Bea,ep-5-aisling-bea,Aisling Bea,https://podscripts.co/podcasts/off-menu-with-e...
317,Ep 4: Nish Kumar,ep-4-nish-kumar,Nish Kumar,https://podscripts.co/podcasts/off-menu-with-e...
318,Ep 3: Richard Osman,ep-3-richard-osman,Richard Osman,https://podscripts.co/podcasts/off-menu-with-e...
319,Ep 2: Grace Dent,ep-2-grace-dent,Grace Dent,https://podscripts.co/podcasts/off-menu-with-e...


## Merging restaurant mentions with episodes metadata

In [22]:
# Generate res mentions dataframe ready for new merging function

# Step 4.1 Create dict of mentions with res name as keys and list of guests who mention as values
guests_who_mention_res_by_res_name_dict = create_mentions_by_res_name_dict(
            test_restaurants_html_filepath
)
# Step 4.2 Convert into exploded dataframe (one line per guest who mentions)
exploded_res_mentions_df = create_return_exploded_res_mentions_df(
    guests_who_mention_res_by_res_name_dict
)

exploded_res_mentions_df

Unnamed: 0,restaurant_name,guest_name
0,Red Chilli,Sophie Duker
1,Orana,Ian Smith
2,Barbacoa El Primo,Finn Wolfhard
3,La Taberna Del Gourmet,Rhod Gilbert
4,Ron Gastrobar,James Acaster
...,...,...
738,Estelle Manor,AJ Odudu
739,Partisan,CMAT
740,The Black Swan,Maisie Adam
740,The Black Swan,Ed Gamble


In [None]:
# New merging function

def combine_save_mentions_and_ep_metadata_dfs(
    exploded_restaurants_guest_df: pd.DataFrame,
    ep_metadata_filepath: str,
    output_df_filepath: str,
) -> None:
    """
    Takes in exploded (one line per guest/mention) mentions/guest df, and ep metadata (numbers, names, url) dataframe
    filepath, and output filepath, and combines the dataframes. The combined dataframe is then saved as a
    Parquet file to the specified path.

    Args:
        exploded_restaurants_guest_df (pd.DataFrame): A dataframe with 1 row for each mention of a restaurant (exploded)
        ep_metadata_filepath (str): String filepath for the episode metadata dataframe
        output_df_filepath (str): String filepath for where to save the combined dataframe

    Returns:
        None: The function combines the dataframes, and saves to a parquet.
    """
    # Fetch metadata filepath
    df_episodes_metadata = try_read_parquet(ep_metadata_filepath)
    # Left merge on guest, with numbers, names, url (df_episodes_metadata)
    merged_df = pd.merge(
        df_episodes_metadata, exploded_restaurants_guest_df, on="guest_name", how="left"
    )
    # Aggregating rows so we have one row per episode, with a list of restaurant mentions
    # Note groupby creates groups based on the args (three identical in this case). as_index False means also have an index col (don't use first col as index)
    # Note .agg aggregates the data, it creates a new col called restaurants mentioned, from the col 'restaurant_name', applying the method 'dropna' to each group (restuarants that were in the restaurant_name cell), dropna gets rid of the NaN's
    # Note NaN's are placeholders for missing data (means ilterally not a number, which is confusing as it could be text...)
    ep_meta_and_mentions_df = (
        merged_df.groupby(["guest_name", "url", "slug"], as_index=False, sort=False)
        .agg(restaurants_mentioned=("restaurant_name", lambda x: list(x.dropna())))
        .rename(columns={"restaurant_name": "restaurants_mentioned"})
    )
    # Save the dataframe
    ep_meta_and_mentions_df.to_parquet(output_df_filepath, index=False)


# Calling the function

test_full_episodes_metadata_path = os.path.join(V2_tests_dir, "test_episodes_metadata_full.parquet")

combine_save_mentions_and_ep_metadata_dfs(
        exploded_res_mentions_df,
        test_processed_metadata_filepath_for_saving,
        test_full_episodes_metadata_path,
    )

full_episodes_metadata_test_df = try_read_parquet(test_full_episodes_metadata_path)
full_episodes_metadata_test_df

Unnamed: 0,guest_name,url,slug,restaurants_mentioned
0,Kunal Nayyar,https://podscripts.co/podcasts/off-menu-with-e...,kunal-nayyar,"[Moti Mahal, The Tamil Prince, The Dover, Kutir]"
1,Joy Crookes,https://podscripts.co/podcasts/off-menu-with-e...,joy-crookes,[]
2,Elle Fanning,https://podscripts.co/podcasts/off-menu-with-e...,elle-fanning,"[Lady M, Red Lobster, Popeyes]"
3,Lucia Keskin,https://podscripts.co/podcasts/off-menu-with-e...,lucia-keskin,[]
4,Ian Smith,https://podscripts.co/podcasts/off-menu-with-e...,ian-smith,"[Orana, Skál, Mudbrick Vineyard]"
...,...,...,...,...
316,Aisling Bea,https://podscripts.co/podcasts/off-menu-with-e...,ep-5-aisling-bea,"[Cafe Gratitude, Burger and Lobster]"
317,Nish Kumar,https://podscripts.co/podcasts/off-menu-with-e...,ep-4-nish-kumar,"[Bademiya, The Owl and The Pussycat]"
318,Richard Osman,https://podscripts.co/podcasts/off-menu-with-e...,ep-3-richard-osman,"[Five Guys, Cora Pearl, Berners Tavern]"
319,Grace Dent,https://podscripts.co/podcasts/off-menu-with-e...,ep-2-grace-dent,"[Little Owl, Trullo]"


## New web scraper
### Needed to generate test data from the above html + to improve functionality

In [64]:
# Generate test data (full episodes metadata rows 0, 10, 20, 30, 40, 50...100)

ten_test_episodes_metadata_output_path = os.path.join(V2_tests_dir, "ten_test_episodes_full_metadata.parquet")

indices_to_slice = range(0, 101, 10)

# 2. Slice the DataFrame by position using .iloc
# .iloc stands for 'integer location' and is used for positional indexing.
sliced_df = full_episodes_metadata_test_df.iloc[indices_to_slice]

# 3. Save the sliced DataFrame to a Parquet file
# index=False ensures the default Pandas index (0, 1, 2, ...) is not saved as a column
sliced_df.to_parquet(ten_test_episodes_metadata_output_path, index=False)

print(f"Sliced DataFrame created with {len(sliced_df)} rows and saved to: {ten_test_episodes_metadata_output_path}")
sliced_df

# Second batch of test data 

second_ten_test_episodes_metadata_output_path = os.path.join(V2_tests_dir, "second_ten_test_episodes_full_metadata.parquet")

indices_to_slice_2 = range(100, 201, 10)

second_ten_test_data_df = full_episodes_metadata_test_df.iloc[indices_to_slice_2]

second_ten_test_data_df.to_parquet(second_ten_test_episodes_metadata_output_path, index=False)

print(f"Sliced DataFrame created with {len(second_ten_test_data_df)} rows and saved to: {second_ten_test_episodes_metadata_output_path}")

second_ten_test_data_df

Sliced DataFrame created with 11 rows and saved to: c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\ten_test_episodes_full_metadata.parquet
Sliced DataFrame created with 11 rows and saved to: c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\second_ten_test_episodes_full_metadata.parquet


Unnamed: 0,guest_name,url,slug,restaurants_mentioned
100,Ross Noble,https://podscripts.co/podcasts/off-menu-with-e...,ep-217-ross-noble-christmas-special,[]
110,Nick Frost,https://podscripts.co/podcasts/off-menu-with-e...,ep-207-nick-frost,"[Geranium, The Red House, The Yellow House]"
120,Jenny Eclair,https://podscripts.co/podcasts/off-menu-with-e...,ep-197-jenny-eclair,[]
130,Lily Allen,https://podscripts.co/podcasts/off-menu-with-e...,ep-187-lily-allen,"[Dorian, Afghan Kitchen]"
140,Fern Brady,https://podscripts.co/podcasts/off-menu-with-e...,ep-178-fern-brady,"[Lune, Chinese Tapas House, Bubala, Fortitude ..."
150,Ania Magliano,https://podscripts.co/podcasts/off-menu-with-e...,ep-169-ania-magliano,[]
160,Felicity Ward,https://podscripts.co/podcasts/off-menu-with-e...,ep-159-felicity-ward,"[The European, Supper Club, Meatball, Harrys S..."
170,Adam Buxton,https://podscripts.co/podcasts/off-menu-with-e...,ep-149-adam-buxton,[]
180,Nadiya Hussain,https://podscripts.co/podcasts/off-menu-with-e...,ep-139-nadiya-hussain,[YO Sushi]
190,Jason Reitman,https://podscripts.co/podcasts/off-menu-with-e...,ep-129-jason-reitman,"[Shokunin, White Bear, Mikawa]"


In [None]:
# Old with some edits (unsuccessful)

def _save_transcripts_html(eps_dataframe, directory):
    """
    Iterates through a DataFrame of episodes, downloads the HTML content from
    the episode URL, and saves it to a specified directory.

    Skips files that already exist and includes a random delay to be
    polite to the server.

    Args:
        eps_dataframe (pd.DataFrame): DataFrame containing episode metadata
                                      (including 'episode_number' and 'url').
        directory (str): The directory to save the HTML files to.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)

    for index, row in eps_dataframe.iterrows():
        guest_name = row["guest_name"]
        episode_url = row["url"]
        filename = f"{guest_name}.html"
        filepath = os.path.join(directory, filename)

        # Skip episodes that already exist
        if os.path.exists(filepath):
            print(
                f"  Skipping Episode {guest_name}, at index{index}: File already exists at {filepath}"
            )
            continue

        # Delay to be polite to the server and avoid 429 errors
        sleep_time = random.uniform(1, 3)  # Sleep for 1 to 3 seconds
        time.sleep(sleep_time)

        html_content_str = extract_html(episode_url)

        # Check for None before attempting to save
        # The extract_html function returns None on failure (like a 429 error)
        if html_content_str:
            save_text_to_file(html_content_str, filename, directory)
        else:
            print(
                f"  Skipping save for Episode {episode_num} due to failed extraction."
            )

In [None]:
# GPT scraper annotated

import os
import json
import random
import time
import math
import logging
from pathlib import Path
from typing import Dict, List, Optional
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---- Simple logger ----
# logger = logging.getLogger("scraper")
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(ch)
    logger.setLevel(logging.INFO)


# ---- Helper: random-ish UA list (small) ----
_SIMPLE_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64)",
]


def _choose_headers():
    return {"User-Agent": random.choice(_SIMPLE_USER_AGENTS)}


# ---- Downloader with retries, backoff, persistence ----
def download_transcripts(
    url_map: Dict[str, str], # links the url to the guest name via a dict, because it uses name as filename (note needs to use slug as some names repeate.g. ed & james)
    out_dir: str, # Directory to save html to
    status_path: str, # Path to status JSON file
    max_attempts_per_url: int = 5,
    backoff_base: float = 1.0,
    max_workers: int = 3, # Number of "workers" (threads , things that try to run concurrently in a single overarching process)
    session: Optional[requests.Session] = None, # The session if we have one open for some reason (single session = more effieicnt)
    timeout: float = 12.0,
) -> Dict[str, Dict]:
    """
    Download a set of URLs and save the HTML files locally.

    Args:
        url_map: mapping slug_or_filename -> url. Eg {"paul-rudd": "https://.../ep-215-paul-rudd"}
                 Or you can map guest_name -> url.
        out_dir: directory to save files (will be created).
        status_path: path to JSON status file to persist attempts and outcomes.
        max_attempts_per_url: maximum attempts per url before giving up.
        backoff_base: base seconds for exponential backoff (1.0 is a reasonable default).
        max_workers: number of concurrent download workers (1..6 recommended).
        session: optional requests.Session() - if None a new one is created.
        timeout: request timeout in seconds.

    Returns:
        status dict mapping key -> { "url", "attempts", "status", "saved_path", "last_error" }
    """

    out_dir = Path(out_dir) 
    out_dir.mkdir(parents=True, exist_ok=True)
    status_path = Path(status_path) # Turns strings into paths for use saving/reading

    # Load existing status if present (allows resume)
    if status_path.exists():
        try:
            with open(status_path, "r", encoding="utf-8") as f:
                status = json.load(f)
        except Exception:
            status = {}
    else:
        status = {}

    # Initialize status entries for any missing keys
    # Makes statuses for all "keys" (guest names/episodes)
    for key, url in url_map.items():
        if key not in status:
            status[key] = {
                "url": url,
                "attempts": 0,
                "status": "pending",  # pending | success | failed
                "saved_path": None,
                "last_error": None,
            }

    # Use a single session for pooling - more efficient that starting multiple sessions apparently
    session = session or requests.Session()

    def _attempt_download(key: str, meta: Dict) -> Dict:
        url = meta["url"]
        attempts = meta["attempts"]
        result = dict(meta)
        # If already succeeded, skip
        if meta.get("status") == "success":
            return result

        # If we've already reached max attempts, mark failed and skip
        if attempts >= max_attempts_per_url:
            result["status"] = "failed"
            result["last_error"] = "max_attempts_reached"
            return result

        try:
            # Build headers and request - headers are in the request and say what browser I'm using, we're faking three diff ones to rotate between
            # To look less bot like
            # Resp is a response object which is what comes back from the request, and contains the html text among other things
            headers = _choose_headers()
            resp = session.get(url, headers=headers, timeout=timeout)
            # If success
            if resp.status_code == 200:
                # Save file (deterministic name using key)
                # note needs changing to slug
                filename = f"{key}.html"
                saved_path = str(out_dir / filename)
                with open(saved_path, "w", encoding="utf-8") as fh:
                    fh.write(resp.text)
                result.update({
                    "attempts": attempts + 1,
                    "status": "success",
                    "saved_path": saved_path,
                    "last_error": None,
                })
                logger.info("Saved %s -> %s", url, saved_path) # The logger lets us know whats going on, better than prints as level of detail can be
                # changed dynamically
                return result

            # Retryable status codes (429 Too Many Requests, 5xx)
            if resp.status_code in (429, 500, 502, 503, 504):
                result.update({
                    "attempts": attempts + 1,
                    "status": "pending",
                    "last_error": f"status_{resp.status_code}"
                })
                logger.warning("Retryable HTTP %s for %s (attempt %s)", resp.status_code, url, attempts + 1)
                return result

            # Non-retryable: mark failed with info
            result.update({
                "attempts": attempts + 1,
                "status": "failed",
                "last_error": f"status_{resp.status_code}"
            })
            logger.error("Non-retryable HTTP %s for %s", resp.status_code, url)
            return result

        except requests.RequestException as e: # network level errors, considered retryable
            # Network error: retryable
            result.update({
                "attempts": attempts + 1,
                "status": "pending",
                "last_error": repr(e)
            })
            logger.warning("RequestException for %s (attempt %s): %s", url, attempts + 1, e)
            return result

    # Worker function wraps attempts + backoff
    # Note meta is the status for this key, and it may be changed throuhg attemotin downloads to new meta
    def _worker_task(key):
        meta = status[key]
        # If already success or permanently failed, return
        if meta.get("status") == "success" or meta.get("attempts", 0) >= max_attempts_per_url:
            return key, meta

        # attempt download
        new_meta = _attempt_download(key, meta)

        # If still pending (retry-worthy), sleep exponential backoff before returning
        if new_meta["status"] == "pending":
            # compute sleep: base * 2^(attempts-1) + jitter
            sleep = backoff_base * (2 ** (new_meta["attempts"] - 1))
            jitter = random.uniform(0, 1.0)
            sleep_time = min(sleep + jitter, 60)  # cap at 60s
            logger.info("Backing off %0.2fs for %s (attempt %s)", sleep_time, new_meta["url"], new_meta["attempts"])
            time.sleep(sleep_time)

        return key, new_meta

    # Main loop: do rounds where each round runs up to max_workers concurrent attempts on pending items.
    pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]
    # List comprehension selects k's to include, where k is the status and v the attempts, if status is pending and attempts below threshhld
    round_idx = 0 # counter
    while pending_keys:
        round_idx += 1
        logger.info("Download round %d: %d pending", round_idx, len(pending_keys)) # How does %d work?

        # Limit concurrency to not overload server
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = {ex.submit(_worker_task, key): key for key in pending_keys} # Futures represent the future result of the task - recall 
            # recall that threads designed to run tasks concerrently
            # Note with... as ex is a context manager, opens/closes the thread pool
            # submit passes a task (a certain key/guest name to try and download) to the thread pool
            # futures = {} creates a dict where the future objects are keys and the values are the...keys, confusingly        
            for fut in as_completed(futures): # as completed yields futuer obkects 1 by 1 as they're completed
                key = futures[fut] # This accesses the future
                try:
                    k, new_meta = fut.result() # This makes k and new meta the results of the future (output of worker task, which is attempted download: a key and new meta which is the status entry for the key, inc. save path for html)
                    status[k].update(new_meta)
                except Exception as e:
                    logger.exception("Unhandled exception for key %s: %s", key, e)
                    status[key]["attempts"] = status[key].get("attempts", 0) + 1
                    status[key]["last_error"] = repr(e)

        # persist status to disk after every round
        try:
            with open(status_path, "w", encoding="utf-8") as f:
                json.dump(status, f, indent=2)
        except Exception as e:
            logger.exception("Failed to write status file: %s", e)

        # Prepare next round: only keys still pending and under attempts limit
        pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]

        # If there are pending keys, optionally small delay between rounds
        if pending_keys:
            logger.info("Sleeping 2s between rounds to be polite...")
            time.sleep(2)

    # final persist - saves the JSON again (unsure how dump method works)
    with open(status_path, "w", encoding="utf-8") as f:
        json.dump(status, f, indent=2)

    # return status mapping
    return status

In [None]:
# GPT scraper V2 slug use

import os
import json
import random
import time
import math
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# ---- Simple logger ----
#logger = logging.getLogger("scraper")
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(ch)
    logger.setLevel(logging.INFO)


# ---- Helper: random-ish UA list (small) ----
_SIMPLE_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64)",
]


def _choose_headers():
    return {"User-Agent": random.choice(_SIMPLE_USER_AGENTS)}


def _sanitize_key(key: str) -> str:
    """
    Convert a key (expected to be a slug) into a safe filename slug:
      - lowercase
      - replace any sequence of characters NOT a-z0-9 or '-' or '_' with '-'
      - collapse multiple '-' into one
      - strip leading/trailing '-' or '_'
    This ensures keys like "Paul Rudd" become "paul-rudd" and already-correct slugs remain stable.
    """
    if not isinstance(key, str):
        key = str(key)
    s = key.strip().lower()
    s = re.sub(r"[^a-z0-9\-_]+", "-", s)
    s = re.sub(r"-{2,}", "-", s)
    return s.strip("-_")


# ---- Downloader with retries, backoff, persistence ----
def download_transcripts(
    url_map: Dict[str, str],  # mapping slug_or_filename -> url (keys should be your episode slugs)
    out_dir: str,  # Directory to save html to
    status_path: str,  # Path to status JSON file
    max_attempts_per_url: int = 5,
    backoff_base: float = 1.0,
    max_workers: int = 3,  # Number of concurrent download workers
    session: Optional[requests.Session] = None,  # Optional shared requests.Session
    timeout: float = 12.0,
) -> Dict[str, Dict]:
    """
    Download a set of URLs and save the HTML files locally.

    Args:
        url_map: mapping slug_or_filename -> url. Keys should be the episode slugs you want to use as identifiers.
        out_dir: directory to save files (created if missing).
        status_path: path to JSON status file to persist attempts and outcomes.
        max_attempts_per_url: maximum attempts per url before giving up.
        backoff_base: base seconds for exponential backoff.
        max_workers: number of concurrent download workers.
        session: optional requests.Session() - if None a new one is created.
        timeout: request timeout in seconds.

    Returns:
        status dict mapping key -> { "url", "attempts", "status", "saved_path", "last_error" }
    """

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    status_path = Path(status_path)

    # Load existing status if present (allows resume)
    if status_path.exists():
        try:
            with open(status_path, "r", encoding="utf-8") as f:
                status = json.load(f)
        except Exception:
            status = {}
    else:
        status = {}

    # Initialize status entries for any missing keys
    for key, url in url_map.items():
        if key not in status:
            status[key] = {
                "url": url,
                "attempts": 0,
                "status": "pending",  # pending | success | failed
                "saved_path": None,
                "last_error": None,
            }

    # Use a single session for pooling
    session = session or requests.Session()

    def _attempt_download(key: str, meta: Dict) -> Dict:
        url = meta["url"]
        attempts = meta["attempts"]
        result = dict(meta)
        # If already succeeded, skip
        if meta.get("status") == "success":
            return result

        # If we've already reached max attempts, mark failed and skip
        if attempts >= max_attempts_per_url:
            result["status"] = "failed"
            result["last_error"] = "max_attempts_reached"
            return result

        try:
            # Build headers and request
            headers = _choose_headers()
            resp = session.get(url, headers=headers, timeout=timeout)
            # If success
            if resp.status_code == 200:
                # Save file using sanitized key (ensure filesystem-safe slug)
                safe_key = _sanitize_key(key)
                filename = f"{safe_key}.html"
                saved_path = str(out_dir / filename)
                with open(saved_path, "w", encoding="utf-8") as fh:
                    fh.write(resp.text)
                result.update({
                    "attempts": attempts + 1,
                    "status": "success",
                    "saved_path": saved_path,
                    "last_error": None,
                })
                logger.info("Saved %s -> %s", url, saved_path)
                return result

            # Retryable status codes (429 Too Many Requests, 5xx)
            if resp.status_code in (429, 500, 502, 503, 504):
                result.update({
                    "attempts": attempts + 1,
                    "status": "pending",
                    "last_error": f"status_{resp.status_code}"
                })
                logger.warning("Retryable HTTP %s for %s (attempt %s)", resp.status_code, url, attempts + 1)
                return result

            # Non-retryable: mark failed with info
            result.update({
                "attempts": attempts + 1,
                "status": "failed",
                "last_error": f"status_{resp.status_code}"
            })
            logger.error("Non-retryable HTTP %s for %s", resp.status_code, url)
            return result

        except requests.RequestException as e:
            # Network error: retryable
            result.update({
                "attempts": attempts + 1,
                "status": "pending",
                "last_error": repr(e)
            })
            logger.warning("RequestException for %s (attempt %s): %s", url, attempts + 1, e)
            return result

    # Worker function wraps attempts + backoff
    def _worker_task(key):
        meta = status[key]
        # If already success or permanently failed, return
        if meta.get("status") == "success" or meta.get("attempts", 0) >= max_attempts_per_url:
            return key, meta

        # attempt download
        new_meta = _attempt_download(key, meta)

        # If still pending (retry-worthy), sleep exponential backoff before returning
        if new_meta["status"] == "pending":
            # compute sleep: base * 2^(attempts-1) + jitter
            sleep = backoff_base * (2 ** (new_meta["attempts"] - 1))
            jitter = random.uniform(0, 1.0)
            sleep_time = min(sleep + jitter, 60)  # cap at 60s
            logger.info("Backing off %0.2fs for %s (attempt %s)", sleep_time, new_meta["url"], new_meta["attempts"])
            time.sleep(sleep_time)

        return key, new_meta

    # Main loop: do rounds where each round runs up to max_workers concurrent attempts on pending items.
    pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]
    round_idx = 0
    while pending_keys:
        round_idx += 1
        logger.info("Download round %d: %d pending", round_idx, len(pending_keys))

        # Limit concurrency to not overload server
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = {ex.submit(_worker_task, key): key for key in pending_keys}
            for fut in as_completed(futures):
                key = futures[fut]
                try:
                    k, new_meta = fut.result()
                    status[k].update(new_meta)
                except Exception as e:
                    logger.exception("Unhandled exception for key %s: %s", key, e)
                    status[key]["attempts"] = status[key].get("attempts", 0) + 1
                    status[key]["last_error"] = repr(e)

        # persist status to disk after every round
        try:
            with open(status_path, "w", encoding="utf-8") as f:
                json.dump(status, f, indent=2)
        except Exception as e:
            logger.exception("Failed to write status file: %s", e)

        # Prepare next round: only keys still pending and under attempts limit
        pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]

        # If there are pending keys, optionally small delay between rounds
        if pending_keys:
            logger.info("Sleeping 2s between rounds to be polite...")
            time.sleep(2)

    # final persist
    with open(status_path, "w", encoding="utf-8") as f:
        json.dump(status, f, indent=2)

    # return status mapping
    return status

In [None]:
# V3 scraper GPT (better log for debugging)

import logging
from pathlib import Path
import re
import random
import time
import json
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Optional
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

# -------------------
# Logger configuration helper
# -------------------
# def configure_logger(log_file: Optional[str] = None, level: int = logging.DEBUG):
    """
    Configure a compact logger for the scraper.
    - Console handler always enabled.
    - Optional file handler if log_file provided.
    - Default level: DEBUG for maximum visibility while testing.
    """
    logger = logging.getLogger("scraper")
    logger.setLevel(level)

    # Avoid adding handlers multiple times when running multiple times in a notebook
    if logger.hasHandlers():
        logger.handlers.clear()

    # Console handler (clear, one-line format)
    ch = logging.StreamHandler()
    ch.setLevel(level)
    ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
    logger.addHandler(ch)

    # Optional file handler (rotating not necessary here — keep simple)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
        logger.addHandler(fh)

    return logger

# Initialize logger (call this in your notebook before running download_transcripts)
logger = configure_logger()  # or configure_logger("data/scraper.log")

# -------------------
# small sanitize helper (same as before)
# -------------------
def _sanitize_key(key: str) -> str:
    if not isinstance(key, str):
        key = str(key)
    s = key.strip().lower()
    s = re.sub(r"[^a-z0-9\-_]+", "-", s)
    s = re.sub(r"-{2,}", "-", s)
    return s.strip("-_")

# ---- Helper: random-ish UA list (small) ----
_SIMPLE_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64)",
]

def _choose_headers():
    return {"User-Agent": random.choice(_SIMPLE_USER_AGENTS)}

# ----- Helper to access retry limits from the server (for use in scraper)

def _parse_retry_after(header_value: Optional[str]) -> Optional[float]:
    """
    Parse Retry-After header. It can be:
      - an integer number of seconds, e.g. "120"
      - a HTTP-date string, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    Return number of seconds to wait (float), or None if not parseable.
    """
    if not header_value:
        return None
    header_value = header_value.strip()
    # try integer seconds
    if header_value.isdigit():
        return float(header_value)
    # try HTTP-date
    try:
        dt = parsedate_to_datetime(header_value)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)
        delta = (dt - now).total_seconds()
        return max(0.0, float(delta))
    except Exception:
        return None

# -------------------
# download_transcripts with extra logging (no other behavioural changes)
# -------------------
def download_transcripts(
    url_map: Dict[str, str],
    out_dir: str,
    status_path: str,
    max_attempts_per_url: int = 5,
    backoff_base: float = 1.0,
    max_workers: int = 3,
    session: Optional[requests.Session] = None,
    timeout: float = 12.0,
) -> Dict[str, Dict]:
    """
    Download URLs to out_dir using url_map (keys are slugs used as filenames).
    Added logging provides visibility into what the function does on each run.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    status_path = Path(status_path)

    logger.info("Starting download_transcripts: %d urls, out_dir=%s, status_path=%s",
                len(url_map), out_dir, status_path)

    # Load existing status if present (allows resume)
    if status_path.exists():
        try:
            with open(status_path, "r", encoding="utf-8") as f:
                status = json.load(f)
            logger.debug("Loaded existing status.json with %d entries", len(status))
        except Exception as e:
            logger.warning("Failed to load status.json (%s). Starting with empty status.", e)
            status = {}
    else:
        logger.debug("No status.json file found at %s. Starting fresh.", status_path)
        status = {}

    # Initialize status entries for any missing keys (log each new init)
    for key, url in url_map.items():
        if key not in status:
            status[key] = {
                "url": url,
                "attempts": 0,
                "status": "pending",  # pending | success | failed
                "saved_path": None,
                "last_error": None,
            }
            logger.debug("Initialized status for key='%s' -> %s", key, url)

    # Use a single session for pooling
    session = session or requests.Session()

    def _attempt_download(key: str, meta: Dict) -> Dict:
        url = meta["url"]
        attempts = meta["attempts"]
        result = dict(meta)

        # If already succeeded, skip and log reason
        if meta.get("status") == "success":
            logger.debug("Skipping key='%s' (already success, saved_path=%s)", key, meta.get("saved_path"))
            return result

        # If max attempts reached, log and skip
        if attempts >= max_attempts_per_url:
            result["status"] = "failed"
            result["last_error"] = "max_attempts_reached"
            logger.info("Key='%s' reached max attempts (%d). Marking failed.", key, attempts)
            return result

        # Log the attempt about to be made
        logger.debug("Attempting key='%s' (attempt %d) -> %s", key, attempts + 1, url)
        try:
            headers = _choose_headers()
            resp = session.get(url, headers=headers, timeout=timeout)

            # If success (200)
            if resp.status_code == 200:
                safe_key = _sanitize_key(key)
                filename = f"{safe_key}.html"
                saved_path = str(out_dir / filename)

                # If file already exists, log that we're overwriting (helps debug)
                if Path(saved_path).exists():
                    logger.debug("File %s already exists and will be overwritten by key='%s'", saved_path, key)

                with open(saved_path, "w", encoding="utf-8") as fh:
                    fh.write(resp.text)

                result.update({
                    "attempts": attempts + 1,
                    "status": "success",
                    "saved_path": saved_path,
                    "last_error": None,
                })
                logger.info("Saved %s -> %s (key=%s)", url, saved_path, key)
                return result

            # Retryable status codes
            if resp.status_code in (429, 500, 502, 503, 504):
                result.update({
                    "attempts": attempts + 1,
                    "status": "pending",
                    "last_error": f"status_{resp.status_code}"
                })
                logger.warning("Retryable HTTP %s for key='%s' url=%s (attempt %s)",
                               resp.status_code, key, url, attempts + 1)
                # Log headers optionally for 429 to see Retry-After
                if resp.status_code == 429:
                    ra = resp.headers.get("Retry-After")
                    logger.debug("429 response headers for key='%s': Retry-After=%s", key, ra)
                return result

            # Non-retryable
            result.update({
                "attempts": attempts + 1,
                "status": "failed",
                "last_error": f"status_{resp.status_code}"
            })
            logger.error("Non-retryable HTTP %s for key='%s' url=%s", resp.status_code, key, url)
            return result

        except requests.RequestException as e:
            # Network error: retryable
            result.update({
                "attempts": attempts + 1,
                "status": "pending",
                "last_error": repr(e)
            })
            logger.warning("RequestException for key='%s' url=%s (attempt %s): %s", key, url, attempts + 1, e)
            return result

    # Worker wrapper with backoff
    def _worker_task(key):
        meta = status[key]
        if meta.get("status") == "success" or meta.get("attempts", 0) >= max_attempts_per_url:
            return key, meta

        new_meta = _attempt_download(key, meta)

        if new_meta["status"] == "pending":
            sleep = backoff_base * (2 ** (new_meta["attempts"] - 1))
            jitter = random.uniform(0, 1.0)
            sleep_time = min(sleep + jitter, 60)
            logger.debug("Backing off %0.2fs for key='%s' (attempt %s)", sleep_time, key, new_meta["attempts"])
            time.sleep(sleep_time)

        return key, new_meta

    # Main loop
    pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]
    round_idx = 0
    while pending_keys:
        round_idx += 1
        logger.info("Download round %d: %d pending", round_idx, len(pending_keys))

        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = {ex.submit(_worker_task, key): key for key in pending_keys}
            for fut in as_completed(futures):
                key = futures[fut]
                try:
                    k, new_meta = fut.result()
                    status[k].update(new_meta)
                except Exception as e:
                    logger.exception("Unhandled exception for key %s: %s", key, e)
                    status[key]["attempts"] = status[key].get("attempts", 0) + 1
                    status[key]["last_error"] = repr(e)

        # persist status to disk after every round
        try:
            with open(status_path, "w", encoding="utf-8") as f:
                json.dump(status, f, indent=2)
            logger.debug("Persisted status.json (round %d).", round_idx)
        except Exception as e:
            logger.exception("Failed to write status file: %s", e)

        # Prepare next round
        pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]

        if pending_keys:
            logger.info("Sleeping 2s between rounds to be polite...")
            time.sleep(2)

    # final persist and summary
    with open(status_path, "w", encoding="utf-8") as f:
        json.dump(status, f, indent=2)

    # Final summary counts
    succ = sum(1 for v in status.values() if v.get("status") == "success")
    failed = sum(1 for v in status.values() if v.get("status") == "failed")
    pending = sum(1 for v in status.values() if v.get("status") == "pending")
    logger.info("Download finished. success=%d failed=%d pending=%d", succ, failed, pending)

    return status

In [85]:
# V4 scraper GPT (collects server wait times from the server and uses these to prevent overload errors)

import logging
from pathlib import Path
import re
import random
import time
import json
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Optional
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

# -------------------
# Logger configuration helper
# -------------------
def configure_logger(log_file: Optional[str] = None, level: int = logging.DEBUG):
    """
    Configure a compact logger for the scraper.
    - Console handler always enabled.
    - Optional file handler if log_file provided.
    - Default level: DEBUG for maximum visibility while testing.
    """
    logger = logging.getLogger("scraper")
    logger.setLevel(level)

    # Avoid adding handlers multiple times when running multiple times in a notebook
    if logger.hasHandlers():
        logger.handlers.clear()

    # Console handler (clear, one-line format)
    ch = logging.StreamHandler()
    ch.setLevel(level)
    ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
    logger.addHandler(ch)

    # Optional file handler (rotating not necessary here — keep simple)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
        logger.addHandler(fh)

    return logger

# Initialize logger (call this in your notebook before running download_transcripts)
logger = configure_logger()  # or configure_logger("data/scraper.log")

# -------------------
# small sanitize helper (same as before)
# -------------------
def _sanitize_key(key: str) -> str:
    if not isinstance(key, str):
        key = str(key)
    s = key.strip().lower()
    s = re.sub(r"[^a-z0-9\-_]+", "-", s)
    s = re.sub(r"-{2,}", "-", s)
    return s.strip("-_")

# ---- Helper: random-ish UA list (small) ----
_SIMPLE_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64)",
]

def _choose_headers():
    return {"User-Agent": random.choice(_SIMPLE_USER_AGENTS)}

# ----- Helper to access retry limits from the server (for use in scraper)
def _parse_retry_after(header_value: Optional[str]) -> Optional[float]:
    """
    Parse Retry-After header. It can be:
      - an integer number of seconds, e.g. "120"
      - a HTTP-date string, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    Return number of seconds to wait (float), or None if not parseable.
    """
    if not header_value:
        return None
    header_value = header_value.strip()
    # try integer seconds
    if header_value.isdigit():
        try:
            return float(header_value)
        except Exception:
            return None
    # try HTTP-date
    try:
        dt = parsedate_to_datetime(header_value)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)
        delta = (dt - now).total_seconds()
        return max(0.0, float(delta))
    except Exception:
        return None

# -------------------
# download_transcripts with extra logging (no other behavioural changes)
# -------------------
def download_transcripts(
    url_map: Dict[str, str],
    out_dir: str,
    status_path: str,
    max_attempts_per_url: int = 5,
    backoff_base: float = 1.0,
    max_workers: int = 3,
    session: Optional[requests.Session] = None,
    timeout: float = 12.0,
) -> Dict[str, Dict]:
    """
    Download URLs to out_dir using url_map (keys are slugs used as filenames).
    Added logging provides visibility into what the function does on each run.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    status_path = Path(status_path)

    logger.info("Starting download_transcripts: %d urls, out_dir=%s, status_path=%s",
                len(url_map), out_dir, status_path)

    # Load existing status if present (allows resume)
    if status_path.exists():
        try:
            with open(status_path, "r", encoding="utf-8") as f:
                status = json.load(f)
            logger.debug("Loaded existing status.json with %d entries", len(status))
        except Exception as e:
            logger.warning("Failed to load status.json (%s). Starting with empty status.", e)
            status = {}
    else:
        logger.debug("No status.json file found at %s. Starting fresh.", status_path)
        status = {}

    # Initialize status entries for any missing keys (log each new init)
    for key, url in url_map.items():
        if key not in status:
            status[key] = {
                "url": url,
                "attempts": 0,
                "status": "pending",  # pending | success | failed
                "saved_path": None,
                "last_error": None,
            }
            logger.debug("Initialized status for key='%s' -> %s", key, url)

    # Use a single session for pooling
    session = session or requests.Session()

    def _attempt_download(key: str, meta: Dict) -> Dict:
        url = meta["url"]
        attempts = meta["attempts"]
        result = dict(meta)

        # If already succeeded, skip and log reason
        if meta.get("status") == "success":
            logger.debug("Skipping key='%s' (already success, saved_path=%s)", key, meta.get("saved_path"))
            return result

        # If max attempts reached, log and skip
        if attempts >= max_attempts_per_url:
            result["status"] = "failed"
            result["last_error"] = "max_attempts_reached"
            logger.info("Key='%s' reached max attempts (%d). Marking failed.", key, attempts)
            return result

        # Log the attempt about to be made
        logger.debug("Attempting key='%s' (attempt %d) -> %s", key, attempts + 1, url)
        try:
            headers = _choose_headers()
            resp = session.get(url, headers=headers, timeout=timeout)

            # If success (200)
            if resp.status_code == 200:
                safe_key = _sanitize_key(key)
                filename = f"{safe_key}.html"
                saved_path = str(out_dir / filename)

                # If file already exists, log that we're overwriting (helps debug)
                if Path(saved_path).exists():
                    logger.debug("File %s already exists and will be overwritten by key='%s'", saved_path, key)

                with open(saved_path, "w", encoding="utf-8") as fh:
                    fh.write(resp.text)

                result.update({
                    "attempts": attempts + 1,
                    "status": "success",
                    "saved_path": saved_path,
                    "last_error": None,
                })
                logger.info("Saved %s -> %s (key=%s)", url, saved_path, key)
                return result

            # Retryable status codes
            if resp.status_code in (429, 500, 502, 503, 504):
                # Parse Retry-After header if present and include in result
                retry_after_raw = resp.headers.get("Retry-After")
                retry_after_seconds = _parse_retry_after(retry_after_raw)
                result.update({
                    "attempts": attempts + 1,
                    "status": "pending",
                    "last_error": f"status_{resp.status_code}",
                    "retry_after_seconds": retry_after_seconds,
                })
                logger.warning("Retryable HTTP %s for key='%s' url=%s (attempt %s)",
                               resp.status_code, key, url, attempts + 1)
                # Log headers optionally for 429 to see Retry-After
                if resp.status_code == 429:
                    logger.debug("429 response headers for key='%s': Retry-After=%s", key, retry_after_raw)
                    logger.debug("Parsed Retry-After seconds for key='%s': %s", key, retry_after_seconds)
                return result

            # Non-retryable
            result.update({
                "attempts": attempts + 1,
                "status": "failed",
                "last_error": f"status_{resp.status_code}"
            })
            logger.error("Non-retryable HTTP %s for key='%s' url=%s", resp.status_code, key, url)
            return result

        except requests.RequestException as e:
            # Network error: retryable
            result.update({
                "attempts": attempts + 1,
                "status": "pending",
                "last_error": repr(e)
            })
            logger.warning("RequestException for key='%s' url=%s (attempt %s): %s", key, url, attempts + 1, e)
            return result

    # Worker wrapper with backoff
    def _worker_task(key):
        meta = status[key]
        if meta.get("status") == "success" or meta.get("attempts", 0) >= max_attempts_per_url:
            return key, meta

        new_meta = _attempt_download(key, meta)

        if new_meta["status"] == "pending":
            # computed exponential backoff (what we would do)
            comp_sleep = backoff_base * (2 ** (new_meta["attempts"] - 1))
            jitter = random.uniform(0, 1.0)
            computed_sleep = comp_sleep + jitter

            # server-provided advice (if any)
            retry_after = new_meta.get("retry_after_seconds")
            if retry_after is not None:
                # use the server's suggestion if it's longer than our computed wait
                sleep_time = max(computed_sleep, float(retry_after))
            else:
                sleep_time = computed_sleep

            # cap to avoid runaway sleeps (adjust cap as desired)
            sleep_time = min(sleep_time, 600.0)

            logger.info("Backing off %0.2fs for key='%s' (attempt %s) [computed=%0.2fs, server=%s]",
                        sleep_time, key, new_meta["attempts"], computed_sleep, retry_after)
            time.sleep(sleep_time)

        return key, new_meta

    # Main loop
    pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]
    round_idx = 0
    while pending_keys:
        round_idx += 1
        logger.info("Download round %d: %d pending", round_idx, len(pending_keys))

        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = {ex.submit(_worker_task, key): key for key in pending_keys}
            for fut in as_completed(futures):
                key = futures[fut]
                try:
                    k, new_meta = fut.result()
                    status[k].update(new_meta)
                except Exception as e:
                    logger.exception("Unhandled exception for key %s: %s", key, e)
                    status[key]["attempts"] = status[key].get("attempts", 0) + 1
                    status[key]["last_error"] = repr(e)

        # persist status to disk after every round
        try:
            with open(status_path, "w", encoding="utf-8") as f:
                json.dump(status, f, indent=2)
            logger.debug("Persisted status.json (round %d).", round_idx)
        except Exception as e:
            logger.exception("Failed to write status file: %s", e)

        # Prepare next round
        pending_keys = [k for k, v in status.items() if v["status"] != "success" and v["attempts"] < max_attempts_per_url]

        if pending_keys:
            logger.info("Sleeping 2s between rounds to be polite...")
            time.sleep(2)

    # final persist and summary
    with open(status_path, "w", encoding="utf-8") as f:
        json.dump(status, f, indent=2)

    # Final summary counts
    succ = sum(1 for v in status.values() if v.get("status") == "success")
    failed = sum(1 for v in status.values() if v.get("status") == "failed")
    pending = sum(1 for v in status.values() if v.get("status") == "pending")
    logger.info("Download finished. success=%d failed=%d pending=%d", succ, failed, pending)

    return status

In [86]:
# First Test Batch new scraper


# Cell A — Build url_map
# Replace base_url below if you need to construct URLs from slugs:
base_url = "https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster"

# If sliced_df has a 'url' column already:
if "url" in sliced_df.columns:
    url_map = {row["slug"]: row["url"] for _, row in sliced_df.iterrows()}
else:
    # build urls by joining base_url and slug (only do this if that matches the website)
    url_map = {row["slug"]: base_url.rstrip("/") + "/" + row["slug"].lstrip("/") for _, row in sliced_df.iterrows()}

len(url_map), list(url_map.items())[:3]  # quick check


(11,
 [('kunal-nayyar',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/kunal-nayyar'),
  ('mawaan-rizwan',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/mawaan-rizwan'),
  ('ep-298-james-norton-in-partnership-with-dexcom',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-298-james-norton-in-partnership-with-dexcom')])

In [87]:
# First Test Batch new scraper - Cell B — run download_transcripts
out_dir = os.path.join(V2_tests_dir, "test_transcripts")      # where HTMLs will be saved 
status_path = os.path.join(out_dir, "status.json")

# tune these for a polite test run
max_attempts_per_url = 8
backoff_base = 2.0
max_workers = 2   # start low while testing

# call the function (assumes download_transcripts is in scope)
status = download_transcripts(
    url_map=url_map,
    out_dir=out_dir,
    status_path=status_path,
    max_attempts_per_url=max_attempts_per_url,
    backoff_base=backoff_base,
    max_workers=max_workers,
    timeout=12.0
)


2025-11-26 11:57:28,920 INFO: Starting download_transcripts: 11 urls, out_dir=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts, status_path=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts\status.json
2025-11-26 11:57:28,925 DEBUG: Loaded existing status.json with 11 entries
2025-11-26 11:57:28,935 INFO: Download finished. success=11 failed=0 pending=0


In [88]:
# Second test batch new scraper

# Cell A — Build url_map
# Replace base_url below if you need to construct URLs from slugs:
base_url = "https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster"

# If second_ten_test_data_df has a 'url' column already:
if "url" in second_ten_test_data_df.columns:
    url_map_2 = {row["slug"]: row["url"] for _, row in second_ten_test_data_df.iterrows()}
else:
    # build urls by joining base_url and slug (only do this if that matches the website)
    url_map_2 = {row["slug"]: base_url.rstrip("/") + "/" + row["slug"].lstrip("/") for _, row in second_ten_test_data_df.iterrows()}

len(url_map_2), list(url_map_2.items())[:3]  # quick check

(11,
 [('ep-217-ross-noble-christmas-special',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-217-ross-noble-christmas-special'),
  ('ep-207-nick-frost',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-207-nick-frost'),
  ('ep-197-jenny-eclair',
   'https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-197-jenny-eclair')])

In [89]:
# Second Test Batch new scraper - Cell B — run download_transcripts
out_dir = os.path.join(V2_tests_dir, "test_transcripts")      # where HTMLs will be saved 
status_path = os.path.join(out_dir, "status.json")

# tune these for a polite test run
max_attempts_per_url = 8
backoff_base = 2.0
max_workers = 2   # start low while testing

# call the function (assumes download_transcripts is in scope)
status = download_transcripts(
    url_map=url_map_2,
    out_dir=out_dir,
    status_path=status_path,
    max_attempts_per_url=max_attempts_per_url,
    backoff_base=backoff_base,
    max_workers=max_workers,
    timeout=12.0
)

2025-11-26 11:57:42,535 INFO: Starting download_transcripts: 11 urls, out_dir=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts, status_path=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts\status.json
2025-11-26 11:57:42,538 DEBUG: Loaded existing status.json with 11 entries
2025-11-26 11:57:42,539 DEBUG: Initialized status for key='ep-207-nick-frost' -> https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-207-nick-frost
2025-11-26 11:57:42,541 DEBUG: Initialized status for key='ep-197-jenny-eclair' -> https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-197-jenny-eclair
2025-11-26 11:57:42,541 DEBUG: Initialized status for key='ep-187-lily-allen' -> https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster/ep-187-lily-allen
2025-11-26 11:57:42,542 DEBUG: Initial

In [None]:
# Orchestration function for the scraper - notes (everything we need to do first)

logger = configure_logger()

# If sliced_df has a 'url' column already:
if "url" in sliced_df.columns:
    url_map = {row["slug"]: row["url"] for _, row in sliced_df.iterrows()}
else:
    # build urls by joining base_url and slug (only do this if that matches the website)
    url_map = {row["slug"]: base_url.rstrip("/") + "/" + row["slug"].lstrip("/") for _, row in sliced_df.iterrows()}

out_dir = os.path.join(V2_tests_dir, "test_transcripts")      # where HTMLs will be saved 
status_path = os.path.join(out_dir, "status.json")
max_attempts_per_url = 8
backoff_base = 2.0
max_workers = 2   # start low while testing

# call the function (assumes download_transcripts is in scope)
status = download_transcripts(
    url_map=url_map_2,
    out_dir=out_dir,
    status_path=status_path,
    max_attempts_per_url=max_attempts_per_url,
    backoff_base=backoff_base,
    max_workers=max_workers,
    timeout=12.0
)

In [93]:
# GPT orchestration function

def orchestrate_scraper(
    df,                     # DataFrame with 'slug' and optionally 'url'
    base_url,               # base URL for constructing URLs if df has no 'url' column
    out_dir,                # folder to save HTML transcripts
    max_attempts_per_url=5,
    backoff_base=1.0,
    max_workers=3,
    timeout=12.0
):
    """
    Orchestrates the scraping process:
      1. Prepares a slug → URL map
      2. Ensures output folder exists
      3. Calls download_transcripts() with sensible defaults
      4. Returns the status dict for all downloads
    """
    # ---------------------
    # Setup logger for this run
    # ---------------------
    logger = configure_logger()
    logger.info("Starting scraper orchestration for %d episodes", len(df))

    # ---------------------
    # Prepare URL map
    # ---------------------
    if "url" in df.columns:
        url_map = {row["slug"]: row["url"] for _, row in df.iterrows()}
        logger.info("Using existing URLs from DataFrame")
    else:
        url_map = {row["slug"]: base_url.rstrip("/") + "/" + row["slug"].lstrip("/") for _, row in df.iterrows()}
        logger.info("Constructed URLs from base_url and slugs")

    # ---------------------
    # Ensure output folder exists
    # ---------------------
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    status_path = out_dir / "status.json"

    # ---------------------
    # Call the scraper
    # ---------------------
    logger.info("Running download_transcripts with %d URLs", len(url_map))
    status = download_transcripts(
        url_map=url_map,
        out_dir=out_dir,
        status_path=status_path,
        max_attempts_per_url=max_attempts_per_url,
        backoff_base=backoff_base,
        max_workers=max_workers,
        timeout=timeout
    )

    logger.info("Scraper orchestration finished")
    return status

In [95]:
# Test orchstrator

status = orchestrate_scraper(
    df=second_ten_test_data_df,
    base_url="https://podscripts.co/podcasts/off-menu-with-ed-gamble-and-james-acaster",
    out_dir=os.path.join(V2_tests_dir, "test_transcripts"),
    max_attempts_per_url=8,
    backoff_base=2.0,
    max_workers=2
)

2025-11-26 12:24:23,863 INFO: Starting scraper orchestration for 11 episodes
2025-11-26 12:24:23,866 INFO: Using existing URLs from DataFrame
2025-11-26 12:24:23,869 INFO: Running download_transcripts with 11 URLs
2025-11-26 12:24:23,870 INFO: Starting download_transcripts: 11 urls, out_dir=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts, status_path=c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\test_transcripts\status.json
2025-11-26 12:24:23,893 DEBUG: Loaded existing status.json with 21 entries
2025-11-26 12:24:23,893 INFO: Download finished. success=21 failed=0 pending=0
2025-11-26 12:24:23,893 INFO: Scraper orchestration finished


## New extracting clean text and timestamps ; combining into dataframe

### Do we need to change any prior functions for the slug change? No

In [51]:
def _extract_timestamps_as_list_of_dicts(
    transcript_str: str, slug: str
) -> List[Dict[str, Any]]:
    """
    Finds all 'starting point is HH:MM:SS' timestamps in a transcript string.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, where each dict contains the episode
                               number, timestamp string, and its starting index.
    """
    timestamp_pattern = re.compile(r"starting point is (\d{2}:\d{2}:\d{2})")
    all_timestamps_in_transcript = []
    for match in timestamp_pattern.finditer(transcript_str):
        # Get the captured timestamp string (e.g., "00:00:05")
        actual_time_string = match.group(1)
        # We use group(1) because that's our (HH:MM:SS) part, group(0) refers to the whole string by default

        # Get the starting index of the entire match
        start_position_in_text = match.start()
        # Store this as a dict with episode_slug as key
        stamp_dict = {
            "slug": slug,
            "timestamp": actual_time_string,
            "start_index": start_position_in_text,
        }
        # Store this extracted data (the timestamp string and its position)
        all_timestamps_in_transcript.append(stamp_dict)
    return all_timestamps_in_transcript

In [None]:
def extract_save_clean_text_and_periodic_timestamps(
    full_episodes_metadata_path: str, transcripts_dir: str, output_filepath: str
) -> None:
    """
    Takes the full episodes metadata filepath, the transcripts html directory, and an output filepath, and iterates
    through the episodes, processing the html into clean transcript text and collating the periodic timestamps.

    These transcripts and periodic timestamps are saved in a dataframe, which is saves as a parquet file to the
    output filepath.

    Args:
        full_episodes_metadata_path (str): The full episodes metadata dataframe filepath
        transcripts_dir (str): The directory containing the html of each episode.
        output_filepath (str): The filepath the output df is saved to.
    Returns:
        None: A dataframe containing the clean text and the timestamps (a list of Dicts) is saved to the
        output filepath as a parquet.
    """
    # 1. Load episodes meta_data
    episodes_df = try_read_parquet(full_episodes_metadata_path)
    if episodes_df is None or episodes_df.empty:
        print(
            "  ERROR: Input episode metadata is missing or empty. Cannot process transcripts."
        )
        raise ValueError("No episodes to process.")

    processed_records = []  # To store data for the final DataFrame

    # 2. Iterate through each episode's metadata
    for index, row in episodes_df.iterrows():
        episode_slug = row.get("slug")
        guest_name = row.get("guest_name")
        transcript_filename = f"{episode_slug}.html"
        transcript_filepath = os.path.join(transcripts_dir, transcript_filename)

        # Confirm file exists and skip if not
        if not os.path.exists(transcript_filepath):
            print(
                f"  WARNING: Transcript file not found for Episode {guest_name}, slug: {episode_slug} at {transcript_filepath}. Skipping."
            )
            continue  # Skip to the next episode
        try:
            clean_transcript_str = _clean_transcript_str_from_html(transcript_filepath)
            timestamps = _extract_timestamps_as_list_of_dicts(
                clean_transcript_str, episode_slug
            )

            processed_records.append(
                {
                    "slug": episode_slug,
                    "guest_name": guest_name,
                    "clean_transcript_text": clean_transcript_str,
                    "periodic_timestamps": timestamps,  # This will be a list of dictionaries
                }
            )
            print(
                f"  Processed Episode {episode_slug} ({guest_name}): Extracted text and {len(timestamps)} timestamps."
            )

        except Exception as e:
            print(
                f"  ERROR: Failed to process transcript for Episode {episode_slug} ({guest_name}) from {transcript_filepath}: {e}"
            )
            continue  # For MVP, just skip and warn

        if processed_records:
            result_df = pd.DataFrame(processed_records)
            result_df.to_parquet(output_filepath, index=False)
            print(
                f"Successfully saved clean transcripts and timestamps for {len(result_df)} episodes to {output_filepath}"
            )
        else:
            print(
                "No transcripts were successfully processed. Output DataFrame will be empty."
            )
            pd.DataFrame().to_parquet(output_filepath, index=False)  # Save an empty DF


# -------------------------------------------------------------------------
# Combining episode metadata with transcripts and timestamps
# -------------------------------------------------------------------------


def combine_timestamps_and_metadata(
    transcripts_timestamps_filepath: str, metadata_filepath: str
) -> pd.DataFrame:
    """
    Reads and combines the transcripts and timestamps dataframe with the metadata dataframe.

    Args:
        transcripts_timestamps_filepath(str)
        metadata_filepath (str)
    Returns:
        pd.DataFrame: A dataframe containing episode slug, restaurants mentioned, clean transcript,
        and timestamps.
    """
    metadata_df = try_read_parquet(metadata_filepath)
    transcripts_timestamps_df = try_read_parquet(transcripts_timestamps_filepath)
    combined_df = transcripts_timestamps_df.merge(
        metadata_df[["slug", "restaurants_mentioned"]],
        on="slug",
        how="left",
    )
    return combined_df

In [None]:
test_timestamps_out_path = os.path.join(V2_tests_dir, "ten_test_timestamps.parquet")

extract_save_clean_text_and_periodic_timestamps(ten_test_episodes_metadata_output_path, out_dir, test_timestamps_out_path)

  Processed Episode kunal-nayyar (Kunal Nayyar): Extracted text and 257 timestamps.
Successfully saved clean transcripts and timestamps for 1 episodes to c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\ten_test_timestamps.parquet
  Processed Episode mawaan-rizwan (Mawaan Rizwan): Extracted text and 294 timestamps.
Successfully saved clean transcripts and timestamps for 2 episodes to c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\ten_test_timestamps.parquet
  Processed Episode ep-298-james-norton-in-partnership-with-dexcom (James Norton): Extracted text and 241 timestamps.
Successfully saved clean transcripts and timestamps for 3 episodes to c:\Users\jbara\Data science projects (store here not desktop on onedrive)\Off Menu project\data\test_temp\V2_tests\ten_test_timestamps.parquet
  Processed Episode ep-287-santiago-lastra (Santiago Lastra): Extracted t

In [None]:
# Inspectinf timestamps df

timestamps_transcripts_test_df = try_read_parquet(test_timestamps_out_path)
timestamps_transcripts_test_df

Unnamed: 0,episode_slug,guest_name,clean_transcript_text,periodic_timestamps
0,kunal-nayyar,Kunal Nayyar,"starting point is 00:00:00 oh no, it's james a...","[{'slug': 'kunal-nayyar', 'start_index': 0, 't..."
1,mawaan-rizwan,Mawaan Rizwan,"starting point is 00:00:00 james, huge news fr...","[{'slug': 'mawaan-rizwan', 'start_index': 0, '..."
2,ep-298-james-norton-in-partnership-with-dexcom,James Norton,starting point is 00:00:00 welcome to the off-...,[{'slug': 'ep-298-james-norton-in-partnership-...
3,ep-287-santiago-lastra,Santiago Lastra,starting point is 00:00:00 huge news from off-...,"[{'slug': 'ep-287-santiago-lastra', 'start_ind..."
4,ep-277-mo-gilligan,Mo Gilligan,starting point is 00:00:00 today's episode of ...,"[{'slug': 'ep-277-mo-gilligan', 'start_index':..."
5,ep-267-danny-dyer,Danny Dyer,"starting point is 00:00:00 hello, i'm amy gled...","[{'slug': 'ep-267-danny-dyer', 'start_index': ..."
6,ep-257-amy-annette,Amy Annette,starting point is 00:00:00 welcome to the off ...,"[{'slug': 'ep-257-amy-annette', 'start_index':..."
7,ep-247-ardal-ohanlon-live-in-dublin,Ardal O'Hanlon,starting point is 00:00:00 acas powers the wor...,[{'slug': 'ep-247-ardal-ohanlon-live-in-dublin...
8,ep-237-lucy-beaumont-live-in-manchester,Lucy Beaumont,"starting point is 00:00:00 hello, it's ed gamb...",[{'slug': 'ep-237-lucy-beaumont-live-in-manche...
9,ep-227-john-robins-live-in-bristol,John Robins,"starting point is 00:00:00 hello, it's ed gamb...",[{'slug': 'ep-227-john-robins-live-in-bristol'...


In [None]:
# Combining with metadata and inspecting

combined_timestamps_metadata_df = combine_timestamps_and_metadata(test_timestamps_out_path, ten_test_episodes_metadata_output_path)
combined_timestamps_metadata_df 

Unnamed: 0,slug,guest_name,clean_transcript_text,periodic_timestamps,restaurants_mentioned
0,kunal-nayyar,Kunal Nayyar,"starting point is 00:00:00 oh no, it's james a...","[{'slug': 'kunal-nayyar', 'start_index': 0, 't...","[Moti Mahal, The Tamil Prince, The Dover, Kutir]"
1,mawaan-rizwan,Mawaan Rizwan,"starting point is 00:00:00 james, huge news fr...","[{'slug': 'mawaan-rizwan', 'start_index': 0, '...",[Ambala]
2,ep-298-james-norton-in-partnership-with-dexcom,James Norton,starting point is 00:00:00 welcome to the off-...,[{'slug': 'ep-298-james-norton-in-partnership-...,"[Goldeneye, Belindas, The Ham Yard Hotel]"
3,ep-287-santiago-lastra,Santiago Lastra,starting point is 00:00:00 huge news from off-...,"[{'slug': 'ep-287-santiago-lastra', 'start_ind...",[]
4,ep-277-mo-gilligan,Mo Gilligan,starting point is 00:00:00 today's episode of ...,"[{'slug': 'ep-277-mo-gilligan', 'start_index':...","[Roka, Bagel King]"
5,ep-267-danny-dyer,Danny Dyer,"starting point is 00:00:00 hello, i'm amy gled...","[{'slug': 'ep-267-danny-dyer', 'start_index': ...","[Wimpy, Eastenders Kebab, Wilsons Fish and Chips]"
6,ep-257-amy-annette,Amy Annette,starting point is 00:00:00 welcome to the off ...,"[{'slug': 'ep-257-amy-annette', 'start_index':...",[]
7,ep-247-ardal-ohanlon-live-in-dublin,Ardal O'Hanlon,starting point is 00:00:00 acas powers the wor...,[{'slug': 'ep-247-ardal-ohanlon-live-in-dublin...,[]
8,ep-237-lucy-beaumont-live-in-manchester,Lucy Beaumont,"starting point is 00:00:00 hello, it's ed gamb...",[{'slug': 'ep-237-lucy-beaumont-live-in-manche...,[Restaurant Story]
9,ep-227-john-robins-live-in-bristol,John Robins,"starting point is 00:00:00 hello, it's ed gamb...",[{'slug': 'ep-227-john-robins-live-in-bristol'...,"[Schwartzs, Heritage]"


In [101]:
# fuzzy functions

def _create_list_tuple_clean_sen_og_sen_og_index(
    text: str,
) -> List[Tuple[str, str, int]]:
    """
    Takes in a clean transcript string, and creates a list of tuples containing cleaned sentences
    for fuzzymatching, original sentences and starting index for locating quotes.

    Splits text using delimiter ". ". Assumes no sentences start with puntuation (leading spaces are the only shift from the start of the original to the start
    of the cleaned sentence).

    Returns:
        List[Tuple[str, str, int]]: a list containing a tuple, with cleaned sentence, original
                                    stripped sentence, and true start index (the start index of the original sentence,
                                    in the original text).

    """
    results = []
    current_idx_in_original = 0  # This tracks our position in the original 'text'

    # Split into 'segments' (what will become sentences) by full stop/space.
    segments = text.split(". ")

    for i, segment in enumerate(
        segments
    ):  # Note enumerate is a way to loop and get index (rather than a manual counter)
        original_full_sentence_segment = segment
        # Calculate the actual start index of the content within the segment itself (after stripping leading/trailing spaces)
        # It asssumes the start index (in processes sentence) will only move due to leading spaces
        # So, it calculates the original (assuming none start with punctuation), and retains it
        # Later, we will use this original index to compare against timestamps
        leading_spaces_count = len(original_full_sentence_segment) - len(
            original_full_sentence_segment.lstrip()
        )
        true_start_index = current_idx_in_original + leading_spaces_count

        original_sentence_stripped = original_full_sentence_segment.strip()

        # Only process if the sentence is not empty after stripping
        if original_sentence_stripped:
            # Apply original cleaning, explicitly converting to lowercase for fuzzy matching
            cleaned_sentence = re.sub(
                r"[^\w\s]", "", original_sentence_stripped
            ).lower()

            # Store cleaned, original, and start index
            results.append(
                (cleaned_sentence, original_sentence_stripped, true_start_index)
            )

        # Update current_idx_in_original for the next segment.
        # Add the length of the current segment and the delimiter length (2 for ". ").
        # This assumes all segments (except possibly the last) were followed by ". ".
        current_idx_in_original += len(original_full_sentence_segment)
        if (
            i < len(segments) - 1
        ):  # Only add delimiter length if it's not the last segment
            current_idx_in_original += len(". ")

    return results


def _find_timestamp(
    original_sentence_start_index: int, transcript_timestamps: List[dict]
):
    """
    Finds the nearest timestamp occurring before or at a given sentence index.

    This function searches through a list of timestamp dictionaries (which should
    be pre-sorted by `start_index`) to find the timestamp that immediately
    precedes or is at the start of a matched sentence.

    Args:
        original_sentence_start_index (int): The starting index of the sentence
            in the full transcript string.
        transcript_timestamps (List[dict]): A list of dictionaries, where each dict
            contains 'start_index' and 'timestamp' for a periodic timestamp.

    Returns:
        Optional[str]: The timestamp string (e.g., '00:01:23') if a match is found,
                       otherwise returns None.
    """
    if original_sentence_start_index is None:
        return None
    # Could sort timestamps here for good practice, but should be sorted already
    # Reverse-iterate over timestamps to find the "nearest before or at"
    for timestamp_dict in reversed(transcript_timestamps):
        if timestamp_dict["start_index"] <= original_sentence_start_index:
            return timestamp_dict["timestamp"]

    return None  # If no timestamp found before the quote's starting position (all eps start "Starting point is 00:00:00")


def _matches_by_res_name_from_list_of_res_names(
    restaurant_names: List[str], searchable_sentences: List[str], min_score: int
) -> Dict[str, List[Tuple[str, int, int]]]:
    """
    Finds fuzzy matches for a list of restaurant names within a list of cleaned sentences.

    This function iterates through each restaurant name and uses fuzzy matching to find
    sentences that are a close match. Matches are filtered based on a minimum score.

    Args:
        restaurant_names (List[str]): A list of restaurant names to search for.
        searchable_sentences (List[str]): A list of pre-cleaned sentences to search within.
        min_score (int): The minimum fuzzy match score (from 0-100) to consider
                         a match valid.

    Returns:
        Dict[str, List[Tuple[str, int, int]]]: A dictionary where:
            - Keys are the restaurant names from `restaurant_names`.
            - Values are a list of filtered matches for that restaurant.
            - Each match is a tuple containing:
                - str: The matched sentence text.
                - int: The fuzzy matching score.
                - int: The index of the matched sentence in the `searchable_sentences` list.
    """
    filtered_matches_by_string = {}
    for res_name in restaurant_names:
        matches = process.extract(
            res_name, searchable_sentences, scorer=fuzz.partial_ratio, limit=20
        )

        filtered_matches = []
        # --- FIX: Unpack the tuple of 2 items correctly ---
        for match_text, score in matches:
            if score >= min_score:
                # Find the index of the matched sentence in the original list
                # We use a try-except block for robustness in case of unexpected data.
                try:
                    original_sentence_index = searchable_sentences.index(match_text)
                    # Append all three pieces of information
                    filtered_matches.append(
                        (match_text, score, original_sentence_index)
                    )
                except ValueError:
                    # This will happen if the match text isn't found in the list,
                    # e.g., due to slight string differences not captured by .index()
                    continue

        filtered_matches_by_string[res_name] = filtered_matches

    return filtered_matches_by_string

def find_top_match_and_timestamps(
    combined_df: pd.DataFrame, min_match_score: int = 90
) -> pd.DataFrame:
    """
    Finds fuzzy matches for restaurant mentions in episode transcripts and associates them with timestamps.

    This function iterates through each episode's metadata and transcript data. For each mentioned
    restaurant, it performs a fuzzy search within the transcript. It then returns a DataFrame
    of the top matches and their corresponding timestamps, or notes if no match was found.

    Args:
        combined_df (pd.DataFrame): A DataFrame containing episode metadata, cleaned transcripts,
                                    and periodic timestamps.
        min_match_score (int): The minimum fuzzy match score (0-100) required to consider
                               a match valid.

    Returns:
        pd.DataFrame: A DataFrame where each row represents a restaurant mention. It contains
                      the following columns:
                          - 'slug': The episode slug e.g. ep-217-ross-noble or elle-fanning
                          - 'Restaurant': The name of the restaurant mentioned.
                          - 'Mention text': The original sentence where the mention was found.
                          - 'Match Score': The fuzzy match score.
                          - 'Match Type': The type of match (e.g., 'full, over 90' or 'No match found').
                          - 'Timestamp': The nearest preceding timestamp for the mention.
                          - 'Transcript sample': A short sample of the transcript text.
    """
    all_mentions_collected = []

    for index, combined_row in combined_df.iterrows():
        slug = combined_row.get("slug")
        guest_name = combined_row.get("guest_name")
        clean_transcript_text = combined_row.get("clean_transcript_text")
        periodic_timestamps = combined_row.get("periodic_timestamps")

        restaurants_data = combined_row.get("restaurants_mentioned", [])
        transcript_sample = (
            clean_transcript_text[:200]
            if isinstance(clean_transcript_text, str)
            else "No Transcript Found"
        )

        # Unsure what data type the res mentions are, hence need for this
        restaurants_list = []
        if isinstance(restaurants_data, list):
            restaurants_list = restaurants_data
        elif isinstance(restaurants_data, np.ndarray) and restaurants_data.size > 0:
            # Flatten the array and convert it to a standard Python list of strings
            restaurants_raw_list = restaurants_data.flatten().tolist()
            restaurants_list = [
                name.strip().lower() for name in restaurants_raw_list if name.strip()
            ]
        elif isinstance(restaurants_data, str):
            restaurants_list = [
                name.strip() for name in restaurants_data.split(",") if name.strip()
            ]

        if restaurants_list:
            episode_sentences_data = _create_list_tuple_clean_sen_og_sen_og_index(
                clean_transcript_text
            )
            searchable_sentences = [
                item[0] for item in episode_sentences_data
            ]  # This is to select the cleaned sentence from the list of tuple
            # of cleaned sentence, original, and true start index that create_sentence_list creates

            all_matches_for_episode = _matches_by_res_name_from_list_of_res_names(
                restaurants_list, searchable_sentences, 90
            )
            # --- all_matches_for_episode is a dict with key res_name and value lists of matches (matches r tuples of quote, score)
            for (
                restaurant_name_query,
                match_list_for_query,
            ) in all_matches_for_episode.items():
                if match_list_for_query:
                    top_match = match_list_for_query[0]
                    # Unpack the top match's data
                    matched_cleaned_text, score, matched_sentence_index = top_match
                    original_sentence_data = episode_sentences_data[
                        matched_sentence_index
                    ]  # This takes you back to episode sentences data for the sentence index
                    # Which is a tuple of clean sentence, original, and index of sentence within sen list
                    original_sentence_text = original_sentence_data[
                        1
                    ]  # The og sentence is at index 1 in this tuple
                    original_start_index = original_sentence_data[
                        2
                    ]  # The og start index is at index 2 in this tuple

                    timestamp = _find_timestamp(
                        original_start_index, periodic_timestamps
                    )

                    mention = {
                        "Episode ID": slug,
                        "Restaurant": restaurant_name_query,
                        "Mention text": original_sentence_text,
                        "Match Score": score,
                        "Match Type": f"full, over {min_match_score}",
                        "Timestamp": timestamp,
                        "transcript_sample": transcript_sample,
                    }
                    all_mentions_collected.append(mention)
                else:
                    null_mention = {
                        "Episode ID": slug,
                        "Restaurant": restaurant_name_query,
                        "Mention text": None,
                        "Match Score": 0,
                        "Match Type": "No match found",
                        "Timestamp": None,
                        "transcript_sample": transcript_sample,
                    }
                    all_mentions_collected.append(null_mention)
        else:
            print(
                f"  No raw mentions found in 'restaurants_mentioned' list for Episode {slug}. Skipping"
            )
    combined_df = pd.DataFrame(all_mentions_collected)
    return combined_df



In [104]:
# Test fuzzymatching

# --- Run top matches on the test data ---
top_mentions_df = find_top_match_and_timestamps(combined_timestamps_metadata_df , 90)

# --- Convert list into dataframe, print output ---

print(f"\n--- TOP COLLECTED ---")
print(f"Top Mentions DataFrame created with {len(top_mentions_df)} rows.")

  No raw mentions found in 'restaurants_mentioned' list for Episode ep-287-santiago-lastra. Skipping
  No raw mentions found in 'restaurants_mentioned' list for Episode ep-257-amy-annette. Skipping
  No raw mentions found in 'restaurants_mentioned' list for Episode ep-247-ardal-ohanlon-live-in-dublin. Skipping
  No raw mentions found in 'restaurants_mentioned' list for Episode ep-217-ross-noble-christmas-special. Skipping

--- TOP COLLECTED ---
Top Mentions DataFrame created with 16 rows.


In [105]:
top_mentions_df

Unnamed: 0,Episode ID,Restaurant,Mention text,Match Score,Match Type,Timestamp,transcript_sample
0,kunal-nayyar,moti mahal,ah,100,"full, over 90",01:00:27,"starting point is 00:00:00 oh no, it's james a..."
1,kunal-nayyar,the tamil prince,"there's a pub, an indian pub called the tamil ...",100,"full, over 90",00:32:33,"starting point is 00:00:00 oh no, it's james a..."
2,kunal-nayyar,the dover,,0,No match found,,"starting point is 00:00:00 oh no, it's james a..."
3,kunal-nayyar,kutir,,0,No match found,,"starting point is 00:00:00 oh no, it's james a..."
4,mawaan-rizwan,ambala,is there a place where you've had the best fal...,100,"full, over 90",00:56:05,"starting point is 00:00:00 james, huge news fr..."
5,ep-298-james-norton-in-partnership-with-dexcom,goldeneye,"go goldeneye, honestly, it's the best place in...",100,"full, over 90",00:55:49,starting point is 00:00:00 welcome to the off-...
6,ep-298-james-norton-in-partnership-with-dexcom,belindas,,0,No match found,,starting point is 00:00:00 welcome to the off-...
7,ep-298-james-norton-in-partnership-with-dexcom,the ham yard hotel,,0,No match found,,starting point is 00:00:00 welcome to the off-...
8,ep-277-mo-gilligan,roka,,0,No match found,,starting point is 00:00:00 today's episode of ...
9,ep-277-mo-gilligan,bagel king,"one of my favorites though, is from a place ca...",100,"full, over 90",01:19:18,starting point is 00:00:00 today's episode of ...
