Skip to content

Commit

Permalink
fix: Don't start api until internal server is up (#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Aug 16, 2023
1 parent 4adfec4 commit adb5119
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 47 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ tower-http = {version = "0.3.5", features = ["full"]}
arc-swap = "1.6.0"
metrics = "0.21.0"
gethostname = "0.4.3"
http-body = "0.4.5"
bytes = "1.4.0"
http = "0.2.9"

[dev-dependencies]
tempdir = "0.3.7"
7 changes: 4 additions & 3 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![allow(clippy::enum_variant_names)]
use std::net::AddrParseError;
use std::net::{AddrParseError, SocketAddr};
use std::path::PathBuf;

use actix_web::http::header::ContentType;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use dozer_cache::dozer_log::errors::ReaderBuilderError;
use dozer_types::errors::internal::BoxedError;
use dozer_types::errors::types::{CannotConvertF64ToJson, TypeError};
use dozer_types::labels::Labels;
use dozer_types::thiserror::Error;
Expand Down Expand Up @@ -61,8 +62,8 @@ pub enum GrpcError {
ServerReflectionError(#[from] tonic_reflection::server::Error),
#[error("Addr parse error: {0}: {1}")]
AddrParse(String, #[source] AddrParseError),
#[error("Transport error: {0:?}")]
Transport(#[from] tonic::transport::Error),
#[error("Failed to listen to address {0}: {1:?}")]
Listen(SocketAddr, #[source] BoxedError),
}

impl From<ApiError> for tonic::Status {
Expand Down
27 changes: 10 additions & 17 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::
use crate::errors::ApiInitError;
use crate::grpc::auth::AuthService;
use crate::grpc::health::HealthService;
use crate::grpc::{common, typed};
use crate::grpc::{common, run_server, typed};
use crate::{errors::GrpcError, CacheEndpoint};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::types::Operation;
Expand All @@ -17,10 +17,10 @@ use dozer_types::{
log::info,
models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
};
use futures_util::stream::{AbortHandle, Abortable, Aborted};
use futures_util::Future;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::broadcast::{self, Receiver};
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tower::Layer;
Expand Down Expand Up @@ -79,12 +79,13 @@ impl ApiServer {
}
}

/// TcpIncoming::new requires a tokio runtime, so we mark this function as async.
pub async fn run(
&self,
cache_endpoints: Vec<Arc<CacheEndpoint>>,
shutdown: impl Future<Output = ()> + Send + 'static,
operations_receiver: Option<Receiver<Operation>>,
) -> Result<(), ApiInitError> {
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, ApiInitError> {
// Create our services.
let mut web_config = tonic_web::config();
if self.flags.grpc_web {
Expand Down Expand Up @@ -162,15 +163,7 @@ impl ApiServer {
grpc_router = grpc_router.add_service(health_service);
grpc_router = grpc_router.add_optional_service(auth_service);

// Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
// So we just abort the server when the shutdown signal is received.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(async move {
shutdown.await;
abort_handle.abort();
});

// Run server.
// Start listening.
let addr = format!("{}:{}", self.host, self.port);
info!(
"Starting gRPC server on {addr} with security: {}",
Expand All @@ -180,13 +173,13 @@ impl ApiServer {
ApiSecurity::Jwt(_) => "JWT".to_string(),
})
);

let addr = addr
.parse()
.map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
match Abortable::new(grpc_router.serve(addr), abort_registration).await {
Ok(result) => result.map_err(|e| ApiInitError::Grpc(GrpcError::Transport(e))),
Err(Aborted) => Ok(()),
}
let incoming =
TcpIncoming::new(addr, true, None).map_err(|e| GrpcError::Listen(addr, e))?;

// Run server.
Ok(run_server(grpc_router, incoming, shutdown))
}
}
25 changes: 10 additions & 15 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use futures_util::future::Either;
use futures_util::stream::{AbortHandle, Abortable, Aborted, BoxStream};
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

use crate::errors::GrpcError;
use crate::grpc::run_server;

#[derive(Debug, Clone)]
pub struct LogEndpoint {
Expand Down Expand Up @@ -140,34 +142,27 @@ async fn get_log(log: Arc<Mutex<Log>>, request: LogRequest) -> Result<LogRespons
Ok(LogResponse { data })
}

/// TcpIncoming::new requires a tokio runtime, so we mark this function as async.
pub async fn start_internal_pipeline_server(
endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
options: &AppGrpcOptions,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), GrpcError> {
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, GrpcError> {
let endpoints = endpoint_and_logs
.into_iter()
.map(|(endpoint, log)| (endpoint.name, log))
.collect();
let server = InternalPipelineServer::new(endpoints);

// Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
// So we just abort the server when the shutdown signal is received.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(async move {
shutdown.await;
abort_handle.abort();
});

// Run server.
// Start listening.
let addr = format!("{}:{}", options.host, options.port);
info!("Starting Internal Server on {addr}");
let addr = addr
.parse()
.map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
let incoming = TcpIncoming::new(addr, true, None).map_err(|e| GrpcError::Listen(addr, e))?;

// Run server.
let server = Server::builder().add_service(InternalPipelineServiceServer::new(server));
match Abortable::new(server.serve(addr), abort_registration).await {
Ok(result) => result.map_err(GrpcError::Transport),
Err(Aborted) => Ok(()),
}
Ok(run_server(server, incoming, shutdown))
}
37 changes: 37 additions & 0 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,41 @@ mod shared_impl;
pub mod typed;
pub mod types_helper;

use bytes::Bytes;
pub use client_server::ApiServer;
use dozer_types::errors::internal::BoxedError;
use futures_util::{
stream::{AbortHandle, Abortable, Aborted},
Future,
};
use http::{Request, Response};
use hyper::Body;
use tonic::transport::server::{Router, Routes, TcpIncoming};
use tower::{Layer, Service};

async fn run_server<L, ResBody>(
server: Router<L>,
incoming: TcpIncoming,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), tonic::transport::Error>
where
L: Layer<Routes>,
L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
<<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<BoxedError> + Send,
ResBody: http_body::Body<Data = Bytes> + Send + 'static,
ResBody::Error: Into<BoxedError>,
{
// Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
// So we just abort the server when the shutdown signal is received.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(async move {
shutdown.await;
abort_handle.abort();
});

match Abortable::new(server.serve_with_incoming(incoming), abort_registration).await {
Ok(result) => result,
Err(Aborted) => Ok(()),
}
}
6 changes: 4 additions & 2 deletions dozer-cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ pub enum OrchestrationError {
CloudError(#[from] CloudError),
#[error("Failed to initialize api server: {0}")]
ApiInitFailed(#[from] ApiInitError),
#[error("Failed to server API: {0}")]
ApiServeFailed(#[source] std::io::Error),
#[error("Failed to server REST API: {0}")]
RestServeFailed(#[source] std::io::Error),
#[error("Failed to server gRPC API: {0:?}")]
GrpcServeFailed(#[source] tonic::transport::Error),
#[error("Failed to initialize internal server: {0}")]
InternalServerFailed(#[source] GrpcError),
#[error("{0}: Failed to initialize cache. Have you run `dozer build`?")]
Expand Down
24 changes: 15 additions & 9 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl SimpleOrchestrator {
let api_server = api_server
.run(cache_endpoints_for_rest, shutdown_for_rest)
.map_err(OrchestrationError::ApiInitFailed)?;
tokio::spawn(api_server.map_err(OrchestrationError::ApiServeFailed))
tokio::spawn(api_server.map_err(OrchestrationError::RestServeFailed))
} else {
tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
};
Expand All @@ -137,11 +137,14 @@ impl SimpleOrchestrator {
let api_security = get_api_security_config(&self.config).cloned();
let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
let shutdown = shutdown.create_shutdown_future();
let grpc_server = grpc_server
.run(cache_endpoints, shutdown, operations_receiver)
.await
.map_err(OrchestrationError::ApiInitFailed)?;
tokio::spawn(async move {
grpc_server
.run(cache_endpoints, shutdown, operations_receiver)
.await
.map_err(OrchestrationError::ApiInitFailed)
.map_err(OrchestrationError::GrpcServeFailed)
})
} else {
tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
Expand Down Expand Up @@ -179,11 +182,14 @@ impl SimpleOrchestrator {
.create_dag_executor(self.runtime.clone(), get_executor_options(&self.config))?;

let app_grpc_config = get_app_grpc_config(&self.config);
let internal_server_future = start_internal_pipeline_server(
executor.endpoint_and_logs().to_vec(),
&app_grpc_config,
shutdown.create_shutdown_future(),
);
let internal_server_future = self
.runtime
.block_on(start_internal_pipeline_server(
executor.endpoint_and_logs().to_vec(),
&app_grpc_config,
shutdown.create_shutdown_future(),
))
.map_err(OrchestrationError::InternalServerFailed)?;

if let Some(api_notifier) = api_notifier {
api_notifier
Expand All @@ -199,7 +205,7 @@ impl SimpleOrchestrator {
let mut futures = FuturesUnordered::new();
futures.push(
internal_server_future
.map_err(OrchestrationError::InternalServerFailed)
.map_err(OrchestrationError::GrpcServeFailed)
.boxed(),
);
futures.push(flatten_join_handle(pipeline_future).boxed());
Expand Down
2 changes: 1 addition & 1 deletion dozer-log/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use dozer_types::{bincode, serde_json, thiserror, tonic};

#[derive(Error, Debug)]
pub enum ReaderBuilderError {
#[error("Tonic transport error: {0}")]
#[error("Tonic transport error: {0:?}")]
TonicTransport(#[from] tonic::transport::Error),
#[error("Tonic status: {0}")]
TonicStatus(#[from] tonic::Status),
Expand Down

0 comments on commit adb5119

Please sign in to comment.