# Semantic search with fasttext embeddings and postgresql

* Create a PostgreSQL database with the pgvector extension
```
docker run --rm --name pubtrends-postgres -p 5432:5432 \
        -m 32G \
        -e POSTGRES_USER=biolabs -e POSTGRES_PASSWORD=mysecretpassword \
        -e POSTGRES_DB=pubtrends \
        -v ~/postgres/:/var/lib/postgresql/data \
        -e PGDATA=/var/lib/postgresql/data/pgdata \
        -d pgvector/pgvector:pg17
```

* Launch fasttext endpoint API
  `python pysrc/fasttext/fasttext_apy.py`


In [None]:
import logging
import pandas as pd
from tqdm.auto import tqdm
import os
import psycopg2

# Configure root logging at INFO level with a timestamped message format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
logger = logging.getLogger('notebook')

# IPython magics: show matplotlib figures inline, using the 'retina' figure format.
%matplotlib inline
%config InlineBackend.figure_format='retina'

# Connection to the main PubTrends database

In [None]:
from pysrc.config import PubtrendsConfig
# Configuration built with test=False — presumably the non-test settings; confirm.
config = PubtrendsConfig(test=False)

# Key/value connection string for psycopg2.connect, filled from the config.
# Each trailing backslash is a line continuation inside the string literal, so
# the pieces end up on one line; strip() trims the surrounding whitespace.
connection_string_full_db = f"""
                    host={config.postgres_host} \
                    port={config.postgres_port} \
                    dbname={config.postgres_database} \
                    user={config.postgres_username} \
                    password={config.postgres_password}
                """.strip()

In [None]:
from pysrc.papers.db.postgres_utils import ints_to_vals


def load_publications(pids):
    """Load title, abstract and year of the given papers from the main database.

    :param pids: iterable of PubMed ids (list, pandas Series, ...).
    :return: DataFrame with object-typed columns ``id``, ``title``, ``abstract``
        and ``year``; empty when ``pids`` is empty.
    """
    from contextlib import closing

    columns = ['id', 'title', 'abstract', 'year']
    pids = list(pids)
    if not pids:
        # An empty VALUES list is not valid SQL, so skip the round trip entirely.
        return pd.DataFrame([], columns=columns, dtype=object)

    vals = ints_to_vals(pids)
    query = f'''
                SELECT P.pmid as id, title, abstract, year
                FROM PMPublications P
                WHERE P.pmid IN (VALUES {vals});
                '''
    # psycopg2's ``with connection`` only ends the transaction and leaves the
    # connection open; closing() is what actually releases it.
    with closing(psycopg2.connect(connection_string_full_db)) as connection:
        connection.set_session(readonly=True)
        with connection.cursor() as cursor:
            cursor.execute(query)
            return pd.DataFrame(cursor.fetchall(), columns=columns, dtype=object)


In [None]:
def load_publications_year(year):
    """Load id, title and abstract of every paper published in ``year``.

    :param year: publication year to select.
    :return: DataFrame with object-typed columns ``id``, ``title`` and ``abstract``.
    """
    from contextlib import closing

    # The year is passed as a bound parameter instead of being formatted into the SQL text.
    query = '''
                SELECT P.pmid as id, title, abstract
                FROM PMPublications P
                WHERE year = %s;
                '''
    # psycopg2's ``with connection`` only ends the transaction and leaves the
    # connection open; closing() is what actually releases it.
    with closing(psycopg2.connect(connection_string_full_db)) as connection:
        connection.set_session(readonly=True)
        with connection.cursor() as cursor:
            cursor.execute(query, (year,))
            return pd.DataFrame(cursor.fetchall(),
                                columns=['id', 'title', 'abstract'],
                                dtype=object)

In [None]:
# load_publications_year(2025).head(10)

# Chunking

In [None]:
from pysrc.papers.analysis.text import universal_chunk

text = "Staphylococcus aureus is a rare cause of postinfectious glomerulonephritis, and Staphylococcus-related glo-merulonephritis primarily occurs in middle-aged or elderly patients. Patients with Staphylococcus-related glomerulonephritis also present with hematuria, proteinuria of varying degrees, rising serum creatinine levels, and/or edema. The severity of renal insufficiency is proportional to the degree of proliferation and crescent formation. Here, we present a diabetic patient admitted with a history of 1 week of left elbow pain. Laboratory results revealed that erythrocyte sedimentation rate was 110 mm/hour, serum creatinine level was 1 mg/dL, C-reactive protein level was 150 mg/L, and magnetic resonance imaging showed signal changes in favor of osteomyelitis at the olecranon level, with diffuse edematous appearance in the elbow skin tissue and increased intra-articular effusion. After diagnosis of osteomyelitis, ampicillin/sulbactam and teicoplanin were administered. After day 7 of admission, the patient developed acute kidney injury requiring hemodialysis under antibiotic treatment. Kidney biopsy was performed to determine the underlying cause, which showed Staphylococcus-related glomerulonephritis. Recovery of renal func-tions was observed after antibiotic and supportive treatment."

# Split the sample abstract and print every chunk under a 1-based label.
chunks = universal_chunk(text)
print(f"Number of chunks: {len(chunks)}")
for i, chunk in enumerate(chunks):
    print(f"\nChunk {i+1}:", chunk, sep="\n")

In [None]:
from pysrc.papers.analysis.text import process_paper_chunks
import concurrent.futures
import multiprocessing

def parallel_collect_chunks(pids, texts):
    """Chunk many papers in parallel worker processes.

    Every ``(pid, text)`` pair is handed to ``process_paper_chunks``; the
    per-paper results are concatenated in input order.

    :param pids: paper ids, aligned with ``texts``.
    :param texts: paper texts to be chunked.
    :return: ``(chunks, chunk_idx)`` — two flat lists collected from the first
        and the second element of every per-paper result.
    """
    jobs = list(zip(pids, texts))
    # One worker process per CPU.
    worker_count = multiprocessing.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as pool:
        per_paper = list(pool.map(process_paper_chunks, jobs))

    chunks, chunk_idx = [], []
    for paper_chunks, paper_chunk_idx in per_paper:
        chunks += paper_chunks
        chunk_idx += paper_chunk_idx
    return chunks, chunk_idx

# Prepare Postgresql + pgvector for embeddings search

In [None]:
# Connection settings for the PostgreSQL + pgvector instance; user, password,
# database and port match the `docker run` command at the top of the notebook.
semantics_search_host = 'localhost'
semantics_search_port = 5432
semantics_search_database = 'pubtrends'
semantics_search_username = 'biolabs'
semantics_search_password = 'mysecretpassword'

# Key/value connection string for psycopg2.connect. Each trailing backslash is a
# line continuation inside the string literal; strip() trims the outer whitespace.
semantics_search_connection_string = f"""
                    host={semantics_search_host} \
                    port={semantics_search_port} \
                    dbname={semantics_search_database} \
                    user={semantics_search_username} \
                    password={semantics_search_password}
                """.strip()

In [None]:
# Drop and recreate the PMPublicationsSmall table (pmid, title, abstract).
query = '''
drop table if exists PMPublicationsSmall;
create table PMPublicationsSmall(
pmid    integer,
title   varchar(1023),
abstract text
);
            '''
with psycopg2.connect(semantics_search_connection_string) as connection:
    connection.set_session(readonly=False)
    with connection.cursor() as cursor:
        cursor.execute(query)
    connection.commit()

In [None]:
# Enable the vector extension, then drop and recreate the embeddings table:
# one row per (pmid, chunk) holding a 200-dimensional vector.
query = '''
            CREATE EXTENSION IF NOT EXISTS vector;
            drop table if exists PMPublicationsEmbeddings;
            create table PMPublicationsEmbeddings(
                                                pmid    integer,
                                                chunk   integer,
                                                embedding vector(200)
            );
            '''
with psycopg2.connect(semantics_search_connection_string) as connection:
    connection.set_session(readonly=False)
    with connection.cursor() as cursor:
        cursor.execute(query)
    connection.commit()

In [None]:
# Create an index for fast vector similarity search (cosine distance).
# IF NOT EXISTS keeps the cell re-runnable without recreating the table first.
# NOTE(review): pgvector recommends building IVFFlat indexes once the table
# already holds data; this cell runs before the inserts, so consider rebuilding
# the index (REINDEX) after the embeddings are loaded.
with psycopg2.connect(semantics_search_connection_string) as connection:
    connection.set_session(readonly=False)
    query = '''
            CREATE INDEX IF NOT EXISTS texts_embedding_idx
                ON PMPublicationsEmbeddings
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100);
            '''
    with connection.cursor() as cursor:
        cursor.execute(query)
    connection.commit()

# Compute embeddings

In [None]:
from pysrc.fasttext.fasttext import PretrainedModelCache

# Use a local embeddings model, and dispose it after
model_cache = PretrainedModelCache()
# Bare attribute access, no call: the same attribute is later passed as
# ``model_instance``, so it is presumably a (cached) property and touching it
# here loads the model — confirm against PretrainedModelCache.
model_cache.download_and_load_model

In [None]:
import numpy as np

def text_embedding_fasttext(text, model_instance):
    """Embed ``text`` as the mean of the vectors of its whitespace-separated tokens.

    Tokens the model has no index for contribute a zero vector, so they still
    count towards the mean. Text without any token yields a zero vector.

    :param text: text to embed.
    :param model_instance: object providing ``has_index_for``, ``get_vector``
        and ``vector_size``.
    :return: numpy array with the averaged embedding.
    """
    token_vectors = []
    for token in text.split():
        if model_instance.has_index_for(token):
            token_vectors.append(model_instance.get_vector(token))
        else:
            # Support out-of-dictionary missing embeddings
            token_vectors.append(np.zeros(model_instance.vector_size))
    if not token_vectors:
        return np.zeros(model_instance.vector_size)
    return np.mean(token_vectors, axis=0)


In [None]:
# Length of the embedding the local model produces for the sample text.
len(text_embedding_fasttext(text, model_instance=model_cache.download_and_load_model))

In [None]:
import concurrent

def parallel_texts_embeddings(texts, model_instance):
    """Embed every text on a thread pool, preserving input order.

    Callers zip the result with per-chunk ids, so the i-th embedding must belong
    to the i-th text. ``Executor.map`` yields results in submission order,
    whereas ``as_completed`` yields them in completion order.

    :param texts: iterable of texts to embed.
    :param model_instance: model passed through to ``text_embedding_fasttext``.
    :return: list of embeddings aligned with ``texts``.
    """
    # Imported locally so the function does not depend on another cell having
    # imported the ``concurrent.futures`` submodule.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor() as executor:
        return list(executor.map(
            lambda t: text_embedding_fasttext(t, model_instance), texts
        ))

# Insert embeddings into Postgresql

In [None]:
from psycopg2.extras import execute_values

from more_itertools import sliced
# Number of papers chunked, embedded and inserted per batch.
CHUNK_SIZE = 1000

for year in range(2025, 2024, -1):
    print(f'Processing year {year}')
    df = load_publications_year(year)
    print('Storing embeddings into DB')
    index_slices = list(sliced(range(len(df)), CHUNK_SIZE))
    # One connection per year, closed explicitly: psycopg2's ``with connection``
    # only commits or rolls back, it does not close the connection.
    connection = psycopg2.connect(semantics_search_connection_string)
    try:
        for index_slice in tqdm(index_slices):
            chunk = df.iloc[index_slice]
            print('\rCollecting chunks           ', end='')
            pids = list(chunk['id'])
            # Skip missing parts so a NULL abstract is not embedded as the word "None".
            texts = ['. '.join(part for part in (title, abstract) if part)
                     for title, abstract in zip(chunk['title'], chunk['abstract'])]
            chunks, chunk_idx = parallel_collect_chunks(pids, texts)
            print('\rCompute embeddings          ', end='')
            chunk_embeddings = parallel_texts_embeddings(chunks, model_cache.download_and_load_model)
            print(f'\rStoring {len(chunk_embeddings)} embeddings', end='')
            # psycopg2 has no built-in adapter for numpy arrays; plain float lists
            # are sent as SQL arrays, which the template casts to vector.
            data = [(pmid, chunk_no, [float(x) for x in embedding])
                    for (pmid, chunk_no), embedding in zip(chunk_idx, chunk_embeddings)]
            with connection:  # commit the batch on success, roll back on error
                with connection.cursor() as cursor:
                    execute_values(
                        cursor,
                        "INSERT INTO PMPublicationsEmbeddings (pmid, chunk, embedding) VALUES %s",
                        data,
                        template="(%s, %s, %s::vector)"
                    )
    finally:
        connection.close()

In [None]:
# Cleanup memory: drop the notebook's reference to the local model cache.
del model_cache

# Semantic search with Postgresql

In [None]:
from pysrc.papers.analysis.text import is_fasttext_endpoint_ready
import time

# Poll the fasttext endpoint every 10 seconds until it reports ready; the
# progress line gains one dot per unsuccessful attempt.
i = 0
while True:
    if is_fasttext_endpoint_ready():
        break
    print('\rWaiting for fasttext endpoint to be ready' + '.' * i, end='')
    i += 1
    time.sleep(10)

In [None]:
from pysrc.papers.analysis.text import fetch_fasttext_text_embedding

# Length of the embedding the fasttext endpoint returns for the sample text.
len(fetch_fasttext_text_embedding(text))

In [None]:
def semantic_search_postgresql(query, k=5):
    """Return the ``k`` stored chunks closest to ``query``.

    The query embedding comes from ``fetch_fasttext_text_embedding`` and is
    compared with pgvector's ``<=>`` operator.

    :param query: free-text search query.
    :param k: maximum number of rows to return.
    :return: list of ``(pmid, chunk, distance)`` tuples, smallest distance first.
    """
    from contextlib import closing

    embedding = fetch_fasttext_text_embedding(query)
    # psycopg2's ``with connection`` leaves the connection open; closing()
    # releases it once the rows are fetched.
    with closing(psycopg2.connect(semantics_search_connection_string)) as connection:
        with connection.cursor() as cursor:
            cursor.execute("""
                   SELECT pmid, chunk, embedding <=> %s::vector AS distance
                   FROM PMPublicationsEmbeddings
                   ORDER BY distance ASC
                       LIMIT %s
                   """, (embedding, k))
            return cursor.fetchall()

In [None]:
search = semantic_search_postgresql("epigenetic human aging", 1000)
# Hits are per chunk, so one paper can appear several times; keep each pmid
# once, in order of its first (closest) hit.
pmids = list(dict.fromkeys(pid for pid, _, _ in search))
load_publications(pmids)

In [None]:
from pysrc.papers.db.pm_postgres_loader import PubmedPostgresLoader
from pysrc.papers.analyzer import PapersAnalyzer

# Build the loader and the analyzer from the shared PubTrends config.
loader = PubmedPostgresLoader(config)
analyzer = PapersAnalyzer(loader, config)

In [None]:
# Override the topic_min_size setting on the shared config before the analysis.
config.topic_min_size = 5
try:
    # NOTE(review): the meaning of the second argument (10) is not visible here — confirm.
    analyzer.analyze_papers(pmids, 10)
finally:
    # Always release the loader connection and tear the analyzer down,
    # even when the analysis raises.
    loader.close_connection()
    analyzer.teardown()

In [None]:
from bokeh.plotting import show
from itertools import chain
from pysrc.papers.plot.plotter import Plotter

# Attach the searched pmids to the analyzer, then build the plotter from it.
analyzer.search_ids = pmids
plotter = Plotter(config, analyzer)

In [None]:
# Display the papers graph with bokeh.
show(plotter.plot_papers_graph())

In [None]:
# Display the topics hierarchy with keywords with bokeh.
show(plotter.topics_hierarchy_with_keywords())

# Insert embeddings into Faiss

In [None]:
# Use a local embeddings model, and dispose it after
model_cache = PretrainedModelCache()
# Bare attribute access, no call: the same attribute is passed as the model
# elsewhere in the notebook, so it is presumably a (cached) property and
# touching it here loads the model — confirm against PretrainedModelCache.
model_cache.download_and_load_model

In [None]:
! mkdir -p ~/faiss
import faiss

# On-disk locations of the Faiss index and of the gzip-compressed CSV whose
# rows hold the (pmid, chunk) pair for each vector position.
FAISS_INDEX_FILE = os.path.expanduser('~/faiss/embeddings.index')
PIDS_INDEX_FILE = os.path.expanduser('~/faiss/pids.csv.gz')

def create_or_load_faiss(dim=200, hnsw_neighbors=32, ef_construction=100):
    """Load the Faiss index and its id table from disk, or create empty ones.

    :param dim: vector dimensionality of a newly created index.
    :param hnsw_neighbors: number of neighbors in the HNSW graph of a new index.
    :param ef_construction: HNSW ``efConstruction`` of a new index; a higher
        value improves accuracy at the expense of index build time.
    :return: ``(index, pids_idx)`` where row ``i`` of ``pids_idx`` holds the
        ``(pmid, chunk)`` pair of vector ``i`` in ``index``.
    """
    if os.path.exists(FAISS_INDEX_FILE):
        print('Loading Faiss index from existing file')
        index = faiss.read_index(FAISS_INDEX_FILE)
    else:
        print('Creating empty Faiss index')
        # Using HNSW index (good performance at scale)
        index = faiss.IndexHNSWFlat(dim, hnsw_neighbors)
        index.hnsw.efConstruction = ef_construction
    if os.path.exists(PIDS_INDEX_FILE):
        pids_idx = pd.read_csv(PIDS_INDEX_FILE, compression='gzip')
    else:
        pids_idx = pd.DataFrame(data=[], columns=['pmid', 'chunk'], dtype=int)
    if index.ntotal != len(pids_idx):
        # Search results are mapped to papers by row position, so a size
        # mismatch means lookups would return the wrong papers.
        print(f'WARNING: Faiss index holds {index.ntotal} vectors '
              f'but ids table holds {len(pids_idx)} rows')
    return index, pids_idx

In [None]:
from more_itertools import sliced
# Number of papers chunked, embedded and indexed per batch.
CHUNK_SIZE = 1000


def _store_faiss_state(index, known_pids_idx, new_chunk_idx):
    """Write the Faiss index and the combined (pmid, chunk) id table; return the table."""
    print('Storing FAISS index')
    faiss.write_index(index, FAISS_INDEX_FILE)
    print('Storing Ids index')
    new_pids_idx = pd.DataFrame(new_chunk_idx, columns=['pmid', 'chunk'])
    combined = pd.concat([known_pids_idx, new_pids_idx]).reset_index(drop=True)
    combined.to_csv(PIDS_INDEX_FILE, index=False, compression='gzip')
    return combined


faiss_index, pids_idx = None, None

for year in range(2025, 2024, -1):
    print(f'Processing year {year}')
    df = load_publications_year(year)
    if faiss_index is None:
        faiss_index, pids_idx = create_or_load_faiss()

    print('Computing embeddings')
    chunk_idx_all = []
    index_slices = sliced(range(len(df)), CHUNK_SIZE)
    for i, index_slice in tqdm(list(enumerate(index_slices))):
        chunk = df.iloc[index_slice]
        print('\rCollecting chunks           ', end='')
        pids = list(chunk['id'])
        # Skip missing parts so a NULL abstract is not embedded as the word "None".
        texts = ['. '.join(part for part in (title, abstract) if part)
                 for title, abstract in zip(chunk['title'], chunk['abstract'])]
        chunks, chunk_idx = parallel_collect_chunks(pids, texts)
        print('\rCompute embeddings          ', end='')
        chunk_embeddings = parallel_texts_embeddings(chunks, model_cache.download_and_load_model)
        if not chunk_embeddings:
            # An empty batch would give a 1-D array, which Faiss cannot normalize or add.
            continue
        print(f'\rStoring {len(chunk_embeddings)} embeddings', end='')
        embeddings = np.array(chunk_embeddings).astype('float32')
        # Normalize embeddings if using cosine similarity
        faiss.normalize_L2(embeddings)
        # Add embeddings to the index
        faiss_index.add(embeddings)
        chunk_idx_all.extend(chunk_idx)

        # Periodic checkpoint every 10 batches; ``pids_idx`` itself is only
        # replaced once the whole year is done.
        if i > 0 and i % 10 == 0:
            _store_faiss_state(faiss_index, pids_idx, chunk_idx_all)
    pids_idx = _store_faiss_state(faiss_index, pids_idx, chunk_idx_all)
    print('Done')

In [None]:
# Release the notebook's reference to the local model cache.
del model_cache


# Semantic search with Faiss

In [None]:
# Read the Faiss index and the ids table back from disk (or get empty ones).
faiss_index, pids_idx = create_or_load_faiss()

In [None]:
def semantic_search_faiss(query_text, faiss_index, df, top_k=5, model_instance=None):
    """Return the rows of ``df`` whose vectors are closest to ``query_text``.

    :param query_text: free-text search query.
    :param faiss_index: Faiss index holding L2-normalized embeddings.
    :param df: table whose row ``i`` describes vector ``i`` of ``faiss_index``.
    :param top_k: maximum number of rows to return.
    :param model_instance: optional local fasttext model; when omitted the
        embedding is fetched from the fasttext endpoint.
    :return: slice of ``df`` with the matching rows, best match first.
    """
    if model_instance is None:
        # The notebook deletes ``model_cache`` before the search cells run, so the
        # default path uses the endpoint, as semantic_search_postgresql does.
        # NOTE(review): assumes the endpoint serves the same model that built the index — confirm.
        vector = fetch_fasttext_text_embedding(query_text)
    else:
        vector = text_embedding_fasttext(query_text, model_instance)
    query_matrix = np.ascontiguousarray(vector, dtype='float32').reshape(1, -1)
    faiss.normalize_L2(query_matrix)
    _, indices = faiss_index.search(query_matrix, top_k)
    hits = indices[0]
    # Faiss pads with -1 when fewer than top_k vectors are found; iloc[-1]
    # would silently return the last row instead.
    return df.iloc[hits[hits >= 0]]

In [None]:
search = semantic_search_faiss("epigenetic human aging", faiss_index, pids_idx, 1000)
# Hits are per chunk, so one paper can appear several times; keep each pmid
# once, in order of its first (closest) hit.
pmids = search['pmid'].drop_duplicates().tolist()
load_publications(pmids)