# In [39]:
from typing import Dict, List, Annotated
import numpy as np
import os
import time
from dataclasses import dataclass
from memory_profiler import memory_usage

# Seed for the NumPy random generator that produces the database vectors.
DB_SEED_NUMBER = 42
# Size in bytes of one stored vector component (float32).
ELEMENT_SIZE = np.dtype(np.float32).itemsize
# Number of components in every stored vector.
DIMENSION = 70
from sklearn.cluster import MiniBatchKMeans, KMeans
from numpy.linalg import norm
import pickle

class IVF:
    """
    Inverted-file (IVF) index: k-means centroids plus, for every centroid,
    the list of vector indices assigned to it (its posting list).

    The index only narrows a query down to candidate indices; ranking the
    candidates against the query is left to the caller.
    """

    def __init__(self, nlist: int):
        """
        :param nlist: Number of Voronoi cells (partitions)
        """
        self.nlist = nlist  # Number of Voronoi cells
        self.centroids = None  # IVF centroids (cluster centers)
        self.assignments = []  # Vector assignments to clusters
        self.vectors = None  # Store original vectors
        self.posting_lists = None  # Inverted file lists for each cluster

    def train(self, vectors: np.ndarray) -> None:
        """
        Fit the IVF model to the input vectors by clustering them into 'nlist' clusters.

        :param vectors: A 2D numpy array with shape (n, d), where n is the number of vectors and d is their dimensionality.
        :raises ValueError: If 'vectors' is not two-dimensional.
        """
        if vectors.ndim != 2:
            raise ValueError("vectors must be a 2D array of shape (n, d)")
        self.vectors = vectors  # Store original vectors

        # Perform KMeans clustering to find the centroids
        kmeans = KMeans(n_clusters=self.nlist, random_state=42)
        self.assignments = kmeans.fit_predict(vectors)  # Assign each vector to a cluster
        self.centroids = kmeans.cluster_centers_  # Get the cluster centroids

        # Build the posting lists: key is the cluster index, value is the list
        # of vector indices in that cluster (ascending, because we scan in order).
        self.posting_lists = {i: [] for i in range(self.nlist)}
        for i, label in enumerate(self.assignments):
            self.posting_lists[int(label)].append(i)

    def search(self, query: np.ndarray, top_k: int, nprobe: int = 1) -> list:
        """
        Return the candidate vector indices stored in the cell(s) nearest to the query.

        The candidates are NOT ranked and are NOT truncated to 'top_k'; the
        caller is expected to score them and keep the best ones.

        :param query: The query vector, shape (d,) or (1, d).
        :param top_k: Accepted for interface compatibility; not used here.
        :param nprobe: Number of nearest cells to collect candidates from.
            The default of 1 probes only the closest cell.

        :return: A new list of vector indices, grouped by cell from the
            nearest cell to the farthest probed one.
        :raises RuntimeError: If the index has not been trained or loaded.
        :raises ValueError: If 'nprobe' is smaller than 1.
        """
        if self.centroids is None or self.posting_lists is None:
            raise RuntimeError("IVF index is empty: call train() or load_model() first")
        if nprobe < 1:
            raise ValueError("nprobe must be at least 1")

        # Euclidean distance from the query to every centroid
        distances_to_centroids = np.linalg.norm(self.centroids - query, axis=1)
        if nprobe == 1:
            cells = [int(np.argmin(distances_to_centroids))]
        else:
            # Slicing past the end is harmless, so nprobe > nlist probes every cell.
            cells = np.argsort(distances_to_centroids, kind="stable")[:nprobe].tolist()

        # Copy into a fresh list so callers cannot mutate the index by accident.
        candidate_indices = []
        for cell in cells:
            candidate_indices.extend(self.posting_lists[cell])
        return candidate_indices

    def save_model(self, filepath='ivfpq_model.dat'):
        """
        Pickle the centroids and posting lists to 'filepath'.

        :param filepath: Destination file; overwritten if it exists.
        """
        with open(filepath, 'wb') as f:
            pickle.dump({
                'centroids': self.centroids,
                'inverted_lists': self.posting_lists
            }, f)

    def load_model(self, filepath='ivfpq_model.dat'):
        """
        Restore the centroids and posting lists written by save_model().

        :param filepath: File to read.
        :raises FileNotFoundError: If 'filepath' does not exist.
        """
        # SECURITY: pickle.load can execute arbitrary code; only load index
        # files that this program wrote itself.
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.centroids = data['centroids']
        self.posting_lists = data['inverted_lists']
        if self.centroids is not None:
            # Keep nlist in step with the index that was actually loaded.
            self.nlist = len(self.centroids)




class VecDB:
    """
    Flat-file vector database with an IVF index for approximate
    cosine-similarity search.

    Vectors are stored as raw float32 rows of DIMENSION components in the
    database file; a row's position in the file is used as its ID. The
    trained index is saved to, and loaded from, the index file.
    """

    # Upper bound on the number of IVF cells; smaller databases use fewer.
    _MAX_NLIST = 500
    # File the index was written to before index_file_path was honoured.
    _LEGACY_INDEX_PATH = "ivfpq_model.dat"

    def __init__(self, database_file_path = "saved_db.dat", index_file_path = "index.dat", new_db = True, db_size = None) -> None:
        """
        :param database_file_path: File holding the raw float32 vectors.
        :param index_file_path: File holding the saved IVF index.
        :param new_db: If True, delete any existing database file, generate
            'db_size' random vectors and build a fresh index. If False, open
            the existing files and load the saved index.
        :param db_size: Number of vectors to generate; required when new_db is True.
        :raises ValueError: If new_db is True and db_size is None.
        :raises FileNotFoundError: If new_db is False and no saved index exists.
        """
        self.db_path = database_file_path
        self.index_path = index_file_path
        self.index = IVF(self._MAX_NLIST)
        if new_db:
            if db_size is None:
                raise ValueError("You need to provide the size of the database")
            # delete the old DB file if exists
            if os.path.exists(self.db_path):
                os.remove(self.db_path)
            # Builds and saves a fresh index, so there is nothing to load first.
            self.generate_database(db_size)
        else:
            self.index.load_model(self._saved_index_path())

    def _saved_index_path(self) -> str:
        """Return the index file to load, falling back to the legacy file name."""
        if not os.path.exists(self.index_path) and os.path.exists(self._LEGACY_INDEX_PATH):
            return self._LEGACY_INDEX_PATH
        return self.index_path

    def generate_database(self, size: int) -> None:
        """
        Fill the database file with 'size' seeded random vectors and index them.

        :param size: Number of vectors to generate.
        """
        rng = np.random.default_rng(DB_SEED_NUMBER)
        vectors = rng.random((size, DIMENSION), dtype=np.float32)
        self._write_vectors_to_file(vectors)
        self._build_index()

    def _write_vectors_to_file(self, vectors: np.ndarray) -> None:
        """Write 'vectors' to the database file, replacing its contents."""
        mmap_vectors = np.memmap(self.db_path, dtype=np.float32, mode='w+', shape=vectors.shape)
        mmap_vectors[:] = vectors[:]
        mmap_vectors.flush()

    def _get_num_records(self) -> int:
        """Return the number of vectors stored, derived from the file size."""
        return os.path.getsize(self.db_path) // (DIMENSION * ELEMENT_SIZE)

    def insert_records(self, rows: Annotated[np.ndarray, (int, 70)]):
        """
        Append 'rows' to the database file and rebuild the index.

        :param rows: Array of new vectors, one per row.
        """
        num_old_records = self._get_num_records()
        num_new_records = len(rows)
        full_shape = (num_old_records + num_new_records, DIMENSION)
        mmap_vectors = np.memmap(self.db_path, dtype=np.float32, mode='r+', shape=full_shape)
        mmap_vectors[num_old_records:] = rows
        mmap_vectors.flush()
        #TODO: might change to call insert in the index, if you need
        self._build_index()

    def get_one_row(self, row_num: int) -> np.ndarray:
        """
        Load a single vector without reading the rest of the file.

        :param row_num: ID (row position) of the vector.
        :return: The vector as a 1D array. NOTE: on any failure this returns
            an error-message string instead of raising.
        """
        try:
            offset = row_num * DIMENSION * ELEMENT_SIZE
            mmap_vector = np.memmap(self.db_path, dtype=np.float32, mode='r', shape=(1, DIMENSION), offset=offset)
            return np.array(mmap_vector[0])
        except Exception as e:
            return f"An error occurred: {e}"

    def get_all_rows(self) -> np.ndarray:
        """Return every stored vector as an in-memory array (loads the whole file)."""
        num_records = self._get_num_records()
        vectors = np.memmap(self.db_path, dtype=np.float32, mode='r', shape=(num_records, DIMENSION))
        return np.array(vectors)

    def retrieve(self, query: Annotated[np.ndarray, (1, DIMENSION)], top_k = 5):
        """
        Return the IDs of the stored vectors most similar to 'query'.

        Candidates come from the index; they are scored by cosine similarity
        in one vectorised pass and the best 'top_k' are kept.

        :param query: The query vector, shape (1, DIMENSION) or (DIMENSION,).
        :param top_k: Maximum number of IDs to return.
        :return: List of IDs ordered from most to least similar; ties are
            broken by the lower ID. May hold fewer than 'top_k' IDs when the
            index yields fewer candidates.
        """
        if top_k <= 0:
            return []
        candidate_ids = np.asarray(self.index.search(query, top_k), dtype=np.int64)
        if candidate_ids.size == 0:
            return []
        # Ascending IDs give in-order file reads and the lower-ID tie-break below.
        candidate_ids.sort()

        # Map the file once and copy out only the candidate rows.
        db_rows = np.memmap(self.db_path, dtype=np.float32, mode='r', shape=(self._get_num_records(), DIMENSION))
        candidates = db_rows[candidate_ids].astype(np.float64)
        del db_rows

        query_vec = np.asarray(query, dtype=np.float64).reshape(-1)
        dot_products = candidates @ query_vec
        norm_products = np.linalg.norm(candidates, axis=1) * np.linalg.norm(query_vec)
        # Zero-length vectors have no direction; rank them last instead of dividing by zero.
        scores = np.full(candidate_ids.shape, -np.inf, dtype=np.float64)
        np.divide(dot_products, norm_products, out=scores, where=norm_products > 0)

        # lexsort uses the last key first: score descending, then ID ascending.
        best = np.lexsort((candidate_ids, -scores))[:top_k]
        return candidate_ids[best].tolist()

    def _cal_score(self, vec1, vec2):
        """Return the cosine similarity of 'vec1' and 'vec2'."""
        dot_product = np.dot(vec1, vec2)
        norm_vec1 = np.linalg.norm(vec1)
        norm_vec2 = np.linalg.norm(vec2)
        cosine_similarity = dot_product / (norm_vec1 * norm_vec2)
        return cosine_similarity

    def _build_index(self):
        """Train the IVF index on every stored vector and save it to the index file."""
        vectors = self.get_all_rows()
        # KMeans cannot form more clusters than there are vectors.
        self.index.nlist = max(1, min(self._MAX_NLIST, len(vectors)))
        self.index.train(vectors)
        # Searching needs only centroids and posting lists; drop the in-memory
        # copy of the dataset so it is not held for the lifetime of the DB.
        self.index.vectors = None
        self.index.save_model(self.index_path)

# This snippet shows a simple evaluation of the VecDB class; the full project evaluation is in the notebook shared with you.


@dataclass
class Result:
    """Outcome of one benchmark query, as consumed by the scoring function."""

    # Seconds taken by the database's retrieve call.
    run_time: float
    # Number of neighbours that were requested.
    top_k: int
    # IDs returned by the database.
    db_ids: list[int]
    # Reference ranking of IDs that db_ids is scored against.
    actual_ids: list[int]
def monitor_retrieve_memory(db, query, top_k=5):
    """
    Run ``db.retrieve(query, top_k=top_k)`` under memory_profiler, print the
    spread between the highest and lowest memory sample, and return the
    retrieve result.

    :param db: Object exposing a ``retrieve(query, top_k=...)`` method.
    :param query: Query passed through to ``retrieve``.
    :param top_k: Number of results requested from ``retrieve``.
    :return: Whatever ``db.retrieve`` returned.
    """
    retrieve_call = (db.retrieve, (query,), {'top_k': top_k})
    # With retval=True the profiler hands back (samples, return value).
    samples, result = memory_usage(retrieve_call, interval=0.1, retval=True)
    peak_memory_used = max(samples) - min(samples)
    print(f"Peak memory used during retrieve: {peak_memory_used:.2f} MiB")
    return result

def run_queries(db, np_rows, top_k, num_runs):
    """
    Benchmark ``db.retrieve`` on random queries against a brute-force ranking.

    Each run draws a random query, times one ``retrieve`` call, profiles the
    memory of a second identical call, and computes the exact cosine-similarity
    ranking of every row in 'np_rows' as the reference.

    :param db: Database exposing ``retrieve(query, top_k)``.
    :param np_rows: All database vectors as a 2D array, one vector per row.
    :param top_k: Number of neighbours requested per query.
    :param num_runs: Number of random queries to run.
    :return: One Result per run.
    """
    results = []
    peak_mem_usage = 0
    # The row norms do not depend on the query, so compute them once.
    row_norms = np.linalg.norm(np_rows, axis=1)
    for _ in range(num_runs):
        # Match the query width to the data instead of hard-coding it.
        query = np.random.random((1, np_rows.shape[1]))

        tic = time.perf_counter()
        db_ids = db.retrieve(query, top_k)
        toc = time.perf_counter()
        run_time = toc - tic

        # Profile the same call that was timed (same top_k) and keep the
        # largest spread between samples seen in any run.
        samples = memory_usage((db.retrieve, (query,), {'top_k': top_k}), interval=0.1)
        peak_mem_usage = max(peak_mem_usage, max(samples) - min(samples))

        # Exact ranking: cosine similarity to every row, best match first.
        similarities = np_rows.dot(query.T).T / (row_norms * np.linalg.norm(query))
        actual_ids = np.argsort(similarities, axis=1).reshape(-1).tolist()[::-1]

        results.append(Result(run_time, top_k, db_ids, actual_ids))
    print(f"Memory Used: {peak_mem_usage:.2f} MiB")
    return results

def eval(results: List["Result"]):
    """
    Score benchmark results; 0 is the best possible score, lower is worse.

    For each result:
    * if the number of returned IDs, or of distinct returned IDs, differs from
      top_k, the score is ``-len(actual_ids) * top_k``;
    * otherwise each returned ID ranked beyond position ``top_k * 3`` in
      actual_ids costs its rank, and an ID missing from actual_ids costs
      ``len(actual_ids)``.

    :param results: Results to score; each needs run_time, top_k, db_ids and actual_ids.
    :return: Tuple ``(average score, average run time)``.
    :raises ZeroDivisionError: If 'results' is empty.
    """
    scores = []
    run_time = []
    for res in results:
        run_time.append(res.run_time)
        # case for retrieving number not equal to top_k, score will be the lowest
        if len(set(res.db_ids)) != res.top_k or len(res.db_ids) != res.top_k:
            scores.append(-1 * len(res.actual_ids) * res.top_k)
            continue
        score = 0
        for db_id in res.db_ids:
            try:
                rank = res.actual_ids.index(db_id)
            except ValueError:
                # The returned ID is not in the reference ranking at all.
                score -= len(res.actual_ids)
            else:
                if rank > res.top_k * 3:
                    score -= rank
        scores.append(score)

    return sum(scores) / len(scores), sum(run_time) / len(run_time)


# Demo run: build a database, then benchmark retrieval against brute force.
if __name__ == "__main__":
    # Alternative: reopen an existing database and index instead of regenerating.
    # db = VecDB(new_db=False)
    db = VecDB(db_size=10**6)

    # Full in-memory copy of the vectors, used for the brute-force reference ranking.
    all_db = db.get_all_rows()

    # 10 random queries, asking for the top 5 neighbours each.
    res = run_queries(db, all_db, 5, 10)
    # Prints the (average score, average run time) tuple.
    print(eval(res))
    




# Notebook output: KeyboardInterrupt: