Skip to content

Commit

Permalink
Merge remote-tracking branch 'rschu1ze/master' into invalid-async-met…
Browse files Browse the repository at this point in the history
…rics-update-period
  • Loading branch information
rschu1ze committed Aug 16, 2023
2 parents 1986996 + d5ed014 commit 336262f
Show file tree
Hide file tree
Showing 73 changed files with 1,255 additions and 279 deletions.
1 change: 1 addition & 0 deletions docker/test/install/deb/Dockerfile
Expand Up @@ -12,6 +12,7 @@ ENV \
# install systemd packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sudo \
systemd \
&& \
apt-get clean && \
Expand Down
2 changes: 1 addition & 1 deletion docs/en/engines/database-engines/materialized-mysql.md
Expand Up @@ -190,7 +190,7 @@ These are the schema conversion manipulations you can do with table overrides fo
* Modify [column TTL](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#mergetree-column-ttl).
* Modify [column compression codec](/docs/en/sql-reference/statements/create/table.md/#codecs).
* Add [ALIAS columns](/docs/en/sql-reference/statements/create/table.md/#alias).
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes)
* Add [skipping indexes](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-data_skipping-indexes). Note that you need to enable `use_skip_indexes_if_final` setting to make them work (MaterializedMySQL is using `SELECT ... FINAL` by default)
* Add [projections](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#projections). Note that projection optimizations are
disabled when using `SELECT ... FINAL` (which MaterializedMySQL does by default), so their utility is limited here.
`INDEX ... TYPE hypothesis` as [described in the v21.12 blog post]](https://clickhouse.com/blog/en/2021/clickhouse-v21.12-released/)
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnAggregateFunction.cpp
Expand Up @@ -524,7 +524,7 @@ void ColumnAggregateFunction::insertDefault()
pushBackAndCreateState(data, arena, func.get());
}

StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin, const UInt8 *) const
{
WriteBufferFromArena out(arena, begin);
func->serialize(data[n], out, version);
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnAggregateFunction.h
Expand Up @@ -162,7 +162,7 @@ class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateF

void insertDefault() override;

StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;

const char * deserializeAndInsertFromArena(const char * src_arena) override;

Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnArray.cpp
Expand Up @@ -205,7 +205,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}


StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnArray.h
Expand Up @@ -77,7 +77,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray>
StringRef getDataAt(size_t n) const override;
bool isDefaultAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnCompressed.h
Expand Up @@ -88,7 +88,7 @@ class ColumnCompressed : public COWHelper<IColumn, ColumnCompressed>
void insertData(const char *, size_t) override { throwMustBeDecompressed(); }
void insertDefault() override { throwMustBeDecompressed(); }
void popBack(size_t) override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeDecompressed(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeDecompressed(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeDecompressed(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeDecompressed(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeDecompressed(); }
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnConst.h
Expand Up @@ -151,7 +151,7 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst>
s -= n;
}

StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, const UInt8 *) const override
{
return data->serializeValueIntoArena(0, arena, begin);
}
Expand Down
21 changes: 19 additions & 2 deletions src/Columns/ColumnDecimal.cpp
Expand Up @@ -59,9 +59,26 @@ bool ColumnDecimal<T>::hasEqualValues() const
}

template <is_decimal T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(sizeof(T), begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = sizeof(T);
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnDecimal.h
Expand Up @@ -80,7 +80,7 @@ class ColumnDecimal final : public COWHelper<ColumnVectorHelper, ColumnDecimal<T

Float64 getFloat64(size_t n) const final { return DecimalUtils::convertTo<Float64>(data[n], scale); }

StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
Expand Down
23 changes: 20 additions & 3 deletions src/Columns/ColumnFixedString.cpp
Expand Up @@ -86,11 +86,28 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memset(chars.data() + old_size + length, 0, n - length);
}

StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 * null_bit) const
{
auto * pos = arena.allocContinue(n, begin);
constexpr size_t null_bit_size = sizeof(UInt8);
StringRef res;
char * pos;
if (null_bit)
{
res.size = * null_bit ? null_bit_size : null_bit_size + n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
memcpy(pos, null_bit, null_bit_size);
if (*null_bit) return res;
pos += null_bit_size;
}
else
{
res.size = n;
pos = arena.allocContinue(res.size, begin);
res.data = pos;
}
memcpy(pos, &chars[n * index], n);
return StringRef(pos, n);
return res;
}

const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnFixedString.h
Expand Up @@ -115,7 +115,7 @@ class ColumnFixedString final : public COWHelper<ColumnVectorHelper, ColumnFixed
chars.resize_assume_reserved(chars.size() - n * elems);
}

StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, const UInt8 *) const override;

const char * deserializeAndInsertFromArena(const char * pos) override;

Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnFunction.h
Expand Up @@ -96,7 +96,7 @@ class ColumnFunction final : public COWHelper<IColumn, ColumnFunction>
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot insert into {}", getName());
}

StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot serialize from {}", getName());
}
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnLowCardinality.cpp
Expand Up @@ -255,7 +255,7 @@ void ColumnLowCardinality::insertData(const char * pos, size_t length)
idx.insertPosition(dictionary.getColumnUnique().uniqueInsertData(pos, length));
}

StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnLowCardinality::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return getDictionary().serializeValueIntoArena(getIndexes().getUInt(n), arena, begin);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnLowCardinality.h
Expand Up @@ -87,7 +87,7 @@ class ColumnLowCardinality final : public COWHelper<IColumn, ColumnLowCardinalit

void popBack(size_t n) override { idx.popBack(n); }

StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;

const char * deserializeAndInsertFromArena(const char * pos) override;

Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnMap.cpp
Expand Up @@ -111,7 +111,7 @@ void ColumnMap::popBack(size_t n)
nested->popBack(n);
}

StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnMap::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return nested->serializeValueIntoArena(n, arena, begin);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnMap.h
Expand Up @@ -58,7 +58,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap>
void insert(const Field & x) override;
void insertDefault() override;
void popBack(size_t n) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
Expand Down
83 changes: 72 additions & 11 deletions src/Columns/ColumnNullable.cpp
Expand Up @@ -4,6 +4,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsDateTime.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
Expand Down Expand Up @@ -34,6 +38,7 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
{
/// ColumnNullable cannot have constant nested column. But constant argument could be passed. Materialize it.
nested_column = getNestedColumn().convertToFullColumnIfConst();
nested_type = nested_column->getDataType();

if (!getNestedColumn().canBeInsideNullable())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "{} cannot be inside Nullable column", getNestedColumn().getName());
Expand Down Expand Up @@ -134,21 +139,77 @@ void ColumnNullable::insertData(const char * pos, size_t length)
}
}

StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
char * pos;

auto * pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);

if (arr[n])
return StringRef(pos, s);

auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);

/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
switch (nested_type)
{
case TypeIndex::UInt8:
return static_cast<const ColumnUInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt16:
return static_cast<const ColumnUInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt32:
return static_cast<const ColumnUInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt64:
return static_cast<const ColumnUInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt128:
return static_cast<const ColumnUInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UInt256:
return static_cast<const ColumnUInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int8:
return static_cast<const ColumnInt8 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int16:
return static_cast<const ColumnInt16 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int32:
return static_cast<const ColumnInt32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int64:
return static_cast<const ColumnInt64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int128:
return static_cast<const ColumnInt128 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Int256:
return static_cast<const ColumnInt256 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float32:
return static_cast<const ColumnFloat32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Float64:
return static_cast<const ColumnFloat64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date:
return static_cast<const ColumnDate *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Date32:
return static_cast<const ColumnDate32 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime:
return static_cast<const ColumnDateTime *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::DateTime64:
return static_cast<const ColumnDateTime64 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::String:
return static_cast<const ColumnString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::FixedString:
return static_cast<const ColumnFixedString *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal32:
return static_cast<const ColumnDecimal<Decimal32> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal64:
return static_cast<const ColumnDecimal<Decimal64> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal128:
return static_cast<const ColumnDecimal<Decimal128> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::Decimal256:
return static_cast<const ColumnDecimal<Decimal256> *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::UUID:
return static_cast<const ColumnUUID *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv4:
return static_cast<const ColumnIPv4 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
case TypeIndex::IPv6:
return static_cast<const ColumnIPv6 *>(nested_column.get())->serializeValueIntoArena(n, arena, begin, &arr[n]);
default:
pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
if (arr[n])
return StringRef(pos, s);
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
/// serializeValueIntoArena may reallocate memory. Have to use ptr from nested_ref.data and move it back.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
}

const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
Expand Down
5 changes: 4 additions & 1 deletion src/Columns/ColumnNullable.h
Expand Up @@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>

#include "Core/TypeId.h"
#include "config.h"


Expand Down Expand Up @@ -62,7 +63,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>
StringRef getDataAt(size_t) const override;
/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 * null_bit) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char * pos) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
Expand Down Expand Up @@ -212,6 +213,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>
private:
WrappedPtr nested_column;
WrappedPtr null_map;
// optimize serializeValueIntoArena
TypeIndex nested_type;

template <bool negative>
void applyNullMapImpl(const NullMap & map);
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnObject.h
Expand Up @@ -244,7 +244,7 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject>
StringRef getDataAt(size_t) const override { throwMustBeConcrete(); }
bool isDefaultAt(size_t) const override { throwMustBeConcrete(); }
void insertData(const char *, size_t) override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override { throwMustBeConcrete(); }
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, const UInt8 *) const override { throwMustBeConcrete(); }
const char * deserializeAndInsertFromArena(const char *) override { throwMustBeConcrete(); }
const char * skipSerializedInArena(const char *) const override { throwMustBeConcrete(); }
void updateHashWithValue(size_t, SipHash &) const override { throwMustBeConcrete(); }
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnSparse.cpp
Expand Up @@ -150,7 +150,7 @@ void ColumnSparse::insertData(const char * pos, size_t length)
insertSingleValue([&](IColumn & column) { column.insertData(pos, length); });
}

StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnSparse::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const
{
return values->serializeValueIntoArena(getValueIndex(n), arena, begin);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Columns/ColumnSparse.h
Expand Up @@ -78,7 +78,7 @@ class ColumnSparse final : public COWHelper<IColumn, ColumnSparse>

/// Will insert null value if pos=nullptr
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
Expand Down

0 comments on commit 336262f

Please sign in to comment.