Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Arrow Flight Server #16243

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
08db132
Add simple interface for servers
FawnD2 Oct 5, 2020
bf59be8
Add stub implementation of Arrow Flight Server
FawnD2 Oct 11, 2020
e35ba77
Use interfaces instead of Poco::Net::TCPServer
FawnD2 Oct 11, 2020
347f6d6
Simplify some code
FawnD2 Oct 10, 2020
6c9daf3
Link server with arrow
FawnD2 Oct 18, 2020
2d6255e
Remove Parquet from Arrow include
FawnD2 Oct 18, 2020
4d99cd3
Conditional support for Arrow Flight server
FawnD2 Oct 18, 2020
46d4bd8
Add arrow option in core config
FawnD2 Oct 18, 2020
7b82960
Try compile with Flight
FawnD2 Oct 18, 2020
82e9ea0
Merge branch 'master' into arrow-flight-server
FawnD2 Oct 21, 2020
d82bddc
Include Arrow lib with PUBLIC modifier
FawnD2 Oct 22, 2020
70fdde2
Disable condition USE_ARROW
FawnD2 Oct 22, 2020
21446ea
Add flight in arrow-cmake
FawnD2 Oct 24, 2020
7969307
Change path for arrow proto dir
FawnD2 Oct 24, 2020
403e81e
Fix output sources path for grpc proto generator
FawnD2 Oct 24, 2020
cd4801e
Link arrow with grpc. Use predefined functions to generate sources
FawnD2 Oct 24, 2020
7d68b45
Fix path
FawnD2 Oct 25, 2020
bb61cac
Link with grpc++
FawnD2 Oct 25, 2020
b8531a2
Specify out dir for proto files manually
FawnD2 Oct 25, 2020
699e156
Include directory with generated sources
FawnD2 Oct 25, 2020
2901509
Add source file for Uri
FawnD2 Oct 25, 2020
fa5a43d
Add some headers for uri
FawnD2 Oct 25, 2020
307f552
Remove dependency on test_util
FawnD2 Oct 25, 2020
8820d00
Remove direct dependency on flight internal
FawnD2 Oct 25, 2020
482dc54
Replace EXPECTR_TRUE with custom macro
FawnD2 Oct 25, 2020
6363f35
Include Exception header
FawnD2 Oct 25, 2020
94d72a8
Rename macro
FawnD2 Oct 25, 2020
28a9e72
Change comment
FawnD2 Oct 25, 2020
52941bf
Add all sources for Uri
FawnD2 Oct 25, 2020
e758364
Make address string before create location
FawnD2 Oct 31, 2020
355d3ad
Manual representation for ip addr
FawnD2 Oct 31, 2020
1c52872
Fix format
FawnD2 Oct 31, 2020
bc25d60
Merge branch 'master' into arrow-flight-server
FawnD2 Nov 7, 2020
883fb6e
Set localhost for Flight location
FawnD2 Nov 7, 2020
542168e
Log Flight server location
FawnD2 Nov 7, 2020
f1ed3ff
Use server location to determine Flight Info
FawnD2 Nov 7, 2020
788fcc7
Support string batches on server
FawnD2 Nov 7, 2020
9d77f4d
Set correct num total records
FawnD2 Nov 7, 2020
01113d6
Fix function definition
FawnD2 Nov 7, 2020
c41cde6
Fix function usage
FawnD2 Nov 7, 2020
143576d
Sum batch sizes to calc total number of records
FawnD2 Nov 7, 2020
c9e7d1b
Restore previous API version
FawnD2 Nov 7, 2020
c41a8e6
Mark total_bytes as unknown
FawnD2 Nov 7, 2020
d3e5d9a
Mark total_records as unknown
FawnD2 Nov 8, 2020
1007d6a
Add logs
FawnD2 Nov 8, 2020
62a5f65
Fix segfault: Do not compile generated Flight sources directly.
FawnD2 Nov 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 31 additions & 11 deletions programs/server/Server.cpp
Expand Up @@ -64,6 +64,9 @@
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>

#include <Server/IRoutineServer.h>
#include <Server/HTTPServer.h>
#include <Server/TCPServer.h>

#if !defined(ARCADIA_BUILD)
# include "config_core.h"
Expand All @@ -78,6 +81,10 @@
# include <Common/hasLinuxCapability.h>
#endif

#if USE_ARROW
# include <Server/ArrowFlightServer.h>
#endif

#if USE_SSL
# include <Poco/Net/Context.h>
# include <Poco/Net/SecureServerSocket.h>
Expand Down Expand Up @@ -802,7 +809,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);

std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;
std::vector<std::unique_ptr<IRoutineServer>> servers;

std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");

Expand Down Expand Up @@ -907,8 +914,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);

servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(std::make_unique<DB::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening for http://{}", address.toString());
Expand All @@ -922,7 +928,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(std::make_unique<DB::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening for https://{}", address.toString());
Expand All @@ -940,7 +946,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(std::make_unique<DB::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
Expand All @@ -957,7 +963,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(std::make_unique<DB::TCPServer>(
new TCPHandlerFactory(*this, /* secure= */ true),
server_pool,
socket,
Expand All @@ -977,7 +983,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(std::make_unique<DB::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
Expand All @@ -990,7 +996,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(std::make_unique<DB::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
Expand All @@ -1007,7 +1013,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(std::make_unique<DB::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
Expand All @@ -1022,7 +1028,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(std::make_unique<DB::TCPServer>(
new PostgreSQLHandlerFactory(*this),
server_pool,
socket,
Expand All @@ -1031,14 +1037,28 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString());
});

/// Arrow Flight (only when built with Arrow support).
create_server("arrow_flight_port", [&](UInt16 port)
{
#if USE_ARROW
    auto arrow_flight_server = std::make_unique<DB::ArrowFlightServer>(*this, listen_host, port);
    /// Read the location before ownership of the server is moved into `servers`.
    const std::string location = arrow_flight_server->getLocation();
    servers.emplace_back(std::move(arrow_flight_server));
    /// The format string needs a `{}` placeholder, otherwise `location` is never printed.
    LOG_INFO(log, "Listening for Arrow Flight compatibility protocol: {}", location);
#else
    UNUSED(port);
    throw Exception{"Arrow Flight compatibility protocol is disabled because ClickHouse was built without Arrow support.",
        ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});

/// Prometheus (if defined and not setup yet with http_port)
create_server("prometheus.port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(std::make_unique<DB::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));

LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
Expand Down
5 changes: 5 additions & 0 deletions src/CMakeLists.txt
Expand Up @@ -332,6 +332,11 @@ if (USE_CAPNP)
dbms_target_link_libraries (PRIVATE ${CAPNP_LIBRARIES})
endif ()

# When USE_ARROW is set, add Arrow's include directory (as a SYSTEM include, placed BEFORE
# the existing ones) and link ARROW_LIBRARY through the dbms_target_* helpers.
if (USE_ARROW)
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${ARROW_INCLUDE_DIR})
dbms_target_link_libraries(PRIVATE ${ARROW_LIBRARY})
endif ()

if (USE_PARQUET)
dbms_target_link_libraries(PRIVATE ${PARQUET_LIBRARY})
if (NOT USE_INTERNAL_PARQUET_LIBRARY OR USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
Expand Down
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
Expand Up @@ -25,6 +25,7 @@
M(HTTPConnection, "Number of connections to HTTP server") \
M(InterserverConnection, "Number of connections from other replicas to fetch parts") \
M(PostgreSQLConnection, "Number of client connections using PostgreSQL protocol") \
M(ArrowFlightConnection, "Number of client connections using Arrow Flight protocol") \
M(OpenFileForRead, "Number of files open for reading") \
M(OpenFileForWrite, "Number of files open for writing") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
Expand Down
1 change: 1 addition & 0 deletions src/Core/config_core.h.in
Expand Up @@ -11,3 +11,4 @@
#cmakedefine01 USE_SSL
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ARROW
153 changes: 153 additions & 0 deletions src/Server/ArrowFlightServer.cpp
@@ -0,0 +1,153 @@
#include "ArrowFlightServer.h"
#include <arrow/flight/test_util.h> // FIXME: Remove it before merge

#include <memory>
#include <utility>

namespace
{

/// Temporary helper built on Arrow's test utilities: maps one of two hard-coded tickets
/// to the matching example batches and wraps them in a BatchIterator written to `out`.
/// Any other ticket yields a NotImplemented status and leaves `out` untouched.
arrow::Status GetBatchForFlight(const arrow::flight::Ticket & ticket, std::shared_ptr<arrow::RecordBatchReader> * out)
{
    const bool wants_ints = ticket.ticket == "ticket-ints-1";
    const bool wants_dicts = !wants_ints && ticket.ticket == "ticket-dicts-1";

    if (!wants_ints && !wants_dicts)
        return arrow::Status::NotImplemented("no stream implemented for this ticket");

    arrow::flight::BatchVector example_batches;
    if (wants_ints)
    {
        RETURN_NOT_OK(arrow::flight::ExampleIntBatches(&example_batches));
    }
    else
    {
        RETURN_NOT_OK(arrow::flight::ExampleDictBatches(&example_batches));
    }

    /// The schema of the first batch is used as the schema of the whole stream.
    *out = std::make_shared<arrow::flight::BatchIterator>(example_batches[0]->schema(), example_batches);
    return arrow::Status::OK();
}

} // FIXME: Remove it before merge

namespace DB
{

namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;
}

/// Stores the owning server reference and builds the gRPC TCP Flight location from
/// `host` and `port`. Throws Exception (UNKNOWN_EXCEPTION) when Arrow rejects the location.
ArrowFlightServer::ArrowFlightServer(IServer & server_, std::string host, int port)
    : server(server_)
{
    const auto location_status = arrow::flight::Location::ForGrpcTcp(host, port, &location);
    if (location_status.ok())
        return;

    throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
        "Invalid location {}:{} for Arrow Flight Server: {}",
        host,
        port,
        location_status.ToString());
}

/// Returns the string form of the Flight location that was built in the constructor.
std::string ArrowFlightServer::getLocation() const
{
    std::string location_text = location.ToString();
    return location_text;
}

void ArrowFlightServer::start() {
arrow::flight::FlightServerOptions options(location);
auto init_status = Init(options);
if (!init_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Failed init Arrow Flight Server: {}",
init_status.ToString());

// Exit with a clean error code (0) on SIGTERM
// ARROW_CHECK_OK(handler->SetShutdownOnSignals({SIGTERM}));

auto serve_status = Serve();
if (!serve_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Failed serve Arrow Flight: {}",
serve_status.ToString());
}

/// Requests shutdown of the Flight server.
/// Throws Exception (UNKNOWN_EXCEPTION) if Shutdown() returns a non-OK status.
void ArrowFlightServer::stop()
{
    const auto shutdown_result = Shutdown();
    if (shutdown_result.ok())
        return;

    throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
        "Failed shutdown Arrow Flight: {}",
        shutdown_result.ToString());
}

/// Connection counting is not implemented yet, so this always reports zero.
int ArrowFlightServer::currentConnections() const
{
    // FIXME: implement
    const int active_connections = 0;
    return active_connections;
}

/// Lists the available flights. At the moment these are Arrow's example flights.
/// `criteria` may be null; a non-empty criteria expression yields an empty listing.
/// On return `listings` owns a SimpleFlightListing over the selected flights.
arrow::Status ArrowFlightServer::ListFlights(
    const arrow::flight::ServerCallContext & /* context */,
    const arrow::flight::Criteria * criteria,
    std::unique_ptr<arrow::flight::FlightListing> * listings)
{
    std::vector<arrow::flight::FlightInfo> flights = arrow::flight::ExampleFlightInfo();
    if (criteria && !criteria->expression.empty())
    {
        // For test purposes, if we get criteria, return no results
        flights.clear();
    }

    /// `flights` is not used afterwards, so hand it over instead of copying every FlightInfo;
    /// make_unique replaces the former naked `new`.
    *listings = std::make_unique<arrow::flight::SimpleFlightListing>(std::move(flights));
    return arrow::Status::OK();
}

/// Looks up `request` among Arrow's example flights and copies the matching FlightInfo
/// into `out_info`. Returns Invalid when no example flight has an equal descriptor.
arrow::Status ArrowFlightServer::GetFlightInfo(
    const arrow::flight::ServerCallContext & /* context */,
    const arrow::flight::FlightDescriptor & request,
    std::unique_ptr<arrow::flight::FlightInfo> * out_info)
{
    // Test that Arrow-C++ status codes can make it through gRPC
    const bool is_command = request.type == arrow::flight::FlightDescriptor::DescriptorType::CMD;
    if (is_command && request.cmd == "status-outofmemory")
        return arrow::Status::OutOfMemory("Sentinel");

    const std::vector<arrow::flight::FlightInfo> known_flights = arrow::flight::ExampleFlightInfo();
    for (const auto & candidate : known_flights)
    {
        if (!candidate.descriptor().Equals(request))
            continue;

        *out_info = std::make_unique<arrow::flight::FlightInfo>(candidate);
        return arrow::Status::OK();
    }

    return arrow::Status::Invalid("Flight not found: ", request.ToString());
}

/// Looks up `request` among Arrow's example flights and writes a SchemaResult built from
/// the matching flight's serialized schema into `schema`. Returns Invalid when nothing matches.
arrow::Status ArrowFlightServer::GetSchema(
    const arrow::flight::ServerCallContext & /* context */,
    const arrow::flight::FlightDescriptor & request,
    std::unique_ptr<arrow::flight::SchemaResult> * schema)
{
    const std::vector<arrow::flight::FlightInfo> known_flights = arrow::flight::ExampleFlightInfo();
    for (const auto & candidate : known_flights)
    {
        if (!candidate.descriptor().Equals(request))
            continue;

        *schema = std::make_unique<arrow::flight::SchemaResult>(candidate.serialized_schema());
        return arrow::Status::OK();
    }

    return arrow::Status::Invalid("Flight not found: ", request.ToString());
}

/// Serves the stream identified by `request`: resolves the ticket through GetBatchForFlight
/// and wraps the resulting reader in a RecordBatchStream written to `data_stream`.
/// Two sentinel tickets short-circuit without touching `data_stream`.
arrow::Status ArrowFlightServer::DoGet(
    const arrow::flight::ServerCallContext & /* context */,
    const arrow::flight::Ticket & request,
    std::unique_ptr<arrow::flight::FlightDataStream> * data_stream)
{
    // Test for ARROW-5095
    const auto & ticket_id = request.ticket;
    if (ticket_id == "ARROW-5095-fail")
        return arrow::Status::UnknownError("Server-side error");
    if (ticket_id == "ARROW-5095-success")
        return arrow::Status::OK();

    std::shared_ptr<arrow::RecordBatchReader> reader;
    RETURN_NOT_OK(GetBatchForFlight(request, &reader));

    *data_stream = std::make_unique<arrow::flight::RecordBatchStream>(reader);
    return arrow::Status::OK();
}

}
74 changes: 74 additions & 0 deletions src/Server/ArrowFlightServer.h
@@ -0,0 +1,74 @@
#pragma once

#include <Common/CurrentMetrics.h>
#include <Poco/Net/TCPServerConnection.h>
#include <common/logger_useful.h>
#include "IServer.h"
#include "IRoutineServer.h"

#include <arrow/flight/server.h>
FawnD2 marked this conversation as resolved.
Show resolved Hide resolved

namespace CurrentMetrics
{
extern const Metric ArrowFlightConnection;
}

namespace DB
{

/// Arrow Flight endpoint exposed through the IRoutineServer interface.
/// It derives from arrow::flight::FlightServerBase and overrides its RPC handlers.
class ArrowFlightServer : public IRoutineServer, public arrow::flight::FlightServerBase
{
public:
/// Builds the Flight location from `host` and `port`.
ArrowFlightServer(IServer & server_, std::string host, int port);

/// String form of the Flight location.
std::string getLocation() const;

/// Initializes the Flight server and starts serving.
void start() override;

/// Shuts the Flight server down.
void stop() override;

/// Number of current connections.
/// NOTE(review): the implementation appears to be a stub — confirm before relying on the value.
int currentConnections() const override;

/// Flight RPC: enumerate available flights; `criteria` may be null.
arrow::Status ListFlights(
const arrow::flight::ServerCallContext & context,
const arrow::flight::Criteria * criteria,
std::unique_ptr<arrow::flight::FlightListing> * listings
) override;

/// Flight RPC: return the FlightInfo for the flight described by `request`.
arrow::Status GetFlightInfo(
const arrow::flight::ServerCallContext & context,
const arrow::flight::FlightDescriptor & request,
std::unique_ptr<arrow::flight::FlightInfo> * info
) override;

/// Flight RPC: return the schema of the flight described by `request`.
arrow::Status GetSchema(
const arrow::flight::ServerCallContext & context,
const arrow::flight::FlightDescriptor & request,
std::unique_ptr<arrow::flight::SchemaResult> * schema
) override;

/// Flight RPC: return the data stream identified by the ticket in `request`.
arrow::Status DoGet(
const arrow::flight::ServerCallContext & context,
const arrow::flight::Ticket & request,
std::unique_ptr<arrow::flight::FlightDataStream> * data_stream
) override;

/// DoPut is not overridden yet; the intended signature is kept below for reference.
// arrow::Status DoPut(
// const arrow::flight::ServerCallContext & context,
// std::unique_ptr<arrow::flight::FlightMessageReader> reader,
// std::unique_ptr<arrow::flight::FlightMetadataWriter> writer
// ) override;

private:
Poco::Logger * log = &Poco::Logger::get("ArrowFlightServer");

/// Owning ClickHouse server and the location this Flight server is bound to.
IServer & server;
arrow::flight::Location location;

/// NOTE(review): `in` and `out` do not appear to be used by this class yet — confirm
/// whether they are still needed.
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;

/// NOTE(review): this increment is a member of the server object, so it presumably counts
/// the server instance rather than individual client connections — confirm intent.
CurrentMetrics::Increment metric_increment{CurrentMetrics::ArrowFlightConnection};
};

}