In [1]:
import os
import pandas as pd
from tqdm import tqdm

import hopsworks
import hsfs

from sentence_transformers import SentenceTransformer
from hsfs.embedding import EmbeddingIndex

from functions.zotero_parser import ZoteroCSVParser
from functions.PDF_extractor import PDFExtractor
import config

In [2]:
# Paths
ZOTERO_CSV_PATH = "PCG_latest.csv"

# Hopsworks
FEATURE_GROUP_PAPER = "paper_metadata_fg_2"
FEATURE_GROUP_CHUNK = "paper_chunk_fg_2"

# Embedding
from config import EMBEDDING_MODEL_NAME

In [3]:
import re
from typing import Optional, Dict, Any

def sanitize_paper_metadata(paper: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Defensive metadata sanitation.
    Returns None if the paper is considered invalid.
    """

    # ---- 1. Mandatory Field Validation ----
    title = paper.get("title", "").strip()
    if not title:
        return None  # Equivalent to RDF version: discard if title is missing

    paper["title"] = title

    # ---- 2. Year Repair (Reusing regex logic from RDF) ----
    year = paper.get("year")
    if year is None:
        # Attempt to recover year from other metadata fields
        for field in ("url", "abstract"):
            text = paper.get(field, "")
            match = re.search(r"(19|20)\d{2}", text)
            if match:
                paper["year"] = int(match.group())
                break

    # ---- 3. Authors Fallback ----
    authors = paper.get("authors", "").strip()
    if not authors or authors.lower() == "nan":
        paper["authors"] = "Unknown"

    # ---- 4. Abstract Normalization ----
    abstract = paper.get("abstract", "").strip()
    if abstract.lower() in {"nan", "none"}:
        paper["abstract"] = ""

    # ---- 5. Attachments Handling ----
    # Preserve original state but ensure the value is a string
    attachments = paper.get("file_attachments")
    paper["file_attachments"] = str(attachments) if attachments is not None else ""

    return paper

In [4]:
parser = ZoteroCSVParser(ZOTERO_CSV_PATH)
raw_papers = parser.parse()

papers_df = []
for paper in raw_papers:
    fixed = sanitize_paper_metadata(paper)
    if fixed is not None:
        papers_df.append(fixed)

print(f"Parsed {len(papers_df)} papers.")
papers_df[:2]  

Parsed 17 papers.


[{'paper_id': 'CLGNKPIJ',
  'title': 'Synthesis of Normal Heart Sounds Using Generative Adversarial Networks and Empirical Wavelet Transform',
  'authors': 'Narváez, Pedro; Percybrooks, Winston S.',
  'year': 2020,
  'abstract': 'Currently, there are many works in the literature focused on the analysis of heart sounds, speciﬁcally on the development of intelligent systems for the classiﬁcation of normal and abnormal heart sounds. However, the available heart sound databases are not yet large enough to train generalized machine learning models. Therefore, there is interest in the development of algorithms capable of generating heart sounds that could augment current databases. In this article, we propose a model based on generative adversary networks (GANs) to generate normal synthetic heart sounds. Additionally, a denoising algorithm is implemented using the empirical wavelet transform (EWT), allowing a decrease in the number of epochs and the computational cost that the GAN model requ

In [5]:
# === Cell 5: Generate Embeddings for Metadata and Full Text ===

import pandas as pd
from sentence_transformers import SentenceTransformer
from hsfs import embedding

# -------------------------
# 1. Load embedding model
# -------------------------

model = SentenceTransformer(EMBEDDING_MODEL_NAME)
embedding_dim = model.get_sentence_embedding_dimension()

print(f"Loaded embedding model: {EMBEDDING_MODEL_NAME}")
print(f"Embedding dimension: {embedding_dim}")

2026-01-11 18:04:15,827 INFO: Use pytorch device_name: cpu
2026-01-11 18:04:15,828 INFO: Load pretrained SentenceTransformer: all-MiniLM-L6-v2
Loaded embedding model: all-MiniLM-L6-v2
Embedding dimension: 384


In [6]:
from config import HOPSWORKS_API_KEY
# project = hopsworks.login()

project = hopsworks.login(
        # project=HOPSWORKS_PROJECT,
        api_key_value=HOPSWORKS_API_KEY
    )
fs = project.get_feature_store()

2026-01-11 18:04:18,496 INFO: Initializing external client
2026-01-11 18:04:18,497 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-11 18:04:20,077 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286333


In [7]:
paper_fg = fs.get_feature_group(
    name=FEATURE_GROUP_PAPER,
    version=2
)

chunk_fg = fs.get_feature_group(
    name=FEATURE_GROUP_CHUNK,
    version=2
)


In [8]:
existing_papers_df = paper_fg.read(online=False)[["paper_id"]]

existing_paper_ids = set(existing_papers_df["paper_id"].astype(str))

print(f"Papers already in Feature Store: {len(existing_paper_ids)}")


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.86s) 
Papers already in Feature Store: 15


In [9]:
new_papers = []

for paper in papers_df:   
    paper_id = str(paper["paper_id"])
    if paper_id not in existing_paper_ids:
        new_papers.append(paper)

print(f"New papers detected: {len(new_papers)}")


New papers detected: 2


In [10]:
import re
import urllib.parse
from typing import List, Optional

from config import MIN_FULLTEXT_LEN, CHUNK_SIZE, CHUNK_OVERLAP

def extract_abstract_from_text(text: str) -> Optional[str]:
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")

    stop_markers = (
        r"keywords|index\s*terms|subject[s]?|introduction|background|materials\s+and\s+methods|"
        r"methods|results|conclusions|references|acknowledg(e)?ments|1\.|i\.|ii\.|iii\."
        r"|Keywords|Introduction|Background|Methods|Results|Conclusion|References"
    )
    start_markers = r"abstract|summary|Abstract|Summary"

    pattern = rf"(?is)\b(?:{start_markers})\b\s*[:\.\-]?\s*(.+?)(?=\n\s*(?:{stop_markers})\b|\n\n\s*[A-Z][A-Za-z ]+\b|\Z)"
    match = re.search(pattern, normalized)
    if match:
        abstract = re.sub(r"\s+", " ", match.group(1).strip())
        if 50 <= len(abstract) <= 5000 and re.search(r"[a-z]", abstract, re.I):
            return abstract

    lines = normalized.split("\n")
    abstract_started = False
    buffer: list[str] = []

    for line in lines:
        line_stripped = line.strip()

        if not abstract_started:
            if re.match(
                r"(?i)^(abstract|summary)\b\s*[:\-\.]?\s*$",
                line_stripped,
            ) or re.match(
                r"(?i)^(abstract|summary)\b\s*[:\-\.]?",
                line_stripped,
            ):
                after = re.sub(
                    r"(?i)^(abstract|summary)\b\s*[:\-\.]?\s*",
                    "",
                    line_stripped,
                )
                if after:
                    buffer.append(after)
                abstract_started = True
            continue
        else:
            if re.match(rf"(?i)^\s*(?:{stop_markers})\b", line_stripped):
                break
            buffer.append(line)

    candidate = re.sub(r"\s+", " ", " ".join(buffer)).strip()
    if 50 <= len(candidate) <= 5000 and re.search(r"[a-z]", candidate, re.I):
        return candidate

    paragraphs = re.split(r"\n\s*\n", normalized)
    for paragraph in paragraphs[:8]:
        p = re.sub(r"\s+", " ", paragraph.strip())
        if (
            120 <= len(p) <= 5000
            and not re.match(
                r"(?i)^(keywords|index\s*terms|introduction|references|acknowledg(e)?ments)",
                p,
            )
            and p.count(".") >= 2
        ):
            return p

    return None


# -------- Content cleaning --------

def clean_text(text: str) -> str:
    if not text:
        return ""

    text = re.sub(r"(\w+)-\s*\n\s*(\w+)", r"\1\2", text)
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"(\.{5,}|\-{5,})", " ", text)
    text = re.sub(r"[\t\f\u00A0]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)

    return text.strip()


# -------- Chunking --------

def chunk_text(
    text: str,
    chunk_size: int = CHUNK_SIZE,
    overlap: int = CHUNK_OVERLAP,
) -> List[str]:
    if not text:
        return []

    paragraphs = text.split("\n\n")
    chunks = []
    current = ""

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        if len(current) + len(para) < chunk_size:
            current = f"{current}\n\n{para}" if current else para
        else:
            if current:
                chunks.append(current)
                current = current[-overlap:] + "\n\n" + para

            if len(current) > chunk_size:
                for i in range(0, len(current), chunk_size - overlap):
                    chunks.append(current[i : i + chunk_size])
                current = ""

    if current:
        chunks.append(current)

    return chunks


# -------- Safe file reading --------

def safe_read_fulltext(file_path: str) -> str:
    if not file_path:
        return ""

    decoded = urllib.parse.unquote(file_path)
    return PDFExtractor.read_file(decoded) or ""

In [11]:
import re
from typing import List

def split_sentences(text: str) -> List[str]:
    return re.split(r'(?<=[.!?])\s+', text)


def extract_paragraph_chunks(
    text: str,
    min_len: int = 500,
    max_len: int = 1500,
    overlap: int = 200,
) -> List[str]:
    """
    Paragraph-first chunking with sentence-aware fallback splitting.
    """
    if not text:
        return []

    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks = []

    for para in paragraphs:
        # ---- 基本过滤 ----
        if len(para) < min_len:
            continue

        if re.search(r"(\.{5,}|\-{5,})", para):
            continue

        # ---- 正常段落：直接作为 chunk ----
        if len(para) <= max_len:
            chunks.append(para)
            continue

        # ---- 超长段落：sentence-aware fallback ----
        sentences = split_sentences(para)

        current = ""
        for sent in sentences:
            if len(current) + len(sent) <= max_len:
                current = f"{current} {sent}".strip()
            else:
                if current:
                    chunks.append(current)
                # overlap（句子级）
                current = sent[-overlap:] if overlap > 0 else sent

        if current:
            chunks.append(current)

    return chunks


In [12]:
metadata_rows = []
fulltext_rows = []

for paper in new_papers:
    raw_full_text = safe_read_fulltext(paper.get("file_attachments", ""))

    full_text = clean_text(raw_full_text)

    abstract = paper.get("abstract", "")
    if not abstract and len(full_text) >= MIN_FULLTEXT_LEN:
        abstract = extract_abstract_from_text(full_text) or ""

    metadata_rows.append(
        {
            "paper_id": paper["paper_id"],
            "title": paper["title"],
            "abstract": abstract,
            "authors": paper["authors"],
            "year": paper["year"],
            "item_type": paper["item_type"],
            "combined_text": (
                f"Title: {paper['title']}\n"
                f"Abstract: {abstract}"
            ),
        }
    )

    if len(full_text) >= MIN_FULLTEXT_LEN:
        for i, chunk in enumerate(extract_paragraph_chunks(full_text)):
            fulltext_rows.append(
                {
                    "paper_id": paper["paper_id"],
                    "chunk_index": i,
                    "content": chunk,
                    "year": paper["year"],
                }
            )

print(f"Metadata rows: {len(metadata_rows)}")
print(f"Fulltext chunks: {len(fulltext_rows)}")


Metadata rows: 2
Fulltext chunks: 65


In [13]:
df_metadata = pd.DataFrame(metadata_rows)
df_chunks = pd.DataFrame(fulltext_rows)

print(f"Metadata rows: {len(df_metadata)}")
print(f"Chunk rows: {len(df_chunks)}")


# -------------------------
# 3. Generate metadata embeddings
# -------------------------

if not df_metadata.empty:
    embeddings = model.encode(
        df_metadata["combined_text"].tolist(),
        show_progress_bar=True,
        convert_to_numpy=True,
    ).astype("float32")

    df_metadata["embedding"] = list(embeddings)
else:
    df_metadata["embedding"] = []

print("Metadata embeddings generated.")


# -------------------------
# 4. Generate full-text chunk embeddings
# -------------------------

if not df_chunks.empty:
    embeddings = model.encode(
        df_chunks["content"].fillna("").tolist(),
        show_progress_bar=True,
        convert_to_numpy=True,
    ).astype("float32")

    df_chunks["embedding"] = list(embeddings)

else:
    df_chunks["embedding"] = []

print("Chunk embeddings generated.")

# -------------------------
# 5. Create embedding indexes
# -------------------------

metadata_index = embedding.EmbeddingIndex()
metadata_index.add_embedding(
    "metadata_embedding",
    embedding_dim,
)

chunk_index = embedding.EmbeddingIndex()
chunk_index.add_embedding(
    "chunk_embedding",
    embedding_dim,
)

print("Embedding indexes created.")

# -------------------------
# 7. Final sanity check
# -------------------------

display(df_metadata.head())
display(df_chunks.head())

Metadata rows: 2
Chunk rows: 65


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Metadata embeddings generated.


Batches:   0%|          | 0/3 [00:00<?, ?it/s]

Chunk embeddings generated.
Embedding indexes created.


Unnamed: 0,paper_id,title,abstract,authors,year,item_type,combined_text,embedding
0,9Q8Z9AE3,H-LDM: Hierarchical Latent Diffusion Models fo...,Phonocardiogram (PCG) analysis is vital for ca...,"Xu, Chenyang; Li, Siming; Wang, Hao",2025,preprint,Title: H-LDM: Hierarchical Latent Diffusion Mo...,"[-0.0006771824, -0.10957697, -0.008515655, 0.0..."
1,J8DDHQUU,Prototyping an End-to-End Multi-Modal Tiny-CNN...,The vast majority of cardiovascular diseases m...,"Ibrahim, Mustafa Fuad Rifet; Alkanat, Tunc; Me...",2025,preprint,Title: Prototyping an End-to-End Multi-Modal T...,"[-0.06923509, 0.018779349, 0.04884383, 0.06390..."


Unnamed: 0,paper_id,chunk_index,content,year,embedding
0,9Q8Z9AE3,0,H-LDM: Hierarchical Latent Diffusion Models fo...,2025,"[-0.017158188, -0.06888803, 0.008421682, 0.005..."
1,9Q8Z9AE3,1,H-LDM establishes a new\ndirection for data au...,2025,"[-0.01754292, -0.096551366, -0.03906515, -0.00..."
2,9Q8Z9AE3,2,Conventional data augmentation\ntechniques fai...,2025,"[-0.025664022, -0.13114122, 0.044948883, 0.036..."
3,9Q8Z9AE3,3,hical conditioning mechanism that fuses rich s...,2025,"[-0.018627482, -0.081789196, 0.03577644, 0.037..."
4,9Q8Z9AE3,4,clinical attributes. The implications of this ...,2025,"[-0.061234742, -0.11351126, 0.018123308, 0.004..."


In [14]:
paper_fg.insert(
    df_metadata,
    write_options={
        "wait_for_job": True,
        "upsert": True,
    },
)


Uploading Dataframe: 100.00% |██████████| Rows 2/2 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: paper_metadata_fg_2_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286333/jobs/named/paper_metadata_fg_2_2_offline_fg_materialization/executions
2026-01-11 18:17:18,269 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-11 18:17:21,464 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 18:17:24,642 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:19:03,471 INFO: Waiting for execution to finish. Current state: FINISHED. Final status: SUCCEEDED
2026-01-11 18:19:03,939 INFO: Waiting for log aggregation to finish.
2026-01-11 18:19:03,939 INFO: Execution finished successfully.


(Job('paper_metadata_fg_2_2_offline_fg_materialization', 'SPARK'), None)

In [15]:
chunk_fg.insert(
    df_chunks,
    write_options={
        "wait_for_job": True,
        "upsert": True,
    },
)


Uploading Dataframe: 100.00% |██████████| Rows 65/65 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: paper_chunk_fg_2_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286333/jobs/named/paper_chunk_fg_2_2_offline_fg_materialization/executions
2026-01-11 18:21:37,544 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-11 18:21:40,740 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-11 18:21:43,927 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-11 18:23:32,432 INFO: Waiting for execution to finish. Current state: FINISHED. Final status: SUCCEEDED
2026-01-11 18:23:32,892 INFO: Waiting for log aggregation to finish.
2026-01-11 18:23:32,892 INFO: Execution finished successfully.


(Job('paper_chunk_fg_2_2_offline_fg_materialization', 'SPARK'), None)