## IMPORTANT
1. Run Milvus Docker first

Imports

In [16]:
from pymilvus import connections, DataType, CollectionSchema, FieldSchema, Collection, Partition, utility
import openai
import pandas as pd
import numpy as np
import re
import json
from openai.embeddings_utils import get_embedding
import time
from tqdm import tqdm
import fasttext


Constants

In [17]:
OPENAI_API_KEY = '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'
max_tokens = 8000
dimensions = {'openai' : 1536,
            'fasttext' : 300}
openai.api_key = OPENAI_API_KEY

Mutable variables

In [18]:
# Change partition_name based on kind of data
partition_name = 'scs_about'
# Change embedder to either fasttext openai 

embedder = 'openai'
dimension = dimensions[embedder]
bundled_schema = {'rmrj_articles': ['author', 'title', 'published_date', 'text'],
                  'facebook_posts': ['text', 'time', 'link'],
                  'usjr_about': ['text', 'content_id'],
                  'contacts': ['text', 'contact', 'department'],
                  'scs_about': ['text', 'link', 'title'],
                  'religious_admin': ['text', 'name', 'position', 'media'],
                  'all': ['media','contact', 'department', 'author', 'title', 'published_date', 'text', 'time', 'post', 'link', 'content_id']}
collection_names = bundled_schema[partition_name]
json_path = f'raw_jsons/{partition_name}.json'
description = 'description'
if embedder == 'fasttext':
    fasttext_model = fasttext.load_model('/Users/garfieldgreglim/Library/Mobile Documents/com~apple~CloudDocs/Josenian-Query/Final Outputs/Jupyter Notebooks/Embedder/crawl-300d-2M-subword.bin')

Function definitions:

In [19]:
def get_embedding(text, embedding_type):
    text = text.replace("\n", " ")
    model = "text-embedding-ada-002"
    if embedding_type == 'openai':
        return openai.Embedding.create(input = [text], model=model)['data'][0]['embedding']
    elif embedding_type == 'fasttext':
        return fasttext_model.get_sentence_vector(text)
    else:
        raise ValueError("Invalid embedding_type. Expected 'openai' or 'fasttext'.")


Connection

In [20]:
# Check if the connection already exists
if connections.has_connection('default'):
    connections.remove_connection('default')  # Disconnect if it exists

# Now, reconnect with your new configuration
connections.connect(alias='default', host='localhost', port='19530')

Drop collection

In [21]:
# for name in collection_names:
#     utility.drop_collection(f"{name}_collection")
# utility.list_collections()

Collection schema definition

In [22]:
collections = {}  # To store the created collections

for name in collection_names:
    if name not in utility.list_collections():
        fields = [
            FieldSchema(name="uuid", dtype=DataType.VARCHAR, is_primary=True, max_length=36),
            FieldSchema(name=name, dtype=DataType.VARCHAR, max_length=5000),
            FieldSchema(name="embeds", dtype=DataType.FLOAT_VECTOR, dim=dimension)
        ]

        schema = CollectionSchema(fields=fields, description=f"Collection for {name}")

        # Create the collection and store it in the dictionary
        collections[name] = Collection(name=f"{name}_collection", schema=schema)

List collections

In [23]:
utility.list_collections()

['content_id_collection',
 'link_collection',
 'title_collection',
 'published_date_collection',
 'department_collection',
 'media_collection',
 'LangChainCollection',
 'text_collection',
 'contact_collection',
 'name_collection',
 'position_collection',
 'time_collection',
 'author_collection']

Partition creation

In [24]:
for collection in collections.values():
    partition = Partition(collection, partition_name)

List partitions

In [25]:
for collection in collections.values():
    display(collection.partitions)

[{"name": "_default", "collection_name": "text_collection", "description": ""},
 {"name": "usjr_about", "collection_name": "text_collection", "description": ""},
 {"name": "facebook_posts", "collection_name": "text_collection", "description": ""},
 {"name": "rmrj_articles", "collection_name": "text_collection", "description": ""},
 {"name": "contacts", "collection_name": "text_collection", "description": ""},
 {"name": "religious_admin", "collection_name": "text_collection", "description": ""},
 {"name": "scs_about", "collection_name": "text_collection", "description": ""}]

[{"name": "_default", "collection_name": "link_collection", "description": ""},
 {"name": "facebook_posts", "collection_name": "link_collection", "description": ""},
 {"name": "scs_about", "collection_name": "link_collection", "description": ""}]

[{"name": "_default", "collection_name": "title_collection", "description": ""},
 {"name": "rmrj_articles", "collection_name": "title_collection", "description": ""},
 {"name": "scs_about", "collection_name": "title_collection", "description": ""}]

Index definition

In [26]:
index_params = {
  "metric_type": "L2", # Euclidean distance
  "index_type": "FLAT", # FLAT index type
  "params": {} # No additional parameters needed for FLAT
}

Index creation

In [27]:
for collection in collections.values():
    collection.create_index("embeds", index_params)

## Data Processing

Loading

In [28]:
json_path = "json_per_collection/" 
def open_json(filename):
    with open(filename + ".json") as file:
        return json.load(file)

obj_list = {}
for name in collection_names:
    obj_list[name] = open_json(json_path + f"{partition_name}_{name}")

for name in collection_names:
    for obj in obj_list[name]:
        if len(obj[name]) > 5000:
            obj[name] = obj[name][:2480]


Upserting

In [29]:
for name in collection_names:
    collection = Collection(f"{name}_collection")
    print(collection.insert(obj_list[name], partition_name=partition_name))


(insert count: 11, delete count: 0, upsert count: 0, timestamp: 442939816356085765, success count: 11, err count: 0)
(insert count: 11, delete count: 0, upsert count: 0, timestamp: 442939816369192965, success count: 11, err count: 0)
(insert count: 11, delete count: 0, upsert count: 0, timestamp: 442939816369192975, success count: 11, err count: 0)


In [30]:
print(collection.flush())

None
