In [1]:
%matplotlib inline

In [2]:
import itertools
import pickle
from collections import defaultdict
import ast
import numpy as np
from contextlib import closing
import numba as nb
import pickle
import shelve
import pandas as pd
import multiprocessing
import csv
import os
import sqlite3
import time
import dill
from glob import iglob

### Load in dictionary of probabilities

In [3]:
with open("/data1/kji/databases/probabilities.pkl", "rb") as f:
    utilities = pickle.load(f)

### Load in dictionary of n-grams and their counts

In [4]:
db_dir = '/data1/kji/databases'

In [4]:
files = []
for filename in iglob(f"{db_dir}/*_counts.pkl", recursive=True):
    files.append(filename)

In [5]:
files.sort(key = len)

In [6]:
filesplit1 = [
     '/data1/kji/databases/0_counts.pkl',
     '/data1/kji/databases/04_counts.pkl',
     '/data1/kji/databases/05_counts.pkl',
     '/data1/kji/databases/034_counts.pkl',
     '/data1/kji/databases/024_counts.pkl',
     '/data1/kji/databases/015_counts.pkl',
     '/data1/kji/databases/035_counts.pkl',
     '/data1/kji/databases/023_counts.pkl',
     '/data1/kji/databases/0124_counts.pkl',
     '/data1/kji/databases/0135_counts.pkl',
     '/data1/kji/databases/0145_counts.pkl',
     '/data1/kji/databases/0134_counts.pkl',
     '/data1/kji/databases/0234_counts.pkl']

In [7]:
filesplit2 = [
     '/data1/kji/databases/03_counts.pkl',
     '/data1/kji/databases/01_counts.pkl',
     '/data1/kji/databases/02_counts.pkl',
     '/data1/kji/databases/013_counts.pkl',
     '/data1/kji/databases/012_counts.pkl',
     '/data1/kji/databases/014_counts.pkl',
     '/data1/kji/databases/045_counts.pkl',
     '/data1/kji/databases/025_counts.pkl',
     '/data1/kji/databases/0235_counts.pkl',
     '/data1/kji/databases/0123_counts.pkl',
     '/data1/kji/databases/0345_counts.pkl',
     '/data1/kji/databases/0125_counts.pkl',
     '/data1/kji/databases/0245_counts.pkl']

In [8]:
def store_DB(db, combination, outdir):
    with open(f"{outdir}/{combination}.pkl", "wb") as f:
        pickle.dump(db, f, protocol=pickle.HIGHEST_PROTOCOL)

## Test database with in memory dictionary

Store each n-gram, utility, and the number of matches for an n-gram in a separate array.

In [16]:
def generate_arrays(file_list):
    n_grams = np.array([], dtype = object)
    probabilities = np.array([], dtype = float)
    matches = np.array([], dtype = int)
    start = 0
    for filename in file_list:
        with open(filename, "rb") as f:
            d = pickle.load(f)
            combination = filename.split('/')[-1].split("_")[0]
            length = len(d)            
            fps = np.empty(length, dtype = object)
            utils = np.full(length, utilities[combination])
            values = np.empty(length, dtype = int)
            for i, (n_gram, count) in enumerate(d.items()):
                fps[i] = n_gram
                values[i] = count
            d.clear()
            n_grams = np.concatenate([n_grams, fps])
            probabilities = np.concatenate([probabilities, utils])
            matches = np.concatenate([matches, values])
            print(f"finished processing {filename}")
            f.flush()
    return n_grams, probabilities, matches

Mark indices as boundaries for each type of n-gram.

In [9]:
indices1 = {}
start = 0
for filename in filesplit1:
    with open(filename, "rb") as f:
        d = pickle.load(f)
    combination = filename.split('/')[-1].split("_")[0]
    length = len(d)
    # update indices for the given combination
    indices1[combination] = [start, start + length]
    start = start + length

In [10]:
indices2 = {}
start = 0
for filename in filesplit2:
    with open(filename, "rb") as f:
        d = pickle.load(f)
    combination = filename.split('/')[-1].split("_")[0]
    length = len(d)
    # update indices for the given combination
    indices2[combination] = [start, start + length]
    start = start + length

In [11]:
indices1.update(indices2)

In [11]:
store_DB(indices1, "db_indices", db_dir)

In [17]:
n_grams, probs, matches = generate_arrays(filesplit2)

finished processing /data1/kji/databases/03_counts.pkl
finished processing /data1/kji/databases/01_counts.pkl
finished processing /data1/kji/databases/02_counts.pkl
finished processing /data1/kji/databases/013_counts.pkl
finished processing /data1/kji/databases/012_counts.pkl
finished processing /data1/kji/databases/014_counts.pkl
finished processing /data1/kji/databases/045_counts.pkl
finished processing /data1/kji/databases/025_counts.pkl
finished processing /data1/kji/databases/0235_counts.pkl
finished processing /data1/kji/databases/0123_counts.pkl
finished processing /data1/kji/databases/0345_counts.pkl
finished processing /data1/kji/databases/0125_counts.pkl
finished processing /data1/kji/databases/0245_counts.pkl


In [18]:
store_DB(n_grams, "fps_split1", db_dir)

In [19]:
store_DB(probs, "utils_split1", db_dir)

In [20]:
store_DB(matches, "matches_split1", db_dir)

## Generate database construction file

In [5]:
@nb.njit(parallel=True)
def parallel_runtime(x, Dm_squared, Dm, Dt, Bt):
    y=np.empty(x.shape)
    for i in nb.prange(len(x)):
        y[i]=((x[i]*x[i] + Dm_squared) / (x[i] + Dm) - Dt) / (Bt - Dt)
    return y

In [6]:
@nb.njit(parallel=True)
def parallel_memory(x, Dm, Bm):
    y=np.empty(x.shape)
    for i in nb.prange(len(x)):
        y[i]=x[i] / (Bm - Dm)
    return y

In [7]:
@nb.njit(parallel=True)
def parallel_max(x, y, utils):
    z=np.empty(x.shape)
    for i in nb.prange(len(x)):
        z[i] = utils[i] / max(x[i], y[i])
    return z

In [8]:
combinations = []
for n_gram in range(1, 5):
    combinations += [[0] + list(tup) for tup in itertools.combinations(range(1, 6), n_gram-1)]

In [9]:
combinations = ["".join(str(num) for num in combination) for combination in combinations]

In [10]:
def construct_db(Bm, Bt, outdir, outfile_name, k):
    """
    Inputs: a memory budget and a runtime budget, and a number k representing the top k fingerprints we want each time
    Output: a file specifying an ordered list of n-grams to include in the final database
    """
    # total number of matches for all fingerprints in the database
    Dm = 0
    Dm_squared = 0
    # cumulative average runtime cost of all fingerprints in the database
    Dt = 0
    
    masks = [None, None]
    with open(f"{outdir}/{outfile_name}.csv", "w") as f2:
        writer = csv.writer(f2)
        writer.writerow(["ratio", "m_i", "Ct_i", "Bm-Dm", "Bt-Dt"])
        while Bm - Dm > 0 and Bt - Dt > 0:
            start = time.time()
            candidate_n_grams = np.array([], dtype = object)
            candidate_ratios = np.array([], dtype = float)
            candidate_indices = np.array([], dtype = int)
            candidate_matches = np.array([], dtype = int)
            # consider top k from both file splits
            for i in range(2):
                with open(f'/data1/kji/databases/utils_split{i}.pkl', "rb") as f2:
                    probabilities = pickle.load(f2)
                with open(f'/data1/kji/databases/matches_split{i}.pkl', "rb") as f3:
                    matches = pickle.load(f3)
                    
                if masks[i] is None:
                    masks[i] = np.zeros(matches.shape)
                    print(matches.shape)
                else:
                    matches[np.nonzero(masks[i])[0]] = 10 ** 9
                    
                # parallelize computation of marginal costs
                marginal_memory = parallel_memory(matches, Dm, Bm)
                marginal_runtime = parallel_runtime(matches, Dm_squared, Dm, Dt, Bt)
                ratios = parallel_max(marginal_memory, marginal_runtime, probabilities)
                del probabilities
                
                # get the top k fingerprints with the highest utility:cost ratio
                idx = np.argpartition(ratios, -k)[-k:]
                candidate_ratios = np.concatenate((candidate_ratios, ratios[idx]))
                del ratios
                candidate_matches = np.concatenate((candidate_matches, matches[idx]))
                del matches
                
                with open(f"/data1/kji/databases/fps_split{i}.pkl", "rb") as f1:
                    n_grams = pickle.load(f1)
                candidate_n_grams = np.concatenate((candidate_n_grams, n_grams[idx]))
                del n_grams
                candidate_indices = np.concatenate((candidate_indices, idx))
                del idx
                
                assert candidate_ratios.shape == candidate_matches.shape == candidate_n_grams.shape == candidate_indices.shape
            # after looping over both, we have length 2*k arrays storing all the information we need        
            final_idx = np.argpartition(candidate_ratios, -k)[-k:]
            final_matches = candidate_matches[final_idx]
            first_half = final_idx < k
            idx_split1 = candidate_indices[final_idx[first_half]]
            idx_split2 = candidate_indices[final_idx[~first_half]]
            
            # use mask arrays to keep track of fingerprints we've selected
            if idx_split1.size:
                masks[0][idx_split1] = 1
            if idx_split2.size:
                masks[1][idx_split2] = 1
                
            Dm += np.sum(final_matches)
            Dm_squared += np.sum(final_matches**2)
            cost = Dm_squared / Dm - Dt
            Dt = Dm_squared / Dm
            remaining_memory = Bm - Dm
            remaining_runtime = Bt - Dt
            writer.writerow([candidate_ratios[final_idx[-1]], final_matches[-1], cost, remaining_memory, remaining_runtime])

            # free memory
            candidate_n_grams = None
            candidate_ratios = None
            candidate_indices = None
            candidate_matches = None
            
    # write all used fingerprints to a file
    with open(f"{outdir}/{outfile_name}.txt", "w") as out:
        with open(f'/data1/kji/databases/db_indices.pkl', "rb") as f:
            db_indices = pickle.load(f)
        for i in range(2):
            with open(f"/data1/kji/databases/fps_split{i}.pkl", "rb") as f:
                n_grams = pickle.load(f)
                used = np.nonzero(masks[i])[0]
                masks[i] = None
                for j in used:
                    for combination in combinations:
                        start, end = db_indices[combination]
                        if start <= j < end:
                            out.write(f"{n_grams[j]} {combination}\n")
                            break
                del n_grams
                del used

In [11]:
Bm = 200000000
Bt = 200000000
k = 10000000

In [None]:
construct_db(Bm, Bt, "/data1/kji/db_tests", "200mill", k)

(329995024,)
(336214600,)


## Construct database of offsets

Now that we have selected all the fingerprints, we construct the database containing each fingerprint and their offsets in IMSLP.

In [5]:
fp_file =  "/data1/kji/db_tests/test.txt"

Construct a dictionary with all the n-grams in our database.

In [6]:
def initialize_entry(line):
    line = line.rstrip().split()
    n_gram, combination = ''.join(line[:-1]), line[-1]
    return ast.literal_eval(n_gram), combination

In [7]:
def make_db(fp_file):
    with open(fp_file) as f:
        lines = f.readlines()
    n_cores = 30
    pool = multiprocessing.Pool(n_cores)
    keys = pool.map(initialize_entry, lines)
    dbs = {combination: {} for combination in combinations}
    for fp, combination in keys:
        dbs[combination][fp] = {}
    return dbs

In [8]:
dbs = make_db(fp_file)

In [22]:
for combination in combinations:
    with open(f"/data1/kji/databases/sub_dbs/{combination}.pkl", "wb") as f:
        pickle.dump(dbs[combination], f, protocol = pickle.HIGHEST_PROTOCOL)

In [None]:
with open("/data1/kji/databases/135mill_empty.pkl", "wb") as f:
    pickle.dump(dbs, f, protocol = pickle.HIGHEST_PROTOCOL)

Now load in every single database file and update our current database with the real offsets.

In [23]:
db_dir = '/data1/kji/databases'

In [24]:
files = []
for filename in iglob(f"{db_dir}/*.pkl", recursive=True):
    files.append(filename)
files.sort(key = len)

In [25]:
files[:26]

['/data1/kji/databases/0.pkl',
 '/data1/kji/databases/02.pkl',
 '/data1/kji/databases/03.pkl',
 '/data1/kji/databases/05.pkl',
 '/data1/kji/databases/04.pkl',
 '/data1/kji/databases/01.pkl',
 '/data1/kji/databases/015.pkl',
 '/data1/kji/databases/014.pkl',
 '/data1/kji/databases/035.pkl',
 '/data1/kji/databases/013.pkl',
 '/data1/kji/databases/023.pkl',
 '/data1/kji/databases/025.pkl',
 '/data1/kji/databases/045.pkl',
 '/data1/kji/databases/034.pkl',
 '/data1/kji/databases/012.pkl',
 '/data1/kji/databases/024.pkl',
 '/data1/kji/databases/0125.pkl',
 '/data1/kji/databases/0124.pkl',
 '/data1/kji/databases/0134.pkl',
 '/data1/kji/databases/0135.pkl',
 '/data1/kji/databases/0235.pkl',
 '/data1/kji/databases/0345.pkl',
 '/data1/kji/databases/0123.pkl',
 '/data1/kji/databases/0145.pkl',
 '/data1/kji/databases/0234.pkl',
 '/data1/kji/databases/0245.pkl']

In [28]:
def add_db_values(file, db):
    with open(file, "rb") as f:
        d = dill.load(f)
    for n_gram in d.keys():
        if n_gram in db:
            if db[n_gram] is None:
                db[n_gram] = d[n_gram]
            else:
                db[n_gram].update(d[n_gram])
    d.clear()
    print(f"finished {file}")

In [29]:
for combination in combinations:
    if dbs[combination]:
        add_db_values(f"{db_dir}/{combination}.pkl", dbs[combination])

finished /data1/kji/databases/0.pkl
finished /data1/kji/databases/01.pkl
finished /data1/kji/databases/02.pkl
finished /data1/kji/databases/03.pkl
finished /data1/kji/databases/04.pkl
finished /data1/kji/databases/05.pkl
finished /data1/kji/databases/015.pkl
finished /data1/kji/databases/024.pkl
finished /data1/kji/databases/034.pkl
finished /data1/kji/databases/035.pkl


In [10]:
with open("/data1/kji/databases/split21_Bt_200mill.pkl", "wb") as f:
    pickle.dump(database, f)

In [3]:
with shelve.open("/data1/kji/databases2/shelve_test") as db:
    add_db_values(files[:16], db)

KeyboardInterrupt: 

In [10]:
add_db_values(files[13:16], database)

finished /data1/kji/databases2/034.pkl
finished /data1/kji/databases2/012.pkl
finished /data1/kji/databases2/024.pkl


In [14]:
with open("/data1/kji/databases/split2_Bt_200mill.pkl", "wb") as f:
    pickle.dump(result, f)

In [None]:
start = time.time()
for file in test_files:
    with open(file, "rb") as f:
        d = dill.load(f)
        combined.update(d)
        d.clear()
print(f"finished in {time.time() - start} seconds")

In [9]:
for fp in database:
    database[fp] = combined[fp]

In [None]:
with open("/data1/kji/databases2/140mill.pkl", "w") as f:
    pickle.dump(database, f)

## SQL database

In [5]:
engine = sqlalchemy.create_engine('sqlite:////data1/kji/databases/test.db')

In [12]:
test_files = ['/data1/kji/databases/0_counts.pkl', 
              '/data1/kji/databases/01_counts.pkl',
              '/data1/kji/databases/012_counts.pkl',
              '/data1/kji/databases/0123_counts.pkl']

In [6]:
for filename in test_files:
    with open(filename, "rb") as f:
        d = pickle.load(f)
        df_dict = defaultdict(list)
        combination = filename.split('/')[-1].split("_")[0]
        for n_gram, count in d.items():
            df_dict['combination'].append(combination)
            df_dict['n_gram'].append(str(n_gram))
            df_dict['matches'].append(count)
            df_dict['utility'].append(utilities[combination])
            df_dict['used'].append(0)
        d.clear()
        df = pd.DataFrame.from_dict(df_dict)
        with engine.begin() as connection:
            df.to_sql(combination, con=connection, index=False, if_exists='replace')
        print(f"finished processing {filename}")
        f.flush()
        df_dict.clear()

finished processing /data1/kji/databases/0_counts.pkl
finished processing /data1/kji/databases/01_counts.pkl
finished processing /data1/kji/databases/012_counts.pkl
finished processing /data1/kji/databases/0123_counts.pkl


### Generate database construction plan

In [4]:
def marginal_memory(num_matches, remaining_budget):
    return num_matches / remaining_budget

In [5]:
def marginal_runtime(matches_squared, num_matches, total_matches, total_avg_runtime, runtime_budget):
    cost = (matches_squared + num_matches ** 2) / (total_matches + num_matches) - total_avg_runtime
    return cost / (runtime_budget - total_avg_runtime)

In [6]:
def metric(utility, num_matches, memory_budget, runtime_budget, total_matches, matches_squared, total_avg_runtime):
    marginal_memory_cost = marginal_memory(num_matches, memory_budget-total_matches)
    marginal_runtime_cost = marginal_runtime(matches_squared, num_matches, total_matches, total_avg_runtime, runtime_budget)
    return utility / max(marginal_memory_cost, marginal_runtime_cost)

In [14]:
conn = sqlite3.connect("/data1/kji/databases/test.db")
conn.create_function("metric", 7, metric)
c = conn.cursor()

In [10]:
db_items = ['(2048, 0, 0, 0, 0, 0)', '(1073741824, 0, 0, 0, 0, 0)']

In [11]:
query = "select * from fingerprints where used == 0 order by utility desc limit 1"

In [15]:
result = c.execute(query).fetchone()

In [7]:
def query(db_path, table_name, memory_budget, runtime_budget, total_matches, matches_squared, total_avg_runtime):
    with closing(sqlite3.connect(db_path)) as con, con,  \
            closing(con.cursor()) as cur:
        con.create_function("metric", 7, metric)
        query = f"select * from '{table_name}' where used = 0 order by metric(utility, matches, ?, ?, ?, ?, ?) desc limit 1"
        cur.execute(query, (memory_budget, runtime_budget, total_matches, matches_squared, total_avg_runtime))
        result = cur.fetchone()
        ratio = metric(result[3], result[2], memory_budget, runtime_budget, total_matches, matches_squared, total_avg_runtime)
        return (result, ratio)

In [8]:
def construct_db(db_path, memory_budget, runtime_budget, outdir):
    """
    Inputs: a database which contains n-grams, their utility, and number of matches,
            a memory budget, and a runtime budget
    Output: a file specifying an ordered list of n-grams to include in the final database
    """
    conn = sqlite3.connect(db_path)
    
    with closing(conn.cursor()) as c:
        c.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = c.fetchall()
        
    # clear all used bits in the database
    for table in tables:
        with closing(conn.cursor()) as cur:
            cur.execute(f"UPDATE '{table[0]}' SET used = 0")
    
    total_matches = 0
    matches_squared = 0
    total_avg_runtime = 0
    with open(f"{outdir}/Bm_10k_Bt_8k_fingerprints.txt", "w") as f1, open(f"{outdir}/Bm_10k_Bt_8k_info.csv", "w") as f2:
        writer = csv.writer(f2)
        writer.writerow(["fingerprint", "ratio", "m_i", "Ct_i", "Bm-Dm", "Bt-Dt"])
        while memory_budget - total_matches > 0 and runtime_budget - total_avg_runtime > 0:
            start = time.time()
            inputs = [(db_path, table[0], memory_budget, runtime_budget, total_matches, matches_squared, total_avg_runtime) for table in tables]
            with multiprocessing.Pool(processes = 26) as pool:
                results = pool.starmap(query, inputs)
            # get the pair with the highest ratio
            result = max(results, key = lambda pair: pair[1])
            n_gram = result[0][0]
            f1.write(f"{n_gram}\n")
            ratio = result[1]
            m_i = result[0][2]
            cost = (matches_squared + m_i ** 2) / (total_matches + m_i) - total_avg_runtime
            remaining_memory = memory_budget - total_matches
            remaining_runtime = runtime_budget - total_avg_runtime
            print(remaining_memory)
            print(remaining_runtime)
            writer.writerow([result[0][1], ratio, m_i, cost, remaining_memory, remaining_runtime])
            total_matches += m_i
            matches_squared += m_i**2
            total_avg_runtime += matches_squared / total_matches  
            combination = result[0][0]
            with closing(conn.cursor()) as c:
                c.execute(f"UPDATE '{combination}' SET used = 1 WHERE combination = '{combination}' and n_gram = '{n_gram}'")
            print(f"finished in {time.time() - start} seconds")

In [9]:
db_path = "/data1/kji/databases/test.db"

In [10]:
memory_budget = 10000

In [11]:
runtime_budget = 8000

In [11]:
%load_ext line_profiler

No multithreading takes 166 seconds per fingerprint.

In [21]:
%lprun -f construct_db construct_db(db_path, memory_budget, runtime_budget, "experiments/db_tests")

UsageError: Line magic function `%lprun` not found.


In [12]:
construct_db(db_path, memory_budget, runtime_budget, "experiments/db_tests")

10000
8000
finished in 49.985451221466064 seconds
9999
7999.0
finished in 46.40796089172363 seconds
9998
7998.0
finished in 44.28696370124817 seconds
9997
7997.0
finished in 46.907904386520386 seconds
9996
7996.0
finished in 46.40547823905945 seconds
9995
7995.0
finished in 52.72178411483765 seconds
9994
7994.0
finished in 45.79409456253052 seconds
9993
7993.0
finished in 46.2898633480072 seconds
9992
7992.0
finished in 46.557454109191895 seconds
9991
7991.0
finished in 44.22396373748779 seconds
9990
7990.0
finished in 49.21182632446289 seconds


Process ForkPoolWorker-311:
Process ForkPoolWorker-308:
Process ForkPoolWorker-297:
Process ForkPoolWorker-293:
Process ForkPoolWorker-310:
Process ForkPoolWorker-309:
Process ForkPoolWorker-304:
Process ForkPoolWorker-291:
Process ForkPoolWorker-305:
Process ForkPoolWorker-296:
Process ForkPoolWorker-307:
Process ForkPoolWorker-312:
Process ForkPoolWorker-306:
Process ForkPoolWorker-300:
Process ForkPoolWorker-298:
Process ForkPoolWorker-294:
Process ForkPoolWorker-299:
Process ForkPoolWorker-301:
Process ForkPoolWorker-303:


KeyboardInterrupt: 