### Arabic Poem Embedding Pipeline

**Disclaimer:** This is *not* a production-ready pipeline — it’s the one used for our initial verse-embedding experiments in the Qafiyah project.

This notebook connects to your PostgreSQL database, denormalizes the `poems` records
into a single table, splits each poem into verses (with optional diacritic removal),
embeds each verse using a Hugging Face model, and writes out Parquet batches, which
are then merged into one file.

**Before you run anything:** edit the **Configuration** cell with your
database credentials and desired settings.

In [None]:
!pip install sqlalchemy pandas torch transformers pyarrow psycopg2-binary tqdm


In [None]:
import logging
import os
import re
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.parquet import ParquetWriter
import torch
from torch import Tensor
import torch.nn.functional as F
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel


In [None]:
# Database credentials
# Each value is taken from an environment variable when that variable is set, so
# real secrets never have to be typed into (or saved with) the notebook. The
# placeholder defaults are used otherwise and can still be edited by hand.
DB_USER = os.environ.get("QAFIYAH_DB_USER", "your_db_user")
DB_PASSWORD = os.environ.get("QAFIYAH_DB_PASSWORD", "your_db_password")
DB_HOST = os.environ.get("QAFIYAH_DB_HOST", "localhost")
DB_PORT = os.environ.get("QAFIYAH_DB_PORT", "5432")
DB_NAME = os.environ.get("QAFIYAH_DB_NAME", "qafiyah_dev")

# Embedding model & batching
# With Transformers versions earlier than 4.51.0, you may encounter the following error: KeyError: 'qwen3'
MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"
MAX_LENGTH = 8192  # from Qwen docs; kindly do not change it
BATCH_SIZE = 256  # number of verses tokenized and embedded per forward pass / Parquet file
SAVE_DIR = Path("verse_parquet_batches")  # per-batch Parquet files are written here

# Device selection: prefer CUDA, then MPS, then CPU
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
    print("Using CUDA")
elif torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
    print("Using Apple MPS")
else:
    DEVICE = torch.device("cpu")
    print("Using CPU")

torch.set_float32_matmul_precision("high")

# Configure the root logger once; every later cell logs through `logger`.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger()


In [None]:
# Create the SQLAlchemy engine and report how many poems and verses the `poems` table holds.

# Build the URL from its components rather than an f-string so that a user name
# or password containing URL-reserved characters (e.g. "@", ":", "/") is escaped
# correctly instead of producing a malformed connection string.
db_url = URL.create(
    drivername="postgresql",
    username=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=int(DB_PORT),
    database=DB_NAME,
)
engine = create_engine(db_url, future=True)

# Count poems & verses
# COALESCE guards against SUM() returning NULL (empty table or all-NULL
# num_verses), which would otherwise arrive as None and break the "," format below.
with engine.begin() as conn:
    result = conn.execute(text("""
        SELECT COUNT(*) AS total_poems,
               COALESCE(SUM(num_verses), 0) AS total_verses
        FROM poems
    """))
    total_poems, total_verses = result.fetchone()

print(f"Total poems: {total_poems:,}")
print(f"Total verses (sum of num_verses column): {total_verses:,}")


In [None]:
# Denormalize into a single table
# The statement is CREATE TABLE IF NOT EXISTS ... AS SELECT, so when a
# `denormalized_poems` table already exists it is left untouched (it is NOT
# refreshed); drop it manually first if the source tables have changed.
# Every lookup is a LEFT JOIN, so a poem is kept even when its poet, meter,
# theme, rhyme or type row is missing. The era is resolved through the poet
# (poets.era_id), not directly from the poem.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS denormalized_poems AS
        SELECT
            poems.id AS poem_id,
            poems.title,
            poems.content,
            poets.id AS poet_id,
            poets.name AS poet_name,
            eras.name AS era_name,
            meters.name AS meter_name,
            themes.name AS theme_name,
            rhymes.pattern AS rhyme_pattern,
            types.name AS type_name
        FROM poems
        LEFT JOIN poets     ON poems.poet_id = poets.id
        LEFT JOIN eras      ON poets.era_id = eras.id
        LEFT JOIN meters    ON poems.meter_id = meters.id
        LEFT JOIN themes    ON poems.theme_id = themes.id
        LEFT JOIN rhymes    ON poems.rhyme_id = rhymes.id
        LEFT JOIN types     ON poems.type_id = types.id;
    """))

logger.info("Denormalized table ready.")


In [None]:
# Read the denormalized table into a DataFrame. Optionally, create a clean column without Arabic tashkeel.

# Only `poem_id` and `content` are used downstream, so ask the database for just
# those two columns instead of transferring every column and discarding the rest.
df = pd.read_sql("denormalized_poems", engine, columns=["poem_id", "content"])

# Pre-compiled character class covering the code-point ranges
# U+0617-U+061A and U+064B-U+0652.
arabic_diacritics = re.compile(r'[\u0617-\u061A\u064B-\u0652]')


def remove_tashkeel(text):
    """Return `text` with every character matched by `arabic_diacritics` removed.

    Values that are not `str` instances (e.g. ``None``) are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return arabic_diacritics.sub("", text)


# Derive a diacritic-free copy of the text next to the original `content` column.
df["content_no_diacritics"] = df["content"].map(remove_tashkeel)

poem_count = len(df)
logger.info("Loaded %d poems into DataFrame.", poem_count)


In [None]:
# Split on `*` markers into hemistich pairs.
def split_poem_into_verses(poem_id, content):
    """Split a poem's text into verses by pairing consecutive hemistichs.

    ``content`` is split on ``*``; pieces that are empty after stripping
    whitespace are ignored. Each consecutive pair of pieces is joined with a
    single space to form one verse. When the number of pieces is odd, the
    final unpaired piece is kept as a verse of its own rather than being
    silently dropped, so a poem without any ``*`` marker still yields one verse.

    Args:
        poem_id: Identifier copied into every returned record.
        content: Poem text. Anything that is not a non-blank string yields ``[]``.

    Returns:
        A list of ``{"poem_id": ..., "verse": ...}`` dicts in source order.
    """
    if not isinstance(content, str) or not content.strip():
        return []
    parts = [p.strip() for p in content.split("*") if p.strip()]
    # Walk the pieces two at a time. The slice holds fewer than two items only
    # for a trailing unpaired hemistich, which is then emitted on its own.
    return [
        {"poem_id": poem_id, "verse": " ".join(parts[i:i + 2])}
        for i in range(0, len(parts), 2)
    ]


In [None]:
def last_token_pool(
    last_hidden_states: Tensor,
    attention_mask: Tensor
) -> Tensor:
    """Select, for each row, the hidden state at its last attended position.

    If the final column of ``attention_mask`` sums to the number of rows, the
    last position of every row in ``last_hidden_states`` is returned directly.
    Otherwise each row is indexed at ``attention_mask.sum(dim=1) - 1``.
    """
    num_rows = attention_mask.shape[0]
    if attention_mask[:, -1].sum() == num_rows:
        return last_hidden_states[:, -1]

    last_positions = attention_mask.sum(dim=1) - 1
    row_index = torch.arange(
        last_hidden_states.shape[0],
        device=last_hidden_states.device
    )
    return last_hidden_states[row_index, last_positions]


def prepare_embedding_batches(verses_df: pd.DataFrame, save_dir: Path):
    """Embed every row of ``verses_df`` and write one Parquet file per batch.

    Rows are processed in slices of ``BATCH_SIZE``. Each slice's ``verse`` text
    (with the tokenizer's EOS token appended) is tokenized, run through
    ``MODEL_NAME`` on ``DEVICE``, pooled with ``last_token_pool`` and
    L2-normalized. The result is written to ``save_dir/batch_NNNNN.parquet``
    with the columns ``poem_id``, ``verse`` and ``embedding``.

    Args:
        verses_df: Frame providing the ``poem_id`` and ``verse`` columns.
        save_dir: Output directory; created (including parents) when missing.
    """
    total = len(verses_df)
    logger.info("Embedding %d verses…", total)

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, padding_side="left")
    model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE).eval()
    # Fall back to the literal marker when the tokenizer defines no EOS token.
    eod_token = tokenizer.eos_token or "<|endoftext|>"

    save_dir.mkdir(exist_ok=True, parents=True)
    batches_written = 0

    with tqdm(total=total, desc="Verses", unit="v") as pbar:
        for batch_idx, start in enumerate(range(0, total, BATCH_SIZE)):
            chunk = verses_df.iloc[start: start + BATCH_SIZE]

            enc = tokenizer(
                [verse + eod_token for verse in chunk["verse"]],
                padding=True,
                truncation=True,
                max_length=MAX_LENGTH,
                return_tensors="pt"
            )
            enc = {name: tensor.to(DEVICE) for name, tensor in enc.items()}

            with torch.no_grad():
                hidden = model(**enc).last_hidden_state
            pooled = last_token_pool(hidden, enc["attention_mask"])
            # Normalize on the device, then move to CPU and convert to nested lists.
            vectors = F.normalize(pooled, p=2, dim=1).cpu().tolist()

            df_batch = pd.DataFrame([
                {"poem_id": poem_id, "verse": verse, "embedding": vector}
                for poem_id, verse, vector in zip(
                    chunk["poem_id"], chunk["verse"], vectors
                )
            ])
            pq.write_table(
                pa.Table.from_pandas(df_batch),
                save_dir / f"batch_{batch_idx:05d}.parquet",
                compression="snappy"
            )
            batches_written = batch_idx + 1
            pbar.update(len(chunk))

    logger.info("Wrote %d batches to `%s`", batches_written, str(save_dir))


In [None]:
# Pull verses from the cleaned column and execute batching. Change the column name if you want to use the original content column with tashkeel.

# Flatten the per-poem verse lists into one list of records, in poem order.
all_verses = [
    verse_record
    for poem_id, poem_text in zip(df["poem_id"], df["content_no_diacritics"])
    for verse_record in split_poem_into_verses(poem_id, poem_text)
]
verses_df = pd.DataFrame(all_verses)
prepare_embedding_batches(verses_df, SAVE_DIR)


In [None]:
def merge_parquet_batches(input_dir: Path, output_file: Path):
    """Concatenate every ``*.parquet`` file in ``input_dir`` into ``output_file``.

    Files are processed in sorted name order and read one at a time, so only a
    single batch is held in memory. The output uses the schema of the first
    batch and zstd compression.

    Args:
        input_dir: Directory holding the per-batch Parquet files.
        output_file: Destination file. If it lies inside ``input_dir`` it is
            excluded from the inputs, so a re-run never merges it into itself.
    """
    output_resolved = output_file.resolve()
    files = [
        path
        for path in sorted(input_dir.glob("*.parquet"))
        if path.resolve() != output_resolved
    ]
    if not files:
        # Nothing would be written; say so instead of reporting a successful merge.
        logger.warning("No Parquet batches found in %s; nothing to merge.", input_dir)
        return

    writer = None
    try:
        for path in files:
            table = pq.read_table(path)  # load one batch at a time
            if writer is None:
                # initialize writer with schema of first batch
                writer = ParquetWriter(str(output_file), table.schema, compression="zstd")
            writer.write_table(table)
    finally:
        # Close the writer even when a batch fails to read or write, so the
        # output file handle is always released.
        if writer is not None:
            writer.close()
    logger.info("Merged %d files → %s", len(files), output_file)

# Merge every batch file found in SAVE_DIR into one Parquet file; the relative
# output path is resolved against the current working directory.
merge_parquet_batches(SAVE_DIR, Path("verse_embeddings.parquet"))


In [None]:
# Inspect Results

# Load the merged file back and report its row count and column names,
# then display the first rows as the cell output.
df_emb = pd.read_parquet("verse_embeddings.parquet")
print(f"Total verses: {len(df_emb)}")
print(f"Columns: {list(df_emb.columns)}")
df_emb.head()
