Skip to content

Commit

Permalink
feat: Synchronize pipeline and log persisting
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Aug 16, 2023
1 parent 9ec5341 commit 9a14c25
Show file tree
Hide file tree
Showing 35 changed files with 522 additions and 490 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dozer_cache::dozer_log::home_dir::BuildId;
use dozer_cache::dozer_log::replication::Log;
use dozer_cache::dozer_log::replication::{Log, LogResponseFuture};
use dozer_types::bincode;
use dozer_types::grpc_types::internal::internal_pipeline_service_server::{
InternalPipelineService, InternalPipelineServiceServer,
Expand All @@ -11,13 +11,13 @@ use dozer_types::grpc_types::internal::{
use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tonic::transport::server::TcpIncoming;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
Expand Down Expand Up @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer {
) -> Result<Response<StorageResponse>, Status> {
let endpoint = request.into_inner().endpoint;
let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
let storage = log.lock().await.describe_storage();
let storage = log.lock().describe_storage();
Ok(Response::new(StorageResponse {
storage: Some(storage),
}))
Expand Down Expand Up @@ -102,7 +102,14 @@ impl InternalPipelineService for InternalPipelineServer {
Err(e) => return Either::Left(std::future::ready(Err(e))),
}
.log;
Either::Right(get_log(log.clone(), request))

let response = log.lock().read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
);

Either::Right(serialize_log_response(response))
})
.boxed(),
))
Expand All @@ -121,15 +128,7 @@ fn find_log_endpoint<'a>(
})
}

async fn get_log(log: Arc<Mutex<Log>>, request: LogRequest) -> Result<LogResponse, Status> {
let mut log_mut = log.lock().await;
let response = log_mut.read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
);
// Must drop log before awaiting response, otherwise we will deadlock.
drop(log_mut);
async fn serialize_log_response(response: LogResponseFuture) -> Result<LogResponse, Status> {
let response = response
.await
.map_err(|e| Status::new(tonic::Code::Internal, e.to_string()))?;
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
Expand Down
5 changes: 5 additions & 0 deletions dozer-cli/src/pipeline/dummy_sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use dozer_cache::dozer_log::storage::Queue;
use dozer_core::{
epoch::Epoch,
executor_operation::ProcessorOperation,
Expand Down Expand Up @@ -46,6 +47,10 @@ impl Sink for DummySink {
Ok(())
}

fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
Ok(())
}

fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
Ok(())
}
Expand Down
56 changes: 23 additions & 33 deletions dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
use dozer_cache::dozer_log::{
attach_progress,
replication::{Log, LogOperation},
storage::Queue,
};
use dozer_core::{
epoch::Epoch,
Expand All @@ -12,10 +13,10 @@ use dozer_core::{
DEFAULT_PORT_HANDLE,
};
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::errors::internal::BoxedError;
use dozer_types::indicatif::{MultiProgress, ProgressBar};
use dozer_types::types::Schema;
use tokio::{runtime::Runtime, sync::Mutex};
use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex};
use tokio::runtime::Runtime;

#[derive(Debug)]
pub struct LogSinkFactory {
Expand Down Expand Up @@ -105,47 +106,36 @@ impl Sink for LogSink {
record_store: &ProcessorRecordStore,
op: ProcessorOperation,
) -> Result<(), BoxedError> {
self.runtime.block_on(async {
let mut log = self.log.lock().await;
log.write(
dozer_cache::dozer_log::replication::LogOperation::Op {
op: record_store
.load_operation(&op)
.map_err(Into::<BoxedError>::into)?,
},
self.log.clone(),
)
.await
.map_err(Into::<BoxedError>::into)
})?;
self.log
.lock()
.write(dozer_cache::dozer_log::replication::LogOperation::Op {
op: record_store
.load_operation(&op)
.map_err(Into::<BoxedError>::into)?,
});
self.update_counter();
Ok(())
}

fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> {
self.runtime.block_on(async {
let mut log = self.log.lock().await;
log.write(
LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
},
self.log.clone(),
)
.await
})?;
self.log.lock().write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.update_counter();
Ok(())
}

fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> {
self.log
.lock()
.persist(queue, self.log.clone(), &self.runtime)?;
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
self.runtime.block_on(async {
let mut log = self.log.lock().await;
log.write(
LogOperation::SnapshottingDone { connection_name },
self.log.clone(),
)
.await
})?;
self.log
.lock()
.write(LogOperation::SnapshottingDone { connection_name });
self.update_counter();
Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions dozer-cli/src/simple/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ async fn needs_build(
let mut futures = vec![];
for endpoint in contract.endpoints.keys() {
let endpoint_path = build_path.get_endpoint_path(endpoint);
let (storage, prefix) =
create_data_storage(storage_config.clone(), endpoint_path.log_dir.into()).await?;
let log_dir = build_path
.data_dir
.join(endpoint_path.log_dir_relative_to_data_dir);
let (storage, prefix) = create_data_storage(storage_config.clone(), log_dir.into()).await?;
futures.push(is_empty(storage, prefix));
}
if !try_join_all(futures)
Expand Down
46 changes: 34 additions & 12 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::dyn_clone;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::{Log, LogOptions};
use dozer_cache::dozer_log::replication::{create_data_storage, Log};
use dozer_cache::dozer_log::storage::Storage;
use dozer_core::errors::ExecutionError;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::DataStorage;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
Expand All @@ -16,15 +21,15 @@ use dozer_core::executor::{DagExecutor, ExecutorOptions};
use dozer_types::indicatif::MultiProgress;

use dozer_types::models::connection::Connection;
use OrchestrationError::ExecutionError;

use crate::errors::OrchestrationError;

pub struct Executor<'a> {
connections: &'a [Connection],
sources: &'a [Source],
sql: Option<&'a str>,
checkpoint_dir: String,
checkpoint_storage: Box<dyn Storage>,
checkpoint_prefix: String,
/// `ApiEndpoint` and its log.
endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
multi_pb: MultiProgress,
Expand All @@ -37,25 +42,36 @@ impl<'a> Executor<'a> {
sources: &'a [Source],
sql: Option<&'a str>,
api_endpoints: &'a [ApiEndpoint],
log_options: LogOptions,
storage_config: DataStorage,
multi_pb: MultiProgress,
) -> Result<Executor<'a>, OrchestrationError> {
let build_path = home_dir
.find_latest_build_path()
.map_err(|(path, error)| OrchestrationError::FileSystem(path.into(), error))?
.ok_or(OrchestrationError::NoBuildFound)?;
let (checkpoint_storage, checkpoint_prefix) =
create_data_storage(storage_config, build_path.data_dir.to_string())
.await
.map_err(ExecutionError::ObjectStorage)?;

let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
let log_endpoint =
create_log_endpoint(&build_path, &endpoint.name, log_options.clone()).await?;
let log_endpoint = create_log_endpoint(
&build_path,
&endpoint.name,
&*checkpoint_storage,
&checkpoint_prefix,
)
.await?;
endpoint_and_logs.push((endpoint.clone(), log_endpoint));
}

Ok(Executor {
connections,
sources,
sql,
checkpoint_dir: build_path.data_dir.into(),
checkpoint_storage,
checkpoint_prefix,
endpoint_and_logs,
multi_pb,
})
Expand Down Expand Up @@ -84,7 +100,8 @@ impl<'a> Executor<'a> {
let dag = builder.build(&runtime)?;
let exec = runtime.block_on(DagExecutor::new(
dag,
self.checkpoint_dir.clone(),
dyn_clone::clone_box(&*self.checkpoint_storage),
self.checkpoint_prefix.clone(),
executor_options,
))?;

Expand All @@ -97,13 +114,16 @@ pub fn run_dag_executor(
running: Arc<AtomicBool>,
) -> Result<(), OrchestrationError> {
let join_handle = dag_executor.start(running)?;
join_handle.join().map_err(ExecutionError)
join_handle
.join()
.map_err(OrchestrationError::ExecutionError)
}

async fn create_log_endpoint(
build_path: &BuildPath,
endpoint_name: &str,
log_options: LogOptions,
checkpoint_storage: &dyn Storage,
checkpoint_prefix: &str,
) -> Result<LogEndpoint, OrchestrationError> {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);

Expand All @@ -117,7 +137,9 @@ async fn create_log_endpoint(
OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
})?;

let log = Log::new(log_options, endpoint_path.log_dir.into_string(), false).await?;
let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint_prefix)
.join(&endpoint_path.log_dir_relative_to_data_dir);
let log = Log::new(checkpoint_storage, log_prefix.into(), false).await?;
let log = Arc::new(Mutex::new(log));

Ok(LogEndpoint {
Expand Down
6 changes: 3 additions & 3 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::simple::build;
use crate::simple::helper::validate_config;
use crate::utils::{
get_api_security_config, get_app_grpc_config, get_cache_manager_options, get_executor_options,
get_grpc_config, get_log_options, get_rest_config,
get_grpc_config, get_rest_config, get_storage_config,
};

use crate::{flatten_join_handle, join_handle_map_err};
Expand Down Expand Up @@ -175,7 +175,7 @@ impl SimpleOrchestrator {
&self.config.sources,
self.config.sql.as_deref(),
&self.config.endpoints,
get_log_options(&self.config),
get_storage_config(&self.config),
self.multi_pb.clone(),
))?;
let dag_executor = executor
Expand Down Expand Up @@ -303,7 +303,7 @@ impl SimpleOrchestrator {
)?;

// Run build
let storage_config = get_log_options(&self.config).storage_config;
let storage_config = get_storage_config(&self.config);
self.runtime
.block_on(build::build(&home_dir, &contract, &storage_config))?;

Expand Down
Loading

0 comments on commit 9a14c25

Please sign in to comment.