Skip to content

Commit

Permalink
ARROW-5091: [Flight] Rename FlightGetInfo message to FlightInfo
Browse files Browse the repository at this point in the history
Author: Antoine Pitrou <antoine@python.org>

Closes #4143 from pitrou/ARROW-5091-rename-flightgetinfo and squashes the following commits:

1663cdb <Antoine Pitrou> ARROW-5091:  Rename FlightGetInfo message to FlightInfo
  • Loading branch information
pitrou committed Apr 17, 2019
1 parent b496913 commit e8b2207
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 32 deletions.
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/client.cc
Expand Up @@ -255,12 +255,12 @@ class FlightClient::FlightClientImpl {

ClientRpc rpc;
RETURN_NOT_OK(rpc.SetToken(auth_handler_.get()));
std::unique_ptr<grpc::ClientReader<pb::FlightGetInfo>> stream(
std::unique_ptr<grpc::ClientReader<pb::FlightInfo>> stream(
stub_->ListFlights(&rpc.context, pb_criteria));

std::vector<FlightInfo> flights;

pb::FlightGetInfo pb_info;
pb::FlightInfo pb_info;
while (stream->Read(&pb_info)) {
FlightInfo::Data info_data;
RETURN_NOT_OK(internal::FromProto(pb_info, &info_data));
Expand Down Expand Up @@ -314,7 +314,7 @@ class FlightClient::FlightClientImpl {
Status GetFlightInfo(const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* info) {
pb::FlightDescriptor pb_descriptor;
pb::FlightGetInfo pb_response;
pb::FlightInfo pb_response;

RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor));

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/internal.cc
Expand Up @@ -206,9 +206,9 @@ Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_desc
return Status::OK();
}

// FlightGetInfo
// FlightInfo

Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info) {
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {
RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor));

info->schema = pb_info.schema();
Expand All @@ -232,7 +232,7 @@ Status SchemaToString(const Schema& schema, std::string* out) {
return Status::OK();
}

Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info) {
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
// clear any repeated fields
pb_info->clear_endpoint();

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/internal.h
Expand Up @@ -77,10 +77,10 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor,
std::unique_ptr<ipc::Message>* message);
Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info);
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);

Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info);
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
Status ToProto(const Action& action, pb::Action* pb_action);
Status ToProto(const Result& result, pb::Result* pb_result);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/server.cc
Expand Up @@ -237,7 +237,7 @@ class FlightServiceImpl : public FlightService::Service {
}

grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request,
ServerWriter<pb::FlightGetInfo>* writer) {
ServerWriter<pb::FlightInfo>* writer) {
GrpcServerCallContext flight_context;
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(context, flight_context));

Expand All @@ -257,7 +257,7 @@ class FlightServiceImpl : public FlightService::Service {
}

grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request,
pb::FlightGetInfo* response) {
pb::FlightInfo* response) {
CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null");
GrpcServerCallContext flight_context;
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(context, flight_context));
Expand Down
8 changes: 4 additions & 4 deletions format/Flight.proto
Expand Up @@ -45,7 +45,7 @@ service FlightService {
* the subset of streams that can be listed via this interface. Each flight
* service allows its own definition of how to consume criteria.
*/
rpc ListFlights(Criteria) returns (stream FlightGetInfo) {}
rpc ListFlights(Criteria) returns (stream FlightInfo) {}

/*
* For a given FlightDescriptor, get information about how the flight can be
Expand All @@ -59,7 +59,7 @@ service FlightService {
* available for consumption for the duration defined by the specific flight
* service.
*/
rpc GetFlightInfo(FlightDescriptor) returns (FlightGetInfo) {}
rpc GetFlightInfo(FlightDescriptor) returns (FlightInfo) {}

/*
* Retrieve a single stream associated with a particular descriptor
Expand Down Expand Up @@ -212,10 +212,10 @@ message FlightDescriptor {
}

/*
* The access coordinates for retrieval of a dataset. With a FlightGetInfo, a
* The access coordinates for retrieval of a dataset. With a FlightInfo, a
* consumer is able to determine how to retrieve a dataset.
*/
message FlightGetInfo {
message FlightInfo {
// schema of the dataset as described in Schema.fbs::Schema.
bytes schema = 1;

Expand Down
19 changes: 9 additions & 10 deletions java/flight/src/main/java/org/apache/arrow/flight/FlightInfo.java
Expand Up @@ -25,7 +25,6 @@
import java.util.stream.Collectors;

import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightGetInfo;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
Expand Down Expand Up @@ -53,20 +52,20 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
this.records = records;
}

FlightInfo(FlightGetInfo flightGetInfo) {
FlightInfo(Flight.FlightInfo pbFlightInfo) {
try {
final ByteBuffer schemaBuf = flightGetInfo.getSchema().asReadOnlyByteBuffer();
schema = flightGetInfo.getSchema().size() > 0 ?
final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer();
schema = pbFlightInfo.getSchema().size() > 0 ?
MessageSerializer.deserializeSchema(
new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
: new Schema(ImmutableList.of());
} catch (IOException e) {
throw new RuntimeException(e);
}
descriptor = new FlightDescriptor(flightGetInfo.getFlightDescriptor());
endpoints = flightGetInfo.getEndpointList().stream().map(t -> new FlightEndpoint(t)).collect(Collectors.toList());
bytes = flightGetInfo.getTotalBytes();
records = flightGetInfo.getTotalRecords();
descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor());
endpoints = pbFlightInfo.getEndpointList().stream().map(t -> new FlightEndpoint(t)).collect(Collectors.toList());
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
}

public Schema getSchema() {
Expand All @@ -89,15 +88,15 @@ public List<FlightEndpoint> getEndpoints() {
return endpoints;
}

FlightGetInfo toProtocol() {
Flight.FlightInfo toProtocol() {
// Encode schema in a Message payload
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Flight.FlightGetInfo.newBuilder()
return Flight.FlightInfo.newBuilder()
.addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList()))
.setSchema(ByteString.copyFrom(baos.toByteArray()))
.setFlightDescriptor(descriptor.toProtocol())
Expand Down
Expand Up @@ -22,7 +22,7 @@

import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthInterceptor;
import org.apache.arrow.flight.impl.Flight.FlightGetInfo;
import org.apache.arrow.flight.impl.Flight.FlightInfo;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

Expand Down Expand Up @@ -100,7 +100,7 @@ public interface OutputFlight {

public interface FlightServerHandler {

public FlightGetInfo getFlightInfo(String descriptor) throws Exception;
public FlightInfo getFlightInfo(String descriptor) throws Exception;

public OutputFlight setupFlight(VectorSchemaRoot root);

Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.ActionType;
import org.apache.arrow.flight.impl.Flight.Empty;
import org.apache.arrow.flight.impl.Flight.FlightGetInfo;
import org.apache.arrow.flight.impl.Flight.HandshakeRequest;
import org.apache.arrow.flight.impl.Flight.HandshakeResponse;
import org.apache.arrow.flight.impl.Flight.PutResult;
Expand Down Expand Up @@ -67,7 +66,7 @@ public StreamObserver<HandshakeRequest> handshake(StreamObserver<HandshakeRespon
}

@Override
public void listFlights(Flight.Criteria criteria, StreamObserver<FlightGetInfo> responseObserver) {
public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> responseObserver) {
try {
producer.listFlights(makeContext((ServerCallStreamObserver<?>) responseObserver), new Criteria(criteria),
StreamPipe.wrap(responseObserver, FlightInfo::toProtocol));
Expand Down Expand Up @@ -181,7 +180,7 @@ public StreamObserver<ArrowMessage> doPutCustom(final StreamObserver<PutResult>
}

@Override
public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<FlightGetInfo> responseObserver) {
public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight.FlightInfo> responseObserver) {
try {
FlightInfo info = producer
.getFlightInfo(makeContext((ServerCallStreamObserver<?>) responseObserver), new FlightDescriptor(request));
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType;
import org.apache.arrow.flight.impl.Flight.FlightGetInfo;
import org.apache.arrow.flight.impl.Flight.PutResult;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -166,7 +165,7 @@ public Producer(BufferAllocator allocator) {
@Override
public void listFlights(CallContext context, Criteria criteria,
StreamListener<FlightInfo> listener) {
FlightGetInfo getInfo = FlightGetInfo.newBuilder()
Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder()
.setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
.setType(DescriptorType.CMD)
.setCmd(ByteString.copyFrom("cool thing", Charsets.UTF_8)))
Expand Down Expand Up @@ -227,7 +226,7 @@ public void close() throws Exception {
@Override
public FlightInfo getFlightInfo(CallContext context,
FlightDescriptor descriptor) {
FlightGetInfo getInfo = FlightGetInfo.newBuilder()
Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder()
.setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
.setType(DescriptorType.CMD)
.setCmd(ByteString.copyFrom("cool thing", Charsets.UTF_8)))
Expand Down

0 comments on commit e8b2207

Please sign in to comment.