# Data Exploration: Memory-Efficient & Fail-Safe Workflow

A pipeline for extracting features and embeddings from large datasets. It processes data in chunks and writes checkpoints so that an interrupted run can resume where it stopped.

## 📋 Notebook Overview

This notebook implements a **memory-efficient, fail-safe** data processing pipeline for citation prediction tasks.

### Key Features:
- **Chunked Processing**: Process data in configurable chunks (default: 1000 records)
- **Checkpoint System**: Automatic saving of progress for resumability
- **Immediate Persistence**: Results saved to disk after each chunk
- **Memory Management**: Aggressive cleanup between operations
- **Batch Processing**: Efficient batching for embedding generation

### Processing Pipeline:
1. **Feature Extraction**: 40+ structured features from nested JSON
2. **Text Embeddings**: Multiple transformer models (Sentence-BERT, SciBERT, SPECTER2)
3. **NLP Features**: Regex-based text statistics
4. **Correlation Analysis**: Identify highly correlated features

### Output Structure:
```
data/results/
├── X_train.parquet          # Feature matrices
├── X_val.parquet
├── X_test.parquet
├── y_train.npy              # Labels
├── y_val.npy
├── sent_transformer_*.parquet  # Embeddings
├── scibert_*.parquet
└── specter2_*.parquet
```

## 📑 Table of Contents

1. **[Configuration & Setup](#1-configuration--setup)** - Environment setup and parameters
2. **[Infrastructure Components](#2-infrastructure-components)** - Checkpoint manager and utilities
3. **[Data Loading System](#3-data-loading-system)** - Chunked data reader
4. **[Feature Extraction](#4-feature-extraction)** - Comprehensive feature engineering
5. **[Embedding Generation](#5-embedding-generation)** - Transformer-based embeddings
6. **[Processing Pipeline](#6-processing-pipeline)** - Main execution flow
7. **[Results & Summary](#7-results--summary)** - Output validation and metrics

## 1. Configuration & Setup

Initialize environment, import libraries, and set processing parameters.

In [1]:
# Standard library imports
import json
import os
import datetime
import gc
import re
import warnings
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union
from collections import defaultdict

# Data processing libraries
import numpy as np
import polars as pl
from tqdm.auto import tqdm

# Deep learning libraries
import torch
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

print("✅ Libraries imported successfully")

✅ Libraries imported successfully


In [2]:
# ============================================================================
# PROCESSING CONFIGURATION
# ============================================================================

# Memory management settings
CHUNK_SIZE = 1000        # Records per chunk (adjust based on RAM)
BATCH_SIZE = 32          # Batch size for embeddings
MAX_LENGTH = 512         # Maximum token length for transformers

# Data sampling (use 1.0 for full dataset)
SAMPLE_PCT = 1.0      # Fraction of rows to keep: e.g. 0.05 (5%) for testing, 1.0 for production

# Device configuration
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
N_CORES = os.cpu_count() or 4

print(f"🔧 Configuration:")
print(f"  Device: {DEVICE}")
print(f"  CPU cores: {N_CORES}")
print(f"  Chunk size: {CHUNK_SIZE:,} records")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Sample: {SAMPLE_PCT:.2%}")
print(f"  Max tokens: {MAX_LENGTH}")

🔧 Configuration:
  Device: cuda
  CPU cores: 40
  Chunk size: 1,000 records
  Batch size: 32
  Sample: 100.00%
  Max tokens: 512
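
`SAMPLE_PCT` is a fraction between 0 and 1, and the `:.2%` format spec used in the printout multiplies it by 100. A small check of how a few values render may help avoid mixing up fractions and percentages; the values here are illustrative only.

```python
# SAMPLE_PCT is a fraction, not a percentage: 0.05 keeps 5% of the rows.
for sample_pct in (1.0, 0.05, 0.0005):
    print(f"{sample_pct!r:>6} -> {sample_pct:.2%}")

rendered = [f"{p:.2%}" for p in (1.0, 0.05, 0.0005)]
print(rendered)  # ['100.00%', '5.00%', '0.05%']
```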


In [3]:
# ============================================================================
# DIRECTORY STRUCTURE
# ============================================================================

# Determine project root dynamically: use the directory two levels above the
# working directory if it contains 'data'; otherwise use the working directory
_current_dir = Path.cwd().parent.parent
print(f"  Current directory: {_current_dir}")
if (_current_dir / 'data').exists():
    PROJECT_ROOT = _current_dir
else:
    PROJECT_ROOT = Path.cwd()

# Define directory paths
DATA_DIR = PROJECT_ROOT / 'data/processed'
RESULTS_DIR = PROJECT_ROOT / 'data/results'
TEMP_DIR = PROJECT_ROOT / 'data/temp'
CHECKPOINT_DIR = PROJECT_ROOT / 'data/checkpoints'

# Create directories if they don't exist
for directory in [RESULTS_DIR, TEMP_DIR, CHECKPOINT_DIR]:
    directory.mkdir(exist_ok=True, parents=True)

print(f"📁 Directory Structure:")
print(f"  Project root: {PROJECT_ROOT}")
print(f"  Data: {DATA_DIR}")
print(f"  Results: {RESULTS_DIR}")
print(f"  Temp: {TEMP_DIR}")
print(f"  Checkpoints: {CHECKPOINT_DIR}")

  Current directory: /gpfs/accounts/si670f25_class_root/si670f25_class
📁 Directory Structure:
  Project root: /gpfs/accounts/si670f25_class_root/si670f25_class/santoshd/Kaggle_2
  Data: /gpfs/accounts/si670f25_class_root/si670f25_class/santoshd/Kaggle_2/data/processed
  Results: /gpfs/accounts/si670f25_class_root/si670f25_class/santoshd/Kaggle_2/data/results
  Temp: /gpfs/accounts/si670f25_class_root/si670f25_class/santoshd/Kaggle_2/data/temp
  Checkpoints: /gpfs/accounts/si670f25_class_root/si670f25_class/santoshd/Kaggle_2/data/checkpoints
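
The root detection in this cell checks one fixed location and otherwise falls back to the working directory. A more general alternative, sketched here as an assumption rather than the notebook's method, walks up from a starting directory until it finds one containing a `data` folder. The helper name `find_project_root` and the `marker` parameter are hypothetical.

```python
from pathlib import Path
from typing import Optional

def find_project_root(start: Path, marker: str = "data") -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains `marker`."""
    start = start.resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).is_dir():
            return candidate
    return None

# Fall back to the working directory when no marker directory is found
project_root = find_project_root(Path.cwd()) or Path.cwd()
print(project_root)
```

This trades the fixed "two levels up" rule for a search, which keeps working if the notebook is moved deeper into or higher up the project tree.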


## 2. Infrastructure Components

Core utilities for memory management, checkpointing, and data handling.

In [4]:
def cleanup_memory():
    """Aggressive memory cleanup for both CPU and GPU."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()  # Second pass for thorough cleanup

def memory_usage():
    """Display current memory usage statistics."""
    try:
        import psutil
        process = psutil.Process(os.getpid())
        mem_info = process.memory_info()
        print(f"💾 Memory: {mem_info.rss / 1024**3:.2f} GB (RAM)", end="")

        if torch.cuda.is_available():
            gpu_mem = torch.cuda.memory_allocated() / 1024**3
            gpu_reserved = torch.cuda.memory_reserved() / 1024**3
            print(f" | {gpu_mem:.2f}/{gpu_reserved:.2f} GB (GPU used/reserved)")
        else:
            print()
    except ImportError:
        print("💾 Memory tracking requires psutil: pip install psutil")

print("✅ Memory utilities defined")
memory_usage()

✅ Memory utilities defined
💾 Memory: 0.70 GB (RAM) | 0.00/0.00 GB (GPU used/reserved)


In [5]:
class CheckpointManager:
    """Manages processing checkpoints for fault tolerance and resumability."""
    
    def __init__(self, checkpoint_dir: Path):
        """Initialize checkpoint manager with specified directory."""
        self.checkpoint_dir = checkpoint_dir
        self.checkpoint_file = checkpoint_dir / 'processing_checkpoint.json'
    
    def save_progress(self, dataset: str, task: str, chunk_idx: int, total_chunks: int):
        """Save current processing progress to checkpoint file."""
        checkpoint = self.load_checkpoint()
        
        if dataset not in checkpoint:
            checkpoint[dataset] = {}
        
        checkpoint[dataset][task] = {
            'completed_chunks': chunk_idx,
            'total_chunks': total_chunks,
            'status': 'completed' if chunk_idx >= total_chunks else 'in_progress',
            'timestamp': str(datetime.datetime.now())
        }
        
        with open(self.checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)
    
    def load_checkpoint(self) -> Dict:
        """Load existing checkpoint or create new one."""
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return {}
    
    def get_resume_point(self, dataset: str, task: str) -> int:
        """Get the chunk index to resume from for a specific task."""
        checkpoint = self.load_checkpoint()
        if dataset in checkpoint and task in checkpoint[dataset]:
            return checkpoint[dataset][task].get('completed_chunks', 0)
        return 0
    
    def is_task_completed(self, dataset: str, task: str) -> bool:
        """Check if a specific task has been completed."""
        checkpoint = self.load_checkpoint()
        if dataset in checkpoint and task in checkpoint[dataset]:
            return checkpoint[dataset][task].get('status') == 'completed'
        return False
    
    def clear_checkpoint(self):
        """Clear all checkpoints (use for fresh start)."""
        if self.checkpoint_file.exists():
            self.checkpoint_file.unlink()
            print("🗑️ Checkpoints cleared")

# Initialize global checkpoint manager
checkpoint_mgr = CheckpointManager(CHECKPOINT_DIR)
print("✅ Checkpoint manager initialized")

# Display current checkpoint status
checkpoint_data = checkpoint_mgr.load_checkpoint()
if checkpoint_data:
    print("📌 Existing checkpoints found:")
    for dataset, tasks in checkpoint_data.items():
        print(f"  {dataset}: {', '.join(tasks.keys())}")
else:
    print("📌 No existing checkpoints (fresh start)")

✅ Checkpoint manager initialized
📌 Existing checkpoints found:
  train: embeddings_sent_transformer, feature_extraction
  val: embeddings_sent_transformer
  test: embeddings_sent_transformer
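
`save_progress` rewrites the checkpoint JSON in place, so a crash in the middle of the write could leave a truncated file that `load_checkpoint` cannot parse. One common mitigation, shown here as a sketch and not as part of the notebook's `CheckpointManager`, is to write to a temporary file in the same directory and then swap it into place with `os.replace`. The helper name `write_json_atomic` is hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path

def write_json_atomic(path: Path, payload: dict) -> None:
    """Write JSON to a temp file next to `path`, then swap it into place."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())      # push the bytes to disk before the swap
        os.replace(tmp_name, path)    # atomic when both paths share a filesystem
    except BaseException:
        # Best-effort removal of the temp file if anything failed
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "processing_checkpoint.json"
    write_json_atomic(target, {"train": {"feature_extraction": {"completed_chunks": 3}}})
    print(json.loads(target.read_text()))
```

Readers of the checkpoint then see either the previous complete file or the new complete file, never a partial one.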


In [6]:
def safe_extract(data: Any, keys: List[str], default: Any = None) -> Any:
    """Safely extract nested values from dictionaries.
    
    Args:
        data: Dictionary or nested structure
        keys: List of keys to traverse
        default: Default value if extraction fails
    
    Returns:
        Extracted value or default
    """
    try:
        result = data
        for key in keys:
            if isinstance(result, dict):
                result = result.get(key, default)
            elif isinstance(result, list) and key.isdigit():
                idx = int(key)
                result = result[idx] if 0 <= idx < len(result) else default
            else:
                return default
        return result if result is not None else default
    except (KeyError, IndexError, TypeError, ValueError):
        return default

def clean_text(text: str, max_chars: int = 10000) -> str:
    """Clean and normalize text for processing.
    
    Args:
        text: Input text string
        max_chars: Maximum characters to keep
    
    Returns:
        Cleaned text string
    """
    if not text or not isinstance(text, str):
        return ""
    
    # Normalize whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove special characters but keep basic punctuation
    text = re.sub(r'[^\w\s.,!?;:\'"()-]', '', text)
    
    # Truncate if too long
    if len(text) > max_chars:
        text = text[:max_chars] + "..."
    
    return text.strip()

def merge_chunk_files(chunk_pattern: str, output_file: Path, cleanup: bool = True):
    """Merge multiple chunk files into a single parquet file.
    
    Args:
        chunk_pattern: Pattern to match chunk files
        output_file: Path for merged output
        cleanup: Whether to delete chunks after merging
    """
    chunk_files = sorted(TEMP_DIR.glob(chunk_pattern))
    
    if not chunk_files:
        print(f"⚠️ No chunk files found: {chunk_pattern}")
        return

    print(f"📂 Merging {len(chunk_files)} chunks...")

    # Read and concatenate. Chunks can differ in columns (e.g. the one-hot
    # oa_status_* features), so use a diagonal concat that fills gaps with nulls.
    dfs = [pl.read_parquet(f) for f in tqdm(chunk_files, desc="Reading")]
    merged_df = pl.concat(dfs, how="diagonal_relaxed")

    # Save merged file
    merged_df.write_parquet(output_file)
    print(f"✅ Saved: {output_file.name} ({merged_df.shape})")

    # Cleanup
    if cleanup:
        for f in chunk_files:
            f.unlink()
        print(f"🧹 Cleaned {len(chunk_files)} temp files")

    del dfs, merged_df
    cleanup_memory()

print("✅ Data utilities defined")

✅ Data utilities defined


## 3. Data Loading System

Chunked data reader for memory-efficient processing of large datasets.

In [7]:
class ChunkedDataReader:
    """Memory-efficient data reader with chunking and sampling support."""
    
    def __init__(self, file_path: Path, chunk_size: int = CHUNK_SIZE, sample_pct: float = 1.0):
        """Initialize chunked reader.
        
        Args:
            file_path: Path to parquet file
            chunk_size: Records per chunk
            sample_pct: Fraction of rows to sample (0-1)
        """
        self.file_path = file_path
        self.chunk_size = chunk_size
        self.sample_pct = sample_pct

        # Count rows lazily; the count does not require loading the full
        # file, whether or not sampling is enabled.
        # Note: pl.len() replaces the deprecated pl.count() in recent Polars releases.
        lazy_df = pl.scan_parquet(file_path)
        self.total_rows = lazy_df.select(pl.len()).collect()[0, 0]

        self.sample_size = int(self.total_rows * sample_pct)
        self.num_chunks = (self.sample_size + chunk_size - 1) // chunk_size

        print(f"📊 Data: {file_path.name}")
        print(f"   Total: {self.total_rows:,} rows")
        print(f"   Sample: {self.sample_size:,} rows ({sample_pct:.2%})")
        print(f"   Chunks: {self.num_chunks}")
    
    def read_chunks(self):
        """Generator yielding data chunks.
        
        Yields:
            Polars DataFrame chunks
        """
        if self.sample_pct < 1.0:
            # Sample from full dataset
            df = pl.read_parquet(self.file_path)
            df = df.sample(n=self.sample_size, seed=42)
            
            # Yield chunks
            for i in range(0, len(df), self.chunk_size):
                chunk = df[i:i+self.chunk_size]
                yield chunk
                del chunk
                cleanup_memory()
            
            del df
        else:
            # Read chunks directly from file using scan_parquet with slice
            for i in range(0, self.total_rows, self.chunk_size):
                chunk_size_actual = min(self.chunk_size, self.total_rows - i)
                chunk = pl.scan_parquet(self.file_path).slice(i, chunk_size_actual).collect()
                yield chunk
                del chunk
                cleanup_memory()

print("✅ ChunkedDataReader defined")

✅ ChunkedDataReader defined
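
The reader's chunk arithmetic can be checked in isolation. This sketch mirrors the offset and length computation used with `slice`, plus the ceiling division behind `num_chunks`; the helper name `chunk_bounds` and the row counts are illustrative assumptions.

```python
from typing import Iterator, Tuple

def chunk_bounds(total_rows: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield (offset, length) pairs that cover total_rows in order."""
    for offset in range(0, total_rows, chunk_size):
        yield offset, min(chunk_size, total_rows - offset)

bounds = list(chunk_bounds(total_rows=2500, chunk_size=1000))
num_chunks = (2500 + 1000 - 1) // 1000  # ceiling division, as in the reader

print(bounds)      # [(0, 1000), (1000, 1000), (2000, 500)]
print(num_chunks)  # 3
```

The last chunk is shorter than `chunk_size` whenever the row count is not an exact multiple, which is why the length is clamped with `min`.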


In [8]:
# Validate data files
data_files = {
    'train': DATA_DIR / 'train.parquet',
    'val': DATA_DIR / 'val.parquet',
    'test': DATA_DIR / 'test.parquet'
}

print("📁 Data File Status:")
data_available = True

for name, path in data_files.items():
    if path.exists():
        size_mb = path.stat().st_size / (1024 * 1024)
        print(f"  ✅ {name:5s}: {path.name:20s} ({size_mb:8.2f} MB)")
    else:
        print(f"  ❌ {name:5s}: {path}")
        data_available = False

if not data_available:
    print("\n⚠️ Missing data files. Please ensure all files are present.")
else:
    print("\n✅ All data files found and ready for processing")

📁 Data File Status:
  ✅ train: train.parquet        (  859.24 MB)
  ✅ val  : val.parquet          (  107.52 MB)
  ✅ test : test.parquet         (  107.22 MB)

✅ All data files found and ready for processing


## 4. Feature Extraction

Extract structured features from nested JSON data.

In [9]:
def extract_features(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract comprehensive features from paper data.
    
    Args:
        data: Dictionary containing paper information
    
    Returns:
        Dictionary of extracted features
    """
    features = {}
    
    # === Basic Metadata ===
    features['publication_year'] = safe_extract(data, ['publication_year'], 0)
    features['type'] = safe_extract(data, ['type'], 'unknown')
    features['language'] = safe_extract(data, ['language'], 'unknown')
    
    # === Text Features ===
    title = safe_extract(data, ['title'], '')
    abstract = safe_extract(data, ['abstract'], '')
    features['title_length'] = len(title) if title else 0
    features['abstract_length'] = len(abstract) if abstract else 0
    features['has_abstract'] = 1 if abstract else 0
    features['title_word_count'] = len(title.split()) if title else 0
    features['abstract_word_count'] = len(abstract.split()) if abstract else 0
    
    # === Author Features ===
    authorships = safe_extract(data, ['authorships'], [])
    features['num_authors'] = len(authorships) if authorships else 0
    
    # Extract unique institutions
    institutions = set()
    for auth in (authorships or []):
        for inst in safe_extract(auth, ['institutions'], []):
            if inst.get('id'):
                institutions.add(inst['id'])
    features['num_institutions'] = len(institutions)
    
    # First author metrics
    if authorships and len(authorships) > 0:
        first = authorships[0]
        features['first_author_h_index'] = safe_extract(
            first, ['author', 'summary_stats', '2yr_h_index'], 0
        )
        features['first_author_citations'] = safe_extract(
            first, ['author', 'summary_stats', '2yr_citedness_count'], 0
        )
        features['first_author_papers'] = safe_extract(
            first, ['author', 'summary_stats', '2yr_works_count'], 0
        )
    else:
        features['first_author_h_index'] = 0
        features['first_author_citations'] = 0
        features['first_author_papers'] = 0
    
    # Aggregate author metrics
    if authorships:
        h_indices = []
        citations = []
        for auth in authorships:
            h_indices.append(safe_extract(auth, ['author', 'summary_stats', '2yr_h_index'], 0))
            citations.append(safe_extract(auth, ['author', 'summary_stats', '2yr_citedness_count'], 0))
        
        features['max_author_h_index'] = max(h_indices) if h_indices else 0
        features['avg_author_h_index'] = np.mean(h_indices) if h_indices else 0
        features['total_author_citations'] = sum(citations)
        features['avg_author_citations'] = np.mean(citations) if citations else 0
    else:
        features['max_author_h_index'] = 0
        features['avg_author_h_index'] = 0
        features['total_author_citations'] = 0
        features['avg_author_citations'] = 0
    
    # === Venue Features ===
    primary_location = safe_extract(data, ['primary_location'], {})
    if primary_location:
        source = safe_extract(primary_location, ['source'], {})
        features['venue_impact_factor'] = safe_extract(
            source, ['summary_stats', '2yr_impact_factor'], 0
        )
        features['venue_h_index'] = safe_extract(
            source, ['summary_stats', '2yr_h_index'], 0
        )
        features['venue_citations'] = safe_extract(
            source, ['summary_stats', '2yr_cited_by_count'], 0
        )
        features['is_oa_venue'] = 1 if safe_extract(source, ['is_oa'], False) else 0
        features['is_in_doaj'] = 1 if safe_extract(source, ['is_in_doaj'], False) else 0
    else:
        features['venue_impact_factor'] = 0
        features['venue_h_index'] = 0
        features['venue_citations'] = 0
        features['is_oa_venue'] = 0
        features['is_in_doaj'] = 0
    
    # === Concept Features ===
    concepts = safe_extract(data, ['concepts'], [])
    features['num_concepts'] = len(concepts) if concepts else 0
    
    if concepts:
        scores = [c.get('score', 0) for c in concepts]
        features['max_concept_score'] = max(scores) if scores else 0
        features['avg_concept_score'] = np.mean(scores) if scores else 0
        features['num_high_concepts'] = sum(1 for s in scores if s >= 0.5)
    else:
        features['max_concept_score'] = 0
        features['avg_concept_score'] = 0
        features['num_high_concepts'] = 0
    
    # === Open Access Features ===
    open_access = safe_extract(data, ['open_access'], {})
    features['is_oa'] = 1 if safe_extract(open_access, ['is_oa'], False) else 0
    oa_status = safe_extract(open_access, ['oa_status'], 'closed')
    features[f'oa_status_{oa_status}'] = 1
    
    # === Reference Features ===
    features['num_references'] = len(safe_extract(data, ['referenced_works'], []))
    features['num_related_works'] = len(safe_extract(data, ['related_works'], []))
    
    # === Identifier Features ===
    ids = safe_extract(data, ['ids'], {})
    features['has_doi'] = 1 if safe_extract(ids, ['doi']) else 0
    features['has_pmid'] = 1 if safe_extract(ids, ['pmid']) else 0
    features['has_pmcid'] = 1 if safe_extract(ids, ['pmcid']) else 0
    
    # === Grant Features ===
    grants = safe_extract(data, ['grants'], [])
    features['num_grants'] = len(grants) if grants else 0
    features['has_grants'] = 1 if grants else 0
    
    # === Location Features ===
    features['num_locations'] = len(safe_extract(data, ['locations'], []))
    
    # === Topic Features ===
    topics = safe_extract(data, ['topics'], [])
    features['num_topics'] = len(topics) if topics else 0
    
    if topics:
        scores = [t.get('score', 0) for t in topics]
        features['max_topic_score'] = max(scores) if scores else 0
        features['avg_topic_score'] = np.mean(scores) if scores else 0
    else:
        features['max_topic_score'] = 0
        features['avg_topic_score'] = 0
    
    # === Keyword Features ===
    features['num_keywords'] = len(safe_extract(data, ['keywords'], []))
    
    # === Mesh Terms ===
    features['num_mesh_terms'] = len(safe_extract(data, ['mesh'], []))
    
    # === SDG Features ===
    sdgs = safe_extract(data, ['sustainable_development_goals'], [])
    features['num_sdgs'] = len(sdgs) if sdgs else 0
    features['has_sdgs'] = 1 if sdgs else 0
    
    return features

print("✅ Feature extraction function defined")
print(f"   Extracting {len(extract_features({}))} features per record")

✅ Feature extraction function defined
   Extracting 43 features per record
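
Because `oa_status` is one-hot encoded with a key derived from the value (`oa_status_gold`, `oa_status_closed`, and so on), different records can produce different sets of keys. A possible way to give every record in a batch the same columns before building a DataFrame is sketched here; `align_records` is a hypothetical helper, and filling with 0 is an assumption that suits indicator columns.

```python
from typing import Any, Dict, List

def align_records(records: List[Dict[str, Any]], fill: Any = 0) -> List[Dict[str, Any]]:
    """Give every record the same keys, in first-seen order, filling gaps with `fill`."""
    columns: List[str] = []
    seen = set()
    for record in records:
        for key in record:
            if key not in seen:
                seen.add(key)
                columns.append(key)
    return [{col: record.get(col, fill) for col in columns} for record in records]

rows = [
    {"id": "W1", "oa_status_gold": 1},
    {"id": "W2", "oa_status_closed": 1},
]
aligned = align_records(rows)
print(aligned)
```

This only aligns records within one batch. Keeping columns consistent across chunks would additionally need a fixed list of expected status values, which the source does not specify.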


In [10]:
def extract_nlp_features(text: str) -> Dict[str, float]:
    """Extract NLP-based statistical features from text.
    
    Args:
        text: Input text string
    
    Returns:
        Dictionary of NLP features
    """
    if not text or not isinstance(text, str):
        return {
            'nlp_word_count': 0,
            'nlp_char_count': 0,
            'nlp_sentence_count': 0,
            'nlp_avg_word_length': 0,
            'nlp_avg_sentence_length': 0,
            'nlp_capital_ratio': 0,
            'nlp_number_ratio': 0,
            'nlp_punctuation_ratio': 0
        }
    
    features = {}
    
    # Basic counts
    features['nlp_char_count'] = len(text)
    
    # Word analysis
    words = re.findall(r'\b\w+\b', text)
    features['nlp_word_count'] = len(words)
    features['nlp_avg_word_length'] = np.mean([len(w) for w in words]) if words else 0
    
    # Sentence analysis
    sentences = re.split(r'[.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    features['nlp_sentence_count'] = len(sentences)
    features['nlp_avg_sentence_length'] = (
        np.mean([len(s.split()) for s in sentences]) if sentences else 0
    )
    
    # Character ratios
    if len(text) > 0:
        features['nlp_capital_ratio'] = len(re.findall(r'[A-Z]', text)) / len(text)
        features['nlp_number_ratio'] = len(re.findall(r'\d', text)) / len(text)
        features['nlp_punctuation_ratio'] = len(re.findall(r'[.,;:!?]', text)) / len(text)
    else:
        features['nlp_capital_ratio'] = 0
        features['nlp_number_ratio'] = 0
        features['nlp_punctuation_ratio'] = 0
    
    return features

print("✅ NLP feature extraction defined")

✅ NLP feature extraction defined
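
To make the regex-based statistics concrete, here is a small worked example using the same word and sentence patterns as `extract_nlp_features`. The sample strings are made up, and `sentence_fragments` is a hypothetical helper that isolates the splitting rule.

```python
import re

def sentence_fragments(text: str) -> list:
    """Split on runs of '.', '!' or '?' and drop empty fragments."""
    return [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]

text = "Deep learning works. Does it scale? Yes!"
words = re.findall(r'\b\w+\b', text)
sentences = sentence_fragments(text)

print(len(words))       # 7
print(len(sentences))   # 3
print(sum(len(s.split()) for s in sentences) / len(sentences))  # average words per sentence

# Abbreviations are a weak spot of punctuation-based splitting
print(sentence_fragments("See e.g. Smith et al. 2020."))
# ['See e', 'g', 'Smith et al', '2020']
```

The second call shows why `nlp_sentence_count` is best read as a rough signal: scientific abstracts with abbreviations and decimals will be over-split.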


In [11]:
def process_features_chunked(file_path: Path, dataset_name: str, has_label: bool = True):
    """Process features in chunks with checkpoint support.
    
    Args:
        file_path: Path to input parquet file
        dataset_name: Name of dataset (train/val/test)
        has_label: Whether dataset contains labels
    """
    task_name = 'feature_extraction'
    
    # Check if already completed
    if checkpoint_mgr.is_task_completed(dataset_name, task_name):
        print(f"✅ {dataset_name} features already extracted")
        return

    print(f"\n{'='*80}")
    print(f"📊 EXTRACTING FEATURES: {dataset_name.upper()}")
    print(f"{'='*80}")
    
    # Initialize reader
    reader = ChunkedDataReader(file_path, chunk_size=CHUNK_SIZE, sample_pct=SAMPLE_PCT)
    
    # Get resume point
    start_chunk = checkpoint_mgr.get_resume_point(dataset_name, task_name)
    if start_chunk > 0:
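        print(f"📌 Resuming from chunk {start_chunk}/{reader.num_chunks}")

    # Process chunks
    labels = [] if has_label else None

    for chunk_idx, chunk_df in enumerate(reader.read_chunks()):
        if chunk_idx < start_chunk:
            # Features for this chunk were saved in an earlier run, but labels
            # live only in memory: re-collect them so y stays aligned with X.
            if has_label:
                if 'label' in chunk_df.columns:
                    labels.extend(chunk_df['label'].to_list())
                else:
                    labels.extend([0] * len(chunk_df))
            continue

        print(f"\n🔄 Chunk {chunk_idx + 1}/{reader.num_chunks}")
        memory_usage()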
        
        features_list = []
        
        # Extract features for each record
        for row in tqdm(chunk_df.iter_rows(named=True), 
                        total=len(chunk_df), 
                        desc="Extracting"):
            
            # The parquet file stores fields as top-level columns (not nested
            # under 'data'), and iter_rows(named=True) already yields dicts
            data = row

            # Extract structured features
            features = extract_features(data)

            # Extract NLP features
            title = safe_extract(data, ['title'], '')
            abstract = safe_extract(data, ['abstract'], '')
            combined_text = f"{title} {abstract}"
            nlp_features = extract_nlp_features(combined_text)
            features.update(nlp_features)

            # Add ID
            features['id'] = data.get('id')
            features_list.append(features)

            # Collect labels
            if has_label:
                labels.append(row.get('label', 0))
        
        # Convert to DataFrame
        # Note: Missing values will be handled by proper imputation in data_exploration_next_steps.ipynb
        chunk_features_df = pl.DataFrame(features_list)
        
        # Save chunk
        chunk_file = TEMP_DIR / f'features_{dataset_name}_chunk_{chunk_idx:04d}.parquet'
        chunk_features_df.write_parquet(chunk_file)
        print(f"💾 Saved: {chunk_file.name}")
        
        # Update checkpoint
        checkpoint_mgr.save_progress(dataset_name, task_name, chunk_idx + 1, reader.num_chunks)
        
        # Cleanup
        del chunk_df, features_list, chunk_features_df
        cleanup_memory()
    
    # Merge chunks
    print(f"\n📂 Merging chunks...")
    output_file = RESULTS_DIR / f'X_{dataset_name}.parquet'
    merge_chunk_files(f'features_{dataset_name}_chunk_*.parquet', output_file)
    
    # Save labels
    if has_label and labels:
        labels_file = RESULTS_DIR / f'y_{dataset_name}.npy'
        np.save(labels_file, np.array(labels))
        print(f"💾 Saved labels: {labels_file.name}")

    print(f"✅ {dataset_name} feature extraction complete!")
    cleanup_memory()

print("✅ Chunked feature processor defined")

✅ Chunked feature processor defined
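
Feature chunks are persisted to disk, but labels are accumulated in a Python list, so a resumed run needs the labels of already-finished chunks as well for `y_*.npy` to line up with the merged features. The following toy simulation illustrates that interaction under simplified assumptions: chunks are plain lists of labels, the checkpoint is a dict, and `run` is a hypothetical stand-in for the processing loop.

```python
from typing import Dict, List

def run(chunks: List[List[int]], state: Dict[str, int], fail_at: int = -1) -> List[int]:
    """Process chunks from state['completed_chunks']; return labels for all chunks."""
    labels: List[int] = []
    start = state.get("completed_chunks", 0)
    for idx, chunk in enumerate(chunks):
        labels.extend(chunk)                 # collect labels even for skipped chunks
        if idx < start:
            continue                         # chunk output already on disk
        if idx == fail_at:
            raise RuntimeError(f"simulated crash in chunk {idx}")
        state["completed_chunks"] = idx + 1  # checkpoint after the chunk is saved
    return labels

chunks = [[1, 0], [0, 0], [1, 1]]
state: Dict[str, int] = {}
try:
    run(chunks, state, fail_at=1)            # first attempt dies in the second chunk
except RuntimeError:
    pass
print(state)                                 # {'completed_chunks': 1}

labels = run(chunks, state)                  # resumed attempt
print(labels)                                # [1, 0, 0, 0, 1, 1]
```

An alternative design would save labels per chunk next to the feature chunk, so that nothing needs to be re-read on resume.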


## 5. Embedding Generation

Generate text embeddings using transformer models.

In [12]:
def get_text_for_embedding(data: Dict[str, Any]) -> str:
    """Extract and combine text for embedding generation.
    
    Args:
        data: Paper data dictionary
    
    Returns:
        Combined text string
    """
    title = safe_extract(data, ['title'], '')
    abstract = safe_extract(data, ['abstract'], '')
    
    # Combine with separator
    text = f"{title}. {abstract}" if abstract else title
    
    return clean_text(text)

print("✅ Embedding utilities defined")

✅ Embedding utilities defined


In [13]:
# Model loading functions

def load_sentence_transformer():
    """Load Sentence Transformer model."""
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    model.to(DEVICE)
    return model, None

def embed_sentence_transformer(model, tokenizer, texts):
    """Generate embeddings using Sentence Transformer."""
    return model.encode(
        texts,
        batch_size=len(texts),
        show_progress_bar=False,
        convert_to_numpy=True
    )

def load_scibert():
    """Load SciBERT model."""
    tokenizer = AutoTokenizer.from_pretrained('allenai/scibert_scivocab_uncased')
    model = AutoModel.from_pretrained('allenai/scibert_scivocab_uncased').to(DEVICE)
    return model, tokenizer

def load_specter2():
    """Load SPECTER2 model."""
    tokenizer = AutoTokenizer.from_pretrained('allenai/specter2_base')
    model = AutoModel.from_pretrained('allenai/specter2_base').to(DEVICE)
    return model, tokenizer

def embed_huggingface(model, tokenizer, texts):
    """Generate embeddings using HuggingFace model."""
    inputs = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=MAX_LENGTH,
        return_tensors='pt'
    ).to(DEVICE)
    
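    with torch.no_grad():
        outputs = model(**inputs)
        # Mean pooling over real tokens only: weight by the attention mask so
        # padding positions do not dilute the average
        hidden = outputs.last_hidden_state
        mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
        embeddings = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
        embeddings = embeddings.cpu().numpy()

    return embeddings

print("✅ Model loaders defined")
print("   Available models:")
print("   - Sentence Transformer (384 dims)")
print("   - SciBERT (768 dims)")
print("   - SPECTER2 (768 dims)")

✅ Model loaders defined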
   Available models:
   - Sentence Transformer (384 dims)
   - SciBERT (768 dims)
   - SPECTER2 (768 dims)
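
When a batch is padded to a common length, mean pooling should average only the real tokens. The sketch below shows attention-mask-weighted mean pooling on a tiny NumPy array so the effect of padding is visible; the function name `masked_mean_pool` and the toy values are assumptions for illustration, not output from any of the models above.

```python
import numpy as np

def masked_mean_pool(hidden: np.ndarray, attention_mask: np.ndarray) -> np.ndarray:
    """Average token vectors per sequence, ignoring positions where mask == 0.

    hidden: (batch, seq_len, dim); attention_mask: (batch, seq_len) of 0/1.
    """
    mask = attention_mask[..., None].astype(hidden.dtype)  # (batch, seq_len, 1)
    summed = (hidden * mask).sum(axis=1)                    # (batch, dim)
    counts = np.clip(mask.sum(axis=1), 1e-9, None)          # avoid division by zero
    return summed / counts

hidden = np.array([[[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]])  # last token is padding
mask = np.array([[1, 1, 0]])

pooled = masked_mean_pool(hidden, mask)
print(pooled)                # [[2. 3.]]
print(hidden.mean(axis=1))   # plain mean is skewed by the padding vector
```

With an unmasked mean, short texts in a batch of long ones would get embeddings dominated by padding positions.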


In [14]:
def process_embeddings_chunked(
    file_path: Path,
    dataset_name: str,
    model_name: str,
    model_loader_func,
    embedding_func,
    embedding_dim: int
):
    """Process embeddings in chunks with checkpoint support.
    
    Args:
        file_path: Path to input parquet file
        dataset_name: Dataset name (train/val/test)
        model_name: Name of embedding model
        model_loader_func: Function to load model
        embedding_func: Function to generate embeddings
        embedding_dim: Dimension of embeddings
    """
    task_name = f'embeddings_{model_name}'
    
    # Check if already completed
    if checkpoint_mgr.is_task_completed(dataset_name, task_name):
        print(f"✅ {dataset_name} {model_name} embeddings already extracted")
        return

    print(f"\n{'='*80}")
    print(f"🧠 EXTRACTING {model_name.upper()} EMBEDDINGS: {dataset_name.upper()}")
    print(f"{'='*80}")
    
    # Load model
    print(f"Loading {model_name} model...")
    model, tokenizer = model_loader_func()
    
    # Initialize reader
    reader = ChunkedDataReader(file_path, chunk_size=CHUNK_SIZE, sample_pct=SAMPLE_PCT)
    
    # Get resume point
    start_chunk = checkpoint_mgr.get_resume_point(dataset_name, task_name)
    if start_chunk > 0:
        print(f"📌 Resuming from chunk {start_chunk}/{reader.num_chunks}")

    # Process chunks
    for chunk_idx, chunk_df in enumerate(reader.read_chunks()):
        if chunk_idx < start_chunk:
            continue

        print(f"\n🔄 Chunk {chunk_idx + 1}/{reader.num_chunks}")
        memory_usage()
        
        # Extract texts and IDs
        texts = []
        ids = []
        
        for row in chunk_df.iter_rows(named=True):
            # Fields are top-level columns (not nested under 'data'), and
            # iter_rows(named=True) already yields dicts
            text = get_text_for_embedding(row)
            texts.append(text if text else "")
            ids.append(row.get('id'))
        
        # Generate embeddings in batches
        embeddings = []
        
        for i in tqdm(range(0, len(texts), BATCH_SIZE), desc="Embedding"):
            batch_texts = texts[i:i+BATCH_SIZE]
            batch_embeddings = embedding_func(model, tokenizer, batch_texts)
            embeddings.append(batch_embeddings)
            cleanup_memory()
        
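        # Combine embeddings
        embeddings = np.vstack(embeddings)
        if embeddings.shape[1] != embedding_dim:
            # Fail early instead of silently truncating columns or hitting an IndexError below
            raise ValueError(
                f"{model_name}: expected {embedding_dim} dimensions, got {embeddings.shape[1]}"
            )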
        
        # Create DataFrame
        embedding_cols = {
            f'{model_name}_dim_{i}': embeddings[:, i] 
            for i in range(embedding_dim)
        }
        embedding_cols['id'] = ids
        
        chunk_embeddings_df = pl.DataFrame(embedding_cols)
        
        # Save chunk
        chunk_file = TEMP_DIR / f'{model_name}_{dataset_name}_chunk_{chunk_idx:04d}.parquet'
        chunk_embeddings_df.write_parquet(chunk_file)
        print(f"💾 Saved: {chunk_file.name}")
        
        # Update checkpoint
        checkpoint_mgr.save_progress(dataset_name, task_name, chunk_idx + 1, reader.num_chunks)
        
        # Cleanup
        del chunk_df, texts, ids, embeddings, chunk_embeddings_df
        cleanup_memory()
    
    # Clean up model
    del model
    if tokenizer:
        del tokenizer
    cleanup_memory()
    
    # Merge chunks
    print("\n📂 Merging chunks...")
    output_file = RESULTS_DIR / f'{model_name}_X_{dataset_name}.parquet'
    merge_chunk_files(f'{model_name}_{dataset_name}_chunk_*.parquet', output_file)
    
    print(f"✅ {dataset_name} {model_name} embeddings complete!")
    cleanup_memory()

print("✅ Chunked embedding processor defined")

✅ Chunked embedding processor defined


## 6. Processing Pipeline

Execute the complete processing pipeline.
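
Each step below can be interrupted and rerun because the processors skip chunks that the checkpoint already records as done. The following is a minimal sketch of that resume logic, not the notebook's actual `CheckpointManager` (defined in section 2). The flat JSON layout, the file name, and the helper names `get_resume_point`, `save_progress`, and `run_task` are assumptions made for illustration only.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def get_resume_point(checkpoint_file: Path, task: str) -> int:
    """Return the index of the first chunk that still needs processing."""
    if not checkpoint_file.exists():
        return 0
    state = json.loads(checkpoint_file.read_text())
    return state.get(task, {}).get("completed_chunks", 0)


def save_progress(checkpoint_file: Path, task: str, completed: int, total: int) -> None:
    """Record how many chunks of `task` are finished (hypothetical layout)."""
    state = json.loads(checkpoint_file.read_text()) if checkpoint_file.exists() else {}
    state[task] = {
        "completed_chunks": completed,
        "total_chunks": total,
        "status": "completed" if completed >= total else "in_progress",
    }
    checkpoint_file.write_text(json.dumps(state))


def run_task(checkpoint_file: Path, task: str, chunks: list,
             fail_at: Optional[int] = None) -> list:
    """Process chunks in order, skipping those already recorded as done."""
    start = get_resume_point(checkpoint_file, task)
    processed = []
    for idx, chunk in enumerate(chunks):
        if idx < start:
            continue  # finished in an earlier run
        if idx == fail_at:
            raise RuntimeError(f"simulated crash at chunk {idx}")
        processed.append(chunk)
        # Persist progress only after the chunk's work is done
        save_progress(checkpoint_file, task, idx + 1, len(chunks))
    return processed


# Demo: crash at the third chunk, then rerun and pick up where it stopped.
with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoint.json"
    chunks = ["a", "b", "c", "d"]
    try:
        run_task(ckpt, "embeddings_demo", chunks, fail_at=2)
    except RuntimeError:
        pass
    resumed = run_task(ckpt, "embeddings_demo", chunks)
    print(resumed)  # ['c', 'd']
```

Saving progress only after a chunk has been fully processed means a crash costs at most one chunk of repeated work.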

In [None]:
# === FEATURE EXTRACTION ===
if data_available:
    print("\n" + "="*80)
    print("🚀 STARTING FEATURE EXTRACTION PIPELINE")
    print("="*80)
    
    # Process each dataset
    process_features_chunked(data_files['train'], 'train', has_label=True)
    process_features_chunked(data_files['val'], 'val', has_label=True)
    process_features_chunked(data_files['test'], 'test', has_label=False)
    
    print("\n✅ Feature extraction complete!")
    memory_usage()
else:
    print("⚠️ Skipping feature extraction - data files missing")

In [None]:
# === SENTENCE TRANSFORMER EMBEDDINGS ===
if data_available:
    print("\n" + "="*80)
    print("🚀 STARTING SENTENCE TRANSFORMER EMBEDDINGS")
    print("="*80)
    
    for dataset_name, file_path in [('train', data_files['train']),
                                    ('val', data_files['val']),
                                    ('test', data_files['test'])]:
        process_embeddings_chunked(
            file_path=file_path,
            dataset_name=dataset_name,
            model_name='sent_transformer',
            model_loader_func=load_sentence_transformer,
            embedding_func=embed_sentence_transformer,
            embedding_dim=384
        )
    
    print("\n✅ Sentence Transformer embeddings complete!")
    memory_usage()
else:
    print("⚠️ Skipping Sentence Transformer - data files missing")

In [None]:
# === SCIBERT EMBEDDINGS ===
if data_available:
    print("\n" + "="*80)
    print("🚀 STARTING SCIBERT EMBEDDINGS")
    print("="*80)
    
    for dataset_name, file_path in [('train', data_files['train']),
                                    ('val', data_files['val']),
                                    ('test', data_files['test'])]:
        process_embeddings_chunked(
            file_path=file_path,
            dataset_name=dataset_name,
            model_name='scibert',
            model_loader_func=load_scibert,
            embedding_func=embed_huggingface,
            embedding_dim=768
        )
    
    print("\n✅ SciBERT embeddings complete!")
    memory_usage()
else:
    print("⚠️ Skipping SciBERT - data files missing")

In [None]:
# === SPECTER2 EMBEDDINGS (Optional) ===
if data_available:  # Optional step: skip this cell to leave out SPECTER2
    print("\n" + "="*80)
    print("🚀 STARTING SPECTER2 EMBEDDINGS")
    print("="*80)
    
    try:
        for dataset_name, file_path in [('train', data_files['train']),
                                        ('val', data_files['val']),
                                        ('test', data_files['test'])]:
            process_embeddings_chunked(
                file_path=file_path,
                dataset_name=dataset_name,
                model_name='specter2',
                model_loader_func=load_specter2,
                embedding_func=embed_huggingface,
                embedding_dim=768
            )
        
        print("\n✅ SPECTER2 embeddings complete!")
        memory_usage()
    except Exception as e:
        print(f"⚠️ SPECTER2 failed: {e}")
else:
    print("⚠️ Skipping SPECTER2 - data files missing")

## 7. Results & Summary

Validate outputs and display processing summary.
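
One lightweight validation is to confirm that every expected embedding file exists and is non-empty before training. The sketch below assumes the `{model_name}_X_{dataset_name}.parquet` naming used when chunks are merged. The helper names `expected_embedding_files` and `find_missing_outputs` are hypothetical and not part of the notebook.

```python
import tempfile
from pathlib import Path


def expected_embedding_files(models, datasets):
    """Build file names following the `{model_name}_X_{dataset_name}.parquet` pattern."""
    return [f"{model}_X_{dataset}.parquet" for model in models for dataset in datasets]


def find_missing_outputs(results_dir: Path, models, datasets):
    """Return expected embedding files that are absent or empty in `results_dir`."""
    missing = []
    for name in expected_embedding_files(models, datasets):
        path = results_dir / name
        if not path.exists() or path.stat().st_size == 0:
            missing.append(name)
    return missing


# Demo on a throwaway directory with one file deliberately left out.
with tempfile.TemporaryDirectory() as tmp:
    results = Path(tmp)
    (results / "scibert_X_train.parquet").write_bytes(b"placeholder")
    (results / "scibert_X_val.parquet").write_bytes(b"placeholder")
    demo_missing = find_missing_outputs(results, ["scibert"], ["train", "val", "test"])
    print(demo_missing)  # ['scibert_X_test.parquet']
```

In the notebook this could be called as `find_missing_outputs(RESULTS_DIR, ['sent_transformer', 'scibert', 'specter2'], ['train', 'val', 'test'])`. It only checks presence and size, so row counts and column names would still need a separate check.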

In [None]:
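def cleanup_temp_directory():
    """Delete leftover chunk files from the temporary directory.

    Run this only after every task has completed: removing the chunk files of
    an interrupted task leaves its checkpoint pointing at data that is gone.
    """
    if TEMP_DIR.exists():
        remaining_files = list(TEMP_DIR.glob('*.parquet'))
        if remaining_files:
            print(f"🧹 Cleaning {len(remaining_files)} temp files...")
            for file in remaining_files:
                file.unlink()
            print("✅ Temp files cleaned")
        else:
            print("✅ No temp files to clean")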

cleanup_temp_directory()

In [None]:
# === FINAL SUMMARY ===
if data_available:
    print("\n" + "="*80)
    print("🎉 PROCESSING COMPLETE!")
    print("="*80)
    
    # List output files
    print(f"\n📊 Output Directory: {RESULTS_DIR}")
    
    result_files = sorted(RESULTS_DIR.glob('*.parquet')) + sorted(RESULTS_DIR.glob('*.npy'))
    
    if result_files:
        print("\n📁 Generated Files:")
        total_size = 0
        
        for file in result_files:
            size_mb = file.stat().st_size / (1024 * 1024)
            total_size += size_mb
            print(f"  {file.name:40s} {size_mb:8.2f} MB")
        
        print(f"\n  Total size: {total_size:.2f} MB")
    
    # Checkpoint status
    checkpoint = checkpoint_mgr.load_checkpoint()
    if checkpoint:
        print("\n📌 Processing Status:")
        for dataset in checkpoint:
            print(f"\n  {dataset}:")
            for task, info in checkpoint[dataset].items():
                status = '✅' if info['status'] == 'completed' else '⏸️'
                progress = f"{info['completed_chunks']}/{info['total_chunks']}"
                print(f"    {status} {task:30s} {progress:>10s} chunks")
    
    # Configuration summary
    print("\n⚙️ Configuration Used:")
    print(f"  Sample: {SAMPLE_PCT:.2%}")
    print(f"  Chunk size: {CHUNK_SIZE:,} records")
    print(f"  Batch size: {BATCH_SIZE}")
    print(f"  Device: {DEVICE}")
    
    # Next steps
    print("\n💡 Next Steps:")
    print("  1. Use generated features/embeddings for model training")
    print("  2. Processing is resumable - can interrupt safely")
    print("  3. To reprocess: checkpoint_mgr.clear_checkpoint()")
    print("  4. For production: set SAMPLE_PCT = 1.0")
    
    memory_usage()
else:
    print("\n⚠️ Processing skipped - missing data files")
    print("Please ensure train.parquet, val.parquet, and test.parquet")
    print(f"are present in: {DATA_DIR}")