## Load data from the author JSON files

In [None]:
import logging
from tqdm.auto import tqdm
from pymilvus import connections, utility, Collection
from embedding_search.vector_store import (
    AUTHORS_DIR,
    create_author_collection,
    create_article_collection,
    make_articles_data_packages,
    push_data,
)

# Root logger at DEBUG so the `logging.debug(...)` calls in the search helpers further down are shown.
logging.basicConfig(level=logging.DEBUG)

### Connect to Milvus

In [None]:
# `milvus-standalone` is presumably the Milvus service name from docker-compose
# (resolvable as a hostname on the compose network) — confirm against the compose file.
connections.connect("default", host="milvus-standalone", port="19530")
# Printing the server version doubles as a connectivity check.
print(utility.get_server_version())

## Make Milvus collections

Initialize the collections (uncomment the calls below to create them) and load data from the JSON files.

In [None]:
# create_author_collection()
# create_article_collection()

In [None]:
# Names of the collections currently on the Milvus server.
utility.list_collections()

List collections

In [None]:
# Disconnecting here breaks the later cells: `Collection(...)` uses the "default"
# connection alias opened above. Run this only when you are done with the notebook.
# connections.disconnect(alias="default")

In [None]:
# Handles to both collections, with their current entity counts.
author_collection = Collection("authors")
print(f"There are {author_collection.num_entities} authors in the DB.")

article_collection = Collection("articles")
print(f"There are {article_collection.num_entities} articles in the DB.")

In [None]:
# Display the authors collection handle.
author_collection

In [None]:
from api.core import Engine
from langchain.embeddings import OpenAIEmbeddings
# Build the project's search `Engine` from both collections and an OpenAI embedding client.
embeddings = OpenAIEmbeddings()
engine = Engine(author_collection, article_collection, embeddings)

In [None]:
# Embed a sample sentence; presumably returns the embedding vector (TODO confirm Engine.embed's return type).
y = engine.embed("I will be late")

In [None]:
# Article search through the Engine wrapper.
articles = engine.search_articles("Dark Higg's Boson")

In [None]:
# Display the Engine article results.
articles

In [None]:
# Largest "distance" among the items held in `y`.
# NOTE(review): run top-to-bottom, `y` is still the value of `engine.embed(...)`
# above, not a list of hits with a "distance" key. This cell presumably belongs
# after `engine.search_authors(...)`, or was meant to use `articles`.
max([x['distance'] for x in y])

In [None]:
# Top-3 authors for a free-text query via the Engine wrapper.
y = engine.search_authors("Higgs boson, who discover it", top_k=3)

In [None]:
# Display the author search results.
y

In [None]:
# Look up a single author by id through the Engine wrapper.
engine.get_author(106927)

In [None]:
def query_author(author_id: int):
    """Fetch one author's name fields from the Milvus "authors" collection.

    Args:
        author_id: The author's ``id``. Ints and numeric strings (e.g. file
            stems such as ``"106927"``) are accepted. The value is coerced with
            ``int()`` before being placed in the Milvus filter expression, so
            malformed input fails fast instead of producing an invalid or
            injected expression.

    Returns:
        The first matching row (with ``first_name``, ``last_name`` and
        ``community_name``), or ``None`` when no author has that id.

    Raises:
        ValueError: If ``author_id`` is not an integer or numeric string.
    """
    author = author_collection.query(
        expr=f"id == {int(author_id)}",
        output_fields=["first_name", "last_name", "community_name"],
        limit=1,
    )

    # The original `retur` typo raised NameError whenever no author matched.
    if not author:
        return None

    return author[0]

In [None]:
# Same lookup as above, done directly against the collection.
query_author(106927)

## Ingest data

Authors

In [None]:
# Directory scanned for author JSON files (see the glob in the next cell).
AUTHORS_DIR

In [None]:
# Each JSON file in AUTHORS_DIR is named after an author id; keep the file stems.
author_ids = list(map(lambda json_file: json_file.stem, AUTHORS_DIR.glob("*.json")))

# DEBUG: uncomment to work on a small subset.
# author_ids = author_ids[:100]

# Parallel variant (needs `multiprocessing.Pool` and `make_author_data_package`):
# with Pool(8) as p:
#     data_packages = p.map(make_author_data_package, author_ids)

In [None]:
# Push data for the author at index 101 of `author_ids`; `push_data` is project-local
# and presumably ingests that author's data into Milvus — TODO confirm.
push_data(author_ids[101])

In [None]:
# Build the article data packages for the author at index 2026, for inspection.
tmp = make_articles_data_packages(author_ids[2026])

In [None]:
# Size of what the previous cell built (presumably one entry per article).
len(tmp)

In [None]:
# NOTE(review): `data_packages` is only defined by the commented-out
# `Pool.map(make_author_data_package, ...)` code above, or by the articles
# ingestion loop below (which would put *article* packages into the authors
# collection). Make sure it holds author packages before running this cell.
author_collection.insert(data_packages)
author_collection.flush()

Articles

In [None]:
# Ingest articles for the first 100 authors found in AUTHORS_DIR.
author_ids = [path.stem for path in AUTHORS_DIR.glob("*.json")]
author_ids = author_ids[:100]  # development cap; drop the slice to ingest every author

article_collection = Collection("articles")
for author_id in tqdm(author_ids):
    # One insert per author, carrying that author's article packages.
    data_packages = make_articles_data_packages(author_id)
    article_collection.insert(data_packages)

# A single flush after all inserts.
article_collection.flush()

## Create Milvus index

In [None]:
# IVF_FLAT vector index scored by inner product ("IP"); `nlist` is the number of
# clusters the vectors are partitioned into when the index is built.
index_params = dict(
    metric_type="IP",
    index_type="IVF_FLAT",
    params=dict(nlist=1024),
)

In [None]:
# Build the vector index on the articles' `embedding` field.
article_collection.create_index("embedding", index_params)

In [None]:
# Build the vector index on the authors' `embedding` field.
author_collection.create_index("embedding", index_params)

In [None]:
# Index build progress for the "articles" collection.
utility.index_building_progress("articles")

In [None]:
# Index build progress for the "authors" collection.
utility.index_building_progress("authors")

## Load collections and test search

In [None]:
# Load the authors collection so it can be searched/queried.
author_collection.load()

In [None]:
# Load the articles collection so it can be searched/queried.
article_collection.load()

In [None]:
from langchain.embeddings import OpenAIEmbeddings

# Embed a test query with the OpenAI embedding client.
embeddings = OpenAIEmbeddings()
search_vector = embeddings.embed_query("Dark Higgs's boson")

In [None]:
# Top-10 articles for the query vector, scored by inner product; `nprobe` is the
# number of IVF clusters scanned per query.
articles = article_collection.search(
    data=[search_vector],
    anns_field="embedding",
    param={"metric_type": "IP", "params": {"nprobe": 16}},
    limit=10,
    output_fields=["title", "author_id", "doi"],
)

In [None]:
# Scalar (non-vector) query: fetch at most one author whose `id` is 106927.
authors = author_collection.query(
    expr="id == 106927",
    offset=0,
    limit=1,
    output_fields=["id", "first_name", "last_name", "community_name"],
)

In [None]:
from typing import Any
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.base import Embeddings

# Module-level handles: the "articles" collection (loaded, since Milvus only
# searches loaded collections) and the OpenAI embedding client.
ARTICLE_COLLECTION = Collection(name="articles")
ARTICLE_COLLECTION.load()
EMBEDDINGS = OpenAIEmbeddings()


def search(
    query: str,
    output_fields: "list[str] | None" = None,
    top_k: int = 3,
    distance_threshold: float = 0.2,
    pow: float = 3.0,
):
    """Search the "articles" collection for the articles closest to ``query``.

    Args:
        query: Free-text query; embedded with the module-level ``EMBEDDINGS``.
        output_fields: Entity fields to return with each hit. Defaults to
            ``["title", "author_id"]`` when omitted.
        top_k: Maximum number of hits to return.
        distance_threshold: Currently unused; kept for signature compatibility.
        pow: Currently unused; kept for signature compatibility.

    Returns:
        The raw pymilvus search result for the single query vector, so
        ``result[0][i]`` is the i-th hit. This is how the exploratory cells
        that follow index it. Returning ``.__dict__`` (as before) made
        ``result[0]`` fail.
    """
    if output_fields is None:
        output_fields = ["title", "author_id"]

    # Embed the query; log instead of printing the (long) vector.
    search_vector = EMBEDDINGS.embed_query(query)
    logging.debug(search_vector)

    # Search the module-level, already-loaded collection handle (inner product).
    search_params = {"metric_type": "IP", "params": {"nprobe": 16}}
    return ARTICLE_COLLECTION.search(
        data=[search_vector],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=output_fields,
    )

In [None]:
# Request the fields the following cells read (e.g. `author_id`); the earlier
# call omitted `output_fields`, which `search` takes as its second argument.
r = search("galaxy far away", output_fields=["title", "author_id"])

In [None]:
# `author_id` field of the third hit ([2]) for the first query ([0]).
r[0][2].entity.get("author_id")

In [None]:
from pymilvus.orm.search import SearchResults


def _flatten_results(results: "SearchResults") -> list[dict]:
    """Flatten the hits of the first query in a Milvus search result.

    ``results[0]`` holds the hits for the first query vector. Each hit exposes
    the singular ``id`` / ``distance`` / ``score`` attributes. The previous
    version read plural ``ids`` / ``distances`` / ``scores`` from each hit,
    which a single hit does not have.

    Args:
        results: A pymilvus search result (indexable by query, then by hit).

    Returns:
        One dict per hit, in rank order, with keys ``"id"``, ``"distance"``
        and ``"score"``. An empty list if the first query has no hits.
    """
    return [
        {"id": hit.id, "distance": hit.distance, "score": hit.score}
        for hit in results[0]
    ]

In [None]:
# Exploratory call. NOTE(review): confirm that the object returned by `search`
# actually has an `on_result` method.
r.on_result()

In [None]:
# Concrete type of the search result.
type(r)

In [None]:
# Instance attributes of the first hit for the first query.
r[0][0].__dict__

In [None]:
# Print the type of every hit for the first query.
for x in r[0]:
    print(type(x))

In [None]:
def search_with_emb(*args, **kwargs):
    """Placeholder for an unfinished helper.

    Only a bare ``def search_with_emb`` line was here, which made the cell a
    SyntaxError. This stub keeps the name defined until it is implemented.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError("search_with_emb is not implemented yet")

In [None]:
# First hit for the first query, from the most recent `articles = ...` cell that was run.
articles[0][0]

Get only the title of that top hit

In [None]:
# Title field of the first hit.
articles[0][0].entity.get("title")

## Drop collection

In [None]:
# Collections on the server before (optionally) dropping one below.
utility.list_collections()

In [None]:
# utility.drop_collection("articles")

# Dev

In [None]:
def search(
    query: str,
    top_k: int,
    n: int = 1000,
    distance_threshold: float = 0.2,
    pow: float = 3.0,
) -> list:
    r"""Search for authors by a query.

    Each author is given a score, defined as:
    $$ S_j = \sum_{i=1}^n (1 - d_{i,j})^p $$

    where $d_{i,j}$ is the distance between the query and the $i$-th article of the $j$-th author.
    The value $d$ is also clipped by the `distance_threshold` $t$, i.e.,

    $$
    d =
    \begin{cases}
    d & \text{if } d \leq t \\
    1 & \text{if } d > t
    \end{cases}
    $$

    so articles farther than $t$ contribute nothing to an author's score.

    Args:
        query (str): Query string.
        top_k (int): Number of authors to return.
        n (int, optional): Number of articles $n$ retrieved from Milvus for the weighting pool. Defaults to 1000.
        distance_threshold (float, optional): Distance threshold $t$. Defaults to 0.2.
        pow (float, optional): Power in weighting function $p$. Defaults to 3.0.

    Returns:
        list: Up to ``top_k`` author ids, highest score first.

    NOTE(review): the collection is searched with the "IP" (inner product)
    metric. For IP, Milvus reports a similarity (larger = closer), not a
    distance. Confirm that keeping hits with ``distance <= t`` is the intended
    direction.
    """
    # NOTE(review): `cached_resources` is not defined in this notebook; presumably
    # a dict with "embeddings" and "article_collection" entries — TODO confirm.
    search_vector = cached_resources["embeddings"].embed_query(query)
    logging.debug(search_vector)

    search_params = {"metric_type": "IP", "params": {"nprobe": 16}}
    # Retrieve the n-article weighting pool. top_k counts authors, not articles.
    results = cached_resources["article_collection"].search(
        data=[search_vector],
        anns_field="embedding",
        param=search_params,
        limit=n,
        output_fields=["title", "author_id"],
    )

    # author score = Sum((1 - distance) ** pow) over the author's articles in the pool.
    author_scores: dict = {}
    for hit in results[0]:  # results[0]: hits for our single query vector
        distance = hit.distance
        if distance > distance_threshold:
            # Clipped to d = 1 in the formula, i.e. zero weight.
            continue
        author_id = hit.entity.get("author_id")
        weight = (1 - distance) ** pow
        author_scores[author_id] = author_scores.get(author_id, 0) + weight

    logging.debug(author_scores)
    return sorted(author_scores, key=author_scores.get, reverse=True)[:top_k]

In [None]:
# Top-3 authors for "black hole" using the dev `search` defined above.
search("black hole", 3)