Write dozer.lock
Jesse-Bakker committed Aug 30, 2023
1 parent 97ff02d commit 00cc384
Showing 12 changed files with 106 additions and 50 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -24,6 +24,7 @@ dozer-config.test.*
log4rs.yaml
.DS_Store
queries
dozer.lock

.dozer/
logs/
8 changes: 7 additions & 1 deletion dozer-cli/src/cli/helper.rs
@@ -4,7 +4,9 @@ use crate::errors::CliError::{ConfigurationFilePathNotProvided, FailedToFindConf
use crate::errors::ConfigCombineError::CannotReadConfig;
use crate::errors::OrchestrationError;
use crate::simple::SimpleOrchestrator as Dozer;

use atty::Stream;
use dozer_cache::dozer_log::camino::Utf8PathBuf;
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::default_cache_max_map_size;
use dozer_types::prettytable::{row, Table};
@@ -35,7 +37,11 @@ pub async fn init_dozer(
let page_size = page_size::get() as u64;
config.cache_max_map_size = Some(cache_max_map_size / page_size * page_size);

Ok(Dozer::new(config, runtime, labels))
let base_directory = std::env::current_dir().map_err(CliError::Io)?;
let base_directory =
Utf8PathBuf::try_from(base_directory).map_err(|e| CliError::Io(e.into_io_error()))?;

Ok(Dozer::new(base_directory, config, runtime, labels))
}

pub async fn list_sources(
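The new `init_dozer` resolves the orchestrator's base directory from the process working directory and converts it to a `Utf8PathBuf` before handing it to `Dozer::new`. A minimal standalone sketch of that conversion, assuming the `camino` crate (re-exported above as `dozer_cache::dozer_log::camino`) and flattening the error into `std::io::Error` instead of `CliError`:

```rust
use camino::Utf8PathBuf;

// Resolve the current working directory as a UTF-8 path, mirroring the
// conversion in `init_dozer`. A non-UTF-8 path surfaces as an io::Error.
fn resolve_base_directory() -> Result<Utf8PathBuf, std::io::Error> {
    let cwd = std::env::current_dir()?;
    Utf8PathBuf::try_from(cwd).map_err(|e| e.into_io_error())
}
```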
3 changes: 3 additions & 0 deletions dozer-cli/src/errors.rs
@@ -128,6 +128,9 @@ pub enum CliError {
MissingConfigOverride(String),
#[error("Failed to deserialize config from json: {0}")]
DeserializeConfigFromJson(#[source] serde_json::Error),
// Generic IO error
#[error(transparent)]
Io(#[from] std::io::Error),
}

#[derive(Error, Debug)]
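The added variant follows the usual `thiserror` pattern: `#[error(transparent)]` delegates the display message to the wrapped error, and `#[from]` lets `?` convert a `std::io::Error` into `CliError`. A small illustrative sketch with a stand-in error type (names here are not from the crate):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum ExampleError {
    // Display and source are forwarded to the inner io::Error.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

// `?` converts the io::Error into ExampleError::Io via the generated From impl.
fn file_len(path: &std::path::Path) -> Result<u64, ExampleError> {
    Ok(std::fs::metadata(path)?.len())
}
```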
23 changes: 15 additions & 8 deletions dozer-cli/src/simple/build/contract/mod.rs
@@ -4,7 +4,7 @@ use std::{
path::Path,
};

use dozer_cache::dozer_log::{home_dir::BuildPath, schemas::EndpointSchema};
use dozer_cache::dozer_log::schemas::EndpointSchema;
use dozer_core::{
dag_schemas::DagSchemas,
daggy::{self, NodeIndex},
@@ -162,13 +162,13 @@ impl Contract {
})
}

pub fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
serde_json_to_path(&build_path.dag_path, &self)?;
pub fn serialize(&self, path: &Path) -> Result<(), BuildError> {
serde_json_to_path(path, &self)?;
Ok(())
}

pub fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
serde_json_from_path(&build_path.dag_path)
pub fn deserialize(path: &Path) -> Result<Self, BuildError> {
serde_json_from_path(path)
}
}

@@ -219,12 +219,15 @@ fn collect_ancestor_sources_recursive(
}

fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
let file = OpenOptions::new()
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path.as_ref())
.map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
serde_json::to_writer_pretty(file, value).map_err(BuildError::SerdeJson)
let res = serde_json::to_writer_pretty(&mut file, value).map_err(BuildError::SerdeJson);
file.sync_all().unwrap();
res
}

fn serde_json_from_path<T>(path: impl AsRef<Path>) -> Result<T, BuildError>
@@ -235,7 +238,11 @@ where
.read(true)
.open(path.as_ref())
.map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
serde_json::from_reader(file).map_err(BuildError::FailedToLoadExistingContract)
let result = serde_json::from_reader(file).map_err(BuildError::FailedToLoadExistingContract);
if result.is_err() {
let _ = dbg!(std::fs::read_to_string(path.as_ref()));
}
result
}

mod modify_schema;
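`serde_json_to_path` now syncs the file before returning, so a crash right after `build` cannot leave a truncated contract (e.g. `dozer.lock`) on disk. A self-contained sketch of that write-then-fsync pattern, with errors flattened into `std::io::Error` instead of `BuildError` and the sync result propagated rather than unwrapped:

```rust
use std::fs::OpenOptions;
use std::io;
use std::path::Path;

use serde::Serialize;

fn write_json_synced<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
    // Create or truncate the target file for writing.
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(path)?;
    serde_json::to_writer_pretty(&mut file, value)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    // Flush data and metadata to disk before reporting success.
    file.sync_all()
}
```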
24 changes: 14 additions & 10 deletions dozer-cli/src/simple/build/mod.rs
@@ -16,21 +16,25 @@ pub use contract::{Contract, PipelineContract};
pub async fn build(
home_dir: &HomeDir,
contract: &Contract,
existing_contract: Option<&Contract>,
storage_config: &DataStorage,
) -> Result<(), BuildError> {
if let Some(build_id) = needs_build(home_dir, contract, storage_config).await? {
if let Some(build_id) =
new_build_id(home_dir, contract, existing_contract, storage_config).await?
{
let build_name = build_id.name().to_string();
create_build(home_dir, build_id, contract)?;
build_endpoint_protos(home_dir, build_id, contract)?;
info!("Created new build {build_name}");
} else {
info!("Building not needed");
}
Ok(())
}

async fn needs_build(
async fn new_build_id(
home_dir: &HomeDir,
contract: &Contract,
existing_contract: Option<&Contract>,
storage_config: &DataStorage,
) -> Result<Option<BuildId>, BuildError> {
let build_path = home_dir
@@ -40,6 +44,10 @@ async fn needs_build(
return Ok(Some(BuildId::first()));
};

let Some(existing_contract) = existing_contract else {
return Ok(Some(build_path.id.next()));
};

let mut futures = vec![];
for endpoint in contract.endpoints.keys() {
let endpoint_path = build_path.get_endpoint_path(endpoint);
Expand All @@ -52,20 +60,18 @@ async fn needs_build(
if !try_join_all(futures)
.await?
.into_iter()
.all(|is_empty| is_empty)
.all(std::convert::identity)
{
return Ok(Some(build_path.id.next()));
}

let existing_contract = Contract::deserialize(&build_path)?;
for (endpoint, schema) in &contract.endpoints {
if let Some(existing_schema) = existing_contract.endpoints.get(endpoint) {
if schema == existing_schema {
continue;
}
} else {
return Ok(Some(build_path.id.next()));
}
return Ok(Some(build_path.id.next()));
}
Ok(None)
}
@@ -75,7 +81,7 @@ async fn is_empty(storage: Box<dyn Storage>, prefix: String) -> Result<bool, Bui
Ok(objects.objects.is_empty())
}

fn create_build(
fn build_endpoint_protos(
home_dir: &HomeDir,
build_id: BuildId,
contract: &Contract,
@@ -104,7 +110,5 @@ fn create_build(
&resources,
)?;

contract.serialize(&build_path)?;

Ok(())
}
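`new_build_id` now receives the previously serialized contract and bumps the build id whenever an endpoint is new or its schema differs from that contract. An illustrative sketch of just that comparison, with schemas simplified to strings (types and values here are not from the crate):

```rust
use std::collections::BTreeMap;

// Returns true when any endpoint is new or has a schema that differs from the
// existing contract, i.e. when a new build id would be required.
fn endpoints_changed(
    current: &BTreeMap<String, String>,
    existing: &BTreeMap<String, String>,
) -> bool {
    current
        .iter()
        .any(|(endpoint, schema)| existing.get(endpoint) != Some(schema))
}

fn main() {
    let existing = BTreeMap::from([("trips".to_string(), "v1".to_string())]);
    let mut current = existing.clone();
    assert!(!endpoints_changed(&current, &existing)); // unchanged: reuse build
    current.insert("trips".to_string(), "v2".to_string());
    assert!(endpoints_changed(&current, &existing)); // changed: next build id
}
```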
9 changes: 7 additions & 2 deletions dozer-cli/src/simple/executor.rs
@@ -35,8 +35,12 @@ pub struct Executor<'a> {
}

impl<'a> Executor<'a> {
// TODO: Refactor this to not require both `contract` and all of
// connections, sources and sql
#[allow(clippy::too_many_arguments)]
pub async fn new(
home_dir: &'a HomeDir,
contract: &Contract,
connections: &'a [Connection],
sources: &'a [Source],
sql: Option<&'a str>,
@@ -63,7 +67,8 @@ impl<'a> Executor<'a> {
let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
let log_endpoint =
create_log_endpoint(&build_path, &endpoint.name, &checkpoint_factory).await?;
create_log_endpoint(contract, &build_path, &endpoint.name, &checkpoint_factory)
.await?;
endpoint_and_logs.push((endpoint.clone(), log_endpoint));
}

@@ -119,13 +124,13 @@ pub fn run_dag_executor(
}

async fn create_log_endpoint(
contract: &Contract,
build_path: &BuildPath,
endpoint_name: &str,
checkpoint_factory: &CheckpointFactory,
) -> Result<LogEndpoint, OrchestrationError> {
let endpoint_path = build_path.get_endpoint_path(endpoint_name);

let contract = Contract::deserialize(build_path)?;
let schema = contract
.endpoints
.get(endpoint_name)
38 changes: 31 additions & 7 deletions dozer-cli/src/simple/orchestrator.rs
@@ -1,4 +1,5 @@
use super::executor::{run_dag_executor, Executor};
use super::Contract;
use crate::errors::OrchestrationError;
use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
@@ -15,6 +16,7 @@ use dozer_api::auth::{Access, Authorizer};
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
use dozer_api::{grpc, rest, CacheEndpoint};
use dozer_cache::cache::LmdbRwCacheManager;
use dozer_cache::dozer_log::camino::Utf8PathBuf;
use dozer_cache::dozer_log::home_dir::HomeDir;
use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;
@@ -48,14 +50,21 @@ use tokio::sync::broadcast;

#[derive(Clone)]
pub struct SimpleOrchestrator {
pub base_directory: Utf8PathBuf,
pub config: Config,
pub runtime: Arc<Runtime>,
pub labels: LabelsAndProgress,
}

impl SimpleOrchestrator {
pub fn new(config: Config, runtime: Arc<Runtime>, labels: LabelsAndProgress) -> Self {
pub fn new(
base_directory: Utf8PathBuf,
config: Config,
runtime: Arc<Runtime>,
labels: LabelsAndProgress,
) -> Self {
Self {
base_directory,
config,
runtime,
labels,
@@ -178,9 +187,13 @@ impl SimpleOrchestrator {
shutdown: ShutdownReceiver,
api_notifier: Option<Sender<bool>>,
) -> Result<(), OrchestrationError> {
let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
let home_dir = self.base_directory.join(&self.config.home_dir);
let cache_dir = self.base_directory.join(&self.config.cache_dir);
let home_dir = HomeDir::new(home_dir, cache_dir);
let contract = Contract::deserialize(self.base_directory.join("dozer.lock").as_std_path())?;
let executor = self.runtime.block_on(Executor::new(
&home_dir,
&contract,
&self.config.connections,
&self.config.sources,
self.config.sql.as_deref(),
@@ -270,7 +283,9 @@ impl SimpleOrchestrator {
force: bool,
shutdown: ShutdownReceiver,
) -> Result<(), OrchestrationError> {
let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
let home_dir = self.base_directory.join(&self.config.home_dir);
let cache_dir = self.base_directory.join(&self.config.cache_dir);
let home_dir = HomeDir::new(home_dir, cache_dir);

info!(
"Initiating app: {}",
@@ -327,24 +342,33 @@ impl SimpleOrchestrator {
enable_on_event,
)?;

let contract_path = self.base_directory.join("dozer.lock");
let existing_contract = Contract::deserialize(contract_path.as_std_path()).ok();

// Run build
let storage_config = get_storage_config(&self.config);
self.runtime
.block_on(build::build(&home_dir, &contract, &storage_config))?;
self.runtime.block_on(build::build(
&home_dir,
&contract,
existing_contract.as_ref(),
&storage_config,
))?;

contract.serialize(contract_path.as_std_path())?;

Ok(())
}

// Cleaning the entire folder as there will be inconsistencies
// between pipeline, cache and generated proto files.
pub fn clean(&mut self) -> Result<(), OrchestrationError> {
let cache_dir = PathBuf::from(self.config.cache_dir.clone());
let cache_dir = PathBuf::from(self.base_directory.join(&self.config.cache_dir));
if cache_dir.exists() {
fs::remove_dir_all(&cache_dir)
.map_err(|e| ExecutionError::FileSystemError(cache_dir, e))?;
};

let home_dir = PathBuf::from(self.config.home_dir.clone());
let home_dir = PathBuf::from(self.base_directory.join(&self.config.home_dir));
if home_dir.exists() {
fs::remove_dir_all(&home_dir)
.map_err(|e| ExecutionError::FileSystemError(home_dir, e))?;
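The orchestrator now round-trips the contract through a `dozer.lock` file next to the config: the previous lock file (if any) is deserialized to decide whether a new build is needed, and the fresh contract is serialized back afterwards. A heavily simplified sketch of that flow, with a hypothetical `MyContract` standing in for the crate's `Contract` and the change check reduced to equality (the real decision also inspects log storage, see `build/mod.rs` above):

```rust
use std::io;
use std::path::Path;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, PartialEq)]
struct MyContract {
    version: u32,
}

// Returns whether a rebuild is needed and always rewrites the lock file so it
// reflects the contract that was just built.
fn build_with_lock(base_directory: &Path, fresh: &MyContract) -> io::Result<bool> {
    let lock_path = base_directory.join("dozer.lock");
    // A missing or unreadable lock file simply means "no previous contract".
    let existing: Option<MyContract> = std::fs::read_to_string(&lock_path)
        .ok()
        .and_then(|s| serde_json::from_str(&s).ok());
    let needs_build = existing.as_ref() != Some(fresh);
    let json = serde_json::to_string_pretty(fresh)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    std::fs::write(&lock_path, json)?;
    Ok(needs_build)
}
```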
25 changes: 10 additions & 15 deletions dozer-ingestion/tests/test_suite/connectors/dozer.rs
@@ -15,6 +15,7 @@ use dozer_types::grpc_types::types::Record;
use dozer_types::ingestion_types::GrpcConfig;
use dozer_types::log::info;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::config::{default_cache_dir, default_home_dir};
use dozer_types::models::source::Source;
use dozer_types::types::{Field, FieldDefinition, FieldType};
use dozer_types::{
@@ -196,14 +197,12 @@ async fn create_nested_dozer_server(
)),
adapter: "default".to_owned(),
};
let dozer_dir = temp_dir.path().join(".dozer");
let cache_dir = dozer_dir.join("cache");
std::fs::create_dir_all(&cache_dir).unwrap();

let config = dozer_types::models::config::Config {
version: 1,
app_name: "nested-dozer-connector-test".to_owned(),
home_dir: dozer_dir.to_str().unwrap().to_owned(),
cache_dir: cache_dir.to_str().unwrap().to_owned(),
home_dir: default_home_dir(),
cache_dir: default_cache_dir(),
connections: vec![dozer_types::models::connection::Connection {
config: Some(dozer_types::models::connection::ConnectionConfig::Grpc(
grpc_config,
@@ -227,19 +226,13 @@ async fn create_nested_dozer_server(
version: None,
log_reader_options: None,
}],
api: None,
sql: None,
flags: None,
cache_max_map_size: None,
app: None,
telemetry: None,
cloud: None,
udfs: vec![],
..Default::default()
};

let dozer_runtime = Runtime::new().expect("Failed to start tokio runtime for nested dozer");
let runtime = Arc::new(dozer_runtime);
let mut dozer = SimpleOrchestrator::new(config, runtime.clone(), Default::default());
let directory = temp_dir.path().to_owned().try_into().unwrap();
let mut dozer = SimpleOrchestrator::new(directory, config, runtime.clone(), Default::default());
let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
let dozer_thread = std::thread::spawn(move || dozer.run_all(shutdown_receiver).unwrap());

@@ -281,9 +274,11 @@ impl Drop for DozerConnectorTest {
if let Some((join_handle, shutdown)) = self.shutdown.take() {
shutdown.shutdown();
info!("Sent shutdown signal");
join_handle.join().unwrap();
let _ = join_handle.join();
info!("Joined dozer thread");
}

let _ = std::fs::remove_file("dozer.lock");
}
}

6 changes: 3 additions & 3 deletions dozer-log/src/home_dir.rs
@@ -9,10 +9,10 @@ pub struct HomeDir {
pub type Error = (Utf8PathBuf, std::io::Error);

impl HomeDir {
pub fn new(home_dir: String, cache_dir: String) -> Self {
pub fn new(home_dir: Utf8PathBuf, cache_dir: Utf8PathBuf) -> Self {
Self {
home_dir: home_dir.into(),
cache_dir: cache_dir.into(),
home_dir,
cache_dir,
}
}
