diff --git a/dozer-api/src/grpc/common/tests/mod.rs b/dozer-api/src/grpc/common/tests/mod.rs
index 46df7f4704..7a78cd8192 100644
--- a/dozer-api/src/grpc/common/tests/mod.rs
+++ b/dozer-api/src/grpc/common/tests/mod.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use crate::grpc::typed::tests::{
     fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, service::setup_pipeline,
 };
@@ -15,8 +17,8 @@ use tonic::Request;
 
 use super::CommonService;
 
-fn setup_common_service() -> CommonService {
-    let (endpoints, rx1) = setup_pipeline();
+async fn setup_common_service() -> CommonService {
+    let (endpoints, rx1) = setup_pipeline().await;
     CommonService::new(endpoints, Some(rx1))
 }
 
@@ -48,7 +50,7 @@ async fn count_and_query(
 #[tokio::test]
 async fn test_grpc_common_count_and_query() {
-    let service = setup_common_service();
+    let service = setup_common_service().await;
     let endpoint = "films";
 
     // Empty query.
@@ -81,7 +83,7 @@ async fn test_grpc_common_count_and_query() {
 
 #[tokio::test]
 async fn test_grpc_common_get_endpoints() {
-    let service = setup_common_service();
+    let service = setup_common_service().await;
     let response = service
         .get_endpoints(Request::new(GetEndpointsRequest {}))
         .await
@@ -92,7 +94,7 @@ async fn test_grpc_common_get_endpoints() {
 
 #[tokio::test]
 async fn test_grpc_common_get_fields() {
-    let service = setup_common_service();
+    let service = setup_common_service().await;
     let response = service
         .get_fields(Request::new(GetFieldsRequest {
             endpoint: "films".to_string(),
@@ -142,7 +144,8 @@ async fn test_grpc_common_on_event() {
         default_pipeline_internal.port,
         rx_internal,
     ));
-    let service = setup_common_service();
+    tokio::time::sleep(Duration::from_millis(100)).await; // wait for the mock server to start.
+    let service = setup_common_service().await;
     let mut rx = service
         .on_event(Request::new(OnEventRequest {
             endpoint: "films".to_string(),
diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs
index 0bb3604821..121c4de3b2 100644
--- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs
+++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs
@@ -27,7 +27,7 @@ impl InternalPipelineServer {
             crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.1);
         Self {
             alias_redirected_receiver,
-            operation_receiver: operation_receiver,
+            operation_receiver,
         }
     }
 }
diff --git a/dozer-api/src/grpc/typed/tests/fake_internal_pipeline_server.rs b/dozer-api/src/grpc/typed/tests/fake_internal_pipeline_server.rs
index f065f89553..18b9df8c21 100644
--- a/dozer-api/src/grpc/typed/tests/fake_internal_pipeline_server.rs
+++ b/dozer-api/src/grpc/typed/tests/fake_internal_pipeline_server.rs
@@ -1,72 +1,62 @@
 use core::time;
-use crossbeam::channel::Receiver;
-use dozer_types::crossbeam;
-use dozer_types::grpc_types::internal::{
-    internal_pipeline_service_server::{InternalPipelineService, InternalPipelineServiceServer},
-    PipelineRequest,
-};
-use dozer_types::grpc_types::{
-    internal::{pipeline_response::ApiEvent, PipelineResponse},
-    types::{value, Operation, OperationType, Record, Value},
+use dozer_types::grpc_types::internal::internal_pipeline_service_server::{
+    InternalPipelineService, InternalPipelineServiceServer,
 };
+use dozer_types::grpc_types::internal::{AliasEventsRequest, AliasRedirected, OperationsRequest};
+use dozer_types::grpc_types::types::{value, Operation, OperationType, Record, Value};
 use futures_util::FutureExt;
 use std::{net::ToSocketAddrs, pin::Pin, thread};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};
 
 pub struct FakeInternalPipelineServer {}
 
-type ResponseStream = Pin<Box<dyn Stream<Item = Result<PipelineResponse, Status>> + Send>>;
+
+type OperationsStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
+type AliasEventsStream = Pin<Box<dyn Stream<Item = Result<AliasRedirected, Status>> + Send>>;
 
 #[tonic::async_trait]
 impl InternalPipelineService for FakeInternalPipelineServer {
-    type StreamPipelineRequestStream = ResponseStream;
-    async fn stream_pipeline_request(
+    type StreamOperationsStream = OperationsStream;
+    async fn stream_operations(
         &self,
-        _request: tonic::Request<PipelineRequest>,
-    ) -> Result<Response<Self::StreamPipelineRequestStream>, Status> {
+        _request: tonic::Request<OperationsRequest>,
+    ) -> Result<Response<Self::StreamOperationsStream>, Status> {
         let (tx, rx) = tokio::sync::mpsc::channel(1000);
         thread::spawn(move || loop {
             thread::sleep(time::Duration::from_millis(100));
-            let fake_event = PipelineResponse {
-                endpoint: "films".to_string(),
-                api_event: Some(ApiEvent::Op(Operation {
-                    typ: OperationType::Insert as i32,
-                    old: None,
-                    new: Some(Record {
-                        values: vec![
-                            Value {
-                                value: Some(value::Value::UintValue(32)),
-                            },
-                            Value {
-                                value: Some(value::Value::StringValue("description".to_string())),
-                            },
-                            Value { value: None },
-                            Value { value: None },
-                        ],
-                        version: 1,
-                    }),
-                    new_id: Some(0),
-                    endpoint_name: "films".to_string(),
-                })),
+            let op = Operation {
+                typ: OperationType::Insert as i32,
+                old: None,
+                new: Some(Record {
+                    values: vec![
+                        Value {
+                            value: Some(value::Value::UintValue(32)),
+                        },
+                        Value {
+                            value: Some(value::Value::StringValue("description".to_string())),
+                        },
+                        Value { value: None },
+                        Value { value: None },
+                    ],
+                    version: 1,
+                }),
+                new_id: Some(0),
+                endpoint_name: "films".to_string(),
             };
-            tx.try_send(Ok(fake_event)).unwrap();
+            tx.try_send(Ok(op)).unwrap();
         });
         let output_stream = ReceiverStream::new(rx);
-        Ok(Response::new(
-            Box::pin(output_stream) as Self::StreamPipelineRequestStream
-        ))
+        Ok(Response::new(Box::pin(output_stream)))
     }
-}
 
-struct InternalIterator {
-    receiver: Receiver<PipelineResponse>,
-}
-impl Iterator for InternalIterator {
-    type Item = PipelineResponse;
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.receiver.recv() {
-            Ok(msg) => Some(msg),
-            Err(_) => None,
-        }
+    type StreamAliasEventsStream = AliasEventsStream;
+
+    async fn stream_alias_events(
+        &self,
+        _request: tonic::Request<AliasEventsRequest>,
+    ) -> Result<Response<Self::StreamAliasEventsStream>, Status> {
+        let (_, alias_redirected_receiver) = tokio::sync::mpsc::channel(1000);
+        let output_stream = ReceiverStream::new(alias_redirected_receiver);
+        Ok(Response::new(Box::pin(output_stream)))
     }
 }
diff --git a/dozer-api/src/grpc/typed/tests/service.rs b/dozer-api/src/grpc/typed/tests/service.rs
index 22156ce9a1..93e941e796 100644
--- a/dozer-api/src/grpc/typed/tests/service.rs
+++ b/dozer-api/src/grpc/typed/tests/service.rs
@@ -1,8 +1,9 @@
 use crate::{
     auth::{Access, Authorizer},
+    errors::GrpcError,
     grpc::{
         auth_middleware::AuthMiddlewareLayer,
-        client_server::ApiServer,
+        internal::internal_pipeline_client::InternalPipelineClient,
         typed::{
             tests::fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, TypedService,
         },
@@ -16,8 +17,7 @@ use dozer_types::grpc_types::{
         films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
         QueryFilmsResponse,
     },
-    internal::PipelineResponse,
-    types::EventType,
+    types::{EventType, Operation},
 };
 use dozer_types::models::{api_config::default_api_config, api_security::ApiSecurity};
 use futures_util::FutureExt;
@@ -38,7 +38,15 @@ use tonic::{
     Code, Request,
 };
 
-pub fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<PipelineResponse>) {
+async fn start_internal_pipeline_client() -> Result<Receiver<Operation>, GrpcError> {
+    let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
+    let mut client = InternalPipelineClient::new(&default_api_internal).await?;
+    let (receiver, future) = client.stream_operations().await?;
+    tokio::spawn(future);
+    Ok(receiver)
+}
+
+pub async fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<Operation>) {
     let endpoint = test_utils::get_endpoint();
     let cache_endpoint = Arc::new(
         RoCacheEndpoint::new(
@@ -48,22 +56,18 @@ pub fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<PipelineResponse>) {
-    let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
-    let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
-    tokio::spawn(async {
-        ApiServer::setup_broad_cast_channel(tx, default_api_internal)
-            .await
-            .unwrap();
-    });
+    let receiver = start_internal_pipeline_client()
+        .await
+        .unwrap_or(broadcast::channel::<Operation>(1).1);
 
-    (vec![cache_endpoint], rx1)
+    (vec![cache_endpoint], receiver)
 }
 
-fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
+async fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
     // Copy this file from dozer-tests output directory if it changes
     let res = env::current_dir().unwrap();
     let path = res.join("src/grpc/typed/tests/generated_films.bin");
-    let (endpoints, rx1) = setup_pipeline();
+    let (endpoints, rx1) = setup_pipeline().await;
     TypedService::new(&path, endpoints, Some(rx1), security).unwrap()
 }
 
@@ -82,7 +86,7 @@ async fn test_grpc_count_and_query_common(
         rx_internal,
     ));
 
-    let typed_service = setup_typed_service(api_security.to_owned());
+    let typed_service = setup_typed_service(api_security.to_owned()).await;
     let (_tx, rx) = oneshot::channel::<()>();
     // middleware
     let layer = tower::ServiceBuilder::new()
@@ -226,7 +230,7 @@ async fn test_typed_streaming1() {
     ));
     let (_tx, rx) = oneshot::channel::<()>();
     let _jh = tokio::spawn(async move {
-        let typed_service = setup_typed_service(None);
+        let typed_service = setup_typed_service(None).await;
         Server::builder()
             .add_service(typed_service)
             .serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
@@ -265,7 +269,7 @@ async fn test_typed_streaming2() {
     ));
     let (_tx, rx) = oneshot::channel::<()>();
     let _jh = tokio::spawn(async move {
-        let typed_service = setup_typed_service(None);
+        let typed_service = setup_typed_service(None).await;
         Server::builder()
             .add_service(typed_service)
             .serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
@@ -303,7 +307,7 @@ async fn test_typed_streaming3() {
     ));
     let (_tx, rx) = oneshot::channel::<()>();
     let _jh = tokio::spawn(async move {
-        let typed_service = setup_typed_service(None);
+        let typed_service = setup_typed_service(None).await;
         Server::builder()
             .add_service(typed_service)
             .serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index e2a9f8f29b..400a675d7d 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -12,7 +12,7 @@ use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
 use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
 use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
-use dozer_types::node::NodeHandle;
+use dozer_types::node::{NodeHandle, SourceStates};
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
 };
@@ -450,6 +450,7 @@ impl SinkFactory for ErrSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(ErrSink {
             err_at: self.err_at,
diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs
index 85c5e0cdac..d5469d1ffb 100644
--- a/dozer-core/src/tests/dag_schemas.rs
+++ b/dozer-core/src/tests/dag_schemas.rs
@@ -7,7 +7,7 @@ use crate::node::{
 use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
 use dozer_storage::lmdb_storage::LmdbExclusiveTransaction;
-use dozer_types::node::NodeHandle;
+use dozer_types::node::{NodeHandle, SourceStates};
 use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
 
 use std::collections::HashMap;
@@ -186,6 +186,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         todo!()
     }
diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs
index f4322f4aa2..cb066c5d2e 100644
--- a/dozer-core/src/tests/sinks.rs
+++ b/dozer-core/src/tests/sinks.rs
@@ -4,6 +4,7 @@ use crate::node::{PortHandle, Sink, SinkFactory};
 use crate::record_store::RecordReader;
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_storage::lmdb_storage::SharedTransaction;
+use dozer_types::node::SourceStates;
 use dozer_types::types::{Operation, Schema};
 
 use dozer_types::log::debug;
@@ -45,6 +46,7 @@ impl SinkFactory for CountingSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(CountingSink {
             expected: self.expected,
@@ -113,6 +115,7 @@ impl SinkFactory for ConnectivityTestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         unimplemented!("This struct is for connectivity test, only input ports are defined")
     }
@@ -136,6 +139,7 @@ impl SinkFactory for NoInputPortSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         unimplemented!("This struct is for connectivity test, only input ports are defined")
     }
diff --git a/dozer-ingestion/src/connectors/kafka/tests.rs b/dozer-ingestion/src/connectors/kafka/tests.rs
index dd969b667d..8b91b53f10 100644
--- a/dozer-ingestion/src/connectors/kafka/tests.rs
+++ b/dozer-ingestion/src/connectors/kafka/tests.rs
@@ -99,6 +99,7 @@ fn connector_disabled_test_e2e_connect_debezium_and_use_kafka_stream() {
                     panic!("Unexpected operation");
                 }
             }
+            Operation::SnapshottingDone {} => (),
         }
     }
 }
diff --git a/dozer-orchestrator/src/pipeline/connector_source.rs b/dozer-orchestrator/src/pipeline/connector_source.rs
index 92f9dcc6f4..34205b4020 100644
--- a/dozer-orchestrator/src/pipeline/connector_source.rs
+++ b/dozer-orchestrator/src/pipeline/connector_source.rs
@@ -63,6 +63,7 @@ impl ConnectorSourceFactory {
         })
     }
 
+    #[allow(clippy::type_complexity)]
     fn get_schema_map(
         connection: Connection,
         tables: Vec<TableInfo>,
diff --git a/dozer-orchestrator/src/pipeline/sinks.rs b/dozer-orchestrator/src/pipeline/sinks.rs
index 5fd6399df1..98a631bbcb 100644
--- a/dozer-orchestrator/src/pipeline/sinks.rs
+++ b/dozer-orchestrator/src/pipeline/sinks.rs
@@ -483,7 +483,7 @@ mod tests {
 
     use crate::test_utils;
 
-    use dozer_cache::cache::{index, CacheManager};
+    use dozer_cache::cache::index;
     use dozer_core::node::Sink;
     use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;
     use dozer_core::DEFAULT_PORT_HANDLE;
diff --git a/dozer-orchestrator/src/test_utils.rs b/dozer-orchestrator/src/test_utils.rs
index 1d7dfdba40..cce5f15e08 100644
--- a/dozer-orchestrator/src/test_utils.rs
+++ b/dozer-orchestrator/src/test_utils.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use crate::pipeline::CacheSink;
 use dozer_cache::cache::{CacheManager, LmdbCacheManager};
 use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
@@ -29,13 +31,18 @@ pub fn get_schema() -> Schema {
 pub fn init_sink(
     schema: Schema,
     secondary_indexes: Vec<IndexDefinition>,
-) -> (LmdbCacheManager, CacheSink) {
-    let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
-    let api_endpoint = init_endpoint();
-    let cache = cache_manager
-        .create_cache(vec![(api_endpoint.name, schema, secondary_indexes)])
-        .unwrap();
-    let cache = CacheSink::new(cache, init_endpoint(), None, None).unwrap();
+) -> (Arc<LmdbCacheManager>, CacheSink) {
+    let cache_manager = Arc::new(LmdbCacheManager::new(Default::default()).unwrap());
+    let cache = CacheSink::new(
+        cache_manager.clone(),
+        init_endpoint(),
+        &Default::default(),
+        schema,
+        secondary_indexes,
+        None,
+        None,
+    )
+    .unwrap();
     (cache_manager, cache)
 }
 
 pub fn init_endpoint() -> ApiEndpoint {
diff --git a/dozer-sql/src/pipeline/product/tests/left_join_test.rs b/dozer-sql/src/pipeline/product/tests/left_join_test.rs
index 8007232a57..49a90d36f4 100644
--- a/dozer-sql/src/pipeline/product/tests/left_join_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/left_join_test.rs
@@ -10,6 +10,7 @@ use dozer_core::node::{
 use dozer_core::record_store::RecordReader;
 use dozer_core::storage::lmdb_storage::SharedTransaction;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_types::node::SourceStates;
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::tracing::{debug, info};
 use dozer_types::types::{
@@ -469,6 +470,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(TestSink {
             expected: self.expected,
@@ -506,6 +508,9 @@ impl Sink for TestSink {
             Operation::Update { old, new } => {
                 info!("o0:-> - {:?}, + {:?}", old.values, new.values)
             }
+            Operation::SnapshottingDone {} => {
+                info!("o0:-> SnapshottingDone")
+            }
         }
         self.current += 1;
diff --git a/dozer-sql/src/pipeline/product/tests/pipeline_test.rs b/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
index 954703f157..c1b3b88fdd 100644
--- a/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
@@ -10,6 +10,7 @@ use dozer_core::node::{
 use dozer_core::record_store::RecordReader;
 use dozer_core::storage::lmdb_storage::SharedTransaction;
 use dozer_core::DEFAULT_PORT_HANDLE;
+use dozer_types::node::SourceStates;
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::tracing::{debug, info};
 use dozer_types::types::{
@@ -463,6 +464,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(TestSink {
             expected: self.expected,
@@ -500,6 +502,9 @@ impl Sink for TestSink {
             Operation::Update { old, new } => {
                 info!("o0:-> - {:?}, + {:?}", old.values, new.values)
             }
+            Operation::SnapshottingDone {} => {
+                info!("o0:-> SnapshottingDone")
+            }
         }
         self.current += 1;
diff --git a/dozer-sql/src/pipeline/product/tests/set_operator_test.rs b/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
index 5b3eeb875a..e2e3d08cb6 100644
--- a/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
+++ b/dozer-sql/src/pipeline/product/tests/set_operator_test.rs
@@ -13,6 +13,7 @@ use dozer_core::storage::lmdb_storage::SharedTransaction;
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_types::chrono::NaiveDate;
 use dozer_types::log::debug;
+use dozer_types::node::SourceStates;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
 };
@@ -448,6 +449,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(TestSink {
             expected: self.expected,
@@ -485,6 +487,7 @@ impl Sink for TestSink {
             Operation::Update { old, new } => {
                 debug!("o0:-> - {:?}, + {:?}", old.values, new.values)
             }
+            Operation::SnapshottingDone {} => debug!("o0:-> SnapshottingDone"),
         }
         self.current += 1;
diff --git a/dozer-sql/src/pipeline/tests/builder_test.rs b/dozer-sql/src/pipeline/tests/builder_test.rs
index b95207e1f0..dc595d531c 100644
--- a/dozer-sql/src/pipeline/tests/builder_test.rs
+++ b/dozer-sql/src/pipeline/tests/builder_test.rs
@@ -10,6 +10,7 @@ use dozer_core::record_store::RecordReader;
 use dozer_core::storage::lmdb_storage::SharedTransaction;
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_types::log::debug;
+use dozer_types::node::SourceStates;
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -148,6 +149,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         _input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         Ok(Box::new(TestSink {}))
     }
diff --git a/dozer-tests/src/cache_tests/film/load_database.rs b/dozer-tests/src/cache_tests/film/load_database.rs
index 358d654534..a83434127b 100644
--- a/dozer-tests/src/cache_tests/film/load_database.rs
+++ b/dozer-tests/src/cache_tests/film/load_database.rs
@@ -62,7 +62,7 @@ pub async fn load_database(
             .await
             .unwrap();
     }
-    cache.commit().unwrap();
+    cache.commit(&Default::default()).unwrap();
     let cache_name = cache.name().to_string();
     drop(cache);
diff --git a/dozer-tests/src/sql_tests/mapper.rs b/dozer-tests/src/sql_tests/mapper.rs
index aa1c494f8a..88d0a3094b 100644
--- a/dozer-tests/src/sql_tests/mapper.rs
+++ b/dozer-tests/src/sql_tests/mapper.rs
@@ -231,6 +231,7 @@ impl SqlMapper {
                     map_field_to_string(&pkey_value)
                 ))
             }
+            Operation::SnapshottingDone {} => Ok(String::new()),
         }
     }
diff --git a/dozer-tests/src/sql_tests/pipeline.rs b/dozer-tests/src/sql_tests/pipeline.rs
index e865a1bb4a..fecaba69b2 100644
--- a/dozer-tests/src/sql_tests/pipeline.rs
+++ b/dozer-tests/src/sql_tests/pipeline.rs
@@ -17,6 +17,7 @@ use dozer_core::storage::lmdb_storage::SharedTransaction;
 use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
 use dozer_types::crossbeam::channel::{Receiver, Sender};
+use dozer_types::node::SourceStates;
 use dozer_types::types::{Operation, Schema, SourceDefinition};
 
 use std::collections::HashMap;
@@ -185,6 +186,7 @@ impl SinkFactory for TestSinkFactory {
     fn build(
         &self,
         input_schemas: HashMap<PortHandle, Schema>,
+        _source_states: &SourceStates,
     ) -> Result<Box<dyn Sink>, ExecutionError> {
         let schema = input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone();