<a href="https://colab.research.google.com/github/BakhturinaPolina/goodreads-romance-research/blob/main/Goodreads_Datasets_Romance_Data_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Single Colab cell: deduplicate by work_id, keep full metadata (except exclusions), use max() for counts, and produce reports.
# Run in Google Colab.

# ---------- Setup & imports ----------
from google.colab import drive
drive.mount('/content/drive')

import gzip, json, os, logging, datetime, re
from collections import defaultdict, Counter
from tqdm import tqdm
import pandas as pd
import numpy as np
import math

# ---------- Config ----------
BOOKS_PATH = "/content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/goodreads_books_romance.json.gz"
OUTPUT_DIR = "/content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/processing_outputs_full"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Subset options for fast testing:
RUN_SUBSET = True      # set False to run on full corpus
SUBSET_WORK_COUNT = 10 # if RUN_SUBSET True, process only first N distinct work_ids encountered

# Columns to exclude from final CSV (explicitly requested)
EXCLUDE_COLUMNS = {
    "isbn", "isbn13", "asin", "kindle_asin",
    "url", "link", "image_url",
    "edition_information", "format", "country_code", "publisher"
}

# Output filenames (timestamped)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
LOGFILE = os.path.join(OUTPUT_DIR, f"dedup_run_{ts}.log")
OUT_CSV = os.path.join(OUTPUT_DIR, f"goodreads_romance_dedup_by_work_{ts}.csv")
OUT_SUMMARY_JSON = os.path.join(OUTPUT_DIR, f"dedup_summary_{ts}.json")
OUT_AMBIG_TITLES = os.path.join(OUTPUT_DIR, f"ambiguous_same_title_diff_authors_{ts}.csv")
OUT_QUALITY = os.path.join(OUTPUT_DIR, f"quality_report_{ts}.txt")

# ---------- Logging ----------
logger = logging.getLogger("goodreads_dedup_full")
logger.setLevel(logging.DEBUG)
# Stream handler (console)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))
logger.handlers = [ch]
# File handler
fh = logging.FileHandler(LOGFILE, encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
logger.addHandler(fh)

logger.info("Starting deduplication pipeline")
logger.info(f"BOOKS_PATH = {BOOKS_PATH}")
logger.info(f"OUTPUT_DIR = {OUTPUT_DIR}")
logger.info(f"RUN_SUBSET = {RUN_SUBSET}, SUBSET_WORK_COUNT = {SUBSET_WORK_COUNT}")

# ---------- Helpers ----------
def iter_json_gz_lines(path):
    """Yield JSON objects from newline-delimited JSON .gz file (defensive)."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # best-effort: skip malformed line but log occasionally
                logger.debug("Skipping a malformed JSON line.")
                continue

def to_int_safe(x):
    if x is None or x == "":
        return None
    try:
        return int(str(x).replace(",", ""))
    except:
        try:
            return int(float(x))
        except:
            return None

def normalize_title(t):
    if not t:
        return ""
    s = str(t).strip().lower()
    s = re.sub(r"[^\w\s]", "", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def normalize_author_id_list(authors_field):
    """Return list of author identifiers (author_id if present else name str)."""
    out = []
    if not authors_field:
        return out
    if isinstance(authors_field, list):
        for a in authors_field:
            if isinstance(a, dict):
                aid = a.get("author_id") or a.get("id")
                if aid:
                    out.append(str(aid))
                else:
                    name = a.get("name") or a.get("author_name")
                    if name:
                        out.append(str(name).strip().lower())
            else:
                out.append(str(a).strip().lower())
    elif isinstance(authors_field, dict):
        aid = authors_field.get("author_id") or authors_field.get("id")
        if aid:
            out.append(str(aid))
        else:
            name = authors_field.get("name") or authors_field.get("author_name")
            if name:
                out.append(str(name).strip().lower())
    else:
        out.append(str(authors_field).strip().lower())
    return out

# ---------- Phase 1: read file and build list of rows (raw) ----------
logger.info("Phase 1: streaming JSON and collecting raw records (preserve all keys). This may take some time for the full dump.")

raw_rows = []
work_id_order = []   # track first-seen work_id ordering
work_seen = set()
lines_read = 0

for obj in tqdm(iter_json_gz_lines(BOOKS_PATH), desc="reading books"):
    lines_read += 1
    # keep raw object; convert certain str-number fields to numeric for easier aggregation
    # but we keep raw values too
    row = dict(obj)  # shallow copy of all keys
    # normalize some fields
    row['_ratings_count_int'] = to_int_safe(row.get('ratings_count'))
    row['_text_reviews_count_int'] = to_int_safe(row.get('text_reviews_count'))
    # publication year might be string -> int or empty
    row['_publication_year_int'] = to_int_safe(row.get('publication_year'))
    # ensure book_id and work_id exist as strings
    row['book_id'] = str(row.get('book_id')) if row.get('book_id') is not None else ""
    row['work_id'] = str(row.get('work_id')) if row.get('work_id') is not None else ""
    raw_rows.append(row)
    if row['work_id'] not in work_seen:
        work_seen.add(row['work_id'])
        work_id_order.append(row['work_id'])
    # Fast test cutoff if RUN_SUBSET True: we still need all editions for chosen works,
    # so we won't break early here. We'll filter later by the first N work_ids for subset.
# end stream
logger.info(f"Finished streaming. Lines read: {lines_read}. Raw records collected: {len(raw_rows)}")

# Convert to DataFrame (keeps all keys as columns; some rows may have different keys)
logger.info("Converting to pandas DataFrame (this may use memory).")
df_raw = pd.DataFrame(raw_rows)
logger.info(f"DataFrame shape: {df_raw.shape}, columns: {len(df_raw.columns)}")

# ---------- Phase 1b: optionally restrict to subset of works to test quickly ----------
if RUN_SUBSET:
    selected_work_ids = work_id_order[:SUBSET_WORK_COUNT]
    logger.info(f"RUN_SUBSET is True: restricting to first {len(selected_work_ids)} work_ids for quick test.")
    df = df_raw[df_raw['work_id'].isin(selected_work_ids)].copy()
else:
    df = df_raw

logger.info(f"Working DataFrame shape after subset filter: {df.shape}")

# ---------- Phase 2: group by work_id, compute aggregates and pick canonical edition ----------
logger.info("Phase 2: grouping by work_id, computing aggregates, and selecting canonical edition (edition with max ratings_count).")

grouped = df.groupby('work_id', sort=False)

dedup_rows = []
work_stats = {}  # for summary

for wid, group in tqdm(grouped, desc="processing work groups"):
    if wid == "" or wid is None:
        # optionally skip invalid work_id
        continue
    # compute number of editions
    num_editions = len(group)
    # inferred publication year = earliest non-null publication year across editions
    pub_years = group['_publication_year_int'].dropna().astype(int).tolist()
    publication_year_inferred = int(min(pub_years)) if len(pub_years) > 0 else None
    # ratings/text_reviews chosen = MAX across editions (our fix)
    ratings_count_chosen = None
    text_reviews_count_chosen = None
    if not group['_ratings_count_int'].dropna().empty:
        ratings_count_chosen = int(group['_ratings_count_int'].dropna().max())
    if not group['_text_reviews_count_int'].dropna().empty:
        text_reviews_count_chosen = int(group['_text_reviews_count_int'].dropna().max())
    # choose canonical edition row: edition with highest ratings_count; tie-break: highest text_reviews_count
    # if no ratings_count available, pick the edition with the earliest publication_year; else fallback to first row
    canonical_row = None
    try:
        if not group['_ratings_count_int'].dropna().empty:
            idx = group['_ratings_count_int'].idxmax()
            # if multiple have same max, idxmax returns first; good enough
            canonical_row = group.loc[idx].to_dict()
        elif publication_year_inferred is not None:
            # pick earliest year edition (closest to inferred)
            cand = group[group['_publication_year_int'] == publication_year_inferred]
            if len(cand) > 0:
                canonical_row = cand.iloc[0].to_dict()
            else:
                canonical_row = group.iloc[0].to_dict()
        else:
            canonical_row = group.iloc[0].to_dict()
    except Exception as e:
        logger.debug(f"Error selecting canonical edition for work {wid}: {e}")
        canonical_row = group.iloc[0].to_dict()
    # Ensure canonical_row contains keys (if some editions had missing keys)
    if canonical_row is None:
        continue

    # Build final dedup row:
    # - start from canonical_row but remove excluded columns
    # - add aggregated columns: num_editions_for_work, publication_year_inferred, publication_year_chosen,
    #   ratings_count_chosen, text_reviews_count_chosen
    final_row = {}
    # Copy canonical metadata keys, except excluded ones
    for k, v in canonical_row.items():
        if k in EXCLUDE_COLUMNS:
            continue
        # we do not want intermediate _fields in final metadata
        if k.startswith('_'):
            continue
        final_row[k] = v
    # Add aggregated columns
    final_row['work_id'] = wid
    final_row['canonical_book_id'] = canonical_row.get('book_id')
    final_row['num_editions_for_work'] = num_editions
    final_row['publication_year_inferred'] = publication_year_inferred
    # publication_year_chosen: prefer canonical edition's publication_year if available else inferred
    pub_year_chosen = to_int_safe(canonical_row.get('publication_year')) if canonical_row.get('publication_year') not in (None, "") else publication_year_inferred
    final_row['publication_year_chosen'] = pub_year_chosen
    final_row['ratings_count_chosen'] = ratings_count_chosen
    final_row['text_reviews_count_chosen'] = text_reviews_count_chosen
    # authors flatten: prefer canonical authors list but also include combined authors across editions
    canonical_authors = canonical_row.get('authors') if canonical_row.get('authors') is not None else []
    # compute union of author ids/names across editions
    author_union = set()
    for a in group['authors'].dropna().tolist():
        if isinstance(a, list):
            for ai in a:
                if isinstance(ai, dict):
                    aid = ai.get('author_id') or ai.get('id') or ai.get('name')
                    if aid: author_union.add(str(aid))
                else:
                    author_union.add(str(ai))
        elif isinstance(a, dict):
            aid = a.get('author_id') or a.get('id') or a.get('name')
            if aid: author_union.add(str(aid))
        else:
            author_union.add(str(a))
    # canonical authors normalized list
    canonical_authors_ids = normalize_author_id_list(canonical_authors)
    final_row['authors_ids_or_names'] = "|".join(sorted(author_union)) if author_union else "|".join(canonical_authors_ids)
    # store normalized title for collision detection
    final_row['_normalized_title'] = normalize_title(final_row.get('title') or final_row.get('title_without_series') or "")
    dedup_rows.append(final_row)
    # save stats
    work_stats[wid] = {
        "num_editions": num_editions,
        "pub_year_inferred": publication_year_inferred,
        "ratings_count_chosen": ratings_count_chosen,
        "text_reviews_count_chosen": text_reviews_count_chosen
    }

logger.info(f"Completed grouping & canonical selection. Deduplicated works: {len(dedup_rows)}")

# ---------- Phase 3: create final DataFrame and flag ambiguous same-title different-author cases ----------
logger.info("Phase 3: building final DataFrame and identifying title collisions.")

dedup_df = pd.DataFrame(dedup_rows)

# Detect ambiguous titles: same normalized title maps to >1 distinct author identifiers
title_groups = dedup_df.groupby('_normalized_title')['authors_ids_or_names'].apply(lambda s: set(s.dropna().tolist()))
ambiguous_titles = {}
for title_norm, authors_set in title_groups.items():
    # authors_set is a set of strings like "123|456" -> need to split and count unique tokens
    unique_authors = set()
    for s in authors_set:
        if not s:
            continue
        parts = s.split("|")
        for p in parts:
            if p: unique_authors.add(p)
    if len(unique_authors) > 1:
        ambiguous_titles[title_norm] = {
            "n_distinct_authors": len(unique_authors),
            "authors_sample": list(sorted(unique_authors))[:10],
            "work_examples": dedup_df[dedup_df['_normalized_title'] == title_norm][['work_id','canonical_book_id','authors_ids_or_names','title']].to_dict(orient='records')
        }

# add flag column possible_title_collision
dedup_df['possible_title_collision'] = dedup_df['_normalized_title'].apply(lambda t: True if t in ambiguous_titles else False)

# drop helper normalized column before saving, but keep if you want (we keep it)
# dedup_df.drop(columns=['_normalized_title'], inplace=True)

# ---------- Phase 4: Save outputs ----------
logger.info("Phase 4: saving CSV, ambiguous titles CSV, and summary files.")

# Save main dedup CSV (preserve column order: work_id, canonical_book_id, title..., then aggregated columns)
# Ensure aggregated columns exist
agg_cols = ['work_id', 'canonical_book_id', 'title', 'title_without_series', 'authors_ids_or_names',
            'publication_year_inferred', 'publication_year_chosen', 'ratings_count_chosen',
            'text_reviews_count_chosen', 'num_editions_for_work', 'possible_title_collision']
# include all other columns in dedup_df except excluded ones and helper fields
other_cols = [c for c in dedup_df.columns if c not in agg_cols and c != '_normalized_title']
# prioritize archives: put agg_cols first, then other_cols
final_columns = [c for c in agg_cols if c in dedup_df.columns] + other_cols

dedup_df.to_csv(OUT_CSV, index=False, columns=final_columns, encoding='utf-8')
logger.info(f"Saved deduplicated CSV to: {OUT_CSV}")

# Save ambiguous titles details
if ambiguous_titles:
    ambig_rows = []
    for t, info in ambiguous_titles.items():
        ambig_rows.append({
            "normalized_title": t,
            "n_distinct_authors": info["n_distinct_authors"],
            "authors_sample": "|".join(info["authors_sample"]),
            "work_examples_json": json.dumps(info["work_examples"], ensure_ascii=False)
        })
    pd.DataFrame(ambig_rows).to_csv(OUT_AMBIG_TITLES, index=False, encoding='utf-8')
    logger.info(f"Saved ambiguous titles CSV to: {OUT_AMBIG_TITLES}")
else:
    logger.info("No ambiguous titles found.")

# Save summary JSON
summary = {
    "timestamp": ts,
    "input": BOOKS_PATH,
    "output_csv": OUT_CSV,
    "n_raw_records": len(raw_rows),
    "n_works_deduplicated": len(dedup_df),
    "run_subset": RUN_SUBSET,
    "subset_work_count": SUBSET_WORK_COUNT if RUN_SUBSET else None,
    "excluded_columns": sorted(list(EXCLUDE_COLUMNS))
}
with open(OUT_SUMMARY_JSON, "w", encoding="utf-8") as fh:
    json.dump(summary, fh, indent=2)
logger.info(f"Saved summary JSON to: {OUT_SUMMARY_JSON}")

# ---------- Phase 5: Quality report ----------
logger.info("Phase 5: computing quality report metrics.")

def safe_list_get(series, key=None):
    return series.dropna()

quality_lines = []
quality_lines.append(f"Quality report generated at {datetime.datetime.now().isoformat()}\n")
quality_lines.append(f"Input file: {BOOKS_PATH}")
quality_lines.append(f"Output file: {OUT_CSV}")
quality_lines.append(f"Number of raw records read: {len(raw_rows)}")
quality_lines.append(f"Number of deduplicated works: {len(dedup_df)}")
# ratings distribution
ratings_vals = dedup_df['ratings_count_chosen'].dropna().astype(int) if 'ratings_count_chosen' in dedup_df.columns else pd.Series(dtype=float)
if not ratings_vals.empty:
    quality_lines.append(f"Ratings_count_chosen: min={int(ratings_vals.min())}, median={int(ratings_vals.median())}, mean={float(ratings_vals.mean()):.2f}, max={int(ratings_vals.max())}")
else:
    quality_lines.append("Ratings_count_chosen: No data")
# text_reviews distribution
reviews_vals = dedup_df['text_reviews_count_chosen'].dropna().astype(int) if 'text_reviews_count_chosen' in dedup_df.columns else pd.Series(dtype=float)
if not reviews_vals.empty:
    quality_lines.append(f"Text_reviews_count_chosen: min={int(reviews_vals.min())}, median={int(reviews_vals.median())}, mean={float(reviews_vals.mean()):.2f}, max={int(reviews_vals.max())}")
else:
    quality_lines.append("Text_reviews_count_chosen: No data")
# inferred year coverage
inferred_years = dedup_df['publication_year_inferred'].dropna().astype(int) if 'publication_year_inferred' in dedup_df.columns else pd.Series(dtype=float)
if not inferred_years.empty:
    quality_lines.append(f"Publication year inferred: min={int(inferred_years.min())}, median={int(inferred_years.median())}, max={int(inferred_years.max())}")
    missing_years_pct = 100.0 * (len(dedup_df) - len(inferred_years)) / len(dedup_df)
    quality_lines.append(f"Percentage of works missing inferred publication year: {missing_years_pct:.2f}%")
else:
    quality_lines.append("No inferred publication years available")
# edition counts
if 'num_editions_for_work' in dedup_df.columns:
    nd = dedup_df['num_editions_for_work'].dropna().astype(int)
    quality_lines.append(f"Editions per work: min={int(nd.min())}, median={int(nd.median())}, mean={float(nd.mean()):.2f}, max={int(nd.max())}")
# ambiguous titles count
quality_lines.append(f"Number of ambiguous normalized titles (same title, multiple authors): {len(ambiguous_titles)}")

# Save quality report
with open(OUT_QUALITY, "w", encoding="utf-8") as fh:
    fh.write("\n".join(quality_lines))
logger.info(f"Saved quality report to: {OUT_QUALITY}")

# Print a short console summary
logger.info("=== RUN SUMMARY ===")
for line in quality_lines[:10]:
    logger.info(line)
logger.info("=== END SUMMARY ===")

logger.info("Pipeline finished successfully. Please inspect the CSV and reports in the output directory. Remember to commit the script and outputs (small samples) to your git repo before heavy reruns.")

[INFO] Starting deduplication pipeline
INFO:goodreads_dedup_full:Starting deduplication pipeline
[INFO] BOOKS_PATH = /content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/goodreads_books_romance.json.gz
INFO:goodreads_dedup_full:BOOKS_PATH = /content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/goodreads_books_romance.json.gz
[INFO] OUTPUT_DIR = /content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/processing_outputs_full
INFO:goodreads_dedup_full:OUTPUT_DIR = /content/drive/MyDrive/Goodreads_Metadata_Reviews_2017/processing_outputs_full
[INFO] RUN_SUBSET = True, SUBSET_WORK_COUNT = 10
INFO:goodreads_dedup_full:RUN_SUBSET = True, SUBSET_WORK_COUNT = 10
[INFO] Phase 1: streaming JSON and collecting raw records (preserve all keys). This may take some time for the full dump.
INFO:goodreads_dedup_full:Phase 1: streaming JSON and collecting raw records (preserve all keys). This may take some time for the full dump.


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


reading books: 335449it [00:47, 7132.97it/s]
[INFO] Finished streaming. Lines read: 335449. Raw records collected: 335449
INFO:goodreads_dedup_full:Finished streaming. Lines read: 335449. Raw records collected: 335449
[INFO] Converting to pandas DataFrame (this may use memory).
INFO:goodreads_dedup_full:Converting to pandas DataFrame (this may use memory).
[INFO] DataFrame shape: (335449, 32), columns: 32
INFO:goodreads_dedup_full:DataFrame shape: (335449, 32), columns: 32
[INFO] RUN_SUBSET is True: restricting to first 10 work_ids for quick test.
INFO:goodreads_dedup_full:RUN_SUBSET is True: restricting to first 10 work_ids for quick test.
[INFO] Working DataFrame shape after subset filter: (265, 32)
INFO:goodreads_dedup_full:Working DataFrame shape after subset filter: (265, 32)
[INFO] Phase 2: grouping by work_id, computing aggregates, and selecting canonical edition (edition with max ratings_count).
INFO:goodreads_dedup_full:Phase 2: grouping by work_id, computing aggregates, and s