# Retrieve ToS;DR Data

*This notebook retrieves service information from the ToS;DR API: each service's name plus the name and URL of its associated policy documents (e.g., terms of service and privacy policies).*

## 1. Load Libraries

*Import required packages for HTTP requests, parallel processing, and progress visualization.*

In [None]:
import requests
import json
import time
import random
import concurrent.futures
import threading
from pathlib import Path

from rich import print as rprint
from rich.console import Console
from rich.progress import (
    Progress, SpinnerColumn, BarColumn, TextColumn, 
    TimeRemainingColumn, MofNCompleteColumn
)

## 2. Configuration

*Define the API base URL, a pool of User-Agent strings for requests, the input/output file paths, and a lock that serializes writes to the output file.*

In [None]:
console = Console()

# Base URL of the ToS;DR REST API; endpoint paths are appended to it by callers.
API_BASE = "https://api.tosdr.org"

# Desktop browser User-Agent strings; one is picked at random for outgoing requests.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
]

# File locations, resolved relative to the current working directory.
ROOT = Path('../..')
DATA_DIR = ROOT.joinpath("data-generated", "TOSDR")
DATA_FILE = DATA_DIR / "tosdr_data.jsonl"  # extracted records, one JSON object per line
ID_FILE = DATA_DIR / "tosdr_ids.txt"       # input: one service ID per line

# Make sure the output directory exists before anything reads or writes in it.
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Guards writes to DATA_FILE.
file_lock = threading.Lock()

## 3. Helper Functions

*Utility functions for making safe HTTP requests with retry logic and rate limit handling.*

### 3.1 HTTP Request Handler

*Performs GET requests with automatic retries on failure and a linearly increasing backoff (`backoff × attempt number`) when rate-limited (HTTP 429).*

In [None]:
def safe_get(url, retries=3, backoff=5):
    """Issue a GET request, retrying on rate limits, errors and network failures.

    A random User-Agent from ``USER_AGENTS`` is chosen once per call and
    reused for every attempt. Each attempt is preceded by a short random
    delay (0.1-0.3 s) to spread requests out.

    Args:
        url: Absolute URL to fetch.
        retries: Maximum number of attempts.
        backoff: Base delay in seconds after an HTTP 429; after attempt ``i``
            (0-based) the wait is ``backoff * (i + 1)`` seconds (linear).

    Returns:
        The ``requests.Response`` if the server answered HTTP 200, otherwise
        ``None`` (non-200 status on every attempt, repeated network errors,
        or a rate limit that never cleared).
    """
    headers = {'User-Agent': random.choice(USER_AGENTS)}
    for attempt in range(retries):
        is_last_attempt = attempt == retries - 1
        try:
            time.sleep(random.uniform(0.1, 0.3))
            resp = requests.get(url, headers=headers, timeout=10)
        except requests.RequestException:
            # Network-level failure (timeout, connection error, ...): pause and
            # retry. Only requests' own errors are caught, so Ctrl+C still works.
            if not is_last_attempt:
                time.sleep(1)
            continue

        if resp.status_code == 200:
            return resp

        if resp.status_code == 429:
            if is_last_attempt:
                # No attempts left; sleeping now would only delay the failure.
                break
            wait_time = backoff * (attempt + 1)
            if wait_time > 10:
                console.print(f"[red]⚡ Rate limit hit, pausing ({wait_time}s)...[/red]")
            time.sleep(wait_time)
        # Any other status falls through and is retried on the next attempt.
    return None

### 3.2 Service Index Fetcher

*Retrieves the complete list of service IDs by paginating through the ToS;DR API, stopping after three consecutive empty pages or when a request fails.*

In [None]:
def _extract_services(data):
    """Return the list of service entries contained in one index page.

    Accepts a bare list, or a dict carrying the list under
    ``parameters.services`` (checked first) or top-level ``services``.
    Any other shape yields an empty list instead of raising.
    """
    if isinstance(data, list):
        return data
    if not isinstance(data, dict):
        return []
    params = data.get('parameters')
    services = params.get('services') if isinstance(params, dict) else None
    if not services:
        services = data.get('services')
    return services if isinstance(services, list) else []


def fetch_service_index():
    """Collect every service ID exposed by the paginated V3 service index.

    Pages ``/service/v3/?page=N`` are requested for N = 1, 2, ... . Scanning
    stops when a request fails (``safe_get`` returns ``None``), when a page
    body is not valid JSON, or after 3 consecutive pages with no services.

    Returns:
        A sorted list of the unique service IDs found.
    """
    valid_ids = []
    current_page = 1
    consecutive_empty_pages = 0

    console.print("[cyan]Fetching service list...[/cyan]")

    with console.status("Scanning index...") as status:
        while True:
            resp = safe_get(f'{API_BASE}/service/v3/?page={current_page}')
            if resp is None:
                break

            try:
                data = resp.json()
            except ValueError as e:
                # requests' JSONDecodeError subclasses ValueError; report why the
                # scan stopped instead of ending silently.
                console.print(f"[red]✘ Page {current_page}: invalid JSON, stopping scan ({e})[/red]")
                break

            services = _extract_services(data)
            if services:
                consecutive_empty_pages = 0
                valid_ids.extend(
                    service['id']
                    for service in services
                    if isinstance(service, dict) and service.get('id')
                )
            else:
                # Tolerate a few empty pages before concluding the index is exhausted.
                consecutive_empty_pages += 1
                if consecutive_empty_pages >= 3:
                    break

            status.update(f"Page {current_page} - {len(valid_ids)} services found")
            current_page += 1

    unique_ids = sorted(set(valid_ids))
    console.print(f"[green]✔ Index complete: {len(unique_ids)} unique services.[/green]")
    return unique_ids

## 4. Data Extraction

*Functions to fetch detailed service information and handle the API's inconsistent JSON response structures.*

### 4.1 Service Detail Worker

*Worker function that fetches individual service details and normalizes the inconsistent API response formats.*

In [None]:
def _first_dict(value):
    """Return ``value[0]`` if ``value`` is a non-empty list whose first item is a dict, else ``None``."""
    if isinstance(value, list) and value and isinstance(value[0], dict):
        return value[0]
    return None


def _extract_service_info(raw_data):
    """Locate the service record inside a V3 ``?id=`` response body.

    Recognized shapes, checked in this order:
      * a dict that is itself the record (truthy ``name`` and non-null ``documents``);
      * ``{"parameters": {...record with "name"...}}`` or
        ``{"parameters": {"services": [record, ...]}}``;
      * ``{"services": [record, ...]}``.

    Returns the record dict, or ``None`` when no shape matches.
    """
    if not isinstance(raw_data, dict):
        return None
    if raw_data.get("name") and raw_data.get("documents") is not None:
        return raw_data
    if "parameters" in raw_data:
        params = raw_data["parameters"]
        if not isinstance(params, dict):
            return None
        if "name" in params:
            return params
        return _first_dict(params.get("services"))
    return _first_dict(raw_data.get("services"))


def _normalize_documents(raw_documents):
    """Keep only documents that have a URL, reduced to their ``name`` and ``url``."""
    if not isinstance(raw_documents, list):
        return []
    return [
        {"name": doc.get("name"), "url": doc.get("url")}
        for doc in raw_documents
        if isinstance(doc, dict) and doc.get("url")
    ]


def _parse_service_response(resp, service_id):
    """Convert an HTTP 200 response into the worker's success or error result dict."""
    try:
        raw_data = resp.json()
    except ValueError:
        # requests raises its own JSONDecodeError, which subclasses ValueError
        # regardless of the JSON backend (stdlib json or simplejson).
        return {"error": "Invalid JSON", "id": service_id}

    service_info = _extract_service_info(raw_data)
    if not service_info:
        return {"error": "Empty JSON or unknown structure", "id": service_id, "skippable": True}
    if not service_info.get("name"):
        return {"error": "Service name missing", "id": service_id, "skippable": True}

    return {
        "success": True,
        "data": {
            "service_id": service_id,
            "name": service_info.get("name"),
            "documents": _normalize_documents(service_info.get("documents", [])),
        },
    }


def fetch_service_worker(session, service_id):
    """
    Fetch one service's details and normalize the API's inconsistent JSON shapes.

    Args:
        session: Session used for the GET (``main`` shares one across its workers).
        service_id: ID interpolated into ``/service/v3/?id=...``.

    Returns:
        ``{"success": True, "data": {"service_id", "name", "documents"}}`` on
        success; otherwise ``{"error": <message>, "id": service_id}``, plus
        ``"skippable": True`` for outcomes ``main`` does not report.

    Up to 3 attempts are made. Only HTTP 429 and network errors are retried;
    any other status and every parsing outcome is final, because re-fetching
    a deterministic response cannot change the result.
    """
    url = f"{API_BASE}/service/v3/?id={service_id}"

    for attempt in range(3):
        is_last_attempt = attempt == 2
        try:
            # Random jitter keeps concurrent workers from hitting the API in lockstep.
            time.sleep(random.uniform(0.5, 1.5))
            resp = session.get(url, timeout=15)
        except requests.RequestException as e:
            if is_last_attempt:
                return {"error": f"Exception: {str(e)}", "id": service_id}
            time.sleep(1)
            continue

        if resp.status_code == 429:
            if is_last_attempt:
                return {"error": "Rate Limited (429)", "id": service_id}
            time.sleep(5 * (attempt + 1))
            continue

        if resp.status_code != 200:
            return {"error": f"HTTP {resp.status_code}", "id": service_id, "skippable": True}

        return _parse_service_response(resp, service_id)

    # Defensive: every path above returns by the final attempt.
    return {"error": "Max retries", "id": service_id}

### 4.2 Main Execution Pipeline

*Coordinates the full extraction process: loading service IDs from `tosdr_ids.txt`, skipping IDs already saved in `tosdr_data.jsonl` (so an interrupted run can resume), and running parallel workers with progress tracking.*

In [None]:
def _load_service_ids():
    """Read the non-blank, stripped lines of ID_FILE, dropping duplicates (first occurrence wins)."""
    with open(ID_FILE, "r", encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return list(dict.fromkeys(line for line in stripped if line))


def _load_processed_ids():
    """Return the service IDs (as ``str``) already recorded in DATA_FILE.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects with a ``service_id`` key are ignored.
    """
    processed_ids = set()
    if not DATA_FILE.exists():
        return processed_ids
    console.print("[yellow]Reading checkpoint...[/yellow]")
    with open(DATA_FILE, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(item, dict) and "service_id" in item:
                processed_ids.add(str(item["service_id"]))
    return processed_ids


def _ensure_trailing_newline(path):
    """Terminate a partially written last line so appended records start on a fresh line."""
    if not path.exists() or path.stat().st_size == 0:
        return
    with open(path, "rb+") as f:
        f.seek(-1, 2)  # whence=2 (os.SEEK_END): position on the last byte
        if f.read(1) != b"\n":
            f.seek(0, 2)
            f.write(b"\n")


def main(max_workers=5):
    """Fetch details for every ID in ID_FILE not yet in DATA_FILE and append them.

    The run is resumable. IDs already present in DATA_FILE are skipped, and
    each successful record is appended as soon as it arrives. An interrupted
    run can therefore simply be restarted.

    Args:
        max_workers: Number of concurrent worker threads (default 5).
    """
    console.print(f"[bold cyan]Data directory:[/bold cyan] {DATA_DIR}")

    if not ID_FILE.exists():
        console.print(f"[red]IDs file missing: {ID_FILE}[/red]")
        return

    all_service_ids = _load_service_ids()
    console.print(f"IDs loaded: {len(all_service_ids)}")

    processed_ids = _load_processed_ids()
    remaining_ids = [service_id for service_id in all_service_ids if service_id not in processed_ids]

    console.print(f"[green]Already processed: {len(processed_ids)}[/green]")
    console.print(f"[bold blue]Remaining to process: {len(remaining_ids)}[/bold blue]")

    if not remaining_ids:
        console.print("[green]Everything is already up to date![/green]")
        return

    # A crash mid-write could leave a truncated last line; without this the
    # next appended record would be glued onto it and both would be unreadable.
    _ensure_trailing_newline(DATA_FILE)

    with open(DATA_FILE, "a", encoding="utf-8", buffering=1) as f_out:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeRemainingColumn(),
            console=console,
        ) as progress:
            extraction_task = progress.add_task("Extracting...", total=len(remaining_ids))

            # NOTE(review): one requests.Session is shared by all worker threads;
            # requests does not document Session as thread-safe — confirm this is
            # acceptable or give each worker its own session.
            with requests.Session() as session:
                session.headers.update({'User-Agent': random.choice(USER_AGENTS)})

                with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                    future_to_service_id = {
                        executor.submit(fetch_service_worker, session, service_id): service_id
                        for service_id in remaining_ids
                    }

                    try:
                        for future in concurrent.futures.as_completed(future_to_service_id):
                            current_service_id = future_to_service_id[future]
                            try:
                                result = future.result()
                            except Exception as e:
                                progress.console.print(f"[red]Critical error {current_service_id}: {e}[/red]")
                            else:
                                if result and result.get("success"):
                                    data_str = json.dumps(result["data"], ensure_ascii=False)
                                    # Writes happen on this (the calling) thread; the lock is defensive.
                                    with file_lock:
                                        f_out.write(data_str + "\n")
                                        f_out.flush()
                                elif result and not result.get("skippable"):
                                    error_msg = result.get("error", "Unknown")
                                    progress.console.print(f"[red]✘ ID {current_service_id}: {error_msg}[/red]")

                            progress.update(extraction_task, advance=1)
                    except BaseException:
                        # On Ctrl+C or any abort, drop queued work. Otherwise leaving
                        # the executor block would wait for every remaining ID to be fetched.
                        for pending in future_to_service_id:
                            pending.cancel()
                        progress.console.print(f"[yellow]Interrupted. Results so far are saved in {DATA_FILE}[/yellow]")
                        raise

    console.print(f"[bold green]✔ Complete! File: {DATA_FILE}[/bold green]")

## 5. Run Extraction

*Execute the main pipeline to fetch and save all service data.*

In [None]:
# Run the extraction. Inside Jupyter __name__ is "__main__", so this cell runs as
# before; the guard only prevents the network scrape from starting automatically
# if this notebook is exported to a .py module and imported.
if __name__ == "__main__":
    main()