Skip to content

Commit

Permalink
fix: tests and clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Feb 26, 2023
1 parent 53c27b7 commit f605c33
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 86 deletions.
15 changes: 9 additions & 6 deletions dozer-api/src/grpc/common/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::grpc::typed::tests::{
fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, service::setup_pipeline,
};
Expand All @@ -15,8 +17,8 @@ use tonic::Request;

use super::CommonService;

fn setup_common_service() -> CommonService {
let (endpoints, rx1) = setup_pipeline();
async fn setup_common_service() -> CommonService {
let (endpoints, rx1) = setup_pipeline().await;
CommonService::new(endpoints, Some(rx1))
}

Expand Down Expand Up @@ -48,7 +50,7 @@ async fn count_and_query(

#[tokio::test]
async fn test_grpc_common_count_and_query() {
let service = setup_common_service();
let service = setup_common_service().await;
let endpoint = "films";

// Empty query.
Expand Down Expand Up @@ -81,7 +83,7 @@ async fn test_grpc_common_count_and_query() {

#[tokio::test]
async fn test_grpc_common_get_endpoints() {
let service = setup_common_service();
let service = setup_common_service().await;
let response = service
.get_endpoints(Request::new(GetEndpointsRequest {}))
.await
Expand All @@ -92,7 +94,7 @@ async fn test_grpc_common_get_endpoints() {

#[tokio::test]
async fn test_grpc_common_get_fields() {
let service = setup_common_service();
let service = setup_common_service().await;
let response = service
.get_fields(Request::new(GetFieldsRequest {
endpoint: "films".to_string(),
Expand Down Expand Up @@ -142,7 +144,8 @@ async fn test_grpc_common_on_event() {
default_pipeline_internal.port,
rx_internal,
));
let service = setup_common_service();
tokio::time::sleep(Duration::from_millis(100)).await; // wait for the mock server to start.
let service = setup_common_service().await;
let mut rx = service
.on_event(Request::new(OnEventRequest {
endpoint: "films".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl InternalPipelineServer {
crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.1);
Self {
alias_redirected_receiver,
operation_receiver: operation_receiver,
operation_receiver,
}
}
}
Expand Down
90 changes: 40 additions & 50 deletions dozer-api/src/grpc/typed/tests/fake_internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,62 @@
use core::time;
use crossbeam::channel::Receiver;
use dozer_types::crossbeam;
use dozer_types::grpc_types::internal::{
internal_pipeline_service_server::{InternalPipelineService, InternalPipelineServiceServer},
PipelineRequest,
};
use dozer_types::grpc_types::{
internal::{pipeline_response::ApiEvent, PipelineResponse},
types::{value, Operation, OperationType, Record, Value},
use dozer_types::grpc_types::internal::internal_pipeline_service_server::{
InternalPipelineService, InternalPipelineServiceServer,
};
use dozer_types::grpc_types::internal::{AliasEventsRequest, AliasRedirected, OperationsRequest};
use dozer_types::grpc_types::types::{value, Operation, OperationType, Record, Value};
use futures_util::FutureExt;
use std::{net::ToSocketAddrs, pin::Pin, thread};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};
pub struct FakeInternalPipelineServer {}
type ResponseStream = Pin<Box<dyn Stream<Item = Result<PipelineResponse, Status>> + Send>>;

type OperationsStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
type AliasEventsStream = Pin<Box<dyn Stream<Item = Result<AliasRedirected, Status>> + Send>>;

#[tonic::async_trait]
impl InternalPipelineService for FakeInternalPipelineServer {
type StreamPipelineRequestStream = ResponseStream;
async fn stream_pipeline_request(
type StreamOperationsStream = OperationsStream;
async fn stream_operations(
&self,
_request: tonic::Request<PipelineRequest>,
) -> Result<Response<ResponseStream>, Status> {
_request: tonic::Request<OperationsRequest>,
) -> Result<Response<OperationsStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
thread::spawn(move || loop {
thread::sleep(time::Duration::from_millis(100));
let fake_event = PipelineResponse {
endpoint: "films".to_string(),
api_event: Some(ApiEvent::Op(Operation {
typ: OperationType::Insert as i32,
old: None,
new: Some(Record {
values: vec![
Value {
value: Some(value::Value::UintValue(32)),
},
Value {
value: Some(value::Value::StringValue("description".to_string())),
},
Value { value: None },
Value { value: None },
],
version: 1,
}),
new_id: Some(0),
endpoint_name: "films".to_string(),
})),
let op = Operation {
typ: OperationType::Insert as i32,
old: None,
new: Some(Record {
values: vec![
Value {
value: Some(value::Value::UintValue(32)),
},
Value {
value: Some(value::Value::StringValue("description".to_string())),
},
Value { value: None },
Value { value: None },
],
version: 1,
}),
new_id: Some(0),
endpoint_name: "films".to_string(),
};
tx.try_send(Ok(fake_event)).unwrap();
tx.try_send(Ok(op)).unwrap();
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(
Box::pin(output_stream) as Self::StreamPipelineRequestStream
))
Ok(Response::new(Box::pin(output_stream)))
}
}
struct InternalIterator {
receiver: Receiver<PipelineResponse>,
}
impl Iterator for InternalIterator {
type Item = PipelineResponse;

fn next(&mut self) -> Option<Self::Item> {
match self.receiver.recv() {
Ok(msg) => Some(msg),
Err(_) => None,
}
type StreamAliasEventsStream = AliasEventsStream;

async fn stream_alias_events(
&self,
_request: tonic::Request<AliasEventsRequest>,
) -> Result<Response<Self::StreamAliasEventsStream>, Status> {
let (_, alias_redirected_receiver) = tokio::sync::mpsc::channel(1000);
let output_stream = ReceiverStream::new(alias_redirected_receiver);
Ok(Response::new(Box::pin(output_stream)))
}
}

Expand Down
40 changes: 22 additions & 18 deletions dozer-api/src/grpc/typed/tests/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::{
auth::{Access, Authorizer},
errors::GrpcError,
grpc::{
auth_middleware::AuthMiddlewareLayer,
client_server::ApiServer,
internal::internal_pipeline_client::InternalPipelineClient,
typed::{
tests::fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, TypedService,
},
Expand All @@ -16,8 +17,7 @@ use dozer_types::grpc_types::{
films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
QueryFilmsResponse,
},
internal::PipelineResponse,
types::EventType,
types::{EventType, Operation},
};
use dozer_types::models::{api_config::default_api_config, api_security::ApiSecurity};
use futures_util::FutureExt;
Expand All @@ -38,7 +38,15 @@ use tonic::{
Code, Request,
};

pub fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<PipelineResponse>) {
async fn start_internal_pipeline_client() -> Result<Receiver<Operation>, GrpcError> {
let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
let mut client = InternalPipelineClient::new(&default_api_internal).await?;
let (receiver, future) = client.stream_operations().await?;
tokio::spawn(future);
Ok(receiver)
}

pub async fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<Operation>) {
let endpoint = test_utils::get_endpoint();
let cache_endpoint = Arc::new(
RoCacheEndpoint::new(
Expand All @@ -48,22 +56,18 @@ pub fn setup_pipeline() -> (Vec<Arc<RoCacheEndpoint>>, Receiver<PipelineResponse
.unwrap(),
);

let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
tokio::spawn(async {
ApiServer::setup_broad_cast_channel(tx, default_api_internal)
.await
.unwrap();
});
let receiver = start_internal_pipeline_client()
.await
.unwrap_or(broadcast::channel::<Operation>(1).1);

(vec![cache_endpoint], rx1)
(vec![cache_endpoint], receiver)
}

fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
async fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
// Copy this file from dozer-tests output directory if it changes
let res = env::current_dir().unwrap();
let path = res.join("src/grpc/typed/tests/generated_films.bin");
let (endpoints, rx1) = setup_pipeline();
let (endpoints, rx1) = setup_pipeline().await;

TypedService::new(&path, endpoints, Some(rx1), security).unwrap()
}
Expand All @@ -82,7 +86,7 @@ async fn test_grpc_count_and_query_common(
rx_internal,
));

let typed_service = setup_typed_service(api_security.to_owned());
let typed_service = setup_typed_service(api_security.to_owned()).await;
let (_tx, rx) = oneshot::channel::<()>();
// middleware
let layer = tower::ServiceBuilder::new()
Expand Down Expand Up @@ -226,7 +230,7 @@ async fn test_typed_streaming1() {
));
let (_tx, rx) = oneshot::channel::<()>();
let _jh = tokio::spawn(async move {
let typed_service = setup_typed_service(None);
let typed_service = setup_typed_service(None).await;
Server::builder()
.add_service(typed_service)
.serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
Expand Down Expand Up @@ -265,7 +269,7 @@ async fn test_typed_streaming2() {
));
let (_tx, rx) = oneshot::channel::<()>();
let _jh = tokio::spawn(async move {
let typed_service = setup_typed_service(None);
let typed_service = setup_typed_service(None).await;
Server::builder()
.add_service(typed_service)
.serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
Expand Down Expand Up @@ -303,7 +307,7 @@ async fn test_typed_streaming3() {
));
let (_tx, rx) = oneshot::channel::<()>();
let _jh = tokio::spawn(async move {
let typed_service = setup_typed_service(None);
let typed_service = setup_typed_service(None).await;
Server::builder()
.add_service(typed_service)
.serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
use dozer_types::node::NodeHandle;
use dozer_types::node::{NodeHandle, SourceStates};
use dozer_types::types::{
Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
};
Expand Down Expand Up @@ -450,6 +450,7 @@ impl SinkFactory<NoneContext> for ErrSinkFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_source_states: &SourceStates,
) -> Result<Box<dyn Sink>, ExecutionError> {
Ok(Box::new(ErrSink {
err_at: self.err_at,
Expand Down
3 changes: 2 additions & 1 deletion dozer-core/src/tests/dag_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::node::{
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};

use dozer_storage::lmdb_storage::LmdbExclusiveTransaction;
use dozer_types::node::NodeHandle;
use dozer_types::node::{NodeHandle, SourceStates};
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
use std::collections::HashMap;

Expand Down Expand Up @@ -186,6 +186,7 @@ impl SinkFactory<NoneContext> for TestSinkFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_source_states: &SourceStates,
) -> Result<Box<dyn crate::node::Sink>, ExecutionError> {
todo!()
}
Expand Down
4 changes: 4 additions & 0 deletions dozer-core/src/tests/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::node::{PortHandle, Sink, SinkFactory};
use crate::record_store::RecordReader;
use crate::DEFAULT_PORT_HANDLE;
use dozer_storage::lmdb_storage::SharedTransaction;
use dozer_types::node::SourceStates;
use dozer_types::types::{Operation, Schema};

use dozer_types::log::debug;
Expand Down Expand Up @@ -45,6 +46,7 @@ impl SinkFactory<NoneContext> for CountingSinkFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_source_states: &SourceStates,
) -> Result<Box<dyn Sink>, ExecutionError> {
Ok(Box::new(CountingSink {
expected: self.expected,
Expand Down Expand Up @@ -113,6 +115,7 @@ impl SinkFactory<NoneContext> for ConnectivityTestSinkFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_source_states: &SourceStates,
) -> Result<Box<dyn Sink>, ExecutionError> {
unimplemented!("This struct is for connectivity test, only input ports are defined")
}
Expand All @@ -136,6 +139,7 @@ impl SinkFactory<NoneContext> for NoInputPortSinkFactory {
fn build(
&self,
_input_schemas: HashMap<PortHandle, Schema>,
_source_states: &SourceStates,
) -> Result<Box<dyn Sink>, ExecutionError> {
unimplemented!("This struct is for connectivity test, only input ports are defined")
}
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/src/connectors/kafka/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn connector_disabled_test_e2e_connect_debezium_and_use_kafka_stream() {
panic!("Unexpected operation");
}
}
Operation::SnapshottingDone {} => (),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions dozer-orchestrator/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ConnectorSourceFactory {
})
}

#[allow(clippy::type_complexity)]
fn get_schema_map(
connection: Connection,
tables: Vec<TableInfo>,
Expand Down
2 changes: 1 addition & 1 deletion dozer-orchestrator/src/pipeline/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ mod tests {

use crate::test_utils;

use dozer_cache::cache::{index, CacheManager};
use dozer_cache::cache::index;
use dozer_core::node::Sink;
use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;
use dozer_core::DEFAULT_PORT_HANDLE;
Expand Down
Loading

0 comments on commit f605c33

Please sign in to comment.