# Complete Pipeline: Data Ingestion → Preprocessing

This notebook combines all steps from notebooks 01-05 into a single pipeline:

1. **ArXiv Ingestion** - Fetch papers from ArXiv API
2. **ACL Anthology** - Download and parse ACL papers
3. **S2ORC Ingestion** - Fetch from Semantic Scholar
4. **Unified Metadata** - Combine and deduplicate
5. **Preprocessing** - Clean and tokenize text

**⏱️ Total Runtime:** ~3-4 hours

**💾 Storage Required:** ~500MB-1GB

---

## Table of Contents
- [Setup & Installation](#setup)
- [Part 1: ArXiv Ingestion](#part1)
- [Part 2: ACL Anthology](#part2)
- [Part 3: S2ORC Ingestion](#part3)
- [Part 4: Unified Metadata](#part4)
- [Part 5: Preprocessing](#part5)
- [Final Summary](#summary)

<a id='setup'></a>
## Setup & Installation

Install all required packages upfront.

In [None]:
# install all required packages
!pip install pandas arxiv requests beautifulsoup4 lxml pyarrow spacy nltk tqdm -q
!python -m spacy download en_core_web_sm

In [None]:
# import all libraries
import os
import re
import json
import time
import pandas as pd
import numpy as np
from datetime import datetime
from collections import Counter
from tqdm import tqdm

# data ingestion
import arxiv
import requests
from bs4 import BeautifulSoup

# NLP
import spacy
import nltk
from nltk.corpus import stopwords

# colab
from google.colab import drive

print("‚úì All imports successful!")

In [None]:
# mount Google Drive
# NOTE(review): every later cell in this notebook reads and writes relative
# paths under data/ (e.g. 'data/raw', 'data/processed'); nothing visible here
# uses /content/drive. Confirm whether outputs are meant to persist on Drive.
drive.mount('/content/drive')

In [None]:
# create directory structure (existing folders are left untouched)
for folder in ('data/raw', 'data/processed', 'src/preprocessing'):
    os.makedirs(folder, exist_ok=True)

print("‚úì Directory structure created")

In [None]:
# download NLTK data
for resource in ('stopwords', 'punkt'):
    nltk.download(resource, quiet=True)

# English stopword set consulted by remove_stopwords
stop_words = set(stopwords.words('english'))

# spaCy pipeline shared by the tokenization and lemmatization helpers
nlp = spacy.load('en_core_web_sm')

print("‚úì NLP models loaded")

---
<a id='part1'></a>
# Part 1: ArXiv Ingestion

Fetch papers from ArXiv API for NLP-related categories.

In [None]:
def fetch_arxiv_papers(categories, max_results_per_category=300):
    """
    Collect paper records from arXiv, one query per category.

    Args:
        categories: iterable of arXiv category codes; each one is queried
            as ``cat:<code>``, sorted by submission date.
        max_results_per_category: upper bound passed to each search.

    Returns:
        A flat list of dicts (one per result, in fetch order) holding the
        id, title, authors, abstract, categories, dates, PDF URL, a fixed
        ``venue`` of ``'arXiv'`` and the publication year.
    """
    api_client = arxiv.Client()
    collected = []

    for category in categories:
        print(f"Fetching papers from {category}...")

        query = arxiv.Search(
            query=f'cat:{category}',
            max_results=max_results_per_category,
            sort_by=arxiv.SortCriterion.SubmittedDate,
        )

        fetched = 0
        for fetched, entry in enumerate(api_client.results(query), start=1):
            collected.append({
                # last path segment of the entry URL
                'paper_id': entry.entry_id.split('/')[-1],
                'title': entry.title,
                'authors': [person.name for person in entry.authors],
                # abstracts arrive hard-wrapped; flatten to a single line
                'abstract': entry.summary.replace('\n', ' '),
                'categories': entry.categories,
                'primary_category': entry.primary_category,
                'published': entry.published.isoformat(),
                'updated': entry.updated.isoformat(),
                'pdf_url': entry.pdf_url,
                'venue': 'arXiv',
                'year': entry.published.year,
            })

            if fetched % 50 == 0:
                print(f"  Fetched {fetched} papers from {category}...")

        print(f"Completed {category}: {fetched} papers fetched\n")

    return collected

In [None]:
# fetch ArXiv papers for the categories of interest
TARGET_CATEGORIES = ['cs.CL', 'cs.LG', 'stat.ML']

print("Starting ArXiv ingestion...\n")
arxiv_papers = fetch_arxiv_papers(
    TARGET_CATEGORIES,
    max_results_per_category=300,
)

arxiv_paper_count = len(arxiv_papers)
print(f"\n‚úì Total ArXiv papers fetched: {arxiv_paper_count}")

In [None]:
# save ArXiv data as a parquet file under data/raw
arxiv_parquet_path = 'data/raw/arxiv_papers.parquet'
arxiv_df = pd.DataFrame(arxiv_papers)
arxiv_df.to_parquet(arxiv_parquet_path, index=False)

print(f"‚úì Saved {len(arxiv_df)} ArXiv papers to data/raw/arxiv_papers.parquet")
arxiv_size_mb = os.path.getsize(arxiv_parquet_path) / (1024*1024)
print(f"  File size: {arxiv_size_mb:.2f} MB")

---
<a id='part2'></a>
# Part 2: ACL Anthology Ingestion

Download and parse ACL Anthology BibTeX dump.

In [None]:
# download ACL anthology BibTeX
print("Downloading ACL Anthology BibTeX file...")
!wget https://aclanthology.org/anthology.bib.gz -O data/raw/acl_anthology.bib.gz -q
!gunzip -f data/raw/acl_anthology.bib.gz

print("‚úì Downloaded and extracted ACL Anthology")

In [None]:
def parse_bibtex_file(filepath):
    """
    Parse BibTeX file and extract paper metadata.

    The parser is line-oriented. An entry starts on a line beginning with
    ``@`` and ends on a line beginning with ``}``; each ``name = value``
    line inside an entry becomes a key of the entry dict. A value that is
    opened with ``"`` or ``{`` but not closed on the same line is continued
    on the following lines (joined with single spaces) until its delimiter
    is closed, so multi-line author lists and titles are kept whole.

    Args:
        filepath: path of the BibTeX file; read as UTF-8, undecodable bytes
            are ignored.

    Returns:
        List of dicts, one per entry that has a ``title`` field. Every dict
        has ``paper_id`` and ``entry_type`` plus one key per parsed field,
        with surrounding commas, braces and double quotes stripped.
    """
    papers = []

    def flush_entry(entry):
        # Only entries with a title count as papers; report progress once
        # per 5000 appended papers.
        if entry and 'title' in entry:
            papers.append(entry)
            if len(papers) % 5000 == 0:
                print(f"  Parsed {len(papers)} papers...")

    current_entry = {}
    in_entry = False
    pending_name = None   # field whose value is still open across lines
    pending_value = ''

    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            line = line.strip()

            if pending_name is not None:
                if re.match(r'@\w+\s*\{[^\s{}]*,?$', line):
                    # A new entry header inside an unterminated value: keep
                    # what was collected and recover instead of swallowing
                    # the rest of the file.
                    current_entry[pending_name] = _clean_bibtex_value(pending_value)
                    pending_name = None
                else:
                    pending_value = f"{pending_value} {line}"
                    if _bibtex_value_complete(pending_value):
                        current_entry[pending_name] = _clean_bibtex_value(pending_value)
                        pending_name = None
                    continue

            if line.startswith('@'):
                flush_entry(current_entry)
                # Always start from a clean state so a malformed header can
                # never leave the previous entry open (and re-append it).
                current_entry = {}
                in_entry = False

                parts = line[1:].split('{')
                if len(parts) == 2:
                    current_entry = {
                        'paper_id': parts[1].rstrip(','),
                        'entry_type': parts[0].lower(),
                    }
                    in_entry = True

            elif line.startswith('}'):
                flush_entry(current_entry)
                current_entry = {}
                in_entry = False

            elif in_entry and '=' in line:
                field_name, raw_value = line.split('=', 1)
                field_name = field_name.strip()
                if _bibtex_value_complete(raw_value):
                    current_entry[field_name] = _clean_bibtex_value(raw_value)
                else:
                    pending_name = field_name
                    pending_value = raw_value.strip()

    # File ended without a closing brace: keep whatever was collected.
    if pending_name is not None:
        current_entry[pending_name] = _clean_bibtex_value(pending_value)
    flush_entry(current_entry)

    return papers


def _bibtex_value_complete(raw_value):
    """Return True when a raw field value has its closing delimiter.

    Bare values (numbers, macros such as ``nov``) are always complete. A
    value opened with ``{`` is complete once brace depth returns to zero; a
    value opened with ``"`` is complete at the first unescaped ``"`` found
    outside braces. Backslash-escaped characters are skipped.
    """
    text = raw_value.strip()
    if not text or text[0] not in '"{':
        return True

    opener = text[0]
    depth = 0
    escaped = False
    for pos, ch in enumerate(text):
        if escaped:
            escaped = False
        elif ch == '\\':
            escaped = True
        elif ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if opener == '{' and depth == 0:
                return True
        elif ch == '"' and opener == '"' and depth <= 0 and pos > 0:
            return True
    return False


def _clean_bibtex_value(raw_value):
    """Strip whitespace, then commas, braces and double quotes from both ends."""
    return raw_value.strip().strip(',').strip('{}').strip('"')

In [None]:
# parse BibTeX file
print("Parsing ACL BibTeX file (this takes a few minutes)...\n")
acl_papers = parse_bibtex_file('data/raw/acl_anthology.bib')

print(f"\n‚úì Total ACL papers parsed: {len(acl_papers)}")

In [None]:
def normalize_acl_paper(paper):
    """Convert BibTeX entry to standard schema.

    Args:
        paper: dict of raw BibTeX fields as produced by parse_bibtex_file.

    Returns:
        Dict with keys paper_id, title, authors, abstract, venue, year, url,
        doi and pages. ``authors`` is a list of non-empty names; ``year`` is
        an int, or None when the field is missing or not an integer;
        ``venue`` falls back from booktitle to journal to 'ACL Anthology'.
    """
    # BibTeX separates authors with the word "and"; the surrounding
    # whitespace may be several spaces or a line break, and a truncated
    # value can end in a dangling "and". Empty names are dropped so a blank
    # author field gives [] rather than [''].
    author_field = paper.get('author') or ''
    authors = [
        name.strip()
        for name in re.split(r'\s+and(?:\s+|$)', author_field)
        if name.strip()
    ]

    venue = paper.get('booktitle', paper.get('journal', 'ACL Anthology'))

    year = None
    if 'year' in paper:
        try:
            year = int(paper['year'])
        except (TypeError, ValueError):
            # non-numeric year field: leave the year unknown
            year = None

    return {
        'paper_id': paper.get('paper_id', ''),
        'title': paper.get('title', ''),
        'authors': authors,
        'abstract': paper.get('abstract', ''),
        'venue': venue,
        'year': year,
        'url': paper.get('url', ''),
        'doi': paper.get('doi', ''),
        'pages': paper.get('pages', ''),
    }

# normalize every parsed entry, then keep titled papers from 2015 onwards
print("Normalizing ACL papers...")
acl_normalized = list(map(normalize_acl_paper, acl_papers))

acl_df = pd.DataFrame(acl_normalized)
has_title = acl_df['title'].str.len() > 0
acl_df = acl_df[has_title]
is_recent = acl_df['year'] >= 2015  # filter to recent papers
acl_df = acl_df[is_recent]

print(f"‚úì Filtered to {len(acl_df)} ACL papers (2015+)")

In [None]:
# save ACL data as a parquet file under data/raw
acl_parquet_path = 'data/raw/acl_anthology_papers.parquet'
acl_df.to_parquet(acl_parquet_path, index=False)

print(f"‚úì Saved {len(acl_df)} ACL papers to data/raw/acl_anthology_papers.parquet")
acl_size_mb = os.path.getsize(acl_parquet_path) / (1024*1024)
print(f"  File size: {acl_size_mb:.2f} MB")

---
<a id='part3'></a>
# Part 3: S2ORC Ingestion

Fetch papers from Semantic Scholar API.

In [None]:
S2_API_BASE = "https://api.semanticscholar.org/graph/v1"

def search_semantic_scholar(query, limit=100, max_retries=3, backoff_seconds=2.0):
    """
    Search Semantic Scholar for papers.

    Args:
        query: free-text search string.
        limit: number of results requested; capped at 100 per call.
        max_retries: how many times a rate-limited (HTTP 429) or server-side
            (HTTP 5xx) response is retried before giving up.
        backoff_seconds: base delay between retries; doubled on each attempt.

    Returns:
        List of paper dicts from the response's ``data`` field. An empty
        list is returned when the field is missing or null, when the final
        response is not HTTP 200, or when the request raises; failures are
        printed, never raised.
    """
    fields = ['paperId', 'title', 'abstract', 'authors', 'year',
              'venue', 'citationCount', 'referenceCount', 'fieldsOfStudy',
              'publicationDate', 'journal', 'externalIds']

    url = f"{S2_API_BASE}/paper/search"
    params = {
        'query': query,
        'limit': min(limit, 100),
        'fields': ','.join(fields)
    }

    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                # 'data' may be absent or null; callers extend a list with
                # the result, so always hand back a list.
                return response.json().get('data') or []

            retryable = response.status_code == 429 or response.status_code >= 500
            if retryable and attempt < max_retries:
                time.sleep(backoff_seconds * (2 ** attempt))
                continue

            print(f"  Error {response.status_code}")
            return []
        except Exception as e:
            # best-effort ingestion: report and move on to the next query
            print(f"  Request failed: {e}")
            return []

    return []

In [None]:
# search queries sent to Semantic Scholar, one request each
SEARCH_QUERIES = [
    'natural language processing',
    'transformers bert gpt',
    'machine translation',
    'sentiment analysis',
    'named entity recognition',
    'question answering',
    'text summarization',
    'language models',
]

print("Fetching papers from Semantic Scholar...\n")

all_s2_papers = []
for i, query in enumerate(SEARCH_QUERIES):
    print(f"{i+1}/{len(SEARCH_QUERIES)}: '{query}'...", end=' ')
    papers = search_semantic_scholar(query, limit=100)
    all_s2_papers += papers
    print(f"got {len(papers)} papers")
    # pause between requests to respect API limits
    time.sleep(1)

print(f"\n‚úì Total S2 papers fetched: {len(all_s2_papers)}")

In [None]:
# deduplicate on paperId, keeping the first occurrence of each id;
# records without a paperId are dropped
unique_ids = set()
unique_s2_papers = []

for paper in all_s2_papers:
    paper_id = paper.get('paperId')
    if not paper_id or paper_id in unique_ids:
        continue
    unique_ids.add(paper_id)
    unique_s2_papers.append(paper)

print(f"‚úì Unique S2 papers: {len(unique_s2_papers)}")

In [None]:
def normalize_s2orc_paper(paper):
    """Map one Semantic Scholar API record onto the standard schema.

    Author entries without a name are skipped, the venue falls back to the
    journal name when the venue is empty, and a missing or null
    fieldsOfStudy becomes an empty list.
    """
    author_names = [
        person.get('name', '')
        for person in (paper.get('authors') or [])
        if person.get('name')
    ]

    venue = paper.get('venue', '')
    journal = paper.get('journal')
    if not venue and journal:
        venue = journal.get('name', '')

    fields_of_study = paper.get('fieldsOfStudy', []) or []

    normalized = {}
    normalized['paper_id'] = paper.get('paperId', '')
    normalized['title'] = paper.get('title', '')
    normalized['authors'] = author_names
    normalized['abstract'] = paper.get('abstract', '')
    normalized['venue'] = venue
    normalized['year'] = paper.get('year')
    normalized['publication_date'] = paper.get('publicationDate', '')
    normalized['citation_count'] = paper.get('citationCount', 0)
    normalized['reference_count'] = paper.get('referenceCount', 0)
    normalized['fields_of_study'] = fields_of_study
    normalized['external_ids'] = paper.get('externalIds', {})
    return normalized

# normalize, then keep only records that have both a title and an abstract
s2_normalized = list(map(normalize_s2orc_paper, unique_s2_papers))
s2_filtered = [
    record for record in s2_normalized
    if record['title'] and record['abstract']
]

s2_df = pd.DataFrame(s2_filtered)

print(f"‚úì Filtered to {len(s2_df)} S2 papers with title and abstract")

In [None]:
# save S2ORC data as a parquet file under data/raw
s2_parquet_path = 'data/raw/s2orc_papers.parquet'
s2_df.to_parquet(s2_parquet_path, index=False)

print(f"‚úì Saved {len(s2_df)} S2ORC papers to data/raw/s2orc_papers.parquet")
s2_size_mb = os.path.getsize(s2_parquet_path) / (1024*1024)
print(f"  File size: {s2_size_mb:.2f} MB")

---
<a id='part4'></a>
# Part 4: Unified Metadata

Combine all three sources into a unified dataset.

In [None]:
# load all datasets back from the parquet files written by Parts 1-3
print("Loading datasets...")
arxiv_df, acl_df, s2_df = (
    pd.read_parquet(path)
    for path in (
        'data/raw/arxiv_papers.parquet',
        'data/raw/acl_anthology_papers.parquet',
        'data/raw/s2orc_papers.parquet',
    )
)

for label, frame in (('ArXiv', arxiv_df), ('ACL', acl_df), ('S2ORC', s2_df)):
    print(f"  {label}: {len(frame)} papers")

In [None]:
def normalize_arxiv(df):
    """Project the ArXiv frame onto the unified schema.

    Returns an empty DataFrame for empty input. Otherwise the result has the
    columns paper_id (prefixed with 'arxiv_'), title, authors, abstract,
    venue, year, categories, source ('arxiv') and metadata (a per-row dict
    with primary_category, published and pdf_url).
    """
    if df.empty:
        return pd.DataFrame()

    def build_metadata(row):
        # source-specific extras that have no column in the unified schema
        return {
            'primary_category': row.get('primary_category', ''),
            'published': row.get('published', ''),
            'pdf_url': row.get('pdf_url', '')
        }

    columns = {
        'paper_id': 'arxiv_' + df['paper_id'].astype(str),
        'title': df['title'],
        'authors': df['authors'],
        'abstract': df['abstract'],
        'venue': df['venue'],
        'year': df['year'],
        'categories': df['categories'],
        'source': 'arxiv',
    }
    columns['metadata'] = df.apply(build_metadata, axis=1)
    return pd.DataFrame(columns)

def normalize_acl(df):
    """Normalize ACL to unified schema.

    The input frame is not modified. Returns an empty DataFrame for empty
    input; otherwise the columns are paper_id (prefixed with 'acl_', with
    '/' replaced by '_'), title, authors, abstract (missing values become
    ''), venue, year, categories (the fixed pair 'NLP' and 'Computational
    Linguistics' on every row), source ('acl') and metadata (a per-row dict
    with url, doi and pages).
    """
    if df.empty:
        return pd.DataFrame()

    # Build the constant category column locally instead of assigning it to
    # `df`, so the caller's frame keeps its original columns. Each row gets
    # its own list object.
    categories = pd.Series(
        [['NLP', 'Computational Linguistics'] for _ in range(len(df))],
        index=df.index,
    )

    return pd.DataFrame({
        'paper_id': 'acl_' + df['paper_id'].astype(str).str.replace('/', '_', regex=False),
        'title': df['title'],
        'authors': df['authors'],
        'abstract': df['abstract'].fillna(''),
        'venue': df['venue'],
        'year': df['year'],
        'categories': categories,
        'source': 'acl',
        'metadata': df.apply(lambda row: {
            'url': row.get('url', ''),
            'doi': row.get('doi', ''),
            'pages': row.get('pages', '')
        }, axis=1)
    })

def normalize_s2orc(df):
    """Project the S2ORC frame onto the unified schema.

    Returns an empty DataFrame for empty input. Otherwise the result has the
    columns paper_id (prefixed with 's2_'), title, authors, abstract, venue,
    year, categories (taken from fields_of_study), source ('s2orc') and
    metadata (a per-row dict with citation_count, reference_count,
    publication_date and external_ids).
    """
    if df.empty:
        return pd.DataFrame()

    def build_metadata(row):
        # source-specific extras that have no column in the unified schema
        return {
            'citation_count': row.get('citation_count', 0),
            'reference_count': row.get('reference_count', 0),
            'publication_date': row.get('publication_date', ''),
            'external_ids': row.get('external_ids', {})
        }

    columns = {
        'paper_id': 's2_' + df['paper_id'].astype(str),
        'title': df['title'],
        'authors': df['authors'],
        'abstract': df['abstract'],
        'venue': df['venue'],
        'year': df['year'],
        'categories': df['fields_of_study'],
        'source': 's2orc',
    }
    columns['metadata'] = df.apply(build_metadata, axis=1)
    return pd.DataFrame(columns)

# normalize all three sources onto the unified schema
print("\nNormalizing datasets...")
arxiv_norm, acl_norm, s2_norm = (
    normalize_arxiv(arxiv_df),
    normalize_acl(acl_df),
    normalize_s2orc(s2_df),
)

for label, frame in (('ArXiv', arxiv_norm), ('ACL', acl_norm), ('S2ORC', s2_norm)):
    print(f"  {label} normalized: {len(frame)}")

In [None]:
# combine all non-empty normalized frames into one dataset
all_dfs = [
    frame
    for frame in (arxiv_norm, acl_norm, s2_norm)
    if not frame.empty
]

unified_df = pd.concat(all_dfs, ignore_index=True)

print(f"\n‚úì Combined dataset: {len(unified_df)} papers")

In [None]:
# clean and deduplicate
print("\nCleaning and deduplicating...")

# filter: keep papers with a real title, a usable abstract and at least one author
cleaned_df = unified_df[
    (unified_df['title'].str.len() > 10) &
    (unified_df['abstract'].str.len() > 50) &
    (unified_df['authors'].apply(len) > 0)
].copy()

print(f"  After filtering: {len(cleaned_df)} papers")

# deduplicate by title, preferring ACL over S2ORC over arXiv
# Titles are compared on a normalized key (braces removed, whitespace
# collapsed, case-folded) because the same paper is rarely spelled
# identically across sources: the BibTeX parser keeps inner case-protection
# braces such as "{BERT}", and titles may differ in line breaks or casing.
# The stored title itself is left unchanged.
source_priority = {'acl': 1, 's2orc': 2, 'arxiv': 3}
title_key = (
    cleaned_df['title']
    .str.replace(r'[{}]', '', regex=True)
    .str.replace(r'\s+', ' ', regex=True)
    .str.strip()
    .str.casefold()
)
ranked_df = cleaned_df.assign(
    source_rank=cleaned_df['source'].map(source_priority),
    title_key=title_key,
)

# A stable sort keeps the original row order within each source, so the row
# kept for a duplicated title is the same on every run.
deduped_df = (
    ranked_df
    .sort_values('source_rank', kind='stable')
    .drop_duplicates(subset=['title_key'], keep='first')
    .drop(columns=['source_rank', 'title_key'])
)

print(f"  After deduplication: {len(deduped_df)} papers")
print(f"  Duplicates removed: {len(cleaned_df) - len(deduped_df)}")

In [None]:
# add computed fields
deduped_df['title_length'] = deduped_df['title'].str.len()
deduped_df['abstract_length'] = deduped_df['abstract'].str.len()
deduped_df['num_authors'] = deduped_df['authors'].apply(len)
# List-valued columns that were loaded from parquet hold numpy arrays rather
# than Python lists, so arrays (and tuples) must be counted too; checking for
# `list` alone reports 0 categories for every arXiv and S2ORC paper.
deduped_df['num_categories'] = deduped_df['categories'].apply(
    lambda x: len(x) if isinstance(x, (list, tuple, np.ndarray)) else 0
)

print("\n‚úì Added computed fields")

In [None]:
# save unified dataset as a parquet file under data/processed
unified_parquet_path = 'data/processed/unified_papers.parquet'
deduped_df.to_parquet(unified_parquet_path, index=False)

print(f"\n‚úì Saved {len(deduped_df)} papers to data/processed/unified_papers.parquet")
unified_size_mb = os.path.getsize(unified_parquet_path) / (1024*1024)
print(f"  File size: {unified_size_mb:.2f} MB")

print("\nSource distribution:")
source_counts = deduped_df['source'].value_counts()
print(source_counts)

---
<a id='part5'></a>
# Part 5: Preprocessing

Clean and preprocess text data.

In [None]:
# define preprocessing functions

def clean_text(text):
    """Strip URLs and e-mail addresses, then collapse whitespace.

    Non-string input yields an empty string. The result has no leading or
    trailing whitespace and every whitespace run is a single space.
    """
    if not isinstance(text, str):
        return ""

    without_urls = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    without_emails = re.sub(r'\S+@\S+', '', without_urls)
    collapsed = re.sub(r'\s+', ' ', without_emails)
    return collapsed.strip()

def remove_special_chars(text):
    """Keep ASCII letters, digits, whitespace and . , ! ? - only.

    Any run of one repeated punctuation mark from that set is then reduced
    to a single occurrence.
    """
    allowed_only = re.sub(r'[^a-zA-Z0-9\s.,!?-]', '', text)
    return re.sub(r'([.,!?-])\1+', r'\1', allowed_only)

def tokenize_text(text):
    """Run the spaCy pipeline on `text` and return each token's text."""
    return [piece.text for piece in nlp(text)]

def remove_stopwords(tokens):
    """Drop tokens whose lowercase form is in the stopword set."""
    kept = []
    for word in tokens:
        if word.lower() in stop_words:
            continue
        kept.append(word)
    return kept

def lemmatize_tokens(tokens):
    """Join the tokens with spaces, re-run spaCy and return the lemmas."""
    joined = ' '.join(tokens)
    return [piece.lemma_ for piece in nlp(joined)]

def preprocess_text(text, lowercase=True, remove_stops=True, lemmatize=True):
    """Run the full cleaning and tokenization chain on one text.

    Args:
        text: raw text; anything that is not a non-empty string produces an
            all-empty result.
        lowercase: lowercase the cleaned text before tokenizing.
        remove_stops: drop stopwords after tokenizing.
        lemmatize: replace the remaining tokens with their lemmas.

    Returns:
        Dict with 'cleaned_text' (the text after cleaning and optional
        lowercasing), 'tokens' (alphanumeric tokens longer than two
        characters) and 'processed_text' (those tokens joined by spaces).
    """
    if not (isinstance(text, str) and text):
        return {'cleaned_text': '', 'tokens': [], 'processed_text': ''}

    # clean
    cleaned_text = remove_special_chars(clean_text(text))
    if lowercase:
        cleaned_text = cleaned_text.lower()

    # tokenize, then apply the optional token-level steps in order
    words = tokenize_text(cleaned_text)
    if remove_stops:
        words = remove_stopwords(words)
    if lemmatize:
        words = lemmatize_tokens(words)

    # filter out short and non-alphanumeric tokens
    words = [word for word in words if len(word) > 2 and word.isalnum()]

    return {
        'cleaned_text': cleaned_text,
        'tokens': words,
        'processed_text': ' '.join(words),
    }

print("‚úì Preprocessing functions defined")

In [None]:
# load unified dataset and sample (at most sample_size papers, fixed seed)
df = pd.read_parquet('data/processed/unified_papers.parquet')

sample_size = 500
sample_df = (
    df.sample(n=sample_size, random_state=42).copy()
    if len(df) > sample_size
    else df.copy()
)

print(f"‚úì Sampled {len(sample_df)} papers for preprocessing")

In [None]:
# preprocess all abstracts in the sample, one output record per paper
print(f"\nProcessing {len(sample_df)} abstracts...\n")

processed_data = []

for _, paper in tqdm(sample_df.iterrows(), total=len(sample_df)):
    abstract = paper['abstract']
    outcome = preprocess_text(abstract)
    tokens_out = outcome['tokens']

    record = {
        'paper_id': paper['paper_id'],
        'title': paper['title'],
        'original_abstract': abstract,
        'cleaned_text': outcome['cleaned_text'],
        'tokens': tokens_out,
        'processed_text': outcome['processed_text'],
        'num_tokens': len(tokens_out),
        'source': paper['source'],
        'year': paper['year'],
    }
    processed_data.append(record)

processed_df = pd.DataFrame(processed_data)

print("\n‚úì Preprocessing complete!")

In [None]:
# statistics on token counts per paper
token_lengths = processed_df['num_tokens']
print("\nPreprocessing statistics:")
print(f"  Average tokens per paper: {token_lengths.mean():.1f}")
print(f"  Min tokens: {token_lengths.min()}")
print(f"  Max tokens: {token_lengths.max()}")

# vocabulary: flatten every paper's token list and count occurrences
all_tokens = [
    token
    for paper_tokens in processed_df['tokens']
    for token in paper_tokens
]

token_counts = Counter(all_tokens)
print(f"\n  Vocabulary size: {len(token_counts)}")
print(f"  Total tokens: {len(all_tokens)}")

print("\n  Top 20 tokens:")
for word, frequency in token_counts.most_common(20):
    print(f"    {word}: {frequency}")

In [None]:
# save preprocessed data
processed_df.to_parquet('data/processed/preprocessed_sample_500.parquet', index=False)

# save vocabulary: sizes, the sorted token list and the 1000 most frequent tokens
vocab_data = dict(
    vocab_size=len(token_counts),
    total_tokens=len(all_tokens),
    vocabulary=sorted(token_counts),
    token_frequencies=dict(token_counts.most_common(1000)),
)

with open('data/processed/vocabulary.json', 'w') as vocab_file:
    json.dump(vocab_data, vocab_file, indent=2)

print("\n‚úì Saved preprocessed data to data/processed/preprocessed_sample_500.parquet")
print("‚úì Saved vocabulary to data/processed/vocabulary.json")

---
<a id='summary'></a>
# Final Summary

Pipeline complete! Here's what we accomplished:

In [None]:
# Final report: counts from each pipeline stage plus the files written to disk.
print("="*80)
print("PIPELINE SUMMARY")
print("="*80)

print("\nüì• DATA INGESTION")
print(f"  ArXiv papers:     {len(arxiv_df):,}")
print(f"  ACL papers:       {len(acl_df):,}")
print(f"  S2ORC papers:     {len(s2_df):,}")
print(f"  Total ingested:   {len(arxiv_df) + len(acl_df) + len(s2_df):,}")

print("\nüîó UNIFICATION")
print(f"  Unified papers:   {len(deduped_df):,}")
# Rows dropped by the quality filter are not duplicates: report them on their
# own line and count duplicates from the filtered frame, which is the same
# figure the deduplication cell prints.
print(f"  Removed by filters: {len(unified_df) - len(cleaned_df):,}")
print(f"  Duplicates removed: {len(cleaned_df) - len(deduped_df):,}")

print("\nüßπ PREPROCESSING")
print(f"  Preprocessed:     {len(processed_df):,} papers")
print(f"  Vocabulary size:  {len(token_counts):,} tokens")
print(f"  Avg tokens/paper: {processed_df['num_tokens'].mean():.1f}")

print("\nüìÅ OUTPUT FILES")
output_files = [
    'data/raw/arxiv_papers.parquet',
    'data/raw/acl_anthology_papers.parquet',
    'data/raw/s2orc_papers.parquet',
    'data/processed/unified_papers.parquet',
    'data/processed/preprocessed_sample_500.parquet',
    'data/processed/vocabulary.json'
]

# Sum the size of every output file that exists; missing files are skipped.
total_size = 0
for filepath in output_files:
    if os.path.exists(filepath):
        size_mb = os.path.getsize(filepath) / (1024 * 1024)
        total_size += size_mb
        print(f"  ‚úì {filepath} ({size_mb:.2f} MB)")

print(f"\n  Total storage: {total_size:.2f} MB")

print("\n" + "="*80)
print("‚úÖ PIPELINE COMPLETE!")
print("="*80)

print("\nüìä Next Steps:")
print("  1. Generate embeddings (Word2Vec, BERT)")
print("  2. Train classification models")
print("  3. Build topic models")
print("  4. Create retrieval system")
print("  5. Develop research digest interface")