# Daily arXiv Embeddings Pipeline

This notebook fetches the day's arXiv preprints via RSS, generates embeddings with sentence-transformers, and stores them in the Hopsworks Feature Store.

## 1. Setup and Imports

In [1]:
!pip install -q feedparser sentence-transformers "hopsworks[python]==4.2.*"

# Restart runtime after install (required for numpy compatibility)
#import os
#os.kill(os.getpid(), 9)

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m665.0/665.0 kB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.2/44.2 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m75.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.7/11.7 MB[0m [31m111.7 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0mm
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-colab 1.0.0 requires pandas==2.2.2, but

In [2]:
import json
import os
import re
import time
import urllib.request
from datetime import datetime, timezone

import feedparser
import hopsworks
import numpy as np
import pandas as pd
import pyarrow as pa
from sentence_transformers import SentenceTransformer



## 2. Configuration

In [3]:
# =============================================================================
# arXiv Category Configuration
# =============================================================================
# Fetch categories dynamically from the official arXiv taxonomy
# Each category is fetched individually with rate limiting (5s delay)

# The URL embeds a specific commit hash of the arXiv/arxiv-canonical repository,
# so the category list only changes when this hash is updated.
ARXIV_TAXONOMY_URL = "https://raw.githubusercontent.com/arXiv/arxiv-canonical/407cb0b2cef83c7f653dabdf998e797b18475b13/schema-extended/Taxonomy.json"

def fetch_arxiv_categories(taxonomy_url: str, timeout: float = 30.0) -> list[str]:
    """Fetch all active arXiv categories from the official taxonomy.

    Args:
        taxonomy_url: URL of the arXiv ``Taxonomy.json`` schema document.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled download cannot hang the notebook indefinitely.

    Returns:
        The category codes listed under
        ``definitions.active_categories.enum``, in document order.

    Raises:
        urllib.error.URLError: If the document cannot be retrieved.
        KeyError: If the document lacks the expected taxonomy structure.
    """
    print(f"Fetching taxonomy from: {taxonomy_url}")
    with urllib.request.urlopen(taxonomy_url, timeout=timeout) as response:
        # Decode explicitly as UTF-8 rather than relying on the default argument.
        taxonomy = json.loads(response.read().decode("utf-8"))

    categories = taxonomy["definitions"]["active_categories"]["enum"]
    print(f"Found {len(categories)} categories")
    return categories

# Resolve the category list from the taxonomy document once, up front
ARXIV_CATEGORIES = fetch_arxiv_categories(ARXIV_TAXONOMY_URL)

# Optional: Filter to specific subject areas (uncomment to use)
# CATEGORY_PREFIXES = ['cs.', 'stat.', 'econ.', 'q-fin.']  # Example: CS, Stats, Econ, Finance
# ARXIV_CATEGORIES = [c for c in ARXIV_CATEGORIES if any(c.startswith(p) for p in CATEGORY_PREFIXES)]

# Pause between consecutive RSS requests, in seconds
FETCH_DELAY_SECONDS = 5

# Sentence-transformers model name and the vector size configured for it
EMBEDDING_MODEL = 'all-MiniLM-L6-v2'
EMBEDDING_DIM = 384

# Hopsworks target; host and feature store can be overridden via environment variables
HOPSWORKS_HOST = os.environ.get('HOPSWORKS_HOST', 'c.app.hopsworks.ai')
HOPSWORKS_FEATURE_STORE = os.environ.get('HOPSWORKS_FEATURE_STORE', 'kingaedwin_featurestore')
FEATURE_GROUP_NAME = 'arxiv_embeddings_with_cats'
FEATURE_GROUP_VERSION = 1

# Echo the effective configuration; the estimate is one delay per category,
# split into whole minutes and leftover seconds.
print(
    f"\nCategories to fetch: {len(ARXIV_CATEGORIES)}",
    f"Delay between requests: {FETCH_DELAY_SECONDS}s",
    "Estimated fetch time: ~{} min {}s".format(
        *divmod(len(ARXIV_CATEGORIES) * FETCH_DELAY_SECONDS, 60)
    ),
    f"First 10 categories: {ARXIV_CATEGORIES[:10]}",
    sep="\n",
)

Fetching taxonomy from: https://raw.githubusercontent.com/arXiv/arxiv-canonical/407cb0b2cef83c7f653dabdf998e797b18475b13/schema-extended/Taxonomy.json
Found 155 categories

Categories to fetch: 155
Delay between requests: 5s
Estimated fetch time: ~12 min 55s
First 10 categories: ['astro-ph.CO', 'astro-ph.EP', 'astro-ph.GA', 'astro-ph.HE', 'astro-ph.IM', 'astro-ph.SR', 'cond-mat.dis-nn', 'cond-mat.mes-hall', 'cond-mat.mtrl-sci', 'cond-mat.other']


## 3. arXiv RSS Parsing

In [4]:
def clean_text(text: str) -> str:
    """Clean and normalize text from RSS feed.

    Strips HTML/XML tags and collapses every run of whitespace (including
    newlines) into a single space.

    Only spans shaped like a tag are removed: ``<`` optionally followed by
    ``/``, then a name starting with a letter, optional attributes, and ``>``.
    A bare ``<[^>]+>`` would also delete prose between two inequality signs,
    e.g. ``p < 0.05 and z > 6``, which is common in abstracts.

    Args:
        text: Raw title or summary text; ``None`` or empty yields ``''``.

    Returns:
        The cleaned single-line string.
    """
    if not text:
        return ''
    # Remove tag-shaped spans only (see docstring for why this is restricted)
    text = re.sub(r'</?[A-Za-z][\w:.-]*(?:\s[^<>]*)?/?>', '', text)
    # split() with no argument drops leading/trailing whitespace as well,
    # so no separate strip() is needed after the join
    return ' '.join(text.split())


def extract_arxiv_id(link: str) -> float:
    """Extract arXiv ID from URL and convert to float.

    The run of digits and dots following ``abs/`` is parsed as a float, so a
    trailing version suffix such as ``v2`` is ignored and leading or trailing
    zeros of the identifier are not preserved.

    Args:
        link: Abstract-page URL, e.g. ``https://arxiv.org/abs/2511.17836``.

    Returns:
        The identifier as a float, or ``0.0`` when the link is empty, has no
        ``abs/<digits>`` part, or the captured text is not a valid number.
    """
    if not link:
        return 0.0
    match = re.search(r'abs/([\d.]+)', link)
    if match is None:
        return 0.0
    try:
        return float(match.group(1))
    except ValueError:
        # The pattern also accepts text like "1.2.3" or "."; treat it as
        # "no ID found" instead of aborting the whole feed parse.
        return 0.0


def extract_categories(entry: dict) -> tuple[list[str], list[str]]:
    """Split an entry's tag terms into a primary category and the remainder.

    Returns:
        ``(primary, sub_categories)``: ``primary`` is a one-element list with
        the first non-empty tag term, ``sub_categories`` holds the remaining
        non-empty terms in order. Both are empty when the entry has no usable
        tags.
    """
    raw_tags = entry.get('tags', [])
    if not raw_tags:
        return [], []

    # Keep only tags that actually carry a term
    terms = [item.get('term', '') for item in raw_tags if item.get('term')]
    if not terms:
        return [], []

    head, *rest = terms
    return [head], rest


def parse_arxiv_feed(feed_url: str) -> list[dict]:
    """Download one arXiv RSS feed and convert each entry into a paper record.

    Each record has the keys ``id``, ``title``, ``abstract``, ``categories``,
    ``sub_categories``, ``link``, ``published`` and ``authors`` (author names
    joined with ``', '``). Missing entry fields fall back to empty values.
    """
    parsed = feedparser.parse(feed_url)
    records = []

    for item in parsed.entries:
        url = item.get('link', '')
        primary, secondary = extract_categories(item)
        author_names = [person.get('name', '') for person in item.get('authors', [])]

        records.append({
            'id': extract_arxiv_id(url),
            'title': clean_text(item.get('title', '')),
            'abstract': clean_text(item.get('summary', '')),
            'categories': primary,          # array<string>
            'sub_categories': secondary,    # array<string>
            'link': url,
            'published': item.get('published', ''),
            'authors': ', '.join(author_names),
        })

    return records


def get_fallback_paper() -> dict:
    """Build the fixed placeholder record used on days with no publications (e.g., weekends).

    The record has the same keys, in the same order, as the records produced
    by ``parse_arxiv_feed``.
    """
    title = 'Validating API Design Requirements for Interoperability: A Static Analysis Approach Using OpenAPI'
    abstract = (
        'This paper presents S.E.O.R.A, a configurable tool that uses static analysis '
        'to validate RESTful API designs against 75 identified rules. '
        'Through Design Science Research methodology, the authors developed a rule engine '
        'to detect structural violations in OpenAPI specifications. '
        'The work emphasizes how API quality validation contributes to aligning technical '
        'designs with requirements and enterprise architecture by strengthening '
        'interoperability and governance between enterprise systems.'
    )
    return dict(
        id=2511.17836,
        title=title,
        abstract=abstract,
        categories=['cs.SE'],
        # Non-empty to ensure Hopsworks can infer array<string> type
        sub_categories=['cs.SE'],
        link='https://arxiv.org/abs/2511.17836',
        published='2025-11-21',
        authors='Edwin Sundberg, Thea Ekmark, Workneh Yilma Ayele',
    )


def is_weekend(now: "datetime | None" = None) -> bool:
    """Check whether a moment falls on Saturday (5) or Sunday (6).

    Args:
        now: Moment to test. Defaults to the current time in UTC; passing a
            fixed value makes the check reproducible.

    Returns:
        ``True`` when ``now.weekday()`` is 5 or 6, otherwise ``False``.
    """
    if now is None:
        # datetime.utcnow() is deprecated; request an aware UTC timestamp instead.
        now = datetime.now(timezone.utc)
    return now.weekday() >= 5


def fetch_all_categories(categories: list[str], delay_seconds: int = 5) -> list[dict]:
    """Fetch papers from each category individually with rate limiting.

    On weekends no request is made and the fallback test paper is returned.
    The same fallback is used when a weekday run yields no papers at all, so
    callers always receive at least one record with the full set of keys
    (an empty list would produce a column-less DataFrame downstream).

    Args:
        categories: List of arXiv category codes (e.g., ['cs.AI', 'cs.LG'])
        delay_seconds: Seconds to wait between requests

    Returns:
        Combined list of all papers from all categories (duplicates across
        categories are not removed here), or a one-element list holding the
        fallback paper.
    """
    # Skip fetching on weekends - arXiv doesn't publish
    if is_weekend():
        print("Weekend detected - arXiv doesn't publish on Sat/Sun.")
        print("Using fallback test paper.")
        return [get_fallback_paper()]

    all_papers = []
    last_index = len(categories) - 1

    for i, category in enumerate(categories):
        feed_url = f"https://rss.arxiv.org/rss/{category}"
        print(f"[{i+1}/{len(categories)}] Fetching {category}...", end=" ")

        papers = parse_arxiv_feed(feed_url)
        all_papers.extend(papers)
        print(f"found {len(papers)} papers")

        # Rate limit: wait before next request (skip delay after last category)
        if i < last_index:
            print(f"    Waiting {delay_seconds}s...")
            time.sleep(delay_seconds)

    if not all_papers:
        # Every feed came back empty (or no categories were given).
        print("No papers returned by any feed.")
        print("Using fallback test paper.")
        return [get_fallback_paper()]

    return all_papers

In [5]:
# Fetch papers from each category with rate limiting
papers = fetch_all_categories(ARXIV_CATEGORIES, FETCH_DELAY_SECONDS)

# Remove duplicates by ID (papers can appear in multiple categories).
# drop_duplicates keeps the surviving rows' original labels, which leaves gaps;
# reset_index restores a contiguous 0..n-1 index so that any later
# label-aligned column assignment lines up with row positions.
papers_df = (
    pd.DataFrame(papers)
    .drop_duplicates(subset=['id'], keep='first')
    .reset_index(drop=True)
)
print(f"\nTotal papers fetched: {len(papers)}")
print(f"Unique papers after deduplication: {len(papers_df)}")
papers_df.head()

Weekend detected - arXiv doesn't publish on Sat/Sun.
Using fallback test paper.

Total papers fetched: 1
Unique papers after deduplication: 1




Unnamed: 0,id,title,abstract,categories,sub_categories,link,published,authors
0,2511.17836,Validating API Design Requirements for Interop...,"This paper presents S.E.O.R.A, a configurable ...",[cs.SE],[cs.SE],https://arxiv.org/abs/2511.17836,2025-11-21,"Edwin Sundberg, Thea Ekmark, Workneh Yilma Ayele"


## 4. Generate Embeddings

In [6]:
# Load the embedding model
print(f"Loading embedding model: {EMBEDDING_MODEL}")
model = SentenceTransformer(EMBEDDING_MODEL)

# Fail fast if the model's vector size disagrees with the configured
# EMBEDDING_DIM, before any embeddings are generated or stored.
model_embedding_dim = model.get_sentence_embedding_dimension()
if model_embedding_dim != EMBEDDING_DIM:
    raise ValueError(
        f"Model {EMBEDDING_MODEL!r} produces {model_embedding_dim}-dimensional "
        f"embeddings, but EMBEDDING_DIM is {EMBEDDING_DIM}"
    )
print(f"Model loaded. Embedding dimension: {model_embedding_dim}")

Loading embedding model: all-MiniLM-L6-v2


Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


Model loaded. Embedding dimension: 384


In [7]:
def generate_embeddings(df: pd.DataFrame, model: SentenceTransformer) -> pd.DataFrame:
    """Return a copy of ``df`` with an ``embedding`` column added.

    The text encoded for each row is its title and abstract joined by a
    single space. Each embedding is cast to float32 and stored as a plain
    Python list; the input frame is left unmodified.
    """
    combined = df['title'] + ' ' + df['abstract']
    texts = combined.tolist()

    print(f"Generating embeddings for {len(texts)} papers...")
    vectors = model.encode(texts, show_progress_bar=True, batch_size=32)

    # One list per row, assigned positionally (array<float> for Hopsworks)
    as_lists = [vec.astype(np.float32).tolist() for vec in vectors]
    enriched = df.assign(embedding=as_lists)

    print(f"Embeddings generated. Shape: {vectors.shape}")
    return enriched

In [8]:
# Generate embeddings
# Encodes the deduplicated papers with the loaded model; the result is a new
# frame with an added 'embedding' column, and papers_df itself is not modified.
papers_with_embeddings = generate_embeddings(papers_df, model)
papers_with_embeddings.head()

Generating embeddings for 1 papers...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated. Shape: (1, 384)


Unnamed: 0,id,title,abstract,categories,sub_categories,link,published,authors,embedding
0,2511.17836,Validating API Design Requirements for Interop...,"This paper presents S.E.O.R.A, a configurable ...",[cs.SE],[cs.SE],https://arxiv.org/abs/2511.17836,2025-11-21,"Edwin Sundberg, Thea Ekmark, Workneh Yilma Ayele","[-0.0575193352997303, 0.07753833383321762, -0...."


## 5. Connect to Hopsworks and Store Data

In [9]:
# Connect to Hopsworks
# API key should be set via HOPSWORKS_API_KEY environment variable
# (no key is passed explicitly here; only the host is supplied).
project = hopsworks.login(host=HOPSWORKS_HOST)
# Feature store handle used by the cells below to look up the feature group.
fs = project.get_feature_store(name=HOPSWORKS_FEATURE_STORE)


Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286343


In [10]:
def prepare_for_hopsworks(df: pd.DataFrame) -> pd.DataFrame:
    """Prepare dataframe for Hopsworks ingestion.

    Schema:
    - id: double (primary key)
    - embedding: array<float>
    - categories: array<string>
    - sub_categories: array<string>

    Args:
        df: Frame containing at least the four schema columns; it is not
            modified.

    Returns:
        A new frame with exactly the four schema columns, keeping ``df``'s
        index, with ``id`` as float and each embedding as float32 values.
    """
    # Select only the columns needed for the feature group (one copy is enough)
    result = df[['id', 'embedding', 'categories', 'sub_categories']].copy()

    # Ensure proper types
    result['id'] = result['id'].astype(float)

    # Convert embeddings to array<float32> using pyarrow
    embedding_array = pa.array(result['embedding'].tolist(), type=pa.list_(pa.float32()))
    embedding_series = embedding_array.to_pandas()

    # to_pandas() yields a fresh 0..n-1 index, and assigning a Series to a
    # column aligns on index labels. If the frame's labels are not 0..n-1
    # (e.g. after dropping duplicate rows), that alignment would attach
    # embeddings to the wrong rows or leave NaN, so adopt the frame's index
    # first to make the assignment positional.
    embedding_series.index = result.index
    result['embedding'] = embedding_series

    return result

In [11]:
# Reduce the enriched frame to the feature group's columns and report its layout
hopsworks_df = prepare_for_hopsworks(papers_with_embeddings)
print(
    f"Prepared DataFrame shape: {hopsworks_df.shape}",
    f"Columns: {hopsworks_df.columns.tolist()}",
    "\nSample:",
    sep="\n",
)
hopsworks_df.head()

Prepared DataFrame shape: (1, 4)
Columns: ['id', 'embedding', 'categories', 'sub_categories']

Sample:


Unnamed: 0,id,embedding,categories,sub_categories
0,2511.17836,"[-0.057519335, 0.077538334, -0.0293648, -0.033...",[cs.SE],[cs.SE]


In [12]:
# Get the existing feature group
# Looked up by the configured name and version; this cell does not create it.
arxiv_fg = fs.get_feature_group(
    name=FEATURE_GROUP_NAME,
    version=FEATURE_GROUP_VERSION,
)

# Insert the data
# NOTE(review): the recorded output shows insert() launching an offline
# materialization job, so the success message below appears to mean the
# insert call returned, not that the job has finished - confirm.
print(f"Inserting {len(hopsworks_df)} papers into feature group...")
arxiv_fg.insert(hopsworks_df)
print("Data inserted successfully!")

Inserting 1 papers into feature group...


Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: arxiv_embeddings_with_cats_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286343/jobs/named/arxiv_embeddings_with_cats_1_offline_fg_materialization/executions
Data inserted successfully!


## 6. Verify Data

In [13]:
# Read the feature group back and display the key columns of its first five rows
print("Verifying inserted data...")
sample = arxiv_fg.read().head(5)
print("Sample of stored data:")
sample.loc[:, ['id', 'categories', 'sub_categories']]

Verifying inserted data...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.47s) 
Sample of stored data:


Unnamed: 0,id,categories,sub_categories
0,704.0132,[Physics],[cond-mat.mes-hall]
1,704.0123,[Physics],"[nlin.CD, cond-mat.other, physics.optics]"
2,704.0114,[Physics],[cond-mat.str-el]
3,704.0088,[Physics],[physics.optics]
4,704.0033,[Physics],"[physics.optics, physics.comp-ph]"


## 7. Pipeline Summary

In [14]:
# Recap of this run: row counts at each stage, the categories configured,
# and when the summary was produced.
print("=" * 50)
print("Daily Pipeline Summary")
print("=" * 50)
print(f"Papers fetched: {len(papers_df)}")
print(f"Embeddings generated: {len(papers_with_embeddings)}")
print(f"Records inserted to Hopsworks: {len(hopsworks_df)}")
print(f"Categories requested: {', '.join(ARXIV_CATEGORIES)}")
# datetime.utcnow() is deprecated and returns a naive value; an aware UTC
# timestamp carries its offset (+00:00) in the ISO string.
print(f"Timestamp: {datetime.now(timezone.utc).isoformat()}")
print("=" * 50)

Daily Pipeline Summary
Papers fetched: 1
Embeddings generated: 1
Records inserted to Hopsworks: 1
Categories requested: astro-ph.CO, astro-ph.EP, astro-ph.GA, astro-ph.HE, astro-ph.IM, astro-ph.SR, cond-mat.dis-nn, cond-mat.mes-hall, cond-mat.mtrl-sci, cond-mat.other, cond-mat.quant-gas, cond-mat.soft, cond-mat.stat-mech, cond-mat.str-el, cond-mat.supr-con, cs.AI, cs.AR, cs.CC, cs.CE, cs.CG, cs.CL, cs.CR, cs.CV, cs.CY, cs.DB, cs.DC, cs.DL, cs.DM, cs.DS, cs.ET, cs.FL, cs.GL, cs.GR, cs.GT, cs.HC, cs.IR, cs.IT, cs.LG, cs.LO, cs.MA, cs.MM, cs.MS, cs.NA, cs.NE, cs.NI, cs.OH, cs.OS, cs.PF, cs.PL, cs.RO, cs.SC, cs.SD, cs.SE, cs.SI, cs.SY, econ.EM, econ.GN, econ.TH, eess.AS, eess.IV, eess.SP, eess.SY, gr-qc, hep-ex, hep-lat, hep-ph, hep-th, math-ph, math.AC, math.AG, math.AP, math.AT, math.CA, math.CO, math.CT, math.CV, math.DG, math.DS, math.FA, math.GM, math.GN, math.GR, math.GT, math.HO, math.IT, math.KT, math.LO, math.MG, math.MP, math.NA, math.NT, math.OA, math.OC, math.PR, math.QA, math.

