Skip to content

Commit

Permalink
ARROW-3294: [C++][Flight] Support Flight on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed May 29, 2019
1 parent 2ea7350 commit f394709
Show file tree
Hide file tree
Showing 29 changed files with 462 additions and 116 deletions.
1 change: 1 addition & 0 deletions appveyor.yml
Expand Up @@ -65,6 +65,7 @@ environment:
- JOB: "Toolchain"
GENERATOR: Visual Studio 14 2015 Win64
CONFIGURATION: "Release"
ARROW_BUILD_FLIGHT: "ON"
ARROW_BUILD_GANDIVA: "ON"
- JOB: "Static_Crt_Build"
GENERATOR: Ninja
Expand Down
9 changes: 9 additions & 0 deletions ci/appveyor-cpp-setup.bat
Expand Up @@ -17,6 +17,15 @@

@echo on

@rem Avoid picking up AppVeyor-installed OpenSSL (linker errors with gRPC)
@rem XXX Perhaps there is a smarter way of solving this issue?
rd /s /q C:\OpenSSL-Win32
rd /s /q C:\OpenSSL-Win64
rd /s /q C:\OpenSSL-v11-Win32
rd /s /q C:\OpenSSL-v11-Win64
rd /s /q C:\OpenSSL-v111-Win32
rd /s /q C:\OpenSSL-v111-Win64

conda update -y -q conda
conda config --set auto_update_conda false
conda info -a
Expand Down
9 changes: 8 additions & 1 deletion ci/cpp-msvc-build-main.bat
Expand Up @@ -63,6 +63,7 @@ cmake -G "%GENERATOR%" %CMAKE_ARGS% ^
-DARROW_VERBOSE_THIRDPARTY_BUILD=ON ^
-DARROW_CXXFLAGS="%ARROW_CXXFLAGS%" ^
-DCMAKE_CXX_FLAGS_RELEASE="/MD %CMAKE_CXX_FLAGS_RELEASE%" ^
-DARROW_FLIGHT=%ARROW_BUILD_FLIGHT% ^
-DARROW_GANDIVA=%ARROW_BUILD_GANDIVA% ^
-DARROW_PARQUET=ON ^
-DPARQUET_BUILD_EXECUTABLES=ON ^
Expand All @@ -89,10 +90,16 @@ pip install -r requirements.txt pickle5

set PYARROW_CXXFLAGS=%ARROW_CXXFLAGS%
set PYARROW_CMAKE_GENERATOR=%GENERATOR%
set PYARROW_BUNDLE_ARROW_CPP=ON
if "%ARROW_BUILD_FLIGHT%" == "ON" (
@rem ARROW-5441: bundling Arrow Flight libraries not implemented
set PYARROW_BUNDLE_ARROW_CPP=OFF
) else (
set PYARROW_BUNDLE_ARROW_CPP=ON
)
set PYARROW_BUNDLE_BOOST=OFF
set PYARROW_WITH_STATIC_BOOST=ON
set PYARROW_WITH_PARQUET=ON
set PYARROW_WITH_FLIGHT=%ARROW_BUILD_FLIGHT%
set PYARROW_WITH_GANDIVA=%ARROW_BUILD_GANDIVA%
set PYARROW_PARALLEL=2

Expand Down
9 changes: 9 additions & 0 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
Expand Up @@ -1067,6 +1067,10 @@ endmacro()
if(ARROW_WITH_PROTOBUF)
resolve_dependency(Protobuf)

if(ARROW_PROTOBUF_USE_SHARED AND MSVC)
add_definitions(-DPROTOBUF_USE_DLLS)
endif()

# TODO: Don't use global includes but rather target_include_directories
include_directories(SYSTEM ${PROTOBUF_INCLUDE_DIR})

Expand Down Expand Up @@ -1750,6 +1754,8 @@ macro(build_cares)
CMAKE_ARGS ${CARES_CMAKE_ARGS}
BUILD_BYPRODUCTS "${CARES_STATIC_LIB}")

file(MAKE_DIRECTORY ${CARES_INCLUDE_DIR})

add_dependencies(toolchain cares_ep)
add_library(c-ares::cares STATIC IMPORTED)
set_target_properties(c-ares::cares
Expand Down Expand Up @@ -1877,6 +1883,9 @@ macro(build_grpc)
CMAKE_ARGS ${GRPC_CMAKE_ARGS} ${EP_LOG_OPTIONS}
DEPENDS ${grpc_dependencies})

# Work around https://gitlab.kitware.com/cmake/cmake/issues/15052
file(MAKE_DIRECTORY ${GRPC_INCLUDE_DIR})

add_library(gRPC::gpr STATIC IMPORTED)
set_target_properties(gRPC::gpr
PROPERTIES IMPORTED_LOCATION "${GRPC_STATIC_LIBRARY_GPR}"
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/arrow/flight/CMakeLists.txt
Expand Up @@ -27,6 +27,10 @@ set(ARROW_FLIGHT_STATIC_LINK_LIBS
gRPC::gpr
c-ares::cares)

if(WIN32)
list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS Ws2_32.lib)
endif()

if(GRPC_HAS_ADDRESS_SORTING)
list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS gRPC::address_sorting)
endif()
Expand Down Expand Up @@ -81,6 +85,8 @@ set(ARROW_FLIGHT_SRCS
types.cc)

add_arrow_lib(arrow_flight
OUTPUTS
ARROW_FLIGHT_LIBRARIES
SOURCES
${ARROW_FLIGHT_SRCS}
DEPENDENCIES
Expand All @@ -95,9 +101,15 @@ add_arrow_lib(arrow_flight
arrow_static
${ARROW_FLIGHT_STATIC_LINK_LIBS})

foreach(LIB_TARGET ${ARROW_FLIGHT_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
endforeach()

# Define arrow_flight_testing library
if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
add_arrow_lib(arrow_flight_testing
OUTPUTS
ARROW_FLIGHT_TESTING_LIBRARIES
SOURCES
test-util.cc
DEPENDENCIES
Expand All @@ -108,14 +120,20 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
SHARED_LINK_LIBS
arrow_shared
arrow_flight_shared
arrow_testing_shared
${BOOST_FILESYSTEM_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
GTest::GTest
STATIC_LINK_LIBS
arrow_static
arrow_flight_static)
arrow_flight_static
arrow_testing_static)
endif()

foreach(LIB_TARGET ${ARROW_FLIGHT_TESTING_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING)
endforeach()

add_arrow_test(flight-test
EXTRA_LINK_LIBS
${ARROW_FLIGHT_TEST_LINK_LIBS}
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/flight/client.cc
Expand Up @@ -17,12 +17,14 @@

#include "arrow/flight/client.h"

// Platform-specific defines
#include "arrow/flight/platform.h"

#include <memory>
#include <sstream>
#include <string>
#include <utility>

#include "arrow/util/config.h"
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
#else
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/client.h
Expand Up @@ -27,9 +27,9 @@

#include "arrow/ipc/writer.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"

#include "arrow/flight/types.h" // IWYU pragma: keep
#include "arrow/flight/visibility.h"

namespace arrow {

Expand All @@ -46,7 +46,7 @@ class ClientAuthHandler;
typedef std::chrono::duration<double, std::chrono::seconds::period> TimeoutDuration;

/// \brief Hints to the underlying RPC layer for Arrow Flight calls.
class ARROW_EXPORT FlightCallOptions {
class ARROW_FLIGHT_EXPORT FlightCallOptions {
public:
/// Create a default set of call options.
FlightCallOptions();
Expand All @@ -57,14 +57,14 @@ class ARROW_EXPORT FlightCallOptions {
TimeoutDuration timeout;
};

class ARROW_EXPORT FlightClientOptions {
class ARROW_FLIGHT_EXPORT FlightClientOptions {
public:
std::string tls_root_certs;
};

/// \brief Client class for Arrow Flight RPC services (gRPC-based).
/// API experimental for now
class ARROW_EXPORT FlightClient {
class ARROW_FLIGHT_EXPORT FlightClient {
public:
~FlightClient();

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/flight/client_auth.h
Expand Up @@ -19,24 +19,24 @@

#include <string>

#include "arrow/flight/visibility.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"

namespace arrow {

namespace flight {

/// \brief A reader for messages from the server during an
/// authentication handshake.
class ARROW_EXPORT ClientAuthReader {
class ARROW_FLIGHT_EXPORT ClientAuthReader {
public:
virtual ~ClientAuthReader() = default;
virtual Status Read(std::string* response) = 0;
};

/// \brief A writer for messages to the server during an
/// authentication handshake.
class ARROW_EXPORT ClientAuthSender {
class ARROW_FLIGHT_EXPORT ClientAuthSender {
public:
virtual ~ClientAuthSender() = default;
virtual Status Write(const std::string& token) = 0;
Expand All @@ -46,7 +46,7 @@ class ARROW_EXPORT ClientAuthSender {
/// Authentication includes both an initial negotiation and a per-call
/// token validation. Implementations may choose to use either or both
/// mechanisms.
class ARROW_EXPORT ClientAuthHandler {
class ARROW_FLIGHT_EXPORT ClientAuthHandler {
public:
virtual ~ClientAuthHandler() = default;
/// \brief Authenticate the client on initial connection. The client
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/flight/customize_protobuf.h
Expand Up @@ -20,7 +20,15 @@
#include <limits>
#include <memory>

#include "arrow/flight/platform.h"
#include "arrow/util/config.h"

// Silence protobuf warnings
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4244)
#endif

#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/impl/codegen/config_protobuf.h>
#else
Expand All @@ -40,6 +48,10 @@
#include <grpc++/impl/codegen/proto_utils.h>
#endif

#ifdef _MSC_VER
#pragma warning(pop)
#endif

namespace grpc {

class ByteBuffer;
Expand Down
32 changes: 22 additions & 10 deletions cpp/src/arrow/flight/flight-test.cc
Expand Up @@ -21,6 +21,7 @@
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
Expand All @@ -38,7 +39,6 @@
#error "gRPC headers should not be in public API"
#endif

#include "arrow/flight/Flight.pb.h"
#include "arrow/flight/internal.h"
#include "arrow/flight/test-util.h"

Expand Down Expand Up @@ -117,6 +117,8 @@ TEST(TestFlightDescriptor, Basics) {
ASSERT_TRUE(e.Equals(f));
}

// This tests the internal protobuf types which don't get exported in the Flight DLL.
#ifndef _WIN32
TEST(TestFlightDescriptor, ToFromProto) {
FlightDescriptor descr_test;
pb::FlightDescriptor pb_descr;
Expand All @@ -131,30 +133,40 @@ TEST(TestFlightDescriptor, ToFromProto) {
ASSERT_OK(internal::FromProto(pb_descr, &descr_test));
AssertEqual(descr2, descr_test);
}
#endif

TEST(TestFlight, StartStopTestServer) {
TestServer server("flight-test-server", 30000);
TestServer server("flight-test-server");
server.Start();
ASSERT_TRUE(server.IsRunning());

std::this_thread::sleep_for(std::chrono::duration<double>(0.2));

ASSERT_TRUE(server.IsRunning());
int exit_code = server.Stop();
#ifdef _WIN32
// We do a hard kill on Windows
ASSERT_EQ(259, exit_code);
#else
ASSERT_EQ(0, exit_code);
#endif
ASSERT_FALSE(server.IsRunning());
}

TEST(TestFlight, ConnectUri) {
TestServer server("flight-test-server", 30000);
TestServer server("flight-test-server");
server.Start();
ASSERT_TRUE(server.IsRunning());

std::stringstream ss;
ss << "grpc://localhost:" << server.port();
std::string uri = ss.str();

std::unique_ptr<FlightClient> client;
Location location1;
Location location2;
ASSERT_OK(Location::Parse("grpc://localhost:30000", &location1));
ASSERT_OK(Location::Parse("grpc://localhost:30000", &location2));
ASSERT_OK(Location::Parse(uri, &location1));
ASSERT_OK(Location::Parse(uri, &location2));
ASSERT_OK(FlightClient::Connect(location1, &client));
ASSERT_OK(FlightClient::Connect(location2, &client));
}
Expand All @@ -174,9 +186,9 @@ class TestFlightClient : public ::testing::Test {
// void TearDown() {}

void SetUp() {
port_ = 30000;
server_.reset(new TestServer("flight-test-server", port_));
server_.reset(new TestServer("flight-test-server"));
server_->Start();
port_ = server_->port();
ASSERT_OK(ConnectClient());
}

Expand Down Expand Up @@ -258,7 +270,7 @@ class TestAuthHandler : public ::testing::Test {
Location location;
std::unique_ptr<FlightServerBase> server(new AuthTestServer);

ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location));
ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location));
FlightServerOptions options(location);
options.auth_handler =
std::unique_ptr<ServerAuthHandler>(new TestServerAuthHandler("user", "p4ssw0rd"));
Expand All @@ -282,7 +294,7 @@ class TestDoPut : public ::testing::Test {
public:
void SetUp() {
Location location;
ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location));
ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location));

do_put_server_ = new DoPutTestServer();
server_.reset(new InProcessTestServer(
Expand Down Expand Up @@ -464,7 +476,7 @@ TEST_F(TestFlightClient, TimeoutFires) {
TEST_F(TestFlightClient, NoTimeout) {
// Call should complete quickly, so timeout should not fire
FlightCallOptions options;
options.timeout = TimeoutDuration{0.5};
options.timeout = TimeoutDuration{5.0}; // account for slow server process startup
std::unique_ptr<FlightInfo> info;
auto start = std::chrono::system_clock::now();
auto descriptor = FlightDescriptor::Path({"examples", "ints"});
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/internal.cc
Expand Up @@ -16,14 +16,14 @@
// under the License.

#include "arrow/flight/internal.h"
#include "arrow/flight/platform.h"
#include "arrow/flight/protocol-internal.h"

#include <cstddef>
#include <memory>
#include <string>
#include <utility>

#include "arrow/util/config.h"
#ifdef GRPCPP_PP_INCLUDE
#include <grpcpp/grpcpp.h>
#else
Expand Down

0 comments on commit f394709

Please sign in to comment.