<a href="https://colab.research.google.com/github/codecreator-ai/Research-tool/blob/main/Harvest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import os
drive.mount('/content/drive')
BASE_PATH = '/content/drive/MyDrive/paper-harvest-2025'
os.makedirs(f'{BASE_PATH}/data/raw', exist_ok=True)
os.makedirs(f'{BASE_PATH}/data/pdfs', exist_ok=True)
os.makedirs(f'{BASE_PATH}/data/outputs', exist_ok=True)
os.makedirs(f'{BASE_PATH}/logs', exist_ok=True)
print(f"Project directory: {BASE_PATH}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Project directory: /content/drive/MyDrive/paper-harvest-2025


In [2]:
from google.colab import userdata
import getpass
try:
    SEMANTIC_SCHOLAR_API_KEY = userdata.get('SEMANTIC_SCHOLAR_API_KEY')
    UNPAYWALL_EMAIL = userdata.get('EMAIL_ADDRESS')
    print("Loaded credentials from Colab Secrets")
except:
    print("Colab Secrets not found. Manual input required:")
    SEMANTIC_SCHOLAR_API_KEY = getpass.getpass("Semantic Scholar API Key: ")
    UNPAYWALL_EMAIL = input("Your email for Unpaywall: ")

CROSSREF_MAILTO = UNPAYWALL_EMAIL
EUROPE_PMC_API_KEY = ''

# Pipeline configuration
MAX_PAPERS = 2000
TARGET_YEAR = 2025

print(f"Target year: {TARGET_YEAR}, Max papers: {MAX_PAPERS}")


Loaded credentials from Colab Secrets
Target year: 2025, Max papers: 2000


In [3]:
import requests
import pandas as pd
import json
import re
import time
import logging
from datetime import datetime
from typing import Optional, Tuple, List, Dict
from tqdm.auto import tqdm
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import unicodedata  # Built-in Python module - replaces unidecode
from io import BytesIO, StringIO

# PDF extraction imports - use correct pdfminer.six paths
try:
    from pdfminer.high_level import extract_text
    PDFMINER_AVAILABLE = True
except ImportError:
    try:
        from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
        from pdfminer.converter import TextConverter
        from pdfminer.layout import LAParams
        from pdfminer.pdfpage import PDFPage
        PDFMINER_AVAILABLE = True
    except ImportError:
        PDFMINER_AVAILABLE = False
        print("⚠️ Warning: pdfminer.six not available, PDF extraction will be skipped")

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'{BASE_PATH}/logs/run.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# API Endpoints
SEMANTIC_SCHOLAR_SEARCH_URL = "https://api.semanticscholar.org/graph/v1/paper/search"
SEMANTIC_SCHOLAR_PAPER_FIELDS = "title,abstract,authors,year,venue,url,externalIds"
CROSSREF_API_URL = "https://api.crossref.org/works"
EUROPE_PMC_SEARCH_URL = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
UNPAYWALL_URL = "https://api.unpaywall.org/v2/"

# HTTP Headers
HEADERS = {
    "User-Agent": f"paper-harvester/1.0 (mailto:{UNPAYWALL_EMAIL})"
}

# Rate limiting (seconds)
SS_REQUEST_SLEEP = 0.6
CR_REQUEST_SLEEP = 0.5
EPMC_REQUEST_SLEEP = 0.5
UNPAYWALL_SLEEP = 0.2
HTML_REQUEST_SLEEP = 1.2
PDF_REQUEST_SLEEP = 1.5

# Retry configuration
MAX_RETRIES = 5
BACKOFF_MULT = 2

# Regex patterns for extraction
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
CORR_KEYWORDS_RE = re.compile(
    r"(corresponding author|correspondence to|correspondence:|to whom correspondence should be addressed|author for correspondence)",
    re.I
)
AFFILIATION_KEYWORDS = ["university", "college", "department", "institute", "school", "laboratory", "lab", "center"]

# Classification keywords
INDUSTRY_KEYWORDS = [
    "inc", "llc", "ltd", "gmbh", "co.", "corporation", "company", "biotech", "pharma",
    "amazon", "google", "microsoft", "deepmind", "novartis", "roche", "pfizer", "basf",
    "abbvie", "sanofi", "glaxosmithkline", "gsk", "johnson & johnson", "meta", "anthropic"
]

ACADEMIC_KEYWORDS = [
    "university", "college", "institute", "department", "school", "laboratory",
    "lab", "centr", "center", "faculty", "edu", "california"
]

# Search queries
QUERIES = [
    '("Deep Learning" OR "long short term memory") AND ("cellular differentiation") AND ("cell fate" OR "cell fate decision")',
    '("single-cell" OR "scRNA-seq" OR "single cell RNA") AND ("time series" OR "temporal") AND ("differentiation" OR "cell fate" OR "trajectory")',
    '("machine learning" OR "deep learning" OR "neural network") AND ("cell differentiation" OR "cell fate")',
    '("time series gene expression" OR "temporal gene expression") AND ("cell differentiation" OR "cell fate")',
    '("LSTM" OR "recurrent neural network") AND ("single-cell" OR "scRNA-seq")'
]

print("Constants and configuration loaded")

Constants and configuration loaded


In [4]:
%%bash
source /usr/local/etc/profile.d/conda.sh
conda activate myenv
conda install unidecode -y

Jupyter detected...
2 channel Terms of Service accepted
Channels:
 - defaults
Platform: linux-64
Collecting package metadata (repodata.json): - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - done
Solving environment: | / done

# All requested packages already installed.



In [5]:
def normalize_doi(doi: str) -> Optional[str]:
    """Normalize DOI to lowercase, strip whitespace, remove prefix"""
    if not doi or doi.strip() == '':
        return None

    doi = doi.lower().strip()

    # Remove common prefixes
    if doi.startswith('doi:'):
        doi = doi[4:]
    elif doi.startswith('https://doi.org/'):
        doi = doi[16:]
    elif doi.startswith('http://doi.org/'):
        doi = doi[15:]

    return doi.strip() if doi.strip() else None


def normalize_title(title: str) -> str:
    """Normalize title: ASCII, lowercase, remove punctuation"""
    if not title:
        return ""

    # Unicode to ASCII using built-in unicodedata
    # NFKD = Compatibility Decomposition, then filter out non-ASCII
    normalized = unicodedata.normalize('NFKD', title)
    ascii_title = normalized.encode('ASCII', 'ignore').decode('ASCII')

    # Lowercase
    ascii_title = ascii_title.lower()

    # Remove punctuation
    ascii_title = re.sub(r'[,.;:!?()\[\]{}]', '', ascii_title)

    # Collapse whitespace
    ascii_title = re.sub(r'\s+', ' ', ascii_title)

    return ascii_title.strip()

def dedupe_and_filter_year(df: pd.DataFrame, year: int = 2025) -> pd.DataFrame:
    """Deduplicate by DOI/title and filter to target year"""
    logger.info(f"Starting deduplication. Input rows: {len(df)}")

    # Create normalized columns
    df['doi_norm'] = df['doi'].apply(normalize_doi)
    df['title_norm'] = df['title'].apply(normalize_title)

    # Filter by year first
    df = df[df['year'] == year].copy()
    logger.info(f"After year filter ({year}): {len(df)} rows")

    # Deduplicate by DOI where available
    df_with_doi = df[df['doi_norm'].notna()].copy()
    df_without_doi = df[df['doi_norm'].isna()].copy()

    df_with_doi = df_with_doi.drop_duplicates(subset=['doi_norm'], keep='first')
    df_without_doi = df_without_doi.drop_duplicates(subset=['title_norm'], keep='first')

    df_deduped = pd.concat([df_with_doi, df_without_doi], ignore_index=True)

    logger.info(f"After deduplication: {len(df_deduped)} rows")

    # Save
    output_path = f'{BASE_PATH}/data/outputs/papers_deduped.csv'
    df_deduped.to_csv(output_path, index=False, encoding='utf-8')
    logger.info(f"Saved to {output_path}")

    return df_deduped

print("Utility functions defined")

Utility functions defined


In [6]:
#@retry(
#    stop=stop_after_attempt(MAX_RETRIES),
#    wait=wait_exponential(multiplier=1, max=60),
#    retry=retry_if_exception_type((requests.exceptions.RequestException, requests.exceptions.Timeout))
#)
def semantic_scholar_search(query: str, offset: int, limit: int = 100) -> dict:
    """Search Semantic Scholar API with retry logic"""
    params = {
        'query': query,
        'offset': offset,
        'limit': limit,
        'fields': SEMANTIC_SCHOLAR_PAPER_FIELDS
    }

    headers = {
        **HEADERS,
        'x-api-key': SEMANTIC_SCHOLAR_API_KEY
    }

    response = requests.get(
        SEMANTIC_SCHOLAR_SEARCH_URL,
        params=params,
        headers=headers,
        timeout=30
    )

    if response.status_code == 429:
        logger.warning("Rate limited by Semantic Scholar, retrying...")
        raise requests.exceptions.RequestException("Rate limited")

    response.raise_for_status()
    time.sleep(SS_REQUEST_SLEEP)

    return response.json()

import re

def sanitize_query_for_ss(query: str) -> str:
    # remove quotes and parentheses and boolean operators, collapse spaces
    q = re.sub(r'["\(\)]', ' ', query)                       # remove quotes & parens
    q = re.sub(r'\b(AND|OR|NOT|&&|\|\|)\b', ' ', q, flags=re.I)  # remove boolean tokens
    q = re.sub(r'[-]', ' ', q)                               # replace hyphens (docs mention issues)
    q = re.sub(r'\s+', ' ', q).strip()
    return q

    # Usage in harvest_all:
    safe_query = sanitize_query_for_ss(query)
    result = semantic_scholar_search(safe_query, offset, limit=100)

def crossref_search_primary(query: str, rows: int = 100, offset: int = 0) -> dict:
    """Search CrossRef API - NO API KEY REQUIRED"""
    params = {
        'query': query,
        'filter': 'from-pub-date:2025-01-01,until-pub-date:2025-12-31',
        'rows': rows,
        'offset': offset,
        'mailto': UNPAYWALL_EMAIL  # Polite usage
    }

    try:
        response = requests.get(CROSSREF_API_URL, params=params, headers=HEADERS, timeout=30)
        response.raise_for_status()
        time.sleep(CR_REQUEST_SLEEP)
        return response.json()
    except Exception as e:
        logger.error(f"CrossRef error: {e}")
        return {}



@retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, max=60),
    retry=retry_if_exception_type((requests.exceptions.RequestException,))
)
def crossref_search(query: str, rows: int = 100, offset: int = 0) -> dict:
    """Search CrossRef API"""
    params = {
        'query': query,
        'year' : TARGET_YEAR,
        'filter': 'from-pub-date:2025-01-01,until-pub-date:2025-12-31',
        'rows': rows,
        'offset': offset,
        'mailto': CROSSREF_MAILTO
    }

    response = requests.get(CROSSREF_API_URL, params=params, headers=HEADERS, timeout=30)
    response.raise_for_status()
    time.sleep(CR_REQUEST_SLEEP)

    return response.json()

def harvest_all_crossref(queries: List[str]) -> pd.DataFrame:
    """Harvest papers from CrossRef (no API key needed)"""
    all_papers = []
    total_harvested = 0

    logger.info(f"Starting CrossRef harvest with {len(queries)} queries")

    for query_idx, query in enumerate(queries):
        logger.info(f"Query {query_idx + 1}/{len(queries)}: {query[:80]}...")

        offset = 0

        while total_harvested < MAX_PAPERS:
            try:
                result = crossref_search_primary(query, rows=100, offset=offset)

                items = result.get('message', {}).get('items', [])
                if not items:
                    logger.info(f"No more results for query {query_idx + 1}")
                    break

                timestamp = datetime.utcnow().isoformat()

                for item in items:
                    # Extract author info
                    authors_list = []
                    for author in item.get('author', []):
                        authors_list.append({
                            'name': f"{author.get('given', '')} {author.get('family', '')}".strip()
                        })

                    paper_row = {
                        'ss_paper_id': None,  # CrossRef doesn't have this
                        'title': item.get('title', [''])[0] if item.get('title') else '',
                        'abstract': item.get('abstract', ''),  # Often not available in CrossRef
                        'doi': item.get('DOI'),
                        'year': item.get('published', {}).get('date-parts', [[None]])[0][0],
                        'venue': item.get('container-title', [''])[0] if item.get('container-title') else '',
                        'authors': json.dumps(authors_list),
                        'url': item.get('URL', ''),
                        'externalIds': json.dumps({'DOI': item.get('DOI')}),
                        'harvest_source': 'crossref',
                        'harvest_query': query,
                        'harvest_offset': offset,
                        'harvest_timestamp': timestamp
                    }
                    all_papers.append(paper_row)
                    total_harvested += 1

                    if total_harvested >= MAX_PAPERS:
                        break

                logger.info(f"Offset {offset}: fetched {len(items)} papers (total: {total_harvested})")
                offset += 100

                if total_harvested >= MAX_PAPERS:
                    logger.info(f"Reached MAX_PAPERS limit ({MAX_PAPERS})")
                    break

            except Exception as e:
                logger.error(f"Error harvesting query {query_idx + 1} at offset {offset}: {e}")
                break

        if total_harvested >= MAX_PAPERS:
            break

    df = pd.DataFrame(all_papers)

    # Save raw harvest
    output_path = f'{BASE_PATH}/data/outputs/papers_raw.csv'
    df.to_csv(output_path, index=False, encoding='utf-8')
    logger.info(f"Saved {len(df)} raw papers to {output_path}")

    return df

#@retry(
#    stop=stop_after_attempt(MAX_RETRIES),
#    wait=wait_exponential(multiplier=1, max=60),
#    retry=retry_if_exception_type((requests.exceptions.RequestException,))
#)
def europe_pmc_search(query: str, pageSize: int = 100, page: int = 1) -> dict:
    """Search Europe PMC API"""
    full_query = f'{query} PUB_YEAR:{TARGET_YEAR}'
    params = {
        'query': full_query,
        'format': 'json',
        'pageSize': pageSize,
        'page': page
    }

    response = requests.get(EUROPE_PMC_SEARCH_URL, params=params, headers=HEADERS, timeout=30)
    response.raise_for_status()
    time.sleep(EPMC_REQUEST_SLEEP)

    return response.json()


def harvest_all(queries: List[str]) -> pd.DataFrame:
    """Harvest papers from all sources"""
    all_papers = []
    total_harvested = 0

    logger.info(f"Starting harvest with {len(queries)} queries")

    for query_idx, query in enumerate(queries):
        logger.info(f"Query {query_idx + 1}/{len(queries)}: {query[:80]}...")

        offset = 0
        query_count = 0

        while total_harvested < MAX_PAPERS:
            try:
                safe_query = sanitize_query_for_ss(query)
                result = semantic_scholar_search(safe_query, offset, limit=100)

                papers = result.get('data', [])
                if not papers:
                    logger.info(f"No more results for query {query_idx + 1}")
                    break

                timestamp = datetime.utcnow().isoformat()

                for paper in papers:
                    paper_row = {
                        'ss_paper_id': paper.get('paperId'),
                        'title': paper.get('title'),
                        'abstract': paper.get('abstract'),
                        'doi': paper.get('externalIds', {}).get('DOI'),
                        'year': paper.get('year'),
                        'venue': paper.get('venue'),
                        'authors': json.dumps(paper.get('authors', [])),
                        'url': paper.get('url'),
                        'externalIds': json.dumps(paper.get('externalIds', {})),
                        'harvest_source': 'semantic_scholar',
                        'harvest_query': query,
                        'harvest_offset': offset,
                        'harvest_timestamp': timestamp
                    }
                    all_papers.append(paper_row)
                    query_count += 1
                    total_harvested += 1

                    if total_harvested >= MAX_PAPERS:
                        break

                logger.info(f"Offset {offset}: fetched {len(papers)} papers (total: {total_harvested})")
                offset += 100

                if total_harvested >= MAX_PAPERS:
                    logger.info(f"Reached MAX_PAPERS limit ({MAX_PAPERS})")
                    break

            except Exception as e:
                logger.error(f"Error harvesting query {query_idx + 1} at offset {offset}: {e}")
                break

        if total_harvested >= MAX_PAPERS:
            break

    df = pd.DataFrame(all_papers)

    # Save raw harvest
    output_path = f'{BASE_PATH}/data/outputs/papers_raw.csv'
    df.to_csv(output_path, index=False, encoding='utf-8')
    logger.info(f"Saved {len(df)} raw papers to {output_path}")

    return df

print("Harvest functions defined")

Harvest functions defined


In [7]:
@retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, max=60),
    retry=retry_if_exception_type((requests.exceptions.RequestException,))
)
def unpaywall_lookup(doi: str) -> dict:
    """Lookup paper in Unpaywall API"""
    if not doi:
        return {}

    url = f"{UNPAYWALL_URL}{doi}"
    params = {'email': UNPAYWALL_EMAIL}

    try:
        response = requests.get(url, params=params, headers=HEADERS, timeout=20)

        if response.status_code == 404:
            return {}

        response.raise_for_status()
        time.sleep(UNPAYWALL_SLEEP)

        return response.json()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            return {}
        raise


def attach_pdf_links(df: pd.DataFrame) -> pd.DataFrame:
    """Attach PDF URLs from Unpaywall to dataframe"""
    logger.info(f"Fetching PDF links for {len(df)} papers")

    pdf_urls = []
    oa_statuses = []
    unpaywall_jsons = []

    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Unpaywall lookups"):
        doi_norm = row.get('doi_norm')

        if pd.isna(doi_norm):
            pdf_urls.append(None)
            oa_statuses.append('none')
            unpaywall_jsons.append(None)
            continue

        try:
            result = unpaywall_lookup(doi_norm)

            if not result:
                pdf_urls.append(None)
                oa_statuses.append('none')
                unpaywall_jsons.append(None)
                continue

            # Extract PDF URL
            pdf_url = None
            oa_status = 'closed'

            if result.get('is_oa'):
                best_oa = result.get('best_oa_location')
                if best_oa:
                    pdf_url = best_oa.get('url_for_pdf') or best_oa.get('url')
                    oa_status = best_oa.get('version', 'green')

            pdf_urls.append(pdf_url)
            oa_statuses.append(oa_status)
            unpaywall_jsons.append(json.dumps(result))

        except Exception as e:
            logger.error(f"Error fetching Unpaywall for DOI {doi_norm}: {e}")
            pdf_urls.append(None)
            oa_statuses.append('error')
            unpaywall_jsons.append(None)

    df['pdf_url'] = pdf_urls
    df['oa_status'] = oa_statuses
    df['unpaywall_json'] = unpaywall_jsons

    # Save papers_master.csv
    output_path = f'{BASE_PATH}/data/outputs/papers_master.csv'
    df.to_csv(output_path, index=False, encoding='utf-8')
    logger.info(f"Saved papers_master.csv with {len(df)} rows")

    return df

print("Unpaywall functions defined")

Unpaywall functions defined


In [8]:
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=30),
    retry=retry_if_exception_type((requests.exceptions.RequestException,))
)
def extract_email_from_html(url: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract corresponding author email from HTML page"""
    if not url:
        return None, None

    try:
        response = requests.get(url, headers=HEADERS, timeout=20)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'lxml')
        text = soup.get_text(" ", strip=True)

        # Find all emails
        emails = EMAIL_RE.findall(text)

        if not emails:
            time.sleep(HTML_REQUEST_SLEEP)
            return None, None

        # Check for emails near correspondence keywords
        for email in emails:
            email_pos = text.find(email)
            snippet_start = max(0, email_pos - 200)
            snippet_end = min(len(text), email_pos + 200)
            snippet = text[snippet_start:snippet_end]

            if CORR_KEYWORDS_RE.search(snippet):
                time.sleep(HTML_REQUEST_SLEEP)
                return email, snippet

        # Return first email if no correspondence keyword found
        first_email = emails[0]
        email_pos = text.find(first_email)
        snippet_start = max(0, email_pos - 200)
        snippet_end = min(len(text), email_pos + 200)
        snippet = text[snippet_start:snippet_end]

        time.sleep(HTML_REQUEST_SLEEP)
        return first_email, snippet

    except Exception as e:
        logger.error(f"Error extracting email from HTML {url}: {e}")
        time.sleep(HTML_REQUEST_SLEEP)
        return None, None


def extract_email_from_pdf(pdf_url: str, max_bytes: int = 200_000) -> Tuple[Optional[str], Optional[str]]:
    """Extract corresponding author email from PDF"""
    if not pdf_url or not PDFMINER_AVAILABLE:
        return None, None

    try:
        # Download PDF with size limit
        response = requests.get(pdf_url, headers=HEADERS, timeout=30, stream=True)
        response.raise_for_status()

        pdf_content = BytesIO()
        downloaded = 0

        for chunk in response.iter_content(chunk_size=8192):
            if downloaded >= max_bytes:
                break
            pdf_content.write(chunk)
            downloaded += len(chunk)

        pdf_content.seek(0)

        # Extract text from PDF using simpler method
        try:
            # Try high-level API first (newer pdfminer.six)
            from pdfminer.high_level import extract_text
            text = extract_text(pdf_content)
        except ImportError:
            # Fallback to lower-level API (older pdfminer.six)
            from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
            from pdfminer.converter import TextConverter
            from pdfminer.layout import LAParams
            from pdfminer.pdfpage import PDFPage

            output = StringIO()
            manager = PDFResourceManager()
            converter = TextConverter(manager, output, laparams=LAParams())
            interpreter = PDFPageInterpreter(manager, converter)

            for page in PDFPage.get_pages(pdf_content, maxpages=4):  # First 4 pages only
                interpreter.process_page(page)

            converter.close()
            text = output.getvalue()
            output.close()

        # Find emails
        emails = EMAIL_RE.findall(text)

        if not emails:
            time.sleep(PDF_REQUEST_SLEEP)
            return None, None

        # Check for emails near correspondence keywords
        for email in emails:
            email_pos = text.find(email)
            snippet_start = max(0, email_pos - 200)
            snippet_end = min(len(text), email_pos + 200)
            snippet = text[snippet_start:snippet_end]

            if CORR_KEYWORDS_RE.search(snippet):
                time.sleep(PDF_REQUEST_SLEEP)
                return email, snippet

        # Return first email
        first_email = emails[0]
        email_pos = text.find(first_email)
        snippet_start = max(0, email_pos - 200)
        snippet_end = min(len(text), email_pos + 200)
        snippet = text[snippet_start:snippet_end]

        time.sleep(PDF_REQUEST_SLEEP)
        return first_email, snippet

    except Exception as e:
        logger.error(f"Error extracting email from PDF {pdf_url}: {e}")
        time.sleep(PDF_REQUEST_SLEEP)
        return None, None

def extract_affiliation_from_snippet(snippet: str) -> Optional[str]:
    """Extract affiliation string from text snippet"""
    if not snippet:
        return None

    snippet_lower = snippet.lower()

    # Look for affiliation keywords
    for keyword in AFFILIATION_KEYWORDS:
        if keyword in snippet_lower:
            # Extract surrounding context (up to 150 chars)
            pos = snippet_lower.find(keyword)
            start = max(0, pos - 75)
            end = min(len(snippet), pos + 75)
            return snippet[start:end].strip()

    return None


def find_corresponding_author(row: pd.Series) -> dict:
    """Find corresponding author for a paper"""
    result = {
        'paper_id': row.get('ss_paper_id'),
        'doi_norm': row.get('doi_norm'),
        'title': row.get('title'),
        'contact_email': None,
        'contact_source': 'none',
        'affiliation_snippet': None,
        'candidate_author_name': None,
        'candidate_author_affiliation_raw': None,
        'extraction_timestamp': datetime.utcnow().isoformat(),
        'extraction_notes': []
    }

    # Try HTML first
    if pd.notna(row.get('url')):
        try:
            email, snippet = extract_email_from_html(row['url'])
            if email:
                result['contact_email'] = email
                result['contact_source'] = 'publisher_html'
                result['affiliation_snippet'] = snippet
                result['candidate_author_affiliation_raw'] = extract_affiliation_from_snippet(snippet)
                result['extraction_notes'].append('Found in publisher HTML')
                result['extraction_notes'] = '; '.join(result['extraction_notes'])
                return result
            else:
                result['extraction_notes'].append('No email in HTML')
        except Exception as e:
            result['extraction_notes'].append(f'HTML extraction error: {str(e)[:100]}')

    # Try PDF
    if pd.notna(row.get('pdf_url')):
        try:
            email, snippet = extract_email_from_pdf(row['pdf_url'])
            if email:
                result['contact_email'] = email
                result['contact_source'] = 'pdf'
                result['affiliation_snippet'] = snippet
                result['candidate_author_affiliation_raw'] = extract_affiliation_from_snippet(snippet)
                result['extraction_notes'].append('Found in PDF')
                result['extraction_notes'] = '; '.join(result['extraction_notes'])
                return result
            else:
                result['extraction_notes'].append('No email in PDF')
        except Exception as e:
            result['extraction_notes'].append(f'PDF extraction error: {str(e)[:100]}')

    # Try to get last author as candidate
    try:
        authors = json.loads(row.get('authors', '[]'))
        if authors:
            last_author = authors[-1]
            result['candidate_author_name'] = last_author.get('name')
            result['extraction_notes'].append('Last author recorded as candidate')
    except:
        pass

    result['extraction_notes'] = '; '.join(result['extraction_notes'])
    return result


def bulk_find_corresponding_authors(df: pd.DataFrame) -> pd.DataFrame:
    """Bulk extract corresponding authors"""
    logger.info(f"Extracting corresponding authors for {len(df)} papers")

    results = []

    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Extracting authors"):
        result = find_corresponding_author(row)
        result['harvest_provenance'] = f"row_{idx}"
        results.append(result)

        # Checkpoint every 100 papers
        if (idx + 1) % 100 == 0:
            temp_df = pd.DataFrame(results)
            temp_df.to_csv(f'{BASE_PATH}/data/outputs/authors_candidates_temp.csv', index=False, encoding='utf-8')
            logger.info(f"Checkpoint: processed {idx + 1} papers")

    results_df = pd.DataFrame(results)
    return results_df

print("Extraction functions defined")

Extraction functions defined


In [9]:
def classify_affiliation_string(aff_str: Optional[str]) -> str:
    """Classify affiliation as Academic, Industry, or Unknown"""
    if not aff_str or aff_str.strip() == '':
        return "Unknown"

    aff_lower = aff_str.lower()

    # Check for industry keywords first
    for keyword in INDUSTRY_KEYWORDS:
        if keyword in aff_lower:
            return "Industry"

    # Check for academic keywords
    for keyword in ACADEMIC_KEYWORDS:
        if keyword in aff_lower:
            return "Academic"

    # Check for academic email domains
    if re.search(r'\.edu(?:\.|$)|\.ac\.|\.edu\.', aff_lower):
        return "Academic"

    return "Unknown"


def affiliation_decision_rules(row: dict) -> dict:
    """Apply filtering rules based on affiliation type"""
    aff_type = classify_affiliation_string(row.get('candidate_author_affiliation_raw'))

    row['affiliation_type'] = aff_type

    if aff_type == 'Industry':
        row['keep_candidate'] = False
        row['drop_reason'] = 'Industry affiliation'
    elif aff_type == 'Unknown':
        row['keep_candidate'] = False
        row['drop_reason'] = 'Unknown affiliation'
    elif aff_type == 'Academic':
        row['keep_candidate'] = True
        row['drop_reason'] = ''
    else:
        row['keep_candidate'] = False
        row['drop_reason'] = 'Unclassified'

    return row

print("Classification functions defined")

Classification functions defined


In [11]:
print("="*80)
print("STARTING PAPER HARVESTING PIPELINE")
print("="*80)

# Step 1: Harvest papers
print("\n[1/5] Harvesting papers from Semantic Scholar...")
df_raw = harvest_all_crossref(QUERIES)
if df_raw.empty:
    raise RuntimeError("Semantic Scholar harvest returned 0 papers — aborting pipeline")
print(f"✅ Harvested {len(df_raw)} raw papers")

# Step 2: Deduplicate and filter by year
print("\n[2/5] Deduplicating and filtering by year...")
df_deduped = dedupe_and_filter_year(df_raw, year=TARGET_YEAR)
print(f"✅ {len(df_deduped)} papers after deduplication")

# Step 3: Attach PDF links from Unpaywall
print("\n[3/5] Fetching PDF links from Unpaywall...")
df_master = attach_pdf_links(df_deduped)
pdf_count = df_master['pdf_url'].notna().sum()
print(f"✅ Found PDF links for {pdf_count}/{len(df_master)} papers")

# Step 4: Extract corresponding authors
print("\n[4/5] Extracting corresponding author emails...")
df_authors = bulk_find_corresponding_authors(df_master)
print(f"✅ Processed {len(df_authors)} papers for author extraction")

# Step 5: Classify and filter
print("\n[5/5] Classifying affiliations...")
authors_classified = df_authors.apply(lambda row: pd.Series(affiliation_decision_rules(row.to_dict())), axis=1)

# Save final authors_candidates.csv
output_path = f'{BASE_PATH}/data/outputs/authors_candidates.csv'
authors_classified.to_csv(output_path, index=False, encoding='utf-8')
print(f"✅ Saved {len(authors_classified)} candidates to authors_candidates.csv")

# Save checkpoint marker
with open(f'{BASE_PATH}/data/outputs/.last_harvest_index', 'w') as f:
    f.write(str(len(df_master)))

print("\n" + "="*80)
print("PIPELINE COMPLETE")
print("="*80)

STARTING PAPER HARVESTING PIPELINE

[1/5] Harvesting papers from Semantic Scholar...


  timestamp = datetime.utcnow().isoformat()


✅ Harvested 2000 raw papers

[2/5] Deduplicating and filtering by year...
✅ 1949 papers after deduplication

[3/5] Fetching PDF links from Unpaywall...


Unpaywall lookups:   0%|          | 0/1949 [00:00<?, ?it/s]

✅ Found PDF links for 1120/1949 papers

[4/5] Extracting corresponding author emails...


Extracting authors:   0%|          | 0/1949 [00:00<?, ?it/s]

  'extraction_timestamp': datetime.utcnow().isoformat(),
ERROR:__main__:Error extracting email from HTML https://doi.org/10.2139/ssrn.5417893: 403 Client Error: Forbidden for url: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=5417893
ERROR:__main__:Error extracting email from HTML https://doi.org/10.1039/d4tb02102a/v2/decision1: Failed to parse: 'www.webofknowledge.comundefinednull&referrer=target%3dhttps%253a%252f%252fwww.webofscience.com%252fwos%253fisproductcode%253dyes%2526init%253dyes%2526destparams%253d%25252fwos%25252fwoscc%25252ffull-record%25252f10.1039%2525252fd4tb02102a%25253ftype%25253ddoi%2526destapp%253dwosfv%2526func%253dframe%2526srcapp%253dpublons%2526type%253ddoi%2526sid%253dusw2ec0d07que50arnkczaaqfpabd%26sid%3dusw2ec0d07que50arnkczaaqfpabd%26detectsessioncomplete%3dtrue', label empty or too long
ERROR:__main__:Error extracting email from HTML https://doi.org/10.1002/1878-0261.70141/v1/decision1: Failed to parse: 'www.webofknowledge.comundefinednull&referrer=ta

✅ Processed 1949 papers for author extraction

[5/5] Classifying affiliations...
✅ Saved 1949 candidates to authors_candidates.csv

PIPELINE COMPLETE


In [None]:
print("\n📊 PIPELINE SUMMARY STATISTICS")
print("="*80)

# Check file existence
import os
files_to_check = [
    'papers_raw.csv',
    'papers_deduped.csv',
    'papers_master.csv',
    'authors_candidates.csv'
]

for filename in files_to_check:
    filepath = f'{BASE_PATH}/data/outputs/{filename}'
    exists = os.path.exists(filepath)
    print(f"{'✅' if exists else '❌'} {filename}: {'EXISTS' if exists else 'MISSING'}")

print("\n" + "-"*80)

# Load and verify papers_raw
df_raw_check = pd.read_csv(f'{BASE_PATH}/data/outputs/papers_raw.csv')
print(f"📄 papers_raw.csv: {len(df_raw_check)} rows")

# Verify papers_deduped
df_deduped_check = pd.read_csv(f'{BASE_PATH}/data/outputs/papers_deduped.csv')
print(f"📄 papers_deduped.csv: {len(df_deduped_check)} rows")
year_check = df_deduped_check['year'].unique()
print(f"   Years present: {year_check}")
assert all(y == TARGET_YEAR for y in year_check), f"ERROR: Found years other than {TARGET_YEAR}"

# Verify papers_master
df_master_check = pd.read_csv(f'{BASE_PATH}/data/outputs/papers_master.csv')
print(f"📄 papers_master.csv: {len(df_master_check)} rows")
assert 'pdf_url' in df_master_check.columns, "ERROR: Missing pdf_url column"
assert 'oa_status' in df_master_check.columns, "ERROR: Missing oa_status column"
papers_with_pdf = df_master_check['pdf_url'].notna().sum()
print(f"   Papers with PDF: {papers_with_pdf} ({papers_with_pdf/len(df_master_check)*100:.1f}%)")

# Verify authors_candidates
df_authors_check = pd.read_csv(f'{BASE_PATH}/data/outputs/authors_candidates.csv')
print(f"📄 authors_candidates.csv: {len(df_authors_check)} rows")
assert 'contact_email' in df_authors_check.columns, "ERROR: Missing contact_email column"
assert 'affiliation_type' in df_authors_check.columns, "ERROR: Missing affiliation_type column"
assert 'keep_candidate' in df_authors_check.columns, "ERROR: Missing keep_candidate column"

papers_with_email = df_authors_check['contact_email'].notna().sum()
print(f"   Papers with email: {papers_with_email} ({papers_with_email/len(df_authors_check)*100:.1f}%)")

# Classification breakdown
print("\n📊 Affiliation Classification:")
for aff_type in ['Academic', 'Industry', 'Unknown']:
    count = (df_authors_check['affiliation_type'] == aff_type).sum()
    print(f"   {aff_type}: {count}")

# Keep/drop breakdown
kept = df_authors_check['keep_candidate'].sum()
dropped = len(df_authors_check) - kept
print(f"\n✅ Candidates kept: {kept}")
print(f"❌ Candidates dropped: {dropped}")

# Drop reasons
if 'drop_reason' in df_authors_check.columns:
    print("\n📋 Drop Reasons:")
    drop_reasons = df_authors_check[df_authors_check['drop_reason'] != '']['drop_reason'].value_counts()
    for reason, count in drop_reasons.items():
        print(f"   {reason}: {count}")

print("\n" + "="*80)
print("✅ ALL VERIFICATION CHECKS PASSED")
print("="*80)