Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CapnProto format to improve input/output performance #49752

Merged
merged 30 commits into from Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ac7c54a
Refactor CapnProto format to improve input/output performance
Avogar May 10, 2023
167516b
Fix style
Avogar May 10, 2023
344885d
Fix style
Avogar May 10, 2023
e3cb1e4
Fix style
Avogar May 10, 2023
680d359
Merge branch 'master' into better-capnproto-3
Avogar May 11, 2023
288988b
Fix build
Avogar May 11, 2023
1daa981
Fix special builds
Avogar May 12, 2023
7698776
Fix special build
Avogar May 12, 2023
c9f90fb
Fix special build
Avogar May 12, 2023
b61b4a1
Merge branch 'master' into better-capnproto-3
Avogar May 17, 2023
d7f0b6c
Merge branch 'master' into better-capnproto-3
Avogar May 23, 2023
e66f627
Refactor CapnProto format to improve input/output performance
Avogar May 10, 2023
c2eada7
Fix style
Avogar May 10, 2023
cc7cfa0
Fix style
Avogar May 10, 2023
1347dc4
Fix style
Avogar May 10, 2023
a89a8b8
Fix build
Avogar May 11, 2023
5f1ca61
Fix special builds
Avogar May 12, 2023
94ef089
Fix special build
Avogar May 12, 2023
f76fc5e
Fix special build
Avogar May 12, 2023
c75843e
Merge branch 'better-capnproto-3' of github.com:Avogar/ClickHouse int…
Avogar May 25, 2023
ea395e9
Make better
Avogar May 25, 2023
c962631
Better
Avogar May 31, 2023
898d1f3
Merge branch 'master' into better-capnproto-3
Avogar May 31, 2023
4b46486
Clean up
Avogar May 31, 2023
4987c4b
Clean up
Avogar May 31, 2023
8b34a30
Fix style
Avogar May 31, 2023
883350d
Fix tests
Avogar Jun 1, 2023
cc03652
Merge branch 'master' of github.com:ClickHouse/ClickHouse into better…
Avogar Jun 8, 2023
47b0c2a
Make better
Avogar Jun 9, 2023
24d70a2
Fix
Avogar Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/capnproto
Submodule capnproto updated 66 files
+8 −3 .github/workflows/quick-test.yml
+26 −9 c++/.bazelrc
+1 −1 c++/.bazelversion
+8 −1 c++/WORKSPACE
+13 −0 c++/build/configure.bzl
+12 −0 c++/build/load_br.bzl
+1 −0 c++/compile_flags.txt
+10 −1 c++/src/capnp/BUILD.bazel
+15 −0 c++/src/capnp/capability.h
+3 −3 c++/src/capnp/compat/byte-stream-test.c++
+59 −3 c++/src/capnp/compat/byte-stream.c++
+4 −0 c++/src/capnp/compat/byte-stream.capnp
+3 −0 c++/src/capnp/compat/byte-stream.h
+128 −4 c++/src/capnp/compat/http-over-capnp-test.c++
+43 −19 c++/src/capnp/compat/http-over-capnp.c++
+6 −1 c++/src/capnp/compat/http-over-capnp.capnp
+20 −0 c++/src/capnp/compat/json.capnp.c++
+10 −6 c++/src/capnp/compiler/capnpc-c++.c++
+96 −0 c++/src/capnp/compiler/grammar.capnp.c++
+16 −0 c++/src/capnp/compiler/lexer.capnp.c++
+29 −28 c++/src/capnp/dynamic.c++
+21 −7 c++/src/capnp/dynamic.h
+15 −4 c++/src/capnp/generated-header-support.h
+40 −9 c++/src/capnp/membrane.c++
+18 −0 c++/src/capnp/membrane.h
+10 −0 c++/src/capnp/persistent.capnp.h
+137 −0 c++/src/capnp/rpc-twoparty-test.c++
+25 −10 c++/src/capnp/rpc-twoparty.c++
+24 −0 c++/src/capnp/rpc-twoparty.capnp.c++
+4 −1 c++/src/capnp/rpc-twoparty.h
+176 −32 c++/src/capnp/rpc.c++
+80 −0 c++/src/capnp/rpc.capnp.c++
+1 −1 c++/src/capnp/schema-parser.c++
+141 −1 c++/src/capnp/schema.capnp.c++
+4 −0 c++/src/capnp/stream.capnp.c++
+40 −10 c++/src/kj/BUILD.bazel
+25 −8 c++/src/kj/async-inl.h
+1 −1 c++/src/kj/async-io-unix.c++
+2 −2 c++/src/kj/async-io-win32.c++
+2 −2 c++/src/kj/async-io.c++
+51 −0 c++/src/kj/async-io.h
+1 −0 c++/src/kj/async-prelude.h
+117 −19 c++/src/kj/async-test.c++
+6 −1 c++/src/kj/async-win32.c++
+2 −2 c++/src/kj/async-win32.h
+37 −15 c++/src/kj/async.c++
+12 −1 c++/src/kj/async.h
+4 −1 c++/src/kj/common.h
+31 −0 c++/src/kj/compat/BUILD.bazel
+410 −0 c++/src/kj/compat/brotli-test.c++
+369 −0 c++/src/kj/compat/brotli.c++
+190 −0 c++/src/kj/compat/brotli.h
+8 −2 c++/src/kj/compat/gzip.c++
+5 −3 c++/src/kj/compat/gzip.h
+31 −16 c++/src/kj/compat/http-test.c++
+158 −37 c++/src/kj/compat/http.c++
+195 −2 c++/src/kj/compat/http.h
+72 −0 c++/src/kj/compat/tls-test.c++
+1 −1 c++/src/kj/compat/tls.h
+3 −2 c++/src/kj/exception.c++
+3 −3 c++/src/kj/filesystem.c++
+24 −0 c++/src/kj/memory-test.c++
+8 −2 c++/src/kj/memory.h
+2 −3 doc/otherlang.md
+4 −2 kjdoc/tour.md
+3 −1 style-guide.md
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Expand Up @@ -969,7 +969,7 @@ class IColumn;
M(Bool, output_format_orc_string_as_string, false, "Use ORC String type instead of Binary for String columns", 0) \
M(ORCCompression, output_format_orc_compression_method, "lz4", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format_capn_proto_enum_comparising_mode

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this setting exists for a long time and we cannot change it's name(

\
M(String, input_format_mysql_dump_table_name, "", "Name of the table in MySQL dump from which to read data", 0) \
M(Bool, input_format_mysql_dump_map_column_names, true, "Match columns from table in MySQL dump and columns from ClickHouse table by names", 0) \
Expand Down
8 changes: 4 additions & 4 deletions src/Core/SettingsEnums.cpp
Expand Up @@ -144,10 +144,10 @@ IMPLEMENT_SETTING_ENUM(TransactionsWaitCSNMode, ErrorCodes::BAD_ARGUMENTS,
{"wait", TransactionsWaitCSNMode::WAIT},
{"wait_unknown", TransactionsWaitCSNMode::WAIT_UNKNOWN}})

IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
IMPLEMENT_SETTING_ENUM(CapnProtoEnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::CapnProtoEnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::CapnProtoEnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})

IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{{"None", FormatSettings::EscapingRule::None},
Expand Down
2 changes: 1 addition & 1 deletion src/Core/SettingsEnums.h
Expand Up @@ -188,7 +188,7 @@ enum class TransactionsWaitCSNMode

DECLARE_SETTING_ENUM(TransactionsWaitCSNMode)

DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
DECLARE_SETTING_ENUM_WITH_RENAME(CapnProtoEnumComparingMode, FormatSettings::CapnProtoEnumComparingMode)

DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

Expand Down
298 changes: 298 additions & 0 deletions src/Formats/CapnProtoSchema.cpp
@@ -0,0 +1,298 @@
#include <Formats/CapnProtoSchema.h>
Copy link
Member Author

@Avogar Avogar May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code from this file is not new, it's copied from old file CapnProtoUtils.cpp. The new code is in CapnProtoSerializer.cpp


#if USE_CAPNP

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/IDataType.h>
#include <Common/StringUtils/StringUtils.h>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>

namespace DB
{

namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int BAD_TYPE_OF_FIELD;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int CAPN_PROTO_BAD_TYPE;
extern const int BAD_ARGUMENTS;
}

capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
try
{
int fd;
KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY)); // NOLINT(bugprone-suspicious-semicolon)
auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
}
catch (const kj::Exception & e)
{
/// That's not good to determine the type of error by its description, but
/// this is the only way to do it here, because kj doesn't specify the type of error.
auto description = std::string_view(e.getDescription().cStr());
if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos || description.find("no such file") != String::npos)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exists", schema_info.absoluteSchemaPath());

if (description.find("Parse error") != String::npos)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());

throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}",
description, schema_info.schemaDirectory(), schema_info.schemaPath());
}

auto message_maybe = schema.findNested(schema_info.messageName());
auto * message_schema = kj::_::readMaybe(message_maybe);
if (!message_schema)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA,
"CapnProto schema doesn't contain message with name {}", schema_info.messageName());
return message_schema->asStruct();
}

bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size();
}

bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() == struct_schema.getUnionFields().size();
}

/// Get full name of type for better exception messages.
String getCapnProtoFullTypeName(const capnp::Type & type)
{
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};

switch (type.which())
{
case capnp::schema::Type::Which::STRUCT:
{
auto struct_schema = type.asStruct();

auto non_union_fields = struct_schema.getNonUnionFields();
std::vector<String> non_union_field_names;
for (auto nested_field : non_union_fields)
non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));

auto union_fields = struct_schema.getUnionFields();
std::vector<String> union_field_names;
for (auto nested_field : union_fields)
union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));

String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
/// Check if the struct is a named union.
if (non_union_field_names.empty())
return union_name;

String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
/// Check if the struct contains unnamed union.
if (!union_field_names.empty())
type_name += ", " + union_name;
type_name += ")";
return type_name;
}
case capnp::schema::Type::Which::LIST:
return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";
case capnp::schema::Type::Which::ENUM:
{
auto enum_schema = type.asEnum();
String enum_name = "Enum(";
auto enumerants = enum_schema.getEnumerants();
for (unsigned i = 0; i != enumerants.size(); ++i)
{
enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal());
if (i + 1 != enumerants.size())
enum_name += ", ";
}
enum_name += ")";
return enum_name;
}
default:
auto it = capnp_simple_type_names.find(type.which());
if (it == capnp_simple_type_names.end())
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");
return it->second;
}
}

namespace
{

template <typename ValueType>
DataTypePtr getEnumDataTypeFromEnumerants(const capnp::EnumSchema::EnumerantList & enumerants)
{
std::vector<std::pair<String, ValueType>> values;
for (auto enumerant : enumerants)
values.emplace_back(enumerant.getProto().getName(), ValueType(enumerant.getOrdinal()));
return std::make_shared<DataTypeEnum<ValueType>>(std::move(values));
}

DataTypePtr getEnumDataTypeFromEnumSchema(const capnp::EnumSchema & enum_schema)
{
auto enumerants = enum_schema.getEnumerants();
if (enumerants.size() < 128)
return getEnumDataTypeFromEnumerants<Int8>(enumerants);
if (enumerants.size() < 32768)
return getEnumDataTypeFromEnumerants<Int16>(enumerants);

throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "ClickHouse supports only 8 and 16-bit Enums");
}

DataTypePtr getDataTypeFromCapnProtoType(const capnp::Type & capnp_type, bool skip_unsupported_fields)
{
switch (capnp_type.which())
{
case capnp::schema::Type::INT8:
return std::make_shared<DataTypeInt8>();
case capnp::schema::Type::INT16:
return std::make_shared<DataTypeInt16>();
case capnp::schema::Type::INT32:
return std::make_shared<DataTypeInt32>();
case capnp::schema::Type::INT64:
return std::make_shared<DataTypeInt64>();
case capnp::schema::Type::BOOL: [[fallthrough]];
case capnp::schema::Type::UINT8:
return std::make_shared<DataTypeUInt8>();
case capnp::schema::Type::UINT16:
return std::make_shared<DataTypeUInt16>();
case capnp::schema::Type::UINT32:
return std::make_shared<DataTypeUInt32>();
case capnp::schema::Type::UINT64:
return std::make_shared<DataTypeUInt64>();
case capnp::schema::Type::FLOAT32:
return std::make_shared<DataTypeFloat32>();
case capnp::schema::Type::FLOAT64:
return std::make_shared<DataTypeFloat64>();
case capnp::schema::Type::DATA: [[fallthrough]];
case capnp::schema::Type::TEXT:
return std::make_shared<DataTypeString>();
case capnp::schema::Type::ENUM:
return getEnumDataTypeFromEnumSchema(capnp_type.asEnum());
case capnp::schema::Type::LIST:
{
auto list_schema = capnp_type.asList();
auto nested_type = getDataTypeFromCapnProtoType(list_schema.getElementType(), skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeArray>(nested_type);
}
case capnp::schema::Type::STRUCT:
{
auto struct_schema = capnp_type.asStruct();


if (struct_schema.getFields().size() == 0)
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Empty messages are not supported");
}

/// Check if it can be Nullable.
if (checkIfStructIsNamedUnion(struct_schema))
{
auto fields = struct_schema.getUnionFields();
if (fields.size() != 2 || (!fields[0].getType().isVoid() && !fields[1].getType().isVoid()))
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unions are not supported");
}
auto value_type = fields[0].getType().isVoid() ? fields[1].getType() : fields[0].getType();
if (value_type.isStruct() || value_type.isList())
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Tuples and Lists cannot be inside Nullable");
}

auto nested_type = getDataTypeFromCapnProtoType(value_type, skip_unsupported_fields);
if (!nested_type)
return nullptr;
return std::make_shared<DataTypeNullable>(nested_type);
}

if (checkIfStructContainsUnnamedUnion(struct_schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");

/// Treat Struct as Tuple.
DataTypes nested_types;
Names nested_names;
for (auto field : struct_schema.getNonUnionFields())
{
auto nested_type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (!nested_type)
continue;
nested_names.push_back(field.getProto().getName());
nested_types.push_back(nested_type);
}
if (nested_types.empty())
return nullptr;
return std::make_shared<DataTypeTuple>(std::move(nested_types), std::move(nested_names));
}
default:
{
if (skip_unsupported_fields)
return nullptr;
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unsupported CapnProtoType: {}", getCapnProtoFullTypeName(capnp_type));
}
}
}

}

NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields)
{
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_TYPE, "Unnamed union is not supported");

NamesAndTypesList names_and_types;
for (auto field : schema.getNonUnionFields())
{
auto name = field.getProto().getName();
auto type = getDataTypeFromCapnProtoType(field.getType(), skip_unsupported_fields);
if (type)
names_and_types.emplace_back(name, type);
}
if (names_and_types.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot convert CapnProto schema to ClickHouse table schema, all fields have unsupported types");

return names_and_types;
}

}

#endif
13 changes: 5 additions & 8 deletions src/Formats/CapnProtoUtils.h → src/Formats/CapnProtoSchema.h
Expand Up @@ -30,17 +30,14 @@ class CapnProtoSchemaParser : public DestructorCatcher<capnp::SchemaParser>
capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};

std::pair<String, String> splitCapnProtoFieldName(const String & name);
bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema);
bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema);

bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);

std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);

capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name);

void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
/// Get full name of type for better exception messages.
String getCapnProtoFullTypeName(const capnp::Type & type);

NamesAndTypesList capnProtoSchemaToCHSchema(const capnp::StructSchema & schema, bool skip_unsupported_fields);

}

#endif