7 changes: 5 additions & 2 deletions Dockerfile
@@ -12,11 +12,14 @@ WORKDIR /app
# Copy project metadata
COPY pyproject.toml uv.lock* ./

# Install package into system python
RUN uv pip install --system .

# Copy CLI application
COPY embeddings ./embeddings

# Install package into system python
RUN uv pip install --system .
# Copy fixtures
COPY tests/fixtures /fixtures

# Download the model and include in the Docker image
# NOTE: The env vars "TE_MODEL_URI" and "TE_MODEL_PATH" are set here to support
6 changes: 0 additions & 6 deletions Makefile
@@ -75,12 +75,6 @@ black-apply: # Apply changes with 'black'
ruff-apply: # Resolve 'fixable errors' with 'ruff'
uv run ruff check --fix .

##############################
# CLI convenience commands
##############################
my-app: # CLI without any arguments, utilizing uv script entrypoint
uv run my-app


####################################
# Docker
5 changes: 2 additions & 3 deletions embeddings/models/base.py
@@ -58,13 +58,12 @@ def create_embedding(self, embedding_input: EmbeddingInput) -> Embedding:
embedding_input: EmbeddingInput instance
"""

@abstractmethod
def create_embeddings(
self, embedding_inputs: Iterator[EmbeddingInput]
) -> Iterator[Embedding]:
"""Yield Embeddings for a batch of EmbeddingInputs.
"""Yield Embeddings for multiple EmbeddingInputs.

Args:
embedding_inputs: iterator of EmbeddingInputs
"""
for embedding_input in embedding_inputs:
yield self.create_embedding(embedding_input)
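With the @abstractmethod decorator removed, subclasses now inherit this streaming default and only override create_embeddings() when a batch-optimized path exists (as the GTE model below does). A minimal, self-contained sketch of that pattern, using illustrative names rather than the project's real classes:

from abc import ABC, abstractmethod
from collections.abc import Iterator


class ItemEncoder(ABC):
    """Illustrative stand-in for BaseEmbeddingModel (not the project's class)."""

    @abstractmethod
    def encode_one(self, item: str) -> str:
        """Encode a single item."""

    def encode_many(self, items: Iterator[str]) -> Iterator[str]:
        """Default: lazily delegate to encode_one(), one item at a time."""
        for item in items:
            yield self.encode_one(item)


class UpperCaseEncoder(ItemEncoder):
    def encode_one(self, item: str) -> str:
        return item.upper()


# Subclasses get streaming behavior for free; a batch-capable model can instead
# override encode_many(), much as create_embeddings() is overridden in
# os_neural_sparse_doc_v3_gte.py below.
print(list(UpperCaseEncoder().encode_many(iter(["a", "b"]))))  # ['A', 'B']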
289 changes: 100 additions & 189 deletions embeddings/models/os_neural_sparse_doc_v3_gte.py
@@ -2,25 +2,21 @@

import json
import logging
import os
import shutil
import tempfile
import time
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING
from typing import cast

import torch
from huggingface_hub import snapshot_download
from transformers import AutoModelForMaskedLM, AutoTokenizer
from sentence_transformers.sparse_encoder import SparseEncoder
from torch import Tensor

from embeddings.embedding import Embedding, EmbeddingInput
from embeddings.models.base import BaseEmbeddingModel

if TYPE_CHECKING:
from transformers import PreTrainedModel
from transformers.models.distilbert.tokenization_distilbert_fast import (
DistilBertTokenizerFast,
)

logger = logging.getLogger(__name__)


@@ -42,10 +38,8 @@ def __init__(self, model_path: str | Path) -> None:
model_path: Path where the model will be downloaded to and loaded from.
"""
super().__init__(model_path)
self._model: PreTrainedModel | None = None
self._tokenizer: DistilBertTokenizerFast | None = None
self._special_token_ids: list[int] | None = None
self._device: torch.device = torch.device("cpu")
self.device = os.getenv("TE_TORCH_DEVICE", "cpu")
self._model: SparseEncoder = None # type: ignore[assignment]

def download(self) -> Path:
"""Download and prepare model, saving to self.model_path.
@@ -139,209 +133,126 @@ def load(self) -> None:
start_time = time.perf_counter()
logger.info(f"Loading model from: {self.model_path}")

# ensure model exists locally
if not self.model_path.exists():
raise FileNotFoundError(f"Model not found at path: {self.model_path}")

# setup device (use CUDA if available, otherwise CPU)
self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# load tokenizer
self._tokenizer = AutoTokenizer.from_pretrained( # type: ignore[no-untyped-call]
self.model_path,
local_files_only=True,
)

# load model as AutoModelForMaskedLM (required for sparse embeddings)
self._model = AutoModelForMaskedLM.from_pretrained(
self.model_path,
# load model as SparseEncoder
self._model = SparseEncoder(
str(self.model_path),
trust_remote_code=True,
local_files_only=True,
model_kwargs={},
device="cpu",
)
self._model.to(self._device) # type: ignore[arg-type]
self._model.eval()

# set special token IDs (following model card pattern)
# these will be zeroed out in the sparse vectors
self._special_token_ids = [
self._tokenizer.vocab[token] # type: ignore[index]
for token in self._tokenizer.special_tokens_map.values()
]

logger.info(
f"Model loaded successfully on {self._device}, "
f"{time.perf_counter() - start_time:.2f}s"
)
logger.info(f"Model loaded successfully, {time.perf_counter() - start_time:.2f}s")

def create_embedding(self, embedding_input: EmbeddingInput) -> Embedding:
"""Create sparse vector and decoded token weight embeddings for an input text.
"""Create an Embedding for an EmbeddingInput.

This model is configured to return a sparse vector of vocabulary token indices
and weights, plus a dictionary of the decoded tokens whose weight was > 0 in
the sparse vector.

Args:
embedding_input: EmbeddingInput object with a .text attribute
embedding_input: EmbeddingInput instance
"""
# generate the sparse embeddings
sparse_vector, decoded_tokens = self._encode_documents([embedding_input.text])[0]

# coerce sparse vector tensor into list[float]
sparse_vector_list = sparse_vector.cpu().numpy().tolist()

return Embedding(
timdex_record_id=embedding_input.timdex_record_id,
run_id=embedding_input.run_id,
run_record_offset=embedding_input.run_record_offset,
model_uri=self.model_uri,
embedding_strategy=embedding_input.embedding_strategy,
embedding_vector=sparse_vector_list,
embedding_token_weights=decoded_tokens,
)
sparse_vector = self._model.encode_document(embedding_input.text)
sparse_vector = cast("Tensor", sparse_vector)
return self._get_embedding_from_sparse_vector(embedding_input, sparse_vector)
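For orientation, a minimal sketch of the SparseEncoder calls used above, assuming the model id from the model card referenced later in this file and that the model files can be fetched or found locally; the weights in the comments are made up:

from sentence_transformers.sparse_encoder import SparseEncoder

encoder = SparseEncoder(
    "opensearch-project/opensearch-neural-sparse-encoding-doc-v3-gte",
    trust_remote_code=True,
)

# encode_document() returns a sparse tensor the length of the model vocabulary;
# each non-zero position is a vocabulary token id whose value is that token's
# weight for the input text.
vector = encoder.encode_document("A short note about sparse embeddings.")

# decode() keeps only the non-zero entries as (token, weight) pairs,
# e.g. [("sparse", 1.4), ("embedding", 1.1), ...] (example values only).
token_weights = dict(encoder.decode(vector))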

def _encode_documents(
def create_embeddings(
self,
texts: list[str],
) -> list[tuple[torch.Tensor, dict[str, float]]]:
"""Encode documents into sparse vectors and decoded token weights.

This follows the pattern outlined on the HuggingFace model card for document
encoding.

This method will accommodate MULTIPLE text inputs, and return a list of
embeddings, but the calling context of create_embedding() is a SINGULAR input +
output. This method keeps the ability to handle multiple inputs + outputs, in the
event we want something like a create_multiple_embeddings() method in the future,
but only returns a single result.

At a very high level, the following is performed:

1. We tokenize the input text into "features" using the model's tokenizer.
embedding_inputs: Iterator[EmbeddingInput],
) -> Iterator[Embedding]:
"""Yield Embeddings for multiple EmbeddingInputs.

2. The features are fed to the model returning model output logits. These logits
are "dense" in the sense there are few zeros, but they are not "dense vectors"
(embeddings) in the sense that they meaningfully represent the input document in
geometric space; two logit tensors cannot be compared with something like cosine
similarity.
If the env var TE_NUM_WORKERS is set and >1, the encoding library
(sentence-transformers) will automatically create a pool of worker processes to
encode in parallel.

3. The logits are then converted into a sparse vector, which is a numeric
array of floats with the same number of values as the model's vocabulary. Each
value's position in the sparse array corresponds to the token id in the
vocabulary, and the value itself is the "weight" of this token in the input text.
Note: currently, running with 2+ workers in amd64 and arm64 Docker contexts
immediately exits due to a "Bus Error". It is recommended to omit the env var
TE_NUM_WORKERS, or set it to "1", in Docker contexts.

4. Lastly, we convert this sparse vector into a {token:weight} dictionary of the
actual token strings and their numerical weight. This dictionary may contain
tokens not present in the original text, but will be considerably shorter than
the model vocabulary length given all zero and low scoring tokens are dropped.
This is the final form that we will ultimately index into OpenSearch.
Currently, we also fully consume the input EmbeddingInputs before embedding
work starts. This may change in future iterations if we move to batched
embedding creation; until then, it is assumed that the full set of inputs to
this method fits safely in memory for the run.

Args:
texts: list of strings to create embeddings for
embedding_inputs: iterator of EmbeddingInputs
"""
if self._model is None or self._tokenizer is None:
raise RuntimeError("Model not loaded. Call load() before create_embedding.")
# consume input EmbeddingInputs
embedding_inputs_list = list(embedding_inputs)
if not embedding_inputs_list:
return

# extract texts from all inputs
texts = [embedding_input.text for embedding_input in embedding_inputs_list]

# read env vars for configurations
num_workers = int(os.getenv("TE_NUM_WORKERS", "1"))
batch_size = int(
os.getenv("TE_BATCH_SIZE", "32")
) # sentence-transformers default

# configure device and worker pool based on number of workers requested
if num_workers > 1 or self.device == "mps":
device = None
pool = self._model.start_multi_process_pool(
[self.device for _ in range(num_workers)]
)
else:
device = self.device
pool = None
logger.info(
f"Num workers: {num_workers}, batch size: {batch_size}, "
f"device: {device}, pool: {pool}"
)

# tokenize the input texts
features = self._tokenizer(
# get sparse vector embedding for input text(s)
sparse_vectors = self._model.encode_document(
texts,
padding=True,
truncation=True,
return_tensors="pt", # returns PyTorch tensors instead of Python lists
return_token_type_ids=False,
batch_size=batch_size,
device=device,
pool=pool,
save_to_cpu=True,
)
sparse_vectors = cast("list[Tensor]", sparse_vectors)

# move to CPU or GPU device, depending on what's available
features = {k: v.to(self._device) for k, v in features.items()}

# pass features to the model and receive model output logits as a tensor
with torch.no_grad():
output = self._model(**features)[0]

# generate sparse vectors from model logits tensor
sparse_vectors = self._get_sparse_vectors(features, output)

# decode sparse vectors to token-weight dictionaries
decoded = self._decode_sparse_vectors(sparse_vectors)

# return list of tuple(vector, decoded token weights) embedding results
return [(sparse_vectors[i], decoded[i]) for i in range(len(texts))]
for i, embedding_input in enumerate(embedding_inputs_list):
sparse_vector = sparse_vectors[i]
sparse_vector = cast("Tensor", sparse_vector)
yield self._get_embedding_from_sparse_vector(embedding_input, sparse_vector)
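The environment variables read above can be tuned per run. A hypothetical local configuration (values are examples only; per the docstring note, leave TE_NUM_WORKERS unset or "1" in Docker contexts):

import os

os.environ["TE_TORCH_DEVICE"] = "cpu"  # device string read in __init__()
os.environ["TE_NUM_WORKERS"] = "4"     # >1 starts a multi-process encoding pool
os.environ["TE_BATCH_SIZE"] = "64"     # passed to encode_document() as batch_size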

def _get_sparse_vectors(
self, features: dict[str, torch.Tensor], output: torch.Tensor
) -> torch.Tensor:
"""Convert model logits output to sparse vectors.

This follows the HuggingFace model card exactly: https://huggingface.co/
opensearch-project/opensearch-neural-sparse-encoding-doc-v3-gte#usage-huggingface

This implements the get_sparse_vector function from the model card:
1. Max pooling with attention mask
2. log(1 + log(1 + relu())) transformation
3. Zero out special tokens

The end result is a sparse vector with a length of the model vocabulary, with each
position representing a token in the model vocabulary and each value representing
that token's weight relative to the input text.

Args:
features: Tokenizer output with attention_mask
output: Model logits of shape (batch_size, seq_len, vocab_size)

Returns:
Sparse vectors of shape (batch_size, vocab_size)
"""
# collapse sequence positions: take max logit for each vocab token across all
# positions (also masks out padding tokens)
values, _ = torch.max(output * features["attention_mask"].unsqueeze(-1), dim=1)

# compress values to create sparsity: ReLU removes negatives,
# double-log shrinks large values
values = torch.log(1 + torch.log(1 + torch.relu(values)))

# remove special tokens like [CLS], [SEP], [PAD]
values[:, self._special_token_ids] = 0

return values

def _decode_sparse_vectors(
self, sparse_vectors: torch.Tensor
) -> list[dict[str, float]]:
"""Convert sparse vectors to token-weight dictionaries.
def _get_embedding_from_sparse_vector(
self,
embedding_input: EmbeddingInput,
sparse_vector: Tensor,
) -> Embedding:
"""Prepare Embedding from EmbeddingInput and calculated sparse vector.

Handles both single vectors and batches, returning a list of dictionaries mapping
token strings to their weights.
This shared method is used by create_embedding() and create_embeddings() to
prepare and return an Embedding. A sparse vector is provided, which is decoded
into a dictionary of tokens:weights, and a final Embedding instance is returned.

Args:
sparse_vectors: Tensor of shape (batch_size, vocab_size) or (vocab_size,)

Returns:
List of dictionaries with token-weight pairs
embedding_input: EmbeddingInput
sparse_vector: sparse vector returned by model
"""
if sparse_vectors.dim() == 1:
sparse_vectors = sparse_vectors.unsqueeze(0)

# move to CPU for processing
sparse_vectors_cpu = sparse_vectors.cpu()

results: list[dict] = []
for vector in sparse_vectors_cpu:

# find non-zero indices and values
nonzero_indices = torch.nonzero(vector, as_tuple=False).squeeze(-1)

if nonzero_indices.numel() == 0:
results.append({})
continue

# get weights
weights = vector[nonzero_indices].tolist()

# convert indices to token strings
token_ids = nonzero_indices.tolist()
tokens = self._tokenizer.convert_ids_to_tokens(token_ids) # type: ignore[union-attr]
# get decoded dictionary of tokens:weights
decoded_token_weights = self._model.decode(sparse_vector)
decoded_token_weights = cast("list[tuple[str, float]]", decoded_token_weights)
embedding_token_weights = dict(decoded_token_weights)

# create token:weight dictionary
token_dict = {
token: weight
for token, weight in zip(tokens, weights, strict=True)
if token is not None
}
results.append(token_dict)
# prepare sparse vector for JSON serialization
embedding_vector = sparse_vector.to_dense().tolist()

return results
return Embedding(
timdex_record_id=embedding_input.timdex_record_id,
run_id=embedding_input.run_id,
run_record_offset=embedding_input.run_record_offset,
model_uri=self.model_uri,
embedding_strategy=embedding_input.embedding_strategy,
embedding_vector=embedding_vector,
embedding_token_weights=embedding_token_weights,
)
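A tiny illustration of the two conversions performed above, using a made-up four-value vector in place of a vocabulary-length one (decode() itself requires the loaded model, so it is only described in comments):

import torch

# Stand-in for a vocabulary-length weight vector (values are made up).
weights = torch.tensor([0.0, 1.3, 0.0, 0.7])
sparse_vector = weights.to_sparse()

# JSON-serializable list, as stored on Embedding.embedding_vector.
embedding_vector = sparse_vector.to_dense().tolist()  # [0.0, 1.3, 0.0, 0.7]

# SparseEncoder.decode() pairs the non-zero positions with vocabulary tokens,
# e.g. [("cat", 1.3), ("pet", 0.7)], which dict() turns into
# Embedding.embedding_token_weights.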