Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundant code from merge tree #9827

Merged
merged 6 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/PartLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ bool PartLog::addNewParts(Context & current_context, const PartLog::MutableDataP
elem.part_name = part->name;
elem.path_on_disk = part->getFullPath();

elem.bytes_compressed_on_disk = part->bytes_on_disk;
elem.bytes_compressed_on_disk = part->getBytesOnDisk();
elem.rows = part->rows_count;

elem.error = static_cast<UInt16>(execution_status.code);
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks

loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
calculateColumnsSizesOnDisk();
loadIndexGranularity();
loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
Expand Down Expand Up @@ -870,6 +871,31 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}


void IMergeTreeDataPart::calculateColumnsSizesOnDisk()
{
if (getColumns().empty() || checksums.empty())
throw Exception("Cannot calculate columns sizes when columns or checksums are not initialized", ErrorCodes::LOGICAL_ERROR);

calculateEachColumnSizesOnDisk(columns_sizes, total_columns_size);
}

ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & /* type */) const
{
/// For some types of parts columns_size maybe not calculated
auto it = columns_sizes.find(column_name);
if (it != columns_sizes.end())
return it->second;

return ColumnSize{};
}

void IMergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
for (const auto & [column_name, size] : columns_sizes)
column_to_size[column_name] = size.data_compressed;
}

bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
Expand Down
33 changes: 24 additions & 9 deletions dbms/src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
virtual bool supportsVerticalMerge() const { return false; }

/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (by locking table structure).
virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; }
/// Otherwise return information about column size on disk.
ColumnSize getColumnSize(const String & column_name, const IDataType & /* type */) const;

virtual ColumnSize getTotalColumnsSize() const { return {}; }
/// Return information about column size on disk for all columns in part
ColumnSize getTotalColumnsSize() const { return total_columns_size; }

virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;

virtual ~IMergeTreeDataPart();

using ColumnToSize = std::map<std::string, UInt64>;
virtual void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const {}
void accumulateColumnSizes(ColumnToSize & /* column_to_size */) const;

Type getType() const { return part_type; }

Expand Down Expand Up @@ -158,9 +158,6 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

size_t rows_count = 0;

std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
/// May not contain size of checksums.txt and columns.txt

time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
Expand Down Expand Up @@ -281,6 +278,9 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
UInt64 getIndexSizeInAllocatedBytes() const;
UInt64 getMarksCount() const;

UInt64 getBytesOnDisk() const { return bytes_on_disk; }
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }

size_t getFileSizeOrZero(const String & file_name) const;
String getFullRelativePath() const;
String getFullPath() const;
Expand All @@ -295,9 +295,20 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
virtual bool hasColumnFiles(const String & /* column */, const IDataType & /* type */) const{ return false; }

static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from);
void calculateColumnsSizesOnDisk();

protected:
/// Columns description.
/// Total size of all columns, calculated once in calcuateColumnSizesOnDisk
ColumnSize total_columns_size;

/// Size for each column, calculated once in calcuateColumnSizesOnDisk
ColumnSizeByName columns_sizes;

/// Total size on disk, not only columns. May not contain size of
/// checksums.txt and columns.txt. 0 - if not counted;
UInt64 bytes_on_disk{0};

/// Columns description. Cannot be changed, after part initialiation.
NamesAndTypesList columns;
const Type part_type;

Expand All @@ -306,6 +317,10 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
virtual void checkConsistency(bool require_part_metadata) const = 0;
void checkConsistencyBase() const;

/// Fill each_columns_size and total_size with sizes from columns files on
/// disk using columns and checksums.
virtual void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0;

private:
/// In compact parts order of columns is necessary
NameToPosition column_name_to_position;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/MergeList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());

total_size_bytes_compressed += source_part->bytes_on_disk;
total_size_bytes_compressed += source_part->getBytesOnDisk();
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
}
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
{
part_log_elem.partition_id = part->info.partition_id;
part_log_elem.part_name = part->name;
part_log_elem.bytes_compressed_on_disk = part->bytes_on_disk;
part_log_elem.bytes_compressed_on_disk = part->getBytesOnDisk();
part_log_elem.rows = part->rows_count;

part_log->add(part_log_elem);
Expand Down Expand Up @@ -2135,7 +2135,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const
auto lock = lockParts();

for (auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->bytes_on_disk;
res += part->getBytesOnDisk();
}

return res;
Expand Down Expand Up @@ -3190,7 +3190,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;

auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk);
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->disk);
auto disk = reservation->getDisk();
String src_part_path = src_part->getFullRelativePath();
String dst_part_path = relative_data_path + tmp_dst_part_name;
Expand Down Expand Up @@ -3340,7 +3340,7 @@ try
if (result_part)
{
part_log_elem.path_on_disk = result_part->getFullPath();
part_log_elem.bytes_compressed_on_disk = result_part->bytes_on_disk;
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
part_log_elem.rows = result_part->rows_count;
}

Expand Down Expand Up @@ -3452,7 +3452,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
MergeTreeMovingParts parts_to_move;
for (const auto & part : parts)
{
auto reservation = space->reserve(part->bytes_on_disk);
auto reservation = space->reserve(part->getBytesOnDisk());
if (!reservation)
throw Exception("Move is not possible. Not enough space on '" + space->getName() + "'", ErrorCodes::NOT_ENOUGH_SPACE);

Expand Down
7 changes: 0 additions & 7 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,6 @@ class MergeTreeData : public IStorage
return column_sizes;
}

/// Calculates column sizes in compressed form for the current state of data_parts.
void recalculateColumnSizes()
{
auto lock = lockParts();
calculateColumnSizesImpl();
}

/// For ATTACH/DETACH/DROP PARTITION.
String getPartitionIDFromQuery(const ASTPtr & ast, const Context & context);

Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
}

IMergeSelector::Part part_info;
part_info.size = part->bytes_on_disk;
part_info.size = part->getBytesOnDisk();
part_info.age = current_time - part->modification_time;
part_info.level = part->info.level;
part_info.data = &part;
Expand Down Expand Up @@ -333,7 +333,7 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
return false;
}

sum_bytes += (*it)->bytes_on_disk;
sum_bytes += (*it)->getBytesOnDisk();

prev_it = it;
++it;
Expand Down Expand Up @@ -671,7 +671,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t total_size = 0;
for (const auto & part : parts)
{
total_size += part->bytes_on_disk;
total_size += part->getBytesOnDisk();
if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
{
LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
Expand Down Expand Up @@ -1021,8 +1021,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_codec = context.chooseCompressionCodec(
source_part->bytes_on_disk,
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
source_part->getBytesOnDisk(),
static_cast<double>(source_part->getBytesOnDisk()) / data.getTotalActiveSizeInBytes());


disk->createDirectories(new_part_tmp_path);
Expand Down Expand Up @@ -1193,7 +1193,7 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
{
size_t res = 0;
for (const MergeTreeData::DataPartPtr & part : source_parts)
res += part->bytes_on_disk;
res += part->getBytesOnDisk();

return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
Expand Down Expand Up @@ -1562,9 +1562,9 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
new_data_part->index = source_part->index;
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->bytes_on_disk
= MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->disk, new_data_part->getFullRelativePath());

new_data_part->setBytesOnDisk(
MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->disk, new_data_part->getFullRelativePath()));
new_data_part->calculateColumnsSizesOnDisk();
}


Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
default_codec, writer_settings, computed_index_granularity);
}

ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const

void MergeTreeDataPartCompact::calculateEachColumnSizesOnDisk(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const
{
ColumnSize total_size;
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
if (bin_checksum != checksums.files.end())
{
Expand All @@ -86,8 +86,6 @@ ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);
if (mrk_checksum != checksums.files.end())
total_size.marks += mrk_checksum->second.file_size;

return total_size;
}

void MergeTreeDataPartCompact::loadIndexGranularity()
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart

bool isStoredOnDisk() const override { return true; }

ColumnSize getTotalColumnsSize() const override;

bool hasColumnFiles(const String & column_name, const IDataType & type) const override;

String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
Expand All @@ -67,6 +65,9 @@ class MergeTreeDataPartCompact : public IMergeTreeDataPart

/// Loads marks index granularity into memory
void loadIndexGranularity() override;

/// Compact parts doesn't support per column size, only total size
void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};

}
42 changes: 11 additions & 31 deletions dbms/src/Storages/MergeTree/MergeTreeDataPartWide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,6 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size;
}

ColumnSize MergeTreeDataPartWide::getTotalColumnsSize() const
{
ColumnSize totals;
std::unordered_set<String> processed_substreams;
for (const NameAndTypePair & column : columns)
{
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
totals.add(size);
}
return totals;
}

ColumnSize MergeTreeDataPartWide::getColumnSize(const String & column_name, const IDataType & type) const
{
return getColumnSizeImpl(column_name, type, nullptr);
}

void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullRelativePath();
Expand Down Expand Up @@ -157,20 +140,6 @@ MergeTreeDataPartWide::~MergeTreeDataPartWide()
removeIfNeeded();
}

void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
{
IDataType::SubstreamPath path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
auto bin_file_path = getFullRelativePath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin";
if (disk->exists(bin_file_path))
column_to_size[name_type.name] += disk->getFileSize(bin_file_path);
}, path);
}
}

void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
checkConsistencyBase();
Expand Down Expand Up @@ -258,4 +227,15 @@ String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & colum
return filename;
}

void MergeTreeDataPartWide::calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const
{
std::unordered_set<String> processed_substreams;
for (const NameAndTypePair & column : columns)
{
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
each_columns_size[column.name] = size;
total_size.add(size);
}
}

}
8 changes: 2 additions & 6 deletions dbms/src/Storages/MergeTree/MergeTreeDataPartWide.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,8 @@ class MergeTreeDataPartWide : public IMergeTreeDataPart

bool supportsVerticalMerge() const override { return true; }

void accumulateColumnSizes(ColumnToSize & column_to_size) const override;

String getFileNameForColumn(const NameAndTypePair & column) const override;

ColumnSize getTotalColumnsSize() const override;

ColumnSize getColumnSize(const String & column_name, const IDataType & type) const override;

~MergeTreeDataPartWide() override;

bool hasColumnFiles(const String & column, const IDataType & type) const override;
Expand All @@ -67,6 +61,8 @@ class MergeTreeDataPartWide : public IMergeTreeDataPart
void loadIndexGranularity() override;

ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;

void calculateEachColumnSizesOnDisk(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};

}
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa

ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->bytes_on_disk);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());

return new_data_part;
}
Expand Down