# core

> core functions

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *
%load_ext autoreload
%autoreload 2

In [None]:
#| export
from emb_opt.imports import *

  from .autonotebook import tqdm as notebook_tqdm


## Query Result

The `QueryResult` class is the data model that defines the data required to integrate with the library. To integrate a new backend, you only need to map the information it returns onto the `QueryResult` class.

* `query_idx` - query vectors are passed as a numpy array of size `(n, vector_size)`. This value corresponds to the index of the query vector that generated the specific result
* `db_idx` - index value from the database
* `embedding` - the embedding of the item
* `distance` - the distance from the query vector to the embedding
* `data` - a dictionary of any other values you want to track

For more examples, see `HFDatabase`, `FaissDatabase`, `QdrantDatabase` or `ChromaDatabase`

In [None]:
#| export
class QueryResult:
    'Record describing one database item returned for one query vector'
    def __init__(self,
                 query_idx: int, # position of the originating query vector
                 db_idx: int, # position of the item inside the database
                 embedding: np.ndarray, # embedding of the returned item
                 distance: float, # distance between query vector and item embedding
                 data: dict # free-form extra values attached to the item
                ):
        self.query_idx, self.db_idx = query_idx, db_idx
        self.embedding, self.distance = embedding, distance
        self.data = data

    def to_dict(self) -> dict:
        'Returns the five stored fields as a plain dictionary keyed by field name'
        return dict(
            query_idx=self.query_idx,
            db_idx=self.db_idx,
            embedding=self.embedding,
            distance=self.distance,
            data=self.data,
        )

In [None]:
#| export
def dataset_from_query_results(query_results: list[QueryResult]) -> Dataset:
    'builds a `Dataset` with one row per `QueryResult`, each row being that result\'s `to_dict()` output'
    return Dataset.from_list([result.to_dict() for result in query_results])

In [None]:
# Toy example: brute-force nearest-neighbour lookup over a random "database",
# packaged as `QueryResult` objects and converted to a `Dataset`.
query_vecs = np.random.randn(2, 256)

vector_database = np.random.randn(64, 256)

topk = 24

# pairwise Euclidean distances, shape (n_queries, n_database_items)
dists = ((query_vecs[:,None] - vector_database[None])**2).sum(-1)**0.5
# `argsort` sorts ascending, so the closest items are the FIRST `topk` columns
nearest = dists.argsort(-1)[:, :topk]

query_results = []

for query_idx in range(query_vecs.shape[0]):
    for db_idx in nearest[query_idx]:
        result = QueryResult(query_idx, db_idx, vector_database[db_idx], 
                             dists[query_idx, db_idx], {'randint': np.random.randint(0,100)})
        query_results.append(result)
        
query_dataset = dataset_from_query_results(query_results)

## Filter

The `Filter` class allows query results to be filtered by a user-supplied function. See the [Hugging Face filter documentation](https://huggingface.co/docs/datasets/process#select-and-filter) for examples of filter functions and optional arguments.

In [None]:
#| export
class Filter():
    'Filters a query `Dataset` with a user-supplied function via `Dataset.filter`'
    def __init__(self, 
                 filter_func: Callable, # function to filter
                 filter_kwargs_dict: Optional[dict]=None # optional kwargs dict passed to `Dataset.filter`
                ):
        self.filter_func = filter_func
        self.filter_kwargs_dict = filter_kwargs_dict if filter_kwargs_dict else {}
        
    def __call__(self, query_dataset: Dataset) -> Dataset:
        'Returns the result of `query_dataset.filter` called with `filter_func` and the stored kwargs'
        # `filter_func` is passed as-is (no single-argument wrapper) so kwargs that change
        # how the function is called, e.g. `with_indices`, reach it with the right arguments
        return query_dataset.filter(self.filter_func, **self.filter_kwargs_dict)
    
class PassThroughFilter(Filter):
    'No-op filter: hands the query dataset back untouched'
    def __init__(self):
        # nothing to configure: no filter function or kwargs are stored
        return None
    def __call__(self, query_dataset: Dataset) -> Dataset:
        'Returns `query_dataset` itself, without filtering'
        return query_dataset

In [None]:
def simple_filter(row):
    'row-wise predicate: keep rows whose `randint` value is below 20'
    payload = row['data']
    return payload['randint'] < 20

# basic filtering (one call per row, default `Dataset.filter` settings)
f = Filter(simple_filter)
filtered_dataset = f(query_dataset)
assert len(filtered_dataset) < len(query_dataset)

# multiprocess (same predicate, spread over two worker processes)
f = Filter(simple_filter, filter_kwargs_dict={'num_proc':2})
filtered_dataset = f(query_dataset)
assert len(filtered_dataset) < len(query_dataset)

# batched (the predicate sees a batch and returns one boolean per row)
def batched_filter(batch):
    'batch-wise predicate: boolean array marking rows whose `randint` is below 20'
    randints = np.array([entry['randint'] for entry in batch['data']])
    keep_mask = randints < 20
    return keep_mask

f = Filter(batched_filter, filter_kwargs_dict={'batched':True})
filtered_dataset = f(query_dataset)
assert len(filtered_dataset) < len(query_dataset)

# dummy (pass-through keeps every row)
f = PassThroughFilter()
filtered_dataset = f(query_dataset)
assert len(filtered_dataset) == len(query_dataset)

                                                                                

## Score

The `Score` class holds the score function we want to maximize. The score function is given all the information in a `QueryResult`. See the [Hugging Face map documentation](https://huggingface.co/docs/datasets/process#map) for the available kwargs.

In [None]:
#| export
class Score():
    'Adds a `score` column to a query `Dataset` by applying a score function via `Dataset.map`'
    def __init__(self, 
                 score_func: Callable, # score function to maximize
                 map_kwargs_dict: Optional[dict]=None # optional kwargs for `Dataset.map`
                ):
        self.score_func = score_func
        self.map_kwargs_dict = map_kwargs_dict if map_kwargs_dict else {}
        
    def __call__(self, query_dataset: Dataset) -> Dataset:
        'Returns the result of `query_dataset.map`, where each call yields `{"score": score_func(...)}`'
        # forward every positional/keyword argument so kwargs that change how the
        # mapped function is called (e.g. `with_indices`) still reach `score_func`
        return query_dataset.map(
            lambda *args, **kwargs: {'score' : self.score_func(*args, **kwargs)},
            **self.map_kwargs_dict,
        )

In [None]:
def simple_score(row):
    'row-wise score: a random draw that ignores the row contents'
    return np.random.randn()

# default settings: the score function is called once per row
score = Score(simple_score)
scored_dataset = score(query_dataset)

# multiprocess (two worker processes)
score = Score(simple_score, map_kwargs_dict={'num_proc' : 2})
scored_dataset = score(query_dataset)

# batched (the score function sees a batch and returns one score per row)
def batch_score(batch):
    'batch-wise score: 0, 1, 2, ... for the rows of the batch'
    n_rows = len(batch['query_idx'])
    return np.arange(n_rows)

score = Score(batch_score, map_kwargs_dict={'batched' : True})
scored_dataset = score(query_dataset)

                                                                                

In [None]:
#| export
class VectorDatabase:
    'Base class for vector database backends'
    def query(self, query_vectors: np.ndarray) -> Dataset:
        'Placeholder: this base implementation always raises `NotImplementedError`'
        raise NotImplementedError()

In [None]:
#| hide
# export the `#| export` cells of this notebook to the library module
import nbdev
nbdev.nbdev_export()