[Python] Add Flight streaming example (#109)
* [Python] Add Flight streaming example

Fixes #86

* [Python] Remove special dataset example

* [Python] Add back missing server shutdown
lidavidm committed Jan 19, 2022
1 parent c156957 commit 79a57a0a87aaacc7eed82902363170d5fe8445f0
Showing 1 changed file with 169 additions and 8 deletions.
@@ -10,8 +10,8 @@ Simple Parquet storage service with Arrow Flight
================================================

Suppose you want to implement a service that can store, send and receive
Parquet files using the Arrow Flight protocol. ``pyarrow`` provides an
implementation framework in :mod:`pyarrow.flight`, in particular through
the :class:`pyarrow.flight.FlightServerBase` class.

.. testcode::
@@ -25,7 +25,7 @@ and particularly through the :class:`pyarrow.flight.FlightServerBase` class.

class FlightServer(pa.flight.FlightServerBase):

def __init__(self, location="grpc://0.0.0.0:8815",
repo=pathlib.Path("./datasets"), **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
@@ -40,9 +40,9 @@ and particularly through the :class:`pyarrow.flight.FlightServerBase` class.
)
endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
return pyarrow.flight.FlightInfo(schema,
descriptor,
endpoints,
metadata.num_rows,
metadata.serialized_size)

def list_flights(self, context, criteria):
@@ -97,13 +97,13 @@ into a parquet file)
These are the most common Arrow Flight requests. If we need to add more
functionality, we can do so using custom actions.

In the previous example a ``drop_dataset`` custom action is added.
All custom actions are executed through the
:meth:`pyarrow.flight.FlightServerBase.do_action` method, thus it's up to
the server subclass to dispatch them properly. In this case we invoke
the ``do_drop_dataset`` method when the ``action.type`` is the one we expect.
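
For example, a client could trigger this custom action by sending a
:class:`pyarrow.flight.Action` whose body carries the dataset name (a minimal
sketch; ``uploaded.parquet`` is just an illustrative name):

.. code-block::

    import pyarrow as pa
    import pyarrow.flight

    client = pa.flight.connect("grpc://0.0.0.0:8815")
    # Ask the server to delete a previously uploaded dataset. The action body
    # is raw bytes, so the dataset name is encoded before sending. Consuming
    # the returned iterator ensures the call has completed.
    list(client.do_action(
        pa.flight.Action("drop_dataset", "uploaded.parquet".encode("utf-8"))
    ))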

Our server can then be started with
:meth:`pyarrow.flight.FlightServerBase.serve`

.. code-block::
@@ -226,3 +226,164 @@ we might list all parquet files that are currently stored by the server:
# Shutdown the server
server.shutdown()

Streaming Parquet Storage Service
=================================

We can improve the Parquet storage service and avoid holding entire datasets
in memory by streaming data. Flight readers and writers, like other readers
and writers in PyArrow, can be iterated over, so let's update the server from
before to take advantage of this:

.. testcode::

import pathlib

import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet


class FlightServer(pa.flight.FlightServerBase):

def __init__(self, location="grpc://0.0.0.0:8815",
repo=pathlib.Path("./datasets"), **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
self._repo = repo

def _make_flight_info(self, dataset):
dataset_path = self._repo / dataset
schema = pa.parquet.read_schema(dataset_path)
metadata = pa.parquet.read_metadata(dataset_path)
descriptor = pa.flight.FlightDescriptor.for_path(
dataset.encode('utf-8')
)
endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
return pyarrow.flight.FlightInfo(schema,
descriptor,
endpoints,
metadata.num_rows,
metadata.serialized_size)

def list_flights(self, context, criteria):
for dataset in self._repo.iterdir():
yield self._make_flight_info(dataset.name)

def get_flight_info(self, context, descriptor):
return self._make_flight_info(descriptor.path[0].decode('utf-8'))

def do_put(self, context, descriptor, reader, writer):
dataset = descriptor.path[0].decode('utf-8')
dataset_path = self._repo / dataset
# Read the uploaded data and write to Parquet incrementally
with dataset_path.open("wb") as sink:
with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
for chunk in reader:
writer.write_table(pa.Table.from_batches([chunk.data]))

def do_get(self, context, ticket):
dataset = ticket.ticket.decode('utf-8')
# Stream data from a file
dataset_path = self._repo / dataset
reader = pa.parquet.ParquetFile(dataset_path)
return pa.flight.GeneratorStream(
reader.schema_arrow, reader.iter_batches())

def list_actions(self, context):
return [
("drop_dataset", "Delete a dataset."),
]

def do_action(self, context, action):
if action.type == "drop_dataset":
self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
else:
raise NotImplementedError

def do_drop_dataset(self, dataset):
dataset_path = self._repo / dataset
dataset_path.unlink()

First, we've modified :meth:`pyarrow.flight.FlightServerBase.do_put`. Instead
of reading all of the uploaded data into a :class:`pyarrow.Table` before
writing it out, we iterate through each batch as it arrives and append it to a
Parquet file.
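
For comparison, the non-streaming version of ``do_put`` from the previous
recipe buffered the whole upload before writing; it looked roughly like this
(a sketch, not the exact original):

.. code-block::

    def do_put(self, context, descriptor, reader, writer):
        dataset = descriptor.path[0].decode('utf-8')
        dataset_path = self._repo / dataset
        # Read the entire uploaded stream into memory, then write it out
        # in a single call.
        data_table = reader.read_all()
        pa.parquet.write_table(data_table, dataset_path)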

Then, we've modified :meth:`pyarrow.flight.FlightServerBase.do_get` to stream
data to the client. This uses :class:`pyarrow.flight.GeneratorStream`, which
takes a schema and any iterable or iterator. Flight then iterates through and
sends each record batch to the client, allowing us to handle even large Parquet
files that don't fit into memory.
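
Because :class:`pyarrow.flight.GeneratorStream` accepts any iterable, the data
does not have to come from a file at all. A plain Python generator works too;
here is a minimal sketch (``generate_batches`` is just an illustrative helper,
not part of the service above):

.. code-block::

    import pyarrow as pa
    import pyarrow.flight

    def generate_batches(schema, num_batches):
        # Produce record batches lazily; nothing is materialized up front.
        for _ in range(num_batches):
            yield pa.record_batch([pa.array(range(1024), type=pa.int64())],
                                  names=schema.names)

    schema = pa.schema([("ints", pa.int64())])
    # In a server this would be returned from do_get; Flight pulls batches
    # from the generator one at a time as the client consumes the stream.
    stream = pa.flight.GeneratorStream(schema, generate_batches(schema, 10))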

While :class:`pyarrow.flight.GeneratorStream` has the advantage that it can
stream data, it means Flight must call back into Python for each record batch
it sends. In contrast, :class:`pyarrow.flight.RecordBatchStream` requires all
data to be in memory up front, but once created, all data transfer is handled
purely in C++, without needing to call back into Python.
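
If a dataset comfortably fits in memory and the per-batch Python callbacks are
a concern, ``do_get`` could instead load the file and return a
:class:`pyarrow.flight.RecordBatchStream` (a sketch of that alternative, not
what the server above does):

.. code-block::

    def do_get(self, context, ticket):
        dataset_path = self._repo / ticket.ticket.decode('utf-8')
        # Load the whole Parquet file up front; after this, the transfer is
        # handled in C++ without calling back into Python for each batch.
        table = pa.parquet.read_table(dataset_path)
        return pa.flight.RecordBatchStream(table)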

Let's give the server a spin. As before, we'll start the server:

.. code-block::

if __name__ == '__main__':
server = FlightServer()
server._repo.mkdir(exist_ok=True)
server.serve()

.. testcode::
:hide:

    # Start a real server in the background and wait for it to become
    # available. The previous code block only shows the reader how to start it.
import threading
server = FlightServer()
server._repo.mkdir(exist_ok=True)
t = threading.Thread(target=server.serve)
t.start()

pa.flight.connect("grpc://0.0.0.0:8815").wait_for_available()

We create a client, and this time, we'll write batches to the writer, as if we
had a stream of data instead of a table in memory:

.. testcode::

import pyarrow as pa
import pyarrow.flight

client = pa.flight.connect("grpc://0.0.0.0:8815")

# Upload a new dataset
NUM_BATCHES = 1024
ROWS_PER_BATCH = 4096
upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
batch = pa.record_batch([
pa.array(range(ROWS_PER_BATCH)),
], names=["ints"])
writer, _ = client.do_put(upload_descriptor, batch.schema)
with writer:
for _ in range(NUM_BATCHES):
writer.write_batch(batch)

As before, we can then read it back. Again, we'll read each batch from the
stream as it arrives, instead of reading them all into a table:

.. testcode::

# Read content of the dataset
flight = client.get_flight_info(upload_descriptor)
reader = client.do_get(flight.endpoints[0].ticket)
total_rows = 0
for chunk in reader:
total_rows += chunk.data.num_rows
print("Got", total_rows, "rows total, expected", NUM_BATCHES * ROWS_PER_BATCH)

.. testoutput::

Got 4194304 rows total, expected 4194304

.. testcode::
:hide:

# Shutdown the server
server.shutdown()
