From e8b220760ac4fba21f5983b1ffd3ac6bdf890fa6 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 17 Apr 2019 18:45:24 +0200 Subject: [PATCH] ARROW-5091: [Flight] Rename FlightGetInfo message to FlightInfo Author: Antoine Pitrou Closes #4143 from pitrou/ARROW-5091-rename-flightgetinfo and squashes the following commits: 1663cdbc0 ARROW-5091: Rename FlightGetInfo message to FlightInfo --- cpp/src/arrow/flight/client.cc | 6 +++--- cpp/src/arrow/flight/internal.cc | 6 +++--- cpp/src/arrow/flight/internal.h | 4 ++-- cpp/src/arrow/flight/server.cc | 4 ++-- format/Flight.proto | 8 ++++---- .../org/apache/arrow/flight/FlightInfo.java | 19 +++++++++---------- .../org/apache/arrow/flight/FlightServer.java | 4 ++-- .../apache/arrow/flight/FlightService.java | 5 ++--- .../arrow/flight/TestBasicOperation.java | 5 ++--- 9 files changed, 29 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 4d839c5fe4a93..52f5b9e95e879 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -255,12 +255,12 @@ class FlightClient::FlightClientImpl { ClientRpc rpc; RETURN_NOT_OK(rpc.SetToken(auth_handler_.get())); - std::unique_ptr> stream( + std::unique_ptr> stream( stub_->ListFlights(&rpc.context, pb_criteria)); std::vector flights; - pb::FlightGetInfo pb_info; + pb::FlightInfo pb_info; while (stream->Read(&pb_info)) { FlightInfo::Data info_data; RETURN_NOT_OK(internal::FromProto(pb_info, &info_data)); @@ -314,7 +314,7 @@ class FlightClient::FlightClientImpl { Status GetFlightInfo(const FlightDescriptor& descriptor, std::unique_ptr* info) { pb::FlightDescriptor pb_descriptor; - pb::FlightGetInfo pb_response; + pb::FlightInfo pb_response; RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor)); diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc index 0c4ba4591afb9..4fce14c7e39bc 100644 --- a/cpp/src/arrow/flight/internal.cc +++ b/cpp/src/arrow/flight/internal.cc @@ -206,9 +206,9 @@ Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_desc return Status::OK(); } -// FlightGetInfo +// FlightInfo -Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info) { +Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) { RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor)); info->schema = pb_info.schema(); @@ -232,7 +232,7 @@ Status SchemaToString(const Schema& schema, std::string* out) { return Status::OK(); } -Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info) { +Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) { // clear any repeated fields pb_info->clear_endpoint(); diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h index cf58cec1a1a87..db95eb3746038 100644 --- a/cpp/src/arrow/flight/internal.h +++ b/cpp/src/arrow/flight/internal.h @@ -77,10 +77,10 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor, std::unique_ptr* message); Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr); Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint); -Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info); +Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info); Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr); -Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info); +Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info); Status ToProto(const ActionType& type, pb::ActionType* pb_type); Status ToProto(const Action& action, pb::Action* pb_action); Status ToProto(const Result& result, pb::Result* pb_result); diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index 60646d4c37b89..8cb6921c33e7b 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -237,7 +237,7 @@ class FlightServiceImpl : public FlightService::Service { } grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request, - ServerWriter* writer) { + ServerWriter* writer) { GrpcServerCallContext flight_context; GRPC_RETURN_NOT_GRPC_OK(CheckAuth(context, flight_context)); @@ -257,7 +257,7 @@ class FlightServiceImpl : public FlightService::Service { } grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request, - pb::FlightGetInfo* response) { + pb::FlightInfo* response) { CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null"); GrpcServerCallContext flight_context; GRPC_RETURN_NOT_GRPC_OK(CheckAuth(context, flight_context)); diff --git a/format/Flight.proto b/format/Flight.proto index 0131dac325e51..1fcefe9a63e31 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -45,7 +45,7 @@ service FlightService { * the subset of streams that can be listed via this interface. Each flight * service allows its own definition of how to consume criteria. */ - rpc ListFlights(Criteria) returns (stream FlightGetInfo) {} + rpc ListFlights(Criteria) returns (stream FlightInfo) {} /* * For a given FlightDescriptor, get information about how the flight can be @@ -59,7 +59,7 @@ service FlightService { * available for consumption for the duration defined by the specific flight * service. */ - rpc GetFlightInfo(FlightDescriptor) returns (FlightGetInfo) {} + rpc GetFlightInfo(FlightDescriptor) returns (FlightInfo) {} /* * Retrieve a single stream associated with a particular descriptor @@ -212,10 +212,10 @@ message FlightDescriptor { } /* - * The access coordinates for retrieval of a dataset. With a FlightGetInfo, a + * The access coordinates for retrieval of a dataset. With a FlightInfo, a * consumer is able to determine how to retrieve a dataset. */ -message FlightGetInfo { +message FlightInfo { // schema of the dataset as described in Schema.fbs::Schema. bytes schema = 1; diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightInfo.java index 9accbbe434a10..3b73c97d91734 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.arrow.flight.impl.Flight; -import org.apache.arrow.flight.impl.Flight.FlightGetInfo; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; @@ -53,20 +52,20 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List 0 ? + final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer(); + schema = pbFlightInfo.getSchema().size() > 0 ? MessageSerializer.deserializeSchema( new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf)))) : new Schema(ImmutableList.of()); } catch (IOException e) { throw new RuntimeException(e); } - descriptor = new FlightDescriptor(flightGetInfo.getFlightDescriptor()); - endpoints = flightGetInfo.getEndpointList().stream().map(t -> new FlightEndpoint(t)).collect(Collectors.toList()); - bytes = flightGetInfo.getTotalBytes(); - records = flightGetInfo.getTotalRecords(); + descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor()); + endpoints = pbFlightInfo.getEndpointList().stream().map(t -> new FlightEndpoint(t)).collect(Collectors.toList()); + bytes = pbFlightInfo.getTotalBytes(); + records = pbFlightInfo.getTotalRecords(); } public Schema getSchema() { @@ -89,7 +88,7 @@ public List getEndpoints() { return endpoints; } - FlightGetInfo toProtocol() { + Flight.FlightInfo toProtocol() { // Encode schema in a Message payload ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { @@ -97,7 +96,7 @@ FlightGetInfo toProtocol() { } catch (IOException e) { throw new RuntimeException(e); } - return Flight.FlightGetInfo.newBuilder() + return Flight.FlightInfo.newBuilder() .addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList())) .setSchema(ByteString.copyFrom(baos.toByteArray())) .setFlightDescriptor(descriptor.toProtocol()) diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java index 9aaffb81fe7ed..320011ddb1c32 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightServer.java @@ -22,7 +22,7 @@ import org.apache.arrow.flight.auth.ServerAuthHandler; import org.apache.arrow.flight.auth.ServerAuthInterceptor; -import org.apache.arrow.flight.impl.Flight.FlightGetInfo; +import org.apache.arrow.flight.impl.Flight.FlightInfo; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -100,7 +100,7 @@ public interface OutputFlight { public interface FlightServerHandler { - public FlightGetInfo getFlightInfo(String descriptor) throws Exception; + public FlightInfo getFlightInfo(String descriptor) throws Exception; public OutputFlight setupFlight(VectorSchemaRoot root); diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java index 4e51f3cf10f1e..d380dba93de42 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java @@ -28,7 +28,6 @@ import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.flight.impl.Flight.ActionType; import org.apache.arrow.flight.impl.Flight.Empty; -import org.apache.arrow.flight.impl.Flight.FlightGetInfo; import org.apache.arrow.flight.impl.Flight.HandshakeRequest; import org.apache.arrow.flight.impl.Flight.HandshakeResponse; import org.apache.arrow.flight.impl.Flight.PutResult; @@ -67,7 +66,7 @@ public StreamObserver handshake(StreamObserver responseObserver) { + public void listFlights(Flight.Criteria criteria, StreamObserver responseObserver) { try { producer.listFlights(makeContext((ServerCallStreamObserver) responseObserver), new Criteria(criteria), StreamPipe.wrap(responseObserver, FlightInfo::toProtocol)); @@ -181,7 +180,7 @@ public StreamObserver doPutCustom(final StreamObserver } @Override - public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver responseObserver) { + public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver responseObserver) { try { FlightInfo info = producer .getFlightInfo(makeContext((ServerCallStreamObserver) responseObserver), new FlightDescriptor(request)); diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 113ef4bbcd643..268580d8e6247 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -25,7 +25,6 @@ import org.apache.arrow.flight.auth.ServerAuthHandler; import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType; -import org.apache.arrow.flight.impl.Flight.FlightGetInfo; import org.apache.arrow.flight.impl.Flight.PutResult; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -166,7 +165,7 @@ public Producer(BufferAllocator allocator) { @Override public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { - FlightGetInfo getInfo = FlightGetInfo.newBuilder() + Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder() .setFlightDescriptor(Flight.FlightDescriptor.newBuilder() .setType(DescriptorType.CMD) .setCmd(ByteString.copyFrom("cool thing", Charsets.UTF_8))) @@ -227,7 +226,7 @@ public void close() throws Exception { @Override public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { - FlightGetInfo getInfo = FlightGetInfo.newBuilder() + Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder() .setFlightDescriptor(Flight.FlightDescriptor.newBuilder() .setType(DescriptorType.CMD) .setCmd(ByteString.copyFrom("cool thing", Charsets.UTF_8)))