Skip to content

Commit

Permalink
chore: Remove PipelineResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Feb 26, 2023
1 parent 6d8f738 commit 53c27b7
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 99 deletions.
13 changes: 7 additions & 6 deletions dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use crate::grpc::health::HealthService;
use crate::grpc::{common, typed};
use crate::{errors::GrpcError, generator::protoc::generator::ProtoGenerator, RoCacheEndpoint};
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
use dozer_types::grpc_types::types::Operation;
use dozer_types::grpc_types::{
common::common_grpc_service_server::CommonGrpcServiceServer,
health::health_grpc_service_server::HealthGrpcServiceServer, internal::PipelineResponse,
health::health_grpc_service_server::HealthGrpcServiceServer,
};
use dozer_types::tracing::Level;
use dozer_types::{
Expand All @@ -32,7 +33,7 @@ impl ApiServer {
fn get_dynamic_service(
&self,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
pipeline_response_receiver: Option<broadcast::Receiver<PipelineResponse>>,
operations_receiver: Option<broadcast::Receiver<Operation>>,
) -> Result<
(
Option<TypedService>,
Expand Down Expand Up @@ -64,7 +65,7 @@ impl ApiServer {
Some(TypedService::new(
&descriptor_path,
cache_endpoints,
pipeline_response_receiver,
operations_receiver,
self.security.clone(),
)?)
} else {
Expand Down Expand Up @@ -93,7 +94,7 @@ impl ApiServer {
&self,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
pipeline_response_receiver: Option<Receiver<PipelineResponse>>,
operations_receiver: Option<Receiver<Operation>>,
) -> Result<(), GrpcError> {
// Create our services.
let mut web_config = tonic_web::config();
Expand All @@ -103,12 +104,12 @@ impl ApiServer {

let common_service = CommonGrpcServiceServer::new(CommonService::new(
cache_endpoints.clone(),
pipeline_response_receiver.as_ref().map(|r| r.resubscribe()),
operations_receiver.as_ref().map(|r| r.resubscribe()),
));
let common_service = web_config.enable(common_service);

let (typed_service, reflection_service) =
self.get_dynamic_service(cache_endpoints, pipeline_response_receiver)?;
self.get_dynamic_service(cache_endpoints, operations_receiver)?;
let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
let reflection_service = web_config.enable(reflection_service);

Expand Down
9 changes: 4 additions & 5 deletions dozer-api/src/grpc/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::grpc::shared_impl;
use crate::grpc::types_helper::{map_field_definitions, map_record};
use crate::RoCacheEndpoint;
use dozer_types::grpc_types::common::common_grpc_service_server::CommonGrpcService;
use dozer_types::grpc_types::internal::PipelineResponse;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

Expand All @@ -24,13 +23,13 @@ type ResponseStream = ReceiverStream<Result<Operation, tonic::Status>>;
pub struct CommonService {
/// For look up endpoint from its name. `key == value.endpoint.name`.
pub endpoint_map: HashMap<String, Arc<RoCacheEndpoint>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
pub event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
}

impl CommonService {
pub fn new(
endpoints: Vec<Arc<RoCacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
) -> Self {
let endpoint_map = endpoints
.into_iter()
Expand Down Expand Up @@ -120,8 +119,8 @@ impl CommonGrpcService for CommonService {
query_request.filter.as_deref(),
self.event_notifier.as_ref().map(|r| r.resubscribe()),
access.cloned(),
move |op, endpoint| {
if endpoint == query_request.endpoint {
move |op| {
if op.endpoint_name == query_request.endpoint {
Some(Ok(op))
} else {
None
Expand Down
15 changes: 9 additions & 6 deletions dozer-api/src/grpc/internal/internal_pipeline_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::fmt::Debug;

use dozer_types::{
grpc_types::internal::{
internal_pipeline_service_client::InternalPipelineServiceClient, AliasEventsRequest,
AliasRedirected, PipelineRequest, PipelineResponse,
grpc_types::{
internal::{
internal_pipeline_service_client::InternalPipelineServiceClient, AliasEventsRequest,
AliasRedirected, OperationsRequest,
},
types::Operation,
},
log::warn,
models::api_config::GrpcApiOptions,
Expand Down Expand Up @@ -51,18 +54,18 @@ impl InternalPipelineClient {
Ok((receiver, future))
}

pub async fn stream_pipeline_responses(
pub async fn stream_operations(
&mut self,
) -> Result<
(
Receiver<PipelineResponse>,
Receiver<Operation>,
impl Future<Output = Result<(), GrpcError>>,
),
GrpcError,
> {
let stream = self
.client
.stream_pipeline_request(PipelineRequest {})
.stream_operations(OperationsRequest {})
.await
.map_err(|err| GrpcError::InternalError(Box::new(err)))?
.into_inner();
Expand Down
40 changes: 21 additions & 19 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use crossbeam::channel::{Receiver, Sender};
use dozer_types::grpc_types::internal::{
internal_pipeline_service_server::{self, InternalPipelineService},
AliasEventsRequest, AliasRedirected, PipelineRequest, PipelineResponse,
use dozer_types::grpc_types::{
internal::{
internal_pipeline_service_server::{self, InternalPipelineService},
AliasEventsRequest, AliasRedirected, OperationsRequest,
},
types::Operation,
};
use dozer_types::{crossbeam, log::info, models::app_config::Config, tracing::warn};
use std::{fmt::Debug, net::ToSocketAddrs, pin::Pin, thread};
use tokio::{runtime::Runtime, sync::broadcast};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};

pub type PipelineEventSenders = (Sender<AliasRedirected>, Sender<PipelineResponse>);
pub type PipelineEventReceivers = (Receiver<AliasRedirected>, Receiver<PipelineResponse>);
pub type PipelineEventSenders = (Sender<AliasRedirected>, Sender<Operation>);
pub type PipelineEventReceivers = (Receiver<AliasRedirected>, Receiver<Operation>);

pub struct InternalPipelineServer {
alias_redirected_receiver: broadcast::Receiver<AliasRedirected>,
pipeline_response_receiver: broadcast::Receiver<PipelineResponse>,
operation_receiver: broadcast::Receiver<Operation>,
}
impl InternalPipelineServer {
pub fn new(pipeline_event_receivers: PipelineEventReceivers) -> Self {
let alias_redirected_receiver =
crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.0);
let pipeline_response_receiver =
let operation_receiver =
crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.1);
Self {
alias_redirected_receiver,
pipeline_response_receiver,
operation_receiver: operation_receiver,
}
}
}
Expand Down Expand Up @@ -54,25 +57,24 @@ fn crossbeam_mpsc_receiver_to_tokio_broadcast_receiver<T: Clone + Debug + Send +
broadcast_receiver
}

type ResponseStream = Pin<Box<dyn Stream<Item = Result<PipelineResponse, Status>> + Send>>;
type OperationsStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
type AliasEventsStream = Pin<Box<dyn Stream<Item = Result<AliasRedirected, Status>> + Send>>;

#[tonic::async_trait]
impl InternalPipelineService for InternalPipelineServer {
type StreamPipelineRequestStream = ResponseStream;
async fn stream_pipeline_request(
type StreamOperationsStream = OperationsStream;
async fn stream_operations(
&self,
_request: tonic::Request<PipelineRequest>,
) -> Result<Response<ResponseStream>, Status> {
let (pipeline_response_sender, pipeline_response_receiver) =
tokio::sync::mpsc::channel(1000);
let mut receiver = self.pipeline_response_receiver.resubscribe();
_request: tonic::Request<OperationsRequest>,
) -> Result<Response<OperationsStream>, Status> {
let (operation_sender, operation_receiver) = tokio::sync::mpsc::channel(1000);
let mut receiver = self.operation_receiver.resubscribe();
tokio::spawn(async move {
loop {
let result = receiver.try_recv();
match result {
Ok(pipeline_response) => {
let result = pipeline_response_sender.send(Ok(pipeline_response)).await;
Ok(operation) => {
let result = operation_sender.send(Ok(operation)).await;
if let Err(e) = result {
warn!("Error sending message to mpsc channel: {:?}", e);
break;
Expand All @@ -87,7 +89,7 @@ impl InternalPipelineService for InternalPipelineServer {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
});
let output_stream = ReceiverStream::new(pipeline_response_receiver);
let output_stream = ReceiverStream::new(operation_receiver);
Ok(Response::new(Box::pin(output_stream)))
}

Expand Down
21 changes: 8 additions & 13 deletions dozer-api/src/grpc/shared_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression};
use dozer_cache::cache::RecordWithId;
use dozer_cache::CacheReader;
use dozer_types::grpc_types::internal::PipelineResponse;
use dozer_types::grpc_types::types::Operation;
use dozer_types::log::warn;
use dozer_types::serde_json;
Expand All @@ -14,8 +13,6 @@ use tonic::{Code, Response, Status};
use crate::api_helper::{get_records, get_records_count};
use crate::auth::Access;

use dozer_types::grpc_types::internal::pipeline_response::ApiEvent;

mod filter;

pub fn from_error(error: impl std::error::Error) -> Status {
Expand Down Expand Up @@ -71,9 +68,9 @@ pub fn on_event<T: Send + 'static>(
reader: &CacheReader,
endpoint_name: &str,
filter: Option<&str>,
mut broadcast_receiver: Option<Receiver<PipelineResponse>>,
mut broadcast_receiver: Option<Receiver<Operation>>,
_access: Option<Access>,
event_mapper: impl Fn(Operation, String) -> Option<T> + Send + Sync + 'static,
event_mapper: impl Fn(Operation) -> Option<T> + Send + Sync + 'static,
) -> Result<Response<ReceiverStream<T>>, Status> {
// TODO: Use access.

Expand Down Expand Up @@ -106,14 +103,12 @@ pub fn on_event<T: Send + 'static>(
if let Some(broadcast_receiver) = broadcast_receiver.as_mut() {
let event = broadcast_receiver.recv().await;
match event {
Ok(event) => {
if let Some(ApiEvent::Op(op)) = event.api_event {
if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
if let Some(event) = event_mapper(op, event.endpoint) {
if (tx.send(event).await).is_err() {
// receiver dropped
break;
}
Ok(op) => {
if filter::op_satisfies_filter(&op, filter.as_ref(), &schema) {
if let Some(event) = event_mapper(op) {
if (tx.send(event).await).is_err() {
// receiver dropped
break;
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions dozer-api/src/grpc/typed/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
RoCacheEndpoint,
};
use dozer_cache::CacheReader;
use dozer_types::{grpc_types::internal::PipelineResponse, models::api_security::ApiSecurity};
use dozer_types::{grpc_types::types::Operation, models::api_security::ApiSecurity};
use futures_util::future;
use prost_reflect::{MethodDescriptor, Value};
use std::{borrow::Cow, collections::HashMap, convert::Infallible, path::Path};
Expand All @@ -39,7 +39,7 @@ pub struct TypedService {
send_compression_encodings: EnabledCompressionEncodings,
/// For look up endpoint from its full service name. `key == value.service_desc.service.full_name()`.
endpoint_map: HashMap<String, TypedEndpoint>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
security: Option<ApiSecurity>,
}

Expand All @@ -59,7 +59,7 @@ impl TypedService {
pub fn new(
descriptor_path: &Path,
cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
security: Option<ApiSecurity>,
) -> Result<Self, GrpcError> {
let endpoint_map = cache_endpoints
Expand Down Expand Up @@ -174,7 +174,7 @@ impl TypedService {
struct EventService {
cache_endpoint: Arc<RoCacheEndpoint>,
event_desc: Option<EventDesc>,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
}
impl tonic::server::ServerStreamingService<DynamicMessage> for EventService {
type Response = TypedResponse;
Expand Down Expand Up @@ -334,7 +334,7 @@ fn on_event(
reader: &CacheReader,
endpoint_name: &str,
event_desc: EventDesc,
event_notifier: Option<tokio::sync::broadcast::Receiver<PipelineResponse>>,
event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
) -> Result<Response<ReceiverStream<Result<TypedResponse, tonic::Status>>>, Status> {
let parts = request.into_parts();
let extensions = parts.1;
Expand All @@ -357,8 +357,8 @@ fn on_event(
filter,
event_notifier,
access.cloned(),
move |op, endpoint| {
if endpoint_to_be_streamed == endpoint {
move |op| {
if endpoint_to_be_streamed == op.endpoint_name {
Some(Ok(on_event_to_typed_response(op, event_desc.clone())))
} else {
None
Expand Down
27 changes: 4 additions & 23 deletions dozer-orchestrator/src/pipeline/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use dozer_core::storage::lmdb_storage::SharedTransaction;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::crossbeam::channel::Sender;
use dozer_types::grpc_types::internal::pipeline_response::ApiEvent;
use dozer_types::grpc_types::internal::{AliasRedirected, PipelineResponse};
use dozer_types::grpc_types::internal::AliasRedirected;
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use dozer_types::log::{debug, info};
use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
Expand Down Expand Up @@ -373,13 +372,7 @@ impl Sink for CacheSink {
if let Some(notifier) = &self.notifier {
let op =
types_helper::map_delete_operation(self.api_endpoint.name.clone(), old);
try_send(
&notifier.1,
PipelineResponse {
endpoint: self.api_endpoint.name.clone(),
api_event: Some(ApiEvent::Op(op)),
},
)?;
try_send(&notifier.1, op)?;
}
}
Operation::Insert { mut new } => {
Expand All @@ -394,13 +387,7 @@ impl Sink for CacheSink {
if let Some(notifier) = &self.notifier {
let op =
types_helper::map_insert_operation(self.api_endpoint.name.clone(), new, id);
try_send(
&notifier.1,
PipelineResponse {
endpoint: self.api_endpoint.name.clone(),
api_event: Some(ApiEvent::Op(op)),
},
)?;
try_send(&notifier.1, op)?;
}
}
Operation::Update { mut old, mut new } => {
Expand All @@ -421,13 +408,7 @@ impl Sink for CacheSink {
old,
new,
);
try_send(
&notifier.1,
PipelineResponse {
endpoint: self.api_endpoint.name.clone(),
api_event: Some(ApiEvent::Op(op)),
},
)?;
try_send(&notifier.1, op)?;
}
}
// FIXME: Maybe we should only switch cache when all source nodes snapshotting are done? (by chubei 2023-02-24)
Expand Down
Loading

0 comments on commit 53c27b7

Please sign in to comment.