In [49]:
import psycopg2
import pandas
from datasketch import MinHashLSHEnsemble, MinHash, LeanMinHash
import pickle
from collections import defaultdict
import re

In [50]:
gigabytes = re.compile(r'([0-9]+) GB')
megabytes = re.compile(r'([0-9]+) MB')
bs = re.compile(r'([0-9]+) bytes')

In [51]:
def create_and_serialize_index(conn, table_name, num_perm=128, thresh=0.8, smaller=True):
    # get list of tables available in conn
    
    cursor=conn.cursor()
    
    drop_if_exist_q = f"""
    DROP TABLE IF EXISTS hashes.{table_name};"""
    cursor.execute(drop_if_exist_q)
    
    create_table_q = f"""
    CREATE TABLE IF NOT EXISTS hashes.{table_name}(thresh float, tname text, cname text, dsize int, hashval bytea);
    """
    cursor.execute(create_table_q)
    
    if(smaller):
        # exclude very big tables and very small tables
        get_t_q_w_size = f"""
        SELECT                                                            
        relname,
        pg_size_pretty(table_size) AS size,
        table_size
        FROM (
        SELECT pg_catalog.pg_namespace.nspname AS schema_name,
        relname,
        pg_relation_size(pg_catalog.pg_class.oid) AS table_size
        FROM pg_catalog.pg_class
        JOIN pg_catalog.pg_namespace ON relnamespace = pg_catalog.pg_namespace.oid
        ) t
        WHERE schema_name ='public' and relname!='{table_name}'
        ORDER BY table_size DESC;
        """
        cursor.execute(get_t_q_w_size)
        raw=cursor.fetchall()
        res = []
        for r in raw:
            if(gigabytes.search(r[1]) or bs.search(r[1])):
                continue
            else:
                if(megabytes.search(r[1])):
                    if(int(megabytes.search(r[1]).group(1))>100):
                        continue
                res.append(r)
        print("number of tables we consider: %s", str(len(res)))
    else:
        get_t_q = f"""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_type='BASE TABLE'
        AND table_name != '{table_name}'
        """
        cursor.execute(get_t_q)
        res=cursor.fetchall()

    # Create an LSH Ensemble index with threshold and number of partition
    # settings.
    lshensemble = MinHashLSHEnsemble(threshold=thresh, num_perm=num_perm,
    num_part=32)
    # create lshensemble 
    table_hash_input_tuples = []
    
    i = 0
    for t in res:
        print(f"table : {t[0]}")
        t_hashes = {}
        cols_q = f"SELECT column_name FROM INFORMATION_SCHEMA. COLUMNS WHERE TABLE_NAME = '{t[0]}'";
        cursor.execute(cols_q)
        cols = cursor.fetchall()
        for c in cols:
#             print(f'column: {c}')
            distincts_q = f'select distinct "{c[0]}" FROM {t[0]} where "{c[0]}" is not null'
            cursor.execute(distincts_q)
            set_vals = [v[0] for v in cursor.fetchall()]
#             print(set_vals)
            mh = MinHash(num_perm=num_perm)
            for v in set_vals:
                mh.update(v.encode('utf8'))
            lmh = LeanMinHash(seed=mh.seed,hashvalues=mh.hashvalues)
            pickled_lmh = pickle.dumps(lmh)
            insert_one_hash_q = f"""
            INSERT INTO {table_name} VALUES ({thresh}, '{t[0]}', '{c[0]}', {len(set_vals)}, \
            {psycopg2.Binary(pickled_lmh)})
            """
#             print(insert_one_hash_q)
            cursor.execute(insert_one_hash_q)
            table_hash_input_tuples.append((f"{t[0]}->{c[0]}", mh, len(set_vals)))
        i+=1
    lshensemble.index(table_hash_input_tuples)
    
    return lshensemble

In [52]:
def load_index(conn, index_table, threshold=0.8, num_perm=128):
    
    retrieve_q = f"SELECT tname, cname, dsize, hashval FROM {index_table}"
    cur = conn.cursor()
    
    index_dict = defaultdict(dict)
    
    cur.execute(retrieve_q)
    raw = cur.fetchall()
    
    # create lshensemble 
    table_hash_input_tuples = []
    
    for r in raw:
        tname, cname, dsize, mhash = r[0], r[1], r[2], pickle.loads(r[3])
        index_dict[f'{r[0]}->{r[1]}'] = {'table':tname, 
                                           'col': cname, 
                                           'mhash': mhash,
                                           'dsize':dsize}
        
        table_hash_input_tuples.append((f"{tname}->{cname}", mhash, dsize))
    
    # Create an LSH Ensemble index with threshold and number of partition
    # settings.
    lshensemble = MinHashLSHEnsemble(threshold=threshold, num_perm=num_perm,
    num_part=32)
    lshensemble.index(table_hash_input_tuples)
    
    return lshensemble, index_dict

In [53]:
def search_index(lshensemble, index_dict, tname, cname):
    
    mhash = index_dict[f'{tname}->{cname}']['mhash']
    dsize = index_dict[f'{tname}->{cname}']['dsize']
    
    for key in lshensemble.query(mhash, dsize):
        print(key)

In [54]:
if __name__ == '__main__':
    conn = psycopg2.connect("dbname=curation_data_lake user=postgres")
    
#     conn = psycopg2.connect("dbname=test_hash_db user=postgres")
    conn.autocommit=True
    create_and_serialize_index(conn, 'april16hashes')

number of tables we consider: %s 416
table : table311_service_requests__rodent_baiting__historical
table : table311_service_requests__tree_trims__historical
table : table311_service_requests__abandoned_vehicles
table : strategic_subject_list__historical
table : cta__ridership__l_station_entries__daily_totals
table : table311_service_requests__street_lights__all_out__historical
table : cdph_environmental_records_lookup_table
table : table311_service_requests__alley_lights_out__historical
table : speed_camera_violations
table : cta__ridership__bus_routes__daily_totals_by_route
table : crimes__one_year_prior_to_present
table : contracts
table : table311_service_requests__tree_debris__historical
table : table311_service_requests__sanitation_code_complaints__historic
table : payments
table : cdph_asbestos_and_demolition_notification
table : cdph_environmental_complaints
table : violence_reduction__shotspotter_alerts
table : energy_usage_2010
table : cdph_environmental_permits
table : busine

table : budget__2012_budget_ordinance__appropriations
table : chicago_early_learning_programs
table : tif_balance_sheets__2009
table : budget__2015_budget_ordinance__appropriations
table : parks__facilities__features_deprecated_november_2016
table : foia_request_log__311
table : list_of_contractors_doing_business_with_the_city_of_chicago
table : budget__2014_budget_ordinance__appropriations
table : budget__2016_budget_ordinance__appropriations
table : house_share_prohibited_buildings_list
table : cta__ridership__daily_boarding_totals
table : foia_request_log__planning_and_development
table : budget__2013_budget_ordinance__appropriations
table : employee_reimbursements
table : budget__2017_budget_ordinance__appropriations
table : lobbyist_data__historical__lobbyist_agency_report__2010
table : tax_increment_financing_tif_annual_report__projects
table : budget__2019_budget_ordinance__appropriations
table : budget__2020_budget_ordinance__appropriations
table : budget__2022_budget_ordinance

table : public_health_services_chicago_primary_care_community_health_ce
table : public_health_statistics__births_to_mothers_aged_1519_years_old
table : red_light_camera_locations
table : lobbyist_data__expenditures__hosting
table : public_health_statistics__low_birth_weight_in_chicago_by_year_1
table : array_of_things_locations
table : libraries__locations__contact_information_and_usual_hours_of_op
table : house_share_restricted_residential_zone_precincts
table : public_health_statistics__asthma_hospitalizations_in_chicago_by
table : public_health_statistics__diabetes_hospitalizations_in_chicago_
table : restricted_flavored_tobacco_flavor_terms
table : lobbyist_data__historical__lobbyist_major_expenditures_report__
table : public_health_statistics__births_and_birth_rates_in_chicago_by_
table : performance_metrics__transportation__street_lights_all_out
table : public_health_statistics__tuberculosis_cases_and_average_annual
table : public_health_statistics__general_fertility_rates_in_chi

  return np.sum((float(sizes[u])-sizes[l:u+1])/float(sizes[u])*counts[l:u+1])
