This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Fix computation of baseline

Cecca committed May 22, 2023
1 parent b61a24b commit db48d6a
Showing 3 changed files with 145 additions and 102 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -37,6 +37,11 @@ if (OpenMP_FOUND)
target_link_libraries(StoreJaccard ${OpenMP_CXX_LIBRARIES} HighFive)
endif()

add_executable(TopPairsCosine "join-experiments/TopPairsCosine.cpp")
if (OpenMP_FOUND)
target_link_libraries(TopPairsCosine ${OpenMP_CXX_LIBRARIES} HighFive)
endif()

add_executable(SampleJaccard "join-experiments/SampleJaccard.cpp")
if (OpenMP_FOUND)
target_link_libraries(SampleJaccard ${OpenMP_CXX_LIBRARIES} HighFive)
102 changes: 63 additions & 39 deletions join-experiments/TopPairsCosine.cpp
@@ -1,9 +1,3 @@
// Implements Algorithm CP3 of the paper
// Efficient and Accurate Nearest Neighbor and Closest Pair Search in High-Dimensional Space
// Yufei Tao, Ke Yi, Cheng Sheng, Panos Kalnis
// Transactions on Database Systems
// https://dl.acm.org/doi/pdf/10.1145/1806907.1806912

#include <cstdlib>
#include <fstream>
#include <random>
@@ -19,18 +13,28 @@
#include "puffinn.hpp"
#include "puffinn/performance.hpp"

const unsigned long long MB = 1024*1024;
const unsigned long long KB = 1024;

class Progress {
std::string prefix;
std::chrono::time_point<std::chrono::system_clock> start;
uint64_t every;
uint64_t last_log;
uint64_t count;
uint64_t expected;

void log() const {
std::cerr << prefix << count << std::endl;
auto cur = std::chrono::system_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::seconds>(cur - start).count();
double rate = count / elapsed;
uint64_t todo = expected - count;
double estimate = todo / rate;
std::cerr << prefix << count << " elapsed " << elapsed << " :: " << estimate << " to go" << std::endl;
}

public:
Progress(std::string prefix, uint64_t every): prefix(prefix), every(every), last_log(0), count(0) {}
Progress(std::string prefix, uint64_t every, uint64_t expected): prefix(prefix), start(std::chrono::system_clock::now()), every(every), last_log(0), count(0), expected(expected) {}

void update(uint64_t x) {
#pragma omp atomic
@@ -100,6 +104,7 @@ int main(int argc, char** argv) {
std::cerr << "USAGE: TopPairsCosine [-k K] <dataset>" << std::endl;
return 1;
}
bool exact = false;

std::cerr << "loading data from " << path << std::endl;
H5Easy::File file(path, H5Easy::File::ReadWrite);
@@ -116,43 +121,62 @@ int main(int argc, char** argv) {
<< " vectors from hdf5 file, of dimension "
<< dim << std::endl;

std::vector<std::vector<Pair>> threads_res(omp_get_max_threads());

Progress prog("point ", 10000);
#pragma omp parallel for schedule(dynamic)
for (size_t i=0; i<data.size(); i++) {
std::vector<Pair> & res = threads_res[omp_get_thread_num()];
for (size_t j=i+1; j<data.size(); j++) {
float sim = (dotp(data[i], data[j]) + 1) / 2.0;
if (sim > 1.0) {
sim = 1.0;
}
res.push_back(Pair{i, j, sim});
std::push_heap(res.begin(), res.end(), cmp_pairs);
if (res.size() > k) {
std::pop_heap(res.begin(), res.end(), cmp_pairs);
res.pop_back();
std::vector<std::vector<uint32_t>> out_res;

if (exact) {
std::vector<std::vector<Pair>> threads_res(omp_get_max_threads());

Progress prog("point ", 1000, data.size());
#pragma omp parallel for schedule(dynamic)
for (size_t i=0; i<data.size(); i++) {
std::vector<Pair> & res = threads_res[omp_get_thread_num()];
for (size_t j=i+1; j<data.size(); j++) {
float sim = dotp(data[i], data[j]);
if (sim > 1.0) {
sim = 1.0;
}
res.push_back(Pair{i, j, sim});
std::push_heap(res.begin(), res.end(), cmp_pairs);
if (res.size() > k) {
std::pop_heap(res.begin(), res.end(), cmp_pairs);
res.pop_back();
}
}
prog.update(1);
}
prog.update(1);
}

std::vector<Pair> res = threads_res[0];
for (size_t tid=1; tid < threads_res.size(); tid++) {
for (auto pair : threads_res[tid]) {
res.push_back(pair);
std::push_heap(res.begin(), res.end(), cmp_pairs);
if (res.size() > k) {
std::pop_heap(res.begin(), res.end(), cmp_pairs);
res.pop_back();
std::vector<Pair> res = threads_res[0];
for (size_t tid=1; tid < threads_res.size(); tid++) {
for (auto pair : threads_res[tid]) {
res.push_back(pair);
std::push_heap(res.begin(), res.end(), cmp_pairs);
if (res.size() > k) {
std::pop_heap(res.begin(), res.end(), cmp_pairs);
res.pop_back();
}
}
}
}

std::sort_heap(res.begin(), res.end(), cmp_pairs);
std::vector<std::vector<float>> out_res;
for (auto pair : res) {
out_res.push_back({pair.similarity, pair.a, pair.b});
std::sort_heap(res.begin(), res.end(), cmp_pairs);
for (auto pair : res) {
out_res.push_back({pair.similarity, pair.a, pair.b});
}
} else {
puffinn::Index<puffinn::CosineSimilarity, puffinn::SimHash> index(
data[0].size(),
4 * KB * data.size(),
puffinn::IndependentHashArgs<puffinn::SimHash>()
);
for (auto v : data) { index.insert(v); }
index.rebuild(false, false);
auto pairs = index.global_lsh_join(k, 0.999);
for (auto entry : pairs.best_indices()) {
std::vector<uint32_t> vpair;
vpair.push_back(entry.first);
vpair.push_back(entry.second);
out_res.push_back(vpair);
}

}

/* for (size_t i=res.size(); i>res.size() - 10; i--) { */
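For reference, the exact branch above enumerates all pairs and keeps, per thread, a bounded heap of the k most similar ones, merging the per-thread heaps at the end. Below is a minimal single-threaded Python sketch of that bounded-heap pattern; the function name, its arguments, and the plain-Python dot product are illustrative assumptions, not code from this repository.

import heapq

def top_k_pairs(vectors, k):
    # Min-heap of the k most similar pairs seen so far; the root is the
    # current k-th best, so any worse pair is rejected in O(log k).
    heap = []
    for i in range(len(vectors)):
        for j in range(i + 1, len(vectors)):
            sim = sum(a * b for a, b in zip(vectors[i], vectors[j]))  # dot product (unit vectors assumed)
            if len(heap) < k:
                heapq.heappush(heap, (sim, i, j))
            elif sim > heap[0][0]:
                heapq.heapreplace(heap, (sim, i, j))
    return sorted(heap, reverse=True)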
140 changes: 77 additions & 63 deletions join-experiments/run.py
@@ -25,7 +25,7 @@
import os
import hashlib
import sqlite3
#import faiss
import faiss
#import falconn
import json
import random
@@ -192,6 +192,7 @@ def compute_distances(k, dataset, distancefn):
norms = np.linalg.norm(dataset, axis=1)[:, np.newaxis]
assert np.sum(norms == 0.0) == 0
dataset /= norms
faiss.omp_set_num_threads(52)
index = faiss.IndexFlatIP(dataset.shape[1])
index.add(dataset)
all_distances, all_neighbors = index.search(dataset, k+1)
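The ground truth here relies on brute-force inner-product search over L2-normalized vectors, so the returned scores are cosine similarities. A minimal sketch of the same setup, assuming a hypothetical helper name and float32 input (not code from this repository):

import numpy as np
import faiss

def exact_topk_cosine(dataset, k):
    x = np.ascontiguousarray(dataset, dtype=np.float32)
    x /= np.linalg.norm(x, axis=1, keepdims=True)   # unit-norm rows: inner product == cosine
    index = faiss.IndexFlatIP(x.shape[1])           # exact (non-approximate) inner-product index
    index.add(x)
    sims, neighbors = index.search(x, k + 1)        # k+1 because each point is its own nearest neighbor
    return sims, neighbors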
@@ -257,9 +258,10 @@ def compute_recalls(db):
missing_recalls = db.execute("SELECT rowid, algorithm, params, dataset, k, output_file, hdf5_group FROM main WHERE recall IS NULL AND WORKLOAD = 'global-top-k';").fetchall()
print("There are {} missing recalls for global-top-k".format(len(missing_recalls)))
for rowid, algorithm, params, dataset, k, output_file, hdf5_group in missing_recalls:
if k > 1000:
continue

if dataset == 'DeepImage':
print("TODO: Fix DeepImage")
if 'sample' in dataset:
continue

# Compute the top-1000 distances for the dataset, if they are not already there
@@ -268,43 +270,51 @@

dataset_path = DATASETS[dataset]()
with h5py.File(dataset_path, 'r+') as hfp:
if dist_key not in hfp or nn_key not in hfp:
print('Computing top distances for', dataset)
distances, neighbors, avg_distance = compute_distances(1000, hfp['/train'], hfp.attrs['distance'])
hfp[dist_key] = distances
hfp[nn_key] = neighbors
hfp['/average_distance'] = avg_distance
if top_pairs_key not in hfp:
print('Computing top 1000 pairs for', dataset)
distances = hfp[dist_key]
neighbors = hfp[nn_key]
topk = []
for i, (dists, neighs) in tqdm(enumerate(zip(distances, neighbors)), total=neighbors.shape[0]):
for d, j in zip(dists, neighs):
if i != j:
t = (d, min(i, j), max(i, j))
if len(topk) > 2000:
heapq.heappushpop(topk, t)
else:
heapq.heappush(topk, t)
topk = list(set(topk)) # remove duplicates
topk.sort(reverse=True)
topk = topk[:1000]
hfp[top_pairs_key] = topk

baseline_pairs = set([(min(pair[0], pair[1]), max(pair[0], pair[1])) for pair in hfp[top_pairs_key][:k, 1:3].astype(np.int32)])
top = hfp[top_pairs_key][:]
kth_sim = top[k-1, 0]
# Select all the pairs with similarity larger than or equal
# to the k-th
baseline_pairs = set([
(min(pair[0], pair[1]), max(pair[0], pair[1]))
for pair in top[top[:,0] >= kth_sim][:,1:].astype(np.int32)
])
if dist_key not in hfp or nn_key not in hfp:
print('Computing top distances for', dataset)
distances, neighbors, avg_distance = compute_distances(1000, hfp['/train'], hfp.attrs['distance'])
hfp[dist_key] = distances
hfp[nn_key] = neighbors
hfp['/average_distance'] = avg_distance

print('Computing top 1000 pairs for', dataset)
distances = hfp[dist_key]
neighbors = hfp[nn_key]
topk = []
for i, (dists, neighs) in tqdm(enumerate(zip(distances, neighbors)), total=neighbors.shape[0]):
for d, j in zip(dists, neighs):
if i != j:
t = (d, min(i, j), max(i, j))
if len(topk) > 2000:
heapq.heappushpop(topk, t)
else:
heapq.heappush(topk, t)
topk = list(set(topk)) # remove duplicates
topk.sort(reverse=True)
topk = topk[:1000]
hfp[top_pairs_key] = topk

if hfp[top_pairs_key].shape[1] == 3:
baseline_pairs = set([(min(pair[0], pair[1]), max(pair[0], pair[1]))
for pair in hfp[top_pairs_key][:k, 1:3].astype(np.int32)])
top = hfp[top_pairs_key][:]
kth_sim = top[k-1, 0]
# Select all the pairs with similarity larger than or equal
# to the k-th
baseline_pairs = set([
(min(pair[0], pair[1]), max(pair[0], pair[1]))
for pair in top[top[:,0] >= kth_sim][:,1:].astype(np.int32)
])

else:
baseline_pairs = set([(min(pair[0], pair[1]), max(pair[0], pair[1]))
for pair in hfp[top_pairs_key][:k,:].astype(np.int32)])

print("Computing recalls for {} {} on {} with k={}".format(algorithm, params, dataset, k))
print(baseline_pairs)


# print("Computing recalls for {} {} on {} with k={}".format(algorithm, params, dataset, k))
# print(baseline_pairs)
output_file = os.path.join(BASE_DIR, output_file)
with h5py.File(output_file) as hfp:
actual_pairs = set(map(tuple, hfp[hdf5_group]['global-top-{}'.format(k)]))
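The core of the baseline fix: rather than taking exactly the first k stored pairs, the baseline now keeps every pair whose similarity ties with the k-th best, so an algorithm that returns a different but equally similar pair is not penalized. A compact Python sketch of that selection and one plausible recall definition (the function names and the |intersection| / k formula are assumptions for illustration, not code from this repository):

import numpy as np

def tie_aware_baseline(top, k):
    # 'top' has rows (similarity, i, j), sorted by decreasing similarity.
    kth_sim = top[k - 1, 0]
    tied = top[top[:, 0] >= kth_sim]                 # keep every pair tying with the k-th similarity
    return {(min(int(a), int(b)), max(int(a), int(b))) for _, a, b in tied}

def recall_at_k(baseline_pairs, reported_pairs, k):
    reported = {(min(int(a), int(b)), max(int(a), int(b))) for a, b in reported_pairs}
    return len(baseline_pairs & reported) / k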
@@ -581,7 +591,7 @@ def feed_data(self, h5py_path):
self._send("data")
self._send(distance)
program = self._subprocess_handle()
self._send("path " + h5py_path)run
self._send("path " + h5py_path)
self._expect("ok", "population phase failed")

def index(self, params):
@@ -1698,27 +1708,26 @@ def insert_sizes():

threads = 56

for dataset in ['DeepImage-sample-100k' ]:
index_params = {
'dataset': dataset,
'workload': 'local-top-k',
'algorithm': 'PMLSH',
'params': {}
}
query_params = [
{'k': k, 'radius': radius, 'alpha1': alpha1,
'T': T, 'approx': approx}
for k in [10]
for radius in [1.0]
for alpha1 in [0.001]
for T in [0.2, 0.3, 0.4]
for approx in [1.1, 1.25]
]
run_multiple(index_params, query_params)


for dataset in ['glove-200', 'DeepImage', 'DBLP', 'Orkut']:
continue
# for dataset in ['DeepImage-sample-100k' ]:
# index_params = {
# 'dataset': dataset,
# 'workload': 'local-top-k',
# 'algorithm': 'PMLSH',
# 'params': {}
# }
# query_params = [
# {'k': k, 'radius': radius, 'alpha1': alpha1,
# 'T': T, 'approx': approx}
# for k in [10]
# for radius in [1.0]
# for alpha1 in [0.001]
# for T in [0.2, 0.3, 0.4]
# for approx in [1.1, 1.25]
# ]
# run_multiple(index_params, query_params)


for dataset in ['glove-200']: #, 'DeepImage']:#, 'DBLP', 'Orkut']:
# ----------------------------------------------------------------------
# Xiao et al. global top-k
# if dataset in ['AOL', 'DBLP', "Orkut", "movielens-20M"]:
@@ -1753,7 +1762,8 @@ def insert_sizes():
'threads': threads,
'params': {
'space_usage': space_usage,
'hash_source': hash_source
'hash_source': hash_source,
'with_sketches': False
}
}
query_params = [
@@ -1773,12 +1783,16 @@
# 'algorithm': 'LSBTree',
# 'params': {
# 'm': m,
# 'w': w
# 'w': w,
# }
# }
# join_params = [
# {'k': k}
# for k in [1, 10]
# {
# 'k': k,
# 'min_leaves': min_leaves
# }
# for k in [100, 1000]
# for min_leaves in [0, 2, 4, 8, 16, 32]
# ]
# run_multiple(index_params, join_params)

