# Preprocessing

Perform initial preprocessing of the Grokipedia and Wikipedia corpora so that later analysis notebooks can work from the derived files.

In [None]:
import json
from urllib.parse import urlparse
from dotenv import load_dotenv
import logging
from pathlib import Path
import re
from collections import defaultdict
import pandas as pd
import glob
import requests
import time
from tqdm import tqdm
import os


# Set up logging: INFO-level root configuration plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables (python-dotenv reads them from a .env file, if one is found).
load_dotenv()


## Which Grokipedia pages include licensing info?

In [None]:
def split_by_license_phrase_from_paragraph(
    scraped_dir="../scraped_data",
    phrase="The content is adapted fromWikipedia, licensed under Creative Commons Attribution-ShareAlike 4.0 License",
    test_fp=None
):
    """
    Split Grokipedia article titles by whether any paragraph contains the license phrase.

    Each input line is one JSON page of the form
    ``{"data": {"main_title": ..., "paragraphs": [...]}}``. A page counts as
    "with phrase" when ``phrase`` is found in at least one string entry of
    ``paragraphs`` (all paragraphs are searched, not only the trailing ones).

    Args:
        scraped_dir: Directory whose ``*.jsonl`` files are scanned when
            ``test_fp`` is not given.
        phrase: Pattern handed to ``re`` (so it is a regular expression, e.g.
            ``.`` matches any character). The default only matches the variant
            *without* a space between "from" and "Wikipedia".
            NOTE(review): pass e.g. ``r"adapted from\\s*Wikipedia"`` if pages
            with the correctly spaced phrase should also count.
        test_fp: Optional path of a single JSONL file to scan instead of
            ``scraped_dir``.

    Returns:
        (set of article titles with the phrase, set of article titles without it).
        A title may be ``None`` when the page has no ``main_title``.

    Raises:
        re.error: If ``phrase`` is not a valid regular expression.
    """
    articles_w_phrase = set()
    articles_wo_phrase = set()
    # Compile once: the same pattern is applied to every paragraph of every page,
    # and an invalid pattern now fails loudly instead of silently skipping every line.
    pattern = re.compile(phrase)

    def classify_lines(lines):
        """Parse each JSON line and file its title into one of the two sets."""
        for line in lines:
            try:
                page = json.loads(line)
            except ValueError:
                # Malformed or blank line: skip it, keep processing the file.
                continue
            data = page.get("data") if isinstance(page, dict) else None
            if not isinstance(data, dict):
                continue
            article_title = data.get("main_title")
            if article_title is not None and not isinstance(article_title, str):
                continue
            paragraphs = data.get("paragraphs")
            found = isinstance(paragraphs, list) and any(
                isinstance(para, str) and pattern.search(para)
                for para in paragraphs
            )
            if found:
                articles_w_phrase.add(article_title)
            else:
                articles_wo_phrase.add(article_title)

    # Both modes share the same per-line logic; only the file list differs.
    file_paths = [Path(test_fp)] if test_fp else Path(scraped_dir).glob("*.jsonl")
    for file_path in file_paths:
        with open(file_path, "r", encoding="utf-8") as f:
            classify_lines(f)

    return articles_w_phrase, articles_wo_phrase

# Run the split over the full scrape and print a summary.
# The returned sets hold article titles (not file paths), so label them as such.
w_phrase, wo_phrase = split_by_license_phrase_from_paragraph()
print(f"Articles containing phrase: {len(w_phrase)}")
print(f"Articles NOT containing phrase: {len(wo_phrase)}")

In [None]:
# Persist both title lists, one title per line (missing/empty titles are dropped).
# The files are written as UTF-8 explicitly because the extraction functions
# reopen them with encoding="utf-8"; relying on the platform default could
# corrupt non-ASCII titles. Titles are sorted so the files are reproducible.
os.makedirs('../results', exist_ok=True)
for out_path, titles in (
    ('../results/grokipedia_w_license.txt', w_phrase),
    ('../results/grokipedia_wo_license.txt', wo_phrase),
):
    with open(out_path, 'w', encoding='utf-8') as f:
        f.writelines(title + "\n" for title in sorted(t for t in titles if t))

# Which domains were added / removed from Wikipedia --> Grokipedia?

In [None]:
def stem_domain(domain):
    """
    Reduce a network location to a comparable lowercase base domain.

    The input is lowercased first so that prefix stripping is case-insensitive
    (``urlparse(...).netloc`` preserves the original case). A leading ``www.``
    is removed, followed by at most one leading ``m.``, ``amp.``, ``en.``,
    ``web.`` or ``mobile.`` label.

    Args:
        domain: Host string, e.g. ``"WWW.Example.com"``.

    Returns:
        The lowercased domain with the common prefixes removed.
    """
    domain = domain.lower()
    if domain.startswith("www."):
        domain = domain[4:]
    # Strip one mobile/AMP/language-style subdomain label, if present.
    return re.sub(r"^(m|amp|en|web|mobile)\.", "", domain)

def extract_grokipedia_stemmed_reference_domains(
    scraped_dir="../scraped_data",
    test_fp=None,
    titles_file=None
):
    """
    Count stemmed reference domains per Grokipedia page.

    Each input line is one JSON page; the title is read from ``data['title']``
    and references from ``data['references']``. Only references carrying a
    ``['link']['href']`` URL contribute.
    NOTE(review): the license split reads ``data['main_title']`` while this
    function reads ``data['title']`` — confirm both fields hold the same value,
    otherwise ``titles_file`` filtering will silently miss pages.

    Args:
        scraped_dir: Directory whose ``*.jsonl`` files are scanned (in sorted
            order) when ``test_fp`` is not given.
        test_fp: Optional single JSONL file to scan instead.
        titles_file: Optional text file with one title per line. When given,
            only pages whose title (spaces replaced by underscores) appears in
            it are kept; an empty file therefore yields an empty result.

    Returns:
        list of single-key dicts: ``[{title: {domain: count, ...}}, ...]``.
    """
    # Load filter set (normalized with underscores)
    title_allowlist = None
    if titles_file:
        with open(titles_file, "r", encoding="utf-8") as f:
            title_allowlist = {line.strip().replace(" ", "_") for line in f if line.strip()}

    result = []

    def handle_page(page):
        """Append the domain counts of one parsed page to ``result``."""
        data = page.get('data') if isinstance(page, dict) else None
        if not isinstance(data, dict):
            return
        title = data.get('title')
        if not isinstance(title, str):
            return
        title_norm = title.replace(" ", "_")
        # `is not None` (not truthiness): an empty allowlist must filter out everything.
        if title_allowlist is not None and title_norm not in title_allowlist:
            return

        domain_counts = defaultdict(int)
        # `or []` tolerates pages whose references field is null.
        for ref in data.get('references') or []:
            link = ref.get('link') if isinstance(ref, dict) else None
            href = link.get('href') if isinstance(link, dict) else None
            if not href or not isinstance(href, str):
                continue
            try:
                domain = urlparse(href).netloc
            except ValueError:
                # urlparse rejects some malformed URLs (e.g. a broken IPv6 literal);
                # skip the reference instead of aborting the whole run.
                print(f"error parsing URL: {href}")
                continue
            if domain:
                domain_counts[stem_domain(domain)] += 1

        result.append({title_norm: dict(domain_counts)})

    file_paths = [Path(test_fp)] if test_fp else sorted(Path(scraped_dir).glob("*.jsonl"))
    for file_path in file_paths:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    page = json.loads(line)
                except ValueError:
                    continue
                handle_page(page)

    return result

def extract_wikipedia_stemmed_reference_domains(
    article_fp="../grokipedia_wikipedia_articles.ndjson",
    test=False,
    titles_file=None
):
    """
    Count stemmed reference domains per Wikipedia article.

    Each line of ``article_fp`` is one JSON article. The title is the top-level
    ``name`` key (spaces replaced by underscores); only references with a
    ``['metadata']['url']`` value contribute. Articles without a string
    ``name`` are skipped.

    Args:
        article_fp: Path of the NDJSON article dump.
        test: When true, stop after the first 101 lines.
        titles_file: Optional text file with one title per line. When given,
            only matching articles are kept; an empty file therefore yields an
            empty result.

    Returns:
        list of single-key dicts: ``[{title: {domain: count, ...}}, ...]``.
    """
    title_allowlist = None
    if titles_file:
        with open(titles_file, "r", encoding="utf-8") as f:
            title_allowlist = {line.strip().replace(" ", "_") for line in f if line.strip()}

    result = []
    with open(article_fp, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            if test and i > 100:
                break
            try:
                article = json.loads(line)
            except ValueError:
                continue
            name = article.get("name") if isinstance(article, dict) else None
            if not isinstance(name, str):
                # No usable title: nothing to key the counts on.
                continue
            title = name.replace(" ", "_")
            # `is not None` (not truthiness): an empty allowlist must filter out everything.
            if title_allowlist is not None and title not in title_allowlist:
                continue

            domain_counts = defaultdict(int)
            # `or []` tolerates articles whose references field is null.
            for ref in article.get('references') or []:
                metadata = ref.get("metadata") if isinstance(ref, dict) else None
                url = metadata.get("url") if isinstance(metadata, dict) else None
                if not url or not isinstance(url, str):
                    continue
                try:
                    domain = urlparse(url).netloc
                except ValueError:
                    print(f"error parsing URL: {url}")
                    continue
                if domain:
                    domain_counts[stem_domain(domain)] += 1

            result.append({title: dict(domain_counts)})
    return result

In [None]:
# Per-article reference-domain counts for the full corpora, saved as JSON.
grok_domains = extract_grokipedia_stemmed_reference_domains()
wp_domains = extract_wikipedia_stemmed_reference_domains()

for out_path, payload in (
    ('../results/wp_domains.json', wp_domains),
    ('../results/grok_domains.json', grok_domains),
):
    with open(out_path, 'w') as f:
        f.write(json.dumps(payload))

In [None]:
# Same extraction, restricted to the titles listed in the "with license" file.
titles_file = '../results/grokipedia_w_license.txt'
grok_domains_w_license = extract_grokipedia_stemmed_reference_domains(titles_file=titles_file)
wp_domains_w_license = extract_wikipedia_stemmed_reference_domains(titles_file=titles_file)

for out_path, payload in (
    ('../results/wp_domains_w_license.json', wp_domains_w_license),
    ('../results/grok_domains_w_license.json', grok_domains_w_license),
):
    with open(out_path, 'w') as f:
        f.write(json.dumps(payload))

In [None]:
# Same extraction, restricted to the titles listed in the "without license" file.
titles_file = '../results/grokipedia_wo_license.txt'
grok_domains_wo_license = extract_grokipedia_stemmed_reference_domains(titles_file=titles_file)
wp_domains_wo_license = extract_wikipedia_stemmed_reference_domains(titles_file=titles_file)

for out_path, payload in (
    ('../results/wp_domains_wo_license.json', wp_domains_wo_license),
    ('../results/grok_domains_wo_license.json', grok_domains_wo_license),
):
    with open(out_path, 'w') as f:
        f.write(json.dumps(payload))

## On a per-page basis, which domains were added or removed?

In [None]:

def analyze_domain_deltas_df(wiki, groki) -> pd.DataFrame:
    """
    Summarise, per domain, how citation counts changed from Wikipedia to Grokipedia.

    Only titles present in both inputs are compared. For every (title, domain)
    pair the delta is ``grokipedia_count - wikipedia_count`` (a domain missing
    on one side counts as 0).

    Args:
        wiki: Either ``{title: {domain: count}}`` or a list of single-title
            dicts ``[{title: {domain: count}}, ...]`` (later duplicates win).
        groki: Same shape as ``wiki``, for Grokipedia.

    Returns:
        DataFrame sorted by ``domain`` with columns ``domain``, ``delta_sum``
        (sum of deltas), ``added_pages`` / ``removed_pages`` (number of titles
        with a positive / negative delta), ``added_prop`` / ``removed_prop``
        (those counts divided by the number of compared titles) and
        ``total_titles_compared``. Count columns are integers. The frame is
        empty (but keeps these columns) when there is nothing to compare.
    """
    columns = [
        'domain', 'delta_sum', 'added_pages', 'removed_pages',
        'added_prop', 'removed_prop', 'total_titles_compared'
    ]

    # Normalize source to dict[title] -> dict[domain] -> count
    def to_map(source):
        if isinstance(source, dict):
            return source
        m = {}
        for d in source:              # list of {title: {...}}
            for k, v in d.items():
                m[k] = v
        return m

    wiki_map = to_map(wiki)
    grok_map = to_map(groki)

    # Common titles
    titles = set(wiki_map.keys()) & set(grok_map.keys())
    total_titles = len(titles)
    if total_titles == 0:
        return pd.DataFrame(columns=columns)

    # domain -> [delta_sum, added_pages, removed_pages].
    # Aggregating directly avoids building and merging two long frames, which
    # failed with a KeyError whenever one side had no domains at all.
    stats = {}
    for title in titles:
        wiki_counts = wiki_map.get(title) or {}
        grok_counts = grok_map.get(title) or {}
        for domain in set(wiki_counts) | set(grok_counts):
            delta = int(grok_counts.get(domain, 0)) - int(wiki_counts.get(domain, 0))
            entry = stats.setdefault(domain, [0, 0, 0])
            entry[0] += delta
            if delta > 0:
                entry[1] += 1
            elif delta < 0:
                entry[2] += 1

    rows = [
        {
            'domain': domain,
            'delta_sum': delta_sum,
            'added_pages': added_pages,
            'removed_pages': removed_pages,
            'added_prop': added_pages / total_titles,
            'removed_prop': removed_pages / total_titles,
            'total_titles_compared': total_titles,
        }
        for domain, (delta_sum, added_pages, removed_pages) in sorted(stats.items())
    ]
    return pd.DataFrame(rows, columns=columns)

In [None]:

# Reload the full-corpus domain counts from disk and write the per-domain deltas.
wp_domains = json.loads(Path('../results/wp_domains.json').read_text())
grok_domains = json.loads(Path('../results/grok_domains.json').read_text())

df = analyze_domain_deltas_df(wp_domains, grok_domains)
df.to_csv('../results/domain_deltas.csv', index=False)

In [None]:
# Reload the "without license" domain counts and write their per-domain deltas.
wp_domains_wo_license = json.loads(Path('../results/wp_domains_wo_license.json').read_text())
grok_domains_wo_license = json.loads(Path('../results/grok_domains_wo_license.json').read_text())

df_wo_license = analyze_domain_deltas_df(wp_domains_wo_license, grok_domains_wo_license)
df_wo_license.to_csv('../results/domain_deltas_wo_license.csv', index=False)

In [None]:
# Reload the "with license" domain counts and write their per-domain deltas.
wp_domains_w_license = json.loads(Path('../results/wp_domains_w_license.json').read_text())
grok_domains_w_license = json.loads(Path('../results/grok_domains_w_license.json').read_text())

df_w_license = analyze_domain_deltas_df(wp_domains_w_license, grok_domains_w_license)
df_w_license.to_csv('../results/domain_deltas_w_license.csv', index=False)

## Gather academic domains

In [None]:
# OpenAlex "sources" endpoint queried by the fetch loop in this cell.
BASE_URL = "https://api.openalex.org/sources"
# Page size sent as the "per-page" query parameter.
PER_PAGE = 200  # max allowed by OpenAlex

def extract_domain(url):
    """
    Return the lowercased network location of *url*, or None.

    None is returned for empty/non-string input, for URLs without a network
    location (e.g. no scheme), and for URLs that ``urlparse`` rejects with
    ValueError, so that one malformed homepage cannot abort a long fetch.
    """
    if not url or not isinstance(url, str):
        return None
    try:
        netloc = urlparse(url).netloc
    except ValueError:
        return None
    return netloc.lower() or None

# Page through the OpenAlex sources listing and collect (title, homepage domain)
# for every open-access journal that has both.
journals = []
cursor = "*"  # OpenAlex uses a cursor-based pagination

while True:
    params = {
        "filter": "is_oa:true,type:journal",  # open access journals only
        "per-page": PER_PAGE,
        "cursor": cursor
    }
    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(BASE_URL, params=params, timeout=30)
    if response.status_code != 200:
        print(f"❌ Error: {response.status_code}")
        break

    data = response.json()
    results = data.get("results", [])
    if not results:
        print("✅ No more results — finished fetching.")
        break

    for j in results:
        title = j.get("display_name")
        homepage = j.get("homepage_url")
        domain = extract_domain(homepage)
        if title and domain:
            journals.append({"Title": title, "Domain": domain})

    print(f"Fetched {len(journals)} journals so far...")

    cursor = data.get("meta", {}).get("next_cursor")
    if not cursor:
        print("✅ Finished fetching all journals.")
        break

    time.sleep(1)  # polite rate limit

# Save to CSV. Explicit columns keep the header (and the "Domain" column used
# below) present even when nothing was fetched.
journal_df = pd.DataFrame(journals, columns=["Title", "Domain"])
journal_df.to_csv("../supplemental_data/domain_lists/openalex_journal_domains.csv", index=False, encoding="utf-8")

# Set of journal homepage domains. This must come from journal_df: the
# notebook-level `df` is the domain-delta frame and has no "Domain" column.
domain_set = set(journal_df["Domain"].tolist())

## Extract perennial source ratings

(Using the perennial sources table as ground truth: https://en.wikipedia.org/wiki/Wikipedia:Reliable_sources/Perennial_sources)

In [None]:
def parse_perennial_sources_tables(table_dir='../supplemental_data/perennial_sources_enwiki'):
    """
    Parse all perennial sources tables and extract domains with reliability status.

    Every ``*.txt`` file in ``table_dir`` is read as wikitext. A table row
    starts at a line matching ``|- ... class="s-{status}" ... id="{Source Name}"``
    and is followed by its cells; rows with fewer than six cells, without a
    source name, or without any ``{{WP:RSPUSES|...}}`` domain are skipped.

    Args:
        table_dir: Directory holding the wikitext table dumps. Files are
            processed in sorted order so duplicate resolution is reproducible.

    Returns:
        DataFrame sorted by domain with columns: domain, source_name,
        source_id, reliability_status, last_updated (year as int, or None).
        One row per (domain, source_name); the frame is empty but keeps these
        columns when nothing could be parsed.
    """
    columns = ['domain', 'source_name', 'source_id', 'reliability_status', 'last_updated']
    wikitable_files = sorted(glob.glob(os.path.join(table_dir, '*.txt')))

    all_records = []

    # Status code mapping
    status_map = {
        'gr': 'generally_reliable',
        'd': 'deprecated',
        'gu': 'generally_unreliable',
        'nc': 'no_consensus',
        'b': 'blacklisted'
    }

    for filepath in wikitable_files:
        with open(filepath, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        # Parse the file line by line to extract rows
        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Look for row start: |- class="s-{status}" id="{Source Name}"
            row_match = re.search(r'^\|\-.*class="s-([^"]+)".*id="([^"]+)"', line)
            if not row_match:
                i += 1
                continue

            status_code = row_match.group(1)
            source_id = row_match.group(2)
            status = status_map.get(status_code, status_code)

            # Collect cells for this row (cells start with |)
            cells = []
            i += 1
            while i < len(lines):
                cell_line = lines[i].strip()
                if cell_line.startswith('|-'):
                    # Next row started; leave `i` on it for the outer loop.
                    break
                if cell_line.startswith('|') and not cell_line.startswith('|+'):
                    # This is a cell (remove leading |)
                    cell_content = cell_line[1:].strip()
                    if cells and (cell_line.startswith('| ') or cell_line.startswith('|{')):
                        # New cell
                        cells.append(cell_content)
                    elif cells and cell_content:
                        # Continuation of previous cell (multiline cell)
                        cells[-1] += ' ' + cell_content
                    elif not cells and cell_content:
                        # First cell
                        cells.append(cell_content)
                i += 1

            # We need at least 6 cells (source, status, discussions, last, summary, uses)
            if len(cells) < 6:
                continue

            # Cell 0: Source name — prefer the wikilink target, then the
            # data-sort-value attribute, then the cell text with markup removed.
            source_cell = cells[0]
            link_match = re.search(r'\[\[([^\|\[\]]+)(?:\|[^\]]+)?\]\]', source_cell)
            if link_match:
                source_name = link_match.group(1)
            else:
                sort_match = re.search(r'data-sort-value="([^"]+)"', source_cell)
                if sort_match:
                    source_name = sort_match.group(1)
                else:
                    # Clean up wikitext
                    source_name = re.sub(r'\{\{[^}]+\}\}', '', source_cell)
                    source_name = re.sub(r'\[\[[^\]]+\]\]', '', source_name)
                    source_name = re.sub(r"''([^']+)''", r'\1', source_name)
                    source_name = source_name.strip()

            # Cell 1: RSPSTATUS template; when present it overrides the row class.
            status_match = re.search(r'\{\{WP:RSPSTATUS\|([^}|]+)', cells[1])
            if status_match:
                status_code = status_match.group(1)
                status = status_map.get(status_code, status_code)

            # Cell 3: RSPLAST template (last updated year)
            last_updated = None
            last_match = re.search(r'\{\{WP:RSPLAST\|(\d{4})', cells[3])
            if last_match:
                last_updated = int(last_match.group(1))

            # Cell 5: RSPUSES template (domains)
            domains = []
            # Extract domains from {{WP:RSPUSES|domain1|domain2|...}}
            uses_match = re.search(r'\{\{WP:RSPUSES\|([^}]+)\}\}', cells[5])
            if uses_match:
                domains_str = uses_match.group(1)
                # Split by | and clean up
                domains = [d.strip() for d in domains_str.split('|') if d.strip()]
                # Remove any extra content after }} or before [
                domains = [d.split('}}')[0].split(' ')[0] for d in domains if not d.startswith('[')]

            # If no domains or source name found, skip this row
            if not domains or not source_name:
                continue

            # Create one record per domain
            for domain in domains:
                # Normalize domain (remove http://, www., trailing slashes)
                domain = re.sub(r'^https?://', '', domain)
                domain = re.sub(r'^www\.', '', domain)
                domain = domain.split('/')[0].lower().strip()
                if not domain:
                    # Nothing left after normalisation: not a usable domain.
                    continue

                all_records.append({
                    'domain': domain,
                    'source_name': source_name,
                    'source_id': source_id,
                    'reliability_status': status,
                    'last_updated': last_updated
                })

    # Explicit columns keep the frame well-formed (and the calls below valid)
    # even when no record was parsed.
    df = pd.DataFrame(all_records, columns=columns)

    # Remove duplicates (same domain from same source)
    df = df.drop_duplicates(subset=['domain', 'source_name'])

    # Sort by domain
    df = df.sort_values('domain').reset_index(drop=True)

    return df

# Build the reliability table, report its size, then store it next to the raw tables.
reliability_df = parse_perennial_sources_tables()
print(f"Parsed {len(reliability_df)} domain entries")
print(f"Unique domains: {reliability_df['domain'].nunique()}")

reliability_df.to_csv(
    '../supplemental_data/perennial_sources_enwiki/perennial_sources.csv',
    index=False,
)

## Parse book cites

(this still needs work)

In [None]:
# ISBN patterns
# An optional "ISBN" label, then either a 13-digit number starting with 978/979
# or a 10-character number (nine digits plus a digit or X check character).
# Single hyphens/spaces may separate the digits. The digit count is fixed and
# the number may not be embedded in a longer digit run, so plain years or page
# numbers are no longer mistaken for ISBNs.
# NOTE(review): other 10-digit sequences (e.g. "2020-01-15 12") still match;
# checksum validation would be needed to rule those out.
ISBN_PATTERN = re.compile(
    r'(?:ISBN[-:]?\s*)?'
    r'(?<!\d)'
    r'(?:'
    r'97[89](?:[- ]?\d){10}'  # ISBN-13
    r'|'
    r'\d(?:[- ]?\d){8}[- ]?[\dX]'  # ISBN-10
    r')'
    r'(?!\d)',
    re.IGNORECASE
)

# A standalone 4-digit number (group 1); digits inside longer numbers are ignored.
# Surrounding parentheses need no special handling because they are not digits.
YEAR_PATTERN = re.compile(
    r'(?<!\d)(\d{4})(?!\d)',
    re.IGNORECASE
)

# "p"/"pp"/"page"/"pages" followed by a page number or range. Whitespace before
# the number is optional only after an abbreviation dot ("p.12", "pp. 3-5").
PAGE_PATTERN = re.compile(
    r'\b(?:pp?\.\s*|(?:pp?|pages?)\s+)\d+(?:[-–]\d+)?',
    re.IGNORECASE
)

def contains_isbn(text):
    """Return True when *text* is a string in which ISBN_PATTERN finds a match."""
    return isinstance(text, str) and ISBN_PATTERN.search(text) is not None

def contains_books_google_link(text_or_url):
    """
    Return True when *text_or_url* is a string mentioning a Google Books host.

    The check is case-insensitive and matches ``books.google.`` followed by any
    suffix, so country domains such as ``books.google.co.uk`` are recognised in
    addition to ``books.google.com``. Non-string input yields False.
    """
    if not isinstance(text_or_url, str):
        return False
    return 'books.google.' in text_or_url.lower()

def contains_year(text):
    """Return True when *text* is a string with a YEAR_PATTERN match between 1000 and 2999."""
    if not isinstance(text, str):
        return False
    return any(
        1000 <= int(candidate) <= 2999
        for candidate in YEAR_PATTERN.findall(text)
    )

def contains_page_ref(text):
    """Return True when *text* is a string in which PAGE_PATTERN finds a match."""
    return isinstance(text, str) and PAGE_PATTERN.search(text) is not None

def is_book_citation(ref):
    """
    Collect book-citation indicators for one reference.

    *ref* may be a plain string or a dict. For dicts, the text candidates are
    the listed ``metadata`` fields, the reference's own ``text`` (or, failing
    that, ``citation``), the ``link`` href/text, and the top-level ``url``.

    Returns a dict with the boolean keys ``has_isbn``, ``has_books_google``,
    ``has_year``, ``has_page_ref`` and ``is_book``; ``is_book`` is true when an
    ISBN or a Google Books link was seen, or when both a year and a page
    reference were seen (not necessarily in the same text).
    """
    candidates = []
    if isinstance(ref, str):
        candidates.append(ref)
    elif isinstance(ref, dict):
        meta = ref.get('metadata', {})
        if isinstance(meta, dict):
            candidates.extend(
                str(meta[key])
                for key in ('text', 'title', 'isbn', 'isbn10', 'isbn13', 'citation', 'note', 'url')
                if meta.get(key)
            )

        body_text = ref.get('text') or ref.get('citation')
        if body_text:
            candidates.append(str(body_text))

        link = ref.get('link')
        if isinstance(link, dict):
            # Link parts are kept as-is; non-string values are ignored by the checks.
            candidates.extend(
                part for part in (link.get('href', ''), link.get('text', '')) if part
            )

        top_url = ref.get('url')
        if top_url:
            candidates.append(str(top_url))

    has_isbn = any(contains_isbn(text) for text in candidates)
    has_books_google = any(contains_books_google_link(text) for text in candidates)
    has_year = any(contains_year(text) for text in candidates)
    has_page_ref = any(contains_page_ref(text) for text in candidates)

    return {
        'has_isbn': has_isbn,
        'has_books_google': has_books_google,
        'has_year': has_year,
        'has_page_ref': has_page_ref,
        'is_book': has_isbn or has_books_google or (has_year and has_page_ref),
    }

def book_cites_list_with_titles_wikipedia(article_fp="../grokipedia_wikipedia_articles.ndjson", test=False):
    """
    Count book citations per Wikipedia article.

    Each line of ``article_fp`` is one JSON article with a top-level ``name``
    and a ``references`` list. Lines that are not valid JSON objects are
    skipped; a missing or null ``references`` counts as no references, and a
    missing or empty ``name`` is replaced by ``"(no_title_{line_index})"``.

    Args:
        article_fp: Path of the NDJSON article dump.
        test: When true, stop after the first 10001 lines.

    Returns:
        A list of dicts: [{'title': ..., 'book_cites': ...}, ...]
    """
    article_book_cites = []
    with open(article_fp, 'r', encoding='utf-8') as f:
        for i, line in enumerate(tqdm(f, desc="Processing Wikipedia articles")):
            if test and i > 10000:
                break
            try:
                article = json.loads(line)
            except ValueError:
                continue
            if not isinstance(article, dict):
                continue
            # `or []` guards against an explicit null, which .get's default does not cover.
            refs = article.get('references') or []
            book_cites_this_article = sum(
                1 for ref in refs if is_book_citation(ref)['is_book']
            )
            title = article.get('name') or f"(no_title_{i})"
            article_book_cites.append({'title': title, 'book_cites': book_cites_this_article})
    return article_book_cites

def book_cites_list_with_titles_grokipedia(article_fp="../grokipedia_scrape.ndjson", test=False):
    """
    Count book citations per Grokipedia article.

    Each line of ``article_fp`` is one JSON article. References and title are
    read from the ``data`` wrapper when it is a dict, otherwise from the root
    object. Blank lines and lines that are not valid JSON objects are skipped;
    a missing or null ``references`` counts as no references, and a missing
    title is replaced by ``"(no_title_{line_index})"``.

    Args:
        article_fp: Path of the NDJSON scrape.
        test: When true, stop after the first 10001 lines.

    Returns:
        A list of dicts: [{'title': ..., 'book_cites': ...}, ...]
    """
    article_book_cites = []
    with open(article_fp, 'r', encoding='utf-8') as f:
        for i, line in enumerate(tqdm(f, desc="Processing Grokipedia articles")):
            if test and i > 10000:
                break
            line = line.strip()
            if not line:
                continue
            try:
                article = json.loads(line)
            except ValueError:
                continue
            if not isinstance(article, dict):
                continue
            # Grokipedia structure: may have 'data' wrapper or direct 'references'.
            # Fall back to the root when 'data' is absent, null, or not an object.
            data = article.get('data')
            if not isinstance(data, dict):
                data = article
            refs = data.get('references') or []
            book_cites_this_article = sum(
                1 for ref in refs if is_book_citation(ref)['is_book']
            )
            # Use 'title' field of root or 'data'
            title = data.get('title') or article.get('title') or f"(no_title_{i})"
            article_book_cites.append({'title': title, 'book_cites': book_cites_this_article})
    return article_book_cites

# Make sure the results directory exists before anything is written to it.
Path("../results").mkdir(parents=True, exist_ok=True)

# Wikipedia: count book citations per article and store them as pretty-printed JSON.
print("Generating Wikipedia article:book_cite list...")
wp_book_cites_list = book_cites_list_with_titles_wikipedia(test=False)
with open("../results/wikipedia_article_book_cites.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(wp_book_cites_list, ensure_ascii=False, indent=2))
print(f"Wikipedia article book citation list written to ../results/wikipedia_article_book_cites.json ({len(wp_book_cites_list)} articles)")

# Grokipedia: same procedure for the Grokipedia scrape.
print("Generating Grokipedia article:book_cite list...")
grok_book_cites_list = book_cites_list_with_titles_grokipedia(test=False)
with open("../results/grokipedia_article_book_cites.json", "w", encoding="utf-8") as f:
    f.write(json.dumps(grok_book_cites_list, ensure_ascii=False, indent=2))
print(f"Grokipedia article book citation list written to ../results/grokipedia_article_book_cites.json ({len(grok_book_cites_list)} articles)")