# Environment preparation

In [None]:
pip install duckdb

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
pip install "kshingle>=0.10.0,<1"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
pip install datasketch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import numpy as np
import pandas as pd
import duckdb
from tabulate import tabulate
import kshingle as ks
import datasketch
import time
import random
from itertools import combinations
import matplotlib.pyplot as plt

# Utility functions

In [None]:
def select_all(dbcon, table_name):
  """Print every row of `table_name`, fetched through `dbcon`, as a list of tuples."""
  rows = dbcon.execute(f"SELECT * FROM {table_name}").fetchall()
  print(rows)

In [None]:
def print_table(tab):
  """Pretty-print `tab` with tabulate's "fancy_grid" format, using `tab.keys()` as headers.

  `tab` must expose `.keys()` (for example a dict of columns or a pandas DataFrame).
  """
  headers = tab.keys()
  print(tabulate(tab, headers, tablefmt="fancy_grid"))

In [None]:
def open_connection(database_name):
  '''
  Open a read-write DuckDB connection.

  Parameters:
    1. database_name: the name of the duckdb database file to connect with (per the original
       notes, DuckDB creates it from scratch if the file does not exist)
  Output: a database connection
  '''
  return duckdb.connect(database=database_name, read_only=False)

# Write csv function


In [None]:
def write_csv(dbcon, table_name, csv_name):
  '''
  (Re)create a table in the database from the contents of a csv file.

  Parameters:
    1. dbcon: the connection with the database
    2. table_name: the name of the table to create (an existing table with this name is dropped)
    3. csv_name: path of the csv file to read with DuckDB's read_csv_auto
  Output: none
  '''
  print('Write csv starting')
  # Double any single quote so a path like "o'brien.csv" cannot terminate the SQL string literal.
  csv_literal = "'" + str(csv_name).replace("'", "''") + "'"
  dbcon.execute("DROP TABLE IF EXISTS " + table_name + ";")
  dbcon.execute("CREATE TABLE " + table_name + " AS SELECT * FROM read_csv_auto(" + csv_literal + ");")
  print('Write csv ending')

# Indexed table creation

In [None]:
def create_indexed_table(dbcon, table_name, column_list):
  '''
  Copy `table_name` into __<table_name>_indexed, adding an increasing row index.

  Parameters:
    1. dbcon: the connection with the database
    2. table_name: the name of the table to index
    3. column_list: the list of columns to analyze (must not be empty)
  Output: none
  Description: rows in which every column of column_list is NULL are skipped. The kept rows
  get a ROW_NUMBER() value in a new column called __<table_name>_indexes.
  Raises: ValueError if column_list is empty (the WHERE clause would otherwise be empty SQL).
  '''
  if len(column_list) == 0:
    raise ValueError("column_list must contain at least one column")

  indexed_table = "__" + table_name + "_indexed"
  index_column = "__" + table_name + "_indexes"
  # Keep a row as soon as at least one of the analyzed columns is populated.
  not_all_null = " OR ".join(column + " IS NOT NULL" for column in column_list)

  dbcon.execute("DROP TABLE IF EXISTS " + indexed_table + ";")
  # NOTE(review): OVER () has no ORDER BY, so the numbering follows the order the scan produces.
  dbcon.execute("CREATE TABLE " + indexed_table + " AS SELECT *, ROW_NUMBER() OVER () AS " + index_column + " FROM " + table_name + " WHERE " + not_all_null)

# Text view creation

In [None]:
def create_text_view(dbcon, table_name, indexed_table_name, column_list):
  '''
  Create the view __<table_name>_text_view with the index and the concatenated text to analyze.

  Parameters:
    1. dbcon: the connection with the database
    2. table_name: the name of the original table (used to name the view and the index column)
    3. indexed_table_name: the name of the table from which the view will be created
    4. column_list: list of columns to analyze (must not be empty)
  Output: none
  Description: the view has two columns: __<table_name>_indexes and "to_analyze". "to_analyze"
  is CONCAT of the columns in column_list, joined with no separator.
  Raises: ValueError if column_list is empty (CONCAT() would get no arguments).
  '''
  if len(column_list) == 0:
    raise ValueError("column_list must contain at least one column")

  view_name = "__" + table_name + "_text_view"
  index_column = "__" + table_name + "_indexes"
  concatenated = ", ".join(column_list)

  dbcon.execute("DROP VIEW IF EXISTS " + view_name + ";")
  # Read from the table the caller asked for (previously the name was hard-coded and the
  # indexed_table_name argument was ignored).
  dbcon.execute("CREATE VIEW " + view_name + " AS SELECT " + index_column + ", CONCAT(" + concatenated + ") AS to_analyze FROM " + indexed_table_name + ";")

# Fetch into array

In [None]:
def fetch_view_array(dbcon, table_name, rows_limit):
  '''
  Read a table/view into the dictionary returned by DuckDB's fetchnumpy().

  Parameters:
    1. dbcon: the connection with the database
    2. table_name: the name of the table/view to fetch into the array
    3. rows_limit: max number of rows to select; False, None or 0 mean "no limit"
  Output: the result of fetchnumpy() (column name -> numpy array)
  '''
  query = "SELECT * FROM " + table_name
  # Falsy values (False, None, 0) disable the limit. Anything else is coerced to int so that only
  # an integer can reach the LIMIT clause (previously None produced "LIMIT None").
  if rows_limit:
    query += " LIMIT " + str(int(rows_limit))
  return dbcon.execute(query).fetchnumpy()

# Shingling

In [None]:
def count_shingle_occ(documents, k_shingle_length):
  '''
  Count how many times each k-shingle is produced across the documents.

  Parameters:
    1. documents: sequence of strings
    2. k_shingle_length: the length of the shingles
  Output: dictionary shingle -> count. Only shingles made of ASCII letters are counted, so
  shingles containing spaces, digits or punctuation are ignored. Counting is case-sensitive.
  '''
  shingle_occ_dict = {}
  for document in documents:
    # NOTE(review): assumes ks.shingleset_range yields each distinct shingle once per document,
    # which makes the count a document frequency. Confirm against the kshingle docs.
    for shingle in ks.shingleset_range(document, k_shingle_length, k_shingle_length):
      if shingle.isascii() and shingle.isalpha():
        shingle_occ_dict[shingle] = shingle_occ_dict.get(shingle, 0) + 1
  return shingle_occ_dict

In [None]:
def create_k_shingle(base_view_array, k_shingle_length, column_name, index_name, drop_shingles_threshold):
  '''
  Turn each document into the set of ids of the shingles it contains.

  Parameters:
    1. base_view_array: a dictionary of numpy arrays (as returned by fetch_view_array), holding
       the documents under column_name and their indexes under index_name
    2. k_shingle_length: the chosen length for the shingles
    3. column_name: key to access the concatenated columns array
    4. index_name: key to access the indexes array
    5. drop_shingles_threshold: a shingle is kept only if
       (its count from count_shingle_occ) / (number of documents) is strictly below this value.
       The notebook passes values greater than 1 to disable dropping.
  Description: the text keeps its spaces, and only shingles made of ASCII letters are kept, so
  every shingle containing a blank, digit or punctuation is discarded. Ids start at 1 and are
  case-insensitive: shingles differing only in case share one id.
  Output: a new dictionary {index_name: indexes array, 'shingle_matrix': list with one set of
  shingle ids per document, 'num_shingles': number of distinct ids assigned}
  '''
  print("Shingling phase starts______________")
  t_start_ms = time.time_ns() / 1000000

  docs = base_view_array[column_name]
  occurrences = count_shingle_occ(docs, k_shingle_length)
  doc_count = docs.shape[0]

  id_of = {}
  matrix = []
  next_id = 1
  for document in docs:
    ids_in_doc = set()
    for shingle in ks.shingleset_range(document, k_shingle_length, k_shingle_length):
      if not (shingle.isascii() and shingle.isalpha()):
        continue
      # too frequent shingles are dropped
      if not (occurrences[shingle] / doc_count < drop_shingles_threshold):
        continue
      key = shingle.lower()
      if key not in id_of:
        id_of[key] = next_id
        next_id += 1
      ids_in_doc.add(id_of[key])
    matrix.append(ids_in_doc)

  result = {index_name: base_view_array[index_name], 'shingle_matrix': matrix, 'num_shingles': next_id - 1}
  print('Shingles created: ' + str(next_id - 1))
  t_end_ms = time.time_ns() / 1000000
  print('t_exec: ' + str(t_end_ms - t_start_ms) + 'ms')
  print("Shingling phase ends______________")
  return result

# Shingling with blanks removed (spaces are stripped before shingling, so shingles can span words)

In [None]:
def count_shingle_occ_with_blanks(documents, k_shingle_length):
  '''
  Like count_shingle_occ, but space characters are removed from each document before shingling.

  Parameters:
    1. documents: sequence of strings
    2. k_shingle_length: the length of the shingles
  Output: dictionary shingle -> count, computed on the space-free text. Only shingles made of
  ASCII letters are counted. Counting is case-sensitive.
  '''
  shingle_occ_dict = {}
  for document in documents:
    # NOTE(review): assumes ks.shingleset_range yields each distinct shingle once per document.
    for shingle in ks.shingleset_range(document.replace(" ", ""), k_shingle_length, k_shingle_length):
      if shingle.isascii() and shingle.isalpha():
        shingle_occ_dict[shingle] = shingle_occ_dict.get(shingle, 0) + 1
  return shingle_occ_dict

In [None]:
def create_k_shingle_with_blanks(base_view_array, k_shingle_length, column_name, index_name, drop_shingles_threshold):
  '''
  Variant of the shingling step where space characters are removed from every document before
  shingling, so shingles can span word boundaries (e.g. "ab cd" is shingled as "abcd").

  Parameters:
    1. base_view_array: a dictionary of numpy arrays holding the documents under column_name and
       their indexes under index_name
    2. k_shingle_length: the chosen length for the shingles
    3. column_name: key to access the concatenated columns array
    4. index_name: key to access the indexes array
    5. drop_shingles_threshold: a shingle is kept only if
       (its count from count_shingle_occ_with_blanks) / (number of documents) is strictly below
       this value
  Description: only shingles made of ASCII letters are kept. Ids start at 1 and are
  case-insensitive.
  Output: a new dictionary {index_name: indexes array, 'shingle_matrix': list with one set of
  shingle ids per document, 'num_shingles': number of distinct ids assigned}
  '''
  print("Shingling phase starts______________")
  start_time = time.time_ns() / 1000000

  documents = base_view_array[column_name]
  # compute shingle occurrences on the same space-free text used below
  shingles_occ = count_shingle_occ_with_blanks(documents, k_shingle_length)

  # shingle matrix creation
  n_doc = documents.shape[0]
  shingle_indexes = {}
  shingle_matrix = []
  new_index = 1
  for document in documents:
    doc_shingles = set()
    for shingle in ks.shingleset_range(document.replace(" ", ""), k_shingle_length, k_shingle_length):
      if shingle.isascii() and shingle.isalpha() and shingles_occ[shingle] / n_doc < drop_shingles_threshold:
        key = shingle.lower()  # case-insensitive ids
        if key not in shingle_indexes:
          shingle_indexes[key] = new_index
          new_index += 1
        doc_shingles.add(shingle_indexes[key])
    shingle_matrix.append(doc_shingles)
  out = {index_name: base_view_array[index_name], 'shingle_matrix': shingle_matrix, 'num_shingles': new_index - 1}

  # new_index is one past the last assigned id, so new_index - 1 shingles were created
  # (previously this printed new_index, off by one and inconsistent with 'num_shingles').
  print('Shingles created: ' + str(new_index - 1))
  end_time = time.time_ns() / 1000000
  print('t_exec: ' + str(end_time - start_time) + 'ms')
  print("Shingling phase ends______________")
  return out

# Minhashing

In [None]:
# NOTE: ((a*x + b) mod p) mod num_shingles is not a permutation of 0..num_shingles-1; different
# inputs can collide, so the minhash step built on it only approximates random permutations.
def create_hash_function(p, num_shingles, rng=None):
  '''
  Draw one random hash function x -> ((a*x + b) mod p) mod num_shingles.

  Parameters:
    1. p: the modulus (the caller's default is named large_prime; primality is not checked)
    2. num_shingles: size of the output range [0, num_shingles)
    3. rng: optional object with a randint(a, b) method (e.g. a seeded random.Random) used to
       draw a and b; defaults to the global `random` module
  Output: a function of one integer argument
  '''
  rng = random if rng is None else rng
  a = rng.randint(1, p - 1)
  b = rng.randint(0, p - 1)
  return lambda x: ((a * x + b) % p) % num_shingles


def create_hash_family(num_shingles, num_func=128, large_prime=1000000000039, rng=None):
  '''
  Create num_func hash functions with create_hash_function.

  Parameters:
    1. num_shingles: size of each function's output range [0, num_shingles)
    2. num_func: number of functions to create
    3. large_prime: modulus passed to create_hash_function
    4. rng: optional random source shared by all functions; pass a seeded random.Random for
       reproducible runs (default: the global `random` module, as before)
  Output: list of num_func functions
  '''
  return [create_hash_function(large_prime, num_shingles, rng) for _ in range(num_func)]

In [None]:
def minhash(shingle_matrix, minhashing_threshold=0.2, num_hash=128):
  '''
  Compute a minhash signature for every document and store it in the input dictionary.

  Parameters:
    1. shingle_matrix: the output of the create_k_shingle function (reads 'shingle_matrix' and
       'num_shingles')
    2. minhashing_threshold: for each hash function, the scan over shingle rows stops as soon as
       the fraction of documents still without a value drops below this threshold. Those
       documents keep the value 0. Lower is stricter.
    3. num_hash: number of hash functions, i.e. the signature length (default 128)
  Output: the same dictionary passed as input, with a new entry 'minhashed_matrix': a numpy
  array of shape (number of documents, num_hash). Entry [d, i] is the 1-based position j+1 of
  the first scanned row j whose shingle id f_i(j)+1 belongs to document d, or 0 if no such row
  was reached before the scan stopped.
  '''
  print("Minhashing phase starts______________________________________")
  start_time = time.time_ns() / 1000000
  shingled_documents = shingle_matrix['shingle_matrix']
  num_shingles = shingle_matrix['num_shingles']
  n_docs = len(shingled_documents)

  # hash functions creation
  hash_funcs = create_hash_family(num_shingles, num_func=num_hash)

  # minhashed matrix initialization (0 means "no value found")
  minhashed = np.zeros((n_docs, num_hash))

  for i, f in enumerate(hash_funcs):
    if i % 10 == 0:
      print('Hash number: ' + str(i) + '/' + str(num_hash))
    start_time_epoche = time.time_ns() / 1000000

    remaining = list(range(n_docs))
    for j in range(num_shingles):
      # Row j of the simulated permuted shingle matrix holds shingle id f(j)+1
      # (ids produced by the shingling step start at 1).
      shingle_id = f(j) + 1
      not_found = []
      for d in remaining:
        if shingle_id in shingled_documents[d]:
          minhashed[d, i] = j + 1
        else:
          not_found.append(d)
      # Rebuilding the list keeps each step linear (list.remove inside the loop was quadratic).
      remaining = not_found
      if not remaining or len(remaining) / n_docs < minhashing_threshold:
        break
    end_time_epoche = time.time_ns() / 1000000
    if i % 10 == 0:
      print('t_exec last epoche: ' + str(end_time_epoche - start_time_epoche) + 'ms')

  shingle_matrix['minhashed_matrix'] = minhashed
  end_time = time.time_ns() / 1000000
  print('t_exec: ' + str(end_time - start_time) + 'ms')
  print("Minhashing phase ends______________________________________")
  return shingle_matrix

# Write minhash function

In [None]:
def write_minhash(dbcon, minhash_array, index_name, table_name):
  '''
  Store the minhash signatures in the table __<table_name>_minhash (dropped first if present).

  Parameters:
    1. dbcon: connection to duckdb
    2. minhash_array: the output of the minhash function (reads 'minhashed_matrix' and index_name)
    3. index_name: key to access the indexes array
    4. table_name: name of the table to analyze
  Output: none
  Description: the table has one row per document. The first column is the document index;
  then there is one column per hash function, named by its position (0, 1, ...). The number of
  columns follows the actual signature length instead of assuming 128.
  '''
  print('Writing minhashes in the database____________')
  minhashed = minhash_array['minhashed_matrix']
  columns = {index_name: minhash_array[index_name]}
  for i in range(minhashed.shape[1]):
    columns[i] = minhashed[:, i]
  minhash_df = pd.DataFrame.from_dict(columns)

  minhash_table = "__" + table_name + "_minhash"
  dbcon.execute("DROP TABLE IF EXISTS " + minhash_table + ";")
  # DuckDB's replacement scan resolves `minhash_df` to the local pandas DataFrame above.
  dbcon.execute("CREATE TABLE " + minhash_table + " AS SELECT * FROM minhash_df")
  print('Minhashes written in the database___________')

# Read minhash function

In [None]:
def read_minhash(dbcon, table_name, index_name):
  '''
  Load the signatures stored by write_minhash from __<table_name>_minhash.

  Parameters:
    1. dbcon: connection to duckdb
    2. table_name: name of the table to analyze
    3. index_name: key to access the indexes array
  Output: a dictionary with 2 entries: index_name -> 1-d numpy array of indexes, and
  'minhashed_matrix' -> 2-d numpy array (one row per document, one column per hash function)
  Description: hash columns are the columns whose name is a non-negative integer ("0", "1", ...),
  taken in numeric order. Their count is read from the table instead of assuming 128.
  '''
  print('Reading minhashing from the database__________________')
  raw_minhash_array = fetch_view_array(dbcon, "__" + table_name + "_minhash", False)
  out = {index_name: raw_minhash_array[index_name]}
  # Numeric sort so that "10" comes after "9".
  hash_columns = sorted((name for name in raw_minhash_array if name != index_name and name.isdigit()), key=int)
  out['minhashed_matrix'] = np.column_stack([raw_minhash_array[name] for name in hash_columns])
  print('Minhashes read from the database__________________')
  return out

# LSH python

In [None]:
def compute_jaccard(a, b):
  '''
  Estimate the Jaccard similarity of two documents from their minhash signatures.

  Parameters:
    1. a, b: 1-d numpy arrays of the same length (the two signatures)
  Output: the fraction of positions where the signatures agree (0.0 for empty signatures).
  NOTE: positions where both signatures are 0 (no minhash value found) count as agreements.
  '''
  if a.shape[0] == 0:
    return 0.0
  return (a == b).sum() / a.shape[0]


def compute_LSH_python(minhashed_matrix, b, threshold, indexes_name):
  '''
  Find similar document pairs with LSH banding, in Python.

  Parameters:
    1. minhashed_matrix: the output of the read_minhash function (reads 'minhashed_matrix' and
       indexes_name)
    2. b: number of bands
    3. threshold: similarity threshold for documents (compared with compute_jaccard)
    4. indexes_name: key to access the indexes array
  Output: a set of (index, index) tuples of documents whose signature agreement is >= threshold.
  In each pair, the first document comes before the second in the matrix.
  '''
  print('Python LSH computation phase starts______________________________________________')
  start_time = time.time_ns() / 1000000
  minhashes = minhashed_matrix['minhashed_matrix']
  indexes = minhashed_matrix[indexes_name]
  num_bands = b
  signature_length = minhashes.shape[1]

  # Band boundaries via integer arithmetic over the real signature length. This gives exactly
  # num_bands bands ending at the last column, with no floating-point drift (the previous code
  # assumed 128 columns).
  ranges = [(k * signature_length // num_bands, (k + 1) * signature_length // num_bands) for k in range(num_bands)]

  # one bucket dictionary per band: band key -> list of row positions
  band_list = [{} for _ in range(num_bands)]

  print('Filling buckets')
  for band_idx, (start, stop) in enumerate(ranges):
    buckets = band_list[band_idx]
    for row in range(minhashes.shape[0]):
      # NOTE: the bucket key is the SUM of the band values (same rule as the SQL version), so
      # different bands with equal sums share a bucket. The final check below removes those.
      bucket_key = minhashes[row, start:stop].sum()
      # a zero sum means no minhash value was found in the whole band
      if bucket_key != 0:
        buckets.setdefault(bucket_key, []).append(row)
  print('Buckets filled')

  print('Candidates creation starts')
  candidates = set()
  for buckets in band_list:
    for rows in buckets.values():
      candidates.update(combinations(rows, 2))
  print('Candidates creation ends')

  print('False positive eliminations starts')
  start_time_pairs = time.time_ns() / 1000000
  pairs = set()
  for left, right in candidates:
    if compute_jaccard(minhashes[left], minhashes[right]) >= threshold:
      pairs.add((indexes[left], indexes[right]))
  end_time_pairs = time.time_ns() / 1000000
  print('t_exec false elim: ' + str(end_time_pairs - start_time_pairs) + 'ms')
  print('False positive eliminations ends')

  end_time = time.time_ns() / 1000000
  print('____________________t_exec lsh python: ' + str(end_time - start_time) + 'ms')
  print('Python LSH computation phase ends________________________________________________')
  return pairs

# LSH SQL

In [None]:
def get_column_sum(a, b, t_name):
  '''SQL expression adding the quoted columns "a" .. "b-1" of alias t_name ('' when a >= b).'''
  return '+'.join(t_name + '.' + '"' + str(col) + '"' for col in range(a, b))

def get_query_band(a, b, minhashes_table_name, indexes_name):
  '''
  Build the candidate-pair query for one band (columns a .. b-1 of the minhash table).

  Two documents m1, m2 (with m1 index < m2 index) are candidates when the sums of their minhash
  values over the band are equal. Bands whose sum is 0 (no minhash value found in the whole
  band) are skipped, matching the `bucket_key != 0` rule of compute_LSH_python.
  Output: a SELECT returning (l_doc_id, r_doc_id)
  Raises: ValueError if the band is empty (a >= b), which would otherwise produce invalid SQL.
  '''
  if a >= b:
    raise ValueError("empty band: a must be smaller than b")
  l_sum = get_column_sum(a, b, 'm1')
  r_sum = get_column_sum(a, b, 'm2')
  query = "select m1." + indexes_name + ' as l_doc_id, m2.' + indexes_name + ' as r_doc_id from ' + minhashes_table_name + ' m1, ' + minhashes_table_name + ' m2 '
  query = query + 'where m1.' + indexes_name + ' < m2.' + indexes_name + ' and ' + l_sum + ' = ' + r_sum + ' and ' + l_sum + ' <> 0'
  return query

def get_intersect_string(num_hash):
  '''
  SQL expression counting the hash positions 0 .. num_hash-1 where rows p1 and p2 agree.
  Position 0 is always included, even when num_hash < 1.
  '''
  terms = ['case when p1."' + str(pos) + '"=p2."' + str(pos) + '" then 1 else 0 end ' for pos in range(max(num_hash, 1))]
  return ' +'.join(terms)

def get_union_string(num_hash, intersect_string):
  '''
  SQL expression for the denominator of the signature similarity: the signature length.

  Previously this always returned '128' and ignored num_hash.
  intersect_string is kept for interface compatibility and is not used.
  '''
  return str(num_hash)

def get_argument_string(intersect_string, union_string):
  '''SQL expression dividing intersect_string by union_string, both cast to DOUBLE (no integer division).'''
  numerator = "cast((" + intersect_string + ") as DOUBLE)"
  denominator = "cast((" + union_string + ") as DOUBLE)"
  return numerator + " / " + denominator
def compute_LSH_SQL(dbcon, table_name, minhashes_table_name, bands, threshold, indexes_name, num_hash=128):
  '''
  Run LSH with banding entirely inside DuckDB, without reading the minhashes into Python.

  Parameters:
    1. dbcon: connection to duckdb
    2. table_name: name of the table to analyze
    3. minhashes_table_name: name of the minhashes table saved inside duckdb
    4. bands: number of bands
    5. threshold: similarity threshold for documents
    6. indexes_name: name of the index column of the minhashes table
    7. num_hash: number of hashes computed by minhash (columns "0" .. num_hash-1)
  Output: none. The similar pairs are left in the table __<table_name>_pairs_index
  (columns l_doc_id, r_doc_id). The temporary table __<table_name>_pairs_index_dup is dropped.
  '''
  print('SQL LSH computation phase starts______________________________________________')
  start_time = time.time_ns() / 1000000

  # Band boundaries via integer arithmetic: exactly `bands` bands covering columns
  # 0..num_hash-1, with no floating-point drift.
  ranges = [(int(k * num_hash // bands), int((k + 1) * num_hash // bands)) for k in range(bands)]

  # Index pairs table creation
  dup_pairs_table_name = '__' + table_name + '_pairs_index_dup'
  pairs_table_name = '__' + table_name + '_pairs_index'
  drop_dup = "DROP TABLE IF EXISTS " + dup_pairs_table_name + ";"
  dbcon.execute(drop_dup)
  dbcon.execute("CREATE TABLE " + dup_pairs_table_name + "(l_doc_id INT, r_doc_id INT);")

  # Candidate pairs, one INSERT per band (the same pair can be inserted by several bands)
  for a, b in ranges:
    if a >= b:
      # empty band (bands > num_hash): it would produce invalid SQL
      continue
    dbcon.execute("insert into " + dup_pairs_table_name + " " + get_query_band(a, b, minhashes_table_name, indexes_name))

  # Duplicates elimination
  dbcon.execute("DROP TABLE IF EXISTS " + pairs_table_name + ";")
  dbcon.execute("CREATE TABLE " + pairs_table_name + " as select distinct * from " + dup_pairs_table_name)
  dbcon.execute(drop_dup)

  # False positives elimination: delete candidates whose signature agreement is below threshold
  intersect_string = get_intersect_string(num_hash)
  union_string = get_union_string(num_hash, intersect_string)
  argument_string = get_argument_string(intersect_string, union_string)
  select_string = "select " + argument_string + " from " + minhashes_table_name + " p1, " + minhashes_table_name + " p2 where p1." + indexes_name + "=l_doc_id and p2." + indexes_name + "=r_doc_id"
  delete_string = "delete from " + pairs_table_name + " where " + str(threshold) + ">(" + select_string + ")"
  dbcon.execute(delete_string)

  end_time = time.time_ns() / 1000000
  print('____________________t_exec lsh SQL: ' + str(end_time - start_time) + 'ms')
  print('SQL LSH computation phase ends________________________________________________')
  return

# Pairs persist function for python lsh

In [None]:
def persist(dbcon, table_name, column_name, pairs):
  '''
  Store the pairs found by the Python LSH, plus a view showing the texts of each pair.

  Parameters:
    1. dbcon: connection to duckdb
    2. table_name: name of the table to analyze
    3. column_name: name of the column of concatenated columns
    4. pairs: collection of (l_doc_id, r_doc_id) tuples
  Output: True if pairs were written, False if `pairs` is empty.
  Description: writes the table __<table_name>_pairs_index and the view __<table_name>_pairs,
  which joins both ids with __<table_name>_text_view. When no pairs are found, any existing view
  and table with those names are dropped, so results of an earlier run are not shown as current.
  '''
  pairs_index_table = "__" + table_name + "_pairs_index"
  pairs_view = "__" + table_name + "_pairs"
  text_view = "__" + table_name + "_text_view"
  index_column = "__" + table_name + "_indexes"

  pairs_df = pd.DataFrame(list(pairs), columns=['l_doc_id', 'r_doc_id'])
  if pairs_df.shape[0] < 1:
    print("Pairs not found, nothing to persist")
    dbcon.execute("DROP VIEW IF EXISTS " + pairs_view + ";")
    dbcon.execute("DROP TABLE IF EXISTS " + pairs_index_table + ";")
    return False

  dbcon.execute("DROP TABLE IF EXISTS " + pairs_index_table + ";")
  # DuckDB's replacement scan resolves `pairs_df` to the local pandas DataFrame above.
  dbcon.execute("CREATE TABLE " + pairs_index_table + " AS SELECT * FROM pairs_df")

  dbcon.execute("DROP VIEW IF EXISTS " + pairs_view + ";")
  dbcon.execute("CREATE VIEW " + pairs_view + " AS SELECT l_doc_id, tl." + column_name + " as l_" + column_name + ", r_doc_id, tr." + column_name + " as r_" + column_name + " FROM " + pairs_index_table + " AS t JOIN " + text_view + " as tl ON tl." + index_column + " = l_doc_id JOIN " + text_view + " as tr ON tr." + index_column + " = r_doc_id")
  return True

# Show output

In [None]:
def show_output_python(dbcon, table_name, pairs, show_similar_contents):
  '''
  Report the result of the Python LSH.

  Parameters:
    1. dbcon: connection to duckdb
    2. table_name: name of the table to analyze
    3. pairs: collection of similar pairs found
    4. show_similar_contents: if truthy, the rows of the view __<table_name>_pairs are printed;
       otherwise only the number of found pairs appears
  Output: none
  '''
  print("_______________Output computed using python")
  print("Number of pairs found: " + str(len(pairs)))
  if not show_similar_contents:
    return
  try:
    pairs_df = dbcon.query("select * from __" + table_name + "_pairs").to_df()
  except Exception as exc:
    # e.g. persist() found no pairs and the view does not exist; a bare except here used to
    # swallow KeyboardInterrupt too.
    print("Table not found, can't display output")
    print("  (" + type(exc).__name__ + ": " + str(exc) + ")")
    return
  print("The similar pairs found are the following: ")
  print(pairs_df)

In [None]:
def show_output_SQL(dbcon, pairs_table_name, indexed_table_name, index_column_name, column_name, show_similar_contents):
  '''
  Report the result of the SQL LSH.

  Parameters:
    1. dbcon: connection to duckdb
    2. pairs_table_name: name of the table containing the pairs (l_doc_id, r_doc_id)
    3. indexed_table_name: name of the table/view holding the documents (lsh() passes the text view)
    4. index_column_name: name of the column containing the indexes
    5. column_name: name of the analyzed column
    6. show_similar_contents: if truthy all the similar pairs are displayed, otherwise only the
       number of found pairs appears
  Output: none
  '''
  print("_______________Output computed using SQL")
  # The two text columns are aliased l_<column>/r_<column> so the result has distinct column
  # names (same naming as the __<table>_pairs view built by persist()).
  query = ("select l_doc_id, tl." + column_name + " as l_" + column_name + ", r_doc_id, tr." + column_name + " as r_" + column_name
           + " from " + pairs_table_name
           + " join " + indexed_table_name + " tl on tl." + index_column_name + "=l_doc_id"
           + " join " + indexed_table_name + " tr on tr." + index_column_name + "=r_doc_id")
  pairs_df = dbcon.query(query).to_df()
  if pairs_df.shape[0] == 0:
    print("No similar pairs found")
    return
  print("Number of similar pairs found: " + str(pairs_df.shape[0]))
  if show_similar_contents:
    print("The similar pairs found are the following: ")
    print(pairs_df)

# Main function

In [None]:
def lsh(database_name, table_name, column_list, csv_name=False, drop_blanks=True, lsh_python=False, lsh_SQL=True, compute_minhash=True, bands=16, show_similar_contents=True, rows_limit=False, k_shingle_length=5, drop_shingles_threshold=1.1, threshold=0.7, minhashing_threshold=0.2):
  '''
  Run the whole near-duplicate detection pipeline on a DuckDB table.

  Parameters:
    1. database_name: the name of the .duckdb file
    2. table_name: name of the table to analyze
    3. column_list: list of the columns to analyze
    4. csv_name: optional; if not False, the csv file loaded into table_name first
    5. drop_blanks: if True (default), spaces are removed from the text before shingling
       (create_k_shingle_with_blanks), so shingles can span words. If False, the text keeps its
       spaces and every shingle containing a blank is discarded (create_k_shingle).
    6. lsh_python: if True, run the Python version of LSH
    7. lsh_SQL: if True, run the SQL version of LSH
    8. compute_minhash: if True, csv loading, shingling and minhashing are run. Otherwise the
       table __<table_name>_minhash must already exist in the database.
    9. bands: number of bands for LSH
    10. show_similar_contents: if truthy, the similar pairs are printed
    11. rows_limit: limits the number of rows analyzed (only used when compute_minhash is True)
    12. k_shingle_length: length of the shingles
    13. drop_shingles_threshold: shingles whose occurrence count divided by the number of
        documents is >= this value are dropped; values greater than 1 disable dropping
    14. threshold: similarity threshold for LSH
    15. minhashing_threshold: threshold to stop minhashing before it scans all the rows (lower is stricter)
  Output: a dictionary with the execution times in milliseconds: 'csv' (when a csv is loaded),
  'shingle' and 'minhashing' (when compute_minhash), and 'lsh_python' and 'lsh_sql' (always).
  '''
  t_exec = {}
  column_name = 'to_analyze'
  index_name = "__" + table_name + "_indexes"
  dbcon = open_connection(database_name)
  try:
    if compute_minhash:
      # write csv in duckdb
      if csv_name != False:
        start_time_csv = time.time_ns() / 1000000
        write_csv(dbcon, table_name, csv_name)
        end_time_csv = time.time_ns() / 1000000
        t_exec['csv'] = end_time_csv - start_time_csv

      create_indexed_table(dbcon, table_name, column_list)
      create_text_view(dbcon, table_name, "__" + table_name + "_indexed", column_list)
      base_view_array = fetch_view_array(dbcon, "__" + table_name + "_text_view", rows_limit)

      # k-shingling
      start_time_shingle = time.time_ns() / 1000000
      if drop_blanks == False:
        kshingle_array = create_k_shingle(base_view_array, k_shingle_length, column_name, index_name, drop_shingles_threshold)
      else:
        kshingle_array = create_k_shingle_with_blanks(base_view_array, k_shingle_length, column_name, index_name, drop_shingles_threshold)
      end_time_shingle = time.time_ns() / 1000000
      t_exec['shingle'] = end_time_shingle - start_time_shingle

      # minhashing
      start_time_minhashing = time.time_ns() / 1000000
      minhash_array = minhash(kshingle_array, minhashing_threshold=minhashing_threshold)
      end_time_minhashing = time.time_ns() / 1000000
      t_exec['minhashing'] = end_time_minhashing - start_time_minhashing

      write_minhash(dbcon, minhash_array, index_name, table_name)

    # LSH with banding, Python version
    start_time_lsh_python = time.time_ns() / 1000000
    if lsh_python == True:
      minhash_array = read_minhash(dbcon, table_name, index_name)
      pairs = compute_LSH_python(minhash_array, bands, threshold, index_name)
      persist(dbcon, table_name, column_name, pairs)
      show_output_python(dbcon, table_name, pairs, show_similar_contents)
    end_time_lsh_python = time.time_ns() / 1000000
    t_exec['lsh_python'] = end_time_lsh_python - start_time_lsh_python

    # LSH with banding, SQL version
    start_time_lsh_sql = time.time_ns() / 1000000
    if lsh_SQL == True:
      compute_LSH_SQL(dbcon, table_name, "__" + table_name + "_minhash", bands, threshold, index_name)
      show_output_SQL(dbcon, "__" + table_name + "_pairs_index", "__" + table_name + "_text_view", index_name, column_name, show_similar_contents)
    end_time_lsh_sql = time.time_ns() / 1000000
    # Measured from start_time_lsh_sql: previously start_time_lsh_python was used, so the SQL
    # timing also included the whole Python LSH phase.
    t_exec['lsh_sql'] = end_time_lsh_sql - start_time_lsh_sql
  finally:
    # Always release the connection, even when a phase fails.
    dbcon.close()

  return t_exec


# Testing

DuckDB large tables creation

In [None]:
# One-off setup: create 'Blurbs_table' from the csv (first 10000 rows, shingle length 5).
# NOTE(review): left commented out, presumably because the table already exists in test.duckdb; re-enable to rebuild it.
#lsh('test.duckdb', 'Blurbs_table', column_list=['Title','Blurb'],lsh_python=True,drop_blanks=True,lsh_SQL=True,csv_name='/content/drive/MyDrive/Colab Notebooks/ProgettoBDMG/csv/books_with_blurbs.csv', minhashing_threshold=0.2, show_similar_contents=True,compute_minhash=True,bands = 16,rows_limit=10000, threshold=0.8, k_shingle_length=5, drop_shingles_threshold=1.1)

In [None]:
# One-off setup: create 'Blurbs_table_max_length_ks5' with shingles of length 5 and no row limit
# (rows_limit=False, almost 50k rows). Shingles produced in 40% or more of the documents are dropped.
#lsh('test.duckdb', 'Blurbs_table_max_length_ks5', column_list=['Title','Blurb'],lsh_python=True,drop_blanks=True,lsh_SQL=True,csv_name='/content/drive/MyDrive/Colab Notebooks/ProgettoBDMG/csv/books_with_blurbs.csv', minhashing_threshold=0.2, show_similar_contents=True,compute_minhash=True,bands = 16,rows_limit=False, threshold=0.8, k_shingle_length=5, drop_shingles_threshold=0.4)

Execution times computation

In [None]:
# 10k rows: reuse the minhashes already stored in the Drive database (compute_minhash=False), so
# only the Python and SQL LSH phases run. With compute_minhash=False, rows_limit,
# k_shingle_length, drop_shingles_threshold and minhashing_threshold are not used by lsh().
out = lsh('/content/drive/MyDrive/Colab Notebooks/ProgettoBDMG/duckdb_versions/BlurbsK5_10k_50k/test.duckdb', 'Blurbs_table', column_list=['Title','Blurb'],lsh_python=True,drop_blanks=True,lsh_SQL=True, minhashing_threshold=0.2, show_similar_contents=True,compute_minhash=False,bands = 16,rows_limit=10000, threshold=0.7, k_shingle_length=5, drop_shingles_threshold=1.1)
# execution times in milliseconds, keyed by phase
print(out)

In [None]:
# Same LSH-only run (compute_minhash=False) on the full-dataset minhashes of
# 'Blurbs_table_max_length_ks5'; rows_limit and the shingling parameters are not used here.
# NOTE(review): the returned timings dict is not stored.
lsh('/content/drive/MyDrive/Colab Notebooks/ProgettoBDMG/duckdb_versions/BlurbsK5_10k_50k/test.duckdb', 'Blurbs_table_max_length_ks5', column_list=['Title','Blurb'],lsh_python=True,drop_blanks=True,lsh_SQL=True, minhashing_threshold=0.2, show_similar_contents=True,compute_minhash=False,bands = 16,rows_limit=10000, threshold=0.7, k_shingle_length=5, drop_shingles_threshold=1.1)

In [None]:
# Benchmark: rebuild 'TestTable' from the csv with increasing row limits and collect the timings
# dict returned by each lsh() call.
# NOTE: show_similar_contents=True makes the timed LSH phases also print every pair found, so
# the measured times include that printing.
l = [100, 500, 1000, 5000]
texecs = []
for i in l:
  texecs.append(lsh('test.duckdb', 'TestTable', column_list=['title','Blurb'],lsh_python=True,drop_blanks=True,lsh_SQL=True,csv_name='/content/drive/MyDrive/Colab Notebooks/ProgettoBDMG/csv/books_with_blurbs.csv', minhashing_threshold=0.2, show_similar_contents=True,compute_minhash=True,bands = 16,rows_limit=i, threshold=0.7, k_shingle_length=5, drop_shingles_threshold=1.1))

In [None]:
# LSH execution times (ms) for the row limits benchmarked above.
python=[]
sql=[]
for i in texecs:
  python.append(i['lsh_python'])
  sql.append(i['lsh_sql'])
# NOTE(review): the 10000- and 57000-row timings are hard-coded, presumably copied from separate
# earlier runs. If they were produced by an lsh() that measured 'lsh_sql' from
# start_time_lsh_python, the SQL values also include the Python LSH time; re-measure to confirm.
python.append(2011.009033203125)
python.append(31106)
sql.append(11961.82568359375)
sql.append(336867)
# one label per timing above, in the same order
labels=['100','500','1000','5000','10000','57000']

In [None]:
# Python LSH times, then SQL LSH times, in milliseconds (same order as `labels`)
print(python)
print(sql)

[99.9638671875, 141.32275390625, 199.98876953125, 945.99755859375, 2011.009033203125, 31106]
[422.06494140625, 530.91357421875, 701.226806640625, 3748.53662109375, 11961.82568359375, 336867]


# References

Books with blurb dataset: https://www.kaggle.com/datasets/jdobrow/57000-books-with-metadata-and-blurbs