Skip to content

Commit

Permalink
Merge f605c33 into 605aee1
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Feb 26, 2023
2 parents 605aee1 + f605c33 commit 2d51ee6
Show file tree
Hide file tree
Showing 57 changed files with 831 additions and 511 deletions.
11 changes: 6 additions & 5 deletions dozer-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ impl ApiError {
}

#[derive(Error, Debug)]
pub enum GRPCError {
#[error("Internal GRPC server error: {0}")]
pub enum GrpcError {
#[error("Internal gRPC server error: {0}")]
InternalError(#[from] BoxedError),

#[error("Cannot send to broadcast channel")]
CannotSendToBroadcastChannel,
#[error(transparent)]
SerizalizeError(#[from] serde_json::Error),
#[error("Missing primary key to query by id: {0}")]
Expand All @@ -75,8 +76,8 @@ pub enum GRPCError {
#[error("{0}")]
TransportErrorDetail(String),
}
impl From<GRPCError> for tonic::Status {
fn from(input: GRPCError) -> Self {
impl From<GrpcError> for tonic::Status {
fn from(input: GrpcError) -> Self {
tonic::Status::new(tonic::Code::Internal, input.to_string())
}
}
Expand Down
66 changes: 16 additions & 50 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
use crate::grpc::health::HealthService;
use crate::grpc::{common, typed};
use crate::{errors::GRPCError, generator::protoc::generator::ProtoGenerator, RoCacheEndpoint};
use crate::{errors::GrpcError, generator::protoc::generator::ProtoGenerator, RoCacheEndpoint};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::types::Operation;
use dozer_types::grpc_types::{
common::common_grpc_service_server::CommonGrpcServiceServer,
health::health_grpc_service_server::HealthGrpcServiceServer,
internal::{
internal_pipeline_service_client::InternalPipelineServiceClient, PipelineRequest,
PipelineResponse,
},
};
use dozer_types::tracing::Level;
use dozer_types::{
log::{info, warn},
log::info,
models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
};
use futures_util::{FutureExt, StreamExt};
use futures_util::FutureExt;
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tonic::{transport::Server, Streaming};
use tokio::sync::broadcast::{self, Receiver};
use tonic::transport::Server;
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tower::Layer;
use tower_http::trace::{self, TraceLayer};
Expand All @@ -33,30 +30,16 @@ pub struct ApiServer {
}

impl ApiServer {
async fn connect_internal_client(
app_grpc_config: GrpcApiOptions,
) -> Result<Streaming<PipelineResponse>, GRPCError> {
let address = format!("http://{:}:{:}", app_grpc_config.host, app_grpc_config.port);
let mut client = InternalPipelineServiceClient::connect(address)
.await
.map_err(|err| GRPCError::InternalError(Box::new(err)))?;
let stream_response = client
.stream_pipeline_request(PipelineRequest {})
.await
.map_err(|err| GRPCError::InternalError(Box::new(err)))?;
let stream: Streaming<PipelineResponse> = stream_response.into_inner();
Ok(stream)
}
fn get_dynamic_service(
&self,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
rx1: Option<broadcast::Receiver<PipelineResponse>>,
operations_receiver: Option<broadcast::Receiver<Operation>>,
) -> Result<
(
Option<TypedService>,
ServerReflectionServer<impl ServerReflection>,
),
GRPCError,
GrpcError,
> {
info!(
"Starting gRPC server on http://{}:{} with security: {}",
Expand All @@ -82,7 +65,7 @@ impl ApiServer {
Some(TypedService::new(
&descriptor_path,
cache_endpoints,
rx1.map(|r| r.resubscribe()),
operations_receiver,
self.security.clone(),
)?)
} else {
Expand Down Expand Up @@ -111,8 +94,8 @@ impl ApiServer {
&self,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
rx1: Option<Receiver<PipelineResponse>>,
) -> Result<(), GRPCError> {
operations_receiver: Option<Receiver<Operation>>,
) -> Result<(), GrpcError> {
// Create our services.
let mut web_config = tonic_web::config();
if self.flags.grpc_web {
Expand All @@ -121,11 +104,12 @@ impl ApiServer {

let common_service = CommonGrpcServiceServer::new(CommonService::new(
cache_endpoints.clone(),
rx1.as_ref().map(|r| r.resubscribe()),
operations_receiver.as_ref().map(|r| r.resubscribe()),
));
let common_service = web_config.enable(common_service);

let (typed_service, reflection_service) = self.get_dynamic_service(cache_endpoints, rx1)?;
let (typed_service, reflection_service) =
self.get_dynamic_service(cache_endpoints, operations_receiver)?;
let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
let reflection_service = web_config.enable(reflection_service);

Expand Down Expand Up @@ -187,27 +171,9 @@ impl ApiServer {
let inner_error: Box<dyn std::error::Error> = e.into();
let detail = inner_error.source();
if let Some(detail) = detail {
return GRPCError::TransportErrorDetail(detail.to_string());
return GrpcError::TransportErrorDetail(detail.to_string());
}
GRPCError::TransportErrorDetail(inner_error.to_string())
GrpcError::TransportErrorDetail(inner_error.to_string())
})
}

pub async fn setup_broad_cast_channel(
tx: Sender<PipelineResponse>,
app_grpc_config: GrpcApiOptions,
) -> Result<(), GRPCError> {
info!(
"Connecting to Internal service on http://{}:{}",
app_grpc_config.host, app_grpc_config.port
);
let mut stream = ApiServer::connect_internal_client(app_grpc_config.to_owned()).await?;
while let Some(event_response) = stream.next().await {
if let Ok(event) = event_response {
let _ = tx.send(event);
}
}
warn!("exiting internal grpc connection on api thread");
Ok::<(), GRPCError>(())
}
}
9 changes: 4 additions & 5 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::grpc::shared_impl;
use crate::grpc::types_helper::{map_field_definitions, map_record};
use crate::RoCacheEndpoint;
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
use dozer_types::grpc_types::internal::PipelineResponse;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

Expand All @@ -24,13 +23,13 @@ type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
pub struct CommonService {
/// For look up endpoint from its name. `key == value.endpoint.name`.
pub endpoint_map: HashMap<String, Arc<RoCacheEndpoint>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
}

impl CommonService {
pub fn new(
endpoints: Vec<Arc<RoCacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
) -> Self {
let endpoint_map = endpoints
.into_iter()
Expand Down Expand Up @@ -120,8 +119,8 @@ impl CommonGrpcService for CommonService {
query_request.filter.as_deref(),
self.event_notifier.as_ref().map(|r| r.resubscribe()),
access.cloned(),
move |op, endpoint| {
if endpoint == query_request.endpoint {
move |op| {
if op.endpoint_name == query_request.endpoint {
Some(Ok(op))
} else {
None
Expand Down
15 changes: 9 additions & 6 deletions dozer-api/src/grpc/common/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::grpc::typed::tests::{
fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, service::setup_pipeline,
};
Expand All @@ -15,8 +17,8 @@ use tonic::Request;

use super::CommonService;

fn setup_common_service() -> CommonService {
let (endpoints, rx1) = setup_pipeline();
async fn setup_common_service() -> CommonService {
let (endpoints, rx1) = setup_pipeline().await;
CommonService::new(endpoints, Some(rx1))
}

Expand Down Expand Up @@ -48,7 +50,7 @@ async fn count_and_query(

#[tokio::test]
async fn test_grpc_common_count_and_query() {
let service = setup_common_service();
let service = setup_common_service().await;
let endpoint = "films";

// Empty query.
Expand Down Expand Up @@ -81,7 +83,7 @@ async fn test_grpc_common_count_and_query() {

#[tokio::test]
async fn test_grpc_common_get_endpoints() {
let service = setup_common_service();
let service = setup_common_service().await;
let response = service
.get_endpoints(Request::new(GetEndpointsRequest {}))
.await
Expand All @@ -92,7 +94,7 @@ async fn test_grpc_common_get_endpoints() {

#[tokio::test]
async fn test_grpc_common_get_fields() {
let service = setup_common_service();
let service = setup_common_service().await;
let response = service
.get_fields(Request::new(GetFieldsRequest {
endpoint: "films".to_string(),
Expand Down Expand Up @@ -142,7 +144,8 @@ async fn test_grpc_common_on_event() {
default_pipeline_internal.port,
rx_internal,
));
let service = setup_common_service();
tokio::time::sleep(Duration::from_millis(100)).await; // wait for the mock server to start.
let service = setup_common_service().await;
let mut rx = service
.on_event(Request::new(OnEventRequest {
endpoint: "films".to_string(),
Expand Down
90 changes: 90 additions & 0 deletions dozer-api/src/grpc/internal/internal_pipeline_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::fmt::Debug;

use dozer_types::{
grpc_types::{
internal::{
internal_pipeline_service_client::InternalPipelineServiceClient, AliasEventsRequest,
AliasRedirected, OperationsRequest,
},
types::Operation,
},
log::warn,
models::api_config::GrpcApiOptions,
};
use futures_util::{Future, StreamExt};
use tokio::sync::broadcast::{Receiver, Sender};
use tonic::{transport::Channel, Streaming};

use crate::errors::GrpcError;

#[derive(Debug)]
pub struct InternalPipelineClient {
client: InternalPipelineServiceClient<Channel>,
}

impl InternalPipelineClient {
pub async fn new(app_grpc_config: &GrpcApiOptions) -> Result<Self, GrpcError> {
let address = format!(
"http://{:}:{:}",
&app_grpc_config.host, app_grpc_config.port
);
let client = InternalPipelineServiceClient::connect(address)
.await
.map_err(|err| GrpcError::InternalError(Box::new(err)))?;
Ok(Self { client })
}

pub async fn stream_alias_events(
&mut self,
) -> Result<
(
Receiver<AliasRedirected>,
impl Future<Output = Result<(), GrpcError>>,
),
GrpcError,
> {
let stream = self
.client
.stream_alias_events(AliasEventsRequest {})
.await
.map_err(|err| GrpcError::InternalError(Box::new(err)))?
.into_inner();
let (sender, receiver) = tokio::sync::broadcast::channel(16);
let future = redirect_loop(stream, sender);
Ok((receiver, future))
}

pub async fn stream_operations(
&mut self,
) -> Result<
(
Receiver<Operation>,
impl Future<Output = Result<(), GrpcError>>,
),
GrpcError,
> {
let stream = self
.client
.stream_operations(OperationsRequest {})
.await
.map_err(|err| GrpcError::InternalError(Box::new(err)))?
.into_inner();
let (sender, receiver) = tokio::sync::broadcast::channel(16);
let future = redirect_loop(stream, sender);
Ok((receiver, future))
}
}

async fn redirect_loop<T: Debug>(
mut stream: Streaming<T>,
sender: Sender<T>,
) -> Result<(), GrpcError> {
while let Some(event) = stream.next().await {
let event = event.map_err(|err| GrpcError::InternalError(Box::new(err)))?;
sender
.send(event)
.map_err(|_| GrpcError::CannotSendToBroadcastChannel)?;
}
warn!("exiting internal grpc connection on api thread");
Ok(())
}
Loading

0 comments on commit 2d51ee6

Please sign in to comment.