Skip to content

Commit

Permalink
Merge pull request #1323 from ByConity/cherry-pick1
Browse files Browse the repository at this point in the history
feat: [To cnch-dev] Support outfile into directory
  • Loading branch information
dmthuc committed Mar 12, 2024
2 parents 2eecb2e + cc2e041 commit 6e0dabc
Show file tree
Hide file tree
Showing 49 changed files with 542 additions and 156 deletions.
68 changes: 37 additions & 31 deletions programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
#include <Core/QueryProcessingStage.h>
#include <Core/ExternalTable.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Processors/Formats/Impl/NullFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerFormats.h>
#include <Functions/registerFunctions.h>
Expand Down Expand Up @@ -233,7 +233,7 @@ class Client : public Poco::Util::Application
/// The user can specify to redirect query output to a file.
OutfileTargetPtr outfile_target;

BlockOutputStreamPtr block_out_stream;
std::shared_ptr<IOutputFormat> output_format;
/// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
Expand Down Expand Up @@ -1995,7 +1995,7 @@ class Client : public Poco::Util::Application
/// Flush all buffers.
void resetOutput()
{
block_out_stream.reset();
output_format.reset();
logs_out_stream.reset();
outfile_target.reset();

Expand Down Expand Up @@ -2224,12 +2224,12 @@ class Client : public Poco::Util::Application

void initBlockOutputStream(const Block & block)
{
if (!block_out_stream)
if (!output_format)
{
/// Ignore all results when fuzzing as they can be huge.
if (query_fuzzer_runs)
{
block_out_stream = std::make_shared<NullBlockOutputStream>(block);
output_format = std::make_shared<NullOutputFormat>(block);
return;
}

Expand All @@ -2253,6 +2253,13 @@ class Client : public Poco::Util::Application
if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
query_with_output && !context->getSettingsRef().outfile_in_server_with_tcp)
{
if (query_with_output->format != nullptr)
{
if (has_vertical_output_suffix)
throw Exception("Output format already specified", ErrorCodes::CLIENT_OUTPUT_FORMAT_SPECIFIED);
current_format = query_with_output->format->as<ASTIdentifier &>().name();
}

if (query_with_output->out_file)
{
// const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
Expand All @@ -2266,23 +2273,15 @@ class Client : public Poco::Util::Application
}
out_path.emplace(typeid_cast<const ASTLiteral &>(*query_with_output->out_file).value.safeGet<std::string>());
// We are writing to file, so default format is the same as in non-interactive mode.
if (is_interactive && is_default_format)
if (is_interactive && query_with_output->format == nullptr && is_default_format)
current_format = "TabSeparated";

String compression_method_str;
UInt64 compression_level = 1;

OutfileTarget::setOutfileCompression(query_with_output, compression_method_str, compression_level);

outfile_target = OutfileTarget::getOutfileTarget(*out_path, "", compression_method_str, compression_level);
out_buf = outfile_target->getOutfileBuffer(context, true).get();
}
if (query_with_output->format != nullptr)
{
if (has_vertical_output_suffix)
throw Exception("Output format already specified", ErrorCodes::CLIENT_OUTPUT_FORMAT_SPECIFIED);
const auto & id = query_with_output->format->as<ASTIdentifier &>();
current_format = id.name();
outfile_target = std::make_shared<OutfileTarget>(context, *out_path, current_format, compression_method_str, compression_level);
out_buf = outfile_target->getOutfileBuffer(true).get();
}
}

Expand All @@ -2291,11 +2290,15 @@ class Client : public Poco::Util::Application

/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!need_render_progress)
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
output_format = context->getOutputFormatParallelIfPossible(
current_format, *out_buf, block, outfile_target ? outfile_target->outToMultiFile() : false);
else
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
output_format = context->getOutputFormat(current_format, *out_buf, block);

if (outfile_target)
output_format->setOutFileTarget(outfile_target);

block_out_stream->writePrefix();
output_format->doWritePrefix();
}
}

Expand Down Expand Up @@ -2344,20 +2347,20 @@ class Client : public Poco::Util::Application
initBlockOutputStream(block);

/// The header block containing zero rows was used to initialize
/// block_out_stream, do not output it.
/// output_format, do not output it.
/// Also do not output too much data if we're fuzzing.
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return;

if (need_render_progress)
progress_indication.clearProgressOutput();

block_out_stream->write(block);
output_format->write(block);
written_first_block = true;

/// If the result is not being written to a local file/HDFS, the received data block is immediately displayed to the user.
if (!out_path)
block_out_stream->flush();
output_format->flush();

/// Restore progress bar after data block.
if (need_render_progress)
Expand All @@ -2377,13 +2380,13 @@ class Client : public Poco::Util::Application
void onTotals(Block & block)
{
initBlockOutputStream(block);
block_out_stream->setTotals(block);
output_format->setTotals(block);
}

void onExtremes(Block & block)
{
initBlockOutputStream(block);
block_out_stream->setExtremes(block);
output_format->setExtremes(block);
}


Expand All @@ -2395,8 +2398,8 @@ class Client : public Poco::Util::Application
return;
}

if (block_out_stream)
block_out_stream->onProgress(value);
if (output_format)
output_format->onProgress(value);

if (need_render_progress)
progress_indication.writeProgress();
Expand All @@ -2416,22 +2419,25 @@ class Client : public Poco::Util::Application

void onProfileInfo(const BlockStreamProfileInfo & profile_info)
{
if (profile_info.hasAppliedLimit() && block_out_stream)
block_out_stream->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
if (profile_info.hasAppliedLimit() && output_format)
output_format->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
}


void onEndOfStream()
{
progress_indication.clearProgressOutput();

if (block_out_stream)
if (output_format)
{
block_out_stream->writeSuffix();
output_format->doWriteSuffix();

if (outfile_target)
{
outfile_target->flushFile(context);
if (outfile_target->outToFile())
outfile_target->flushFile();
if (outfile_target->outToMultiFile())
outfile_target->resetCounter();
}
}

Expand Down
21 changes: 12 additions & 9 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1802,15 +1802,17 @@ enum PreloadLevelSettings : UInt64
M(UInt64, grf_ndv_enlarge_size, 8, "The times to enlarge the grf ndv, optimal value is the number workers ", 0) \
\
/** ip2geo settings */ \
M(String, ip2geo_local_path, "/data01/clickhouse/data/geo_db/", "Local path for IP Database files", 0) \
M(String, ip2geo_local_path_oversea, "/data01/clickhouse/data/geo_db/oversea/", "Local path for IP Database files for oversea", 0) \
M(Bool, ip2geo_update_from_hdfs, 0, "Whether to update db file from hdfs", 0) \
M(String, ipv4_file, "ipv4_pro", "IPDB file for ipv4", 0) \
M(String, ipv6_file, "ipv6_pro", "IPDB file for ipv6", 0) \
M(String, geoip_city_file, "GeoIP2-City", "GeoIP DB file for city", 0) \
M(String, geoip_isp_file, "GeoIP2-ISP", "GeoIP DB file for ISP", 0) \
M(String, geoip_asn_file, "GeoLite2-ASN", "GeoIP DB file for ASN", 0) \
\
M(String, ip2geo_local_path, "/data01/clickhouse/data/geo_db/", "Local path for IP Database files", 0)\
M(String, ip2geo_local_path_oversea, "/data01/clickhouse/data/geo_db/oversea/", "Local path for IP Database files for oversea", 0)\
M(Bool, ip2geo_update_from_hdfs, 0, "Whether to update db file from hdfs", 0)\
M(String, ipv4_file, "ipv4_pro", "IPDB file for ipv4", 0)\
M(String, ipv6_file, "ipv6_pro", "IPDB file for ipv6", 0)\
M(String, geoip_city_file, "GeoIP2-City", "GeoIP DB file for city", 0)\
M(String, geoip_isp_file, "GeoIP2-ISP", "GeoIP DB file for ISP", 0)\
M(String, geoip_asn_file, "GeoLite2-ASN", "GeoIP DB file for ASN", 0)\
\
/** gateway simplification settings */ \
M(Bool, block_privileged_operations, 0, "Whether to disable tenant to access specific functions or not", 0)\
/** Sample settings */ \
M(Bool, enable_sample_by_range, false, "Sample by range if it is true", 0) \
M(Bool, enable_deterministic_sample_by_range, false, "Deterministic sample by range if it is true", 0) \
Expand Down Expand Up @@ -1854,6 +1856,7 @@ enum PreloadLevelSettings : UInt64
M(String, use_snapshot, "", "If not empty, specify the name of the snapshot to use for query", 0) \
M(Seconds, snapshot_clean_interval, 300, "How often to remove ttl expired snapshots", 0) \
/* Outfile related Settings */ \
M(UInt64, split_file_size_in_mb, 0, "Threshold to split the out data in 'INTO OUTFILE' clause", 0) \
M(Bool, outfile_in_server_with_tcp, false, "Out file in sever with tcp and return client empty block", 0) \
M(UInt64, outfile_buffer_size_in_mb, 1, "Out file buffer size in 'OUT FILE'", 0) \
M(UInt64, fuzzy_max_files, 100, "The max number of files when insert with fuzzy names.", 0) \
Expand Down
6 changes: 0 additions & 6 deletions src/DataStreams/IBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,8 @@ class IBlockOutputStream : private boost::noncopyable
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

void setBuffer(std::shared_ptr<WriteBuffer> buffer) {
this->buffer_ = buffer;
}

private:
std::vector<TableLockHolder> table_locks;
// Used to ensure buffer's life cycle is as long as this class
std::shared_ptr<WriteBuffer> buffer_;
};

}
4 changes: 3 additions & 1 deletion src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
bool out_to_directory,
WriteCallback callback,
const std::optional<FormatSettings> & _format_settings) const
{
Expand All @@ -346,7 +347,8 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(

const Settings & settings = context->getSettingsRef();

if (settings.output_format_parallel_formatting && getCreators(name).supports_parallel_formatting
if (!out_to_directory && settings.output_format_parallel_formatting
&& getCreators(name).supports_parallel_formatting
&& !settings.output_format_json_array_of_rows)
{
auto formatter_creator = [output_getter, sample, callback, format_settings]
Expand Down
5 changes: 4 additions & 1 deletion src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ class FormatFactory final : private boost::noncopyable
UInt64 max_block_size,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;

/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
/// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done.
/// When exporting into multiple files, parallel formatting cannot be used because the
/// accumulated file would be calculated concurrently.
OutputFormatPtr getOutputFormatParallelIfPossible(
const String & name,
WriteBuffer & buf,
const Block & sample,
ContextPtr context,
bool out_to_directory = false,
WriteCallback callback = {},
const std::optional<FormatSettings> & format_settings = std::nullopt) const;

Expand Down
1 change: 1 addition & 0 deletions src/Functions/FunctionFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class FunctionFile : public IFunction, WithContext
static FunctionPtr create(ContextPtr context_) { return std::make_shared<FunctionFile>(context_); }
explicit FunctionFile(ContextPtr context_) : WithContext(context_) {}

bool isPreviledgedFunction() const override { return getContext()->shouldBlockPrivilegedOperations(); }
bool isVariadic() const override { return true; }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
Expand Down
3 changes: 3 additions & 0 deletions src/Functions/IFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ ColumnPtr IExecutableFunction::defaultImplementationForNothing(
ColumnPtr IExecutableFunction::executeWithoutLowCardinalityColumns(
const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
{
if (isPreviledgedFunction())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tenant cannot execute this function {} for security reason.", getName());

if (auto res = defaultImplementationForConstantArguments(args, result_type, input_rows_count, dry_run))
return res;

Expand Down
3 changes: 3 additions & 0 deletions src/Functions/IFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class IExecutableFunction
return executeImpl(arguments, result_type, input_rows_count);
}

virtual bool isPreviledgedFunction() const { return false; }

/** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following:
* if some of arguments are NULL constants then return NULL constant,
* if some of arguments are Nullable, then execute function as usual for columns,
Expand Down Expand Up @@ -429,6 +431,7 @@ class IFunction
return executeImpl(arguments, result_type, input_rows_count);
}

virtual bool isPreviledgedFunction() const { return false; }
/** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following:
* if some of arguments are NULL constants then return NULL constant,
* if some of arguments are Nullable, then execute function as usual for columns,
Expand Down
2 changes: 2 additions & 0 deletions src/Functions/IFunctionAdaptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class FunctionToExecutableFunctionAdaptor final : public IExecutableFunction
return function->executeImplDryRun(arguments, result_type, input_rows_count);
}

bool isPreviledgedFunction() const final { return function->isPreviledgedFunction(); }

bool useDefaultImplementationForNulls() const final { return function->useDefaultImplementationForNulls(); }
bool useDefaultImplementationForNothing() const final
{
Expand Down
8 changes: 8 additions & 0 deletions src/Functions/getSetting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int LOGICAL_ERROR;
}

namespace
Expand Down Expand Up @@ -44,6 +45,11 @@ class FunctionGetSetting : public IFunction, WithContext
ErrorCodes::ILLEGAL_COLUMN};

std::string_view setting_name{column->getDataAt(0)};
/// Tenants are not allowed to read blacklisted settings.
if (getContext()->shouldBlockPrivilegedOperations()
&& privilegedSettings.find(std::string(setting_name)) != privilegedSettings.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tenant cannot execute this function {} for security reason.", getName());

value = getContext()->getSettingsRef().get(setting_name);

DataTypePtr type = applyVisitor(FieldToDataType{}, value);
Expand All @@ -58,8 +64,10 @@ class FunctionGetSetting : public IFunction, WithContext

private:
mutable Field value;
static const std::unordered_set<std::string> privilegedSettings;
};

const std::unordered_set<std::string> FunctionGetSetting::privilegedSettings = { "s3_access_key_secret", "tos_secret_key", "lasfs_secret_key", "s3_ak_secret" };
}

REGISTER_FUNCTION(GetSetting)
Expand Down
6 changes: 4 additions & 2 deletions src/Functions/hasColumnInTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class FunctionHasColumnInTable : public IFunction, WithContext
static constexpr auto name = "hasColumnInTable";
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionHasColumnInTable>(context_->getGlobalContext());
return std::make_shared<FunctionHasColumnInTable>(context_);
}

explicit FunctionHasColumnInTable(ContextPtr global_context_) : WithContext(global_context_)
explicit FunctionHasColumnInTable(ContextPtr context_) : WithContext(context_)
{
}

Expand All @@ -73,6 +73,8 @@ class FunctionHasColumnInTable : public IFunction, WithContext
return name;
}

bool isPreviledgedFunction() const override { return getContext()->shouldBlockPrivilegedOperations(); }

DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override;

bool isDeterministic() const override { return false; }
Expand Down
3 changes: 2 additions & 1 deletion src/IO/BrotliWriteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ class BrotliWriteBuffer::BrotliStateWrapper
BrotliEncoderState * state;
};

BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, compression_level(compression_level_)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
Expand Down

0 comments on commit 6e0dabc

Please sign in to comment.