Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #15192 to 20.8: Fix insert into storage Buffer after alter. #15222

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/Storages/StorageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ bool StorageBuffer::optimize(
if (deduplicate)
throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);

flushAllBuffers(false);
flushAllBuffers(false, true);
return true;
}

Expand Down Expand Up @@ -595,14 +595,14 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p
}


void StorageBuffer::flushAllBuffers(const bool check_thresholds)
void StorageBuffer::flushAllBuffers(bool check_thresholds, bool reset_blocks_structure)
{
for (auto & buf : buffers)
flushBuffer(buf, check_thresholds);
flushBuffer(buf, check_thresholds, false, reset_blocks_structure);
}


void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked, bool reset_block_structure)
{
Block block_to_write;
time_t current_time = time(nullptr);
Expand Down Expand Up @@ -655,6 +655,8 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
try
{
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));
if (reset_block_structure)
buffer.data.clear();
}
catch (...)
{
Expand Down Expand Up @@ -829,7 +831,9 @@ void StorageBuffer::alter(const AlterCommands & params, const Context & context,
checkAlterIsPossible(params, context.getSettingsRef());
auto metadata_snapshot = getInMemoryMetadataPtr();

/// So that no blocks of the old structure remain.
/// Flush all buffers to storages, so that no non-empty blocks of the old
/// structure remain. Structure of empty blocks will be updated during first
/// insert.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);

StorageInMemoryMetadata new_metadata = *metadata_snapshot;
Expand Down
8 changes: 5 additions & 3 deletions src/Storages/StorageBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ friend class BufferBlockOutputStream;

Poco::Logger * log;

void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
void flushAllBuffers(bool check_thresholds = true, bool reset_blocks_structure = false);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2020-01-01 00:05:00
2020-01-01 00:05:00
2020-01-01 00:06:00 hello
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
DROP TABLE IF EXISTS buf_dest;
DROP TABLE IF EXISTS buf;

CREATE TABLE buf_dest (timestamp DateTime)
ENGINE = MergeTree PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp);

CREATE TABLE buf (timestamp DateTime) Engine = Buffer(currentDatabase(), buf_dest, 16, 3, 20, 2000000, 20000000, 100000000, 300000000);;

INSERT INTO buf (timestamp) VALUES (toDateTime('2020-01-01 00:05:00'));

ALTER TABLE buf_dest ADD COLUMN s String;
ALTER TABLE buf ADD COLUMN s String;

SELECT * FROM buf;

INSERT INTO buf (timestamp, s) VALUES (toDateTime('2020-01-01 00:06:00'), 'hello');

SELECT * FROM buf ORDER BY timestamp;

DROP TABLE IF EXISTS buf;
DROP TABLE IF EXISTS buf_dest;