# Sanity Checks — Science Data Lake

10 diagnostic checks validating data integrity at join boundaries.
Each check prints **PASS** or **FAIL**.

In [1]:
import duckdb
import numpy as np

DB_PATH = '/mnt/nvme03/science_datalake/datalake.duckdb'
con = duckdb.connect(DB_PATH, read_only=True)
con.execute('SET threads=16')

results = {}

def report(num, title, passed, detail=""):
    status = "PASS" if passed else "FAIL"
    results[num] = (title, status)
    icon = "\u2705" if passed else "\u274c"
    print(f"{icon} Check {num} \u2014 {title}: {status}")
    if detail:
        print(f"   {detail}")

print("Connected to Science Data Lake")

Connected to Science Data Lake


In [2]:
# Check 1: DOI format — no http prefix, all lowercase
row = con.sql("""
    SELECT
        COUNT(*) FILTER (WHERE doi LIKE 'http%') AS http_prefix,
        COUNT(*) FILTER (WHERE doi != LOWER(doi)) AS not_lowercase,
        COUNT(*) AS total
    FROM xref.unified_papers
""").fetchone()
violations = row[0] + row[1]
report(1, "DOI format (no http prefix, all lowercase)", violations == 0,
       f"http prefix: {row[0]:,} | uppercase: {row[1]:,} | total: {row[2]:,}")

✅ Check 1 — DOI format (no http prefix, all lowercase): PASS
   http prefix: 0 | uppercase: 0 | total: 293,123,121


In [3]:
# Check 2: Coverage flags match data presence
row = con.sql("""
    SELECT
        COUNT(*) FILTER (WHERE has_openalex != (openalex_id IS NOT NULL)) AS oa,
        COUNT(*) FILTER (WHERE has_s2ag != (s2ag_corpusid IS NOT NULL)) AS s2ag,
        COUNT(*) FILTER (WHERE has_sciscinet != (sciscinet_paperid IS NOT NULL)) AS ssn
    FROM xref.unified_papers
""").fetchone()
total = row[0] + row[1] + row[2]
report(2, "Coverage flags match data", total == 0,
       f"OA: {row[0]:,} | S2AG: {row[1]:,} | SciSciNet: {row[2]:,}")

✅ Check 2 — Coverage flags match data: PASS
   OA: 0 | S2AG: 0 | SciSciNet: 0


In [4]:
# Check 3: Primary key uniqueness (no duplicate DOIs)
row = con.sql("""
    SELECT COUNT(*) AS total, COUNT(DISTINCT doi) AS unique_dois
    FROM xref.unified_papers
""").fetchone()
dups = row[0] - row[1]
report(3, "Primary key uniqueness (no duplicate DOIs)", dups == 0,
       f"total: {row[0]:,} | unique: {row[1]:,} | duplicates: {dups:,}")

✅ Check 3 — Primary key uniqueness (no duplicate DOIs): PASS
   total: 293,123,121 | unique: 293,123,121 | duplicates: 0


In [5]:
# Check 4: OpenAlex ID format (https://openalex.org/W...) + joinability to works_topics
fmt_bad = con.sql("""
    SELECT COUNT(*) FROM xref.unified_papers
    WHERE openalex_id IS NOT NULL
      AND openalex_id NOT LIKE 'https://openalex.org/W%'
""").fetchone()[0]

# Sample 1000 openalex_ids and check join to works_topics
join = con.sql("""
    SELECT
        COUNT(DISTINCT u.openalex_id) AS total,
        COUNT(DISTINCT CASE WHEN wt.work_id IS NOT NULL THEN u.openalex_id END) AS matched
    FROM (
        SELECT openalex_id FROM xref.unified_papers
        WHERE openalex_id IS NOT NULL
        USING SAMPLE 1000
    ) u
    LEFT JOIN openalex.works_topics wt ON wt.work_id = u.openalex_id
""").fetchone()
rate = join[1] / join[0] if join[0] > 0 else 0
passed = fmt_bad == 0 and rate > 0.5
report(4, "OpenAlex ID format + joinability", passed,
       f"format violations: {fmt_bad:,} | join match: {join[1]}/{join[0]} ({rate:.0%})")

✅ Check 4 — OpenAlex ID format + joinability: PASS
   format violations: 0 | join match: 672/999 (67%)


In [6]:
# Check 5: topic_ontology_map -> openalex.topics (no orphan topic_ids)
orphans = con.sql("""
    SELECT COUNT(DISTINCT tom.topic_id)
    FROM xref.topic_ontology_map tom
    LEFT JOIN openalex.topics t ON tom.topic_id = t.id
    WHERE t.id IS NULL
""").fetchone()[0]
report(5, "topic_ontology_map -> openalex.topics (no orphans)", orphans == 0,
       f"orphan topic_ids: {orphans:,}")

✅ Check 5 — topic_ontology_map -> openalex.topics (no orphans): PASS
   orphan topic_ids: 0


In [7]:
# Check 6: RoS oaid -> OpenAlex join (sample 10K)
# ros.pcs_oa.oaid is BIGINT; unified_papers.openalex_id is 'https://openalex.org/W<num>'
join = con.sql("""
    SELECT
        COUNT(*) AS total,
        COUNT(u.openalex_id) AS matched
    FROM (
        SELECT DISTINCT
            'https://openalex.org/W' || CAST(oaid AS VARCHAR) AS expected_id
        FROM ros.pcs_oa
        WHERE oaid IS NOT NULL
        USING SAMPLE 10000
    ) r
    LEFT JOIN xref.unified_papers u ON u.openalex_id = r.expected_id
""").fetchone()
rate = join[1] / join[0] if join[0] > 0 else 0
report(6, "RoS oaid -> OpenAlex join (10K sample)", rate > 0.85,
       f"match rate: {rate:.1%} ({join[1]:,}/{join[0]:,})")

✅ Check 6 — RoS oaid -> OpenAlex join (10K sample): PASS
   match rate: 86.7% (8,324/9,606)


In [8]:
# Check 7: Citation count cross-source correlation (S2AG vs OA vs SciSciNet)
row = con.sql("""
    SELECT
        CORR(CAST(s2ag_citationcount AS DOUBLE), CAST(oa_cited_by_count AS DOUBLE)) AS r_s2ag_oa,
        CORR(CAST(s2ag_citationcount AS DOUBLE), CAST(sciscinet_citation_count AS DOUBLE)) AS r_s2ag_ssn,
        CORR(CAST(oa_cited_by_count AS DOUBLE), CAST(sciscinet_citation_count AS DOUBLE)) AS r_oa_ssn
    FROM xref.unified_papers
    WHERE s2ag_citationcount IS NOT NULL
      AND oa_cited_by_count IS NOT NULL
      AND sciscinet_citation_count IS NOT NULL
""").fetchone()
r_vals = [row[0], row[1], row[2]]
n_above = sum(1 for r in r_vals if r is not None and r > 0.8)
# Pass if at least 2 of 3 pairwise correlations exceed 0.8
passed = n_above >= 2
report(7, "Citation cross-source correlation (>=2 pairs r>0.8)", passed,
       f"S2AG\u2194OA: {row[0]:.4f} | S2AG\u2194SSN: {row[1]:.4f} | OA\u2194SSN: {row[2]:.4f}")

✅ Check 7 — Citation cross-source correlation (>=2 pairs r>0.8): PASS
   S2AG↔OA: 0.7599 | S2AG↔SSN: 0.8686 | OA↔SSN: 0.8643


In [9]:
# Check 8: Year distribution (NULL and invalid counts < 1% each)
row = con.sql("""
    SELECT
        COUNT(*) AS total,
        COUNT(*) FILTER (WHERE year IS NULL) AS null_yr,
        COUNT(*) FILTER (WHERE year < 1500 OR year > 2026) AS bad_yr
    FROM xref.unified_papers
""").fetchone()
null_pct = 100 * row[1] / row[0]
bad_pct = 100 * row[2] / row[0]
report(8, "Year distribution (NULL/invalid < 1% each)", null_pct < 1 and bad_pct < 1,
       f"NULL: {row[1]:,} ({null_pct:.4f}%) | invalid: {row[2]:,} ({bad_pct:.4f}%)")

✅ Check 8 — Year distribution (NULL/invalid < 1% each): PASS
   NULL: 1,548,939 (0.5284%) | invalid: 4,958 (0.0017%)


In [10]:
# Check 9: Spot-check 3 known papers
all_ok = True

# 9a: Wakefield 1998 (retracted MMR-autism paper)
w = con.sql("""
    SELECT doi, has_retraction, oa_is_retracted,
           s2ag_citationcount, oa_cited_by_count, year
    FROM xref.unified_papers
    WHERE doi = '10.1016/s0140-6736(97)11096-0'
""").fetchdf()
if len(w) == 0:
    print("   Wakefield paper not found!")
    all_ok = False
else:
    r = w.iloc[0]
    ok = bool(r['has_retraction'])
    sym = "\u2713" if ok else "\u2717"
    print(f"   {sym} Wakefield 1998: retraction_flag={r['has_retraction']}, "
          f"oa_retracted={r['oa_is_retracted']}, year={r['year']}, "
          f"cites(S2AG/OA)={r['s2ag_citationcount']}/{r['oa_cited_by_count']}")
    if not ok:
        all_ok = False

# 9b: Retraction rate sanity (should be < 1% of all papers)
row = con.sql("""
    SELECT
        COUNT(*) FILTER (WHERE has_retraction) AS retracted,
        COUNT(*) AS total
    FROM xref.unified_papers
""").fetchone()
ret_pct = 100 * row[0] / row[1]
ok2 = ret_pct < 1
sym2 = "\u2713" if ok2 else "\u2717"
print(f"   {sym2} Retraction rate: {row[0]:,}/{row[1]:,} ({ret_pct:.4f}%)")
if not ok2:
    all_ok = False

# 9c: PWC paper exists with correct flag
pwc = con.sql("""
    SELECT doi, has_pwc, title
    FROM xref.unified_papers WHERE has_pwc LIMIT 1
""").fetchdf()
if len(pwc) == 0:
    print("   \u2717 No PWC papers found!")
    all_ok = False
else:
    print(f"   \u2713 PWC sample: doi={pwc.iloc[0]['doi']}, has_pwc={pwc.iloc[0]['has_pwc']}")

report(9, "Spot-check known papers", all_ok)

   ✓ Wakefield 1998: retraction_flag=True, oa_retracted=True, year=1998, cites(S2AG/OA)=153/2903
   ✓ Retraction rate: 60,074/293,123,121 (0.0205%)
   ✓ PWC sample: doi=10.48550/arxiv.2203.01482, has_pwc=True
✅ Check 9 — Spot-check known papers: PASS


In [11]:
# Check 10: Vignette count reproducibility (V1-V4 key numbers)
expected = {
    'V1 total unified papers': (
        "SELECT COUNT(*) FROM xref.unified_papers",
        293_123_121
    ),
    'V2 retracted w/ SciSciNet': (
        "SELECT COUNT(*) FROM xref.unified_papers WHERE has_retraction AND has_sciscinet",
        58_775
    ),
    'V3 patent-cited papers': (
        "SELECT COUNT(*) FROM xref.unified_papers WHERE has_patent",
        312_929
    ),
    'V4 papers in all 3 sources': (
        "SELECT COUNT(*) FROM xref.unified_papers WHERE has_s2ag AND has_openalex AND has_sciscinet",
        120_990_697
    ),
}
all_match = True
for label, (sql, exp) in expected.items():
    actual = con.sql(sql).fetchone()[0]
    ok = actual == exp
    sym = "\u2713" if ok else "\u2717"
    print(f"   {sym} {label}: {actual:,} (expected {exp:,})")
    if not ok:
        all_match = False
report(10, "Vignette count reproducibility", all_match)

   ✓ V1 total unified papers: 293,123,121 (expected 293,123,121)
   ✓ V2 retracted w/ SciSciNet: 58,775 (expected 58,775)
   ✓ V3 patent-cited papers: 312,929 (expected 312,929)
   ✓ V4 papers in all 3 sources: 120,990,697 (expected 120,990,697)
✅ Check 10 — Vignette count reproducibility: PASS


In [12]:
# Summary
print()
print("=" * 60)
print("SANITY CHECK SUMMARY")
print("=" * 60)
n_pass = sum(1 for _, s in results.values() if s == "PASS")
n_fail = sum(1 for _, s in results.values() if s == "FAIL")
for num in sorted(results):
    title, status = results[num]
    icon = "\u2705" if status == "PASS" else "\u274c"
    print(f"  {icon} Check {num:2d}: {status}  \u2014 {title}")
print(f"\n{n_pass} passed, {n_fail} failed out of {len(results)} checks")
if n_fail == 0:
    print("\n\u2705 ALL CHECKS PASSED")
else:
    print(f"\n\u274c {n_fail} CHECK(S) FAILED \u2014 review above for details")

con.close()


SANITY CHECK SUMMARY
  ✅ Check  1: PASS  — DOI format (no http prefix, all lowercase)
  ✅ Check  2: PASS  — Coverage flags match data
  ✅ Check  3: PASS  — Primary key uniqueness (no duplicate DOIs)
  ✅ Check  4: PASS  — OpenAlex ID format + joinability
  ✅ Check  5: PASS  — topic_ontology_map -> openalex.topics (no orphans)
  ✅ Check  6: PASS  — RoS oaid -> OpenAlex join (10K sample)
  ✅ Check  7: PASS  — Citation cross-source correlation (>=2 pairs r>0.8)
  ✅ Check  8: PASS  — Year distribution (NULL/invalid < 1% each)
  ✅ Check  9: PASS  — Spot-check known papers
  ✅ Check 10: PASS  — Vignette count reproducibility

10 passed, 0 failed out of 10 checks

✅ ALL CHECKS PASSED
