# Load embedding data into Milvus

First, import some common libraries and define the data reading functions.

In [1]:
# Import libraries.
import time
import pandas as pd
import numpy as np

# Output words instead of scores.
def sentiment_score_to_name(score: float):
    """Translate a numeric sentiment score into a text label.

    Scores above zero map to "Positive"; scores at or below zero map to
    "Negative". A score for which neither comparison holds (e.g. NaN)
    yields None.
    """
    if score > 0:
        return "Positive"
    return "Negative" if score <= 0 else None
    
def partition_dataset(df_input, smoke_test=False):
    """Shuffle the input frame, split it into train/val/test, and save CSVs.

    The split boundaries are fixed row counts (first 35,000 rows for train,
    next 5,000 for validation, the remainder for test), chosen for an input
    of roughly 50K rows.  The input must have exactly three columns (text,
    integer label, text label); the pre-shuffle row index is kept as a fourth
    column named ``movie_index``.

    Side effects: writes train.csv, val.csv and test.csv to the current
    working directory, plus train-small.csv, val-small.csv and
    test-small.csv when ``smoke_test`` is True.

    Args:
        df_input (pandas.DataFrame): input data frame
        smoke_test (boolean): if True, use smaller number of rows for testing

    Returns:
        tuple of four pandas.DataFrame objects:
            smoke_test=False -> (df_shuffled, df_train, df_val, df_test)
            smoke_test=True  -> (df_small, df_train_small, df_val_small,
            df_test_small), where df_small is the three small splits
            concatenated with a fresh 0..n-1 index.
    """

    # Shuffle data; reset_index() keeps the original row index as a column,
    # which becomes the corpus index 'movie_index'.
    df_shuffled = df_input.sample(frac=1, random_state=1).reset_index()
    df_shuffled.columns = ['movie_index', 'text', 'label_int', 'label']

    # Split into train/val/test by fixed row counts.
    df_train = df_shuffled.iloc[:35_000]
    df_val = df_shuffled.iloc[35_000:40_000]
    df_test = df_shuffled.iloc[40_000:]

    # Save train/val/test split data locally in separate files.
    df_train.to_csv("train.csv", index=False, encoding="utf-8")
    df_val.to_csv("val.csv", index=False, encoding="utf-8")
    df_test.to_csv("test.csv", index=False, encoding="utf-8")

    if smoke_test:
        # Create small smoke_test datasets for easier testing purposes.
        # Take up to 100 training rows per class.  The per-class frames are
        # computed here rather than read from notebook globals, so the
        # function works on a fresh kernel.
        class1 = df_train.loc[df_train.label == "Positive", :]
        class2 = df_train.loc[df_train.label == "Negative", :]
        df_train_small = pd.concat([class1.iloc[0:100, :], class2.iloc[0:100, :]],
                                   join="outer",
                                   ignore_index=True)
        df_train_small.to_csv("train-small.csv", index=False, encoding="utf-8")

        df_val_small = df_val.head(100)
        df_val_small.to_csv("val-small.csv", index=False, encoding="utf-8")

        df_test_small = df_test.head(8)
        df_test_small.to_csv("test-small.csv", index=False, encoding="utf-8")

        # Concatenate together so df_small has a single consistent row index.
        df_small = pd.concat([df_train_small, df_val_small, df_test_small],
                             join="outer",
                             ignore_index=True)
        return df_small, df_train_small, df_val_small, df_test_small

    return df_shuffled, df_train, df_val, df_test

## Start up a local Milvus server.

In [5]:
# https://milvus.io/docs/example_code.md
# https://pymilvus.readthedocs.io/en/latest/tutorial.html

# !wget https://raw.githubusercontent.com/milvus-io/pymilvus/master/examples/hello_milvus.py

Code in this notebook uses [Milvus lite](https://milvus.io/docs/milvus_lite.md), which runs a local server.  ⛔️ Milvus lite is only meant for demos and local testing.
- pip install milvus pymilvus

💡 **For production purposes**, use a local Milvus docker, Milvus clusters, or fully-managed Milvus on Zilliz Cloud.
- [Local Milvus docker](https://milvus.io/docs/install_standalone-docker.md) requires local docker installed and running.
- [Milvus clusters](https://milvus.io/docs/install_cluster-milvusoperator.md) requires a K8s cluster up and running.
- [Zilliz Cloud free trial](https://cloud.zilliz.com/login)


In [6]:
from milvus import default_server, debug_server
from pymilvus import (
    connections, utility, FieldSchema, 
    DataType, CollectionSchema, Collection)

# Stop any server still running from an earlier run of this cell and
# clean up its previous data before starting again.
default_server.stop()
default_server.cleanup()

# Start a new milvus-lite local server and time how long start() takes.
start_time = time.time()
default_server.start()

end_time = time.time()
print(f"startup time: {end_time - start_time}")
# Example value from an earlier run: startup time: 5.6739208698272705

# Fixed 10-second pause (not included in the startup time printed above)
# to avoid an error message from trying to connect too early.
time.sleep(10)

# Connect over localhost; the port is whatever the lite server reports
# via default_server.listen_port.
# NOTE(review): show_startup_banner is passed to connect() here - confirm
# that pymilvus uses this keyword rather than it being a server-side option.
connections.connect(host='127.0.0.1', 
                  port=default_server.listen_port,
                  show_startup_banner=True)

# Check if the server is ready by asking it for its version string.
print(utility.get_server_version())


startup time: 9.774549961090088
v2.2-testing-20230824-68-ga34a9d606-lite


## Read CSV data into a pandas dataframe

In [10]:
# Load the review data from the locally stored CSV file.
filepath = "movie_data.csv"
df = pd.read_csv(filepath)

# Remove duplicate rows (keeping the first occurrence); the shape is printed
# before and after so the number of dropped rows is visible.
print(df.shape)
df.drop_duplicates(keep='first', inplace=True)
print(df.shape)

# Give the two columns standard names.
df.columns = ['text', 'label_int']

# Derive a text label from the integer label.
df["label"] = df["label_int"].map(sentiment_score_to_name)

# Append label to text for better rating classifier training.
df['text'] += ' (Rating: ' + df['label'].astype(str) + ')'

print(df.shape)     # now 3 columns: text, label_int, label
display(df.head())
print(df["text"][0])

# Split data into train/valid/test.
# See https://huggingface.co/docs/datasets/v1.4.0/add_dataset.html
# partition_dataset also saves each split locally as a CSV file.
# df, df_train, df_val, df_test = partition_dataset(df, smoke_test=True)
df, df_train, df_val, df_test = partition_dataset(df, smoke_test=False)

# The split boundaries inside partition_dataset assume about 50K input rows.
print(f"df: {df.shape}, train: {df_train.shape}, val: {df_val.shape}, test: {df_test.shape}")
df.head(2)


(50000, 2)
(49582, 2)
(49582, 3)


Unnamed: 0,text,label_int,label
0,"In 1974, the teenager Martha Moxley (Maggie Gr...",1,Positive
1,OK... so... I really like Kris Kristofferson a...,0,Negative
2,"***SPOILER*** Do not read this, if you think a...",0,Negative
3,hi for all the people who have seen this wonde...,1,Positive
4,"I recently bought the DVD, forgetting just how...",0,Negative


In 1974, the teenager Martha Moxley (Maggie Grace) moves to the high-class area of Belle Haven, Greenwich, Connecticut. On the Mischief Night, eve of Halloween, she was murdered in the backyard of her house and her murder remained unsolved. Twenty-two years later, the writer Mark Fuhrman (Christopher Meloni), who is a former LA detective that has fallen in disgrace for perjury in O.J. Simpson trial and moved to Idaho, decides to investigate the case with his partner Stephen Weeks (Andrew Mitchell) with the purpose of writing a book. The locals squirm and do not welcome them, but with the support of the retired detective Steve Carroll (Robert Forster) that was in charge of the investigation in the 70's, they discover the criminal and a net of power and money to cover the murder.<br /><br />"Murder in Greenwich" is a good TV movie, with the true story of a murder of a fifteen years old girl that was committed by a wealthy teenager whose mother was a Kennedy. The powerful and rich family 

Unnamed: 0,movie_index,text,label_int,label
0,26813,"Fot the most part, this movie feels like a ""ma...",0,Negative
1,26581,Are you kidding me? The music was SO LOUD in t...,0,Negative


In [11]:
# Separate the training rows by sentiment label to see whether the two
# classes are roughly balanced.
class1 = df_train[df_train["label"] == "Positive"].copy()
class2 = df_train[df_train["label"] == "Negative"].copy()
print(class1.shape, class2.shape)

(17606, 4) (17394, 4)


### Initialize the Embedding Model and Vector DB
**Embedding model:**  We will use an embedding model from [sentence transformers](https://www.sbert.net/docs/pretrained_models.html), hosted on HuggingFace, to encode the movie review text.  Then, we will save the embeddings to the Milvus database.

**Chunking:** Before embedding, it is necessary to decide your chunk and chunk overlap sizes.  In this hello_world demo, I will use the Review text length itself as the chunk length.  Occasionally reviews are very long, but mostly they are short paragraphs.

Note:  To keep your tokens private, best practice is to use an environment variable.   <br>
In Jupyter, you need a `.env` file (in the same directory as the notebook) containing lines like this:
- VARIABLE_NAME=value

In [12]:
# Import torch.
import torch

# Initialize torch settings
torch.backends.cudnn.deterministic = True
RANDOM_SEED = 415
torch.manual_seed(RANDOM_SEED)

# Pick the compute device.  This notebook was set up to use GPU index 3;
# only select it when at least four GPUs are visible.  Otherwise fall back
# to GPU 0, or to the CPU when CUDA is unavailable, so the notebook does not
# fail with an invalid device ordinal on smaller machines.
if torch.cuda.is_available():
    gpu_index = 3 if torch.cuda.device_count() > 3 else 0
    DEVICE = torch.device(f'cuda:{gpu_index}')
else:
    DEVICE = torch.device('cpu')
print(f"device: {DEVICE}")

import os
from dotenv import load_dotenv, find_dotenv
# Load variables from a .env file, if one is found, into the environment.
_ = load_dotenv(find_dotenv())
from huggingface_hub import login

# Login to huggingface_hub using the token from the environment, so the
# token never appears in the notebook itself.
# NOTE(review): if HUGGINGFACEHUB_API_TOKEN is unset, hub_token is None -
# confirm how login() behaves in that case (it may prompt interactively).
hub_token = os.getenv("HUGGINGFACEHUB_API_TOKEN")
login(token=hub_token)

# Set tokenizers parallelism to false, to avoid issues with multiprocessing.
os.environ["TOKENIZERS_PARALLELISM"] = "false"


device: cpu
Token will not been saved to git credential helper. Pass `add_to_git_credential=True` if you want to set the git credential as well.
Token is valid (permission: write).
Your token has been saved to /Users/christybergman/.cache/huggingface/token
Login successful


In [13]:
from sentence_transformers import SentenceTransformer

# Fetch the retriever (embedding) model from the HuggingFace model hub and
# place it on the selected device.
model_name = "BAAI/bge-base-en-v1.5"
retriever = SentenceTransformer(model_name, device=DEVICE)
print(type(retriever), retriever, sep="\n")

# Keep the model's output dimension and sequence limit for later cells.
EMBEDDING_LENGTH = retriever.get_sentence_embedding_dimension()
max_seq_length = retriever.get_max_seq_length()
print(
    f"model_name: {model_name}",
    f"embedding vector length: {EMBEDDING_LENGTH}, max_seq_length: {max_seq_length}",
    sep="\n",
)


<class 'sentence_transformers.SentenceTransformer.SentenceTransformer'>
SentenceTransformer(
  (0): Transformer({'max_seq_length': 512, 'do_lower_case': True}) with Transformer model: BertModel 
  (1): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': True, 'pooling_mode_mean_tokens': False, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
)
model_name: BAAI/bge-base-en-v1.5
embedding vector length: 768, max_seq_length: 512


In [14]:
# TODO - move all this to a utility function.
# Prepare a small batch of df for insertion into the Milvus collection.

# Batch of data from pandas DataFrame.
batch = df.head(5).copy()  # TODO - change back to 100 and put this in a function.

# 1. Change primary key type to string.
batch["movie_index"] = batch["movie_index"].astype(str)

# 2. Truncate reviews to at most max_seq_length-1 characters.
# NOTE(review): max_seq_length is the model's sequence limit, reused here as
# a character cap - confirm this is the intended bound for the stored text
# (non-ASCII text may need a byte-based cap, depending on how the vector
# database measures string length).
batch["text"] = batch["text"].str[: max_seq_length - 1]

# 3. Add embeddings as new column in df.
# Pass a plain list so encoding does not depend on the Series index labels.
review_embeddings = torch.tensor(retriever.encode(batch["text"].tolist()))
# Normalize embeddings to unit length.
review_embeddings = torch.nn.functional.normalize(review_embeddings, p=2, dim=1)
# Quick check if embeddings are normalized.
norms = np.linalg.norm(review_embeddings, axis=1)
assert np.allclose(norms, 1.0, atol=1e-5)

# 4. Convert the embeddings to np.float32 (one array per row).
converted_values = list(map(np.float32, review_embeddings))
batch['embeddings'] = converted_values

# 5. Reorder columns so pk first, labels at end.
new_order = ["movie_index", "text", "embeddings", "label_int", "label"]
batch = batch[new_order]

display(batch.head(2))
# Every review must respect the truncation cap (shorter reviews are fine),
# and every embedding must have the model's output dimension.
assert batch["text"].str.len().max() <= max_seq_length - 1
assert all(len(vec) == EMBEDDING_LENGTH for vec in batch["embeddings"])
print(batch.dtypes)
print(f"type embeddings: {type(batch.embeddings[0])}, {type(batch.embeddings[0][0])}")

# Notes carried over from the hello_milvus example:
# milvus field random, only supports list
# milvus field embeddings, supports numpy.ndarray and list

Unnamed: 0,movie_index,text,embeddings,label_int,label
0,26813,"Fot the most part, this movie feels like a ""ma...","[-0.022307869, -0.038372956, -0.005369567, -0....",0,Negative
1,26581,Are you kidding me? The music was SO LOUD in t...,"[-0.024203338, -0.028892132, -0.04505902, 0.01...",0,Negative


movie_index    object
text           object
embeddings     object
label_int       int64
label          object
dtype: object
type embeddings: <class 'numpy.ndarray'>, <class 'numpy.float32'>


## Create a Milvus collection ("table") for the embeddings.

In [15]:
# Define the schema of the "movies" collection and create it.
# Data types used for the fields below:
# INT64
# VARCHAR
# FLOAT_VECTOR (embeddings stored as float32 arrays)

# NOTE(review): the original note here asks whether one FLOAT_VECTOR field is
# mandatory - confirm against the Milvus schema documentation.
# Field sizing: "text" allows 512, which covers the reviews truncated to
# max_seq_length-1 characters earlier; "label" allows 8, the length of
# "Positive"/"Negative"; "embeddings" uses the model's output dimension.
fields = [
    FieldSchema(name="movie_index", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=8),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_LENGTH),
    FieldSchema(name="label_int", dtype=DataType.INT64),
    FieldSchema(name="label", dtype=DataType.VARCHAR, max_length=8),
]

# Build the schema (second argument is its description) and create the
# collection handle `mc` used by the following cells.
collection_name = "movies"
schema = CollectionSchema(fields, "Search imdb movie reviews")
mc = Collection(collection_name, schema, consistency_level="Strong")
print(f"Created collection: {collection_name}")

Created collection: movies


In [16]:
# Insert the prepared `batch` DataFrame into the Milvus collection.
print("Start inserting entities")
insert_result = mc.insert(batch)

# After final entity is inserted, it is best to call flush to have no growing segments left in memory
mc.flush() 
print(insert_result)
# print(mc.partitions)            # Return the list[Partition] object.

# Timing note from an earlier run: 3.1 seconds for 100 rows
# (the batch inserted here has only 5 rows).

Start inserting entities
(insert count: 5, delete count: 0, upsert count: 0, timestamp: 444601275148664840, success count: 5, err count: 0)


In [21]:
# Build an index on the Milvus collection.
# Choice of index: https://milvus.io/docs/index.md
# How to use HNSW: https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md
# DISTANCE_METRIC: One of L2, IP, COSINE

# # On Zilliz Cloud - Proprietary index is fastest!
# index_params = {
#     # Always set this to AUTOINDEX or just omit it.
#     "index_type": "AUTOINDEX", 
#     # Default to IP. This is the only parameter you should think about.
#     "metric_type": "COSINE",
#     # No need to set `params`
# }

# Drop the index, just in case it exists.
# NOTE(review): dropping an index may fail if the collection is currently
# loaded (e.g. when re-running this cell after mc.load()) - confirm.
mc.drop_index()

# Create an HNSW index on the "embeddings" field using cosine similarity.
# Parameter meanings follow the hnswlib ALGO_PARAMS page linked above.
index_params = {
    "index_type": "HNSW", 
    "metric_type": "COSINE", 
    "params": {'M': 16,               # int. 4~64, links created per graph node
               "efConstruction": 32}  # int. 8~512, candidate-list size while building
               }
mc.create_index("embeddings", index_params)

Status(code=0, message=)

#### Run a Semantic Search

With the database populated, it's now possible to search all of the movies based on their reviews. In this example, we embed a natural-language query and then search our vector database for the `top_k` reviews whose embeddings are closest to the query embedding.

Use a HuggingFace Retriever model to encode the question and search for top-k.

This could be a different, or same model we used to create text encodings.  <br>
Below, we'll keep the same model.

In [22]:
# Before conducting a search or a query, the collection's data must be loaded into memory.
mc.load()
print("Loaded milvus collection into memory.")

Loaded milvus collection into memory.


In [23]:
# A representative question, also used to gauge typical query length.
query = 'Which index does Langchain Milvus default to?'
QUERY_LENGTH = len(query)
print(f"query length: {QUERY_LENGTH}")

# Encode the query with the same model used for the Milvus collection and
# scale the result to unit L2 length in the same step.
query_embeddings = torch.nn.functional.normalize(
    torch.tensor(retriever.encode([query])), p=2, dim=1
)
# Sanity check: each row should now have norm 1 (within tolerance).
norms = np.linalg.norm(query_embeddings, axis=1)
assert np.allclose(norms, 1.0, atol=1e-5)

# Repackage as a list holding one np.float32 array per query.
query_embeddings = [np.float32(vector) for vector in query_embeddings]

print(type(query_embeddings), len(query_embeddings), type(query_embeddings[0]))
print(type(query_embeddings[0][0]))

# Expected element types: <class 'numpy.ndarray'>, <class 'numpy.float32'>

query length: 45
<class 'list'> 1 <class 'numpy.ndarray'>
<class 'numpy.float32'>


In [45]:
# Execute a search.
# https://milvus.io/docs/search.md

top_k = 1  # return top k results

# Search-time settings in the documented layout: the metric at the top level
# (matching the metric the index was built with) and index-specific search
# arguments nested under "params".  "M" is an index build parameter and does
# not belong here; "ef" is the HNSW search-time candidate-list size.
search_params = {
    "metric_type": "COSINE",
    "params": {"ef": 32},
}

# Time the search for the query embedding(s) against the "embeddings" field,
# returning the stored review text with each hit.
start_time = time.time()
retrieved_documents = mc.search(data=query_embeddings, 
                      anns_field="embeddings", 
                      param=search_params,
                      output_fields=["text"], 
                      limit=top_k)
elapsed_time = time.time() - start_time
print(elapsed_time)

print(type(retrieved_documents), len(retrieved_documents))


0.48792433738708496


In [55]:
# Print the results: one entry per query, each holding that query's hits.
for result in retrieved_documents:
    print(result)

# Display the stored review text of the top hit for the first query.
retrieved_documents[0][0].entity.text

['id: 13007, distance: 0.42943188548088074, entity: {\'text\': "Saying this movie is extremely hard to follow and just as frustrating to sit through is putting it very mildly. Also saying that the current available print is dark, dreary, scratchy, abysmally edited, painfully dubbed, seemingly censored and in almost unwatchable shape is also correct. This film is in dire need of a good remastering from the full, uncut, original negative and seeing how it\'s reasonably atmospheric (and won the director an award at the Catalonia Film Festival), it might actually be worth t"}']


"Saying this movie is extremely hard to follow and just as frustrating to sit through is putting it very mildly. Also saying that the current available print is dark, dreary, scratchy, abysmally edited, painfully dubbed, seemingly censored and in almost unwatchable shape is also correct. This film is in dire need of a good remastering from the full, uncut, original negative and seeing how it's reasonably atmospheric (and won the director an award at the Catalonia Film Festival), it might actually be worth t"

In [57]:
# Look up the top hit's original row in the DataFrame.
# Use a descriptive name rather than `id`, which would shadow the builtin.
retrieved_id = retrieved_documents[0][0].id
print(retrieved_id, type(retrieved_id))
# The id comes back from Milvus as a string, while df.movie_index holds
# integers, so cast before comparing instead of hard-coding a value.
df.loc[df.movie_index == int(retrieved_id), :]


13007 <class 'str'>


Unnamed: 0,movie_index,text,label_int,label
4,13007,Saying this movie is extremely hard to follow ...,0,Negative


In [None]:
# query = "What is a good movie for a medical doctor to watch?"

In [None]:
# Load the watermark extension and print the author name, Python version and
# the versions of the listed packages, for reproducibility.
%load_ext watermark
%watermark -a 'Christy Bergman' -v -p torch,transformers,milvus,langchain --conda