Working example of list_flights with ObjectStore #5116

Closed
djanderson opened this issue Nov 22, 2023 · 2 comments
Labels
question Further information is requested


djanderson commented Nov 22, 2023

Which part is this question about

https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/server.rs#L48

Describe your question

I've been struggling to understand how to implement something like list_flights in a non-trivial way (i.e., backed by an ObjectStore). The server example leaves it unimplemented, and the example for ObjectStore.list doesn't give many clues about how to use it in something like list_flights.

The issue I'm running into is that I initialize a connection to an ObjectStore and store it in a session context, but I cannot work out how to convert a stream of ObjectMeta from the object store into a stream of FlightInfo as the response to list_flights, since ListFlightsStream has a 'static lifetime bound.

#[derive(Clone, Debug)]
struct SessionContext {
    object_store: Arc<dyn ObjectStore>,
}

pub struct FlightServiceImpl {
    sessions: Arc<Mutex<HashMap<String, SessionContext>>>,
}
#[tonic::async_trait]
impl FlightService for FlightServiceImpl {

    async fn handshake(&self, request: Request<Streaming<HandshakeRequest>>,) -> Result<Response<Self::HandshakeStream>, Status> {

        // Validate user and initialize an `object_store` with appropriate scope

        let session_token = self.generate_session_token();
        self.sessions.lock().unwrap().insert(
            session_token,
            SessionContext { object_store: Arc::new(object_store) },
        );

        // Complete handshake 

    }

    async fn list_flights(
        &self,
        request: Request<Criteria>,
    ) -> Result<Response<Self::ListFlightsStream>, Status> {
        let context = self.check_session_token(&request)?;
        let prefix = None;
        // HELP: the response stream is 'static but context.object_store is not
        let object_meta = context.object_store.list(prefix);
        let result = object_meta.filter_map(|meta| async move {
            let Ok(file_meta) = meta else {
                return None;
            };

            // Build the FlightInfo

        });
        Ok(Response::new(Box::pin(result) as Self::ListFlightsStream))
    }

But I'm getting the error:

error[E0597]: `context.object_store` does not live long enough
   --> src/main.rs:164:27
    |
162 |         let context = self.check_session_token(&request)?;
    |             ------- binding `context` declared here
163 |         let prefix = None;
164 |         let object_meta = context.object_store.list(prefix);
    |                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ borrowed value does not live long enough
...
196 |         Ok(Response::new(Box::pin(result) as Self::ListFlightsStream))
    |                          ------------------------------------------- type annotation requires that `context.object_store` is borrowed for `'static`
197 |     }
    |     - `context.object_store` dropped here while still borrowed

Additional context

Contributor

tustvold commented Nov 22, 2023

So the issue here is tonic is imposing a 'static lifetime bound on the returned stream, which is likely a historical artifact from when GATs were not supported.
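
As a rough illustration of what that bound means, here is a minimal sketch using only the standard library, with a boxed iterator standing in for the response stream. `Store`, `StaticIter` and `list_static` are hypothetical names made up for this example; they are not part of tonic, arrow-flight or object_store.

```rust
use std::sync::Arc;

// Hypothetical stand-in for an object store; not the object_store API.
struct Store {
    names: Vec<String>,
}

// Stand-in for a response type with a 'static bound, like ListFlightsStream.
type StaticIter = Box<dyn Iterator<Item = String> + Send + 'static>;

// Rejected by the borrow checker, because the boxed iterator would borrow
// the local `store`, which is dropped when the function returns:
//
// fn list_borrowed(store: Arc<Store>) -> StaticIter {
//     Box::new(store.names.iter().cloned())
// }

// Accepted: the closure owns its own Arc clone, so the returned iterator
// holds no borrow of anything on the caller's stack.
fn list_static(store: &Arc<Store>) -> StaticIter {
    let store = Arc::clone(store);
    let mut index = 0;
    Box::new(std::iter::from_fn(move || {
        let name = store.names.get(index).cloned();
        index += 1;
        name
    }))
}
```

The point is that 'static here does not mean "lives forever", only that the returned value may not contain references to shorter-lived data. Owning an Arc clone satisfies it; borrowing from a local does not.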

You have a couple of options here:

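// Option 1: buffer the whole listing, then stream the owned results
// (`try_collect` is from `futures::TryStreamExt`).
let meta: Vec<_> = context.object_store.list(prefix).try_collect().await
    .map_err(|e| Status::internal(e.to_string()))?;
let stream = futures::stream::iter(meta);

// Option 2: move a clone of the store into a task and forward each item
// over a channel (assuming `tokio::sync::mpsc` here).
let (sender, receiver) = mpsc::channel(2);
let store = context.object_store.clone();
tokio::spawn(async move {
    let mut stream = store.list(prefix);
    // Forward each `Result` as-is; `?` cannot be used in a task returning `()`.
    while let Some(n) = stream.next().await {
        if sender.send(n).await.is_err() {
            break;
        }
    }
});
// A tokio receiver is not a `Stream` by itself, so wrap it.
let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);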

Otherwise you will need to use something like https://docs.rs/ouroboros/latest/ouroboros/index.html to construct a self-referential stream.
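
For anyone who wants to see the shape of the channel approach in isolation, here is a minimal sketch with only the standard library: a thread and `std::sync::mpsc` stand in for the tokio task and async channel, and `Store` and `list_via_channel` are hypothetical names, not the object_store API. It is an analogy under those assumptions rather than a drop-in for a Flight server.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for an object store; not the object_store API.
struct Store {
    names: Vec<String>,
}

impl Store {
    // Borrows `self`, like a listing stream that borrows the store.
    fn list(&self) -> impl Iterator<Item = &String> + '_ {
        self.names.iter()
    }
}

// The producer thread owns a clone of the store, so the borrowing iterator
// never leaves that thread. Only owned items cross the channel.
fn list_via_channel(store: &Arc<Store>) -> mpsc::Receiver<String> {
    let (sender, receiver) = mpsc::sync_channel(2);
    let store = Arc::clone(store);
    thread::spawn(move || {
        for name in store.list() {
            // `send` blocks while the buffer is full and fails only once
            // the receiver has been dropped.
            if sender.send(name.clone()).is_err() {
                break;
            }
        }
    });
    receiver
}
```

The receiver owns nothing borrowed, so it can outlive the function that created it, which is the property the 'static bound asks for. The small buffer also keeps the producer from running far ahead of the consumer.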

@djanderson
Author

@tustvold, thank you very much for taking the time to respond. After quite a bit more experimentation, I was able to get the mpsc::channel method working!

It's a little awkward, since the receiving side also needs the object_store to fetch additional metadata for the FlightInfo, and I couldn't find a way to pass only the ObjectMeta. There may be a better way than passing a ParquetObjectReader through the channel, but I'm including what I came up with in case it gives anyone else a jumping-off point. Caveat emptor: I only verified this with a couple of files and the pyarrow client.

Imports
use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::IpcMessage;
use arrow_flight::{
    flight_descriptor::DescriptorType, flight_service_server::FlightService,
    flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData,
    FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
    SchemaResult, Ticket,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::stream::{BoxStream, StreamExt};
use log::{debug, error, info};
use object_store::ObjectMeta;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::parquet_to_arrow_schema;
use rand::distributions::{Alphanumeric, DistString};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
    async fn list_flights(
        &self,
        request: Request<Criteria>,
    ) -> Result<Response<Self::ListFlightsStream>, Status> {
        let context = self.check_session_token(&request)?;
        let (mut tx, rx) = mpsc::channel::<(ObjectMeta, ParquetObjectReader)>(2);

        let store = context.object_store;
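        tokio::spawn(async move {
            // Brings `send` into scope for the futures channel sender
            use futures::SinkExt;

            let prefix = None;
            let mut objects = store.list(prefix);
            while let Some(item) = objects.next().await {
                // Log and stop on a listing error instead of panicking in the task
                let md = match item {
                    Ok(md) => md,
                    Err(e) => {
                        error!("Failed to list objects: {}", e);
                        break;
                    }
                };
                let reader = ParquetObjectReader::new(store.clone(), md.clone());
                // `send` waits for capacity; `try_send` would also fail on a
                // full channel and cut the listing short
                if tx.send((md, reader)).await.is_err() {
                    debug!("rx channel dropped");
                    break;
                }
            }
            tx.close_channel();
        });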

        let result = rx.filter_map(|(object_md, mut pqt_reader)| async move {
            let Ok(pqt_md) = pqt_reader.get_metadata().await else {
                error!("Failed to get parquet metadata from {}", object_md.location);
                return None;
            };

            let file_md = pqt_md.file_metadata();

            // Convert file's schema to arrow format and serialize as IPC message
            let Ok(arrow_schema) = parquet_to_arrow_schema(file_md.schema_descr(), None) else {
                error!("Failed to convert schema for {}", object_md.location);
                return None;
            };
            let Ok(IpcMessage(schema)) =
                SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default()).try_into()
            else {
                error!("Failed to serialize schema for {}", object_md.location);
                return None;
            };

            let flight_descriptor = Some(FlightDescriptor {
                r#type: DescriptorType::Path.into(),
                cmd: Bytes::new(),
                path: vec![object_md.location.to_string()],
            });

            return Some(Ok(FlightInfo {
                flight_descriptor,
                endpoint: vec![],
                total_records: file_md.num_rows(),
                total_bytes: object_md.size as i64,
                ordered: false,
                schema: schema.into(),
            }));
        });

        Ok(Response::new(Box::pin(result) as Self::ListFlightsStream))
    }
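
One thing to watch with a bounded channel in this pattern: a non-blocking `try_send` fails when the buffer is full as well as when the receiver is gone, so treating every error as "receiver dropped" can end a listing early if the consumer is slower than the producer. Below is a minimal sketch of that behaviour using `std::sync::mpsc::sync_channel` as a stand-in. `queued_before_failure` is a hypothetical helper, and `futures::channel::mpsc` differs in detail (its capacity is the buffer plus one slot per sender), so the exact counts are not the same there.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Tries to queue `total` items without blocking while nothing is being
// received. Returns how many were queued and whether the loop stopped
// because the buffer was full.
fn queued_before_failure(capacity: usize, total: usize) -> (usize, bool) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut queued = 0;
    let mut stopped_because_full = false;
    for item in 0..total {
        match tx.try_send(item) {
            Ok(()) => queued += 1,
            Err(TrySendError::Full(_)) => {
                // The receiver is still alive; it just has not caught up.
                stopped_because_full = true;
                break;
            }
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Keep the receiver alive until the loop is done.
    drop(rx);
    (queued, stopped_because_full)
}
```

Matching on the error kind, or using a send that waits for capacity, avoids silently dropping the rest of the listing.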
