In [2]:
import sqlitedict as sqld
import numpy as np

In [16]:
with np.load('TestProject001/sample_news_2019-03-04/sample_news_2019-03-04_compressed.npz') as npz:
    ids = npz['ids']
    sents = npz['sents']
    embs = npz['embs']

In [4]:
id_map = sqld.SqliteDict('test.db')

In [12]:
for i in range(len(ids)):
    id_map[int(ids[i])] = f'{sents[i]}'

In [9]:
ids[0]


1

In [11]:
id_map[ids[1]]

'Online tools such as Smart Investor have valueResearch furthers your money educationBeing afraid of money\n'

In [16]:
id_map[3]

"doesn't help\n"

In [1]:
import gc
import os
import os.path as p
import faiss
import numpy as np
from pathlib import Path
import sqlitedict as sqld
from typing import List, Tuple, Union
from copy import deepcopy

from SimSent.vectorizer.sentence_vectorizer import SentenceVectorizer

W0424 12:26:33.504294 4591781312 __init__.py:56] Some hub symbols are not available because TensorFlow version is less than 1.14


In [2]:
sv = SentenceVectorizer()

Loading model: /Users/lukasferrer/Documents/SimSent/SimSent/vectorizer/model/1fb57c3ffe1a38479233ee9853ddd7a8ac8a8c47/
Initializing TF Session...


In [6]:
class IndexBuilder:
    # Return Types
    MMAP_ARRAYS = Tuple[np.array, np.array, np.array]
    BASE_INDEX = faiss.Index
    
    def __init__(self, project_dir: Path, 
                sentence_vectorizer: object = None, large_encoder: bool = False):
        self.project_dir = project_dir
        self.sub_dir = None
        self.seed_name = None

        if sentence_vectorizer:
            self.sv = sentence_vectorizer
        else:
            self.sv = SentenceVectorizer(large=large_encoder)

    def tsv_to_index(self, dump_tsv: Union[str, Path]):
        f_name = Path(dump_tsv).stem
        self.sub_dir = self.project_dir/f_name
        self.seed_name = f_name
        os.makedirs(p.abspath(self.sub_dir), exist_ok=True)

        # Vectorize to npz
        npz_name = self.sub_dir/f'{f_name}_compressed.npz'
        if not p.exists(npz_name):
            self.sv.prep_npz(input_tsv=dump_tsv, output_npz=npz_name)

        # Load as mmap arrays
        ids, embs, sents = self.load_npz(npz_name)
            
        # Prepare base index & write on-disk index
        base_index = self.train_base_index(embeddings=embs)  
        self.make_mmap_index(base_index,
                             embs=embs, ids=ids)

        # Get id-to-sent mapping
        self.populate_db(ids=ids, sents=sents)

    @staticmethod
    def load_npz(npz_name: Union[str, Path]) -> MMAP_ARRAYS:
        with np.load(npz_name, mmap_mode='r') as npz:
            ids = npz['ids']
            embs = npz['embs']
            sents = npz['sents']
        return ids, embs, sents
    
    def train_base_index(self, embeddings: np.array,
        n_centroids: int = 1024, compression: str = 'Flat'     # Or Flat
        ) -> BASE_INDEX:

        idx_type = f'IVF{n_centroids},{compression}'
        base_idx_pth = p.abspath(self.sub_dir/f'{idx_type}_base.index')

        if p.exists(base_idx_pth):
            index = faiss.read_index(base_idx_pth)
        else:
            index = faiss.index_factory(embeddings.shape[1], idx_type)
            index.train(embeddings)
            faiss.write_index(index, base_idx_pth)
        return index

    def make_mmap_index(self, base_index: BASE_INDEX,
                        ids: np.array, embs: np.array):

        # Get invlists
        index = faiss.clone_index(base_index)
        index.add_with_ids(embs, ids)
        ivf_vector = faiss.InvertedListsPtrVector()
        ivf_vector.push_back(index.invlists)
        index.own_invlists = False
        del index
        gc.collect()

        # Make MMAP ivfdata
        index_name = p.abspath(self.sub_dir/f'{self.seed_name}_mmap')
        invlists = faiss.OnDiskInvertedLists(base_index.nlist, 
                                             base_index.code_size, 
                                             f'{index_name}.ivfdata')
        ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size())

        # Link index to ivfdata and save
        index = faiss.clone_index(base_index)
        index.ntotal = ntotal
        index.replace_invlists(invlists)
        faiss.write_index(index, f'{index_name}.index')
        
    # TODO: REWORK THIS
    def populate_db(self, ids: np.array, sents: np.array):
        db_file = p.abspath(self.sub_dir/f'{self.seed_name}_id-sent-map.sqlite')
        id_to_sent = sqld.SqliteDict(db_file, autocommit=True)

        for i in range(ids.shape[0]):
            id_to_sent[str(ids[i])] = str(sents[i])

        id_to_sent.close()

#     def prep_index(self, dump_tsv: Path):
#         f_name = dump_tsv.stem
#         sub_dir = self.project_dir/f_name
#         os.makedirs(p.abspath(sub_dir), exist_ok=True)

#         # Vectorize
#         npz_name = self.project_dir/f_name/f'{f_name}_compressed.npz'
#         if not p.exists(npz_name):
#             self.sv.prep_npz(input_tsv=dump_tsv, output_npz=npz_name)
        
#         print('Finished vectorization')
        
#         # Load
#         with np.load(npz_name) as npz:
#             ids = npz['ids']
#             embs = npz['embs']
#             sents = npz['sents']

#         n_centroids = 1024      # TODO: choose wrt len(embs) 
#         if not p.exists(sub_dir/f'{f_name}_{n_centroids}SQ8_base.index'):
#             # Train                 # TODO: Put in func
#             idx_type = f'IVF{n_centroids},SQ8'
#             index = faiss.index_factory(embs.shape[1], idx_type)
#             index.train(embs)
#             faiss.write_index(index, p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_base.index'))
#         else:
#             index = faiss.read_index(p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_base.index'))
        
#         print('Finished training base idx')

#         # Populate idx          # TODO: Put in func that saves several chunks if embs is too large
#         index.add_with_ids(embs, ids)
#         faiss.write_index(index, p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_in-memory.index'))
#         sub_idxs = [p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_in-memory.index')]
#         del index
#         gc.collect()

#         # Make on-disk index    # TODO: Put in func
#         ivfs = list()
#         for idx in sub_idxs:
#             index = faiss.read_index(idx, faiss.IO_FLAG_MMAP)
#             ivfs.append(index.invlists)
#             index.own_invlists = False  # Prevents de-allocation
#             del index
#             gc.collect()

#         index = faiss.read_index(p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_base.index'))
        
#         # build inv list
#         invlists = faiss.OnDiskInvertedLists(index.nlist, index.code_size,
#                                              p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_on-disk.ivfdata'))
#         ivf_vector = faiss.InvertedListsPtrVector()
#         for ivf in ivfs:
#             ivf_vector.push_back(ivf)

#         ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size())
#         index.ntotal = ntotal
#         index.replace_invlists(invlists)

#         faiss.write_index(index, p.abspath(sub_dir/f'{f_name}_{n_centroids}SQ8_on-disk.index'))
            
#         print('Finished writing on-disk idx')
            
#         # Populate db
#         id_to_sent = sqld.SqliteDict(p.abspath(sub_dir/f'{f_name}_id-sent-map.db'))
#         for i in range(len(ids)):
#             id_to_sent[int(ids[i])] = f'{sents[i]}'

#         print('Finished writing db')

        
        

In [7]:
project_dir = p.abspath('TestProject001/')
idx_bdr = IndexBuilder(Path(project_dir), sentence_vectorizer=sv)

In [8]:
idx_bdr.tsv_to_index(Path('TestProject001/sample_news_2019-03-04.tsv'))

In [9]:
idx_bdr.tsv_to_index(Path('TestProject001/sample_news_2019-03-05.tsv'))

In [10]:
idx_bdr.tsv_to_index(Path('TestProject001/sample_news_2019-03-06.tsv'))

In [20]:
idx = faiss.read_index('Test2/test2/test_1024SQ8_on-disk.index', faiss.IO_FLAG_ONDISK_SAME_DIR)

RuntimeError: Error in void faiss::OnDiskInvertedLists::do_mmap() at OnDiskInvertedLists.cpp:243: Error: 'f' failed: could not open /Users/lukasferrer/Documents/SimSent/Test2/test/test_1024SQ8_on-disk.ivfdata in mode r+: No such file or directory

In [14]:
idx.search(embs[0:1], 5)

(array([[0.03949273, 0.71097904, 0.71097904, 0.71097904, 0.872072  ]],
       dtype=float32), array([[    1,  6307, 16216, 15801, 13477]]))

In [23]:
faiss.write_index(idx, 'just,_checking.index')

In [22]:
idx2 = faiss.read_index('just_checking.index')

In [26]:
type(Path('lol/no.npz').stem)

str

In [11]:
import os
import glob
from typing import List, Tuple

import numpy as np

from SimSent.faiss_cache import faiss_cache

__all__ = ['BaseIndexer', 'faiss_cache',
           'DiffScores', 'VectorIDs', 'FaissSearch']


DiffScores = List[List[np.float32]]
VectorIDs = List[List[np.int64]]
FaissSearch = Tuple[DiffScores, VectorIDs]


class BaseIndexHandler(object):
    def __init__(self):
        self.index = None
        self.dynamic = False
        self.io_flag = faiss.IO_FLAG_ONDISK_SAME_DIR

    @faiss_cache(128)
    def search(self, query_vector: np.array, k: int) -> FaissSearch:
        return self.index.search(query_vector, k)

    def get_index_paths(self, idx_dir_pth: Path, 
                        nested: bool = False) -> List[str]:
        if nested:
            get = '*/*.index'
        else:
            get = '*.index'

        index_paths = glob.glob(p.abspath(idx_dir_pth/get))
        index_paths = [Path(pth) for pth in index_paths if 
                       faiss.read_index(pth, self.io_flag).ntotal > 0]

        return sorted(index_paths)

    @staticmethod
    def joint_sort(scores: DiffScores, ids: VectorIDs) -> FaissSearch:
        """
        Sorts scores in ascending order while maintaining score::id mapping.
        Checks if input is already sorted.
        :param scores: Faiss query/hit vector L2 distances
        :param ids: Corresponding faiss vector ids
        :return: Scores sorted in ascending order with corresponding ids
        """
        # Possible nested lists
#         if isinstance(scores[0], (list, array)):
#             scores, ids = scores[0], ids[0]

#         print(scores)
#         print(ids)

        # Check if sorted
        if all(scores[i] <= scores[i + 1] for i in range(len(scores) - 1)):
            return scores, ids

        # Joint sort
        sorted_difs, sorted_ids = (list(sorted_dif_ids) for sorted_dif_ids
                                     in zip(*sorted(zip(scores, ids))))

        return [sorted_difs], [sorted_ids]


In [12]:
from time import sleep
from multiprocessing import Pipe, Process, Queue

import faiss
import numpy as np

#### Parallelized Nearest Neighbor Search ####
class Shard(Process):
    def __init__(self, shard_name: str, shard_path: Path, 
                 input_pipe: Pipe, output_queue: Queue,
                 nprobe: int = 4, daemon: bool = False):
        """ RangeShards search worker """
        super().__init__(name=shard_name, daemon=daemon)
        self.input = input_pipe
        self.index = faiss.read_index(p.abspath(shard_path), 
                                      faiss.IO_FLAG_ONDISK_SAME_DIR)
        self.index.nprobe = nprobe
        self.output = output_queue

    def run(self):
        @faiss_cache(64)
        def neighborhood(index, query, radius):
            _, difs, ids = index.range_search(query, radius)
            return difs, ids

        if self.input.poll():
            (query_vector, radius_limit) = self.input.recv()
            difs, ids = neighborhood(self.index, query_vector, radius_limit)
            self.output.put((self.name, difs, ids), block=False)


class RangeShards(BaseIndexHandler):
    def __init__(self, shard_dir: Union[str, Path], 
                 nprobe: int = 4, get_nested: bool = False):
        """
        For deploying multiple, pre-made IVF indexes as shards.
            (intended for on-disk indexes that do not fit in memory)

        Note: The index shards must be true partitions with no overlapping ids

        :param shard_dir: Dir containing faiss index shards
        :param nprobe: Number of clusters to visit during search
                       (speed accuracy trade-off)
        :param get_nested: Load indexes in sub directories of shard_dir
        """
        super().__init__()
        self.paths_to_shards = self.get_index_paths(Path(shard_dir), 
                                                    nested=get_nested)
        self.nprobe = nprobe
        self.dynamic = True
        self.lock = False

        self.results = Queue()
        self.shards = dict()
        self.n_shards = 0
        for shard_path in self.paths_to_shards:
            self.load_shard(shard_path)
        for shard_name, (handler_pipe, shard) in self.shards.items():
            shard.start()
            self.n_shards += 1

    def load_shard(self, shard_path: Path):
        shard_name = Path(shard_path).stem
        shard_pipe, handler_pipe = Pipe(duplex=False)
        shard = Shard(shard_name, shard_path,
                      input_pipe=shard_pipe, 
                      output_queue=self.results,
                      nprobe=self.nprobe, daemon=False)
        self.shards[shard_name] = (handler_pipe, shard)

    @faiss_cache(128)
    def search(self, query_vector: np.array, keys: list, 
               radius: float = 1.0) -> FaissSearch:

        if query_vector.shape[0] > 1:
            query_vector = np.reshape(query_vector, (1, 512))

        # Lock search while loading index or actively searching
        while self.lock:
            sleep(1)

        # Lock out other searches
        self.lock = True

        # Start parallel range search
        n_results = 0
        for shard_name, (hpipe, shard) in self.shards.items():
            if shard_name in keys:
                hpipe.send((query_vector, radius))
                shard.run()
                n_results += 1

        # Aggregate results
        results = dict()
        while n_results > 0 or not self.results.empty():
            name, difs, ids = self.results.get()
            results[name] = self.joint_sort(difs, ids)
            n_results -= 1 

        self.lock = False
        return results
    

In [13]:
rs = RangeShards('TestProject001/', get_nested=True)

In [14]:
keys = list(rs.shards.keys())
keys

['sample_news_2019-03-04_mmap',
 'sample_news_2019-03-05_mmap',
 'sample_news_2019-03-06_mmap']

In [50]:
a = rs.search(embs[1:2], keys)

In [63]:
for k, results in a.items():
    scores, ids = results
#     print(results)
    print(sorted(zip(scores[0], ids[0]))[:5])

[(0.0, 1), (0.6829833, 6829), (0.6829833, 16651), (0.6829833, 17086), (0.68332684, 704)]
[(0.70729554, 12220), (0.79234034, 1425), (0.81015825, 12211), (0.81180775, 21319), (0.81778175, 1987)]
[(0.7811464, 14096), (0.78185153, 14678), (0.78664434, 19446), (0.8098983, 19456), (0.8474065, 11525)]


In [204]:
id_to_sent = sqld.SqliteDict('Test7/test/test_id-sent-map.sqlite')
len(id_to_sent)

27764

In [179]:
id_to_sent[2]

'Online tools such as Smart Investor have valueResearch furthers your money educationBeing afraid of money'

In [176]:
id_to_sent[16]

'Imagine a doughnut representing the fees charged by your KiwiSaver fund.'

In [181]:
sents[1:2]

array(['Event URL:\n'], dtype='<U3875')

27764

In [184]:
rs.shards['test_mmap'][1].index.ntotal

70252

In [210]:
27764*3

83292

In [209]:
np.argwhere(ids==3)

array([[    2],
       [20095],
       [42490]])

In [21]:
import math

In [41]:
math.floor(math.log(28764, 2))

14

In [26]:
math.pow(2, 8)

256.0

256.0