Skip to content

Commit

Permalink
Merge pull request #18052 from kitaisreal/memory-storage-read-optimiz…
Browse files Browse the repository at this point in the history
…ation

MemoryStorage read optimization
  • Loading branch information
alexey-milovidov committed Dec 14, 2020
2 parents aac8b85 + 1bccd6d commit 493ee67
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 52 deletions.
94 changes: 43 additions & 51 deletions src/Storages/StorageMemory.cpp
Expand Up @@ -23,7 +23,7 @@ namespace ErrorCodes

class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &)>;
using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
public:
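/// Blocks are stored in an immutable snapshot (std::shared_ptr<const Blocks>) that is addressed by index; writers publish a new snapshot instead of appending to this one.
/// We keep a pointer to that snapshot and the index of the next block to read.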
Expand All @@ -32,17 +32,15 @@ class MemorySource : public SourceWithProgress

MemorySource(
Names column_names_,
BlocksList::const_iterator first_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot,
std::shared_ptr<const BlocksList> data_,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &, std::shared_ptr<const BlocksList> &) {})
std::shared_ptr<const Blocks> data_,
std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
InitializerFunc initializer_func_ = {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current_it(first_)
, num_blocks(num_blocks_)
, data(data_)
, parallel_execution_index(parallel_execution_index_)
, initializer_func(std::move(initializer_func_))
{
}
Expand All @@ -52,37 +50,47 @@ class MemorySource : public SourceWithProgress
protected:
/// Produces the next chunk, containing only the columns listed in `column_names`.
/// Returns an empty chunk once there are no more blocks to hand out.
Chunk generate() override
{
/// Deferred initialization: the initializer (if any) is invoked on the first call only.
if (!postponed_init_done)
if (initializer_func)
{
initializer_func(current_it, num_blocks, data);
postponed_init_done = true;
initializer_func(data);
initializer_func = {};
}

/// Pick the position of the block to read; stop when every block has been consumed.
if (current_block_idx == num_blocks)
size_t current_index = getAndIncrementExecutionIndex();

if (current_index >= data->size())
{
return {};
}

const Block & src = *current_it;
const Block & src = (*data)[current_index];
Columns columns;
columns.reserve(column_names.size());

/// Add only required columns to `columns`.
for (const auto & name : column_names)
columns.push_back(src.getByName(name).column);

if (++current_block_idx < num_blocks)
++current_it;

/// The chunk carries the selected columns and the row count of the source block.
return Chunk(std::move(columns), src.rows());
}

private:
const Names column_names;
BlocksList::const_iterator current_it;
size_t num_blocks;
size_t current_block_idx = 0;
/// Returns the index of the next block to read and advances the corresponding counter.
/// When a shared atomic counter was supplied it is used, so each index is handed out once
/// across all sources sharing that counter; otherwise the per-source counter is used.
size_t getAndIncrementExecutionIndex()
{
    if (!parallel_execution_index)
        return execution_index++;

    /// Atomic post-increment: yields the previous value.
    return parallel_execution_index->fetch_add(1);
}

std::shared_ptr<const BlocksList> data;
bool postponed_init_done = false;
const Names column_names;
size_t execution_index = 0;
std::shared_ptr<const Blocks> data;
std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
InitializerFunc initializer_func;
};

Expand All @@ -107,7 +115,7 @@ class MemoryBlockOutputStream : public IBlockOutputStream
metadata_snapshot->check(block, true);
{
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
new_data->push_back(block);
storage.data.set(std::move(new_data));

Expand All @@ -123,7 +131,7 @@ class MemoryBlockOutputStream : public IBlockOutputStream


StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: IStorage(table_id_), data(std::make_unique<const BlocksList>())
: IStorage(table_id_), data(std::make_unique<const Blocks>())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
Expand Down Expand Up @@ -155,45 +163,29 @@ Pipe StorageMemory::read(

return Pipe(std::make_shared<MemorySource>(
column_names,
data.get()->end(),
0,
*this,
metadata_snapshot,
data.get(),
[this](BlocksList::const_iterator & current_it, size_t & num_blocks, std::shared_ptr<const BlocksList> & current_data)
nullptr /* data */,
nullptr /* parallel execution index */,
[this](std::shared_ptr<const Blocks> & data_to_initialize)
{
current_data = data.get();
current_it = current_data->begin();
num_blocks = current_data->size();
data_to_initialize = data.get();
}));
}

auto current_data = data.get();

size_t size = current_data->size();

if (num_streams > size)
num_streams = size;

Pipes pipes;

BlocksList::const_iterator it = current_data->begin();
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);

size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
{
size_t next_offset = (stream + 1) * size / num_streams;
size_t num_blocks = next_offset - offset;

assert(num_blocks > 0);

pipes.emplace_back(std::make_shared<MemorySource>(column_names, it, num_blocks, *this, metadata_snapshot, current_data));

while (offset < next_offset)
{
++it;
++offset;
}
pipes.emplace_back(std::make_shared<MemorySource>(column_names, *this, metadata_snapshot, current_data, parallel_execution_index));
}

return Pipe::unitePipes(std::move(pipes));
Expand All @@ -208,7 +200,7 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag

/// Discards all stored blocks by publishing an empty container,
/// then resets the byte and row counters to zero.
void StorageMemory::drop()
{
data.set(std::make_unique<BlocksList>());
data.set(std::make_unique<Blocks>());
/// Relaxed ordering is used for both counter stores.
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
Expand All @@ -233,25 +225,25 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
auto in = interpreter->execute();

in->readPrefix();
BlocksList out;
Blocks out;
Block block;
while ((block = in->read()))
{
out.push_back(block);
}
in->readSuffix();

std::unique_ptr<BlocksList> new_data;
std::unique_ptr<Blocks> new_data;

// all column affected
if (interpreter->isAffectingAllColumns())
{
new_data = std::make_unique<BlocksList>(out);
new_data = std::make_unique<Blocks>(out);
}
else
{
/// just some of the column affected, we need update it with new column
new_data = std::make_unique<BlocksList>(*(data.get()));
new_data = std::make_unique<Blocks>(*(data.get()));
auto data_it = new_data->begin();
auto out_it = out.begin();

Expand Down Expand Up @@ -284,7 +276,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
/// Removes all stored blocks by publishing an empty container,
/// then resets the byte and row counters to zero. None of the arguments are used.
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
data.set(std::make_unique<BlocksList>());
data.set(std::make_unique<Blocks>());
/// Relaxed ordering is used for both counter stores.
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageMemory.h
Expand Up @@ -91,7 +91,7 @@ friend struct ext::shared_ptr_helper<StorageMemory>;

private:
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion<BlocksList> data;
MultiVersion<Blocks> data;

mutable std::mutex mutex;

Expand Down

0 comments on commit 493ee67

Please sign in to comment.