Skip to content

Commit

Permalink
ARROW-6875: [FlightRPC] implement criteria for ListFlights
Browse files Browse the repository at this point in the history
I'm now working on the integration tests. I got to the end and realized it would be an enormous PR, so instead I'm splitting out the various fixes I made.

None of the languages fully supported passing filter criteria in ListFlights.

Closes #6390 from lidavidm/arrow-6875 and squashes the following commits:

eb1942a <David Li> ARROW-6875:  implement criteria for ListFlights

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
lidavidm authored and pitrou committed Feb 13, 2020
1 parent d437175 commit 7dc9f9b
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,8 @@ class FlightClient::FlightClientImpl {

Status ListFlights(const FlightCallOptions& options, const Criteria& criteria,
std::unique_ptr<FlightListing>* listing) {
// TODO(wesm): populate criteria
pb::Criteria pb_criteria;
RETURN_NOT_OK(internal::ToProto(criteria, &pb_criteria));

ClientRpc rpc(options);
RETURN_NOT_OK(rpc.SetToken(auth_handler_.get()));
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,14 @@ TEST_F(TestFlightClient, ListFlights) {
ASSERT_TRUE(info == nullptr);
}

TEST_F(TestFlightClient, ListFlightsWithCriteria) {
std::unique_ptr<FlightListing> listing;
ASSERT_OK(client_->ListFlights(FlightCallOptions(), {"foo"}, &listing));
std::unique_ptr<FlightInfo> info;
ASSERT_OK(listing->Next(&info));
ASSERT_TRUE(info == nullptr);
}

TEST_F(TestFlightClient, GetFlightInfo) {
auto descr = FlightDescriptor::Path({"examples", "ints"});
std::unique_ptr<FlightInfo> info;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ Status ToProto(const Result& result, pb::Result* pb_result) {
// Criteria

Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria) {
criteria->expression = pb_criteria.expression();
return Status::OK();
}
Status ToProto(const Criteria& criteria, pb::Criteria* pb_criteria) {
pb_criteria->set_expression(criteria.expression);
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
Status ToProto(const Action& action, pb::Action* pb_action);
Status ToProto(const Result& result, pb::Result* pb_result);
Status ToProto(const Criteria& criteria, pb::Criteria* pb_criteria);
Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result);
void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
Status ToProto(const BasicAuth& basic_auth, pb::BasicAuth* pb_basic_auth);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ class FlightTestServer : public FlightServerBase {
Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
std::unique_ptr<FlightListing>* listings) override {
std::vector<FlightInfo> flights = ExampleFlightInfo();
if (criteria && criteria->expression != "") {
// For test purposes, if we get criteria, return no results
flights.clear();
}
*listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public Criteria(byte[] bytes) {
this.bytes = criteria.getExpression().toByteArray();
}

/**
* Get the contained filter criteria.
*/
public byte[] getExpression() {
return bytes;
}

Flight.Criteria asCriteria() {
Flight.Criteria.Builder b = Flight.Criteria.newBuilder();
if (bytes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,22 @@ public void roundTripDescriptor() throws Exception {
@Test
public void getDescriptors() throws Exception {
test(c -> {
int count = 0;
for (FlightInfo i : c.listFlights(Criteria.ALL)) {
System.out.println(i.getDescriptor());
count += 1;
}
Assert.assertEquals(1, count);
});
}

@Test
public void getDescriptorsWithCriteria() throws Exception {
test(c -> {
int count = 0;
for (FlightInfo i : c.listFlights(new Criteria(new byte[]{1}))) {
count += 1;
}
Assert.assertEquals(0, count);
});
}

Expand Down Expand Up @@ -268,6 +281,11 @@ public Producer(BufferAllocator allocator) {
@Override
public void listFlights(CallContext context, Criteria criteria,
StreamListener<FlightInfo> listener) {
if (criteria.getExpression().length > 0) {
// Don't send anything if criteria are set
listener.onCompleted();
}

Flight.FlightInfo getInfo = Flight.FlightInfo.newBuilder()
.setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
.setType(DescriptorType.CMD)
Expand Down
6 changes: 5 additions & 1 deletion python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1073,14 +1073,18 @@ cdef class FlightClient:
break
yield result

def list_flights(self, options: FlightCallOptions = None):
def list_flights(self, criteria: bytes = None,
options: FlightCallOptions = None):
"""List the flights available on a service."""
cdef:
unique_ptr[CFlightListing] listing
FlightInfo result
CFlightCallOptions* c_options = FlightCallOptions.unwrap(options)
CCriteria c_criteria

if criteria:
c_criteria.expression = tobytes(criteria)

with nogil:
check_flight_status(
self.client.get().ListFlights(deref(c_options),
Expand Down
20 changes: 20 additions & 0 deletions python/pyarrow/tests/test_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ConstantFlightServer(FlightServerBase):
does not properly hold a reference to the Table object.
"""

CRITERIA = b"the expected criteria"

def __init__(self, location=None, **kwargs):
super().__init__(location, **kwargs)
# Ticket -> Table
Expand All @@ -128,6 +130,15 @@ def __init__(self, location=None, **kwargs):
b'dicts': simple_dicts_table,
}

def list_flights(self, context, criteria):
if criteria == self.CRITERIA:
yield flight.FlightInfo(
pa.schema([]),
flight.FlightDescriptor.for_path('/foo'),
[],
-1, -1
)

def do_get(self, context, ticket):
# Return a fresh table, so that Flight is the only one keeping a
# reference.
Expand Down Expand Up @@ -507,6 +518,15 @@ def serve():
assert elapsed >= 0.5


def test_flight_list_flights():
"""Try a simple list_flights call."""
with ConstantFlightServer() as server:
client = flight.connect(('localhost', server.port))
assert list(client.list_flights()) == []
flights = client.list_flights(ConstantFlightServer.CRITERIA)
assert len(list(flights)) == 1


def test_flight_do_get_ints():
"""Try a simple do_get call."""
table = simple_ints_table()
Expand Down

0 comments on commit 7dc9f9b

Please sign in to comment.