Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of a new lockless hashtable mpmc #262

Merged
merged 81 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
6f45215
Fix object type names
danielealbano Nov 14, 2022
3b6587b
Update epoch gc tests to use new names
danielealbano Nov 14, 2022
1a458e8
Initial cut implementation of the new hashtable
danielealbano Nov 14, 2022
b6fe569
Refactor out hashtable_mpmc_support_hash_half
danielealbano Nov 15, 2022
17d3379
Ensure that the operation is always marked as completed at the end of…
danielealbano Nov 15, 2022
e9d2845
Keep track of the amount of memory allocated via mmap as it's necessa…
danielealbano Nov 18, 2022
3d78b12
The memory allocated for the buckets as to be zero-ed
danielealbano Nov 18, 2022
6dec29c
Store in the struct the amount of memory allocated, required for free…
danielealbano Nov 18, 2022
e322ba5
Improve code readability
danielealbano Nov 18, 2022
f71330f
Implement the necessary functions to free up the allocated memory
danielealbano Nov 18, 2022
4aefbcd
Style fix
danielealbano Nov 18, 2022
1c8c411
Implement a wrapper to get a bucket index from an hash
danielealbano Nov 18, 2022
6e3d9d2
Should not operate on hashtable_mpmc but on hashtable_mpmc_data in pr…
danielealbano Nov 18, 2022
9ac0dae
The value can be fetched only if an entry has been found
danielealbano Nov 18, 2022
1a7559f
Optimize the creation and the destruction of new_key_value when setti…
danielealbano Nov 18, 2022
094ec29
If a double entry was found, reset the current entry and try from the…
danielealbano Nov 18, 2022
add58c6
Improve code readability
danielealbano Nov 18, 2022
7463d10
When searching for a duplicated entry while setting, the newly create…
danielealbano Nov 18, 2022
2f57332
Add headers
danielealbano Nov 18, 2022
d57e219
Add tests
danielealbano Nov 18, 2022
a2c1e0c
Implement the destructor for the memory to be gc-ed via the epoch gar…
danielealbano Nov 18, 2022
b2be620
Update tests to do not touch the epoch gc destructor registered by th…
danielealbano Nov 18, 2022
37c4a8d
Expose an interface, compatible with bools, to let the caller know it…
danielealbano Nov 18, 2022
dd262ba
Duplicate the benchmark for the hashtable for the new implementation
danielealbano Nov 18, 2022
b681f71
Drop unused code
danielealbano Nov 18, 2022
1c8e8d0
Ensure to interrupt the search if a match is found
danielealbano Nov 18, 2022
9932c4f
Catch failures when the epoch operation queue is full
danielealbano Nov 18, 2022
b96e024
Fix benchmarks for hashtable_mpmc_op_get
danielealbano Nov 18, 2022
e060f2a
Increase the size of the epoch operations queue and increase the amou…
danielealbano Nov 18, 2022
65a11f9
Ensure keys are validated only if the keys validation define is set t…
danielealbano Nov 18, 2022
4815408
Use macros for the pointer tagging
danielealbano Nov 19, 2022
b35364d
Add support for tombstones and to acquire the previous value if was u…
danielealbano Nov 19, 2022
02338b1
Update tests to handle the new parameters in op set and to test the t…
danielealbano Nov 19, 2022
84105ab
For clarity rename hashtable_mpmc_support_get_bucket_and_key_value in…
danielealbano Nov 19, 2022
638d4eb
Refactor hashtable_mpmc_support_acquire_empty_bucket_for_insert out f…
danielealbano Nov 19, 2022
8a37f12
Handle correctly the returns using hashtable_mpmc_result_t
danielealbano Nov 19, 2022
0d42b84
Initial bits and pieces to support resizing
danielealbano Nov 19, 2022
47141ee
Update the tests to support the new interface for hashtable_mpmc_init…
danielealbano Nov 19, 2022
8dd0660
Rename hashtable_mpmc_data_bucket_t in hashtable_mpmc_bucket_t
danielealbano Nov 20, 2022
d3160ed
Refactor out from op set the insert validation
danielealbano Nov 20, 2022
549f092
Improve code readability
danielealbano Nov 20, 2022
76c4017
Add support to gc hashtable_mpmc_data_t pointers
danielealbano Nov 20, 2022
4f34d34
Add the ability to define the preferred blocks' size used during the …
danielealbano Nov 20, 2022
f3176f1
Refactor upsize code to allow only 1 upsize at the time
danielealbano Nov 20, 2022
c03b957
Update tests and benchmarks code after refactoring
danielealbano Nov 20, 2022
017a9d8
Work on a copy of the bucket _packed value instead of directly on it …
danielealbano Nov 21, 2022
90bd943
Add accessor methods to read the last epoch from the epoch operation …
danielealbano Nov 21, 2022
aadf181
Ensure that if an insert is in progress when an upsize starts, the op…
danielealbano Nov 21, 2022
9a8c79d
Rename hashtable_mpmc_upsize_copy_block in hashtable_mpmc_upsize_migr…
danielealbano Nov 21, 2022
a0fedc1
Refactor out the bucket migration code from the block migration code …
danielealbano Nov 23, 2022
b1cdab4
Ensure that the buckets migration during an upsize is properly taken …
danielealbano Nov 23, 2022
e7b2626
Add tests for hashtable_mpmc_support_acquire_empty_bucket_for_insert …
danielealbano Nov 23, 2022
3380c12
FInish to implement the hashtable mpmc upsize support and related tests
danielealbano Nov 26, 2022
b3a60dd
Allow the hashtable spsc to be used with a larger set of keys
danielealbano Nov 26, 2022
6c939ba
Minor improvements of the queue mpmc tests
danielealbano Nov 26, 2022
989434d
Remove duplicated initialization
danielealbano Nov 26, 2022
bd18f33
Initial implementation of the insert benchmark for the new hashtable
danielealbano Nov 26, 2022
50dc2ed
Fix bug in tests
danielealbano Nov 26, 2022
aa286e3
Finish to implement benchmarks
danielealbano Nov 26, 2022
d3dfa0e
Use a memory fence to ensure that the values of static_epoch_gc_* are…
danielealbano Nov 27, 2022
c5b12a3
Ensure that the epoch garbage collectors are initialised before regis…
danielealbano Nov 27, 2022
0b87110
Improve debugging
danielealbano Nov 27, 2022
24e50a8
Implement some simple parallelisation for the generation of unique keys
danielealbano Nov 27, 2022
7b15350
Ensure epoch_gc is never null when trying to register the local epoch…
danielealbano Nov 27, 2022
5ddd0cc
Fix benchmark parameters
danielealbano Nov 27, 2022
7b28e2e
It's not realistic running the operations collection every insert, th…
danielealbano Nov 27, 2022
df96d47
Style fix
danielealbano Nov 29, 2022
4811758
Performance improvement on the hotpath
danielealbano Nov 29, 2022
b5516a9
Don't use the monotonic coarse clock to initialize the random number …
danielealbano Nov 29, 2022
3293aca
Add some required libraries to the list of the requirements for Debia…
danielealbano Nov 29, 2022
3a01187
Various fixes and improvements to the benchmark which will make it mo…
danielealbano Nov 29, 2022
c21fdaf
The resize should always be triggered by the caller
danielealbano Nov 30, 2022
6ad3b3d
Drop test code
danielealbano Nov 30, 2022
32f4c7b
Update the test after the refactoring of the upsize interface
danielealbano Nov 30, 2022
79b3e4f
Ensure to unregister the epoch gc thread at the end of the section
danielealbano Nov 30, 2022
4289fba
Fix epoch gc worker tests after introducing a new object type for the…
danielealbano Dec 1, 2022
4555097
Various fix for upsize process and the get, set and delete operations…
danielealbano Dec 4, 2022
39e0f43
Add tracking of the creation time and last update time in the hashtab…
danielealbano Dec 4, 2022
1d822ca
Update & fix testing after refactoring
danielealbano Dec 4, 2022
fcbaae9
Always use the hashtable mpmc data acquired at the beginning to ensur…
danielealbano Dec 4, 2022
a602326
Tune tests duration, drop unused variable
danielealbano Dec 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 267 additions & 0 deletions benches/bench-hashtable-mpmc-new-op-get.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/**
* Copyright (C) 2018-2022 Daniele Salvatore Albano
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
**/

#include <cstdio>
#include <cstring>

#include <benchmark/benchmark.h>

#include "misc.h"
#include "exttypes.h"
#include "xalloc.h"
#include "clock.h"
#include "config.h"
#include "thread.h"
#include "memory_fences.h"
#include "transaction.h"
#include "transaction_spinlock.h"
#include "spinlock.h"
#include "log/log.h"
#include "memory_fences.h"
#include "utils_cpu.h"
#include "fiber/fiber.h"
#include "fiber/fiber_scheduler.h"
#include "data_structures/double_linked_list/double_linked_list.h"
#include "data_structures/ring_bounded_queue_spsc/ring_bounded_queue_spsc_uint128.h"
#include "data_structures/queue_mpmc/queue_mpmc.h"
#include "epoch_gc.h"
#include "data_structures/hashtable_mpmc/hashtable_mpmc.h"
#include "worker/worker_stats.h"
#include "worker/worker_context.h"
#include "worker/worker.h"

#include "../tests/unit_tests/support.h"
#include "../tests/unit_tests/data_structures/hashtable/mpmc/fixtures-hashtable-mpmc.h"

#include "log/log.h"
#include "log/sink/log_sink.h"
#include "log/sink/log_sink_console.h"

#define TEST_VALIDATE_KEYS 0

// It is possible to control the amount of threads used for the test tuning the two defines below
#define TEST_THREADS_RANGE_BEGIN (1)
#define TEST_THREADS_RANGE_END (utils_cpu_count())

class BenchmarkProgram {
private:
const char* tag;

void setup_initial_log_sink_console() {
log_level_t level = LOG_LEVEL_ALL;
log_sink_settings_t settings = { 0 };
settings.console.use_stdout_for_errors = false;

log_sink_register(log_sink_console_init(level, &settings));
}

public:
explicit BenchmarkProgram(const char *tag) {
this->tag = tag;
}

int Main(int argc, char** argv) {
// Setup the log sink
BenchmarkProgram::setup_initial_log_sink_console();

// Ensure that the current thread is pinned to the core 0 otherwise some tests can fail if the kernel shift around
// the main thread of the process
thread_current_set_affinity(0);

::benchmark::Initialize(&argc, argv);
if (::benchmark::ReportUnrecognizedArguments(argc, argv)) {
return 1;
}
::benchmark::RunSpecifiedBenchmarks();

return 0;
}
};

int main(int argc, char** argv) {
return BenchmarkProgram(__FILE__).Main(argc, argv);
}

static void hashtable_mpmc_op_get_not_found_key(benchmark::State& state) {
static hashtable_mpmc_t * hashtable;
hashtable_value_data_t value;
worker_context_t worker_context = { 0 };

worker_context.worker_index = state.thread_index();
worker_context_set(&worker_context);
transaction_set_worker_index(worker_context.worker_index);

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_init();

if (state.thread_index() == 0) {
hashtable = hashtable_mpmc_init(state.range(0));
}

test_support_set_thread_affinity(state.thread_index());

for (auto _ : state) {
benchmark::DoNotOptimize(hashtable_mpmc_op_get(
hashtable,
test_key_1,
test_key_1_len,
&value));
}

if (state.thread_index() == 0) {
hashtable_mpmc_free(hashtable);
}

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_free();
}

static void hashtable_mpmc_op_get_single_key_inline(benchmark::State& state) {
static hashtable_mpmc_t * hashtable;
static hashtable_mpmc_bucket_index_t bucket_index;
uintptr_t value;
bool result;
char error_message[150] = {0};
worker_context_t worker_context = { 0 };

worker_context.worker_index = state.thread_index();
worker_context_set(&worker_context);
transaction_set_worker_index(worker_context.worker_index);

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_init();

if (state.thread_index() == 0) {
hashtable = hashtable_mpmc_init(state.range(0));

bucket_index = hashtable_mpmc_support_bucket_index_from_hash(
hashtable->data,
test_key_1_hash);
char *test_key_1_clone = (char*)xalloc_alloc(test_key_1_len + 1);
strncpy(test_key_1_clone, test_key_1, test_key_1_len);

auto key_value = (hashtable_mpmc_data_key_value_t*)xalloc_alloc(sizeof(hashtable_mpmc_data_key_value_t));
strncpy(key_value->key.embedded.key, test_key_1_clone, test_key_1_len);
key_value->key.embedded.key_length = test_key_1_len;
key_value->value = test_value_1;
key_value->hash = test_key_1_hash;
key_value->key_is_embedded = true;

hashtable->data->buckets[bucket_index].data.hash_half = hashtable_mpmc_support_hash_half(test_key_1_hash);
hashtable->data->buckets[bucket_index].data.key_value = key_value;
}

test_support_set_thread_affinity(state.thread_index());

for (auto _ : state) {
benchmark::DoNotOptimize((result = hashtable_mpmc_op_get(
hashtable,
test_key_1,
test_key_1_len,
&value)));

#if TEST_VALIDATE_KEYS == 1
if (result == HASHTABLE_MPMC_RESULT_FALSE || value != test_value_1) {
sprintf(
error_message,
"Unable to get the key <%s> with bucket index <%lu> for the thread <%d>",
test_key_1,
bucket_index,
state.thread_index());
state.SkipWithError(error_message);
break;
}
#endif
}

if (state.thread_index() == 0) {
hashtable_mpmc_free(hashtable);
}

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_free();
}

static void hashtable_mpmc_op_get_single_key_external(benchmark::State& state) {
static hashtable_mpmc_t * hashtable;
static hashtable_mpmc_bucket_index_t bucket_index;
uintptr_t value;
bool result;
char error_message[150] = {0};
worker_context_t worker_context = { 0 };

worker_context.worker_index = state.thread_index();
worker_context_set(&worker_context);
transaction_set_worker_index(worker_context.worker_index);

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_init();

if (state.thread_index() == 0) {
hashtable = hashtable_mpmc_init(state.range(0));

bucket_index = hashtable_mpmc_support_bucket_index_from_hash(
hashtable->data,
test_key_1_hash);
char *test_key_1_clone = (char*)xalloc_alloc(test_key_1_len + 1);
strncpy(test_key_1_clone, test_key_1, test_key_1_len);

auto key_value = (hashtable_mpmc_data_key_value_t*)xalloc_alloc(sizeof(hashtable_mpmc_data_key_value_t));
key_value->key.external.key = test_key_1_clone;
key_value->key.external.key_length = test_key_1_len;
key_value->value = test_value_1;
key_value->hash = test_key_1_hash;
key_value->key_is_embedded = false;

hashtable->data->buckets[bucket_index].data.hash_half = hashtable_mpmc_support_hash_half(test_key_1_hash);
hashtable->data->buckets[bucket_index].data.key_value = key_value;
}

test_support_set_thread_affinity(state.thread_index());

for (auto _ : state) {
benchmark::DoNotOptimize((result = hashtable_mpmc_op_get(
hashtable,
test_key_1,
test_key_1_len,
&value)));

#if TEST_VALIDATE_KEYS == 1
if (result == HASHTABLE_MPMC_RESULT_FALSE || value != test_value_1) {
sprintf(
error_message,
"Unable to get the key <%s> with bucket index <%lu> for the thread <%d>",
test_key_1,
bucket_index,
state.thread_index());
state.SkipWithError(error_message);
break;
}
#endif
}

if (state.thread_index() == 0) {
hashtable_mpmc_free(hashtable);
}

hashtable_mpmc_thread_epoch_operation_queue_hashtable_key_value_free();
}

static void BenchArguments(benchmark::internal::Benchmark* b) {
// To run more than 131072 iterations is necessary to increase EPOCH_OPERATION_QUEUE_RING_SIZE in
// epoch_operations_queue.h as there is no processing of the queue included with the test
b
->Arg(256)
->ThreadRange(TEST_THREADS_RANGE_BEGIN, TEST_THREADS_RANGE_END)
->Iterations(131072)
->DisplayAggregatesOnly(false);
}

BENCHMARK(hashtable_mpmc_op_get_not_found_key)
->Apply(BenchArguments);

BENCHMARK(hashtable_mpmc_op_get_single_key_inline)
->Apply(BenchArguments);

BENCHMARK(hashtable_mpmc_op_get_single_key_external)
->Apply(BenchArguments);
Loading