how to expose hello hello2 hello3 services on one HelloServer #224

Open
omac777 opened this issue Apr 22, 2019 · 16 comments

Comments

@omac777

omac777 commented Apr 22, 2019

I have succeeded in compiling three different service implementations into one HelloServer, using three modules in one library that each expose an implementation, but only one serve() function is called, not the other two.

How do I specify that HelloServer should serve all three services on the same port rather than just one?

Thank you for listening.

@tikue
Collaborator

tikue commented Apr 22, 2019

Hey, thanks for this question! So I don't have an official answer to this, but it's an area of interest, and I have a proof of concept for how one might go about it: check out service registry. It basically assumes you have some way of identifying which service your request is for.

@omac777
Author

omac777 commented Apr 25, 2019

Thank you for responding. I am finding the service registry difficult to follow and put to use quickly.
To clarify the level of ease of use I was hoping to see in tarpc, please see Go's rpcx package. It is well documented and offers an easy-to-use convention for exposing many services from one server.
Here is an rpcx server:
https://github.com/rpcx-ecosystem/rpcx-examples3/blob/e40b258d883cfb748c63300bdbf5d85903087e25/102basic/server.go
Here is the respective rpcx client:
https://github.com/rpcx-ecosystem/rpcx-examples3/blob/e40b258d883cfb748c63300bdbf5d85903087e25/102basic/client/client.go

Note that the structure/object is called Arith.
It currently holds one service, "Mul", but I have added other services to it
and can call all of them via the one server.
I hope this clarifies what I mean by the high level of ease of use.
I do hope to see a rust equivalent some day and I was hoping it could be tarpc because the other rpc/grpc rust alternatives aren't looking easy to use and require a great deal of setup.
I've actually tried them, but they require pulling out some tricks to compile .proto files into .desc files before even writing one line of rust code. Not really elegant or easy. I didn't have to do any of that for the golang rpcx server.

Cheers.

@tikue
Collaborator

tikue commented Apr 25, 2019

Thanks for the feedback! As I mentioned above, this is still an area of exploration. If you're looking for a ready to use RPC server that can run on one port and expose multiple services, tarpc doesn't have a solution today. The best it can do is run one service per port. That being said, I linked you above to my current thoughts on how such a thing could work. In case you missed it (I realize the service registry code is convoluted), the user experience would look like this:

    let registry = BincodeRegistry::default()
        .register(
            "WriteService".to_string(),
            write_service::serve(server.clone()),
        )
        .register(
            "ReadService".to_string(),
            read_service::serve(server.clone()),
        );

    let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let server = tarpc::Server::default()
        .incoming(listener)
        .respond_with(registry.serve());
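
The core idea behind the snippet above, routing each request by a service name, can be sketched without tarpc at all. This is a minimal illustration using only the standard library, not the service registry's actual implementation: `NameRegistry`, `Handler`, and the "Echo"/"Len" services are hypothetical names, and requests and responses are treated as raw bytes that the caller has already serialized.

```rust
use std::collections::HashMap;

/// A handler takes raw request bytes and returns raw response bytes.
type Handler = Box<dyn Fn(&[u8]) -> Result<Vec<u8>, String> + Send + Sync>;

/// Hypothetical registry: maps a service name to its handler.
#[derive(Default)]
struct NameRegistry {
    handlers: HashMap<String, Handler>,
}

impl NameRegistry {
    /// Registers a handler under `name`, returning the registry for chaining.
    fn register<F>(mut self, name: &str, handler: F) -> Self
    where
        F: Fn(&[u8]) -> Result<Vec<u8>, String> + Send + Sync + 'static,
    {
        self.handlers.insert(name.to_string(), Box::new(handler));
        self
    }

    /// Routes a request to the named service, or reports that it is unknown.
    fn dispatch(&self, name: &str, request: &[u8]) -> Result<Vec<u8>, String> {
        match self.handlers.get(name) {
            Some(handler) => handler(request),
            None => Err(format!("Service '{}' not registered", name)),
        }
    }
}

fn main() {
    let registry = NameRegistry::default()
        .register("Echo", |req| Ok(req.to_vec()))
        .register("Len", |req| Ok(vec![req.len() as u8]));

    println!("{:?}", registry.dispatch("Echo", b"hi"));
    println!("{:?}", registry.dispatch("Len", b"hello"));
    println!("{:?}", registry.dispatch("Missing", b""));
}
```

The proof of concept differs in that it builds a statically typed list of registrations rather than a map of boxed closures, and its handlers return futures; the boxed map here only trades that static typing for brevity.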

@omac777
Author

omac777 commented May 1, 2019

I believe all the effort invested in tarpc will be worth it in the long run. I'm growing to like it, but both rust and tarpc still have some kinks.

I copied your service_registry.rs to rs-mfw-tarpc-serviceregistry.rs in the examples directory,
tweaked it, built it, and ran it successfully with simple APIs of the form blah(string) -> string.

However when using your service registry with a service with a more complex type,
I'm getting errors everywhere fighting against the compiler. Here is an example of one:

error: parenthesized type parameters may only be used with a `Fn` trait
   --> tarpc/examples/rs-mfw-tarpc-serviceregistry.rs:319:37
    |
319 |     sfWriteChunks : Vec<ByteStr::new([u8; 1024])>,
    |                                     ^^^^^^^^^^^^
    |
    = note: #[deny(parenthesized_params_in_types_and_modules)] on by default
    = warning: this was previously accepted by the compiler but is being phased out; it will become a hard error in a future release!
    = note: for more information, see issue #42238 <https://github.com/rust-lang/rust/issues/42238>

This compiler error is really hard for me to overcome unfortunately.
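
For what it's worth, the error above appears to come from writing a constructor call, `ByteStr::new(...)`, in a position where the compiler expects a type. A field declaration names a type; the value is built later with an expression. A minimal sketch, assuming the chunks can be held as plain `Vec<u8>` values: `WriteArgs` and its fields are hypothetical simplifications of `ArgsWriteFile`, not the original struct.

```rust
/// Hypothetical simplification of `ArgsWriteFile`: the field declaration
/// names a type (`Vec<Vec<u8>>`); values are constructed separately.
#[derive(Debug, Default, Clone)]
struct WriteArgs {
    file_name: String,
    chunks: Vec<Vec<u8>>,
    last_chunk_size: usize,
}

fn main() {
    let buffer = [0u8; 1024];
    let args = WriteArgs {
        file_name: "example.txt".to_string(),
        // Constructor calls belong here, in expression position.
        chunks: vec![buffer.to_vec()],
        last_chunk_size: buffer.len(),
    };
    println!(
        "{} chunk(s) for {}, last chunk holds {} bytes",
        args.chunks.len(),
        args.file_name,
        args.last_chunk_size
    );
}
```

Using `Vec<u8>` rather than `[u8; 1024]` for each chunk also keeps `#[derive(Default)]` working, since the standard library does not implement `Default` for arrays longer than 32 elements.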

Serializing and deserializing u8 vectors forced me to look more closely into the serde crate, which tarpc uses indirectly through the bincode crate. I'm starting to realize that u8 vectors are treated differently from other sequences, and serde mentions that in its documentation.
https://serde.rs/impl-serialize.html#other-special-cases
https://docs.serde.rs/serde_bytes/

use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct Efficient<'a> {
    #[serde(with = "serde_bytes")]
    bytes: &'a [u8],

    #[serde(with = "serde_bytes")]
    byte_buf: Vec<u8>,
}

I'm still wrapping my head around tweaking the code to use this and hope the above compile error will disappear.

Here is the code:
NOTE: extra crates in Cargo.toml
hexdump = "0.1.0"
byte_string = "1.0.0"
sha2 = "0.8.0"

// to build
// cargo build --example rs-mfw-tarpc-serviceregistry --release

// to run
// cargo run --example rs-mfw-tarpc-serviceregistry --release


#![feature(
    async_await,
    await_macro,
    futures_api,
    arbitrary_self_types,
    proc_macro_hygiene,
    impl_trait_in_bindings
)]

extern crate hexdump;
extern crate byte_string;
use byte_string::ByteStr;


mod registry {
    use bytes::Bytes;
    use futures::{
        future::{ready, Ready},
        prelude::*,
    };
    use serde::{Deserialize, Serialize};
    use std::{
        io,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    };
    use tarpc::{
        client::{self, Client},
        context,
    };

    /// A request to a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceRequest {
        service_name: String,
        request: Bytes,
    }

    /// A response from a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceResponse {
        response: Bytes,
    }

    /// A list of registered services.
    pub struct Registry<Services> {
        registrations: Services,
    }

    impl Default for Registry<Nil> {
        fn default() -> Self {
            Registry { registrations: Nil }
        }
    }

    impl<Services: MaybeServe + Sync> Registry<Services> {
        /// Returns a function that serves requests for the registered services.
        pub fn serve(
            self,
        ) -> impl FnOnce(
            context::Context,
            ServiceRequest,
        ) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
                         + Clone {
            let registrations = Arc::new(self.registrations);
            move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
                Some(serve) => Either::Left(serve),
                None => Either::Right(ready(Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("Service '{}' not registered", req.service_name),
                )))),
            }
        }

        /// Registers `serve` with the given `name` using the given serialization scheme.
        pub fn register<S, Req, Resp, RespFut, Ser, De>(
            self,
            name: String,
            serve: S,
            deserialize: De,
            serialize: Ser,
        ) -> Registry<Registration<impl Serve + Send + 'static, Services>>
        where
            Req: Send,
            S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
            RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
            De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
            Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
        {
            let registrations = Registration {
                name: name,
                serve: move |cx, req: Bytes| {
                    async move {
                        let req = deserialize.clone()(req)?;
                        let response = await!(serve.clone()(cx, req))?;
                        let response = serialize.clone()(response)?;
                        Ok(ServiceResponse { response })
                    }
                },
                rest: self.registrations,
            };
            Registry { registrations }
        }
    }

    /// Creates a client that sends requests to a service
    /// named `service_name`, over the given channel, using
    /// the specified serialization scheme.
    pub fn new_client<Req, Resp, Ser, De>(
        service_name: String,
        channel: &client::Channel<ServiceRequest, ServiceResponse>,
        mut serialize: Ser,
        mut deserialize: De,
    ) -> client::MapResponse<
        client::WithRequest<
            client::Channel<ServiceRequest, ServiceResponse>,
            impl FnMut(Req) -> ServiceRequest,
        >,
        impl FnMut(ServiceResponse) -> Resp,
    >
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
        Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
    {
        channel
            .clone()
            .with_request(move |req| {
                ServiceRequest {
                    service_name: service_name.clone(),
                    // TODO: shouldn't need to unwrap here. Maybe with_request should allow for
                    // returning Result.
                    request: serialize(req).unwrap(),
                }
            })
            // TODO: same thing. Maybe this should be more like and_then rather than map.
            .map_response(move |resp| deserialize(resp.response).unwrap())
    }

    /// Serves a request.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait Serve: Clone + Send + 'static {
        type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
        fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
    }

    /// Serves a request if the request is for a registered service.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait MaybeServe: Send + 'static {
        type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
    }

    /// A registry starting with service S, followed by Rest.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Registration<S, Rest> {
        /// The registered service's name. Must be unique across all registered services.
        name: String,
        /// The registered service.
        serve: S,
        /// Any remaining registered services.
        rest: Rest,
    }

    /// An empty registry.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Nil;

    impl MaybeServe for Nil {
        type Future = futures::future::Ready<io::Result<ServiceResponse>>;

        fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
            None
        }
    }

    impl<S, Rest> MaybeServe for Registration<S, Rest>
    where
        S: Serve,
        Rest: MaybeServe,
    {
        type Future = Either<S::Response, Rest::Future>;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
            if self.name == request.service_name {
                Some(Either::Left(
                    self.serve.clone().serve(cx, request.request.clone()),
                ))
            } else {
                self.rest.serve(cx, request).map(Either::Right)
            }
        }
    }

    /// Wraps either of two future types that both resolve to the same output type.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub enum Either<Left, Right> {
        Left(Left),
        Right(Right),
    }

    impl<Output, Left, Right> Future for Either<Left, Right>
    where
        Left: Future<Output = Output>,
        Right: Future<Output = Output>,
    {
        type Output = Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
            unsafe {
                match Pin::get_unchecked_mut(self) {
                    Either::Left(car) => Pin::new_unchecked(car).poll(cx),
                    Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
                }
            }
        }
    }

    impl<Resp, F> Serve for F
    where
        F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
        Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
    {
        type Response = Resp;

        fn serve(self, cx: context::Context, request: Bytes) -> Resp {
            self(cx, request)
        }
    }
}

// Example
use bytes::Bytes;
use futures::{
    compat::Executor01CompatExt,
    future::{ready, Ready},
    prelude::*,
};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    io,
    sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};

fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
    Req: for<'a> Deserialize<'a> + Send,
{
    bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}

fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
    Resp: Serialize,
{
    Ok(bincode::serialize(&resp)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
        .into())
}



//these are the rpc service declarations
mod write_service {
    tarpc::service! {
        rpc write(key: String, value: String);
    }
}

mod read_service {
    tarpc::service! {
        rpc read(key: String) -> Option<String>;
    }
}



//let array = Array { data: [0u8; 1024] };
//sfWriteChunks : Vec<[u8; 1024]>,

    //I will be replacing Vec<[u8; 1024]> and Vec<Array<[u8; 1024]>>
    //with byte_string::ByteStr since it has a Debug implementation I need
    //for the serialization/deserialization to work properly




    





#[derive(Debug,Default, Clone)]
struct ArgsWriteFile {
    sfFileName : String,
    sfWriteChunks : Vec<ByteStr::new([u8; 1024])>,
    sfNumberOfChunks : u64,
    sfChunkSize : usize,
    sfLastChunkSize : usize,
    sfResultSumSHA256 : sha2::Sha256,
}
    
mod mfw_sendfile_service {

    tarpc::service! {
        rpc mfwsendfile( allSendFileArgs : crate::ArgsWriteFile ) -> String;
    }
}





#[derive(Default, Clone)]
struct Server {
    data: Arc<RwLock<HashMap<String, String>>>,
}

impl write_service::Service for Server {
    type WriteFut = Ready<()>;

    fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
        self.data.write().unwrap().insert(key, value);
        ready(())
    }
}

impl read_service::Service for Server {
    type ReadFut = Ready<Option<String>>;

    fn read(self, _: context::Context, key: String) -> Self::ReadFut {
        ready(self.data.read().unwrap().get(&key).cloned())
    }
}

impl mfw_sendfile_service::Service for Server {
    type MfwsendfileFut = Ready<String>;

    fn mfwsendfile(self, _: context::Context, allSendFileArgs : crate::ArgsWriteFile) -> Self::MfwsendfileFut {
        ready(format!("mfwsendfile fullpathname:<<{}>> sent", allSendFileArgs.sfFileName.clone().to_string()))
    }
}

trait DefaultSpawn {
    fn spawn(self);
}

impl<F> DefaultSpawn for F
where
    F: Future<Output = ()> + Send + 'static,
{
    fn spawn(self) {
        tokio_executor::spawn(self.unit_error().boxed().compat())
    }
}

struct BincodeRegistry<Services> {
    registry: registry::Registry<Services>,
}

impl Default for BincodeRegistry<registry::Nil> {
    fn default() -> Self {
        BincodeRegistry {
            registry: registry::Registry::default(),
        }
    }
}

impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
    fn serve(
        self,
    ) -> impl FnOnce(
        context::Context,
        registry::ServiceRequest,
    ) -> registry::Either<
        Services::Future,
        Ready<io::Result<registry::ServiceResponse>>,
    > + Clone {
        self.registry.serve()
    }

    fn register<S, Req, Resp, RespFut>(
        self,
        name: String,
        serve: S,
    ) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
    where
        Req: for<'a> Deserialize<'a> + Send + 'static,
        Resp: Serialize + 'static,
        S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
        RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
    {
        let registry = self.registry.register(name, serve, deserialize, serialize);
        BincodeRegistry { registry }
    }
}

pub fn new_client<Req, Resp>(
    service_name: String,
    channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
    client::WithRequest<
        client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
        impl FnMut(Req) -> registry::ServiceRequest,
    >,
    impl FnMut(registry::ServiceResponse) -> Resp,
>
where
    Req: Serialize + Send + 'static,
    Resp: for<'a> Deserialize<'a> + Send + 'static,
{
    registry::new_client(service_name, channel, serialize, deserialize)
}


use std::fs::File;
use sha2::{Sha256, Sha512, Digest};

async fn run() -> io::Result<()> {
    let server = Server::default();
    let registry = BincodeRegistry::default()
        .register(
            "WriteService".to_string(),
            write_service::serve(server.clone()),
        )
        .register(
            "ReadService".to_string(),
            read_service::serve(server.clone()),
        )
        .register(
            "MFWSendFileService".to_string(),
            mfw_sendfile_service::serve(server.clone()),
        );

    let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let server_addr = listener.local_addr();
    let server = tarpc::Server::default()
        .incoming(listener)
        .take(1)
        .respond_with(registry.serve());
    tokio_executor::spawn(server.unit_error().boxed().compat());

    let transport = await!(bincode_transport::connect(&server_addr))?;
    let channel = await!(client::new(client::Config::default(), transport))?;

    //let write_client = new_client("WriteService".to_string(), &channel);
    //let mut write_client = write_service::Client::from(write_client);

    //let read_client = new_client("ReadService".to_string(), &channel);
    //let mut read_client = read_service::Client::from(read_client);

    //await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
    //let val = await!(read_client.read(context::current(), "key".to_string()))?;
    //println!("Just returned from read_service: <<{:?}>>", val);

    let mfwsendfile_client = new_client("MFWSendFileService".to_string(), &channel);
    let mut mfwsendfile_client = mfw_sendfile_service::Client::from(mfwsendfile_client);

    let mut thefilepathname : String;
    thefilepathname = "/home/davidm/.bashrc".to_string();

    //let mut resChunks : Vec<[u8; 1024]>;
    //let mut resChunks : Vec<Array<[u8; 1024]>>;
    let mut resChunks : Vec<ByteStr::new([u8; 1024])>;
    
    let mut resNumberOfChunks : u64;
    let mut resLastChunkSize : usize;

    resChunks = Vec::new();
    resNumberOfChunks = 0;
    resLastChunkSize = 0;
    
    let mut theFileNameToSend : String = "/home/davidm/.bashrc".to_string();
    let resultSplitFile = SplitFileIntoChunks(&theFileNameToSend);
    match resultSplitFile {
        Err(why) => {
        },
        Ok( (theChunks, theNumberOfChunks, theLastChunkSize) ) => {
            resChunks = theChunks;
            resNumberOfChunks = theNumberOfChunks;
            resLastChunkSize = theLastChunkSize;
        },
    }

    let mut file = File::open(&theFileNameToSend)?;
    let mut mySha256 : sha2::Sha256 = Sha256::new();
    io::copy(&mut file, &mut mySha256)?;
    let hash = mySha256.clone().result();
    println!("hash is: {:x}", hash);              
    let mut myArgsWriteFile : ArgsWriteFile = ArgsWriteFile{
        sfFileName : theFileNameToSend.clone().to_string(),
        sfWriteChunks : resChunks,
        sfNumberOfChunks : resNumberOfChunks,
        sfChunkSize : 1024,
        sfLastChunkSize : resLastChunkSize,
        sfResultSumSHA256 : mySha256.clone(),
    };
    println!("myArgsWriteFile.sfResultSumSHA256 hash is: {:x}", myArgsWriteFile.sfResultSumSHA256.clone().result());

    let response_mfwsendfile = await!(
        mfwsendfile_client.mfwsendfile(context::current(), myArgsWriteFile )
    )?;
    println!("just returned from sendfile: <<{:?}>>", response_mfwsendfile);                       

    Ok(())
}

pub fn SplitFileIntoChunks(theFileNameToSend : &str) -> io::Result<(Vec<ByteStr::new([u8; 1024])>, u64, usize)> {
    use std::io::Cursor;
    use std::io::IoSliceMut;
    use std::io::Read;

    use std::io;
    use std::io::prelude::*;
    use std::fs::File;
    use std::fs;
    use std::str;
    let metadata = fs::metadata(theFileNameToSend.clone())?;
    let theActualFileSize = metadata.len();
    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
    println!("theActualFileSize:<<{}>>",theActualFileSize);
    let mut bufferSize : usize = 1024;
    let mut buffer = [0; 1024 ];
    //let mut NumberOfChunks : u64 = (theActualFileSize/bufferSize as u64 +1) ;
    //println!("NumberOfChunks:<<{}>>",NumberOfChunks);
    let mut WriteChunks : Vec<ByteStr::new([u8; 1024])>;
    WriteChunks = Vec::new();
    //const NumberOfChunks2 : usize = NumberOfChunks;                                         
    //let mut WriteChunks : [[u8; 1024 ]; NumberOfChunks2];
    
    let mut f = File::open(theFileNameToSend)?;
    let mut resReadVectored : io::Result<usize> = Ok(999999);
    let mut myActualBytesRead : usize = 0;
    let mut myLastChunkSize : usize = 0;
    loop {
        buffer = [0; 1024 ];
        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
        println!("resReadVectored:<<{:?}>>", resReadVectored);
        match resReadVectored {
            Err(why) => {
                break;
            },
            Ok(bytesRead) => {
                myActualBytesRead = bytesRead;
                myLastChunkSize = myActualBytesRead;
            },
        }
        //WriteChunks.push(buffer.clone());
        WriteChunks.push(ByteStr::new(buffer.clone()));  

        if myActualBytesRead < bufferSize {
            myLastChunkSize = myActualBytesRead;
            break;
        }
    }

    println!("buffer length:<<{}>>", buffer.len());
    println!("myActualBytesRead:<<{}>>",myActualBytesRead);

    println!("<<");    
    for chunkCounter in 0..WriteChunks.len() {
        
        if chunkCounter == WriteChunks.len()-1 {
            println!("last chunk chunkCounter:{}", chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter][0..myLastChunkSize]
            );            
        } else {
        println!("chunkCounter:{}",chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter]
            );            
        }
    }
    println!(">>");
    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
}


fn main() {
    tarpc::init(tokio::executor::DefaultExecutor::current().compat());
    tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}


@tikue
Collaborator

tikue commented May 2, 2019

Thank you for this example! I'll dig into it soon.

@omac777
Author

omac777 commented May 2, 2019

I tried another approach to hold a vector of u8 vectors, but failed to make it work.

    #[serde(with = "serde_bytes")]
    sfWriteChunks : Vec<ByteBuf>,    

I just discovered bytevec:
https://github.com/fero23/bytevec
I found a very useful example within the tests code:
https://github.com/fero23/bytevec/blob/master/tests/lib.rs#L7
which tests serializing a vector of employee structures.

I'm going to try to embed bytevec encoded bytes within a serde structure for the same effect within WriteChunks.

I also found that the Rust array type has chunk functionality, but unfortunately it is not necessarily what I need for encoding/decoding into something useful for serde. It would be nice if std types made nested vectors within structures serializable/deserializable directly.
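
As a point of reference, the chunk functionality in std can produce owned, nested byte vectors directly. This is a minimal sketch using only the standard library, assuming the file contents are already in memory; `split_into_chunks` is a hypothetical helper, not part of tarpc or serde.

```rust
/// Splits `data` into owned chunks of at most `chunk_size` bytes.
/// Returns the chunks plus the length of the final chunk (0 if `data` is empty).
/// Note: `chunks` panics if `chunk_size` is 0.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> (Vec<Vec<u8>>, usize) {
    let chunks: Vec<Vec<u8>> = data.chunks(chunk_size).map(|c| c.to_vec()).collect();
    let last_len = chunks.last().map_or(0, |c| c.len());
    (chunks, last_len)
}

fn main() {
    let data = vec![7u8; 2500];
    let (chunks, last_len) = split_into_chunks(&data, 1024);
    println!("{} chunks, last chunk holds {} bytes", chunks.len(), last_len);
}
```

Because the final chunk is only as long as the remaining data, no zero padding or separate last-chunk bookkeeping is needed. Whether a `Vec<Vec<u8>>` field is efficient enough once serialized is a separate question that this sketch does not address.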

// to build
// cargo build --example rs-mfw-tarpc-serviceregistry --release

// to run
// cargo run --example rs-mfw-tarpc-serviceregistry --release


#![feature(
    async_await,
    await_macro,
    futures_api,
    arbitrary_self_types,
    proc_macro_hygiene,
    impl_trait_in_bindings
)]

extern crate hexdump;
extern crate byte_string;
extern crate serde;
extern crate serde_bytes;
extern crate generic_array;

use serde_bytes::ByteBuf;
use serde::{Deserialize, Serialize};


mod registry {
    use bytes::Bytes;
    use futures::{
        future::{ready, Ready},
        prelude::*,
    };
    use serde::{Deserialize, Serialize};
    use std::{
        io,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    };
    use tarpc::{
        client::{self, Client},
        context,
    };

    /// A request to a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceRequest {
        service_name: String,
        request: Bytes,
    }

    /// A response from a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceResponse {
        response: Bytes,
    }

    /// A list of registered services.
    pub struct Registry<Services> {
        registrations: Services,
    }

    impl Default for Registry<Nil> {
        fn default() -> Self {
            Registry { registrations: Nil }
        }
    }

    impl<Services: MaybeServe + Sync> Registry<Services> {
        /// Returns a function that serves requests for the registered services.
        pub fn serve(
            self,
        ) -> impl FnOnce(
            context::Context,
            ServiceRequest,
        ) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
                         + Clone {
            let registrations = Arc::new(self.registrations);
            move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
                Some(serve) => Either::Left(serve),
                None => Either::Right(ready(Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("Service '{}' not registered", req.service_name),
                )))),
            }
        }

        /// Registers `serve` with the given `name` using the given serialization scheme.
        pub fn register<S, Req, Resp, RespFut, Ser, De>(
            self,
            name: String,
            serve: S,
            deserialize: De,
            serialize: Ser,
        ) -> Registry<Registration<impl Serve + Send + 'static, Services>>
        where
            Req: Send,
            S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
            RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
            De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
            Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
        {
            let registrations = Registration {
                name: name,
                serve: move |cx, req: Bytes| {
                    async move {
                        let req = deserialize.clone()(req)?;
                        let response = await!(serve.clone()(cx, req))?;
                        let response = serialize.clone()(response)?;
                        Ok(ServiceResponse { response })
                    }
                },
                rest: self.registrations,
            };
            Registry { registrations }
        }
    }

    /// Creates a client that sends requests to a service
    /// named `service_name`, over the given channel, using
    /// the specified serialization scheme.
    pub fn new_client<Req, Resp, Ser, De>(
        service_name: String,
        channel: &client::Channel<ServiceRequest, ServiceResponse>,
        mut serialize: Ser,
        mut deserialize: De,
    ) -> client::MapResponse<
        client::WithRequest<
            client::Channel<ServiceRequest, ServiceResponse>,
            impl FnMut(Req) -> ServiceRequest,
        >,
        impl FnMut(ServiceResponse) -> Resp,
    >
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
        Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
    {
        channel
            .clone()
            .with_request(move |req| {
                ServiceRequest {
                    service_name: service_name.clone(),
                    // TODO: shouldn't need to unwrap here. Maybe with_request should allow for
                    // returning Result.
                    request: serialize(req).unwrap(),
                }
            })
            // TODO: same thing. Maybe this should be more like and_then rather than map.
            .map_response(move |resp| deserialize(resp.response).unwrap())
    }

    /// Serves a request.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait Serve: Clone + Send + 'static {
        type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
        fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
    }

    /// Serves a request if the request is for a registered service.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait MaybeServe: Send + 'static {
        type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
    }

    /// A registry starting with service S, followed by Rest.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Registration<S, Rest> {
        /// The registered service's name. Must be unique across all registered services.
        name: String,
        /// The registered service.
        serve: S,
        /// Any remaining registered services.
        rest: Rest,
    }

    /// An empty registry.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Nil;

    impl MaybeServe for Nil {
        type Future = futures::future::Ready<io::Result<ServiceResponse>>;

        fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
            None
        }
    }

    impl<S, Rest> MaybeServe for Registration<S, Rest>
    where
        S: Serve,
        Rest: MaybeServe,
    {
        type Future = Either<S::Response, Rest::Future>;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
            if self.name == request.service_name {
                Some(Either::Left(
                    self.serve.clone().serve(cx, request.request.clone()),
                ))
            } else {
                self.rest.serve(cx, request).map(Either::Right)
            }
        }
    }

    /// Wraps either of two future types that both resolve to the same output type.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub enum Either<Left, Right> {
        Left(Left),
        Right(Right),
    }

    impl<Output, Left, Right> Future for Either<Left, Right>
    where
        Left: Future<Output = Output>,
        Right: Future<Output = Output>,
    {
        type Output = Output;

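        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
            // SAFETY: the variant's future is never moved out of `self`; it is only
            // re-pinned in place, so the pinning guarantee of `self` carries over to it.
            unsafe {
                match Pin::get_unchecked_mut(self) {
                    Either::Left(car) => Pin::new_unchecked(car).poll(cx),
                    Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
                }
            }
        }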
    }

    impl<Resp, F> Serve for F
    where
        F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
        Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
    {
        type Response = Resp;

        fn serve(self, cx: context::Context, request: Bytes) -> Resp {
            self(cx, request)
        }
    }
}

// Example
use bytes::Bytes;
use futures::{
    compat::Executor01CompatExt,
    future::{ready, Ready},
    prelude::*,
};
//use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    io,
    sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};

fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
    Req: for<'a> Deserialize<'a> + Send,
{
    bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}

fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
    Resp: Serialize,
{
    Ok(bincode::serialize(&resp)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
        .into())
}



//these are the rpc service declarations
mod write_service {
    tarpc::service! {
        rpc write(key: String, value: String);
    }
}

mod read_service {
    tarpc::service! {
        rpc read(key: String) -> Option<String>;
    }
}


//sfWriteChunks : Vec<ByteStr::new([u8; 1024])>,
//let array = Array { data: [0u8; 1024] };
//sfWriteChunks : Vec<[u8; 1024]>,

    //I will be replacing Vec<[u8; 1024]> and Vec<Array<[u8; 1024]>>
    //with byte_string::ByteStr since it has a Debug implementation I need
    //for the serialization/deserialization to work properly




    




#[derive(Debug,Default,Clone,Serialize,Deserialize)]
struct ArgsWriteFile {
    sfFileName : String,

    // ByteBuf already serializes as bytes; #[serde(with = "serde_bytes")] does not apply to a Vec of ByteBuf.
    sfWriteChunks : Vec<ByteBuf>,

    sfNumberOfChunks : u64,
    sfChunkSize : usize,
    sfLastChunkSize : usize,

    sfResultSumSHA256 : String,
}
    
mod mfw_sendfile_service {

    tarpc::service! {
        rpc mfwsendfile( allSendFileArgs : crate::ArgsWriteFile ) -> String;
    }
}





#[derive(Default, Clone)]
struct Server {
    data: Arc<RwLock<HashMap<String, String>>>,
}

impl write_service::Service for Server {
    type WriteFut = Ready<()>;

    fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
        self.data.write().unwrap().insert(key, value);
        ready(())
    }
}

impl read_service::Service for Server {
    type ReadFut = Ready<Option<String>>;

    fn read(self, _: context::Context, key: String) -> Self::ReadFut {
        ready(self.data.read().unwrap().get(&key).cloned())
    }
}

impl mfw_sendfile_service::Service for Server {
    type MfwsendfileFut = Ready<String>;

    fn mfwsendfile(self, _: context::Context, allSendFileArgs : crate::ArgsWriteFile) -> Self::MfwsendfileFut {
        ready(format!("mfwsendfile fullpathname:<<{}>> sent", allSendFileArgs.sfFileName.clone().to_string()))
    }
}

trait DefaultSpawn {
    fn spawn(self);
}

impl<F> DefaultSpawn for F
where
    F: Future<Output = ()> + Send + 'static,
{
    fn spawn(self) {
        tokio_executor::spawn(self.unit_error().boxed().compat())
    }
}

struct BincodeRegistry<Services> {
    registry: registry::Registry<Services>,
}

impl Default for BincodeRegistry<registry::Nil> {
    fn default() -> Self {
        BincodeRegistry {
            registry: registry::Registry::default(),
        }
    }
}

impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
    fn serve(
        self,
    ) -> impl FnOnce(
        context::Context,
        registry::ServiceRequest,
    ) -> registry::Either<
        Services::Future,
        Ready<io::Result<registry::ServiceResponse>>,
    > + Clone {
        self.registry.serve()
    }

    fn register<S, Req, Resp, RespFut>(
        self,
        name: String,
        serve: S,
    ) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
    where
        Req: for<'a> Deserialize<'a> + Send + 'static,
        Resp: Serialize + 'static,
        S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
        RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
    {
        let registry = self.registry.register(name, serve, deserialize, serialize);
        BincodeRegistry { registry }
    }
}

pub fn new_client<Req, Resp>(
    service_name: String,
    channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
    client::WithRequest<
        client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
        impl FnMut(Req) -> registry::ServiceRequest,
    >,
    impl FnMut(registry::ServiceResponse) -> Resp,
>
where
    Req: Serialize + Send + 'static,
    Resp: for<'a> Deserialize<'a> + Send + 'static,
{
    registry::new_client(service_name, channel, serialize, deserialize)
}


use std::fs::File;
use sha2::{Sha256, Sha512, Digest};

async fn run() -> io::Result<()> {
    let server = Server::default();
    let registry = BincodeRegistry::default()
        .register(
            "WriteService".to_string(),
            write_service::serve(server.clone()),
        )
        .register(
            "ReadService".to_string(),
            read_service::serve(server.clone()),
        )
        .register(
            "MFWSendFileService".to_string(),
            mfw_sendfile_service::serve(server.clone()),
        );

    let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let server_addr = listener.local_addr();
    let server = tarpc::Server::default()
        .incoming(listener)
        .take(1)
        .respond_with(registry.serve());
    tokio_executor::spawn(server.unit_error().boxed().compat());

    let transport = await!(bincode_transport::connect(&server_addr))?;
    let channel = await!(client::new(client::Config::default(), transport))?;

    //let write_client = new_client("WriteService".to_string(), &channel);
    //let mut write_client = write_service::Client::from(write_client);

    //let read_client = new_client("ReadService".to_string(), &channel);
    //let mut read_client = read_service::Client::from(read_client);

    //await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
    //let val = await!(read_client.read(context::current(), "key".to_string()))?;
    //println!("Just returned from read_service: <<{:?}>>", val);

    let mfwsendfile_client = new_client("MFWSendFileService".to_string(), &channel);
    let mut mfwsendfile_client = mfw_sendfile_service::Client::from(mfwsendfile_client);

    let theFileNameToSend : String = "/home/davidm/.bashrc".to_string();

    //let mut resChunks : Vec<[u8; 1024]>;
    //let mut resChunks : Vec<Array<[u8; 1024]>>;
    //let mut resChunks : Vec<ByteStr::new([u8; 1024])>;
    // Propagate a split failure instead of silently sending an empty chunk list.
    let (resChunks, resNumberOfChunks, resLastChunkSize) = SplitFileIntoChunks(&theFileNameToSend)?;

    let mut file = File::open(&theFileNameToSend)?;
    let mut mySha256 : sha2::Sha256 = Sha256::new();
    io::copy(&mut file, &mut mySha256)?;
    let hash = mySha256.clone().result();
    //println!("hash is: {:x}", hash);
    let mut myHash256String : String = format!("{:x}", hash);
    println!("myHash256String:<<{}>>", myHash256String);
    //let mut myHash256ByteBuf : ByteBuf = ByteBuf::new();
    //myHash256ByteBuf = ByteBuf::from(mySha256.clone().result().as_slice());
    //sfResultSumSHA256 : mySha256.clone(),
    let mut myArgsWriteFile : ArgsWriteFile = ArgsWriteFile{
        sfFileName : theFileNameToSend.clone().to_string(),
        sfWriteChunks : resChunks,
        sfNumberOfChunks : resNumberOfChunks,
        sfChunkSize : 1024,
        sfLastChunkSize : resLastChunkSize,
        sfResultSumSHA256 : myHash256String.clone(),
    };

    println!("myArgsWriteFile.sfResultSumSHA256 hash is:<<{}>>", myArgsWriteFile.sfResultSumSHA256.clone().to_string());

    let response_mfwsendfile = await!(
        mfwsendfile_client.mfwsendfile(context::current(), myArgsWriteFile )
    )?;
    println!("just returned from sendfile: <<{:?}>>", response_mfwsendfile);                       

    Ok(())
}

pub fn SplitFileIntoChunks(theFileNameToSend : &str) -> io::Result<(Vec<ByteBuf>, u64, usize)> {
    use std::io::Cursor;
    use std::io::IoSliceMut;
    use std::io::Read;

    use std::io;
    use std::io::prelude::*;
    use std::fs::File;
    use std::fs;
    //use std::str;
    let metadata = fs::metadata(theFileNameToSend.clone())?;
    let theActualFileSize = metadata.len();
    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
    println!("theActualFileSize:<<{}>>",theActualFileSize);
    let mut bufferSize : usize = 1024;
    let mut buffer = [0; 1024 ];
    //let mut NumberOfChunks : u64 = (theActualFileSize/bufferSize as u64 +1) ;
    //println!("NumberOfChunks:<<{}>>",NumberOfChunks);
    let mut WriteChunks : Vec<ByteBuf>;
    WriteChunks = Vec::new();
    //const NumberOfChunks2 : usize = NumberOfChunks;                                         
    //let mut WriteChunks : [[u8; 1024 ]; NumberOfChunks2];
    
    let mut f = File::open(theFileNameToSend)?;
    let mut resReadVectored : io::Result<usize> = Ok(999999);
    let mut myActualBytesRead : usize = 0;
    let mut myLastChunkSize : usize = 0;
    loop {
        buffer = [0; 1024 ];
        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
        println!("resReadVectored:<<{:?}>>", resReadVectored);
        match resReadVectored {
            // Report a read failure to the caller instead of returning partial data as Ok.
            Err(why) => {
                return Err(why);
            },
            // End of file: nothing was read, so there is no chunk to store.
            Ok(0) => {
                break;
            },
            Ok(bytesRead) => {
                myActualBytesRead = bytesRead;
                myLastChunkSize = myActualBytesRead;
            },
        }

        //WriteChunks.push(buffer.clone());
        //WriteChunks.push(ByteBuf::new(buffer.clone() ) );
        //std::convert::Into(<std::vec::Vec<u8>>
        // Copy the zero-padded buffer into an owned ByteBuf; the earlier
        // buffer.clone().iter_mut().into_slice() borrowed from a temporary.
        WriteChunks.push(ByteBuf::from(buffer.to_vec()));

        if myActualBytesRead < bufferSize {
            break;
        }
    }

    println!("buffer length:<<{}>>", buffer.len());
    println!("myActualBytesRead:<<{}>>",myActualBytesRead);

    println!("<<");    
    for chunkCounter in 0..WriteChunks.len() {
        
        if chunkCounter == WriteChunks.len()-1 {
            println!("last chunk chunkCounter:{}", chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter][0..myLastChunkSize]
            );            
        } else {
        println!("chunkCounter:{}",chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter]
            );            
        }
    }
    println!(">>");
    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
}


fn main() {
    tarpc::init(tokio::executor::DefaultExecutor::current().compat());
    tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}
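
A minimal sketch of the chunking step on its own, assuming plain `Vec<u8>` chunks instead of `ByteBuf` and using only the standard library. `split_into_chunks` is a hypothetical name, not something from tarpc or serde_bytes. Unlike the loop above, it keeps only the bytes actually read, so the last chunk's length doubles as the last-chunk size.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not part of the listing above): reads `reader` to the
/// end and returns its contents as chunks of at most `chunk_size` bytes.
pub fn split_into_chunks<R: Read>(mut reader: R, chunk_size: usize) -> io::Result<Vec<Vec<u8>>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut chunks = Vec::new();
    loop {
        let mut chunk = Vec::with_capacity(chunk_size);
        // `take` caps this read at `chunk_size`; `read_to_end` keeps reading
        // until the cap or EOF, so a short read never truncates a chunk.
        let n = reader
            .by_ref()
            .take(chunk_size as u64)
            .read_to_end(&mut chunk)?;
        if n == 0 {
            break; // EOF: nothing left to store.
        }
        chunks.push(chunk);
        if n < chunk_size {
            break; // A partial chunk can only be the last one.
        }
    }
    Ok(chunks)
}
```

Passing `File::open(path)?` as the reader gives the file case. Each chunk could then be wrapped in a `ByteBuf` before being sent, if that type is kept for the RPC argument.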

@omac777
Author

omac777 commented May 2, 2019

This compiles, but the code is noisy: I left in a lot of commented-out attempts to show what didn't work but, IMHO, should have. I'd like to hear what layout you think is appropriate for turning this into a working, more complex example that builds and that everyone can easily follow.
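
For anyone skimming, here is a minimal sketch of the dispatch idea inside the `registry` module, stripped of futures, serialization and tarpc. The synchronous `&str` requests and the `register` helper are simplifying assumptions for illustration; only the names `MaybeServe`, `Nil` and `Registration` mirror the listing.

```rust
/// Something that may handle a request addressed to a named service.
trait MaybeServe {
    fn serve(&self, service_name: &str, request: &str) -> Option<String>;
}

/// The empty registry: never handles anything.
struct Nil;

/// One registered service followed by the rest of the registry.
struct Registration<F, Rest> {
    name: String,
    handler: F,
    rest: Rest,
}

impl MaybeServe for Nil {
    fn serve(&self, _: &str, _: &str) -> Option<String> {
        None
    }
}

impl<F, Rest> MaybeServe for Registration<F, Rest>
where
    F: Fn(&str) -> String,
    Rest: MaybeServe,
{
    fn serve(&self, service_name: &str, request: &str) -> Option<String> {
        if self.name == service_name {
            // This registration owns the name: handle the request here.
            Some((self.handler)(request))
        } else {
            // Otherwise walk down the list; `Nil` ends the search with `None`.
            self.rest.serve(service_name, request)
        }
    }
}

/// Hypothetical convenience for prepending a service to an existing registry.
fn register<F, Rest>(rest: Rest, name: &str, handler: F) -> Registration<F, Rest>
where
    F: Fn(&str) -> String,
    Rest: MaybeServe,
{
    Registration { name: name.to_string(), handler, rest }
}
```

Nesting `register(register(Nil, "ReadService", ...), "WriteService", ...)` builds the same kind of list that the chained `.register(...)` calls build in `run()`, and a lookup for an unregistered name falls through to `Nil` and returns `None`.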

// to build
// cargo build --example rs-mfw-tarpc-serviceregistry --release

// to run
// cargo run --example rs-mfw-tarpc-serviceregistry --release


#![feature(
    async_await,
    await_macro,
    futures_api,
    arbitrary_self_types,
    proc_macro_hygiene,
    impl_trait_in_bindings
)]

extern crate hexdump;
extern crate byte_string;
extern crate serde;
extern crate serde_bytes;
extern crate generic_array;

#[macro_use]
extern crate bytevec;

use bytevec::{ByteEncodable, ByteDecodable};



use serde_bytes::ByteBuf;
use serde::{Deserialize, Serialize};


mod registry {
    use bytes::Bytes;
    use futures::{
        future::{ready, Ready},
        prelude::*,
    };
    use serde::{Deserialize, Serialize};
    use std::{
        io,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    };
    use tarpc::{
        client::{self, Client},
        context,
    };

    /// A request to a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceRequest {
        service_name: String,
        request: Bytes,
    }

    /// A response from a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceResponse {
        response: Bytes,
    }

    /// A list of registered services.
    pub struct Registry<Services> {
        registrations: Services,
    }

    impl Default for Registry<Nil> {
        fn default() -> Self {
            Registry { registrations: Nil }
        }
    }

    impl<Services: MaybeServe + Sync> Registry<Services> {
        /// Returns a function that serves requests for the registered services.
        pub fn serve(
            self,
        ) -> impl FnOnce(
            context::Context,
            ServiceRequest,
        ) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
                         + Clone {
            let registrations = Arc::new(self.registrations);
            move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
                Some(serve) => Either::Left(serve),
                None => Either::Right(ready(Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("Service '{}' not registered", req.service_name),
                )))),
            }
        }

        /// Registers `serve` with the given `name` using the given serialization scheme.
        pub fn register<S, Req, Resp, RespFut, Ser, De>(
            self,
            name: String,
            serve: S,
            deserialize: De,
            serialize: Ser,
        ) -> Registry<Registration<impl Serve + Send + 'static, Services>>
        where
            Req: Send,
            S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
            RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
            De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
            Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
        {
            let registrations = Registration {
                name: name,
                serve: move |cx, req: Bytes| {
                    async move {
                        let req = deserialize.clone()(req)?;
                        let response = await!(serve.clone()(cx, req))?;
                        let response = serialize.clone()(response)?;
                        Ok(ServiceResponse { response })
                    }
                },
                rest: self.registrations,
            };
            Registry { registrations }
        }
    }

    /// Creates a client that sends requests to a service
    /// named `service_name`, over the given channel, using
    /// the specified serialization scheme.
    pub fn new_client<Req, Resp, Ser, De>(
        service_name: String,
        channel: &client::Channel<ServiceRequest, ServiceResponse>,
        mut serialize: Ser,
        mut deserialize: De,
    ) -> client::MapResponse<
        client::WithRequest<
            client::Channel<ServiceRequest, ServiceResponse>,
            impl FnMut(Req) -> ServiceRequest,
        >,
        impl FnMut(ServiceResponse) -> Resp,
    >
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
        Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
    {
        channel
            .clone()
            .with_request(move |req| {
                ServiceRequest {
                    service_name: service_name.clone(),
                    // TODO: shouldn't need to unwrap here. Maybe with_request should allow for
                    // returning Result.
                    request: serialize(req).unwrap(),
                }
            })
            // TODO: same thing. Maybe this should be more like and_then rather than map.
            .map_response(move |resp| deserialize(resp.response).unwrap())
    }

    /// Serves a request.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait Serve: Clone + Send + 'static {
        type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
        fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
    }

    /// Serves a request if the request is for a registered service.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait MaybeServe: Send + 'static {
        type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
    }

    /// A registry starting with service S, followed by Rest.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Registration<S, Rest> {
        /// The registered service's name. Must be unique across all registered services.
        name: String,
        /// The registered service.
        serve: S,
        /// Any remaining registered services.
        rest: Rest,
    }

    /// An empty registry.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Nil;

    impl MaybeServe for Nil {
        type Future = futures::future::Ready<io::Result<ServiceResponse>>;

        fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
            None
        }
    }

    impl<S, Rest> MaybeServe for Registration<S, Rest>
    where
        S: Serve,
        Rest: MaybeServe,
    {
        type Future = Either<S::Response, Rest::Future>;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
            if self.name == request.service_name {
                Some(Either::Left(
                    self.serve.clone().serve(cx, request.request.clone()),
                ))
            } else {
                self.rest.serve(cx, request).map(Either::Right)
            }
        }
    }

    /// Wraps either of two future types that both resolve to the same output type.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub enum Either<Left, Right> {
        Left(Left),
        Right(Right),
    }

    impl<Output, Left, Right> Future for Either<Left, Right>
    where
        Left: Future<Output = Output>,
        Right: Future<Output = Output>,
    {
        type Output = Output;

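        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
            // SAFETY: the variant's future is never moved out of `self`; it is only
            // re-pinned in place, so the pinning guarantee of `self` carries over to it.
            unsafe {
                match Pin::get_unchecked_mut(self) {
                    Either::Left(car) => Pin::new_unchecked(car).poll(cx),
                    Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
                }
            }
        }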
    }

    impl<Resp, F> Serve for F
    where
        F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
        Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
    {
        type Response = Resp;

        fn serve(self, cx: context::Context, request: Bytes) -> Resp {
            self(cx, request)
        }
    }
}

// Example
use bytes::Bytes;
use futures::{
    compat::Executor01CompatExt,
    future::{ready, Ready},
    prelude::*,
};
//use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    io,
    sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};

fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
    Req: for<'a> Deserialize<'a> + Send,
{
    bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}

fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
    Resp: Serialize,
{
    Ok(bincode::serialize(&resp)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
        .into())
}



//these are the rpc service declarations
mod write_service {
    tarpc::service! {
        rpc write(key: String, value: String);
    }
}

mod read_service {
    tarpc::service! {
        rpc read(key: String) -> Option<String>;
    }
}


//sfWriteChunks : Vec<ByteStr::new([u8; 1024])>,
//let array = Array { data: [0u8; 1024] };
//sfWriteChunks : Vec<[u8; 1024]>,

//I will be replacing Vec<[u8; 1024]> and Vec<Array<[u8; 1024]>>
//with byte_string::ByteStr since it has a Debug implementation I need
//for the serialization/deserialization to work properly

// oneChunkOfBytes : [u8; 1024]


bytevec_decl! {
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct Employee {
     id: u32,
    oneChunkOfBytes: String
    //oneChunkOfBytes : &mut [u8]
    }
}

    //     profile: Profile,
    // #[derive(PartialEq, Eq, Debug)]
    // struct Profile {
    //   id: u32,
    //   name: String,
    //   last_name: String
    //   }
    //}
    


    




#[derive(Debug,Default,Clone,Serialize,Deserialize)]
pub struct ArgsWriteFile {
    sfFileName : String,
    
    #[serde(with = "serde_bytes")]
    sfWriteChunks : std::vec::Vec<u8>,

    sfNumberOfChunks : u64,
    sfChunkSize : usize,
    sfLastChunkSize : usize,

    sfResultSumSHA256 : String,
}
    
mod mfw_sendfile_service {

    tarpc::service! {
        rpc mfwsendfile( allSendFileArgs : crate::ArgsWriteFile ) -> String;
    }
}





#[derive(Default, Clone)]
struct Server {
    data: Arc<RwLock<HashMap<String, String>>>,
}

impl write_service::Service for Server {
    type WriteFut = Ready<()>;

    fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
        self.data.write().unwrap().insert(key, value);
        ready(())
    }
}

impl read_service::Service for Server {
    type ReadFut = Ready<Option<String>>;

    fn read(self, _: context::Context, key: String) -> Self::ReadFut {
        ready(self.data.read().unwrap().get(&key).cloned())
    }
}

impl mfw_sendfile_service::Service for Server {
    type MfwsendfileFut = Ready<String>;

    fn mfwsendfile(self, _: context::Context, allSendFileArgs : crate::ArgsWriteFile) -> Self::MfwsendfileFut {
        ready(format!("mfwsendfile fullpathname:<<{}>> sent", allSendFileArgs.sfFileName.clone().to_string()))
    }
}

trait DefaultSpawn {
    fn spawn(self);
}

impl<F> DefaultSpawn for F
where
    F: Future<Output = ()> + Send + 'static,
{
    fn spawn(self) {
        tokio_executor::spawn(self.unit_error().boxed().compat())
    }
}

struct BincodeRegistry<Services> {
    registry: registry::Registry<Services>,
}

impl Default for BincodeRegistry<registry::Nil> {
    fn default() -> Self {
        BincodeRegistry {
            registry: registry::Registry::default(),
        }
    }
}

impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
    fn serve(
        self,
    ) -> impl FnOnce(
        context::Context,
        registry::ServiceRequest,
    ) -> registry::Either<
        Services::Future,
        Ready<io::Result<registry::ServiceResponse>>,
    > + Clone {
        self.registry.serve()
    }

    fn register<S, Req, Resp, RespFut>(
        self,
        name: String,
        serve: S,
    ) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
    where
        Req: for<'a> Deserialize<'a> + Send + 'static,
        Resp: Serialize + 'static,
        S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
        RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
    {
        let registry = self.registry.register(name, serve, deserialize, serialize);
        BincodeRegistry { registry }
    }
}

pub fn new_client<Req, Resp>(
    service_name: String,
    channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
    client::WithRequest<
        client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
        impl FnMut(Req) -> registry::ServiceRequest,
    >,
    impl FnMut(registry::ServiceResponse) -> Resp,
>
where
    Req: Serialize + Send + 'static,
    Resp: for<'a> Deserialize<'a> + Send + 'static,
{
    registry::new_client(service_name, channel, serialize, deserialize)
}


use std::fs::File;
use sha2::{Sha256, Sha512, Digest};

async fn run() -> io::Result<()> {
    let server = Server::default();
    let registry = BincodeRegistry::default()
        .register(
            "WriteService".to_string(),
            write_service::serve(server.clone()),
        )
        .register(
            "ReadService".to_string(),
            read_service::serve(server.clone()),
        )
        .register(
            "MFWSendFileService".to_string(),
            mfw_sendfile_service::serve(server.clone()),
        );

    let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let server_addr = listener.local_addr();
    let server = tarpc::Server::default()
        .incoming(listener)
        .take(1)
        .respond_with(registry.serve());
    tokio_executor::spawn(server.unit_error().boxed().compat());

    let transport = await!(bincode_transport::connect(&server_addr))?;
    let channel = await!(client::new(client::Config::default(), transport))?;

    //let write_client = new_client("WriteService".to_string(), &channel);
    //let mut write_client = write_service::Client::from(write_client);

    //let read_client = new_client("ReadService".to_string(), &channel);
    //let mut read_client = read_service::Client::from(read_client);

    //await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
    //let val = await!(read_client.read(context::current(), "key".to_string()))?;
    //println!("Just returned from read_service: <<{:?}>>", val);

    let mfwsendfile_client = new_client("MFWSendFileService".to_string(), &channel);
    let mut mfwsendfile_client = mfw_sendfile_service::Client::from(mfwsendfile_client);

    let theFileNameToSend : String = "/home/davidm/.bashrc".to_string();

    //let mut resChunks : Vec<Array<[u8; 1024]>>;
    //let mut resChunks : Vec<ByteStr::new([u8; 1024])>;
    //let mut resChunks : Vec<ByteBuf>;
    //let mut resChunks : std::vec::Vec<Employee>;
    // Propagate a split failure instead of silently continuing with empty defaults.
    let (resChunks, resNumberOfChunks, resLastChunkSize) = SplitFileIntoChunks(&theFileNameToSend)?;

    let mut file = File::open(&theFileNameToSend)?;
    let mut mySha256 : sha2::Sha256 = Sha256::new();
    io::copy(&mut file, &mut mySha256)?;
    let hash = mySha256.clone().result();
    //println!("hash is: {:x}", hash);
    let mut myHash256String : String = format!("{:x}", hash);
    println!("myHash256String:<<{}>>", myHash256String);
    //let mut myHash256ByteBuf : ByteBuf = ByteBuf::new();
    //myHash256ByteBuf = ByteBuf::from(mySha256.clone().result().as_slice());
    //sfResultSumSHA256 : mySha256.clone(),
    let mut buffer = [0u8; 1024 ];
    let mut bufferString : String = "".to_string();
    let mut resFromUtf8 : std::result::Result<std::string::String, std::string::FromUtf8Error>;

    resFromUtf8 = String::from_utf8(buffer.clone().to_vec());
    match resFromUtf8 {
        Ok(theString) => {
            bufferString = theString;
        },
        Err(e) => {
            println!("error converting to utf8string: {:?}", e)
        },
    }






    let employees_1 : std::vec::Vec<Employee> = vec![
        Employee {
            id: 1,
            // profile: Profile {
            //     id: 10000,
            //     name: "Michael".to_string(),
            //     last_name: "Jackson".to_string()
            // },
            //dept: "music".to_string(),
            oneChunkOfBytes : bufferString.clone()
        },
        Employee {
            id: 2,
            // profile: Profile {
            //     id: 10001,
            //     name: "John".to_string(),
            //     last_name: "Cena".to_string()
            // },
            //dept: "wrestling".to_string(),
            oneChunkOfBytes : bufferString.clone()
        }
    ];
    let bytes : std::vec::Vec<u8> = employees_1.encode::<u32>().unwrap();
    let employees_2 : std::vec::Vec<Employee> = Vec::<Employee>::decode::<u32>(&bytes).unwrap();

                                                                                                           
    
    let mut myArgsWriteFile : ArgsWriteFile = ArgsWriteFile{
        sfFileName : theFileNameToSend.clone().to_string(),
        sfWriteChunks : bytes.clone(),
        //sfWriteChunks : resChunks,
        sfNumberOfChunks : resNumberOfChunks,
        sfChunkSize : 1024,
        sfLastChunkSize : resLastChunkSize,
        sfResultSumSHA256 : myHash256String.clone(),
    };

    println!("myArgsWriteFile.sfResultSumSHA256 hash is:<<{}>>", myArgsWriteFile.sfResultSumSHA256.clone().to_string());

    let response_mfwsendfile = await!(
        mfwsendfile_client.mfwsendfile(context::current(), myArgsWriteFile )
    )?;
    println!("just returned from sendfile: <<{:?}>>", response_mfwsendfile);                       

    Ok(())
}

pub fn SplitFileIntoChunks(theFileNameToSend : &str) -> io::Result<(Vec<[u8; 1024]>, u64, usize)> {
    use std::io::Cursor;
    use std::io::IoSliceMut;
    use std::io::Read;

    use std::io;
    use std::io::prelude::*;
    use std::fs::File;
    use std::fs;
    //use std::str;
    let metadata = fs::metadata(theFileNameToSend.clone())?;
    let theActualFileSize = metadata.len();
    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
    println!("theActualFileSize:<<{}>>",theActualFileSize);
    let mut bufferSize : usize = 1024;
    let mut buffer = [0; 1024 ];
    //let mut NumberOfChunks : u64 = (theActualFileSize/bufferSize as u64 +1) ;
    //println!("NumberOfChunks:<<{}>>",NumberOfChunks);
    let mut WriteChunks : Vec<[u8; 1024]>;
    WriteChunks = Vec::new();
    //const NumberOfChunks2 : usize = NumberOfChunks;                                         
    //let mut WriteChunks : [[u8; 1024 ]; NumberOfChunks2];
    
    let mut f = File::open(theFileNameToSend)?;
    let mut resReadVectored : io::Result<usize> = Ok(999999);
    let mut myActualBytesRead : usize = 0;
    let mut myLastChunkSize : usize = 0;
    loop {
        buffer = [0; 1024 ];
        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
        println!("resReadVectored:<<{:?}>>", resReadVectored);
        match resReadVectored {
            Err(why) => {
                // Propagate the read error instead of silently returning partial data.
                return Err(why);
            },
            Ok(bytesRead) => {
                myActualBytesRead = bytesRead;
                myLastChunkSize = myActualBytesRead;
            },
        }

        WriteChunks.push(buffer.clone());
        //WriteChunks.push(ByteBuf::new(buffer.clone() ) );
        //std::convert::Into(<std::vec::Vec<u8>>        

        //let mut bufferIter = buffer.clone().iter_mut();    
        //WriteChunks.push(ByteBuf::from(bufferIter.into_slice()));

        if myActualBytesRead < bufferSize {
            myLastChunkSize = myActualBytesRead;
            break;
        }
    }

    println!("buffer length:<<{}>>", buffer.len());
    println!("myActualBytesRead:<<{}>>",myActualBytesRead);

    println!("<<");    
    for chunkCounter in 0..WriteChunks.len() {
        
        if chunkCounter == WriteChunks.len()-1 {
            println!("last chunk chunkCounter:{}", chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter][0..myLastChunkSize]
            );            
        } else {
            println!("chunkCounter:{}",chunkCounter);
            //range bounded inclusively below and exclusively above
            hexdump::hexdump(
                &WriteChunks[chunkCounter]
            );            
        }
    }
    println!(">>");
    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
}





pub fn SplitFileIntoChunks2(theFileNameToSend : &str) -> io::Result<(std::vec::Vec<Employee>, u64, usize)> {
    use std::io::Cursor;
    use std::io::IoSliceMut;
    use std::io::Read;

    use std::io;
    use std::io::prelude::*;
    use std::fs::File;
    use std::fs;
    //use std::str;
    let metadata = fs::metadata(theFileNameToSend.clone())?;
    let theActualFileSize = metadata.len();
    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
    println!("theActualFileSize:<<{}>>",theActualFileSize);
    let mut bufferSize : usize = 1024;
    let mut buffer = [0; 1024 ];
    //let mut NumberOfChunks : u64 = (theActualFileSize/bufferSize as u64 +1) ;
    //println!("NumberOfChunks:<<{}>>",NumberOfChunks);
    let mut WriteChunks : std::vec::Vec<Employee>;
    WriteChunks = Vec::new();
    //const NumberOfChunks2 : usize = NumberOfChunks;                                         
    //let mut WriteChunks : [[u8; 1024 ]; NumberOfChunks2];
    
    let mut f = File::open(theFileNameToSend)?;
    let mut resReadVectored : io::Result<usize> = Ok(999999);
    let mut myActualBytesRead : usize = 0;
    let mut myLastChunkSize : usize = 0;
    loop {
        buffer = [0; 1024 ];
        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
        println!("resReadVectored:<<{:?}>>", resReadVectored);
        match resReadVectored {
            Err(why) => {
                // Propagate the read error instead of silently returning partial data.
                return Err(why);
            },
            Ok(bytesRead) => {
                myActualBytesRead = bytesRead;
                myLastChunkSize = myActualBytesRead;
            },
        }


        let mut bufferString : String = "".to_string();
        let mut resFromUtf8 : std::result::Result<std::string::String, std::string::FromUtf8Error>;
        resFromUtf8 = String::from_utf8(buffer.clone().to_vec());
        match resFromUtf8 {
            Ok(theString) => {
                bufferString = theString;
            },
            Err(e) => {
                println!("error converting to utf8string: {:?}", e)
            },
        }
        

        WriteChunks.push(
            Employee {
                id: 1,
                // profile: Profile {
                //     id: 10000,
                //     name: "Michael".to_string(),
                //     last_name: "Jackson".to_string()
                // },
                //dept: "music".to_string(),
                oneChunkOfBytes : bufferString.clone()
            }            
        );
        //WriteChunks.push(buffer.clone());
        //WriteChunks.push(ByteBuf::new(buffer.clone() ) );
        //std::convert::Into(<std::vec::Vec<u8>>        

        //let mut bufferIter = buffer.clone().iter_mut();    
        //WriteChunks.push(ByteBuf::from(bufferIter.into_slice()));

        if myActualBytesRead < bufferSize {
            myLastChunkSize = myActualBytesRead;
            break;
        }
    }

    println!("buffer length:<<{}>>", buffer.len());
    println!("myActualBytesRead:<<{}>>",myActualBytesRead);

    // println!("<<");    
    // for chunkCounter in 0..WriteChunks.len() {
        
    //     if chunkCounter == WriteChunks.len()-1 {
    //         println!("last chunk chunkCounter:{}", chunkCounter);
    //         //range bounded inclusively below and exclusively above
    //         hexdump::hexdump(
    //             &WriteChunks[chunkCounter][0..myLastChunkSize]
    //         );            
    //     } else {
    //     println!("chunkCounter:{}",chunkCounter);
    //         //range bounded inclusively below and exclusively above
    //         hexdump::hexdump(
    //             &WriteChunks[chunkCounter]
    //         );            
    //     }
    // }
    // println!(">>");
    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
}







                                                                   

fn main() {
    tarpc::init(tokio::executor::DefaultExecutor::current().compat());
    tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
}


@tikue
Collaborator

tikue commented May 2, 2019

@omac777 thank you for the examples! Do you happen to have a git repo you're working in? It might be illuminating if I could look at diffs between what worked and what didn't.

@omac777
Author

omac777 commented May 3, 2019

I'm sorry, I don't have the above code in a repo, but the last comment that I posted above has the code that compiles. tarpc seems to work OK with serde on simple structures, but with a Vec<Vec<u8>> as a member, it is insufficient IMHO. bytevec helps to trick the compiler into accepting that everything is OK for serializing/deserializing.
There are 3 structures:
1) ArgsWriteFile: the one that actually gets passed via the service API call.
2) The bytevec struct Employee, repurposed to hold oneChunkOfBytes from a local file that we want to send.
3) The local file we want to send is made up of many chunks, which means we have many Employee instances, each filled with oneChunkOfBytes. These are held in a Vec, but serde doesn't like this, so we bytevec-encode that Vec into one array of bytes and then wrap it up in a String (a UTF-8 string), which technically should be assumed to be an invalid String. Note: we can still get the bytes back from that invalid String via into_bytes() when we are decoding/deserializing back into a Vec to reconstruct the file on the server side.
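
A hedged aside on step 3: `String::from_utf8` returns an error for any byte sequence that is not valid UTF-8, so a chunk taken from a binary file cannot be wrapped in a `String` this way, and the posted code would fall back to an empty string for it. A minimal std-only sketch of that behavior (the helper name `chunk_to_string` is hypothetical and is not part of the code in this thread):

```rust
/// Hypothetical helper: try to wrap one chunk of bytes in a String.
/// Returns None when the chunk is not valid UTF-8.
fn chunk_to_string(chunk: &[u8]) -> Option<String> {
    String::from_utf8(chunk.to_vec()).ok()
}
```

Text chunks round-trip through `into_bytes()`, but a chunk containing a byte such as `0xff` yields `None`, which is why carrying chunks as `Vec<u8>` is the safer assumption for arbitrary files.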

It's neither beautiful nor straightforward, but it's the best I can do to work around all the compiler errors about traits/serialize/deserialize. It's definitely a pain point and wastes a lot of time. There needs to be a more straightforward, higher-level way with zero cost. Your way was the closest and I can acknowledge that. Having to build idl/proto files in separate files is not as straightforward as the tarpc way for sure. tarpc is the closest golang-rpcx-like experience I could find for rust. I have to admit I really enjoy rpcx, but if I'm going to be a purist rust programmer, I need to find similar capabilities within rust before I can migrate all golang tools into rust.

@omac777
Author

omac777 commented May 3, 2019

I want to comment on something about the above example's output.
Please notice that the last vectored read returned a smaller number of bytes

resReadVectored:<<Ok(994)>>

because of course file sizes aren't always multiples of the buffer size.
My golang rpcx version handles the last chunk correctly by taking those 994 bytes and placing them into a new buffer of exactly 994 bytes, and as a result it sends three 1024-byte chunks followed by a 994-byte chunk. When the destination receives all the chunks, it reconstructs the file without ever having to refer to an extra field holding the last chunk size, since the last chunk holds the correct amount to begin with.
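
The same variable-size last chunk can be sketched in Rust with the standard library alone. This is only an illustration under the assumption that the whole file is already in memory; `split_into_chunks` is a hypothetical name, not a function from the code in this thread:

```rust
/// Splits `data` into chunks of at most `chunk_size` bytes.
/// Every chunk is full except possibly the last, which holds the remainder.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    data.chunks(chunk_size).map(|c| c.to_vec()).collect()
}
```

For a 4066-byte input and a chunk size of 1024, this produces three 1024-byte chunks followed by one 994-byte chunk, so no separate last-chunk-size field is needed.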

So my question is this: why is it that golang/rpcx is supple enough to handle this correctly, while defining a similar capability in the rust tarpc version is like pulling teeth? Please consider this constructive criticism, since I really would like rust to excel in terms of both performance and ease of use. Currently rust/tarpc can only compete in terms of performance, but ease of use is still lacking, IMHO. Thank you for listening.

-*- mode: compilation; default-directory: "~/tarpc/" -*-
Compilation started at Fri May  3 15:18:21

cargo run --example rs-mfw-tarpc-serviceregistry --release
    Finished release [optimized] target(s) in 0.17s
     Running `target/release/examples/rs-mfw-tarpc-serviceregistry`
theFileNameToSend:<</home/davidm/.bashrc>>
theActualFileSize:<<4066>>
resReadVectored:<<Ok(1024)>>
resReadVectored:<<Ok(1024)>>
resReadVectored:<<Ok(1024)>>
resReadVectored:<<Ok(994)>>
buffer length:<<1024>>
myActualBytesRead:<<994>>
myHash256String:<<4665945294ad3c0025f63f65da22b78a2ea09eb6a4723af184dc6850799d7a00>>
myArgsWriteFile.sfResultSumSHA256 hash is:<<4665945294ad3c0025f63f65da22b78a2ea09eb6a4723af184dc6850799d7a00>>
just returned from sendfile: <<"mfwsendfile fullpathname:<</home/davidm/.bashrc>> sent">>

Compilation finished at Fri May  3 15:18:21

@tikue
Collaborator

tikue commented May 6, 2019

I just looked a bit more closely at the first error you posted above.

error: parenthesized type parameters may only be used with a `Fn` trait
   --> tarpc/examples/rs-mfw-tarpc-serviceregistry.rs:319:37
    |
319 |     sfWriteChunks : Vec<ByteStr::new([u8; 1024])>,
    |                                     ^^^^^^^^^^^^

I want to point out that this doesn't appear to be valid rust syntax. It looks like you're mixing types (Vec<...> and [u8; 1024]) and expressions (the function ByteStr::new(...)).

It looks like you're using this in type position, so I suggest you try replacing this with Vec<ByteStr>. However, I'll also note that you'll likely hit issues deserializing a ByteStr with serde, because it doesn't appear that this crate implements serde::Serialize or serde::Deserialize. You can file an issue against the byte_string repository to ask for serde support.
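
To make the type-versus-expression distinction concrete, here is a small std-only sketch. `ByteChunk` is a hypothetical local newtype standing in for a byte-string wrapper; it is not the `ByteStr` type from the byte_string crate, and `ArgsSketch` and `build_args` are likewise made up for illustration:

```rust
/// Hypothetical stand-in for a byte-string wrapper type (NOT byte_string's ByteStr).
#[derive(Debug, Clone, PartialEq)]
struct ByteChunk(Vec<u8>);

impl ByteChunk {
    fn new(bytes: &[u8]) -> Self {
        ByteChunk(bytes.to_vec())
    }
}

/// Type position: only type names appear in the field declaration.
#[derive(Debug)]
struct ArgsSketch {
    write_chunks: Vec<ByteChunk>,
}

/// Expression position: the constructor is called when a value is built.
fn build_args(raw: &[[u8; 4]]) -> ArgsSketch {
    ArgsSketch {
        write_chunks: raw.iter().map(|c| ByteChunk::new(c)).collect(),
    }
}
```

The field is declared as `Vec<ByteChunk>`, and `ByteChunk::new(...)` only shows up where values are constructed.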

@omac777
Author

omac777 commented May 6, 2019

Thanks for the reply. I have no intention of asking serde for support. If they don't have it already, it's because they don't plan to support it.

I did find a way to circumvent the issue as I stated above with bytevec.

I took out all the dead code and comments and here is something that compiles and runs. It's not the polished final version, but the core concepts are correct.

// to build
// cargo build --example rs-mfw-tarpc-serviceregistry --release

// to run
// cargo run --example rs-mfw-tarpc-serviceregistry --release


#![feature(
    async_await,
    await_macro,
    futures_api,
    arbitrary_self_types,
    proc_macro_hygiene,
    impl_trait_in_bindings
)]

extern crate hexdump;
extern crate byte_string;
extern crate serde;
extern crate serde_bytes;
extern crate generic_array;

#[macro_use]
extern crate bytevec;

use bytevec::{ByteEncodable, ByteDecodable};



use serde_bytes::ByteBuf;
use serde::{Deserialize, Serialize};


mod registry {
    use bytes::Bytes;
    use futures::{
        future::{ready, Ready},
        prelude::*,
    };
    use serde::{Deserialize, Serialize};
    use std::{
        io,
        pin::Pin,
        sync::Arc,
        task::{Context, Poll},
    };
    use tarpc::{
        client::{self, Client},
        context,
    };

    /// A request to a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceRequest {
        service_name: String,
        request: Bytes,
    }

    /// A response from a named service.
    #[derive(Serialize, Deserialize)]
    pub struct ServiceResponse {
        response: Bytes,
    }

    /// A list of registered services.
    pub struct Registry<Services> {
        registrations: Services,
    }

    impl Default for Registry<Nil> {
        fn default() -> Self {
            Registry { registrations: Nil }
        }
    }

    impl<Services: MaybeServe + Sync> Registry<Services> {
        /// Returns a function that serves requests for the registered services.
        pub fn serve(
            self,
        ) -> impl FnOnce(
            context::Context,
            ServiceRequest,
        ) -> Either<Services::Future, Ready<io::Result<ServiceResponse>>>
                         + Clone {
            let registrations = Arc::new(self.registrations);
            move |cx, req: ServiceRequest| match registrations.serve(cx, &req) {
                Some(serve) => Either::Left(serve),
                None => Either::Right(ready(Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("Service '{}' not registered", req.service_name),
                )))),
            }
        }

        /// Registers `serve` with the given `name` using the given serialization scheme.
        pub fn register<S, Req, Resp, RespFut, Ser, De>(
            self,
            name: String,
            serve: S,
            deserialize: De,
            serialize: Ser,
        ) -> Registry<Registration<impl Serve + Send + 'static, Services>>
        where
            Req: Send,
            S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
            RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
            De: FnOnce(Bytes) -> io::Result<Req> + Send + 'static + Clone,
            Ser: FnOnce(Resp) -> io::Result<Bytes> + Send + 'static + Clone,
        {
            let registrations = Registration {
                name: name,
                serve: move |cx, req: Bytes| {
                    async move {
                        let req = deserialize.clone()(req)?;
                        let response = await!(serve.clone()(cx, req))?;
                        let response = serialize.clone()(response)?;
                        Ok(ServiceResponse { response })
                    }
                },
                rest: self.registrations,
            };
            Registry { registrations }
        }
    }

    /// Creates a client that sends requests to a service
    /// named `service_name`, over the given channel, using
    /// the specified serialization scheme.
    pub fn new_client<Req, Resp, Ser, De>(
        service_name: String,
        channel: &client::Channel<ServiceRequest, ServiceResponse>,
        mut serialize: Ser,
        mut deserialize: De,
    ) -> client::MapResponse<
        client::WithRequest<
            client::Channel<ServiceRequest, ServiceResponse>,
            impl FnMut(Req) -> ServiceRequest,
        >,
        impl FnMut(ServiceResponse) -> Resp,
    >
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        De: FnMut(Bytes) -> io::Result<Resp> + Clone + Send + 'static,
        Ser: FnMut(Req) -> io::Result<Bytes> + Clone + Send + 'static,
    {
        channel
            .clone()
            .with_request(move |req| {
                ServiceRequest {
                    service_name: service_name.clone(),
                    // TODO: shouldn't need to unwrap here. Maybe with_request should allow for
                    // returning Result.
                    request: serialize(req).unwrap(),
                }
            })
            // TODO: same thing. Maybe this should be more like and_then rather than map.
            .map_response(move |resp| deserialize(resp.response).unwrap())
    }

    /// Serves a request.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait Serve: Clone + Send + 'static {
        type Response: Future<Output = io::Result<ServiceResponse>> + Send + 'static;
        fn serve(self, cx: context::Context, request: Bytes) -> Self::Response;
    }

    /// Serves a request if the request is for a registered service.
    ///
    /// This trait is mostly an implementation detail that isn't used outside of the registry
    /// internals.
    pub trait MaybeServe: Send + 'static {
        type Future: Future<Output = io::Result<ServiceResponse>> + Send + 'static;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future>;
    }

    /// A registry starting with service S, followed by Rest.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Registration<S, Rest> {
        /// The registered service's name. Must be unique across all registered services.
        name: String,
        /// The registered service.
        serve: S,
        /// Any remaining registered services.
        rest: Rest,
    }

    /// An empty registry.
    ///
    /// This type is mostly an implementation detail that is not used directly
    /// outside of the registry internals.
    pub struct Nil;

    impl MaybeServe for Nil {
        type Future = futures::future::Ready<io::Result<ServiceResponse>>;

        fn serve(&self, _: context::Context, _: &ServiceRequest) -> Option<Self::Future> {
            None
        }
    }

    impl<S, Rest> MaybeServe for Registration<S, Rest>
    where
        S: Serve,
        Rest: MaybeServe,
    {
        type Future = Either<S::Response, Rest::Future>;

        fn serve(&self, cx: context::Context, request: &ServiceRequest) -> Option<Self::Future> {
            if self.name == request.service_name {
                Some(Either::Left(
                    self.serve.clone().serve(cx, request.request.clone()),
                ))
            } else {
                self.rest.serve(cx, request).map(Either::Right)
            }
        }
    }

    /// Wraps either of two future types that both resolve to the same output type.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub enum Either<Left, Right> {
        Left(Left),
        Right(Right),
    }

    impl<Output, Left, Right> Future for Either<Left, Right>
    where
        Left: Future<Output = Output>,
        Right: Future<Output = Output>,
    {
        type Output = Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Output> {
            unsafe {
                match Pin::get_unchecked_mut(self) {
                    Either::Left(car) => Pin::new_unchecked(car).poll(cx),
                    Either::Right(cdr) => Pin::new_unchecked(cdr).poll(cx),
                }
            }
        }
    }

    impl<Resp, F> Serve for F
    where
        F: FnOnce(context::Context, Bytes) -> Resp + Clone + Send + 'static,
        Resp: Future<Output = io::Result<ServiceResponse>> + Send + 'static,
    {
        type Response = Resp;

        fn serve(self, cx: context::Context, request: Bytes) -> Resp {
            self(cx, request)
        }
    }
}

// Example
use bytes::Bytes;
use futures::{
    compat::Executor01CompatExt,
    future::{ready, Ready},
    prelude::*,
};
//use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    io,
    sync::{Arc, RwLock},
};
use tarpc::{client, context, server::Handler};

fn deserialize<Req>(req: Bytes) -> io::Result<Req>
where
    Req: for<'a> Deserialize<'a> + Send,
{
    bincode::deserialize(req.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}

fn serialize<Resp>(resp: Resp) -> io::Result<Bytes>
where
    Resp: Serialize,
{
    Ok(bincode::serialize(&resp)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
        .into())
}



//these are the rpc service declarations
mod write_service {
    tarpc::service! {
        rpc write(key: String, value: String);
    }
}

mod read_service {
    tarpc::service! {
        rpc read(key: String) -> Option<String>;
    }
}

bytevec_decl! {
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct FileChunk {
    oneChunkOfBytes: String
    }
}

#[derive(Debug,Default,Clone,Serialize,Deserialize)]
pub struct ArgsWriteFile {
    sfFileName : String,
    
    #[serde(with = "serde_bytes")]
    sfWriteChunks : std::vec::Vec<u8>,

    sfNumberOfChunks : u64,
    sfChunkSize : usize,
    sfLastChunkSize : usize,

    sfResultSumSHA256 : String,
}
    
mod mfw_sendfile_service {

    tarpc::service! {
        rpc mfwsendfile( allSendFileArgs : crate::ArgsWriteFile ) -> String;
    }
}




#[derive(Default, Clone)]
struct Server {
    data: Arc<RwLock<HashMap<String, String>>>,
}

impl write_service::Service for Server {
    type WriteFut = Ready<()>;

    fn write(self, _: context::Context, key: String, value: String) -> Self::WriteFut {
        self.data.write().unwrap().insert(key, value);
        ready(())
    }
}

impl read_service::Service for Server {
    type ReadFut = Ready<Option<String>>;

    fn read(self, _: context::Context, key: String) -> Self::ReadFut {
        ready(self.data.read().unwrap().get(&key).cloned())
    }
}

impl mfw_sendfile_service::Service for Server {
    type MfwsendfileFut = Ready<String>;

    fn mfwsendfile(self, _: context::Context, allSendFileArgs : crate::ArgsWriteFile) -> Self::MfwsendfileFut {
        ready(format!("mfwsendfile fullpathname:<<{}>> sent", allSendFileArgs.sfFileName.clone().to_string()))
    }
}

trait DefaultSpawn {
    fn spawn(self);
}

impl<F> DefaultSpawn for F
where
    F: Future<Output = ()> + Send + 'static,
{
    fn spawn(self) {
        tokio_executor::spawn(self.unit_error().boxed().compat())
    }
}

struct BincodeRegistry<Services> {
    registry: registry::Registry<Services>,
}

impl Default for BincodeRegistry<registry::Nil> {
    fn default() -> Self {
        BincodeRegistry {
            registry: registry::Registry::default(),
        }
    }
}

impl<Services: registry::MaybeServe + Sync> BincodeRegistry<Services> {
    fn serve(
        self,
    ) -> impl FnOnce(
        context::Context,
        registry::ServiceRequest,
    ) -> registry::Either<
        Services::Future,
        Ready<io::Result<registry::ServiceResponse>>,
    > + Clone {
        self.registry.serve()
    }

    fn register<S, Req, Resp, RespFut>(
        self,
        name: String,
        serve: S,
    ) -> BincodeRegistry<registry::Registration<impl registry::Serve + Send + 'static, Services>>
    where
        Req: for<'a> Deserialize<'a> + Send + 'static,
        Resp: Serialize + 'static,
        S: FnOnce(context::Context, Req) -> RespFut + Send + 'static + Clone,
        RespFut: Future<Output = io::Result<Resp>> + Send + 'static,
    {
        let registry = self.registry.register(name, serve, deserialize, serialize);
        BincodeRegistry { registry }
    }
}

pub fn new_client<Req, Resp>(
    service_name: String,
    channel: &client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
) -> client::MapResponse<
    client::WithRequest<
        client::Channel<registry::ServiceRequest, registry::ServiceResponse>,
        impl FnMut(Req) -> registry::ServiceRequest,
    >,
    impl FnMut(registry::ServiceResponse) -> Resp,
>
where
    Req: Serialize + Send + 'static,
    Resp: for<'a> Deserialize<'a> + Send + 'static,
{
    registry::new_client(service_name, channel, serialize, deserialize)
}


use std::fs::File;
use sha2::{Sha256, Sha512, Digest};

pub fn SplitFileIntoChunks2(theFileNameToSend : &str) -> io::Result<(std::vec::Vec<FileChunk>, u64, usize)> {
    use std::io::Cursor;
    use std::io::IoSliceMut;
    use std::io::Read;

    use std::io;
    use std::io::prelude::*;
    use std::fs::File;
    use std::fs;

    let metadata = fs::metadata(theFileNameToSend.clone())?;
    let theActualFileSize = metadata.len();
    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
    println!("theActualFileSize:<<{}>>",theActualFileSize);
    let mut bufferSize : usize = 1024;
    let mut buffer = [0; 1024 ];

    let mut WriteChunks : std::vec::Vec<FileChunk>;
    WriteChunks = Vec::new();
    
    let mut f = File::open(theFileNameToSend)?;
    let mut resReadVectored : io::Result<usize> = Ok(999999);
    let mut myActualBytesRead : usize = 0;
    let mut myLastChunkSize : usize = 0;
    loop {
        buffer = [0; 1024 ];
        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
        println!("resReadVectored:<<{:?}>>", resReadVectored);
        match resReadVectored {
            Err(why) => {
                // Propagate the read error instead of silently returning partial data.
                return Err(why);
            },
            Ok(bytesRead) => {
                myActualBytesRead = bytesRead;
                myLastChunkSize = myActualBytesRead;
            },
        }


        let mut bufferString : String = "".to_string();
        let mut resFromUtf8 : std::result::Result<std::string::String, std::string::FromUtf8Error>;
        resFromUtf8 = String::from_utf8(buffer.clone().to_vec());
        match resFromUtf8 {
            Ok(theString) => {
                bufferString = theString;
            },
            Err(e) => {
                println!("error converting to utf8string: {:?}", e)
            },
        }
        

        WriteChunks.push(
            FileChunk {
                oneChunkOfBytes : bufferString.clone()
            }            
        );

        if myActualBytesRead < bufferSize {
            myLastChunkSize = myActualBytesRead;
            break;
        }
    }

    println!("buffer length:<<{}>>", buffer.len());
    println!("myActualBytesRead:<<{}>>",myActualBytesRead);

    // println!("<<");    
    // for chunkCounter in 0..WriteChunks.len() {
        
    //     if chunkCounter == WriteChunks.len()-1 {
    //         println!("last chunk chunkCounter:{}", chunkCounter);
    //         //range bounded inclusively below and exclusively above
    //         hexdump::hexdump(
    //             &WriteChunks[chunkCounter][0..myLastChunkSize]
    //         );            
    //     } else {
    //     println!("chunkCounter:{}",chunkCounter);
    //         //range bounded inclusively below and exclusively above
    //         hexdump::hexdump(
    //             &WriteChunks[chunkCounter]
    //         );            
    //     }
    // }
    // println!(">>");
    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
}

async fn run2() -> io::Result<()> {
    let server = Server::default();
    let registry = BincodeRegistry::default()
        .register(
            "WriteService".to_string(),
            write_service::serve(server.clone()),
        )
        .register(
            "ReadService".to_string(),
            read_service::serve(server.clone()),
        )
        .register(
            "MFWSendFileService".to_string(),
            mfw_sendfile_service::serve(server.clone()),
        );

    let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
    let server_addr = listener.local_addr();
    let server = tarpc::Server::default()
        .incoming(listener)
        .take(1)
        .respond_with(registry.serve());
    tokio_executor::spawn(server.unit_error().boxed().compat());

    let transport = await!(bincode_transport::connect(&server_addr))?;
    let channel = await!(client::new(client::Config::default(), transport))?;

    let mfwsendfile_client = new_client("MFWSendFileService".to_string(), &channel);
    let mut mfwsendfile_client = mfw_sendfile_service::Client::from(mfwsendfile_client);

    let mut thefilepathname : String;
    thefilepathname = "/home/davidm/.bashrc".to_string();

    let mut resChunks : std::vec::Vec<FileChunk>;
    let mut resNumberOfChunks : u64;
    let mut resLastChunkSize : usize;

    resChunks = Vec::new();
    resNumberOfChunks = 0;
    resLastChunkSize = 0;
    
    let mut theFileNameToSend : String = "/home/davidm/.bashrc".to_string();
    let resultSplitFile = SplitFileIntoChunks2(&theFileNameToSend);
    match resultSplitFile {
        Err(why) => {
            // Propagate the error rather than continuing with empty chunk data.
            return Err(why);
        },
        Ok( (theChunks, theNumberOfChunks, theLastChunkSize) ) => {
            resChunks = theChunks;
            resNumberOfChunks = theNumberOfChunks;
            resLastChunkSize = theLastChunkSize;
        },
    }

    let mut file = File::open(&theFileNameToSend)?;
    let mut mySha256 : sha2::Sha256 = Sha256::new();
    io::copy(&mut file, &mut mySha256)?;
    let hash = mySha256.clone().result();
    let mut myHash256String : String = format!("{:x}", hash);
    println!("myHash256String:<<{}>>", myHash256String);

    let mut buffer = [0u8; 1024 ];
    let mut bufferString : String = "".to_string();
    let mut resFromUtf8 : std::result::Result<std::string::String, std::string::FromUtf8Error>;

    resFromUtf8 = String::from_utf8(buffer.clone().to_vec());
    match resFromUtf8 {
        Ok(theString) => {
            bufferString = theString;
        },
        Err(e) => {
            println!("error converting to utf8string: {:?}", e)
        },
    }

    let theFileChunks_1 : std::vec::Vec<FileChunk> = vec![
        FileChunk {
            oneChunkOfBytes : bufferString.clone()
        },
        FileChunk {
            oneChunkOfBytes : bufferString.clone()
        }
    ];
    let bytes : std::vec::Vec<u8> = theFileChunks_1.encode::<u32>().unwrap();
    let theFileChunks_2 : std::vec::Vec<FileChunk> = Vec::<FileChunk>::decode::<u32>(&bytes).unwrap();                                                                                                         
    
    let mut myArgsWriteFile : ArgsWriteFile = ArgsWriteFile{
        sfFileName : theFileNameToSend.clone().to_string(),
        sfWriteChunks : bytes.clone(),
        sfNumberOfChunks : resNumberOfChunks,
        sfChunkSize : 1024,
        sfLastChunkSize : resLastChunkSize,
        sfResultSumSHA256 : myHash256String.clone(),
    };

    println!("myArgsWriteFile.sfResultSumSHA256 hash is:<<{}>>", myArgsWriteFile.sfResultSumSHA256.clone().to_string());

    let response_mfwsendfile = await!(
        mfwsendfile_client.mfwsendfile(context::current(), myArgsWriteFile )
    )?;
    println!("just returned from sendfile: <<{:?}>>", response_mfwsendfile);                       

    Ok(())
}

fn main() {
    tarpc::init(tokio::executor::DefaultExecutor::current().compat());
    tokio::run(run2().boxed().map_err(|e| panic!(e)).boxed().compat());
}

@tikue
Collaborator

tikue commented May 6, 2019

I'm glad you got it to work! Can I ask your opinion on the "registry" style service? Is it better than nothing? If you think it's usable, I could publish it to crates.io.

@omac777
Author

omac777 commented May 7, 2019

I think your registry service example is adequate for exposing many services from one server.

tarpc is lacking examples. It needs more examples that clearly solve common use-case problems.
Within an argument structure for an RPC service, have an example of each of the following:
example 1) some bounded nested structures
example 2) a bounded vector of structures
example 3) a bounded vector of simple type elements
example 4) unbounded nested structures
example 5) an unbounded vector of structures
example 6) an unbounded vector of simple type elements

You're right about serde. It should support Vec in both bounded and unbounded forms.
I don't care about serde directly. I do care that tarpc supports passing an argument structure that contains a Vec<Vec<u8>>, so that a file can be split into a vector of chunks. Each chunk is a vector of u8 bytes, and each chunk should be allowed to be a different size, since it is a vector after all.
In other words, a chunk's size should not be a fixed constant but something that can vary.
In my planned use case, I expect most of the chunks to be the same size except for the last one.
The destination side shouldn't have to treat the last chunk differently, though. The vector of chunks should be sufficient to loop over, and the size of each chunk should be queried before writing it to the destination file anyway, to conform to secure bounded file-write APIs.
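
A minimal sketch of the chunking idea described above, using only the standard library and no tarpc or serde: the file bytes become a `Vec<Vec<u8>>`, and the destination simply loops over the chunks and lets each chunk's own length decide how much to write. The function names `split_into_chunks` and `write_chunks` and the 1024-byte chunk size are assumptions made for illustration.

```rust
use std::io::{self, Write};

/// Split `data` into chunks of at most `chunk_size` bytes.
/// Every chunk is full-sized except possibly the last one.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    data.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

/// Write the chunks in order and return the total number of bytes written.
/// No special case for the last chunk: each chunk knows its own length.
fn write_chunks<W: Write>(mut dest: W, chunks: &[Vec<u8>]) -> io::Result<u64> {
    let mut total = 0u64;
    for chunk in chunks {
        // write_all keeps writing until the whole chunk has been accepted.
        dest.write_all(chunk)?;
        total += chunk.len() as u64;
    }
    Ok(total)
}

fn main() -> io::Result<()> {
    // 2500 bytes of sample data stand in for the contents of a file.
    let data: Vec<u8> = (0..2500u32).map(|i| (i % 251) as u8).collect();
    let chunks = split_into_chunks(&data, 1024);

    // A Vec<u8> stands in for the destination file.
    let mut out = Vec::new();
    let written = write_chunks(&mut out, &chunks)?;
    println!("{} chunks, {} bytes written", chunks.len(), written);
    Ok(())
}
```

Because each chunk carries its own length, a separate field for the size of the last chunk would not be needed with this layout.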

The bottom line is that tarpc should compile in most scenarios users envision, without the Rust compiler erroring out about insufficient serde support. A weakly supported serde makes tarpc weakly supported too. The rate of adoption of tarpc will be directly related to the strength and versatility of the serde library.

Recently I saw a JSON-to-Go tool that converts a JSON file into a Go structure.
We need a similar tool for Rust/tarpc: one that turns a JSON file into a Rust structure that complies
with serde Serialize/Deserialize and tarpc.
Rather than JSON, perhaps BSON:
https://crates.io/crates/bson
BSON adds additional data types unavailable in JSON, notably the BinData and Date data types.
https://github.com/zonyitoo/bson-rs/blob/44ad4c4ab7c47d811cdd841c91c0ff201b4bbcc4/src/bson.rs#L69
https://github.com/zonyitoo/bson-rs/blob/master/tests/serde.rs#L160
https://github.com/zonyitoo/bson-rs/blob/ae5f93bcc5ebd9348350fcfe0b151fe175b7eded/tests/modules/macros.rs#L45

@omac777
Author

omac777 commented May 8, 2019

Try this. It's slow and I made a lot of unnecessary clones, but I don't care. It works as a prototype for sending small and large files via your tarpc. If you run it again, it will just send the file again, saving it under the same filename suffixed with a different timestamp to ensure uniqueness.

diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml
index 501b0b1..6307bbb 100644
--- a/tarpc/Cargo.toml
+++ b/tarpc/Cargo.toml
@@ -24,6 +24,13 @@ log = "0.4"
 serde = { optional = true, version = "1.0" }
 rpc = { package = "tarpc-lib", path = "../rpc", version = "0.5" }
 tarpc-plugins = { path = "../plugins", version = "0.5.0" }
+hexdump = "0.1.0"
+sha2 = "0.8.0"
+byte_string = "1.0.0"
+serde_bytes = "0.11.1"
+generic-array = "0.13.0"
+bytevec = "0.2.0"
+chrono = "0.4.6"
 
 [dev-dependencies]
 bincode = "1"
diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs
index 1f27e27..b74c034 100644
--- a/tarpc/examples/service_registry.rs
+++ b/tarpc/examples/service_registry.rs
@@ -1,11 +1,34 @@
+// to build
+// cargo build --example service_registry --release
+
+// to run
+// cargo run --example service_registry --release
+
+
 #![feature(
     async_await,
     await_macro,
+    futures_api,
     arbitrary_self_types,
     proc_macro_hygiene,
     impl_trait_in_bindings
 )]
 
+extern crate hexdump;
+extern crate byte_string;
+extern crate serde;
+extern crate serde_bytes;
+extern crate generic_array;
+extern crate chrono;
+use chrono::prelude::*;
+
+#[macro_use]
+extern crate bytevec;
+
+use bytevec::{ByteEncodable, ByteDecodable};
+use serde_bytes::ByteBuf;
+use serde::{Deserialize, Serialize};
+
 mod registry {
     use bytes::Bytes;
     use futures::{
@@ -242,7 +265,7 @@ use futures::{
     future::{ready, Ready},
     prelude::*,
 };
-use serde::{Deserialize, Serialize};
+//use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
     io,
@@ -266,6 +289,7 @@ where
         .into())
 }
 
+//these are the rpc service declarations
 mod write_service {
     tarpc::service! {
         rpc write(key: String, value: String);
@@ -278,7 +302,35 @@ mod read_service {
     }
 }
 
-#[derive(Debug, Default, Clone)]
+bytevec_decl! {
+#[derive(PartialEq, Eq, Debug, Clone)]
+pub struct FileChunk {
+    oneChunkOfBytes: std::vec::Vec<u8>    
+    }
+}
+
+#[derive(Debug,Default,Clone,Serialize,Deserialize)]
+pub struct ArgsWriteFile {
+    sfFileName : String,
+    
+    #[serde(with = "serde_bytes")]
+    sfWriteChunks : std::vec::Vec<u8>,
+
+    sfNumberOfChunks : u64,
+    sfChunkSize : usize,
+    sfLastChunkSize : usize,
+
+    sfResultSumSHA256 : String,
+}
+    
+mod mfw_sendfile_service {
+
+    tarpc::service! {
+        rpc mfwsendfile( allSendFileArgs : crate::ArgsWriteFile ) -> String;
+    }
+}
+
+#[derive(Default, Clone)]
 struct Server {
     data: Arc<RwLock<HashMap<String, String>>>,
 }
@@ -300,6 +352,85 @@ impl read_service::Service for Server {
     }
 }
 
+impl mfw_sendfile_service::Service for Server {
+    type MfwsendfileFut = Ready<String>;
+
+    fn mfwsendfile(self, theContext: context::Context, allSendFileArgs : crate::ArgsWriteFile) -> Self::MfwsendfileFut {
+        println!("mfw_sendfile_service::Service::mfwsendfile()...");
+        //println!("self:<<{:?}>>",self);
+        //let theClientAddr = self.client_addr(&self);
+        //println!("theClientAddr:<<{}>>", theClientAddr);
+
+        use std::io::prelude::*;
+        use std::fs;
+        use std::fs::File;
+        use std::io::Write;    
+        use std::path::Path;
+        use std::ffi::OsStr;
+
+        let tmpPath = allSendFileArgs.sfFileName.clone();
+        let path = Path::new(&tmpPath);
+        let mut theExtractedFileName : String;
+
+        let extractedFileNameFromPath = path.file_name();
+        println!("extractedFileNameFromPath:<<{:?}>>",extractedFileNameFromPath);
+        match extractedFileNameFromPath {
+            None => {
+                theExtractedFileName = "".to_string();                
+            },
+            Some(theFileName) => {
+                let mut resToOSString = theFileName.to_os_string();
+                let mut resIntoString = resToOSString.into_string();
+                 match resIntoString {
+                     Ok(theString) => {
+                          theExtractedFileName = theString;
+                     },
+                     Err(why) => {
+                         theExtractedFileName = "".to_string();                
+                     },                   
+                 } 
+            },
+        }
+
+        println!("theExtractedFileName:<<{}>>",theExtractedFileName);
+        let destPathToSave = Path::new("/home/davidm/tarpc/tarpc/examples/received");
+        let mut resCreateDirAll = fs::create_dir_all(destPathToSave);
+        let local: DateTime<Local> = Local::now(); // e.g. `2014-11-28T21:45:59.324310806+09:00`
+        let sIsoLocalTime = local.format("%Y-%m-%dT%H:%M:%S%z").to_string();
+        println!("sIsoLocalTime:<<{}>>", sIsoLocalTime);
+        let ensureUniqueFiletoSave = theExtractedFileName.clone().to_string() + &".".to_string() + &sIsoLocalTime.clone().to_string();
+        let destFileToSave = destPathToSave.join(ensureUniqueFiletoSave);
+
+        let mut fToWrite = File::create(destFileToSave).expect("Unable to create destFileToSave");    
+    
+        println!("theContext:<<{:?}>>",theContext);
+        println!("allSendFileArgs.sfFileName:<<{}>>", allSendFileArgs.sfFileName);
+        let theFileChunks_2 : std::vec::Vec<FileChunk> = Vec::<FileChunk>::decode::<u32>(&allSendFileArgs.sfWriteChunks).unwrap();
+        for chunkCounter in 0..theFileChunks_2.len() {
+
+         if chunkCounter == theFileChunks_2.len()-1 {
+             let thebytes = theFileChunks_2[chunkCounter].oneChunkOfBytes.clone();
+             let mut resFileWrite = fToWrite.write(&thebytes[0..allSendFileArgs.sfLastChunkSize]);
+         } else {
+             let thebytes2 = theFileChunks_2[chunkCounter].oneChunkOfBytes.clone();
+             let mut resFileWrite2 = fToWrite.write(&thebytes2);
+         }
+                
+        }   
+
+        let mut resFileSyncAll = fToWrite.sync_all();
+
+        
+        println!("allSendFileArgs.sfNumberOfChunks:<<{}>>", allSendFileArgs.sfNumberOfChunks);        
+        println!("allSendFileArgs.sfChunkSize:<<{}>>", allSendFileArgs.sfChunkSize);
+        println!("allSendFileArgs.sfLastChunkSize:<<{}>>", allSendFileArgs.sfLastChunkSize);
+        println!("allSendFileArgs.sfResultSumSHA256:<<{}>>", allSendFileArgs.sfResultSumSHA256);
+
+
+        ready(format!("mfwsendfile fullpathname:<<{}>> sent", allSendFileArgs.sfFileName.clone().to_string()))
+    }
+}
+
 trait DefaultSpawn {
     fn spawn(self);
 }
@@ -371,7 +502,68 @@ where
     registry::new_client(service_name, channel, serialize, deserialize)
 }
 
-async fn run() -> io::Result<()> {
+
+use std::fs::File;
+use sha2::{Sha256, Sha512, Digest};
+
+pub fn SplitFileIntoChunks2(theFileNameToSend : &str) -> io::Result<(std::vec::Vec<FileChunk>, u64, usize)> {
+    use std::io::Cursor;
+    use std::io::IoSliceMut;
+    use std::io::Read;
+
+    use std::io;
+    use std::io::prelude::*;
+    use std::fs::File;
+    use std::fs;
+
+    let metadata = fs::metadata(theFileNameToSend.clone())?;
+    let theActualFileSize = metadata.len();
+    println!("theFileNameToSend:<<{}>>",theFileNameToSend.clone().to_string());
+    println!("theActualFileSize:<<{}>>",theActualFileSize);
+    let mut bufferSize : usize = 1024;
+    let mut buffer = [0; 1024 ];
+
+    let mut WriteChunks : std::vec::Vec<FileChunk>;
+    WriteChunks = Vec::new();
+    
+    let mut f = File::open(theFileNameToSend)?;
+    let mut resReadVectored : io::Result<usize> = Ok(999999);
+    let mut myActualBytesRead : usize = 0;
+    let mut myLastChunkSize : usize = 0;
+    loop {
+        buffer = [0; 1024 ];
+        resReadVectored = f.read_vectored(&mut [IoSliceMut::new(&mut buffer)]);
+        match resReadVectored {
+            Err(why) => {
+                break;
+            },
+            Ok(bytesRead) => {
+                myActualBytesRead = bytesRead;
+                myLastChunkSize = myActualBytesRead;
+            },
+        }
+
+        let mut bufferString : String = "".to_string();
+
+        WriteChunks.push(
+            FileChunk {                
+                oneChunkOfBytes : buffer.clone().to_vec()
+            }            
+        );
+
+        if myActualBytesRead < bufferSize {
+            myLastChunkSize = myActualBytesRead;
+            break;
+        }
+    }
+
+    println!("buffer length:<<{}>>", buffer.len());
+    println!("myActualBytesRead:<<{}>>",myActualBytesRead);
+
+    Ok((WriteChunks.clone(), WriteChunks.len() as u64, myLastChunkSize))
+}
+
+async fn run2() -> io::Result<()> {
     let server = Server::default();
     let registry = BincodeRegistry::default()
         .register(
@@ -381,6 +573,10 @@ async fn run() -> io::Result<()> {
         .register(
             "ReadService".to_string(),
             read_service::serve(server.clone()),
+        )
+        .register(
+            "MFWSendFileService".to_string(),
+            mfw_sendfile_service::serve(server.clone()),
         );
 
     let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
@@ -394,20 +590,64 @@ async fn run() -> io::Result<()> {
     let transport = await!(bincode_transport::connect(&server_addr))?;
     let channel = await!(client::new(client::Config::default(), transport))?;
 
-    let write_client = new_client("WriteService".to_string(), &channel);
-    let mut write_client = write_service::Client::from(write_client);
+    let mfwsendfile_client = new_client("MFWSendFileService".to_string(), &channel);
+    let mut mfwsendfile_client = mfw_sendfile_service::Client::from(mfwsendfile_client);
+
+    let mut resChunks : std::vec::Vec<FileChunk>;
+    let mut resNumberOfChunks : u64;
+    let mut resLastChunkSize : usize;
+
+    resChunks = Vec::new();
+    resNumberOfChunks = 0;
+    resLastChunkSize = 0;
+
+    // ALL THE FOLLOWING FILES WERE TESTED AND SHA256SUMS
+    // WERE MANUALLY CHECKED IN THE SOURCE AND DESTINATION DIRECTORIES
+    //let mut theFileNameToSend : String = "/home/davidm/.bashrc".to_string();
+    //let mut theFileNameToSend : String = "/home/davidm/Pictures/pfstoolsForOpenEXR.png".to_string();     
+    //let mut theFileNameToSend : String = "/home/davidm/Downloads/go1.12.5.linux-amd64.tar.gz".to_string();
+    let mut theFileNameToSend : String = "/home/davidm/Downloads/archlinux-2019.04.01-x86_64.iso".to_string();
+
+    let resultSplitFile = SplitFileIntoChunks2(&theFileNameToSend);
+    match resultSplitFile {
+        Err(why) => {
+        },
+        Ok( (theChunks, theNumberOfChunks, theLastChunkSize) ) => {
+            resChunks = theChunks;
+            resNumberOfChunks = theNumberOfChunks;
+            resLastChunkSize = theLastChunkSize;
+        },
+    }
+
+    let mut file = File::open(&theFileNameToSend)?;
+    let mut mySha256 : sha2::Sha256 = Sha256::new();
+    io::copy(&mut file, &mut mySha256)?;
+    let hash = mySha256.clone().result();
+    let mut myHash256String : String = format!("{:x}", hash);
+    println!("myHash256String:<<{}>>", myHash256String);
+
+    let bytes : std::vec::Vec<u8> = resChunks.encode::<u32>().unwrap();
+    
+    let mut myArgsWriteFile : ArgsWriteFile = ArgsWriteFile{
+        sfFileName : theFileNameToSend.clone().to_string(),
+        sfWriteChunks : bytes.clone(),
+        sfNumberOfChunks : resNumberOfChunks,
+        sfChunkSize : 1024,
+        sfLastChunkSize : resLastChunkSize,
+        sfResultSumSHA256 : myHash256String.clone(),
+    };
 
-    let read_client = new_client("ReadService".to_string(), &channel);
-    let mut read_client = read_service::Client::from(read_client);
+    println!("myArgsWriteFile.sfResultSumSHA256 hash is:<<{}>>", myArgsWriteFile.sfResultSumSHA256.clone().to_string());
 
-    await!(write_client.write(context::current(), "key".to_string(), "val".to_string()))?;
-    let val = await!(read_client.read(context::current(), "key".to_string()))?;
-    println!("{:?}", val);
+    let response_mfwsendfile = await!(
+        mfwsendfile_client.mfwsendfile(context::current(), myArgsWriteFile )
+    )?;
+    println!("just returned from sendfile: <<{:?}>>", response_mfwsendfile);                       
 
     Ok(())
 }
 
 fn main() {
     tarpc::init(tokio::executor::DefaultExecutor::current().compat());
-    tokio::run(run().boxed().map_err(|e| panic!(e)).boxed().compat());
+    tokio::run(run2().boxed().map_err(|e| panic!(e)).boxed().compat());
 }
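
For comparison with `SplitFileIntoChunks2` in the diff above, here is a hedged standard-library sketch of reading a file into chunks. It differs from the prototype in three ways: read errors are returned instead of ending the loop silently, a short read is not mistaken for end of file, and the last chunk is stored at its real length rather than zero-padded to 1024 bytes. The name `read_chunks` is hypothetical, and an in-memory `Cursor` stands in for the file.

```rust
use std::io::{self, Cursor, Read};

/// Read `src` to the end and return chunks of at most `chunk_size` bytes.
/// Only the final chunk can be shorter than `chunk_size`.
fn read_chunks<R: Read>(mut src: R, chunk_size: usize) -> io::Result<Vec<Vec<u8>>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut chunks = Vec::new();
    loop {
        let mut chunk = Vec::with_capacity(chunk_size);
        // `take` caps this pass at chunk_size bytes; `read_to_end` keeps
        // reading through short reads until the cap or end of input.
        let n = src
            .by_ref()
            .take(chunk_size as u64)
            .read_to_end(&mut chunk)?;
        if n == 0 {
            break; // end of input, so no empty trailing chunk is pushed
        }
        chunks.push(chunk);
        if n < chunk_size {
            break; // a partial chunk means the input is exhausted
        }
    }
    Ok(chunks)
}

fn main() -> io::Result<()> {
    // 2048 bytes: an exact multiple of the chunk size.
    let input = vec![7u8; 2048];
    let chunks = read_chunks(Cursor::new(input), 1024)?;
    let last_len = chunks.last().map_or(0, |c| c.len());
    println!("chunks: {}, last chunk: {} bytes", chunks.len(), last_len);
    Ok(())
}
```

With a real file, `File::open(path)?` could be passed in place of the `Cursor`, since both implement `Read`.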

@stevefan1999-personal

@tikue is it possible to revive the service registry concept? For example, the server would register multiple services to serve, and on the client side we could use generics to resolve the wanted service.
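
One way to picture the idea, as a rough sketch only: this is plain standard-library Rust with no networking or serialization, and it is not tarpc's API. The `NamedService` trait, the `Registry` type, and the `key=value` request format are all invented for illustration. Handlers are stored under a service name on the "server" side, and the caller picks one with a type parameter instead of a string.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical trait tying a service type to the name it is registered under.
trait NamedService {
    const NAME: &'static str;
}

struct WriteService;
struct ReadService;

impl NamedService for WriteService {
    const NAME: &'static str = "WriteService";
}
impl NamedService for ReadService {
    const NAME: &'static str = "ReadService";
}

/// A handler takes a request string and returns a response string.
type Handler = Box<dyn Fn(&str) -> String>;

#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Register a handler under the name of service `S`.
    fn register<S, F>(mut self, handler: F) -> Self
    where
        S: NamedService,
        F: Fn(&str) -> String + 'static,
    {
        self.handlers.insert(S::NAME, Box::new(handler));
        self
    }

    /// Resolve service `S` by type; returns None if it was never registered.
    fn call<S: NamedService>(&self, request: &str) -> Option<String> {
        self.handlers.get(S::NAME).map(|h| h(request))
    }
}

fn main() {
    // Shared state, mirroring the Arc<RwLock<HashMap>> in the example's Server.
    let data: Arc<RwLock<HashMap<String, String>>> = Arc::default();
    let (w, r) = (data.clone(), data.clone());

    let registry = Registry::default()
        .register::<WriteService, _>(move |req| {
            // The "key=value" request format is made up for this sketch.
            let (k, v) = req.split_once('=').unwrap_or((req, ""));
            w.write().unwrap().insert(k.to_string(), v.to_string());
            String::new()
        })
        .register::<ReadService, _>(move |req| {
            r.read().unwrap().get(req).cloned().unwrap_or_default()
        });

    let _ = registry.call::<WriteService>("key=val");
    println!("{:?}", registry.call::<ReadService>("key"));
}
```

In a real setup the service name would have to travel with each request over the transport, which is the part the service registry example takes care of.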
