## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
import numpy as np
import os
import struct
import pandas as pd
import time
from time import process_time
from itertools import chain 
import gc

In [3]:
import urllib 
urllib.request.urlretrieve("ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz", "/tmp/sift.tar.gz")

In [4]:
def get_zippedFvecs(pathToGz,memeber):
    import tarfile
    fn = pathToGz
    import struct
    import numpy as np
    t = tarfile.open(fn, 'r:gz') 
    
    
    m = t.getmember(memeber)
    file = t.extractfile(m)
    fileSize = m.size
    #file =  open(path,'rb')
    #first 4 bytes of every vector indicate number od dimensions 
    numOfDimensions = struct.unpack('i', file.read(4))[0]
    #each vector has 4 bytes (float is 32 bits) * numberOfDimensions
    #plus 4 bytes long indicator as mentioned  
    numOfVectors = (int) (fileSize / (4 + 4*numOfDimensions))
    #init empty list for vectors
    #vectors = []
    vectors = np.zeros((numOfVectors,numOfDimensions))
    #return to the beginning
    file.seek(0)
    for vecotr in range(numOfVectors):
        file.read(4) #go trough indicator of dimensions
        #vectors.append(struct.unpack('f' * numOfDimensions, file.read(4*numOfDimensions)))
        vectors[vecotr] = struct.unpack('f' * numOfDimensions, file.read(4*numOfDimensions))
    file.close()
    return vectors

In [5]:

def get_zippedIvecs(pathToGz,memeber):
    import tarfile
    fn = pathToGz
    import struct
    import numpy as np
    t = tarfile.open(fn, 'r:gz') 
    
    
    m = t.getmember(memeber)
    file = t.extractfile(m)
    fileSize = m.size
    #file =  open(path,'rb')
    #first 4 bytes of every vector indicate number od dimensions 
    numOfDimensions = struct.unpack('i', file.read(4))[0]
    #each vector has 4 bytes (float is 32 bits) * numberOfDimensions
    #plus 4 bytes long indicator as mentioned  
    numOfVectors = (int) (fileSize / (4 + 4*numOfDimensions))
    #init empty list for vectors
    #vectors = []
    vectors = np.zeros((numOfVectors,numOfDimensions))
    #return to the beginning
    file.seek(0)
    for vecotr in range(numOfVectors):
        file.read(4) #go trough indicator of dimensions
        #vectors.append(struct.unpack('f' * numOfDimensions, file.read(4*numOfDimensions)))
        vectors[vecotr] = struct.unpack('i' * numOfDimensions, file.read(4*numOfDimensions))
    file.close()
    return vectors


In [6]:
def returnRecall(result, test):
    numOfTrueNeighbours = []
    #for every result vector we check how many right neighbours were identified
    for i in range(result.shape[0]):
        numTN = len(set(result[i].tolist()) & set(test[i].tolist()))
        numOfTrueNeighbours.append(numTN)
        recall = sum(numOfTrueNeighbours) /test.size
    return recall

In [7]:
def fillIfNotAllAreFound(result):
    for i in range(len(result)):
        if len(result[i]) < 100: 
            result[i].extend((100-len(result[i])) *[-1]) 
    return result

In [8]:
paths =  '/tmp/sift.tar.gz'  

train = get_zippedFvecs(paths,'sift/sift_base.fvecs' )
#there is 100 querry ponts
query = get_zippedFvecs(paths,'sift/sift_query.fvecs' )
#there is index number of 100 nearset n. for each querry point
groundTruth = get_zippedIvecs(paths,'sift/sift_groundtruth.ivecs' )

In [9]:
rm /tmp/siftsmall.tar.gz

In [10]:
algorithm = []
construciotnTimes=[]
searchTimes=[]
reacll = []
k = 100
avgdistances = []
constructionClocks = []
searchClocks = []
clockAlg = []

In [11]:
import nmslib
vptree = nmslib.init(method='vptree', space='l2')

startTime = process_time()
vptree.addDataPointBatch(train)
vptree.createIndex({'bucketSize' : 10000,'selectPivotAttempts':10})
end_time = process_time()
constructionTime = end_time - startTime

# get all nearest neighbours for all the datapoint
# using a pool of 4 threads to compute
for maxLeave in [2,15,25,35]:
  
    vptree.setQueryTimeParams({'maxLeavesToVisit':maxLeave,'alphaLeft':1.1,'alphaRight':1.1})
    startTime = process_time()
    neighbours = vptree.knnQueryBatch(query,k=100, num_threads=2 )
    end_time = process_time()
    searchTime = end_time - startTime
    
    rez =[]
    dist = []
    for i in neighbours:
        rez.append(list(i[0]))
        dist.append(list(i[1]))
        
    rez = fillIfNotAllAreFound(rez)    
    
    result = np.asanyarray(rez)
    
    vptreeRecall = returnRecall(result, groundTruth)
    avgDist = np.mean(list(chain.from_iterable(dist)))
    
    reacll.append(vptreeRecall)
    algorithm.append('vp-Tree-10k-mL'+str(maxLeave))
    #algorithm.append('vp-Tree-maxLeaves'+str(maxLeaves))
    construciotnTimes.append(constructionTime)
    searchTimes.append(searchTime)
    avgdistances.append(avgDist)
    del rez
    del dist
    del result
    gc.collect()

#vptree.saveIndex('vptreeIndex.ann')    
del vptree
gc.collect()


#HNSW da bi islo po redu
#______________________________________________#
import nmslib
for MMAX in [5,10,30,48]:
    hnsw = nmslib.init(method='hnsw', space='l2')
    
    startClock = time.clock()
    startTime = process_time()
    hnsw.addDataPointBatch(train)
    hnsw.createIndex({'delaunay_type':0, 'M':MMAX})
    end_time = process_time()
    constructionTime = end_time - startTime
    endClock = time.clock()
    constructionClock= endClock - startClock
    
    
    
    # get all nearest neighbours for all the datapoint
    # using a pool of 4 threads to compute
    startClock = time.clock()
    startTime = process_time()
    neighbours = hnsw.knnQueryBatch(query, k=100, num_threads=2)
    end_time = process_time()
    searchTime = end_time - startTime
    endClock = time.clock()
    searchClock= endClock - startClock
    
    rez =[]
    dist =[]
    for i in neighbours:
        rez.append(list(i[0]))
        dist.append(list(i[1]))
    
    result = fillIfNotAllAreFound(rez)
      
    result = np.array(rez)
    hnswRecall = returnRecall(result, groundTruth)
    avgDist = np.mean(np.sqrt(list(chain.from_iterable(dist))))
    
    reacll.append(hnswRecall)
    algorithm.append('HNSW-M-'+str(MMAX))
    construciotnTimes.append(constructionTime)
    searchTimes.append(searchTime)
    avgdistances.append(avgDist)
    constructionClocks.append(constructionClock)
    searchClocks.append(searchClock)
    clockAlg.append('HNSW-M-'+str(MMAX))
    
    del hnsw
    del rez
    del dist
    del result
    del neighbours
    gc.collect()

#______________________________________________#    
#annoy
#Annoy
from annoy import AnnoyIndex
for trs in [5,60,80,90]:
    
    f = train.shape[1]
    t = AnnoyIndex(f, 'euclidean')
    
    startClock= time.clock()
    startTime = process_time()
    for i in range(train.shape[0]):
        t.add_item(i,train[i])
    t.build(trs)
    end_time = process_time()
    constructionTime = end_time - startTime
    endClock = time.clock()
    constructionClock= endClock - startClock
    
    
    rez = []
    dist = []
    startClock = time.clock()
    startTime = process_time()
    for q in query:
        res,d = t.get_nns_by_vector(q, 100, include_distances=True)
        rez.append(res)
        dist.append(d)
        #result.append(t.get_nns_by_vector(q, 100, include_distances=True))
    end_time = process_time()
    searchTime = end_time - startTime
    endClock = time.clock()
    searchClock= endClock - startClock
    
        
    result = fillIfNotAllAreFound(rez)
    
    result = np.asanyarray(result)
    annoyRecall = returnRecall(result, groundTruth)  
    avgDist = np.mean(list(chain.from_iterable(dist)))
    
    reacll.append(annoyRecall)
    algorithm.append('Annoy-trees-'+str(trs))
    construciotnTimes.append(constructionTime)
    searchTimes.append(searchTime)
    avgdistances.append(avgDist)
    searchClocks.append(searchClock)
    constructionClocks.append(constructionClock)
    clockAlg.append('Annoy-trees-'+str(trs))
    t.save('annoyIndex90.ann')
    del t
    del rez
    del dist
    del result
    gc.collect()
#______________________________________________#




In [12]:
compareResults = pd.DataFrame({ 'algorithm':algorithm, 'constructionTime':construciotnTimes, 'searchTime':searchTimes,'recall':reacll,'avgDistance':avgdistances, 'constructionClocks':constructionClocks,'searchClocks':searchClocks})

In [13]:
compareResults

Unnamed: 0,algorithm,constructionTime,searchTime,recall,avgDistance
0,vp-Tree-10k-mL2,19.401266,15.68492,0.0,252.022156
1,vp-Tree-10k-mL2,18.368038,15.705375,0.0,252.476151
2,vp-Tree-10k-mL10,18.368038,76.588221,0.0,242.125534
3,vp-Tree-10k-mL15,18.368038,114.281209,0.0,240.751236
4,vp-Tree-10k-mL20,18.368038,151.485374,0.0,238.824493
5,vp-Tree-10k-mL25,18.368038,188.059627,0.0,238.173386
6,vp-Tree-10k-mL30,18.368038,225.625846,0.0,237.83168
7,HNSW-M-2,281.885477,0.578932,0.0,352.39682
8,HNSW-M-5,485.369821,1.032199,0.0,257.530518
9,HNSW-M-8,657.97027,1.349002,0.0,249.655685


In [14]:
import nmslib
for MMAX in [5,10,30,48]:
    hnsw = nmslib.init(method='hnsw', space='l2')
    
    startClock = time.clock()
    startTime = process_time()
    hnsw.addDataPointBatch(train)
    hnsw.createIndex({'delaunay_type':0, 'M':MMAX})
    end_time = process_time()
    constructionTime = end_time - startTime
    endClock = time.clock()
    constructionClock= endClock - startClock
    
    
    
    # get all nearest neighbours for all the datapoint
    # using a pool of 4 threads to compute
    startClock = time.clock()
    startTime = process_time()
    neighbours = hnsw.knnQueryBatch(query, k=100, num_threads=2)
    end_time = process_time()
    searchTime = end_time - startTime
    endClock = time.clock()
    searchClock= endClock - startClock
    
    rez =[]
    dist =[]
    for i in neighbours:
        rez.append(list(i[0]))
        dist.append(list(i[1]))
    
    result = fillIfNotAllAreFound(rez)
      
    result = np.array(rez)
    hnswRecall = returnRecall(result, groundTruth)
    avgDist = np.mean(np.sqrt(list(chain.from_iterable(dist))))
    
    reacll.append(hnswRecall)
    algorithm.append('HNSW-M-'+str(MMAX))
    construciotnTimes.append(constructionTime)
    searchTimes.append(searchTime)
    avgdistances.append(avgDist)
    constructionClocks.append(constructionClock)
    searchClocks.append(searchClock)
    clockAlg.append('HNSW-M-'+str(MMAX))
    
    del hnsw
    del rez
    del dist
    del result
    del neighbours
    gc.collect()
