Skip to content

Commit

Permalink
Added ClientInfo: passing original source of query during distributed…
Browse files Browse the repository at this point in the history
… query execution; improvement of per-user limits in ProcessList [#METR-23279].
  • Loading branch information
alexey-milovidov committed Oct 24, 2016
1 parent 15b369b commit daeac24
Show file tree
Hide file tree
Showing 22 changed files with 486 additions and 206 deletions.
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
Expand Up @@ -369,6 +369,7 @@ add_library (dbms
include/DB/Interpreters/InterpreterSelectQuery.h
include/DB/Interpreters/QueryPriorities.h
include/DB/Interpreters/QueryLog.h
include/DB/Interpreters/ClientInfo.h
include/DB/Interpreters/getClusterName.h
include/DB/Interpreters/ClusterProxy/IQueryConstructor.h
include/DB/Interpreters/ClusterProxy/SelectQueryConstructor.h
Expand Down Expand Up @@ -854,6 +855,7 @@ add_library (dbms
src/Interpreters/DictionaryFactory.cpp
src/Interpreters/ProcessList.cpp
src/Interpreters/QueryLog.cpp
src/Interpreters/ClientInfo.cpp
src/Interpreters/getClusterName.cpp
src/Interpreters/ClusterProxy/SelectQueryConstructor.cpp
src/Interpreters/ClusterProxy/DescribeQueryConstructor.cpp
Expand Down
11 changes: 9 additions & 2 deletions dbms/include/DB/Client/Connection.h
Expand Up @@ -24,6 +24,8 @@
namespace DB
{

class ClientInfo;

/// Поток блоков читающих из таблицы и ее имя
using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
/// Вектор пар, описывающих таблицы
Expand Down Expand Up @@ -135,8 +137,13 @@ class Connection : private boost::noncopyable
const String & getDefaultDatabase() const;

/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr, bool with_pending_data = false);
void sendQuery(
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false);

void sendCancel();
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
Expand Down
8 changes: 6 additions & 2 deletions dbms/include/DB/Client/MultiplexedConnections.h
Expand Up @@ -43,8 +43,12 @@ class MultiplexedConnections final : private boost::noncopyable
void sendExternalTablesData(std::vector<ExternalTablesData> & data);

/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
void sendQuery(
const String & query,
const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false);

/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
Expand Down
3 changes: 2 additions & 1 deletion dbms/include/DB/Core/Defines.h
Expand Up @@ -63,9 +63,10 @@
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
#define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032

/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 53694
#define DBMS_TCP_PROTOCOL_VERSION 54032

#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100

Expand Down
2 changes: 1 addition & 1 deletion dbms/include/DB/DataStreams/RemoteBlockOutputStream.h
Expand Up @@ -41,7 +41,7 @@ class RemoteBlockOutputStream : public IBlockOutputStream
* Он нужен, чтобы знать, какие блоки передавать в метод write.
*/

connection.sendQuery(query, "", QueryProcessingStage::Complete, settings);
connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr);

Connection::Packet packet = connection.receivePacket();

Expand Down
83 changes: 83 additions & 0 deletions dbms/include/DB/Interpreters/ClientInfo.h
@@ -0,0 +1,83 @@
#pragma once

#include <Poco/Net/SocketAddress.h>
#include <DB/Core/Types.h>


namespace DB
{

class WriteBuffer;
class ReadBuffer;


/** Information about the client that issued a query.
 * Some fields are passed explicitly by the client and some are calculated automatically.
 *
 * Also contains info about the initial query source, for tracing distributed queries
 * (where one query initiates many other queries).
 *
 * The object is a plain bag of public fields; a default-constructed instance has
 * query_kind == NO_QUERY and is reported as empty() until it is filled in.
 */
class ClientInfo
{
public:
/// Interface kinds. Stored in the `interface` field, which is one of the
/// parameters describing the initial query.
enum class Interface : UInt8
{
TCP = 1,
HTTP = 2,
};

/// HTTP request method. Stored in the `http_method` field (the "For http" group below).
/// UNKNOWN is the default value of that field.
enum class HTTPMethod : UInt8
{
UNKNOWN = 0,
GET = 1,
POST = 2,
};

/// Distinguishes an uninitialized object, an initial query and a query spawned by another query.
enum class QueryKind : UInt8
{
NO_QUERY = 0, /// Uninitialized object.
INITIAL_QUERY = 1,
SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed query execution.
};


/// Defaults to NO_QUERY, i.e. the object is considered uninitialized (see empty()).
QueryKind query_kind = QueryKind::NO_QUERY;

/// Current values are not serialized, because they are passed separately.
String current_user;
String current_query_id;
Poco::Net::SocketAddress current_address;

/// When query_kind == INITIAL_QUERY, these values are equal to the current ones above.
String initial_user;
String initial_query_id;
Poco::Net::SocketAddress initial_address;

/// All fields below are parameters related to the initial query.

Interface interface = Interface::TCP;

/// For TCP.
String os_user;
String client_hostname;
String client_name;
UInt64 client_version_major = 0;
UInt64 client_version_minor = 0;
/// NOTE(review): declared as `unsigned`, unlike the UInt64 major/minor version fields above.
unsigned client_revision = 0;

/// For HTTP.
HTTPMethod http_method = HTTPMethod::UNKNOWN;
String http_user_agent;

/// True while query_kind is still NO_QUERY, i.e. the object has not been initialized.
bool empty() const { return query_kind == QueryKind::NO_QUERY; }

/** Serialization and deserialization.
 * Only values that are not calculated automatically or passed separately are serialized
 * (so the current_* fields above are not part of the serialized form).
 * Both functions are only declared here; their definitions are not in this header.
 */
void write(WriteBuffer & out) const;
void read(ReadBuffer & in);

/// NOTE(review): judging by the name, this fills os_user, client_hostname and the
/// client version fields; the definition is not visible in this header - confirm.
void fillOSUserHostNameAndVersionInfo();
};

}
35 changes: 6 additions & 29 deletions dbms/include/DB/Interpreters/Context.h
Expand Up @@ -6,6 +6,7 @@
#include <DB/Core/Types.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/ClientInfo.h>
#include <DB/Storages/IStorage.h>
#include <DB/IO/CompressedStream.h>

Expand Down Expand Up @@ -62,33 +63,14 @@ using Dependencies = std::vector<DatabaseAndTableName>;
*/
class Context
{
public:
enum class Interface
{
TCP = 1,
HTTP = 2,
};

enum class HTTPMethod
{
UNKNOWN = 0,
GET = 1,
POST = 2,
};

private:
using Shared = std::shared_ptr<ContextShared>;
Shared shared;

String user; /// Current user
Poco::Net::IPAddress ip_address; /// IP address
UInt16 port; /// and port, from which current query was recieved
Interface interface = Interface::TCP;
HTTPMethod http_method = HTTPMethod::UNKNOWN; /// NOTE Возможно, перенести это в отдельный struct ClientInfo.
ClientInfo client_info;

std::shared_ptr<QuotaForIntervals> quota; /// Текущая квота. По-умолчанию - пустая квота, которая ничего не ограничивает.
String current_database; /// Текущая БД.
String current_query_id; /// Id текущего запроса.
Settings settings; /// Настройки выполнения запроса.
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Колбек для отслеживания прогресса выполнения запроса.
Expand Down Expand Up @@ -122,16 +104,11 @@ class Context

ConfigurationPtr getUsersConfig();

void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key);
String getUser() const { return user; }
Poco::Net::IPAddress getIPAddress() const { return ip_address; }
UInt16 getPort() const { return port; }

Interface getInterface() const { return interface; }
void setInterface(Interface interface_) { interface = interface_; }
/// Must be called before getClientInfo.
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key);

HTTPMethod getHTTPMethod() const { return http_method; }
void setHTTPMethod(HTTPMethod http_method_) { http_method = http_method_; }
ClientInfo & getClientInfo() { return client_info; };
const ClientInfo & getClientInfo() const { return client_info; };

void setQuota(const String & name, const String & quota_key, const String & user_name, const Poco::Net::IPAddress & address);
QuotaForIntervals & getQuota();
Expand Down
47 changes: 21 additions & 26 deletions dbms/include/DB/Interpreters/ProcessList.h
Expand Up @@ -13,6 +13,7 @@
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Interpreters/ClientInfo.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/CurrentMetrics.h>

Expand All @@ -35,26 +36,20 @@ namespace DB
struct ProcessInfo
{
String query;
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;
double elapsed_seconds;
size_t rows;
size_t bytes;
size_t total_rows;
Int64 memory_usage;
ClientInfo client_info;
};


/// Query and information about its execution.
struct ProcessListElement
{
String query;
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;
ClientInfo client_info;

Stopwatch watch;

Expand All @@ -72,11 +67,13 @@ struct ProcessListElement
Tables temporary_tables;


ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
UInt16 port_, size_t max_memory_usage, double memory_tracker_fault_probability,
ProcessListElement(
const String & query_,
const ClientInfo & client_info_,
size_t max_memory_usage,
double memory_tracker_fault_probability,
QueryPriorities::Handle && priority_handle_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), port(port_), memory_tracker(max_memory_usage),
: query(query_), client_info(client_info_), memory_tracker(max_memory_usage),
priority_handle(std::move(priority_handle_))
{
memory_tracker.setDescription("(for query)");
Expand All @@ -103,18 +100,17 @@ struct ProcessListElement

ProcessInfo getInfo() const
{
return ProcessInfo{
.query = query,
.user = user,
.query_id = query_id,
.ip_address = ip_address,
.port = port,
.elapsed_seconds = watch.elapsedSeconds(),
.rows = progress.rows,
.bytes = progress.bytes,
.total_rows = progress.total_rows,
.memory_usage = memory_tracker.get(),
};
ProcessInfo res;

res.query = query;
res.client_info = client_info;
res.elapsed_seconds = watch.elapsedSeconds();
res.rows = progress.rows;
res.bytes = progress.bytes;
res.total_rows = progress.total_rows;
res.memory_usage = memory_tracker.get();

return res;
}
};

Expand Down Expand Up @@ -191,8 +187,7 @@ class ProcessList
* If too much running queries - wait for not more than specified (see settings) amount of time.
* If timeout is passed - throw an exception.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
UInt16 port_, const Settings & settings);
EntryPtr insert(const String & query_, const ClientInfo & client_info, const Settings & settings);

/// Number of currently executing queries.
size_t size() const { return cur_size; }
Expand Down
6 changes: 1 addition & 5 deletions dbms/include/DB/Interpreters/QueryLog.h
Expand Up @@ -59,11 +59,7 @@ struct QueryLogElement
String exception;
String stack_trace;

Context::Interface interface = Context::Interface::TCP;
Context::HTTPMethod http_method = Context::HTTPMethod::UNKNOWN;
Poco::Net::IPAddress ip_address;
String user;
String query_id;
ClientInfo client_info;
};


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Client/Client.cpp
Expand Up @@ -699,7 +699,7 @@ class Client : public Poco::Util::Application
/// Обработать запрос, который не требует передачи блоков данных на сервер.
void processOrdinaryQuery()
{
connection->sendQuery(query, "", QueryProcessingStage::Complete, &context.getSettingsRef(), true);
connection->sendQuery(query, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
}
Expand All @@ -717,7 +717,7 @@ class Client : public Poco::Util::Application
if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);

connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), true);
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();

/// Receive description of table structure.
Expand Down

0 comments on commit daeac24

Please sign in to comment.