Skip to content

Commit

Permalink
Merge pull request #57544 from ClickHouse/vdimir/dict_hashed_array_shard
Browse files Browse the repository at this point in the history
Support SHARDS for HashedArrayDictionary
  • Loading branch information
vdimir committed Dec 20, 2023
2 parents b10183e + 398499d commit ae42704
Show file tree
Hide file tree
Showing 14 changed files with 528 additions and 184 deletions.
8 changes: 4 additions & 4 deletions docs/en/sql-reference/dictionaries/index.md
Expand Up @@ -394,7 +394,7 @@ Configuration example:
or

``` sql
LAYOUT(HASHED_ARRAY())
LAYOUT(HASHED_ARRAY([SHARDS 1]))
```

### complex_key_hashed_array
Expand All @@ -412,7 +412,7 @@ Configuration example:
or

``` sql
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
LAYOUT(COMPLEX_KEY_HASHED_ARRAY([SHARDS 1]))
```

### range_hashed {#range_hashed}
Expand Down Expand Up @@ -2415,8 +2415,8 @@ clickhouse client \
--secure \
--password MY_PASSWORD \
--query "
INSERT INTO regexp_dictionary_source_table
SELECT * FROM input ('id UInt64, parent_id UInt64, regexp String, keys Array(String), values Array(String)')
INSERT INTO regexp_dictionary_source_table
SELECT * FROM input ('id UInt64, parent_id UInt64, regexp String, keys Array(String), values Array(String)')
FORMAT CSV" < regexp_dict.csv
```

Expand Down
337 changes: 213 additions & 124 deletions src/Dictionaries/HashedArrayDictionary.cpp

Large diffs are not rendered by default.

114 changes: 74 additions & 40 deletions src/Dictionaries/HashedArrayDictionary.h
Expand Up @@ -13,6 +13,7 @@
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryHelpers.h>
#include <Dictionaries/HashedDictionaryParallelLoader.h>

/** This dictionary stores all attributes in arrays.
* The key is stored in a hash table, and the value is an index into the attribute array.
Expand All @@ -25,12 +26,17 @@ struct HashedArrayDictionaryStorageConfiguration
{
const bool require_nonempty;
const DictionaryLifetime lifetime;
size_t shards = 1;
size_t shard_load_queue_backlog = 10000;
bool use_async_executor = false;
};

template <DictionaryKeyType dictionary_key_type>
template <DictionaryKeyType dictionary_key_type, bool sharded>
class HashedArrayDictionary final : public IDictionary
{
using DictionaryParallelLoaderType = HashedDictionaryImpl::HashedDictionaryParallelLoader<dictionary_key_type, HashedArrayDictionary<dictionary_key_type, sharded>>;
friend class HashedDictionaryImpl::HashedDictionaryParallelLoader<dictionary_key_type, HashedArrayDictionary<dictionary_key_type, sharded>>;

public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;

Expand Down Expand Up @@ -63,13 +69,13 @@ class HashedArrayDictionary final : public IDictionary

double getHitRate() const override { return 1.0; }

size_t getElementCount() const override { return element_count; }
size_t getElementCount() const override { return total_element_count; }

double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
double getLoadFactor() const override { return static_cast<double>(total_element_count) / bucket_count; }

std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<HashedArrayDictionary<dictionary_key_type>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
return std::make_shared<HashedArrayDictionary<dictionary_key_type, sharded>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
}

DictionarySourcePtr getSource() const override { return source_ptr; }
Expand Down Expand Up @@ -132,50 +138,54 @@ class HashedArrayDictionary final : public IDictionary
template <typename Value>
using AttributeContainerType = std::conditional_t<std::is_same_v<Value, Array>, std::vector<Value>, PaddedPODArray<Value>>;

template <typename Value>
using AttributeContainerShardsType = std::vector<AttributeContainerType<Value>>;

struct Attribute final
{
AttributeUnderlyingType type;

std::variant<
AttributeContainerType<UInt8>,
AttributeContainerType<UInt16>,
AttributeContainerType<UInt32>,
AttributeContainerType<UInt64>,
AttributeContainerType<UInt128>,
AttributeContainerType<UInt256>,
AttributeContainerType<Int8>,
AttributeContainerType<Int16>,
AttributeContainerType<Int32>,
AttributeContainerType<Int64>,
AttributeContainerType<Int128>,
AttributeContainerType<Int256>,
AttributeContainerType<Decimal32>,
AttributeContainerType<Decimal64>,
AttributeContainerType<Decimal128>,
AttributeContainerType<Decimal256>,
AttributeContainerType<DateTime64>,
AttributeContainerType<Float32>,
AttributeContainerType<Float64>,
AttributeContainerType<UUID>,
AttributeContainerType<IPv4>,
AttributeContainerType<IPv6>,
AttributeContainerType<StringRef>,
AttributeContainerType<Array>>
container;

std::optional<std::vector<bool>> is_index_null;
AttributeContainerShardsType<UInt8>,
AttributeContainerShardsType<UInt16>,
AttributeContainerShardsType<UInt32>,
AttributeContainerShardsType<UInt64>,
AttributeContainerShardsType<UInt128>,
AttributeContainerShardsType<UInt256>,
AttributeContainerShardsType<Int8>,
AttributeContainerShardsType<Int16>,
AttributeContainerShardsType<Int32>,
AttributeContainerShardsType<Int64>,
AttributeContainerShardsType<Int128>,
AttributeContainerShardsType<Int256>,
AttributeContainerShardsType<Decimal32>,
AttributeContainerShardsType<Decimal64>,
AttributeContainerShardsType<Decimal128>,
AttributeContainerShardsType<Decimal256>,
AttributeContainerShardsType<DateTime64>,
AttributeContainerShardsType<Float32>,
AttributeContainerShardsType<Float64>,
AttributeContainerShardsType<UUID>,
AttributeContainerShardsType<IPv4>,
AttributeContainerShardsType<IPv6>,
AttributeContainerShardsType<StringRef>,
AttributeContainerShardsType<Array>>
containers;

/// One container per shard
using RowsMask = std::vector<bool>;
std::optional<std::vector<RowsMask>> is_index_null;
};

struct KeyAttribute final
{

KeyContainerType container;

/// One container per shard
std::vector<KeyContainerType> containers;
};

void createAttributes();

void blockToAttributes(const Block & block);
void blockToAttributes(const Block & block, DictionaryKeysArenaHolder<dictionary_key_type> & arena_holder, size_t shard);

void updateData();

Expand All @@ -185,6 +195,22 @@ class HashedArrayDictionary final : public IDictionary

void calculateBytesAllocated();

/// Returns the shard index for a UInt64 key.
/// When the dictionary is not sharded the result is always 0.
UInt64 getShard(UInt64 key) const
{
    if constexpr (sharded)
    {
        /// NOTE: the hash function used here should not match DefaultHash<>,
        /// since DefaultHash<> is used for the HashMap/sparse_hash_map.
        return intHashCRC32(key) % configuration.shards;
    }
    else
    {
        return 0;
    }
}

/// Returns the shard index for a StringRef key.
/// When the dictionary is not sharded the result is always 0.
UInt64 getShard(StringRef key) const
{
    if constexpr (sharded)
    {
        /// The key hash is reduced modulo the configured number of shards.
        return StringRefHash()(key) % configuration.shards;
    }
    else
    {
        return 0;
    }
}

template <typename KeysProvider>
ColumnPtr getAttributeColumn(
const Attribute & attribute,
Expand All @@ -200,10 +226,13 @@ class HashedArrayDictionary final : public IDictionary
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;


using KeyIndexToElementIndex = std::conditional_t<sharded, PaddedPODArray<std::pair<ssize_t, UInt8>>, PaddedPODArray<ssize_t>>;

template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<ssize_t> & key_index_to_element_index,
const KeyIndexToElementIndex & key_index_to_element_index,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;

Expand All @@ -215,6 +244,8 @@ class HashedArrayDictionary final : public IDictionary

void resize(size_t total_rows);

Poco::Logger * log;

const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const HashedArrayDictionaryStorageConfiguration configuration;
Expand All @@ -225,17 +256,20 @@ class HashedArrayDictionary final : public IDictionary

size_t bytes_allocated = 0;
size_t hierarchical_index_bytes_allocated = 0;
size_t element_count = 0;
std::atomic<size_t> total_element_count = 0;
std::vector<size_t> element_counts;
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> found_count{0};

BlockPtr update_field_loaded_block;
Arena string_arena;
std::vector<std::unique_ptr<Arena>> string_arenas;
DictionaryHierarchicalParentToChildIndexPtr hierarchical_index;
};

extern template class HashedArrayDictionary<DictionaryKeyType::Simple>;
extern template class HashedArrayDictionary<DictionaryKeyType::Complex>;
extern template class HashedArrayDictionary<DictionaryKeyType::Simple, false>;
extern template class HashedArrayDictionary<DictionaryKeyType::Simple, true>;
extern template class HashedArrayDictionary<DictionaryKeyType::Complex, false>;
extern template class HashedArrayDictionary<DictionaryKeyType::Complex, true>;

}
9 changes: 5 additions & 4 deletions src/Dictionaries/HashedDictionary.h
Expand Up @@ -71,7 +71,8 @@ struct HashedDictionaryConfiguration
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
class HashedDictionary final : public IDictionary
{
friend class HashedDictionaryParallelLoader<dictionary_key_type, sparse, sharded>;
using DictionaryParallelLoaderType = HashedDictionaryParallelLoader<dictionary_key_type, HashedDictionary<dictionary_key_type, sparse, sharded>>;
friend class HashedDictionaryParallelLoader<dictionary_key_type, HashedDictionary<dictionary_key_type, sparse, sharded>>;

public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;
Expand Down Expand Up @@ -987,7 +988,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::getItemsImpl(
auto key = keys_extractor.extractCurrentKey();
auto shard = getShard(key);

const auto & container = attribute_containers[getShard(key)];
const auto & container = attribute_containers[shard];
const auto it = container.find(key);

if (it != container.end())
Expand Down Expand Up @@ -1020,11 +1021,11 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
{
if (!source_ptr->hasUpdateField())
{
std::optional<HashedDictionaryParallelLoader<dictionary_key_type, sparse, sharded>> parallel_loader;
std::optional<DictionaryParallelLoaderType> parallel_loader;
if constexpr (sharded)
parallel_loader.emplace(*this);

QueryPipeline pipeline = QueryPipeline(source_ptr->loadAll());
QueryPipeline pipeline(source_ptr->loadAll());

DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
Expand Down
7 changes: 3 additions & 4 deletions src/Dictionaries/HashedDictionaryParallelLoader.h
Expand Up @@ -38,13 +38,12 @@ namespace DB::HashedDictionaryImpl
{

/// Implementation of parallel dictionary loading for SHARDS
template <DictionaryKeyType dictionary_key_type, bool sparse, bool sharded>
template <DictionaryKeyType dictionary_key_type, typename DictionaryType>
class HashedDictionaryParallelLoader : public boost::noncopyable
{
using HashedDictionary = HashedDictionary<dictionary_key_type, sparse, sharded>;

public:
explicit HashedDictionaryParallelLoader(HashedDictionary & dictionary_)
explicit HashedDictionaryParallelLoader(DictionaryType & dictionary_)
: dictionary(dictionary_)
, shards(dictionary.configuration.shards)
, pool(CurrentMetrics::HashedDictionaryThreads, CurrentMetrics::HashedDictionaryThreadsActive, CurrentMetrics::HashedDictionaryThreadsScheduled, shards)
Expand Down Expand Up @@ -118,7 +117,7 @@ class HashedDictionaryParallelLoader : public boost::noncopyable
}

private:
HashedDictionary & dictionary;
DictionaryType & dictionary;
const size_t shards;
ThreadPool pool;
std::vector<std::optional<ConcurrentBoundedQueue<Block>>> shards_queues;
Expand Down
Expand Up @@ -26,6 +26,62 @@ select all values as input stream
0 value_0 value_second_0
1 value_1 value_second_1
2 value_2 value_second_2
Dictionary hashed_array_dictionary_simple_key_simple_attributes
dictGet existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 value_0 value_second_0
1 value_1 value_second_1
2 value_2 value_second_2
Dictionary hashed_array_dictionary_simple_key_complex_attributes
dictGet existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 value_0 value_second_0
1 value_1 \N
2 value_2 value_second_2
Dictionary hashed_array_dictionary_simple_key_complex_attributes
dictGet existing value
value_0 value_second_0
Expand Down Expand Up @@ -64,3 +120,13 @@ dictGet
dictGetHierarchy
[1]
[4,2,1]
Dictionary hashed_array_dictionary_simple_key_hierarchy
dictGet
0
0
1
1
2
dictGetHierarchy
[1]
[4,2,1]

0 comments on commit ae42704

Please sign in to comment.