Commit

Merge pull request #53149 from seshWCS/gcddelta-codec
Add GCD codec
rschu1ze committed Sep 5, 2023
2 parents b314b8d + d416aaf commit 470ce34
Showing 18 changed files with 1,442 additions and 4 deletions.
8 changes: 6 additions & 2 deletions docs/en/sql-reference/statements/create/table.md
@@ -395,11 +395,15 @@ These codecs are designed to make compression more effective by using specific f

#### Delta

`Delta(delta_bytes)` — Compression approach in which raw values are replaced by the difference of two neighboring values, except for the first value that stays unchanged. Up to `delta_bytes` are used for storing delta values, so `delta_bytes` is the maximum size of raw values. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1.
`Delta(delta_bytes)` — Compression approach in which raw values are replaced by the difference of two neighboring values, except for the first value that stays unchanged. Up to `delta_bytes` are used for storing delta values, so `delta_bytes` is the maximum size of raw values. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1. Delta is a data preparation codec, i.e. cannot be used stand-alone.
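
The transformation described above can be sketched as follows for 32-bit values. This is an illustration only: `deltaEncode` and `deltaDecode` are hypothetical names that do not exist in ClickHouse, and the real codec works on raw byte buffers rather than vectors.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper (not a ClickHouse API): replaces each value by its
// difference to the previous value. The first value is kept unchanged
// because it is the difference to an implicit 0.
std::vector<uint32_t> deltaEncode(const std::vector<uint32_t> & values)
{
    std::vector<uint32_t> out;
    out.reserve(values.size());
    uint32_t prev = 0;
    for (uint32_t v : values)
    {
        out.push_back(v - prev); // unsigned arithmetic wraps, so decoding stays exact
        prev = v;
    }
    return out;
}

// Hypothetical inverse: a running sum restores the original values.
std::vector<uint32_t> deltaDecode(const std::vector<uint32_t> & deltas)
{
    std::vector<uint32_t> out;
    out.reserve(deltas.size());
    uint32_t acc = 0;
    for (uint32_t d : deltas)
    {
        acc += d;
        out.push_back(acc);
    }
    return out;
}
```

For the input `100, 103, 105, 110` the encoded sequence is `100, 3, 2, 5`. The small deltas are what a subsequent general-purpose codec can compress well, which is why Delta is meant to be chained with another codec.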

#### DoubleDelta

`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf). DoubleDelta is a data preparation codec, i.e. cannot be used stand-alone.
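
To see why a constant stride compresses well, the sketch below computes only the delta of deltas and leaves out the compact bit packing. `deltaOfDeltas` is a hypothetical name, and storing the first value and the first delta unchanged is an assumption of this sketch.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not a ClickHouse API).
// out[0] is the first value, out[1] is the first delta, and every later
// element is the change of the delta relative to the previous delta.
// Assumes the values are small enough that the subtractions cannot overflow.
std::vector<int64_t> deltaOfDeltas(const std::vector<int64_t> & values)
{
    std::vector<int64_t> out;
    out.reserve(values.size());
    int64_t prev_delta = 0;
    for (size_t i = 0; i < values.size(); ++i)
    {
        if (i == 0)
        {
            out.push_back(values[0]); // first value is stored as-is
            continue;
        }
        const int64_t delta = values[i] - values[i - 1];
        // The first delta is stored as-is; afterwards only its change is stored.
        out.push_back(i == 1 ? delta : delta - prev_delta);
        prev_delta = delta;
    }
    return out;
}
```

Timestamps `1000, 1060, 1120, 1180, 1241` become `1000, 60, 0, 0, 1`: a regular 60-second interval turns into zeros, and only the irregular sample leaves a non-zero residue.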

#### GCD

`GCD()` - Calculates the greatest common divisor (GCD) of the values in the column, then divides each value by the GCD. Can be used with integer, decimal and date/time columns. Viable use cases are timestamps or monetary values with high precision. GCD is a data preparation codec, i.e. it cannot be used stand-alone.
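
As a rough illustration of the idea, the sketch below applies the transformation to 64-bit unsigned values. `gcdEncode` and `gcdDecode` are hypothetical names, `std::gcd` from `<numeric>` is used for brevity, and storing zero quotients when the GCD is 0 is an assumption of this sketch.

```cpp
#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical helper (not a ClickHouse API): returns the GCD of all values
// together with the quotients value / GCD.
std::pair<uint64_t, std::vector<uint64_t>> gcdEncode(const std::vector<uint64_t> & values)
{
    uint64_t g = 0; // std::gcd(0, x) == x, so 0 is a neutral start value
    for (uint64_t v : values)
        g = std::gcd(g, v);

    std::vector<uint64_t> quotients;
    quotients.reserve(values.size());
    for (uint64_t v : values)
        quotients.push_back(g == 0 ? 0 : v / g); // g == 0 only if all values are 0 or the input is empty
    return {g, std::move(quotients)};
}

// Hypothetical inverse: multiplying by the stored GCD restores the values.
std::vector<uint64_t> gcdDecode(uint64_t g, const std::vector<uint64_t> & quotients)
{
    std::vector<uint64_t> out;
    out.reserve(quotients.size());
    for (uint64_t q : quotients)
        out.push_back(q * g);
    return out;
}
```

Monetary amounts `1500, 2500, 4000` share the divisor 500 and are stored as `3, 5, 8`. The quotients have fewer significant bits, which helps a following codec.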

#### Gorilla

1 change: 1 addition & 0 deletions docs/ru/sql-reference/statements/create/table.md
@@ -242,6 +242,7 @@ ClickHouse поддерживает кодеки общего назначени

- `Delta(delta_bytes)`: A method in which raw values are replaced by the difference of two neighboring values, except for the first value, which stays unchanged. Up to `delta_bytes` are used for storing the differences, i.e. `delta_bytes` is the maximum size of the raw data. Possible `delta_bytes` values: 1, 2, 4, 8. The default value for `delta_bytes` is `sizeof(type)` if that equals 1, 2, 4, or 8. In all other cases it is 1.
- `DoubleDelta`: Calculates the delta of deltas and stores it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series. Can be used with any fixed-width data type. Implements the algorithm used in Gorilla TSDB and supports 64-bit data types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For details, see the section "Compressing Time Stamps" in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
- `GCD`: Calculates the greatest common divisor (GCD) of all values and then divides each value by it. This codec is intended for data preparation and is not suitable for use without an additional codec. The GCD codec can be used with Integer, Decimal and DateTime. A good use case is storing timestamps or monetary values with high precision.
- `Gorilla`: Calculates the XOR between the current and the previous value and writes the result in compact binary form. Efficiently stores series of slowly changing floating-point numbers, because the best compression rate is achieved when neighboring values are equal. Implements the algorithm used in Gorilla TSDB, adapting it to work with 64-bit values. For details, see the section "Compressing Values" in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
- `T64`: A compression method that crops the unused high bits of integer values (including `Enum`, `Date` and `DateTime`). At each step of the algorithm, the codec places a block of 64 values into a 64✕64 matrix, transposes it, crops the unused bits, and returns the rest as a sequence. Unused bits are the bits that do not differ between the minimum and the maximum value over the whole range of values in the data part.
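
The XOR step of the `Gorilla` entry above can be sketched as follows. Only the XOR with the previous value is shown; the compact bit packing is omitted, and `xorWithPrevious` is a hypothetical name that does not exist in ClickHouse.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper (not a ClickHouse API): XORs the bit pattern of each
// double with the bit pattern of its predecessor (0 for the first value).
std::vector<uint64_t> xorWithPrevious(const std::vector<double> & values)
{
    static_assert(sizeof(double) == sizeof(uint64_t), "sketch assumes 64-bit doubles");

    std::vector<uint64_t> out;
    out.reserve(values.size());
    uint64_t prev = 0;
    for (double v : values)
    {
        uint64_t bits;
        std::memcpy(&bits, &v, sizeof(bits)); // reinterpret the bits without undefined behavior
        out.push_back(bits ^ prev);
        prev = bits;
    }
    return out;
}
```

Equal neighbors produce an XOR of exactly 0, which is the cheapest case for the encoder. This matches the statement that the best compression rate is achieved when neighboring values are equal.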

4 changes: 2 additions & 2 deletions src/Compression/CompressionCodecDelta.cpp
@@ -67,7 +67,7 @@ template <typename T>
void compressDataForType(const char * source, UInt32 source_size, char * dest)
{
    if (source_size % sizeof(T) != 0)
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot delta compress, data size {} is not aligned to {}", source_size, sizeof(T));

    T prev_src = 0;
    const char * const source_end = source + source_size;
@@ -88,7 +88,7 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest,
    const char * const output_end = dest + output_size;

    if (source_size % sizeof(T) != 0)
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot delta decompress, data size {} is not aligned to {}", source_size, sizeof(T));

    T accumulator{};
    const char * const source_end = source + source_size;
271 changes: 271 additions & 0 deletions src/Compression/CompressionCodecGCD.cpp
@@ -0,0 +1,271 @@
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <base/unaligned.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h>
#include "Common/Exception.h"
#include "DataTypes/IDataType.h"
#include "base/Decimal_fwd.h"
#include "base/types.h"
#include "config.h"

#include <boost/integer/common_factor.hpp>
#include <libdivide-config.h>
#include <libdivide.h>


namespace DB
{

class CompressionCodecGCD : public ICompressionCodec
{
public:
    explicit CompressionCodecGCD(UInt8 gcd_bytes_size_);

    uint8_t getMethodByte() const override;

    void updateHash(SipHash & hash) const override;

protected:
    UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
    void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
    UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;

    bool isCompression() const override { return false; }
    bool isGenericCompression() const override { return false; }

private:
    const UInt8 gcd_bytes_size;
};


namespace ErrorCodes
{
    extern const int CANNOT_COMPRESS;
    extern const int CANNOT_DECOMPRESS;
    extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
    extern const int BAD_ARGUMENTS;
}

CompressionCodecGCD::CompressionCodecGCD(UInt8 gcd_bytes_size_)
    : gcd_bytes_size(gcd_bytes_size_)
{
    setCodecDescription("GCD", {});
}

UInt32 CompressionCodecGCD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    return uncompressed_size
        + gcd_bytes_size // To store gcd
        + 2; // Local header
}

uint8_t CompressionCodecGCD::getMethodByte() const
{
    return static_cast<uint8_t>(CompressionMethodByte::GCD);
}

void CompressionCodecGCD::updateHash(SipHash & hash) const
{
    getCodecDesc()->updateTreeHash(hash);
}

namespace
{

template <typename T>
void compressDataForType(const char * source, UInt32 source_size, char * dest)
{
    if (source_size % sizeof(T) != 0)
        throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot GCD compress, data size {} is not aligned to {}", source_size, sizeof(T));

    const char * const source_end = source + source_size;

    T gcd_divider{};
    const auto * cur_source = source;
    while (gcd_divider != T(1) && cur_source < source_end)
    {
        if (cur_source == source)
            gcd_divider = unalignedLoad<T>(cur_source);
        else
            gcd_divider = boost::integer::gcd(gcd_divider, unalignedLoad<T>(cur_source));
        cur_source += sizeof(T);
    }

    unalignedStore<T>(dest, gcd_divider);
    dest += sizeof(T);

    if constexpr (sizeof(T) <= 8)
    {
        /// libdivide supports only UInt32 and UInt64.
        using LibdivideT = std::conditional_t<sizeof(T) <= 4, UInt32, UInt64>;
        libdivide::divider<LibdivideT> divider(static_cast<LibdivideT>(gcd_divider));
        cur_source = source;
        while (cur_source < source_end)
        {
            unalignedStore<T>(dest, static_cast<T>(static_cast<LibdivideT>(unalignedLoad<T>(cur_source)) / divider));
            cur_source += sizeof(T);
            dest += sizeof(T);
        }
    }
    else
    {
        cur_source = source;
        while (cur_source < source_end)
        {
            unalignedStore<T>(dest, unalignedLoad<T>(cur_source) / gcd_divider);
            cur_source += sizeof(T);
            dest += sizeof(T);
        }
    }
}

template <typename T>
void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
    const char * const output_end = dest + output_size;

    if (source_size % sizeof(T) != 0)
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot GCD decompress, data size {} is not aligned to {}", source_size, sizeof(T));

    if (source_size < sizeof(T))
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot GCD decompress, data size {} is less than {}", source_size, sizeof(T));

    const char * const source_end = source + source_size;
    const T gcd_multiplier = unalignedLoad<T>(source);
    source += sizeof(T);
    while (source < source_end)
    {
        if (dest + sizeof(T) > output_end) [[unlikely]]
            throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress the data");
        unalignedStore<T>(dest, unalignedLoad<T>(source) * gcd_multiplier);

        source += sizeof(T);
        dest += sizeof(T);
    }
}

}

UInt32 CompressionCodecGCD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
    UInt8 bytes_to_skip = source_size % gcd_bytes_size;
    dest[0] = gcd_bytes_size;
    dest[1] = bytes_to_skip; /// unused (backward compatibility)
    memcpy(&dest[2], source, bytes_to_skip);
    size_t start_pos = 2 + bytes_to_skip;
    switch (gcd_bytes_size)
    {
    case 1:
        compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    case 2:
        compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    case 4:
        compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    case 8:
        compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    case 16:
        compressDataForType<UInt128>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    case 32:
        compressDataForType<UInt256>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
        break;
    }
    return 2 + gcd_bytes_size + source_size;
}

void CompressionCodecGCD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
    if (source_size < 2)
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress. File has wrong header");

    if (uncompressed_size == 0)
        return;

    UInt8 bytes_size = source[0];

    if (!(bytes_size == 1 || bytes_size == 2 || bytes_size == 4 || bytes_size == 8 || bytes_size == 16 || bytes_size == 32))
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress. File has wrong header");

    UInt8 bytes_to_skip = uncompressed_size % bytes_size;
    UInt32 output_size = uncompressed_size - bytes_to_skip;

    if (static_cast<UInt32>(2 + bytes_to_skip) > source_size)
        throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress. File has wrong header");

    memcpy(dest, &source[2], bytes_to_skip);
    UInt32 source_size_no_header = source_size - bytes_to_skip - 2;
    switch (bytes_size)
    {
    case 1:
        decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    case 2:
        decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    case 4:
        decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    case 8:
        decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    case 16:
        decompressDataForType<UInt128>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    case 32:
        decompressDataForType<UInt256>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip], output_size);
        break;
    }
}

namespace
{

UInt8 getGCDBytesSize(const IDataType * column_type)
{
    WhichDataType which(column_type);
    if (!(which.isInt() || which.isUInt() || which.isDecimal() || which.isDateOrDate32() || which.isDateTime() || which.isDateTime64()))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec GCD is not applicable for {} because the data type is not of fixed size",
            column_type->getName());

    size_t max_size = column_type->getSizeOfValueInMemory();
    if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8 || max_size == 16 || max_size == 32)
        return static_cast<UInt8>(max_size);
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec GCD is only applicable for data types of size 1, 2, 4, 8, 16, 32 bytes. Given type {}",
            column_type->getName());
}

}

void registerCodecGCD(CompressionCodecFactory & factory)
{
    UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::GCD);
    auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
    {
        /// Default bytes size is 1.
        UInt8 gcd_bytes_size = 1;

        if (arguments && !arguments->children.empty())
            throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "GCD codec must have 0 parameters, given {}", arguments->children.size());
        else if (column_type)
            gcd_bytes_size = getGCDBytesSize(column_type);

        return std::make_shared<CompressionCodecGCD>(gcd_bytes_size);
    };
    factory.registerCompressionCodecWithType("GCD", method_code, codec_builder);
}

CompressionCodecPtr getCompressionCodecGCD(UInt8 gcd_bytes_size)
{
    return std::make_shared<CompressionCodecGCD>(gcd_bytes_size);
}

}
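
Reading `doCompressData` and `doDecompressData` above, the frame appears to be laid out as: one byte for the value width, one byte for `bytes_to_skip`, the unaligned prefix bytes copied verbatim, the GCD, and then one quotient per value. The following stand-alone sketch reproduces that layout for 4-byte values only. `encodeFrame32` and `decodeFrame32` are hypothetical names, `std::gcd` stands in for `boost::integer::gcd` and libdivide, and writing zero quotients when the GCD is 0 is an assumption of this sketch, not behavior shown in the commit.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <numeric>
#include <stdexcept>
#include <vector>

// Hypothetical sketch of the frame layout for 4-byte values:
// [width][bytes_to_skip][raw prefix bytes][gcd][quotient 0][quotient 1]...
std::vector<char> encodeFrame32(const std::vector<char> & src)
{
    constexpr size_t width = sizeof(uint32_t);
    const size_t skip = src.size() % width; // prefix that does not fill a whole value

    std::vector<char> dst(2 + width + src.size());
    dst[0] = static_cast<char>(width);
    dst[1] = static_cast<char>(skip);
    if (skip != 0)
        std::memcpy(&dst[2], src.data(), skip);

    // First pass: GCD of all values (std::gcd(0, x) == x).
    uint32_t g = 0;
    for (size_t pos = skip; pos < src.size(); pos += width)
    {
        uint32_t v;
        std::memcpy(&v, &src[pos], width);
        g = std::gcd(g, v);
    }

    size_t out = 2 + skip;
    std::memcpy(&dst[out], &g, width);
    out += width;

    // Second pass: store the quotients.
    for (size_t pos = skip; pos < src.size(); pos += width, out += width)
    {
        uint32_t v;
        std::memcpy(&v, &src[pos], width);
        const uint32_t q = (g == 0) ? 0 : v / g; // assumption: avoid dividing by a zero GCD
        std::memcpy(&dst[out], &q, width);
    }
    return dst;
}

std::vector<char> decodeFrame32(const std::vector<char> & frame, size_t uncompressed_size)
{
    constexpr size_t width = sizeof(uint32_t);
    if (frame.size() != 2 + width + uncompressed_size || static_cast<unsigned char>(frame[0]) != width)
        throw std::runtime_error("Cannot decompress. Frame has wrong header");

    const size_t skip = uncompressed_size % width;
    std::vector<char> out(uncompressed_size);
    if (skip != 0)
        std::memcpy(out.data(), &frame[2], skip);

    uint32_t g;
    std::memcpy(&g, &frame[2 + skip], width);

    size_t in = 2 + skip + width;
    for (size_t pos = skip; pos < uncompressed_size; pos += width, in += width)
    {
        uint32_t q;
        std::memcpy(&q, &frame[in], width);
        const uint32_t v = q * g; // multiplying by the GCD restores the value
        std::memcpy(&out[pos], &v, width);
    }
    return out;
}
```

A 10-byte input made of two prefix bytes followed by the values 12 and 18 yields a 16-byte frame that stores the GCD 6 and the quotients 2 and 3. The encoded frame is always `2 + width` bytes larger than the input, which is consistent with `getMaxCompressedDataSize` and with the codec reporting `isCompression()` as `false`.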
4 changes: 4 additions & 0 deletions src/Compression/CompressionFactory.cpp
@@ -168,7 +168,9 @@ void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
#ifdef ENABLE_QPL_COMPRESSION
void registerCodecDeflateQpl(CompressionCodecFactory & factory);
#endif

/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
@@ -179,6 +181,7 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecFPC(CompressionCodecFactory & factory);
void registerCodecGCD(CompressionCodecFactory & factory);
#endif

CompressionCodecFactory::CompressionCodecFactory()
@@ -198,6 +201,7 @@ CompressionCodecFactory::CompressionCodecFactory()
#ifdef ENABLE_QPL_COMPRESSION
    registerCodecDeflateQpl(*this);
#endif
    registerCodecGCD(*this);
#endif

    default_codec = get("LZ4", {});
1 change: 1 addition & 0 deletions src/Compression/CompressionInfo.h
@@ -47,6 +47,7 @@ enum class CompressionMethodByte : uint8_t
    AES_256_GCM_SIV = 0x97,
    FPC = 0x98,
    DeflateQpl = 0x99,
    GCD = 0x9a,
};

}
3 changes: 3 additions & 0 deletions src/Compression/fuzzers/CMakeLists.txt
@@ -18,3 +18,6 @@ target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms)

clickhouse_add_executable (encrypted_decompress_fuzzer encrypted_decompress_fuzzer.cpp)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms)

clickhouse_add_executable (gcd_decompress_fuzzer gcd_decompress_fuzzer.cpp)
target_link_libraries (gcd_decompress_fuzzer PRIVATE dbms)
