Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting to limit the number of bytes to read in schema inference #50592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 9 additions & 7 deletions docs/en/interfaces/schema-inference.md
Expand Up @@ -329,8 +329,8 @@ SELECT count() FROM system.schema_inference_cache WHERE storage='S3'
## Text formats {#text-formats}

For text formats, ClickHouse reads the data row by row, extracts column values according to the format,
and then uses some recursive parsers and heuristics to determine the type for each value. The maximum number of rows read from the data in schema inference
is controlled by the setting `input_format_max_rows_to_read_for_schema_inference` with default value 25000.
and then uses some recursive parsers and heuristics to determine the type for each value. The maximum number of rows and bytes read from the data in schema inference
is controlled by the settings `input_format_max_rows_to_read_for_schema_inference` (25000 by default) and `input_format_max_bytes_to_read_for_schema_inference` (32Mb by default).
By default, all inferred types are [Nullable](../sql-reference/data-types/nullable.md), but you can change this by setting `schema_inference_make_columns_nullable` (see examples in the [settings](#settings-for-text-formats) section).

### JSON formats {#json-formats}
Expand Down Expand Up @@ -1144,13 +1144,15 @@ Line: value_1=2, value_2="Some string 2", value_3="[4, 5, NULL]"$$)

### Settings for text formats {#settings-for-text-formats}

#### input_format_max_rows_to_read_for_schema_inference
#### input_format_max_rows_to_read_for_schema_inference/input_format_max_bytes_to_read_for_schema_inference

This setting controls the maximum number of rows to be read while schema inference.
The more rows are read, the more time is spent on schema inference, but the greater the chance to
These settings control the amount of data to be read while schema inference.
The more rows/bytes are read, the more time is spent on schema inference, but the greater the chance to
correctly determine the types (especially when the data contains a lot of nulls).

Default value: `25000`.
Default values:
- `25000` for `input_format_max_rows_to_read_for_schema_inference`.
- `33554432` (32 Mb) for `input_format_max_bytes_to_read_for_schema_inference`.

#### column_names_for_schema_inference

Expand Down Expand Up @@ -1623,7 +1625,7 @@ In schema inference for CapnProto format ClickHouse uses the following type matc
## Strong-typed binary formats {#strong-typed-binary-formats}

In such formats, each serialized value contains information about its type (and possibly about its name), but there is no information about the whole table.
In schema inference for such formats, ClickHouse reads data row by row (up to `input_format_max_rows_to_read_for_schema_inference` rows) and extracts
In schema inference for such formats, ClickHouse reads data row by row (up to `input_format_max_rows_to_read_for_schema_inference` rows or `input_format_max_bytes_to_read_for_schema_inference` bytes) and extracts
the type (and possibly name) for each value from the data and then converts these types to ClickHouse types.

### MsgPack {#msgpack}
Expand Down
6 changes: 6 additions & 0 deletions docs/en/operations/settings/settings-formats.md
Expand Up @@ -137,6 +137,12 @@ The maximum rows of data to read for automatic schema inference.

Default value: `25'000`.

## input_format_max_bytes_to_read_for_schema_inference {#input_format_max_bytes_to_read_for_schema_inference}

The maximum amount of data in bytes to read for automatic schema inference.

Default value: `33554432` (32 Mb).

## column_names_for_schema_inference {#column_names_for_schema_inference}

The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -854,6 +854,7 @@ class IColumn;
M(UInt64, input_format_msgpack_number_of_columns, 0, "The number of columns in inserted MsgPack data. Used for automatic schema inference from data.", 0) \
M(MsgPackUUIDRepresentation, output_format_msgpack_uuid_representation, FormatSettings::MsgPackUUIDRepresentation::EXT, "The way how to output UUID in MsgPack format.", 0) \
M(UInt64, input_format_max_rows_to_read_for_schema_inference, 25000, "The maximum rows of data to read for automatic schema inference", 0) \
M(UInt64, input_format_max_bytes_to_read_for_schema_inference, 32 * 1024 * 1024, "The maximum bytes of data to read for automatic schema inference", 0) \
Avogar marked this conversation as resolved.
Show resolved Hide resolved
M(Bool, input_format_csv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in CSV format", 0) \
M(Bool, input_format_tsv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in TSV format", 0) \
M(Bool, input_format_csv_detect_header, true, "Automatically detect header with names and types in CSV format", 0) \
Expand Down
3 changes: 2 additions & 1 deletion src/Formats/EscapingRuleUtils.cpp
Expand Up @@ -408,9 +408,10 @@ DataTypes getDefaultDataTypeForEscapingRules(const std::vector<FormatSettings::E
String getAdditionalFormatInfoForAllRowBasedFormats(const FormatSettings & settings)
{
return fmt::format(
"schema_inference_hints={}, max_rows_to_read_for_schema_inference={}, schema_inference_make_columns_nullable={}",
"schema_inference_hints={}, max_rows_to_read_for_schema_inference={}, max_bytes_to_read_for_schema_inference={}, schema_inference_make_columns_nullable={}",
settings.schema_inference_hints,
settings.max_rows_to_read_for_schema_inference,
settings.max_bytes_to_read_for_schema_inference,
settings.schema_inference_make_columns_nullable);
}

Expand Down
1 change: 1 addition & 0 deletions src/Formats/FormatFactory.cpp
Expand Up @@ -183,6 +183,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;
format_settings.max_rows_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.max_bytes_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.schema_inference_hints = settings.schema_inference_hints;
format_settings.schema_inference_make_columns_nullable = settings.schema_inference_make_columns_nullable;
Expand Down
3 changes: 2 additions & 1 deletion src/Formats/FormatSettings.h
Expand Up @@ -36,7 +36,8 @@ struct FormatSettings
bool defaults_for_omitted_fields = true;

bool seekable_read = true;
UInt64 max_rows_to_read_for_schema_inference = 100;
UInt64 max_rows_to_read_for_schema_inference = 25000;
UInt64 max_bytes_to_read_for_schema_inference = 32 * 1024 * 1024;

String column_names_for_schema_inference;
String schema_inference_hints;
Expand Down
14 changes: 10 additions & 4 deletions src/Formats/ReadSchemaUtils.cpp
Expand Up @@ -75,6 +75,8 @@ ColumnsDescription readSchemaFromFormat(
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference
: context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
size_t max_bytes_to_read = format_settings ? format_settings->max_bytes_to_read_for_schema_inference
: context->getSettingsRef().input_format_max_bytes_to_read_for_schema_inference;
size_t iterations = 0;
ColumnsDescription cached_columns;
while (true)
Expand Down Expand Up @@ -120,7 +122,7 @@ ColumnsDescription readSchemaFromFormat(
try
{
schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf, context, format_settings);
schema_reader->setMaxRowsToRead(max_rows_to_read);
schema_reader->setMaxRowsAndBytesToRead(max_rows_to_read, max_bytes_to_read);
names_and_types = schema_reader->readSchema();
break;
}
Expand All @@ -132,10 +134,14 @@ ColumnsDescription readSchemaFromFormat(
size_t rows_read = schema_reader->getNumRowsRead();
assert(rows_read <= max_rows_to_read);
max_rows_to_read -= schema_reader->getNumRowsRead();
if (rows_read != 0 && max_rows_to_read == 0)
size_t bytes_read = buf->count();
/// We could exceed max_bytes_to_read a bit to complete row parsing.
max_bytes_to_read -= std::min(bytes_read, max_bytes_to_read);
if (rows_read != 0 && (max_rows_to_read == 0 || max_bytes_to_read == 0))
{
exception_message += "\nTo increase the maximum number of rows to read for structure determination, use setting "
"input_format_max_rows_to_read_for_schema_inference";
exception_message += "\nTo increase the maximum number of rows/bytes to read for structure determination, use setting "
"input_format_max_rows_to_read_for_schema_inference/input_format_max_bytes_to_read_for_schema_inference";

if (iterations > 1)
{
exception_messages += "\n" + exception_message;
Expand Down
24 changes: 14 additions & 10 deletions src/Processors/Formats/ISchemaReader.cpp
Expand Up @@ -57,11 +57,15 @@ void checkFinalInferredType(
}

IIRowSchemaReader::IIRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: ISchemaReader(in_), default_type(default_type_), hints_str(format_settings_.schema_inference_hints), format_settings(format_settings_)
: ISchemaReader(in_)
, max_rows_to_read(format_settings_.max_rows_to_read_for_schema_inference)
, max_bytes_to_read(format_settings_.max_bytes_to_read_for_schema_inference)
, default_type(default_type_)
, hints_str(format_settings_.schema_inference_hints)
, format_settings(format_settings_)
{
}


void IIRowSchemaReader::setContext(ContextPtr & context)
{
ColumnsDescription columns;
Expand Down Expand Up @@ -99,11 +103,11 @@ IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & form

NamesAndTypesList IRowSchemaReader::readSchema()
{
if (max_rows_to_read == 0)
if (max_rows_to_read == 0 || max_bytes_to_read == 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot read rows to determine the schema, the maximum number of rows to read is set to 0. "
"Most likely setting input_format_max_rows_to_read_for_schema_inference is set to 0");
"Cannot read rows to determine the schema, the maximum number of rows (or bytes) to read is set to 0. "
"Most likely setting input_format_max_rows_to_read_for_schema_inference or input_format_max_bytes_to_read_for_schema_inference is set to 0");

DataTypes data_types = readRowAndGetDataTypes();

Expand Down Expand Up @@ -143,7 +147,7 @@ NamesAndTypesList IRowSchemaReader::readSchema()
data_types[i] = hint_it->second;
}

for (rows_read = 1; rows_read < max_rows_to_read; ++rows_read)
for (rows_read = 1; rows_read < max_rows_to_read && in.count() < max_bytes_to_read; ++rows_read)
{
DataTypes new_data_types = readRowAndGetDataTypes();
if (new_data_types.empty())
Expand Down Expand Up @@ -220,11 +224,11 @@ IRowWithNamesSchemaReader::IRowWithNamesSchemaReader(ReadBuffer & in_, const For

NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
{
if (max_rows_to_read == 0)
if (max_rows_to_read == 0 || max_bytes_to_read == 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot read rows to determine the schema, the maximum number of rows to read is set to 0. "
"Most likely setting input_format_max_rows_to_read_for_schema_inference is set to 0");
"Cannot read rows to determine the schema, the maximum number of rows (or bytes) to read is set to 0. "
"Most likely setting input_format_max_rows_to_read_for_schema_inference or input_format_max_bytes_to_read_for_schema_inference is set to 0");

bool eof = false;
auto names_and_types = readRowAndGetNamesAndDataTypes(eof);
Expand All @@ -245,7 +249,7 @@ NamesAndTypesList IRowWithNamesSchemaReader::readSchema()
names_order.push_back(name);
}

for (rows_read = 1; rows_read < max_rows_to_read; ++rows_read)
for (rows_read = 1; rows_read < max_rows_to_read && in.count() < max_bytes_to_read; ++rows_read)
{
auto new_names_and_types = readRowAndGetNamesAndDataTypes(eof);
if (eof)
Expand Down
9 changes: 7 additions & 2 deletions src/Processors/Formats/ISchemaReader.h
Expand Up @@ -32,7 +32,7 @@ class ISchemaReader
virtual bool needContext() const { return false; }
virtual void setContext(ContextPtr &) {}

virtual void setMaxRowsToRead(size_t) {}
virtual void setMaxRowsAndBytesToRead(size_t, size_t) {}
virtual size_t getNumRowsRead() const { return 0; }

virtual ~ISchemaReader() = default;
Expand All @@ -54,12 +54,17 @@ class IIRowSchemaReader : public ISchemaReader
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);

protected:
void setMaxRowsToRead(size_t max_rows) override { max_rows_to_read = max_rows; }
void setMaxRowsAndBytesToRead(size_t max_rows, size_t max_bytes) override
{
max_rows_to_read = max_rows;
max_bytes_to_read = max_bytes;
}
size_t getNumRowsRead() const override { return rows_read; }

virtual void transformFinalTypeIfNeeded(DataTypePtr &) {}

size_t max_rows_to_read;
size_t max_bytes_to_read;
size_t rows_read = 0;
DataTypePtr default_type;
String hints_str;
Expand Down
Expand Up @@ -55,7 +55,7 @@ void registerJSONColumnsSchemaReader(FormatFactory & factory)
);
factory.registerAdditionalInfoForSchemaCacheGetter("JSONColumns", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return getAdditionalFormatInfoForAllRowBasedFormats(settings) + getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
});
}

Expand Down
12 changes: 7 additions & 5 deletions src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.cpp
Expand Up @@ -176,6 +176,8 @@ JSONColumnsSchemaReaderBase::JSONColumnsSchemaReaderBase(
, hints_str(format_settings_.schema_inference_hints)
, reader(std::move(reader_))
, column_names_from_settings(splitColumnNames(format_settings_.column_names_for_schema_inference))
, max_rows_to_read(format_settings_.max_rows_to_read_for_schema_inference)
, max_bytes_to_read(format_settings_.max_bytes_to_read_for_schema_inference)
{
}

Expand All @@ -196,12 +198,12 @@ void JSONColumnsSchemaReaderBase::transformTypesIfNeeded(DataTypePtr & type, Dat

NamesAndTypesList JSONColumnsSchemaReaderBase::readSchema()
{
size_t total_rows_read = 0;
std::unordered_map<String, DataTypePtr> names_to_types;
std::vector<String> names_order;
/// Read data block by block and determine the type for each column
/// until max_rows_to_read_for_schema_inference is reached.
while (total_rows_read < format_settings.max_rows_to_read_for_schema_inference)
/// until max_rows_to_read/max_bytes_to_read is reached.
/// Note that we can exceed max_bytes_to_read to compete block parsing.
while (total_rows_read < max_rows_to_read && in.count() < max_bytes_to_read)
{
if (in.eof())
break;
Expand Down Expand Up @@ -268,7 +270,7 @@ NamesAndTypesList JSONColumnsSchemaReaderBase::readSchema()
return result;
}

DataTypePtr JSONColumnsSchemaReaderBase::readColumnAndGetDataType(const String & column_name, size_t & rows_read, size_t max_rows_to_read)
DataTypePtr JSONColumnsSchemaReaderBase::readColumnAndGetDataType(const String & column_name, size_t & rows_read, size_t max_rows)
{
/// Check for empty column.
if (reader->checkColumnEnd())
Expand All @@ -279,7 +281,7 @@ DataTypePtr JSONColumnsSchemaReaderBase::readColumnAndGetDataType(const String &
do
{
/// If we reached max_rows_to_read, skip the rest part of this column.
if (rows_read == max_rows_to_read)
if (rows_read == max_rows)
{
reader->skipColumn();
break;
Expand Down
16 changes: 14 additions & 2 deletions src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h
Expand Up @@ -82,11 +82,19 @@ class JSONColumnsSchemaReaderBase : public ISchemaReader
bool needContext() const override { return !hints_str.empty(); }
void setContext(ContextPtr & ctx) override;

void setMaxRowsAndBytesToRead(size_t max_rows, size_t max_bytes) override
{
max_rows_to_read = max_rows;
max_bytes_to_read = max_bytes;
}

size_t getNumRowsRead() const override { return total_rows_read; }

private:
NamesAndTypesList readSchema() override;

/// Read whole column in the block (up to max_rows_to_read rows) and extract the data type.
DataTypePtr readColumnAndGetDataType(const String & column_name, size_t & rows_read, size_t max_rows_to_read);
/// Read whole column in the block (up to max_rows rows) and extract the data type.
DataTypePtr readColumnAndGetDataType(const String & column_name, size_t & rows_read, size_t max_rows);

const FormatSettings format_settings;
String hints_str;
Expand All @@ -95,6 +103,10 @@ class JSONColumnsSchemaReaderBase : public ISchemaReader
std::unique_ptr<JSONColumnsReaderBase> reader;
Names column_names_from_settings;
JSONInferenceInfo inference_info;

size_t total_rows_read = 0;
size_t max_rows_to_read;
size_t max_bytes_to_read;
};

}
Expand Up @@ -53,7 +53,7 @@ void registerJSONCompactColumnsSchemaReader(FormatFactory & factory)
);
factory.registerAdditionalInfoForSchemaCacheGetter("JSONCompactColumns", [](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
auto result = getAdditionalFormatInfoForAllRowBasedFormats(settings) + getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return result + fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
});
}
Expand Down
@@ -0,0 +1 @@
a Nullable(Int64)
@@ -0,0 +1,4 @@
set input_format_max_rows_to_read_for_schema_inference=2;
desc format('JSONEachRow', '{"a" : null}, {"a" : 42}') settings input_format_max_bytes_to_read_for_schema_inference=10; -- {serverError ONLY_NULLS_WHILE_READING_SCHEMA}
desc format('JSONEachRow', '{"a" : null}, {"a" : 42}') settings input_format_max_bytes_to_read_for_schema_inference=20;