Skip to content

Commit

Permalink
return address for jna (#433)
Browse files Browse the repository at this point in the history
* return address for jna

* fix
  • Loading branch information
tianliplus committed Jun 24, 2021
1 parent efb0728 commit 85ecfe9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
13 changes: 8 additions & 5 deletions research/gaia/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use std::collections::HashSet;
pub use tag::Tag;
pub use worker::Worker;
pub use worker_id::{get_current_worker, WorkerId};
use std::net::SocketAddr;

lazy_static! {
static ref SERVER_ID: Mutex<Option<u64>> = Mutex::new(None);
Expand Down Expand Up @@ -148,24 +149,26 @@ pub fn startup(conf: Configuration) -> Result<(), StartupError> {

pub fn startup_with<D: ServerDetect + 'static>(
conf: Configuration, detect: D,
) -> Result<(), StartupError> {
) -> Result<Option<SocketAddr>, StartupError> {
let server_id = conf.server_id();
if let Some(id) = set_server_id(server_id) {
return Err(StartupError::AlreadyStarted(id));
}

if let Some(net_conf) = conf.network_config() {
let res = if let Some(net_conf) = conf.network_config() {
let addr = net_conf.local_addr()?;
let conn_conf = net_conf.get_connection_param();
let addr = pegasus_network::start_up(server_id, conn_conf, addr, detect)?;
info!("server {} start on {:?}", server_id, addr);
}
Some(addr)
} else {
None
};

if let Some(pool_size) = conf.max_pool_size {
pegasus_executor::set_core_pool_size(pool_size as usize);
}
pegasus_executor::try_start_executor_async();
Ok(())
Ok(res)
}

pub fn shutdown_all() {
Expand Down
2 changes: 1 addition & 1 deletion research/gaia/pegasus/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ crossbeam-utils = "0.6"
tonic = "0.4"
prost = "0.7"
tokio = { version = "1.0", features = ["macros", "sync", "rt-multi-thread"] }
tokio-stream = "0.1.3"
tokio-stream = { version = "0.1.3", features = ["net"] }
toml = "0.5"
serde = { version = "1.0", features = ["derive"] }
structopt = "0.2"
Expand Down
23 changes: 13 additions & 10 deletions research/gaia/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::time::Instant;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tokio::net::TcpListener;

#[derive(Clone)]
pub struct RpcOutput {
Expand Down Expand Up @@ -163,34 +164,36 @@ pub struct RpcServer<S: pb::job_service_server::JobService> {

pub async fn start_rpc_server<D: AnyData>(
addr: SocketAddr, service: Service<D>, report: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<SocketAddr, Box<dyn std::error::Error>> {
let rpc_service = RpcService { inner: service, report };
let server = RpcServer::new(addr, rpc_service);
server.run().await?;
Ok(())
let local_addr = server.run().await?;
Ok(local_addr)
}

pub async fn start_debug_rpc_server<D: AnyData>(
addr: SocketAddr, service: Service<D>, report: bool,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<SocketAddr, Box<dyn std::error::Error>> {
let rpc_service = DebugRpcService { inner: service, report };
let server = RpcServer::new(addr, rpc_service);
server.run().await?;
Ok(())
let local_addr = server.run().await?;
Ok(local_addr)
}

impl<S: pb::job_service_server::JobService> RpcServer<S> {
pub fn new(addr: SocketAddr, service: S) -> Self {
RpcServer { service, addr }
}

pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
pub async fn run(self) -> Result<SocketAddr, Box<dyn std::error::Error>> {
let RpcServer { service, addr } = self;
info!("Rpc server started on {}", addr);
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
info!("Rpc server started on {}", local_addr);
Server::builder()
.add_service(pb::job_service_server::JobServiceServer::new(service))
.serve(addr)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await?;
Ok(())
Ok(local_addr)
}
}

0 comments on commit 85ecfe9

Please sign in to comment.