Skip to content

Commit

Permalink
Added port column into system.processes table. [#METR-22966]
Browse files Browse the repository at this point in the history
  • Loading branch information
ludv1x committed Oct 11, 2016
1 parent ce0c51f commit 144d901
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 13 deletions.
8 changes: 5 additions & 3 deletions dbms/include/DB/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ class Context
using Shared = std::shared_ptr<ContextShared>;
Shared shared;

String user; /// Текущий пользователь.
Poco::Net::IPAddress ip_address; /// IP-адрес, с которого задан запрос.
String user; /// Current user
Poco::Net::IPAddress ip_address; /// IP address
UInt16 port; /// and port, from which current query was recieved
Interface interface = Interface::TCP;
HTTPMethod http_method = HTTPMethod::UNKNOWN; /// NOTE Возможно, перенести это в отдельный struct ClientInfo.

Expand Down Expand Up @@ -121,9 +122,10 @@ class Context

ConfigurationPtr getUsersConfig();

void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key);
void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key);
String getUser() const { return user; }
Poco::Net::IPAddress getIPAddress() const { return ip_address; }
UInt16 getPort() const { return port; }

Interface getInterface() const { return interface; }
void setInterface(Interface interface_) { interface = interface_; }
Expand Down
9 changes: 6 additions & 3 deletions dbms/include/DB/Interpreters/ProcessList.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct ProcessInfo
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;
double elapsed_seconds;
size_t rows;
size_t bytes;
Expand All @@ -48,6 +49,7 @@ struct ProcessListElement
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;

Stopwatch watch;

Expand All @@ -67,9 +69,9 @@ struct ProcessListElement

ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_memory_usage, double memory_tracker_fault_probability,
UInt16 port_, size_t max_memory_usage, double memory_tracker_fault_probability,
QueryPriorities::Handle && priority_handle_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage),
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), port(port_), memory_tracker(max_memory_usage),
priority_handle(std::move(priority_handle_))
{
memory_tracker.setDescription("(for query)");
Expand Down Expand Up @@ -101,6 +103,7 @@ struct ProcessListElement
.user = user,
.query_id = query_id,
.ip_address = ip_address,
.port = port,
.elapsed_seconds = watch.elapsedSeconds(),
.rows = progress.rows,
.bytes = progress.bytes,
Expand Down Expand Up @@ -184,7 +187,7 @@ class ProcessList
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings);
UInt16 port_, const Settings & settings);

/// Количество одновременно выполняющихся запросов.
size_t size() const { return cur_size; }
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,17 @@ ConfigurationPtr Context::getUsersConfig()
}


void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key)
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key)
{
auto lock = getLock();

const User & user_props = shared->users.get(name, password, address);
setSetting("profile", user_props.profile);
setQuota(user_props.quota, quota_key, name, address);

user = name;
ip_address = address;
this->user = name;
this->ip_address = address;
this->port = port;
}


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace ErrorCodes

ProcessList::EntryPtr ProcessList::insert(
const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings)
UInt16 port_, const Settings & settings)
{
EntryPtr res;

Expand Down Expand Up @@ -54,7 +54,7 @@ ProcessList::EntryPtr ProcessList::insert(
++cur_size;

res = std::make_shared<Entry>(*this, cont.emplace(cont.end(),
query_, user_, query_id_, ip_address_,
query_, user_, query_id_, ip_address_, port_,
settings.limits.max_memory_usage, settings.memory_tracker_fault_probability,
priorities.insert(settings.priority)));

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.getUser(),
context.getCurrentQueryId(),
context.getIPAddress(),
context.getPort(),
settings);

context.setProcessListElement(&process_list_entry->get());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/HTTPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void HTTPHandler::processQuery(
Context context = *server.global_context;
context.setGlobalContext(*server.global_context);

context.setUser(user, password, request.clientAddress().host(), quota_key);
context.setUser(user, password, request.clientAddress().host(), request.clientAddress().port(), quota_key);
context.setCurrentQueryId(query_id);

std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ void TCPHandler::receiveHello()
<< (!user.empty() ? ", user: " + user : "")
<< ".");

connection_context.setUser(user, password, socket().peerAddress().host(), "");
connection_context.setUser(user, password, socket().peerAddress().host(), socket().peerAddress().port(), "");
}


Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/System/StorageSystemProcesses.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
, columns{
{ "user", std::make_shared<DataTypeString>() },
{ "address", std::make_shared<DataTypeString>() },
{ "port", std::make_shared<DataTypeUInt16>() },
{ "elapsed", std::make_shared<DataTypeFloat64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_read", std::make_shared<DataTypeUInt64>() },
Expand Down Expand Up @@ -46,6 +47,7 @@ BlockInputStreams StorageSystemProcesses::read(

ColumnWithTypeAndName col_user{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "user"};
ColumnWithTypeAndName col_address{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "address"};
ColumnWithTypeAndName col_port{std::make_shared<ColumnUInt16>(), std::make_shared<DataTypeUInt16>(), "port"};
ColumnWithTypeAndName col_elapsed{std::make_shared<ColumnFloat64>(), std::make_shared<DataTypeFloat64>(), "elapsed"};
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_read"};
Expand All @@ -60,6 +62,7 @@ BlockInputStreams StorageSystemProcesses::read(
{
col_user.column->insert(process.user);
col_address.column->insert(process.ip_address.toString());
col_port.column->insert(static_cast<UInt64>(process.port));
col_elapsed.column->insert(process.elapsed_seconds);
col_rows_read.column->insert(process.rows);
col_bytes_read.column->insert(process.bytes);
Expand All @@ -72,6 +75,7 @@ BlockInputStreams StorageSystemProcesses::read(
Block block{
col_user,
col_address,
col_port,
col_elapsed,
col_rows_read,
col_bytes_read,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1390
4 changes: 4 additions & 0 deletions dbms/tests/queries/0_stateless/00379_system_processes_port.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

curl --local-port 1390 'http://localhost:8123?query=SELECT%20port%20FROM%20system.processes%20ORDER%20BY%20elapsed%20LIMIT%201'

0 comments on commit 144d901

Please sign in to comment.