Skip to content

Commit

Permalink
[Memory Engine] Add hash index implementation (#3462)
Browse files Browse the repository at this point in the history
  • Loading branch information
decster committed May 6, 2020
1 parent d647045 commit 7399997
Show file tree
Hide file tree
Showing 6 changed files with 393 additions and 0 deletions.
1 change: 1 addition & 0 deletions be/src/olap/memory/CMakeLists.txt
Expand Up @@ -22,5 +22,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory")

add_library(Memory STATIC
hash_index.cpp
mem_tablet.cpp
)
169 changes: 169 additions & 0 deletions be/src/olap/memory/hash_index.cpp
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/memory/hash_index.h"

#include <stdio.h>
#ifdef __SSE2__
#include <emmintrin.h>
#endif

#include <algorithm>
#include <vector>

#include "gutil/stringprintf.h"

namespace doris {

struct alignas(64) HashChunk {
static const uint32_t CAPACITY = 12;
uint8_t tags[12];
std::atomic<uint32_t> size;
uint32_t values[12];

const std::string debug_string() const {
std::string ret;
StringPrintf("[");
for (uint32_t i = 0; i < std::min((uint32_t)size, (uint32_t)12); i++) {
printf("%6u(%02x)", values[i], (uint32_t)tags[i]);
}
printf("]\n");
return ret;
}
};

const uint64_t HashIndex::npos;

HashIndex::HashIndex(size_t capacity)
: _size(0), _max_size(0), _num_chunks(0), _chunk_mask(0), _chunks(NULL) {
size_t min_chunk = (capacity * 14 / 12 + HashChunk::CAPACITY - 1) / HashChunk::CAPACITY;
if (min_chunk == 0) {
return;
}
size_t nc = 1;
while (nc < min_chunk) {
nc *= 2;
}
_chunks = reinterpret_cast<HashChunk*>(aligned_malloc(nc * 64, 64));
if (_chunks) {
_num_chunks = nc;
_chunk_mask = nc - 1;
memset(_chunks, 0, _num_chunks * 64);
_max_size = _num_chunks * HashChunk::CAPACITY * 12 / 14;
}
}

HashIndex::~HashIndex() {
if (_chunks) {
free(_chunks);
_chunks = 0;
_size = 0;
_max_size = 0;
_num_chunks = 0;
_chunk_mask = 0;
}
}

uint64_t HashIndex::find(uint64_t key_hash, std::vector<uint32_t>* entries) const {
uint64_t tag = std::max((uint64_t)1, key_hash & 0xff);
uint64_t pos = (key_hash >> 8) & _chunk_mask;
uint64_t orig_pos = pos;
#ifdef __SSE2__
auto tests = _mm_set1_epi8(static_cast<uint8_t>(tag));
while (true) {
// get corresponding chunk
HashChunk& chunk = _chunks[pos];
uint32_t sz = chunk.size;
// load tags
auto tags = _mm_load_si128(reinterpret_cast<__m128i*>(chunk.tags));
auto eqs = _mm_cmpeq_epi8(tags, tests);
// check tag equality and store equal tag positions into masks
uint32_t mask = _mm_movemask_epi8(eqs) & 0xfff;
// iterator over mask and put candidates into entries
while (mask != 0) {
uint32_t i = __builtin_ctz(mask);
mask &= (mask - 1);
entries->emplace_back(chunk.values[i]);
}
#else
// TODO: use NEON on arm platform
while (true) {
HashChunk& chunk = _chunks[pos];
uint32_t sz = chunk.size;
for (uint32_t i = 0; i < sz; i++) {
if (chunk.tags[i] == (uint8_t)tag) {
entries->emplace_back(chunk.values[i]);
}
}
#endif
if (sz == HashChunk::CAPACITY) {
// this chunk is full, so there may be more candidates in other chunks
uint64_t step = tag * 2 + 1;
pos = (pos + step) & _chunk_mask;
if (pos == orig_pos) {
return npos;
}
} else {
// return new entry position, so if key is not found, this entry position
// can be used to insert new key directly
return (pos << 4) | sz;
}
}
}

void HashIndex::set(uint64_t entry_pos, uint64_t key_hash, uint32_t value) {
uint64_t pos = entry_pos >> 4;
uint64_t tpos = entry_pos & 0xf;
HashChunk& chunk = _chunks[pos];
uint64_t tag = std::max((uint64_t)1, key_hash & 0xff);
chunk.tags[tpos] = tag;
chunk.values[tpos] = value;
if (tpos == chunk.size) {
chunk.size++;
_size++;
}
}

bool HashIndex::add(uint64_t key_hash, uint32_t value) {
uint64_t tag = std::max((uint64_t)1, key_hash & 0xff);
uint64_t pos = (key_hash >> 8) & _chunk_mask;
uint64_t orig_pos = pos;
while (true) {
HashChunk& chunk = _chunks[pos];
if (chunk.size == HashChunk::CAPACITY) {
uint64_t step = tag * 2 + 1;
pos = (pos + step) & _chunk_mask;
if (pos == orig_pos) {
return false;
}
} else {
chunk.tags[chunk.size] = tag;
chunk.values[chunk.size] = value;
chunk.size++;
_size++;
return true;
}
}
}

const std::string HashIndex::dump() const {
return StringPrintf("chunk: %zu %.1fM capacity: %zu/%zu slot util: %.3f", _num_chunks,
_num_chunks * 64.0f / (1024 * 1024), size(), max_size(),
size() / (_num_chunks * 12.0f));
}

} // namespace doris
108 changes: 108 additions & 0 deletions be/src/olap/memory/hash_index.h
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_
#define DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_

#include <stdint.h>

#include <atomic>
#include <string>
#include <vector>

#include "gutil/ref_counted.h"

namespace doris {

struct HashChunk;

// A hash index, which maps uint64_t hashcode(key) to uint32_t rowid(value).
//
// From the user's perspective, it doesn't store row-key values, so when user
// lookup a row-key, user need to first calculate row-key's hashcode, then
// query the hash index to get a list of candidate rowids, then check all
// the candidates rowids for equality.
//
// Note: this hash index do not support delete.
//
// Note: This hash index is thread-safe, but it only support
// single-writer/multi-reader style concurrency. When hash index reaches it's
// capacity, a writer need to expand&rebuild, this is not thread-safe, so the
// writer need to do a copy-on-write, and make it's owner to link to the new
// reference, while readers still reading the old reference.
//
// To add rowkeys into hash index, typical usage may look like:
// vector<uint32_t> entries;
// uint64_t hashcode = hashcode(rowkey)
// uint64_t new_entry_pos = hashIndex.find(hashcode, &entries)
// bool found = false;
// for (auto rowid : entries) {
// if (rowkey == get_row_key(rowid)) {
// // found existing rowkey
// found = true;
// break;
// }
// }
// if (!found) {
// hashIndex.set(new_entry_pos, hashcode, new_row_id);
// }
//
// Note: for more info, please refer to:
// https://engineering.fb.com/developer-tools/f14/
class HashIndex : public RefCountedThreadSafe<HashIndex> {
public:
static const uint64_t npos = (uint64_t)-1;

// Create hash index with capacity
explicit HashIndex(size_t capacity);
~HashIndex();

// Return number of elements
size_t size() const { return _size; }

// Return max number of elements this hash index can hold
// If size >= max_size, this hash index needs to expand and rebuild
size_t max_size() const { return _max_size; }

// Find by key hash, put all candidate values into entries,
// and return a entry position, so later user can use this position to
// add a value with the same key hash directly into this hash index.
uint64_t find(uint64_t key_hash, std::vector<uint32_t>* entries) const;

// Set a value with hash key_hash, at a entry position returned by find
void set(uint64_t entry_pos, uint64_t key_hash, uint32_t value);

// Add a value with hash key_hash
bool add(uint64_t key_hash, uint32_t value);

// return true if this hash index needs rebuild
bool need_rebuild() const { return _size >= _max_size; }

// dump debug information
const std::string dump() const;

private:
std::atomic<size_t> _size;
size_t _max_size;
size_t _num_chunks;
size_t _chunk_mask;
HashChunk* _chunks;
};

} /* namespace doris */

#endif /* DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_ */
1 change: 1 addition & 0 deletions be/test/olap/CMakeLists.txt
Expand Up @@ -82,3 +82,4 @@ ADD_BE_TEST(hll_test)
ADD_BE_TEST(selection_vector_test)
ADD_BE_TEST(options_test)
ADD_BE_TEST(fs/file_block_manager_test)
ADD_BE_TEST(memory/hash_index_test)

0 comments on commit 7399997

Please sign in to comment.