# Story Deduplication

This notebook processes multiple JSON files containing story data, removes duplicates based on story IDs, and exports clean deduplicated stories to a single CSV file for downstream analysis.

## Configuration

In [None]:
# Keys copied out of every story record; the same list is used as the CSV
# header, so its order is the column order of the output file.
FIELDS_TO_EXTRACT = ["story_id", "title", "story_text", "created_at_i"]

# Directory searched for "*.json" input files.
INPUT_DIRECTORY = "data/stories"
# Destination of the merged, deduplicated CSV.
OUTPUT_CSV_PATH = "data/stories_deduplicated.csv"

## Functions

Core utilities for loading JSON files, extracting and deduplicating stories, converting HTML to plain text, and exporting the result to CSV.

In [None]:
import csv
import glob
import json
import os
import re
import warnings
from pathlib import Path

from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning


def load_json_file(file_path: str | Path) -> dict | list | None:
    """Loads JSON data from a single file."""
    try:
        return json.loads(Path(file_path).read_text(encoding="utf-8"))
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Error decoding JSON from {file_path}: {e}")
        return None


def extract_stories(seen_story_ids: set[str], stories_to_process: list[dict], fields_to_extract: list[str]) -> tuple[
    set[str], list[dict]]:
    """Extract the requested fields from stories not seen before.

    A story is kept only if its ``"story_id"`` is neither in
    ``seen_story_ids`` nor already encountered earlier in this batch, so the
    first occurrence of each ID wins. ``seen_story_ids`` is only read, never
    modified; the caller merges the returned IDs itself.

    Args:
        seen_story_ids: IDs already accepted from earlier batches.
        stories_to_process: Story records, expected to be a list of dicts.
        fields_to_extract: Keys to copy from each story. Missing keys are
            filled with ``""``; a non-empty ``"story_text"`` is converted
            from HTML to plain text.

    Returns:
        A ``(new_ids, stories)`` tuple: the IDs first seen in this batch and
        the extracted records in input order. Both are empty when
        ``stories_to_process`` is not a list.
    """
    batch_extracted_stories: list[dict] = []
    batch_seen_story_ids: set[str] = set()

    if not isinstance(stories_to_process, list):
        print(f"Warning: Expected a list of stories, but got {type(stories_to_process)}. Skipping.")
        return batch_seen_story_ids, batch_extracted_stories

    skipped_entries = 0

    for story in stories_to_process:
        # A stray null/string/number inside the list must not abort the whole
        # batch; only dict records can be queried with .get().
        if not isinstance(story, dict):
            skipped_entries += 1
            continue

        # NOTE: records without a "story_id" all share the key None, so only
        # the first such record is kept.
        story_id = story.get("story_id")

        if story_id in seen_story_ids or story_id in batch_seen_story_ids:
            continue

        batch_seen_story_ids.add(story_id)
        extracted_story_data: dict = {}

        for field in fields_to_extract:
            value = story.get(field, "")
            if field == "story_text" and value:
                value = convert_html_to_plain_text(value)
            extracted_story_data[field] = value

        batch_extracted_stories.append(extracted_story_data)

    if skipped_entries:
        print(f"Warning: Skipped {skipped_entries} entries that were not story objects.")

    return batch_seen_story_ids, batch_extracted_stories


def convert_html_to_plain_text(s: str) -> str:
    """Convert an HTML fragment to single-line plain text.

    Tags are removed, text separated by block-level elements or ``<br>`` is
    kept apart by a space, and every run of whitespace is collapsed to a
    single space.

    Args:
        s: HTML markup (or plain text) to convert.

    Returns:
        The text content with surrounding whitespace stripped.
    """
    with warnings.catch_warnings():
        # Suppress false positives when story text contains URLs or file-like patterns
        warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
        soup = BeautifulSoup(s, "html.parser")

    # get_text() simply concatenates text nodes, so "one<p>two</p>" would come
    # out as "onetwo". Pad block-level elements with a space so paragraphs and
    # line breaks stay separated, while inline tags (<i>, <a>, ...) keep their
    # text glued to neighbouring characters and punctuation.
    block_level_tags = [
        "p", "br", "div", "li", "pre", "blockquote", "tr", "td", "th",
        "h1", "h2", "h3", "h4", "h5", "h6",
    ]
    for element in soup.find_all(block_level_tags):
        element.insert_before(" ")
        element.insert_after(" ")

    text = soup.get_text()
    return re.sub(r"\s+", " ", text).strip()


def save_to_csv(data: list[dict], csv_file_path: str | Path, fieldnames_list: list[str]) -> None:
    """Saves the extracted data to a CSV file."""
    if not data:
        print("No data to save.")
        return

    Path(csv_file_path).parent.mkdir(parents=True, exist_ok=True)

    with open(csv_file_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames_list)
        writer.writeheader()
        writer.writerows(data)  # Use writerows for efficiency

    print(f"Successfully saved {len(data)} stories to {csv_file_path}")

## Data Processing

Execute the main pipeline that discovers JSON files, processes each file while tracking global duplicates, and accumulates unique stories across all sources.

In [None]:
# glob returns matches in arbitrary, filesystem-dependent order. Because the
# first occurrence of a story ID wins, sort the paths so that the surviving
# duplicate and the CSV row order are the same on every run.
json_file_paths = sorted(glob.glob(os.path.join(INPUT_DIRECTORY, "*.json")))

if not json_file_paths:
    print(f"No JSON files found in the directory: {INPUT_DIRECTORY}")
else:
    print(f"Found JSON files to process in '{INPUT_DIRECTORY}': {json_file_paths}")

all_extracted_stories = []  # Unique stories accumulated across all files
seen_story_ids_global = set()  # To track uniqueness across all files globally
total_stories_processed = 0  # Count of all stories read (before deduplication)

for file_path in json_file_paths:
    print(f"\nProcessing file: {file_path}...")
    stories_data_from_file = load_json_file(file_path)

    # None (unreadable/invalid file) and empty containers both land here.
    if not stories_data_from_file:
        print(f"No data loaded from {file_path} or file was empty/corrupt.")
        continue

    # Only a JSON array of stories can be processed; report it once and move on.
    if not isinstance(stories_data_from_file, list):
        print(
            f"Warning: Expected a list of stories from {file_path}, got {type(stories_data_from_file)}"
        )
        continue

    total_stories_processed += len(stories_data_from_file)

    newly_seen_ids, extracted_stories = extract_stories(
        seen_story_ids_global, stories_data_from_file, FIELDS_TO_EXTRACT
    )
    seen_story_ids_global.update(newly_seen_ids)
    all_extracted_stories.extend(extracted_stories)
    print(
        f"Finished processing {file_path}. Current total stories: {len(all_extracted_stories)}"
    )

## Export & Report

In [None]:
# Write the CSV and print a before/after summary, or report that nothing was extracted.
if not all_extracted_stories:
    print("\nNo stories were extracted after processing all files.")
else:
    save_to_csv(all_extracted_stories, OUTPUT_CSV_PATH, FIELDS_TO_EXTRACT)

    unique_story_count = len(all_extracted_stories)
    # Difference between the stories counted while reading and those kept.
    duplicates_removed = total_stories_processed - unique_story_count

    print("\n--- Summary ---")
    print(f"Total stories extracted (before deduplication): {total_stories_processed}")
    print(f"Total unique stories after deduplication: {unique_story_count}")
    print(f"Number of duplicates removed: {duplicates_removed}")