In [None]:
!pip install --upgrade pip
!pip install -q -r ../feast_modelregistry/requirements.txt
!pip install pyarrow

# Arrow Flight server

Mimics the behavior of the `feast serve_offline` CLI command:
* Starts the server on port 8815
* Connects to the Feast repo in the MNIST demo folder
* Implements the Arrow Flight protocol to retrieve historical features

## Constants and imports

In [None]:
%env REPO_PATH=../feast_modelregistry/mnist_demo/feature_repo/
import os
os.environ['FEAST_USAGE'] = 'False'

In [None]:
import ast
import logging

import pyarrow as pa
import pyarrow.flight

from feast import FeatureStore

## Start Arrow Flight server

**References**: Python examples from [Apache Arrow](https://github.com/apache/arrow/tree/main/python/examples/flight)

In [None]:
class FlightServer(pa.flight.FlightServerBase):
    """Arrow Flight server that serves Feast offline-store requests.

    A client uploads the parameters of a Feast API call with one or more
    ``do_put`` calls that share a descriptor. Each upload carries ``command``,
    ``api`` and ``param`` entries in its schema metadata, and the parameter
    value is the stream payload. ``get_flight_info`` then returns an endpoint
    whose ticket identifies the stored request. ``do_get`` runs the API
    against the local ``FeatureStore`` and streams the result back. Only
    ``get_historical_features`` is implemented.
    """

    def __init__(self, location="grpc://0.0.0.0:8815", *, repo_path=None,
                 **kwargs):
        """Create the server and the FeatureStore it delegates to.

        Args:
            location: gRPC URI the server binds to. It is also returned
                unchanged as the endpoint location in ``FlightInfo``
                responses.
            repo_path: Path of the Feast feature repository. Defaults to the
                ``REPO_PATH`` environment variable.
            **kwargs: Forwarded to ``pyarrow.flight.FlightServerBase``.

        Raises:
            KeyError: if ``repo_path`` is omitted and ``REPO_PATH`` is unset.
        """
        super().__init__(location, **kwargs)
        self._location = location
        # Maps a descriptor key (see descriptor_to_key) to the dict of request
        # parameters accumulated by do_put.
        self.flights = {}
        if repo_path is None:
            repo_path = os.environ['REPO_PATH']
        # Initializes the FeatureStore from the feature repository path.
        self.store = FeatureStore(repo_path=repo_path)

    @classmethod
    def descriptor_to_key(cls, descriptor):
        """Build a hashable dictionary key from a FlightDescriptor.

        Returns a ``(descriptor_type, command, path)`` tuple. ``path`` is
        turned into a tuple (empty when the descriptor has no path), so the
        key is hashable. Its ``repr`` is used as the ticket and parsed back
        with ``ast.literal_eval`` in ``do_get``.
        """
        return (descriptor.descriptor_type.value, descriptor.command,
                tuple(descriptor.path or tuple()))

    # TODO: since we cannot anticipate here the get_historical_features call, what data should we return?
    # ATM it returns the metadata of the "entity_df" table
    def _make_flight_info(self, key, descriptor, params):
        """Describe a stored request as a FlightInfo.

        The schema, row count and byte size reported are those of the
        uploaded ``entity_df`` table, not of the result, which has not been
        computed yet. The single endpoint's ticket is ``repr(key)``.

        Raises:
            KeyError: if ``entity_df`` has not been uploaded for this flight.
        """
        table = params['entity_df']
        endpoints = [pa.flight.FlightEndpoint(repr(key), [self._location])]
        # Write the table to a MockOutputStream only to measure the size of
        # its IPC stream serialization.
        mock_sink = pa.MockOutputStream()
        with pa.RecordBatchStreamWriter(mock_sink, table.schema) as stream_writer:
            stream_writer.write_table(table)
        data_size = mock_sink.size()

        return pa.flight.FlightInfo(table.schema, descriptor, endpoints,
                                    table.num_rows, data_size)

    def list_flights(self, context, criteria):
        """Yield a FlightInfo for every stored request that has an entity_df.

        ``criteria`` is ignored. A request whose ``entity_df`` has not been
        uploaded yet is skipped instead of aborting the listing with a
        KeyError.
        """
        # Iterate over a snapshot: this generator is consumed lazily while
        # other RPCs may add or remove entries.
        for key, params in list(self.flights.items()):
            if 'entity_df' not in params:
                continue
            _, command, path = key
            if command is not None:
                descriptor = pa.flight.FlightDescriptor.for_command(command)
            else:
                descriptor = pa.flight.FlightDescriptor.for_path(*path)

            yield self._make_flight_info(key, descriptor, params)

    def get_flight_info(self, context, descriptor):
        """Return the FlightInfo of the request stored under ``descriptor``.

        Raises:
            KeyError: if nothing was uploaded for ``descriptor``, or if its
                ``entity_df`` is missing.
        """
        key = self.descriptor_to_key(descriptor)
        params = self.flights.get(key)
        if params is None:
            raise KeyError('Flight not found.')
        return self._make_flight_info(key, descriptor, params)

    def do_put(self, context, descriptor, reader, writer):
        """Store one request parameter uploaded by the client.

        The schema metadata is expected to hold ``command``, ``api`` and
        ``param`` (the parameter's name). The stream payload is the
        parameter's value and is stored as a ``pyarrow.Table`` under that
        name. Several uploads with the same descriptor are merged into a
        single parameters dict. Uploads without a ``command`` entry store
        nothing beyond ensuring the flight entry exists.
        """
        key = self.descriptor_to_key(descriptor)
        params = self.flights.get(key, {})

        # Schema metadata is None when the client attached none.
        raw_metadata = reader.schema.metadata or {}
        metadata = {k.decode(): v.decode() for k, v in raw_metadata.items()}
        if 'command' in metadata:
            command = metadata['command']
            api = metadata['api']
            param = metadata['param']
            value = reader.read_all()
            # Merge into the existing dict for this key: one request is
            # spread across several do_put calls.
            params.update({'command': command, 'api': api, param: value})

        self.flights[key] = params

    def do_get(self, context, ticket):
        """Run the stored request named by ``ticket`` and stream its result.

        The ticket is the ``repr`` of a flight key, as produced by
        ``_make_flight_info``. ``ast.literal_eval`` parses it back without
        executing code. After a successful call the stored request is
        removed, so each ticket can be redeemed once.

        Raises:
            KeyError: if the ticket does not match a stored request.
            NotImplementedError: for any API other than
                ``get_historical_features``.
        """
        key = ast.literal_eval(ticket.ticket.decode())
        params = self.flights.get(key)
        if params is None:
            logging.warning(f"Unknown key {key}")
            # None is not a valid do_get result (a FlightDataStream is
            # expected). Raise so the client gets an explicit "not found"
            # error, the same as from get_flight_info.
            raise KeyError('Flight not found.')

        api = params.get('api')
        if api != "get_historical_features":
            raise NotImplementedError(f"Unsupported API: {api}")

        # do_put stores every parameter as a pyarrow Table.
        entity_df = params['entity_df'].to_pandas()
        # The feature references are the values of the 'features' column.
        features = params['features'].column('features').to_pylist()

        has_rows = len(entity_df) > 0
        logging.info(
            "get_historical_features for: entity_df from %s to %s, "
            "features from %s to %s",
            entity_df.index[0] if has_rows else None,
            entity_df.index[-1] if has_rows else None,
            features[0] if features else None,
            features[-1] if features else None,
        )
        training_df = self.store.get_historical_features(entity_df, features).to_df()
        table = pa.Table.from_pandas(training_df)

        # The request has been served, so forget it. pop() tolerates the entry
        # having been removed already by a concurrent do_get.
        self.flights.pop(key, None)

        return pa.flight.RecordBatchStream(table)

    def list_actions(self, context):
        """No custom actions are offered."""
        return []

    def do_action(self, context, action):
        """Custom actions are not supported."""
        raise NotImplementedError

    def do_drop_dataset(self, dataset):
        """No-op. NOTE(review): not called anywhere in this notebook."""
        pass

**Note:** Use the `Interrupt the Kernel` button to stop the server (shortcut: press `I` twice).

If you get an `Address already in use` error, run `lsof -i :8815` in a terminal and kill the process that is using the port.

In [None]:
LOG_FORMAT = "%(asctime)s %(levelname)s: %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
)
# The server is listening once constructed; serve() only blocks until it stops.
# The context manager shuts the server down when serve() exits, including on a
# kernel interrupt (KeyboardInterrupt). This releases port 8815 for the next run.
with FlightServer() as server:
    logging.info("Started gRPC server")
    server.serve()

2024-05-08 12:32:24 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:24 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:24 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:24 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:25 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:26 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:26 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:26 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:26 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:26 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:27 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:27 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:27 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:27 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:27 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:28 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:28 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:28 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:28 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:28 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:30 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:30 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:30 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:30 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:30 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:31 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:31 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:31 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:31 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:31 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:32 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:32 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:32 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:32 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:32 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:33 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:33 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:33 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:33 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:34 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:35 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:35 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:35 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:35 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:35 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:36 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:36 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:36 INFO: Registry cache expired, so refreshing
2024-05-08 12:32:36 INFO: Registry cache expired, so refreshing


get_historical_features for: entity_df from 0 to 4999, features from {features[0]} to {features[len(features)-1]}


2024-05-08 12:32:36 INFO: Registry cache expired, so refreshing
