diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 107ea28ff68b..61da0d9cbf3e 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -120,6 +120,10 @@ impl SchedulerServer { .as_millis(), } } + + pub fn set_caller_ip(&mut self, ip: IpAddr) { + self.caller_ip = ip; + } } const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks"; diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index 7b79eb1b39ac..23a038641987 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -22,6 +22,7 @@ use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerSe use futures::future::{self, Either, TryFutureExt}; use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; use std::convert::Infallible; +use std::net::{IpAddr, Ipv4Addr}; use std::{net::SocketAddr, sync::Arc}; use tonic::transport::Server as TonicServer; use tower::Service; @@ -62,14 +63,18 @@ async fn start_server( "Ballista v{} Scheduler listening on {:?}", BALLISTA_VERSION, addr ); + //should only call SchedulerServer::new() once in the process + let scheduler_server_without_caller_ip = SchedulerServer::new( + config_backend.clone(), + namespace.clone(), + IpAddr::V4(Ipv4Addr::UNSPECIFIED), + ); Ok(Server::bind(&addr) .serve(make_service_fn(move |request: &AddrStream| { - let scheduler_server = SchedulerServer::new( - config_backend.clone(), - namespace.clone(), - request.remote_addr().ip(), - ); + let mut scheduler_server = scheduler_server_without_caller_ip.clone(); + scheduler_server.set_caller_ip(request.remote_addr().ip()); + let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone());