Skip to content

Commit

Permalink
Merge pull request #57863 from ClickHouse/fix-flaky-pg-test
Browse files Browse the repository at this point in the history
Follow up to #57568
  • Loading branch information
kssenii committed Dec 15, 2023
2 parents 06438cc + dbca466 commit f285a01
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 39 deletions.
90 changes: 56 additions & 34 deletions src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp
Expand Up @@ -138,15 +138,35 @@ MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
columns_ast.children.emplace_back(std::make_shared<ASTIdentifier>(name));
}

MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getBuffer()
MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getLastBuffer()
{
if (!buffer)
if (buffers.empty())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data buffer not initialized for {}",
throw Exception(ErrorCodes::LOGICAL_ERROR, "No data buffer for {}",
storage->getStorageID().getNameForLogs());
}

return *buffer;
return *buffers.back();
}

MaterializedPostgreSQLConsumer::StorageData::BufferPtr MaterializedPostgreSQLConsumer::StorageData::popBuffer()
{
if (buffers.empty())
return nullptr;

auto buffer = std::move(buffers.front());
buffers.pop_front();
return buffer;
}

void MaterializedPostgreSQLConsumer::StorageData::addBuffer(BufferPtr buffer)
{
buffers.push_back(std::move(buffer));
}

void MaterializedPostgreSQLConsumer::StorageData::returnBuffer(BufferPtr buffer)
{
buffers.push_front(std::move(buffer));
}

void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const
Expand All @@ -163,7 +183,7 @@ void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible

void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx)
{
auto & buffer = storage_data.getBuffer();
auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx);

const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
Expand Down Expand Up @@ -203,7 +223,7 @@ void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, con

void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx)
{
auto & buffer = storage_data.getBuffer();
auto & buffer = storage_data.getLastBuffer();
buffer.assertInsertIsPossible(column_idx);

const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
Expand Down Expand Up @@ -346,7 +366,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
}
}

auto & columns = storage_data.getBuffer().columns;
auto & columns = storage_data.getLastBuffer().columns;
switch (type)
{
case PostgreSQLQuery::INSERT:
Expand Down Expand Up @@ -637,7 +657,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
}
}

storage_data.setBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
storage_data.addBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
tables_to_sync.insert(table_name);
break;
}
Expand All @@ -660,43 +680,45 @@ void MaterializedPostgreSQLConsumer::syncTables()
{
auto table_name = *tables_to_sync.begin();
auto & storage_data = storages.find(table_name)->second;
auto & buffer = storage_data.getBuffer();
Block result_rows = buffer.sample_block.cloneWithColumns(std::move(buffer.columns));

try
while (auto buffer = storage_data.popBuffer())
{
if (result_rows.rows())
Block result_rows = buffer->sample_block.cloneWithColumns(std::move(buffer->columns));
try
{
auto storage = storage_data.storage;
if (result_rows.rows())
{
auto storage = storage_data.storage;

auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);
auto insert_context = Context::createCopy(context);
insert_context->setInternalQuery(true);

auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = std::make_shared<ASTExpressionList>(buffer.columns_ast);
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = std::make_shared<ASTExpressionList>(buffer->columns_ast);

InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
auto input = std::make_shared<SourceFromSingleChunk>(
result_rows.cloneEmpty(), Chunk(result_rows.getColumns(), result_rows.rows()));

assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input)));
assertBlocksHaveEqualStructure(input->getPort().getHeader(), io.pipeline.getHeader(), "postgresql replica table sync");
io.pipeline.complete(Pipe(std::move(input)));

CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
++synced_tables;
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
++synced_tables;
}
}
catch (...)
{
/// Retry this buffer later.
buffer->columns = result_rows.mutateColumns();
storage_data.returnBuffer(std::move(buffer));
throw;
}
}
catch (...)
{
/// Retry this buffer later.
buffer.columns = result_rows.mutateColumns();
throw;
}

storage_data.setBuffer(nullptr);
tables_to_sync.erase(tables_to_sync.begin());
}

Expand Down
13 changes: 9 additions & 4 deletions src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.h
Expand Up @@ -46,7 +46,7 @@ class MaterializedPostgreSQLConsumer
const Names column_names;
const ArrayInfo array_info;

struct Buffer
struct Buffer : private boost::noncopyable
{
Block sample_block;
MutableColumns columns;
Expand All @@ -56,13 +56,18 @@ class MaterializedPostgreSQLConsumer

void assertInsertIsPossible(size_t col_idx) const;
};
using BufferPtr = std::unique_ptr<Buffer>;

Buffer & getBuffer();
Buffer & getLastBuffer();

void setBuffer(std::unique_ptr<Buffer> buffer_) { buffer = std::move(buffer_); }
BufferPtr popBuffer();

void addBuffer(BufferPtr buffer);

void returnBuffer(BufferPtr buffer);

private:
std::unique_ptr<Buffer> buffer;
std::deque<BufferPtr> buffers;
};

using Storages = std::unordered_map<String, StorageData>;
Expand Down
Expand Up @@ -431,7 +431,11 @@ def attack(thread_id):

# random update / delete query
cursor.execute(query_pool[query_id].format(random_table_name))
print("table {} query {} ok".format(random_table_name, query_id))
print(
"Executing for table {} query: {}".format(
random_table_name, query_pool[query_id]
)
)

# allow some thread to do inserts (not to violate key constraints)
if thread_id < 5:
Expand Down

0 comments on commit f285a01

Please sign in to comment.