Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove context from formats #8388

Merged
merged 8 commits into from Dec 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/CMakeLists.txt
Expand Up @@ -186,8 +186,8 @@ endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp)
list (APPEND dbms_headers src/Functions/IFunctionImpl.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h)
list (APPEND dbms_sources src/Functions/IFunction.cpp src/Functions/FunctionFactory.cpp src/Functions/FunctionHelpers.cpp src/Functions/extractTimeZoneFromFunctionArguments.cpp)
list (APPEND dbms_headers src/Functions/IFunctionImpl.h src/Functions/FunctionFactory.h src/Functions/FunctionHelpers.h src/Functions/extractTimeZoneFromFunctionArguments.h)

list (APPEND dbms_sources
src/AggregateFunctions/AggregateFunctionFactory.cpp
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/ParallelParsingBlockInputStream.cpp
Expand Up @@ -65,7 +65,7 @@ void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_n
*/
ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
auto parser = std::make_unique<InputStreamFromInputFormat>(
input_processor_creator(read_buffer, header, context,
input_processor_creator(read_buffer, header,
row_input_format_params, format_settings));

unit.block_ext.block.clear();
Expand Down
12 changes: 4 additions & 8 deletions dbms/src/DataStreams/ParallelParsingBlockInputStream.h
Expand Up @@ -55,31 +55,28 @@ class ParallelParsingBlockInputStream : public IBlockInputStream
using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings)>;
public:
struct InputCreatorParams
{
const Block &sample;
const Context &context;
const RowInputFormatParams& row_input_format_params;
const Block & sample;
const RowInputFormatParams & row_input_format_params;
const FormatSettings &settings;
};

struct Params
{
ReadBuffer & read_buffer;
const InputProcessorCreator &input_processor_creator;
const InputCreatorParams &input_creator_params;
const InputProcessorCreator & input_processor_creator;
const InputCreatorParams & input_creator_params;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
int max_threads;
size_t min_chunk_bytes;
};

explicit ParallelParsingBlockInputStream(const Params & params)
: header(params.input_creator_params.sample),
context(params.input_creator_params.context),
row_input_format_params(params.input_creator_params.row_input_format_params),
format_settings(params.input_creator_params.settings),
input_processor_creator(params.input_processor_creator),
Expand Down Expand Up @@ -149,7 +146,6 @@ class ParallelParsingBlockInputStream : public IBlockInputStream

private:
const Block header;
const Context context;
const RowInputFormatParams row_input_format_params;
const FormatSettings format_settings;
const InputProcessorCreator input_processor_creator;
Expand Down
62 changes: 48 additions & 14 deletions dbms/src/Formats/FormatFactory.cpp
Expand Up @@ -12,6 +12,8 @@
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>


namespace DB
Expand All @@ -34,7 +36,7 @@ const FormatFactory::Creators & FormatFactory::getCreators(const String & name)
}


static FormatSettings getInputFormatSetting(const Settings & settings)
static FormatSettings getInputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.csv.delimiter = settings.format_csv_delimiter;
Expand All @@ -56,11 +58,21 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;

return format_settings;
}

static FormatSettings getOutputFormatSetting(const Settings & settings)
static FormatSettings getOutputFormatSetting(const Settings & settings, const Context & context)
{
FormatSettings format_settings;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;
Expand All @@ -77,6 +89,16 @@ static FormatSettings getOutputFormatSetting(const Settings & settings)
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.write_statistics = settings.output_format_write_statistics;
format_settings.parquet.row_group_size = settings.output_format_parquet_row_group_size;
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context.getFormatSchemaPath();
format_settings.schema.is_server = context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
format_settings.custom.result_before_delimiter = settings.format_custom_result_before_delimiter;
format_settings.custom.result_after_delimiter = settings.format_custom_result_after_delimiter;
format_settings.custom.escaping_rule = settings.format_custom_escaping_rule;
format_settings.custom.field_delimiter = settings.format_custom_field_delimiter;
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;

return format_settings;
}
Expand All @@ -100,9 +122,9 @@ BlockInputStreamPtr FormatFactory::getInput(
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);

const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);

return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, max_block_size, callback ? callback : ReadCallback(), format_settings);
}

const Settings & settings = context.getSettingsRef();
Expand All @@ -118,7 +140,7 @@ BlockInputStreamPtr FormatFactory::getInput(
if (!input_getter)
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);

FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);

RowInputFormatParams row_input_format_params;
row_input_format_params.max_block_size = max_block_size;
Expand All @@ -128,7 +150,7 @@ BlockInputStreamPtr FormatFactory::getInput(
row_input_format_params.max_execution_time = settings.max_execution_time;
row_input_format_params.timeout_overflow_mode = settings.timeout_overflow_mode;

auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, context, row_input_format_params, format_settings};
auto input_creator_params = ParallelParsingBlockInputStream::InputCreatorParams{sample, row_input_format_params, format_settings};
ParallelParsingBlockInputStream::Params params{buf, input_getter,
input_creator_params, file_segmentation_engine,
static_cast<int>(settings.max_threads),
Expand Down Expand Up @@ -164,16 +186,16 @@ BlockOutputStreamPtr FormatFactory::getOutput(
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);

const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings);
FormatSettings format_settings = getOutputFormatSetting(settings, context);

/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, context, callback, format_settings), sample);
output_getter(buf, sample, std::move(callback), format_settings), sample);
}

auto format = getOutputFormat(name, buf, sample, context, callback);
auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}

Expand All @@ -191,7 +213,7 @@ InputFormatPtr FormatFactory::getInputFormat(
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);

const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);
FormatSettings format_settings = getInputFormatSetting(settings, context);

RowInputFormatParams params;
params.max_block_size = max_block_size;
Expand All @@ -201,7 +223,13 @@ InputFormatPtr FormatFactory::getInputFormat(
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;

return input_getter(buf, sample, context, params, format_settings);
auto format = input_getter(buf, sample, params, format_settings);

/// This is a kludge: the Values format still needs the context, so it is set after construction.
if (auto * values = typeid_cast<ValuesBlockInputFormat *>(format.get()))
values->setContext(context);

return format;
}


Expand All @@ -213,12 +241,18 @@ OutputFormatPtr FormatFactory::getOutputFormat(
throw Exception("Format " + name + " is not suitable for output", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT);

const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings);
FormatSettings format_settings = getOutputFormatSetting(settings, context);

/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return output_getter(buf, sample, context, callback, format_settings);
auto format = output_getter(buf, sample, std::move(callback), format_settings);

/// This is a kludge: the MySQL output format still needs the context, so it is set after construction.
if (auto * mysql = typeid_cast<MySQLOutputFormat *>(format.get()))
mysql->setContext(context);

return format;
}


Expand Down Expand Up @@ -259,7 +293,7 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
auto & target = dict[name].file_segmentation_engine;
if (target)
throw Exception("FormatFactory: File segmentation engine " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = file_segmentation_engine;
target = std::move(file_segmentation_engine);
}

FormatFactory::FormatFactory()
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Formats/FormatFactory.h
Expand Up @@ -59,29 +59,25 @@ class FormatFactory final : private boost::noncopyable
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
const Block & sample,
const Context & context,
UInt64 max_block_size,
ReadCallback callback,
const FormatSettings & settings)>;

using OutputCreator = std::function<BlockOutputStreamPtr(
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;

using InputProcessorCreator = std::function<InputFormatPtr(
ReadBuffer & buf,
const Block & header,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings)>;

using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;

Expand Down
14 changes: 5 additions & 9 deletions dbms/src/Formats/FormatSchemaInfo.cpp
Expand Up @@ -26,7 +26,7 @@ namespace
}


FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & format_schema, const String & format, bool require_message)
FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path)
{
if (format_schema.empty())
throw Exception(
Expand Down Expand Up @@ -54,29 +54,25 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & forma
else
path.assign(format_schema).makeFile().getFileName();

auto default_schema_directory = [&context]()
auto default_schema_directory = [&format_schema_path]()
{
static const String str = Poco::Path(context.getFormatSchemaPath()).makeAbsolute().makeDirectory().toString();
static const String str = Poco::Path(format_schema_path).makeAbsolute().makeDirectory().toString();
return str;
};
auto is_server = [&context]()
{
return context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
};

if (path.getExtension().empty() && !default_file_extension.empty())
path.setExtension(default_file_extension);

if (path.isAbsolute())
{
if (is_server())
if (is_server)
throw Exception("Absolute path in the 'format_schema' setting is prohibited: " + path.toString(), ErrorCodes::BAD_ARGUMENTS);
schema_path = path.getFileName();
schema_directory = path.makeParent().toString();
}
else if (path.depth() >= 1 && path.directory(0) == "..")
{
if (is_server())
if (is_server)
throw Exception(
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: " + path.toString(),
ErrorCodes::BAD_ARGUMENTS);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Formats/FormatSchemaInfo.h
Expand Up @@ -10,7 +10,7 @@ class Context;
class FormatSchemaInfo
{
public:
FormatSchemaInfo(const Context & context, const String & format_schema, const String & format, bool require_message);
FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path);

/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }
Expand Down
21 changes: 21 additions & 0 deletions dbms/src/Formats/FormatSettings.h
Expand Up @@ -89,6 +89,27 @@ struct FormatSettings
UInt64 row_group_size = 1000000;
} parquet;

/// Format-schema parameters carried inside the format settings, so that a format
/// can locate its schema file without access to the Context.
struct Schema
{
/// Value of the `format_schema` setting.
std::string format_schema;
/// Schema directory; filled from Context::getFormatSchemaPath().
std::string format_schema_path;
/// Set when a global context exists and its application type is SERVER.
/// NOTE(review): presumably forwarded to FormatSchemaInfo, which rejects absolute
/// paths and paths starting with ".." when its `is_server` argument is true — confirm at call sites.
bool is_server = false;
};

/// Schema parameters for the current format.
Schema schema;

/// Delimiters and escaping rule copied from the `format_custom_*` settings.
/// Each field mirrors the setting named `format_custom_<field name>`.
struct Custom
{
/// From `format_custom_result_before_delimiter`.
std::string result_before_delimiter;
/// From `format_custom_result_after_delimiter`.
std::string result_after_delimiter;
/// From `format_custom_row_before_delimiter`.
std::string row_before_delimiter;
/// From `format_custom_row_after_delimiter`.
std::string row_after_delimiter;
/// From `format_custom_row_between_delimiter`.
std::string row_between_delimiter;
/// From `format_custom_field_delimiter`.
std::string field_delimiter;
/// From `format_custom_escaping_rule`; kept as the raw setting string here.
std::string escaping_rule;
};

/// Custom-delimiter parameters for the current format.
Custom custom;
};

}
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Formats/NativeFormat.cpp
Expand Up @@ -11,7 +11,6 @@ void registerInputFormatNative(FormatFactory & factory)
factory.registerInputFormat("Native", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
UInt64 /* max_block_size */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
Expand All @@ -25,7 +24,6 @@ void registerOutputFormatNative(FormatFactory & factory)
factory.registerOutputFormat("Native", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Formats/NullFormat.cpp
Expand Up @@ -10,7 +10,6 @@ void registerOutputFormatNull(FormatFactory & factory)
factory.registerOutputFormat("Null", [](
WriteBuffer &,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
Expand Down