Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
fixes to lsb_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
Cecca committed May 18, 2023
1 parent 3e0be41 commit 4c34f2c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 25 deletions.
3 changes: 3 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ set dotenv-load
build:
cmake --build build --config Debug --target PuffinnJoin

build-lsbtree:
cmake --build build --config Debug --target LSBTree

check:
cmake --build build --config RelWithDebInfo --target PuffinnJoin
env OMP_NUM_THREADS=56 build/PuffinnJoin < instructions.txt > result.dsv
Expand Down
76 changes: 51 additions & 25 deletions join-experiments/lsb_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// Transactions on Database Systems
// https://dl.acm.org/doi/pdf/10.1145/1806907.1806912

#include <bitset>
#include <cstdlib>
#include <fstream>
#include <random>
Expand All @@ -24,13 +25,15 @@ class Progress {
uint64_t every;
uint64_t last_log;
uint64_t count;
uint64_t expected;

void log() const {
std::cerr << prefix << count << std::endl;
float perc = 100.0 * (float) count / (float) expected;
std::cerr << prefix << perc << "%" << std::endl;
}

public:
Progress(std::string prefix, uint64_t every): prefix(prefix), every(every), last_log(0), count(0) {}
Progress(std::string prefix, uint64_t every, uint64_t expected): prefix(prefix), every(every), last_log(0), count(0), expected(expected) {}

void update(uint64_t x) {
count += x;
Expand Down Expand Up @@ -183,6 +186,7 @@ std::pair<std::vector<std::pair<uint64_t, size_t>>, size_t> build_index(std::vec
}
}
std::cerr << "we need " << coord_grid_bits << " bits to represent each gridded coordinate" << std::endl;
std::cerr << "overall we need to interleave " << m*coord_grid_bits << " bits" << std::endl;

if(m * coord_grid_bits > 64) {
throw std::runtime_error("too many bits to interleave");
Expand All @@ -207,12 +211,12 @@ std::pair<std::vector<std::pair<uint64_t, size_t>>, size_t> build_index(std::vec
}

// longest common prefix
size_t lcp(uint64_t a, uint64_t b) {
size_t lcp(uint64_t a, uint64_t b, size_t word_length) {
size_t l = 0;
while ( (a >> l) != (b >> l) ) {
l--;
while ( (a >> l) != (b >> l) && l < word_length ) {
l++;
}
return 64 - l;
return word_length - l;
}

std::pair<size_t, size_t> find_segment(std::vector<std::pair<uint64_t, size_t>> & index, size_t from) {
Expand All @@ -225,6 +229,7 @@ std::pair<size_t, size_t> find_segment(std::vector<std::pair<uint64_t, size_t>>

void merge(std::vector<Pair> & a, std::vector<Pair> & b, size_t k) {
for (Pair p : b) {
// DEBUG
a.push_back(p);
std::push_heap(a.begin(), a.end(), cmp_pairs);
if (a.size() > k) {
Expand All @@ -239,21 +244,25 @@ std::vector<Pair> cp3(
std::vector<std::pair<uint64_t, size_t>> & index,
size_t k,
size_t m,
size_t coord_grid_bits)
{
size_t coord_grid_bits,
size_t min_leaves // the minimum number of neighbor leaves to explore
) {
int nthreads = omp_get_max_threads();
std::vector<std::vector<Pair>> tl_result(nthreads);

Progress progress("++ ", 100);
Progress progress("++ ", 5000, index.size());

std::pair<size_t, size_t> current = find_segment(index, 0);
while (current.first < index.size()) {
progress.update(current.second - current.first);
/* std::cerr << " current block from " << current.first << " to " << current.second << " with " << (current.second - current.first) << " elements" << std::endl; */
/* std::cerr << "Start zindex " << std::bitset<64>(index[current.first].first) */
/* << std::endl */
/* << " End zindex " << std::bitset<64>(index[current.second-1].first) */
/* << std::endl; */
// std::cerr << "blk: " << (current.second - current.first) << std::endl;
// std::cerr << " current block from " << current.first << " to " << current.second << " with " << (current.second - current.first) << " elements" << std::endl;
// std::cerr << "zindex " << std::bitset<64>(index[current.first].first) << std::endl;

for (size_t tid=1; tid<tl_result.size(); tid++) {
tl_result[tid].clear();
}

// Self join of the current node
#pragma omp parallel for
for (size_t i=current.first; i < current.second; i++) {
Expand All @@ -280,27 +289,39 @@ std::vector<Pair> cp3(
if (tl_result[0].size() == k) {
best = tl_result[0].front().distance;
}
// std::cerr << " current best guess " << best << std::endl;
std::pair<size_t, size_t> next = find_segment(index, current.second);
Progress inner_progress("---- ", 100000);
// Progress inner_progress("---- ", 100000);
size_t visited_leaves = 0;
while (next.first < index.size()) {
inner_progress.update(next.second - next.first);
/* std::cerr << " next block from " << next.first << " to " */
/* << next.second << " with " */
/* << (next.second - next.first) << " elements" */
/* << std::endl; */
// inner_progress.update(next.second - next.first);
// std::cerr << " next block from " << next.first << " to "
// << next.second << " with "
// << (next.second - next.first) << " elements"
// << " zindex " << std::bitset<64>(index[next.first].first)
// << std::endl;
// check if the next segment is too far away
if (tl_result[0].size() == k) {
float d = tl_result[0].front().distance;
if (d < best) {
best = d;
std::cerr << " current best guess " << best << std::endl;
// std::cerr << " current best guess " << best << std::endl;
}
}
float block_bound = std::pow(2.0, 1 + coord_grid_bits - std::floor(lcp(index[current.first].first, index[next.first].first) / m));
if (best < block_bound) {
//std::cerr << " early stopping loop block bound=" << block_bound << std::endl;
size_t word_length = coord_grid_bits * m;
size_t prefix_length = lcp(index[current.first].first, index[next.first].first, word_length);
float block_bound = std::pow(2.0, 1 + coord_grid_bits - std::floor(prefix_length / m));
// std::cerr << "longest common prefix " << prefix_length
// << " block bound " << block_bound
// << std::endl;
if (best < block_bound && visited_leaves >= min_leaves) {
// std::cerr << " early stopping loop block bound=" << block_bound << std::endl;
break;
}

for (size_t tid=1; tid<tl_result.size(); tid++) {
tl_result[tid].clear();
}

#pragma omp parallel for
for (size_t i=current.first; i < current.second; i++) {
Expand All @@ -325,6 +346,7 @@ std::vector<Pair> cp3(

// go to the next block
next = find_segment(index, next.second);
visited_leaves++;
}
current = find_segment(index, current.second);
}
Expand Down Expand Up @@ -387,6 +409,7 @@ int main(void) {

// query params
unsigned int k = 1;
size_t min_leaves = 0;

std::istringstream workload_params_stream(workload_params_str);
while (true) {
Expand All @@ -397,13 +420,15 @@ int main(void) {
}
if (key == "k") {
workload_params_stream >> k;
} else if (key == "min_leaves") {
workload_params_stream >> min_leaves;
} else {
std::cout << "sppv1 err unknown parameter " << key << std::endl;
throw std::invalid_argument("unknown parameter");
}
}

auto top_pairs = cp3(dataset, index_pair.first, k, m, index_pair.second);
auto top_pairs = cp3(dataset, index_pair.first, k, m, index_pair.second, min_leaves);
send("ok");

bool check = false;
Expand Down Expand Up @@ -431,6 +456,7 @@ int main(void) {
auto p = top_pairs.back();
top_pairs.pop_back();
std::cout << p.a << " " << p.b << std::endl;
std::cerr << p.a << " " << p.b << " " << p.distance << std::endl;
if (check) {
std::pop_heap(actual.begin(), actual.end(), cmp_pairs);
auto pcheck = actual.back();
Expand Down

0 comments on commit 4c34f2c

Please sign in to comment.