From f394709a3efff56b9caa16c9779a848296a5dd73 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 29 May 2019 21:34:37 +0200 Subject: [PATCH] ARROW-3294: [C++][Flight] Support Flight on Windows --- appveyor.yml | 1 + ci/appveyor-cpp-setup.bat | 9 +++ ci/cpp-msvc-build-main.bat | 9 ++- cpp/cmake_modules/ThirdpartyToolchain.cmake | 9 +++ cpp/src/arrow/flight/CMakeLists.txt | 20 ++++- cpp/src/arrow/flight/client.cc | 4 +- cpp/src/arrow/flight/client.h | 8 +- cpp/src/arrow/flight/client_auth.h | 8 +- cpp/src/arrow/flight/customize_protobuf.h | 12 +++ cpp/src/arrow/flight/flight-test.cc | 32 +++++--- cpp/src/arrow/flight/internal.cc | 2 +- cpp/src/arrow/flight/internal.h | 13 +++- cpp/src/arrow/flight/platform.h | 32 ++++++++ cpp/src/arrow/flight/protocol-internal.h | 3 + .../arrow/flight/serialization-internal.cc | 3 +- cpp/src/arrow/flight/server.cc | 68 ++++++++++------- cpp/src/arrow/flight/server.h | 30 +++++--- cpp/src/arrow/flight/server_auth.h | 10 +-- cpp/src/arrow/flight/test-util.cc | 17 +++++ cpp/src/arrow/flight/test-util.h | 55 ++++++++++---- cpp/src/arrow/flight/types.h | 32 ++++---- cpp/src/arrow/flight/visibility.h | 48 ++++++++++++ cpp/src/arrow/io/mman.h | 7 -- cpp/src/arrow/python/flight.cc | 11 ++- cpp/src/arrow/util/io-util.cc | 76 +++++++++++++++++++ cpp/src/arrow/util/io-util.h | 39 ++++++++++ cpp/src/arrow/util/windows_compatibility.h | 11 ++- python/CMakeLists.txt | 2 +- python/pyarrow/tests/test_flight.py | 7 +- 29 files changed, 462 insertions(+), 116 deletions(-) create mode 100644 cpp/src/arrow/flight/platform.h create mode 100644 cpp/src/arrow/flight/visibility.h diff --git a/appveyor.yml b/appveyor.yml index f4c587733df7f..943b30f7ee0e0 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -65,6 +65,7 @@ environment: - JOB: "Toolchain" GENERATOR: Visual Studio 14 2015 Win64 CONFIGURATION: "Release" + ARROW_BUILD_FLIGHT: "ON" ARROW_BUILD_GANDIVA: "ON" - JOB: "Static_Crt_Build" GENERATOR: Ninja diff --git a/ci/appveyor-cpp-setup.bat b/ci/appveyor-cpp-setup.bat index 0f5868a504a49..aa29498918304 100644 --- a/ci/appveyor-cpp-setup.bat +++ b/ci/appveyor-cpp-setup.bat @@ -17,6 +17,15 @@ @echo on +@rem Avoid picking up AppVeyor-installed OpenSSL (linker errors with gRPC) +@rem XXX Perhaps there is a smarter way of solving this issue? +rd /s /q C:\OpenSSL-Win32 +rd /s /q C:\OpenSSL-Win64 +rd /s /q C:\OpenSSL-v11-Win32 +rd /s /q C:\OpenSSL-v11-Win64 +rd /s /q C:\OpenSSL-v111-Win32 +rd /s /q C:\OpenSSL-v111-Win64 + conda update -y -q conda conda config --set auto_update_conda false conda info -a diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index 74eac7266deab..8486f60ba5839 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -63,6 +63,7 @@ cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ -DARROW_VERBOSE_THIRDPARTY_BUILD=ON ^ -DARROW_CXXFLAGS="%ARROW_CXXFLAGS%" ^ -DCMAKE_CXX_FLAGS_RELEASE="/MD %CMAKE_CXX_FLAGS_RELEASE%" ^ + -DARROW_FLIGHT=%ARROW_BUILD_FLIGHT% ^ -DARROW_GANDIVA=%ARROW_BUILD_GANDIVA% ^ -DARROW_PARQUET=ON ^ -DPARQUET_BUILD_EXECUTABLES=ON ^ @@ -89,10 +90,16 @@ pip install -r requirements.txt pickle5 set PYARROW_CXXFLAGS=%ARROW_CXXFLAGS% set PYARROW_CMAKE_GENERATOR=%GENERATOR% -set PYARROW_BUNDLE_ARROW_CPP=ON +if "%ARROW_BUILD_FLIGHT%" == "ON" ( + @rem ARROW-5441: bundling Arrow Flight libraries not implemented + set PYARROW_BUNDLE_ARROW_CPP=OFF +) else ( + set PYARROW_BUNDLE_ARROW_CPP=ON +) set PYARROW_BUNDLE_BOOST=OFF set PYARROW_WITH_STATIC_BOOST=ON set PYARROW_WITH_PARQUET=ON +set PYARROW_WITH_FLIGHT=%ARROW_BUILD_FLIGHT% set PYARROW_WITH_GANDIVA=%ARROW_BUILD_GANDIVA% set PYARROW_PARALLEL=2 diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 4debe21e56b6f..deffab4de436a 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -1067,6 +1067,10 @@ endmacro() if(ARROW_WITH_PROTOBUF) resolve_dependency(Protobuf) + if(ARROW_PROTOBUF_USE_SHARED AND MSVC) + add_definitions(-DPROTOBUF_USE_DLLS) + endif() + # TODO: Don't use global includes but rather target_include_directories include_directories(SYSTEM ${PROTOBUF_INCLUDE_DIR}) @@ -1750,6 +1754,8 @@ macro(build_cares) CMAKE_ARGS ${CARES_CMAKE_ARGS} BUILD_BYPRODUCTS "${CARES_STATIC_LIB}") + file(MAKE_DIRECTORY ${CARES_INCLUDE_DIR}) + add_dependencies(toolchain cares_ep) add_library(c-ares::cares STATIC IMPORTED) set_target_properties(c-ares::cares @@ -1877,6 +1883,9 @@ macro(build_grpc) CMAKE_ARGS ${GRPC_CMAKE_ARGS} ${EP_LOG_OPTIONS} DEPENDS ${grpc_dependencies}) + # Work around https://gitlab.kitware.com/cmake/cmake/issues/15052 + file(MAKE_DIRECTORY ${GRPC_INCLUDE_DIR}) + add_library(gRPC::gpr STATIC IMPORTED) set_target_properties(gRPC::gpr PROPERTIES IMPORTED_LOCATION "${GRPC_STATIC_LIBRARY_GPR}" diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt index 6ec5d5d69b9bf..a46e7764566c2 100644 --- a/cpp/src/arrow/flight/CMakeLists.txt +++ b/cpp/src/arrow/flight/CMakeLists.txt @@ -27,6 +27,10 @@ set(ARROW_FLIGHT_STATIC_LINK_LIBS gRPC::gpr c-ares::cares) +if(WIN32) + list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS Ws2_32.lib) +endif() + if(GRPC_HAS_ADDRESS_SORTING) list(APPEND ARROW_FLIGHT_STATIC_LINK_LIBS gRPC::address_sorting) endif() @@ -81,6 +85,8 @@ set(ARROW_FLIGHT_SRCS types.cc) add_arrow_lib(arrow_flight + OUTPUTS + ARROW_FLIGHT_LIBRARIES SOURCES ${ARROW_FLIGHT_SRCS} DEPENDENCIES @@ -95,9 +101,15 @@ add_arrow_lib(arrow_flight arrow_static ${ARROW_FLIGHT_STATIC_LINK_LIBS}) +foreach(LIB_TARGET ${ARROW_FLIGHT_LIBRARIES}) + target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING) +endforeach() + # Define arrow_flight_testing library if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS) add_arrow_lib(arrow_flight_testing + OUTPUTS + ARROW_FLIGHT_TESTING_LIBRARIES SOURCES test-util.cc DEPENDENCIES @@ -108,14 +120,20 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS) SHARED_LINK_LIBS arrow_shared arrow_flight_shared + arrow_testing_shared ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_SYSTEM_LIBRARY} GTest::GTest STATIC_LINK_LIBS arrow_static - arrow_flight_static) + arrow_flight_static + arrow_testing_static) endif() +foreach(LIB_TARGET ${ARROW_FLIGHT_TESTING_LIBRARIES}) + target_compile_definitions(${LIB_TARGET} PRIVATE ARROW_FLIGHT_EXPORTING) +endforeach() + add_arrow_test(flight-test EXTRA_LINK_LIBS ${ARROW_FLIGHT_TEST_LINK_LIBS} diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index d831647555013..1c927da782d43 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -17,12 +17,14 @@ #include "arrow/flight/client.h" +// Platform-specific defines +#include "arrow/flight/platform.h" + #include #include #include #include -#include "arrow/util/config.h" #ifdef GRPCPP_PP_INCLUDE #include #else diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h index 276ffc7021290..689c9f8c5b501 100644 --- a/cpp/src/arrow/flight/client.h +++ b/cpp/src/arrow/flight/client.h @@ -27,9 +27,9 @@ #include "arrow/ipc/writer.h" #include "arrow/status.h" -#include "arrow/util/visibility.h" #include "arrow/flight/types.h" // IWYU pragma: keep +#include "arrow/flight/visibility.h" namespace arrow { @@ -46,7 +46,7 @@ class ClientAuthHandler; typedef std::chrono::duration TimeoutDuration; /// \brief Hints to the underlying RPC layer for Arrow Flight calls. -class ARROW_EXPORT FlightCallOptions { +class ARROW_FLIGHT_EXPORT FlightCallOptions { public: /// Create a default set of call options. FlightCallOptions(); @@ -57,14 +57,14 @@ class ARROW_EXPORT FlightCallOptions { TimeoutDuration timeout; }; -class ARROW_EXPORT FlightClientOptions { +class ARROW_FLIGHT_EXPORT FlightClientOptions { public: std::string tls_root_certs; }; /// \brief Client class for Arrow Flight RPC services (gRPC-based). /// API experimental for now -class ARROW_EXPORT FlightClient { +class ARROW_FLIGHT_EXPORT FlightClient { public: ~FlightClient(); diff --git a/cpp/src/arrow/flight/client_auth.h b/cpp/src/arrow/flight/client_auth.h index cc7ed10d4b235..9dad36aa09489 100644 --- a/cpp/src/arrow/flight/client_auth.h +++ b/cpp/src/arrow/flight/client_auth.h @@ -19,8 +19,8 @@ #include +#include "arrow/flight/visibility.h" #include "arrow/status.h" -#include "arrow/util/visibility.h" namespace arrow { @@ -28,7 +28,7 @@ namespace flight { /// \brief A reader for messages from the server during an /// authentication handshake. -class ARROW_EXPORT ClientAuthReader { +class ARROW_FLIGHT_EXPORT ClientAuthReader { public: virtual ~ClientAuthReader() = default; virtual Status Read(std::string* response) = 0; @@ -36,7 +36,7 @@ class ARROW_EXPORT ClientAuthReader { /// \brief A writer for messages to the server during an /// authentication handshake. -class ARROW_EXPORT ClientAuthSender { +class ARROW_FLIGHT_EXPORT ClientAuthSender { public: virtual ~ClientAuthSender() = default; virtual Status Write(const std::string& token) = 0; @@ -46,7 +46,7 @@ class ARROW_EXPORT ClientAuthSender { /// Authentication includes both an initial negotiation and a per-call /// token validation. Implementations may choose to use either or both /// mechanisms. -class ARROW_EXPORT ClientAuthHandler { +class ARROW_FLIGHT_EXPORT ClientAuthHandler { public: virtual ~ClientAuthHandler() = default; /// \brief Authenticate the client on initial connection. The client diff --git a/cpp/src/arrow/flight/customize_protobuf.h b/cpp/src/arrow/flight/customize_protobuf.h index 1d67480a53e4e..f27ab0b6878d2 100644 --- a/cpp/src/arrow/flight/customize_protobuf.h +++ b/cpp/src/arrow/flight/customize_protobuf.h @@ -20,7 +20,15 @@ #include #include +#include "arrow/flight/platform.h" #include "arrow/util/config.h" + +// Silence protobuf warnings +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4244) +#endif + #ifdef GRPCPP_PP_INCLUDE #include #else @@ -40,6 +48,10 @@ #include #endif +#ifdef _MSC_VER +#pragma warning(pop) +#endif + namespace grpc { class ByteBuffer; diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc index e3e01df5037af..cb7e57c85584b 100644 --- a/cpp/src/arrow/flight/flight-test.cc +++ b/cpp/src/arrow/flight/flight-test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -38,7 +39,6 @@ #error "gRPC headers should not be in public API" #endif -#include "arrow/flight/Flight.pb.h" #include "arrow/flight/internal.h" #include "arrow/flight/test-util.h" @@ -117,6 +117,8 @@ TEST(TestFlightDescriptor, Basics) { ASSERT_TRUE(e.Equals(f)); } +// This tests the internal protobuf types which don't get exported in the Flight DLL. +#ifndef _WIN32 TEST(TestFlightDescriptor, ToFromProto) { FlightDescriptor descr_test; pb::FlightDescriptor pb_descr; @@ -131,9 +133,10 @@ TEST(TestFlightDescriptor, ToFromProto) { ASSERT_OK(internal::FromProto(pb_descr, &descr_test)); AssertEqual(descr2, descr_test); } +#endif TEST(TestFlight, StartStopTestServer) { - TestServer server("flight-test-server", 30000); + TestServer server("flight-test-server"); server.Start(); ASSERT_TRUE(server.IsRunning()); @@ -141,20 +144,29 @@ TEST(TestFlight, StartStopTestServer) { ASSERT_TRUE(server.IsRunning()); int exit_code = server.Stop(); +#ifdef _WIN32 + // We do a hard kill on Windows + ASSERT_EQ(259, exit_code); +#else ASSERT_EQ(0, exit_code); +#endif ASSERT_FALSE(server.IsRunning()); } TEST(TestFlight, ConnectUri) { - TestServer server("flight-test-server", 30000); + TestServer server("flight-test-server"); server.Start(); ASSERT_TRUE(server.IsRunning()); + std::stringstream ss; + ss << "grpc://localhost:" << server.port(); + std::string uri = ss.str(); + std::unique_ptr client; Location location1; Location location2; - ASSERT_OK(Location::Parse("grpc://localhost:30000", &location1)); - ASSERT_OK(Location::Parse("grpc://localhost:30000", &location2)); + ASSERT_OK(Location::Parse(uri, &location1)); + ASSERT_OK(Location::Parse(uri, &location2)); ASSERT_OK(FlightClient::Connect(location1, &client)); ASSERT_OK(FlightClient::Connect(location2, &client)); } @@ -174,9 +186,9 @@ class TestFlightClient : public ::testing::Test { // void TearDown() {} void SetUp() { - port_ = 30000; - server_.reset(new TestServer("flight-test-server", port_)); + server_.reset(new TestServer("flight-test-server")); server_->Start(); + port_ = server_->port(); ASSERT_OK(ConnectClient()); } @@ -258,7 +270,7 @@ class TestAuthHandler : public ::testing::Test { Location location; std::unique_ptr server(new AuthTestServer); - ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location)); + ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location)); FlightServerOptions options(location); options.auth_handler = std::unique_ptr(new TestServerAuthHandler("user", "p4ssw0rd")); @@ -282,7 +294,7 @@ class TestDoPut : public ::testing::Test { public: void SetUp() { Location location; - ASSERT_OK(Location::ForGrpcTcp("localhost", 30000, &location)); + ASSERT_OK(Location::ForGrpcTcp("localhost", GetListenPort(), &location)); do_put_server_ = new DoPutTestServer(); server_.reset(new InProcessTestServer( @@ -464,7 +476,7 @@ TEST_F(TestFlightClient, TimeoutFires) { TEST_F(TestFlightClient, NoTimeout) { // Call should complete quickly, so timeout should not fire FlightCallOptions options; - options.timeout = TimeoutDuration{0.5}; + options.timeout = TimeoutDuration{5.0}; // account for slow server process startup std::unique_ptr info; auto start = std::chrono::system_clock::now(); auto descriptor = FlightDescriptor::Path({"examples", "ints"}); diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc index c60e58597f655..55821495e1caa 100644 --- a/cpp/src/arrow/flight/internal.cc +++ b/cpp/src/arrow/flight/internal.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/flight/internal.h" +#include "arrow/flight/platform.h" #include "arrow/flight/protocol-internal.h" #include @@ -23,7 +24,6 @@ #include #include -#include "arrow/util/config.h" #ifdef GRPCPP_PP_INCLUDE #include #else diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h index db95eb3746038..784e8ebae1c34 100644 --- a/cpp/src/arrow/flight/internal.h +++ b/cpp/src/arrow/flight/internal.h @@ -65,8 +65,17 @@ namespace internal { static const char* AUTH_HEADER = "auth-token-bin"; +ARROW_FLIGHT_EXPORT Status SchemaToString(const Schema& schema, std::string* out); +ARROW_FLIGHT_EXPORT +Status FromGrpcStatus(const grpc::Status& grpc_status); + +ARROW_FLIGHT_EXPORT +grpc::Status ToGrpcStatus(const Status& arrow_status); + +// These functions depend on protobuf types which are not exported in the Flight DLL. + Status FromProto(const pb::ActionType& pb_type, ActionType* type); Status FromProto(const pb::Action& pb_action, Action* action); Status FromProto(const pb::Result& pb_result, Result* result); @@ -86,10 +95,6 @@ Status ToProto(const Action& action, pb::Action* pb_action); Status ToProto(const Result& result, pb::Result* pb_result); void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket); -Status FromGrpcStatus(const grpc::Status& grpc_status); - -grpc::Status ToGrpcStatus(const Status& arrow_status); - } // namespace internal } // namespace flight } // namespace arrow diff --git a/cpp/src/arrow/flight/platform.h b/cpp/src/arrow/flight/platform.h new file mode 100644 index 0000000000000..7f1b0954d8487 --- /dev/null +++ b/cpp/src/arrow/flight/platform.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Internal header. Platform-specific definitions for gRPC. + +#pragma once + +#ifdef _MSC_VER + +// The protobuf documentation says that C4251 warnings when using the +// library are spurious and suppressed when the build the library and +// compiler, but must be also suppressed in downstream projects +#pragma warning(disable : 4251) + +#endif // _MSC_VER + +#include "arrow/util/config.h" // IWYU pragma: keep +#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep diff --git a/cpp/src/arrow/flight/protocol-internal.h b/cpp/src/arrow/flight/protocol-internal.h index 848c1a801bd00..98bf923880934 100644 --- a/cpp/src/arrow/flight/protocol-internal.h +++ b/cpp/src/arrow/flight/protocol-internal.h @@ -16,6 +16,9 @@ #pragma once +// This addresses platform-specific defines, e.g. on Windows +#include "arrow/flight/platform.h" // IWYU pragma: keep + // This header holds the Flight protobuf definitions. // Need to include this first to get our gRPC customizations diff --git a/cpp/src/arrow/flight/serialization-internal.cc b/cpp/src/arrow/flight/serialization-internal.cc index c0d0bc1ad5afc..d78bac8387072 100644 --- a/cpp/src/arrow/flight/serialization-internal.cc +++ b/cpp/src/arrow/flight/serialization-internal.cc @@ -22,11 +22,12 @@ #include #include -#include "arrow/util/config.h" +#include "arrow/flight/platform.h" #include #include #include + #include #ifdef GRPCPP_PP_INCLUDE #include diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index 46dd5bf011901..9b6bf6ca410c8 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +// Platform-specific defines +#include "arrow/flight/platform.h" + #include "arrow/flight/server.h" #include @@ -25,7 +28,6 @@ #include #include -#include "arrow/util/config.h" #ifdef GRPCPP_PP_INCLUDE #include #else @@ -39,6 +41,7 @@ #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/status.h" +#include "arrow/util/io-util.h" #include "arrow/util/logging.h" #include "arrow/util/stl.h" #include "arrow/util/uri.h" @@ -407,21 +410,41 @@ class FlightServiceImpl : public FlightService::Service { } // namespace +// +// gRPC server lifecycle +// + #if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2) #error "atomic ints and atomic pointers not always lock-free!" #endif +using ::arrow::internal::GetSignalHandler; +using ::arrow::internal::SetSignalHandler; +using ::arrow::internal::SignalHandler; + struct FlightServerBase::Impl { std::unique_ptr service_; std::unique_ptr server_; +#ifdef _WIN32 + // Signal handlers are executed in a separate thread on Windows, so getting + // the current thread instance wouldn't make sense. This means only a single + // instance can receive signals on Windows. + static std::atomic running_instance_; +#else + static thread_local std::atomic running_instance_; +#endif // Signal handling std::vector signals_; - std::vector old_signal_handlers_; + std::vector old_signal_handlers_; std::atomic got_signal_; - static thread_local std::atomic running_instance_; - static void HandleSignal(int signum); + static void HandleSignal(int signum) { + auto instance = running_instance_.load(); + if (instance != nullptr) { + instance->DoHandleSignal(signum); + } + } void DoHandleSignal(int signum) { got_signal_ = signum; @@ -429,15 +452,12 @@ struct FlightServerBase::Impl { } }; +#ifdef _WIN32 +std::atomic FlightServerBase::Impl::running_instance_; +#else thread_local std::atomic FlightServerBase::Impl::running_instance_; - -void FlightServerBase::Impl::HandleSignal(int signum) { - auto instance = running_instance_.load(); - if (instance != nullptr) { - instance->DoHandleSignal(signum); - } -} +#endif FlightServerOptions::FlightServerOptions(const Location& location_) : location(location_), auth_handler(nullptr) {} @@ -502,23 +522,15 @@ Status FlightServerBase::Serve() { if (!impl_->server_) { return Status::UnknownError("Server did not start properly"); } - impl_->got_signal_ = 0; + impl_->old_signal_handlers_.clear(); impl_->running_instance_ = impl_.get(); - // Setup signal handlers - impl_->old_signal_handlers_.clear(); + // Override existing signal handlers with our own handler so as to stop the server. for (size_t i = 0; i < impl_->signals_.size(); ++i) { int signum = impl_->signals_[i]; - // Override with our own handler so as to stop the server. - struct sigaction sa, old_handler; - sa.sa_handler = &Impl::HandleSignal; - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - int ret = sigaction(signum, &sa, &old_handler); - if (ret != 0) { - return Status::IOError("sigaction call failed"); - } + SignalHandler new_handler(&Impl::HandleSignal), old_handler; + RETURN_NOT_OK(SetSignalHandler(signum, new_handler, &old_handler)); impl_->old_signal_handlers_.push_back(old_handler); } @@ -527,10 +539,8 @@ Status FlightServerBase::Serve() { // Restore signal handlers for (size_t i = 0; i < impl_->signals_.size(); ++i) { - int ret = sigaction(impl_->signals_[i], &impl_->old_signal_handlers_[i], nullptr); - if (ret != 0) { - return Status::IOError("sigaction call failed"); - } + RETURN_NOT_OK( + SetSignalHandler(impl_->signals_[i], impl_->old_signal_handlers_[i], nullptr)); } return Status::OK(); @@ -659,11 +669,15 @@ class RecordBatchStream::RecordBatchStreamImpl { int dictionary_index_ = 0; }; +FlightDataStream::~FlightDataStream() {} + RecordBatchStream::RecordBatchStream(const std::shared_ptr& reader, MemoryPool* pool) { impl_.reset(new RecordBatchStreamImpl(reader, pool)); } +RecordBatchStream::~RecordBatchStream() {} + std::shared_ptr RecordBatchStream::schema() { return impl_->schema(); } Status RecordBatchStream::GetSchemaPayload(FlightPayload* payload) { diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h index 28e87aa2fa96c..7164b64c4aba3 100644 --- a/cpp/src/arrow/flight/server.h +++ b/cpp/src/arrow/flight/server.h @@ -25,11 +25,11 @@ #include #include "arrow/flight/server_auth.h" -#include "arrow/flight/types.h" // IWYU pragma: keep +#include "arrow/flight/types.h" // IWYU pragma: keep +#include "arrow/flight/visibility.h" // IWYU pragma: keep #include "arrow/ipc/dictionary.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" -#include "arrow/util/visibility.h" namespace arrow { @@ -41,9 +41,9 @@ namespace flight { /// \brief Interface that produces a sequence of IPC payloads to be sent in /// FlightData protobuf messages -class ARROW_EXPORT FlightDataStream { +class ARROW_FLIGHT_EXPORT FlightDataStream { public: - virtual ~FlightDataStream() = default; + virtual ~FlightDataStream(); virtual std::shared_ptr schema() = 0; @@ -57,12 +57,13 @@ class ARROW_EXPORT FlightDataStream { /// \brief A basic implementation of FlightDataStream that will provide /// a sequence of FlightData messages to be written to a gRPC stream -class ARROW_EXPORT RecordBatchStream : public FlightDataStream { +class ARROW_FLIGHT_EXPORT RecordBatchStream : public FlightDataStream { public: /// \param[in] reader produces a sequence of record batches /// \param[in,out] pool a MemoryPool to use for allocations explicit RecordBatchStream(const std::shared_ptr& reader, MemoryPool* pool = default_memory_pool()); + ~RecordBatchStream() override; std::shared_ptr schema() override; Status GetSchemaPayload(FlightPayload* payload) override; @@ -73,22 +74,33 @@ class ARROW_EXPORT RecordBatchStream : public FlightDataStream { std::unique_ptr impl_; }; +// Silence warning +// "non dll-interface class RecordBatchReader used as base for dll-interface class" +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4275) +#endif + /// \brief A reader for IPC payloads uploaded by a client -class ARROW_EXPORT FlightMessageReader : public RecordBatchReader { +class ARROW_FLIGHT_EXPORT FlightMessageReader : public RecordBatchReader { public: /// \brief Get the descriptor for this upload. virtual const FlightDescriptor& descriptor() const = 0; }; +#ifdef _MSC_VER +#pragma warning(pop) +#endif + /// \brief Call state/contextual data. -class ARROW_EXPORT ServerCallContext { +class ARROW_FLIGHT_EXPORT ServerCallContext { public: virtual ~ServerCallContext() = default; /// \brief The name of the authenticated peer (may be the empty string) virtual const std::string& peer_identity() const = 0; }; -class ARROW_EXPORT FlightServerOptions { +class ARROW_FLIGHT_EXPORT FlightServerOptions { public: explicit FlightServerOptions(const Location& location_); @@ -100,7 +112,7 @@ class ARROW_EXPORT FlightServerOptions { /// \brief Skeleton RPC server implementation which can be used to create /// custom servers by implementing its abstract methods -class ARROW_EXPORT FlightServerBase { +class ARROW_FLIGHT_EXPORT FlightServerBase { public: FlightServerBase(); virtual ~FlightServerBase(); diff --git a/cpp/src/arrow/flight/server_auth.h b/cpp/src/arrow/flight/server_auth.h index 5d06894fc1d75..b1ccb096d7b2f 100644 --- a/cpp/src/arrow/flight/server_auth.h +++ b/cpp/src/arrow/flight/server_auth.h @@ -21,8 +21,8 @@ #include +#include "arrow/flight/visibility.h" #include "arrow/status.h" -#include "arrow/util/visibility.h" namespace arrow { @@ -30,7 +30,7 @@ namespace flight { /// \brief A reader for messages from the client during an /// authentication handshake. -class ARROW_EXPORT ServerAuthReader { +class ARROW_FLIGHT_EXPORT ServerAuthReader { public: virtual ~ServerAuthReader() = default; virtual Status Read(std::string* token) = 0; @@ -38,7 +38,7 @@ class ARROW_EXPORT ServerAuthReader { /// \brief A writer for messages to the client during an /// authentication handshake. -class ARROW_EXPORT ServerAuthSender { +class ARROW_FLIGHT_EXPORT ServerAuthSender { public: virtual ~ServerAuthSender() = default; virtual Status Write(const std::string& message) = 0; @@ -50,7 +50,7 @@ class ARROW_EXPORT ServerAuthSender { /// mechanisms. /// An implementation may need to track some state, e.g. a mapping of /// client tokens to authenticated identities. -class ARROW_EXPORT ServerAuthHandler { +class ARROW_FLIGHT_EXPORT ServerAuthHandler { public: virtual ~ServerAuthHandler(); /// \brief Authenticate the client on initial connection. The server @@ -67,7 +67,7 @@ class ARROW_EXPORT ServerAuthHandler { }; /// \brief An authentication mechanism that does nothing. -class ARROW_EXPORT NoOpAuthHandler : public ServerAuthHandler { +class ARROW_FLIGHT_EXPORT NoOpAuthHandler : public ServerAuthHandler { public: ~NoOpAuthHandler() override; Status Authenticate(ServerAuthSender* outgoing, ServerAuthReader* incoming) override; diff --git a/cpp/src/arrow/flight/test-util.cc b/cpp/src/arrow/flight/test-util.cc index f870dcbfa7171..b20a4cbf9dea1 100644 --- a/cpp/src/arrow/flight/test-util.cc +++ b/cpp/src/arrow/flight/test-util.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/flight/platform.h" + #ifdef __APPLE__ #include #include @@ -58,6 +60,12 @@ Status ResolveCurrentExecutable(fs::path* out) { return Status::Invalid("Can't resolve current exe: path too large"); } *out = fs::canonical(buf, ec); +#elif defined(_WIN32) + char buf[MAX_PATH + 1]; + if (!GetModuleFileNameA(NULL, buf, sizeof(buf))) { + return Status::Invalid("Can't get executable file path"); + } + *out = fs::canonical(buf, ec); #else ARROW_UNUSED(ec); return Status::NotImplemented("Not available on this system"); @@ -72,6 +80,10 @@ Status ResolveCurrentExecutable(fs::path* out) { } // namespace +static int next_listen_port_ = 30001; + +int GetListenPort() { return next_listen_port_++; } + void TestServer::Start() { namespace fs = boost::filesystem; @@ -105,7 +117,12 @@ void TestServer::Start() { int TestServer::Stop() { if (server_process_ && server_process_->valid()) { +#ifndef _WIN32 kill(server_process_->id(), SIGTERM); +#else + // This would use SIGKILL on POSIX, which is more brutal than SIGTERM + server_process_->terminate(); +#endif server_process_->wait(); return server_process_->exit_code(); } else { diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h index b5bc31a4c3bd0..1347e05b44131 100644 --- a/cpp/src/arrow/flight/test-util.h +++ b/cpp/src/arrow/flight/test-util.h @@ -27,6 +27,7 @@ #include "arrow/flight/client_auth.h" #include "arrow/flight/server_auth.h" #include "arrow/flight/types.h" +#include "arrow/flight/visibility.h" namespace boost { namespace process { @@ -42,8 +43,22 @@ namespace flight { // ---------------------------------------------------------------------- // Fixture to use for running test servers -class ARROW_EXPORT TestServer { +// Get a TCP port number to listen on. This is a different number every time, +// as reusing the same port accross tests can produce spurious "Stream removed" +// errors as Windows. +ARROW_FLIGHT_EXPORT +int GetListenPort(); + +// Get a TCP port number to listen on. This is a different number every time, +// as reusing the same port accross tests can produce spurious "Stream removed" +// errors as Windows. +ARROW_FLIGHT_EXPORT +int GetListenPort(); + +class ARROW_FLIGHT_EXPORT TestServer { public: + explicit TestServer(const std::string& executable_name) + : executable_name_(executable_name), port_(GetListenPort()) {} explicit TestServer(const std::string& executable_name, int port) : executable_name_(executable_name), port_(port) {} @@ -61,7 +76,7 @@ class ARROW_EXPORT TestServer { std::shared_ptr<::boost::process::child> server_process_; }; -class ARROW_EXPORT InProcessTestServer { +class ARROW_FLIGHT_EXPORT InProcessTestServer { public: explicit InProcessTestServer(std::unique_ptr server, const Location& location) @@ -80,7 +95,14 @@ class ARROW_EXPORT InProcessTestServer { // ---------------------------------------------------------------------- // A RecordBatchReader for serving a sequence of in-memory record batches -class BatchIterator : public RecordBatchReader { +// Silence warning +// "non dll-interface class RecordBatchReader used as base for dll-interface class" +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4275) +#endif + +class ARROW_FLIGHT_EXPORT BatchIterator : public RecordBatchReader { public: BatchIterator(const std::shared_ptr& schema, const std::vector>& batches) @@ -103,30 +125,37 @@ class BatchIterator : public RecordBatchReader { size_t position_; }; +#ifdef _MSC_VER +#pragma warning(pop) +#endif + // ---------------------------------------------------------------------- // Example data for test-server and unit tests using BatchVector = std::vector>; -ARROW_EXPORT std::shared_ptr ExampleIntSchema(); +ARROW_FLIGHT_EXPORT +std::shared_ptr ExampleIntSchema(); -ARROW_EXPORT std::shared_ptr ExampleStringSchema(); +ARROW_FLIGHT_EXPORT +std::shared_ptr ExampleStringSchema(); -ARROW_EXPORT std::shared_ptr ExampleDictSchema(); +ARROW_FLIGHT_EXPORT +std::shared_ptr ExampleDictSchema(); -ARROW_EXPORT +ARROW_FLIGHT_EXPORT Status ExampleIntBatches(BatchVector* out); -ARROW_EXPORT +ARROW_FLIGHT_EXPORT Status ExampleDictBatches(BatchVector* out); -ARROW_EXPORT +ARROW_FLIGHT_EXPORT std::vector ExampleFlightInfo(); -ARROW_EXPORT +ARROW_FLIGHT_EXPORT std::vector ExampleActionTypes(); -ARROW_EXPORT +ARROW_FLIGHT_EXPORT Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, int64_t total_records, int64_t total_bytes, FlightInfo::Data* out); @@ -135,7 +164,7 @@ Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor, // A pair of authentication handlers that check for a predefined password // and set the peer identity to a predefined username. -class ARROW_EXPORT TestServerAuthHandler : public ServerAuthHandler { +class ARROW_FLIGHT_EXPORT TestServerAuthHandler : public ServerAuthHandler { public: explicit TestServerAuthHandler(const std::string& username, const std::string& password); @@ -148,7 +177,7 @@ class ARROW_EXPORT TestServerAuthHandler : public ServerAuthHandler { std::string password_; }; -class ARROW_EXPORT TestClientAuthHandler : public ClientAuthHandler { +class ARROW_FLIGHT_EXPORT TestClientAuthHandler : public ClientAuthHandler { public: explicit TestClientAuthHandler(const std::string& username, const std::string& password); diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index ba07b999635aa..8d37225263606 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -26,8 +26,8 @@ #include #include +#include "arrow/flight/visibility.h" #include "arrow/ipc/writer.h" -#include "arrow/util/visibility.h" namespace arrow { @@ -50,7 +50,7 @@ class Uri; namespace flight { /// \brief A type of action that can be performed with the DoAction RPC -struct ActionType { +struct ARROW_FLIGHT_EXPORT ActionType { /// Name of action std::string type; @@ -59,13 +59,13 @@ struct ActionType { }; /// \brief Opaque selection critera for ListFlights RPC -struct Criteria { +struct ARROW_FLIGHT_EXPORT Criteria { /// Opaque criteria expression, dependent on server implementation std::string expression; }; /// \brief An action to perform with the DoAction RPC -struct Action { +struct ARROW_FLIGHT_EXPORT Action { /// The action type std::string type; @@ -74,15 +74,15 @@ struct Action { }; /// \brief Opaque result returned after executing an action -struct Result { +struct ARROW_FLIGHT_EXPORT Result { std::shared_ptr body; }; /// \brief A message received after completing a DoPut stream -struct PutResult {}; +struct ARROW_FLIGHT_EXPORT PutResult {}; /// \brief A request to retrieve or generate a dataset -struct FlightDescriptor { +struct ARROW_FLIGHT_EXPORT FlightDescriptor { enum DescriptorType { UNKNOWN = 0, /// Unused PATH = 1, /// Named path identifying a dataset @@ -117,7 +117,7 @@ struct FlightDescriptor { /// \brief Data structure providing an opaque identifier or credential to use /// when requesting a data stream with the DoGet RPC -struct Ticket { +struct ARROW_FLIGHT_EXPORT Ticket { std::string ticket; }; @@ -130,7 +130,7 @@ static const char* kSchemeGrpcUnix = "grpc+unix"; static const char* kSchemeGrpcTls = "grpc+tls"; /// \brief A host location (a URI) -struct Location { +struct ARROW_FLIGHT_EXPORT Location { public: /// \brief Initialize a blank location. Location(); @@ -174,7 +174,7 @@ struct Location { /// \brief A flight ticket and list of locations where the ticket can be /// redeemed -struct FlightEndpoint { +struct ARROW_FLIGHT_EXPORT FlightEndpoint { /// Opaque ticket identify; use with DoGet RPC Ticket ticket; @@ -187,14 +187,14 @@ struct FlightEndpoint { /// \brief Staging data structure for messages about to be put on the wire /// /// This structure corresponds to FlightData in the protocol. -struct FlightPayload { +struct ARROW_FLIGHT_EXPORT FlightPayload { std::shared_ptr descriptor; ipc::internal::IpcPayload ipc_message; }; /// \brief The access coordinates for retireval of a dataset, returned by /// GetFlightInfo -class FlightInfo { +class ARROW_FLIGHT_EXPORT FlightInfo { public: struct Data { std::string schema; @@ -239,7 +239,7 @@ class FlightInfo { }; /// \brief An iterator to FlightInfo instances returned by ListFlights -class ARROW_EXPORT FlightListing { +class ARROW_FLIGHT_EXPORT FlightListing { public: virtual ~FlightListing() = default; @@ -251,7 +251,7 @@ class ARROW_EXPORT FlightListing { }; /// \brief An iterator to Result instances returned by DoAction -class ARROW_EXPORT ResultStream { +class ARROW_FLIGHT_EXPORT ResultStream { public: virtual ~ResultStream() = default; @@ -264,7 +264,7 @@ class ARROW_EXPORT ResultStream { // \brief Create a FlightListing from a vector of FlightInfo objects. This can // be iterated once, then it is consumed -class ARROW_EXPORT SimpleFlightListing : public FlightListing { +class ARROW_FLIGHT_EXPORT SimpleFlightListing : public FlightListing { public: explicit SimpleFlightListing(const std::vector& flights); explicit SimpleFlightListing(std::vector&& flights); @@ -276,7 +276,7 @@ class ARROW_EXPORT SimpleFlightListing : public FlightListing { std::vector flights_; }; -class ARROW_EXPORT SimpleResultStream : public ResultStream { +class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream { public: explicit SimpleResultStream(std::vector&& results); Status Next(std::unique_ptr* result) override; diff --git a/cpp/src/arrow/flight/visibility.h b/cpp/src/arrow/flight/visibility.h new file mode 100644 index 0000000000000..bdee8b751d8a3 --- /dev/null +++ b/cpp/src/arrow/flight/visibility.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#if defined(_WIN32) || defined(__CYGWIN__) +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4251) +#else +#pragma GCC diagnostic ignored "-Wattributes" +#endif + +#ifdef ARROW_FLIGHT_STATIC +#define ARROW_FLIGHT_EXPORT +#elif defined(ARROW_FLIGHT_EXPORTING) +#define ARROW_FLIGHT_EXPORT __declspec(dllexport) +#else +#define ARROW_FLIGHT_EXPORT __declspec(dllimport) +#endif + +#define ARROW_FLIGHT_NO_EXPORT +#else // Not Windows +#ifndef ARROW_FLIGHT_EXPORT +#define ARROW_FLIGHT_EXPORT __attribute__((visibility("default"))) +#endif +#ifndef ARROW_FLIGHT_NO_EXPORT +#define ARROW_FLIGHT_NO_EXPORT __attribute__((visibility("hidden"))) +#endif +#endif // Non-Windows + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif diff --git a/cpp/src/arrow/io/mman.h b/cpp/src/arrow/io/mman.h index 139ee7e226587..6125492560967 100644 --- a/cpp/src/arrow/io/mman.h +++ b/cpp/src/arrow/io/mman.h @@ -8,13 +8,6 @@ #ifndef _MMAN_WIN32_H #define _MMAN_WIN32_H -// Allow use of features specific to Windows XP or later. -#ifndef _WIN32_WINNT -// Change this to the appropriate value to target other versions of Windows. -#define _WIN32_WINNT 0x0501 - -#endif - #include "arrow/util/windows_compatibility.h" #include diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc index 4db31570d8164..409ba60129b1c 100644 --- a/cpp/src/arrow/python/flight.cc +++ b/cpp/src/arrow/python/flight.cc @@ -20,6 +20,7 @@ #include "arrow/flight/internal.h" #include "arrow/python/flight.h" +#include "arrow/util/io-util.h" #include "arrow/util/logging.h" using arrow::flight::FlightPayload; @@ -137,12 +138,10 @@ Status PyFlightServer::ServeWithSignals() { // an active signal handler for SIGINT and SIGTERM. std::vector signals; for (const int signum : {SIGINT, SIGTERM}) { - struct sigaction handler; - int ret = sigaction(signum, nullptr, &handler); - if (ret != 0) { - return Status::IOError("sigaction call failed"); - } - if (handler.sa_handler != SIG_DFL && handler.sa_handler != SIG_IGN) { + ::arrow::internal::SignalHandler handler; + RETURN_NOT_OK(::arrow::internal::GetSignalHandler(signum, &handler)); + auto cb = handler.callback(); + if (cb != SIG_DFL && cb != SIG_IGN) { signals.push_back(signum); } } diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc index cb1237c29d8d3..cda08c37776ad 100644 --- a/cpp/src/arrow/util/io-util.cc +++ b/cpp/src/arrow/util/io-util.cc @@ -34,6 +34,7 @@ #include #include +#include #include #include #include // IWYU pragma: keep @@ -822,5 +823,80 @@ Status TemporaryDir::Make(const std::string& prefix, std::unique_ptr #include +#if ARROW_HAVE_SIGACTION +#include // Needed for struct sigaction +#endif + #include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/util/macros.h" @@ -205,6 +213,37 @@ class ARROW_EXPORT TemporaryDir { explicit TemporaryDir(PlatformFilename&&); }; +class ARROW_EXPORT SignalHandler { + public: + typedef void (*Callback)(int); + + SignalHandler(); + explicit SignalHandler(Callback cb); +#if ARROW_HAVE_SIGACTION + explicit SignalHandler(const struct sigaction& sa); +#endif + + Callback callback() const; +#if ARROW_HAVE_SIGACTION + const struct sigaction& action() const; +#endif + + protected: +#if ARROW_HAVE_SIGACTION + // Storing the full sigaction allows to restore the entire signal handling + // configuration. + struct sigaction sa_; +#else + Callback cb_; +#endif +}; + +ARROW_EXPORT +Status GetSignalHandler(int signum, SignalHandler* out); +ARROW_EXPORT +Status SetSignalHandler(int signum, SignalHandler handler, + SignalHandler* old_handler = NULLPTR); + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/windows_compatibility.h b/cpp/src/arrow/util/windows_compatibility.h index 7b70e287c3f9e..70c4313a5420c 100644 --- a/cpp/src/arrow/util/windows_compatibility.h +++ b/cpp/src/arrow/util/windows_compatibility.h @@ -26,10 +26,15 @@ #define WIN32_LEAN_AND_MEAN +// Set Windows 7 as a conservative minimum for Apache Arrow +#if defined(_WIN32_WINNT) && _WIN32_WINNT < 0x601 +#undef _WIN32_WINNT +#endif +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x601 +#endif + #include #include -// TODO(wesm): address when/if we add windows support -// #include - #endif // _WIN32 diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index d7f1aba21703c..4927aec1bcab1 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -490,7 +490,7 @@ endif() # Flight if(PYARROW_BUILD_FLIGHT) if(PYARROW_BUNDLE_ARROW_CPP) - # TODO: + # TODO: need to implement FindArrowFlight.cmake first message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow") endif() # We do NOT want to link gRPC or any other Flight dependency diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index 907d4f236c80e..9ce2264ee31b1 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -18,6 +18,7 @@ import base64 import contextlib +import os import socket import tempfile import threading @@ -319,8 +320,10 @@ def test_flight_get_info(): flight.Location.for_grpc_tcp('localhost', 5005) +@pytest.mark.skipif(os.name == 'nt', + reason="Unix sockets can't be tested on Windows") def test_flight_domain_socket(): - """Try a simple do_get call over a domain socket.""" + """Try a simple do_get call over a Unix domain socket.""" table = simple_ints_table() with tempfile.NamedTemporaryFile() as sock: @@ -395,7 +398,7 @@ def test_timeout_passes(): """Make sure timeouts do not fire on fast requests.""" with flight_server(ConstantFlightServer) as server_location: client = flight.FlightClient.connect(server_location) - options = flight.FlightCallOptions(timeout=0.2) + options = flight.FlightCallOptions(timeout=5.0) client.do_get(flight.Ticket(b'ints'), options=options).read_all()