diff --git a/Cargo.lock b/Cargo.lock index f4d12dc..d520827 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,26 +1,300 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "atomic-option" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "autocfg" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bitflags" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bus" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "cfg-if" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-channel" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "lazy_static" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "libc" +version = "0.2.58" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "linked-hash-map" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "mainframer" version = "3.0.0-dev" dependencies = [ - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", - "yaml-rust 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bus 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num_cpus" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "parking_lot_core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_isaac 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_jitter 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_os 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_pcg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_chacha" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rand_core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rand_hc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_isaac" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_jitter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_os" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_pcg" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand_xorshift" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "smallvec" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "yaml-rust" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [metadata] -"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" -"checksum yaml-rust 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "95acf0db5515d07da9965ec0e0ba6cc2d825e2caeb7303b66ca441729801254e" +"checksum atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" +"checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf" +"checksum bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3d155346769a6855b86399e9bc3814ab343cd3d62c7e985113d46a0ec3c281fd" +"checksum bus 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "504f634a58035c52be107e638291fd395c51f6d98305daeaea6c557438475d9e" +"checksum cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "b486ce3ccf7ffd79fdeb678eac06a9e6c09fc88d33836340becb8fffe87c5e33" +"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0ed1a4de2235cabda8558ff5840bffb97fcb64c97827f354a451307df5f72b" +"checksum crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f8306fcef4a7b563b76b7dd949ca48f52bc1141aa067d2ea09565f3e2652aa5c" +"checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +"checksum lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" +"checksum libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)" = "6281b86796ba5e4366000be6e9e18bf35580adf9e63fbe2294aadb587613a319" +"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83" +"checksum num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bcef43580c035376c0705c42792c294b66974abbfd2789b511784023f71f3273" +"checksum parking_lot_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94c8c7923936b28d546dfd14d4472eaf34c99b14e1c973a32b3e6d4eb04298c9" +"checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" +"checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" +"checksum rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +"checksum rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d0e7a549d590831370895ab7ba4ea0c1b6b011d106b5ff2da6eee112615e6dc0" +"checksum rand_hc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4" +"checksum rand_isaac 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08" +"checksum rand_jitter 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b" +"checksum rand_os 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" +"checksum rand_pcg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" +"checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" +"checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +"checksum smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ab606a9c5e214920bb66c458cd7be8ef094f813f20fe77a54cc7dbfff220d4b7" +"checksum winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f10e386af2b13e47c89e7236a7a14a086791a2b88ebad6df9bf42040195cf770" +"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "65923dd1784f44da1d2c3dbbc5e822045628c590ba72123e1c73d3c230c4434d" diff --git a/Cargo.toml b/Cargo.toml index c78b2cf..91c82b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,5 @@ authors = ["Artem Zinnatullin ", "Artur Dryomov , pub local_ignore_file: Option, diff --git a/src/intermediate_config.rs b/src/intermediate_config.rs index ae61233..4231ea2 100644 --- a/src/intermediate_config.rs +++ b/src/intermediate_config.rs @@ -4,6 +4,10 @@ extern crate yaml_rust; use std::fs::File; use std::io::prelude::*; use std::path::Path; +use std::time::Duration; + +use sync::PullMode; + use self::linked_hash_map::LinkedHashMap; use self::yaml_rust::Yaml; use self::yaml_rust::YamlLoader; @@ -22,12 +26,13 @@ pub struct IntermediateRemote { #[derive(Debug, PartialEq)] pub struct IntermediatePush { - pub compression: Option + pub compression: Option, } #[derive(Debug, PartialEq)] pub struct IntermediatePull { - pub compression: Option + pub compression: Option, + pub mode: Option, } impl IntermediateConfig { @@ -92,13 +97,12 @@ fn parse_config_from_str(config_content: &str) -> Result { let compression = parse_compression(pull, "compression", "pull"); + let mode = parse_pull_mode(pull); - match compression { - Ok(value) => Some(IntermediatePull { - compression: value, - }), - Err(error) => return Err(error) - } + Some(IntermediatePull { + compression: compression?, + mode: mode?, + }) } Yaml::Null | Yaml::BadValue => None, _ => return Err(String::from("'pull' must be an object")) @@ -126,6 +130,21 @@ fn parse_compression(yaml: &LinkedHashMap, field_name: &str, scope_n } } +fn parse_pull_mode(yaml: &LinkedHashMap) -> Result, String> { + match yaml.get(&Yaml::String("mode".to_string())) { + Some(mode) => match mode { + Yaml::String(mode) => match mode.as_ref() { + "serial" => Ok(Some(PullMode::Serial)), + "parallel" => Ok(Some(PullMode::Parallel(Duration::from_millis(500)))), // TODO: make duration configurable too. + ref unknown_value => Err(format!("Unsupported pull mode, valid values are 'serial' and 'parallel', but was '{:#?}'", unknown_value)) + }, + Yaml::Null | Yaml::BadValue => Ok(None), + ref something_else => Err(format!("Pull mode must be string, valid values are 'serial' and 'parallel', but was '{:#?}'", something_else)) + }, + None => Ok(None) + } +} + #[cfg(test)] mod tests { use super::*; @@ -139,6 +158,7 @@ push: compression: 5 pull: compression: 2 + mode: serial "; assert_eq!(parse_config_from_str(content), Ok(IntermediateConfig { @@ -149,7 +169,8 @@ pull: compression: Some(5) }), pull: Some(IntermediatePull { - compression: Some(2) + compression: Some(2), + mode: Some(PullMode::Serial), }), })); } @@ -163,6 +184,7 @@ push: compression: 5 pull: compression: 2 + mode: \"serial\" "; assert_eq!(parse_config_from_str(content), Ok(IntermediateConfig { @@ -173,7 +195,8 @@ pull: compression: Some(5) }), pull: Some(IntermediatePull { - compression: Some(2) + compression: Some(2), + mode: Some(PullMode::Serial), }), })); } @@ -187,6 +210,7 @@ push: compression: 5 pull: compression: 2 + mode: serial "; assert_eq!(parse_config_from_str(content), Ok(IntermediateConfig { @@ -197,7 +221,8 @@ pull: compression: Some(5) }), pull: Some(IntermediatePull { - compression: Some(2) + compression: Some(2), + mode: Some(PullMode::Serial), }), })); } @@ -242,7 +267,8 @@ pull: remote: None, push: None, pull: Some(IntermediatePull { - compression: Some(2) + compression: Some(2), + mode: None, }), })); } @@ -273,6 +299,7 @@ pull: pull: if destination == "pull" { Some(IntermediatePull { compression: Some(compression_level), + mode: None, }) } else { None @@ -327,4 +354,45 @@ pull: "; assert_eq!(parse_config_from_str(content), Err(String::from("'pull.compression\' must be a positive integer from 1 to 9, but was String(\n \"yooo\"\n)"))); } + + #[test] + fn parse_config_from_str_only_pull_mode_serial() { + let content = " +pull: + mode: serial +"; + assert_eq!(parse_config_from_str(content), Ok(IntermediateConfig { + remote: None, + push: None, + pull: Some(IntermediatePull { + compression: None, + mode: Some(PullMode::Serial), + }), + })); + } + + #[test] + fn parse_config_from_str_only_pull_mode_parallel() { + let content = " +pull: + mode: parallel +"; + assert_eq!(parse_config_from_str(content), Ok(IntermediateConfig { + remote: None, + push: None, + pull: Some(IntermediatePull { + compression: None, + mode: Some(PullMode::Parallel(Duration::from_millis(500))), + }), + })); + } + + #[test] + fn parse_config_from_str_only_pull_mode_unsupported_value() { + let content = " +pull: + mode: unsupported_value +"; + assert_eq!(parse_config_from_str(content), Err(String::from("Unsupported pull mode, valid values are \'serial\' and \'parallel\', but was \'\"unsupported_value\"\'"))); + } } diff --git a/src/main.rs b/src/main.rs index 3dc2ee1..7862c49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,19 @@ +extern crate bus; +extern crate crossbeam_channel; + +use std::env; +use std::fs; +use std::path::Path; +use std::process; +use std::time::Instant; + +use args::Args; +use config::*; +use ignore::*; +use intermediate_config::IntermediateConfig; +use sync::{PullMode}; +use time::*; + mod args; mod config; mod intermediate_config; @@ -6,18 +22,11 @@ mod remote_command; mod sync; mod time; -use args::Args; -use config::*; -use intermediate_config::IntermediateConfig; -use ignore::*; -use std::env; -use std::fs; -use std::path::Path; -use std::process; -use std::time::Instant; -use time::*; +// TODO use Reactive Streams instead of Channels. fn main() { + let total_start = Instant::now(); + println!(":: Mainframer v{}\n", env!("CARGO_PKG_VERSION")); let raw_args: Vec = env::args().skip(1).collect(); @@ -27,11 +36,11 @@ fn main() { }; let local_dir_absolute_path = match env::current_dir() { - Err(_) => exit_with_error(&"Could not resolve working directory, make sure it exists and user has enough permissions to work with it.", 1), + Err(_) => exit_with_error("Could not resolve working directory, make sure it exists and user has enough permissions to work with it.", 1), Ok(value) => fs::canonicalize(value).unwrap() }; - let mut config_file = local_dir_absolute_path.to_owned(); + let mut config_file = local_dir_absolute_path.clone(); config_file.push(".mainframer/config.yml"); let config = match merge_configs(&config_file) { @@ -39,25 +48,55 @@ fn main() { Ok(value) => value }; - let ignore = Ignore::from_working_dir(&local_dir_absolute_path); + let ignore = Ignore::from_working_dir(&local_dir_absolute_path.clone()); - let start = Instant::now(); + println!("Pushing..."); - if let Err(error) = sync_before_remote_command(&local_dir_absolute_path, &config, &ignore) { - exit_with_error(&format!("Sync local → remote machine failed: {}.", error), 1) + match sync::push(&local_dir_absolute_path, &config, &ignore) { + Err(err) => exit_with_error(&format!("Push failed: {}, took {}", err.message, format_duration(err.duration)), 1), + Ok(ok) => println!("Push done: took {}.\n", format_duration(ok.duration)), } - let remote_command_result = execute_remote_command(&local_dir_absolute_path, &args, &config); - - if let Err(error) = sync_after_remote_command(&local_dir_absolute_path, &config, &ignore) { - exit_with_error(&format!("Sync remote → local machine failed: {}.", error), 1) + match config.pull.mode { + PullMode::Serial => println!("Executing command on remote machine...\n"), + PullMode::Parallel(_) => println!("Executing command on remote machine (pulling in parallel)...\n") } - let duration = start.elapsed(); + let mut remote_command_readers = remote_command::execute_remote_command( + args.command.clone(), + config.clone(), + sync::project_dir_on_remote_machine(&local_dir_absolute_path.clone()), + 2 + ); + + let pull_finished_rx = sync::pull(&local_dir_absolute_path, config.clone(), ignore, &config.pull.mode, remote_command_readers.pop().unwrap()); + + let remote_command_result = remote_command_readers + .pop() + .unwrap() + .recv() + .unwrap(); match remote_command_result { - Err(_) => exit_with_error(&format!("\nFailure: took {}.", format_duration(duration)), 1), - _ => println!("\nSuccess: took {}.", format_duration(duration)) + Err(ref err) => eprintln!("\nExecution failed: took {}.\nPulling...", format_duration(err.duration)), + Ok(ref ok) => println!("\nExecution done: took {}.\nPulling...", format_duration(ok.duration)) + } + + let pull_result = pull_finished_rx + .recv() + .expect("Could not receive remote_to_local_sync_result"); + + let total_duration = total_start.elapsed(); + + match pull_result { + Err(ref err) => eprintln!("Pull failed: {}, took {}.", err.message, format_duration(err.duration)), + Ok(ref ok) => println!("Pull done: took {}", format_duration(ok.duration)), + } + + if remote_command_result.is_err() || pull_result.is_err() { + exit_with_error(&format!("\nFailure: took {}.", format_duration(total_duration)), 1); + } else { + println!("\nSuccess: took {}.", format_duration(total_duration)); } } @@ -71,6 +110,7 @@ fn exit_with_error(message: &str, code: i32) -> ! { fn merge_configs(project_config_file: &Path) -> Result { let default_push_compression = 3; let default_pull_compression = 1; + let default_pull_mode = PullMode::Serial; // TODO: consider making Parallel the default mode. Ok(match IntermediateConfig::from_file(project_config_file) { Err(message) => return Err(message), @@ -91,26 +131,18 @@ fn merge_configs(project_config_file: &Path) -> Result { None => Push { compression: default_push_compression, }, - Some(push) => match push.compression { - None => Push { - compression: default_push_compression - }, - Some(compression) => Push { - compression, - } + Some(push) => Push { + compression: push.compression.unwrap_or(default_push_compression), } }, pull: match intermediate_config.pull { None => Pull { - compression: default_pull_compression + compression: default_pull_compression, + mode: default_pull_mode, }, - Some(pull) => match pull.compression { - None => Pull { - compression: default_pull_compression, - }, - Some(compression) => Pull { - compression, - } + Some(pull) => Pull { + compression: pull.compression.unwrap_or(default_pull_compression), + mode: pull.mode.unwrap_or(default_pull_mode), } }, } @@ -118,67 +150,3 @@ fn merge_configs(project_config_file: &Path) -> Result { }) } -fn sync_before_remote_command(local_dir_absolute_path: &Path, config: &Config, ignore: &Ignore) -> Result<(), String> { - println!("Sync local → remote machine..."); - - let start = Instant::now(); - - let result = sync::sync_local_to_remote( - &local_dir_absolute_path, - config, - ignore, - ); - - let duration = start.elapsed(); - - match result { - Err(error) => Err(error), - Ok(_) => { - println!("Sync done: took {}.\n", format_duration(duration)); - Ok(()) - } - } -} - -fn execute_remote_command(local_dir_absolute_path: &Path, args: &Args, config: &Config) -> Result<(), ()> { - println!("Executing command on remote machine...\n"); - - let start = Instant::now(); - - let result = remote_command::execute_remote_command( - &args.command.clone(), - config, - sync::project_dir_on_remote_machine(local_dir_absolute_path).as_ref(), - ); - - let duration = start.elapsed(); - - match result { - Err(_) => eprintln!("\nExecution failed: took {}.\n", format_duration(duration)), - Ok(_) => println!("\nExecution done: took {}.\n", format_duration(duration)) - } - - result -} - -fn sync_after_remote_command(working_dir_name: &Path, config: &Config, ignore: &Ignore) -> Result<(), String> { - println!("Sync remote → local machine..."); - - let start = Instant::now(); - - let result = sync::sync_remote_to_local( - working_dir_name, - config, - ignore, - ); - - let duration = start.elapsed(); - - match result { - Err(error) => Err(error), - Ok(_) => { - println!("Sync done: took {}.", format_duration(duration)); - Ok(()) - } - } -} diff --git a/src/remote_command.rs b/src/remote_command.rs index 2627b2f..92e993d 100644 --- a/src/remote_command.rs +++ b/src/remote_command.rs @@ -1,8 +1,40 @@ -use config::Config; use std::process::Command; use std::process::Stdio; +use std::thread; +use std::time::{Duration, Instant}; + +use bus::{Bus, BusReader}; + +use config::Config; + +#[derive(Debug, PartialEq, Clone)] +pub struct RemoteCommandOk { + pub duration: Duration, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct RemoteCommandErr { + pub duration: Duration, +} + +pub fn execute_remote_command(remote_command: String, config: Config, project_dir_on_remote_machine: String, number_of_readers: usize) -> Vec>> { + let mut bus: Bus> = Bus::new(1); + let mut readers: Vec>> = Vec::with_capacity(number_of_readers); + + for _ in 0..number_of_readers { + readers.push(bus.add_rx()) + } + + thread::spawn(move || { + bus.broadcast(_execute_remote_command(&remote_command, &config, &project_dir_on_remote_machine)); + }); + + readers +} + +fn _execute_remote_command(remote_command: &str, config: &Config, project_dir_on_remote_machine: &str) -> Result { + let start_time = Instant::now(); -pub fn execute_remote_command(remote_command: &str, config: &Config, project_dir_on_remote_machine: &str) -> Result<(), ()> { let mut command = Command::new("ssh"); command @@ -21,11 +53,13 @@ pub fn execute_remote_command(remote_command: &str, config: &Config, project_dir .unwrap(); match process.wait() { - Err(_) => Err(()), // No need to get error description as we've already piped command output to Mainframer output. + Err(_) => Err(RemoteCommandErr { + duration: start_time.elapsed() + }), // No need to get error description as we've already piped command output to Mainframer output. Ok(exit_status) => if exit_status.success() { - Ok(()) + Ok(RemoteCommandOk { duration: start_time.elapsed() }) } else { - Err(()) + Err(RemoteCommandErr { duration: start_time.elapsed() }) } } } diff --git a/src/sync.rs b/src/sync.rs index 89b4c17..3a22d93 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,11 +1,54 @@ -use config::Config; -use ignore::Ignore; use std::path::Path; use std::path::PathBuf; use std::process::Command; +use std::sync::mpsc::TryRecvError::*; +use std::thread; +use std::time::{Duration, Instant}; + +use bus::BusReader; +use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use crossbeam_channel::unbounded; + +use config::Config; +use ignore::Ignore; +use remote_command::{RemoteCommandOk, RemoteCommandErr}; + +#[derive(Debug, PartialEq, Clone)] +pub struct PushOk { + pub duration: Duration, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct PushErr { + pub duration: Duration, + pub message: String, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum PullMode { + /// Serial, after remote command execution. + Serial, + + /// Parallel to remote command execution. + /// First parameter is pause between pulls. + Parallel(Duration), +} + +#[derive(Debug, PartialEq, Clone)] +pub struct PullOk { + pub duration: Duration, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct PullErr { + pub duration: Duration, + pub message: String, +} + +pub fn push(local_dir_absolute_path: &Path, config: &Config, ignore: &Ignore) -> Result { + let start_time = Instant::now(); -// TODO add internal version of sync functions with closures as parameters to unit test properly. -pub fn sync_local_to_remote(local_dir_absolute_path: &Path, config: &Config, ignore: &Ignore) -> Result<(), String> { let mut command = Command::new("rsync"); command @@ -28,10 +71,93 @@ pub fn sync_local_to_remote(local_dir_absolute_path: &Path, config: &Config, ign project_dir_on_remote_machine = project_dir_on_remote_machine(local_dir_absolute_path)) ); - execute_rsync(&mut command) + match execute_rsync(&mut command) { + Err(reason) => Err(PushErr { + duration: start_time.elapsed(), + message: reason, + }), + Ok(_) => Ok(PushOk { + duration: start_time.elapsed() + }), + } } -pub fn sync_remote_to_local(local_dir_absolute_path: &Path, config: &Config, ignore: &Ignore) -> Result<(), String> { +pub fn pull(local_dir_absolute_path: &Path, config: Config, ignore: Ignore, pull_mode: &PullMode, remote_command_finished_signal: BusReader>) -> Receiver> { + match pull_mode { + PullMode::Serial => pull_serial(local_dir_absolute_path.to_path_buf(), config, ignore, remote_command_finished_signal), + PullMode::Parallel(pause_between_pulls) => pull_parallel(local_dir_absolute_path.to_path_buf(), config, ignore, *pause_between_pulls, remote_command_finished_signal) + } +} + +fn pull_serial(local_dir_absolute_path: PathBuf, config: Config, ignore: Ignore, mut remote_command_finished_rx: BusReader>) -> Receiver> { + let (pull_finished_tx, pull_finished_rx): (Sender>, Receiver>) = unbounded(); + + #[allow(unused_must_use)] // We don't handle remote_command_result, in any case we need to pull after it. + thread::spawn(move || { + remote_command_finished_rx + .recv() + .expect("Could not receive remote_command_finished_rx"); + + pull_finished_tx + .send(_pull(local_dir_absolute_path.as_path(), &config, &ignore)) + .expect("Could not send pull_finished signal"); + }); + + pull_finished_rx +} + +fn pull_parallel(local_dir_absolute_path: PathBuf, config: Config, ignore: Ignore, pause_between_pulls: Duration, mut remote_command_finished_signal: BusReader>) -> Receiver> { + let (pull_finished_tx, pull_finished_rx): (Sender>, Receiver>) = unbounded(); + let start_time = Instant::now(); + + thread::spawn(move || { + loop { + if let Err(pull_err) = _pull(local_dir_absolute_path.as_path(), &config, &ignore) { + pull_finished_tx + .send(Err(pull_err)) // TODO handle code 24. + .expect("Could not send pull_finished signal"); + break; + } + + match remote_command_finished_signal.try_recv() { + Err(reason) => match reason { + Disconnected => break, + Empty => thread::sleep(pause_between_pulls) + }, + Ok(remote_command_result) => { + let remote_command_duration = match remote_command_result { + Err(err) => err.duration, + Ok(ok) => ok.duration + }; + + // Final pull after remote command to ensure consistency of the files. + match _pull(local_dir_absolute_path.as_path(), &config, &ignore) { + Err(err) => pull_finished_tx + .send(Err(PullErr { + duration: calculate_perceived_pull_duration(start_time.elapsed(), remote_command_duration), + message: err.message + })) + .expect("Could not send pull finished signal (last iteration)"), + + Ok(_) => pull_finished_tx + .send(Ok(PullOk { + duration: calculate_perceived_pull_duration(start_time.elapsed(), remote_command_duration) + })) + .expect("Could not send pull finished signal (last iteration)"), + } + + break; + } + } + } + }); + + pull_finished_rx +} + +fn _pull(local_dir_absolute_path: &Path, config: &Config, ignore: &Ignore) -> Result { + let start_time = Instant::now(); + let mut command = Command::new("rsync"); command @@ -51,7 +177,15 @@ pub fn sync_remote_to_local(local_dir_absolute_path: &Path, config: &Config, ign ) .arg("./"); - execute_rsync(&mut command) + match execute_rsync(&mut command) { + Err(reason) => Err(PullErr { + duration: start_time.elapsed(), + message: reason + }), + Ok(_) => Ok(PullOk { + duration: start_time.elapsed(), + }) + } } pub fn project_dir_on_remote_machine(local_dir_absolute_path: &Path) -> String { @@ -71,9 +205,9 @@ fn execute_rsync(rsync: &mut Command) -> Result<(), String> { let result = rsync.output(); match result { - Err(_) => Err(String::from("Generic sync error.")), // Rust doc doesn't really say when can an error occur. + Err(_) => Err(String::from("Generic rsync error.")), // Rust doc doesn't really say when can an error occur. Ok(output) => match output.status.code() { - None => Err(String::from("Sync was terminated.")), + None => Err(String::from("rsync was terminated.")), Some(status_code) => match status_code { 0 => Ok(()), _ => Err( @@ -88,3 +222,39 @@ fn execute_rsync(rsync: &mut Command) -> Result<(), String> { } } } + +fn calculate_perceived_pull_duration(total_pull_duration: Duration, remote_command_duration: Duration) -> Duration { + match total_pull_duration.checked_sub(remote_command_duration) { + None => Duration::from_millis(0), + Some(duration) => duration, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn calculate_perceived_pull_duration_equals() { + assert_eq!( + calculate_perceived_pull_duration(Duration::from_millis(10), Duration::from_millis(10)), + Duration::from_millis(0) + ); + } + + #[test] + fn calculate_perceived_pull_duration_pull_longer_than_execution() { + assert_eq!( + calculate_perceived_pull_duration(Duration::from_secs(10), Duration::from_secs(8)), + Duration::from_secs(2) + ); + } + + #[test] + fn calculate_perceived_pull_duration_pull_less_than_execution() { + assert_eq!( + calculate_perceived_pull_duration(Duration::from_secs(7), Duration::from_secs(9)), + Duration::from_secs(0) + ); + } +} diff --git a/test/common.sh b/test/common.sh index ce4c851..a5c8fcd 100755 --- a/test/common.sh +++ b/test/common.sh @@ -52,13 +52,13 @@ function buildMainframer { function printTestStarted { echo "" test_name=$(basename "$0") - echo "-------- TEST STARTED $test_name -------- " + echo "-------- TEST STARTED (pull mode = '$TEST_PULL_MODE') $test_name -------- " } function printTestEnded { echo "" test_name=$(basename "$0") - echo "-------- TEST ENDED $test_name -------- " + echo "-------- TEST ENDED (pull mode = '$TEST_PULL_MODE') $test_name -------- " } function cleanBuildDirOnLocalMachine { @@ -128,10 +128,12 @@ function fileMustNotExistOnRemoteMachine { fi } -function setTestRemoteMachineInConfig { +function createConfig { { echo -e "remote:\\n" echo -e " host: \"$TEST_REMOTE_MACHINE\"" + echo -e "pull:\\n" + echo -e " mode: \"$TEST_PULL_MODE\"" } > "$CONFIG_FILE" } @@ -150,7 +152,7 @@ cleanMainfamerDirOnRemoteMachine mkdir -p "$BUILD_DIR/.mainframer" # Create config that sets remote build machine for the test. -setTestRemoteMachineInConfig +createConfig # Set build directory as "working dir". pushd "$BUILD_DIR" diff --git a/test/test.sh b/test/test.sh index 657980a..bb52cf9 100755 --- a/test/test.sh +++ b/test/test.sh @@ -33,10 +33,16 @@ popd > /dev/null echo "Running integration tests…" +# Print stacktrace for debug build in case it panics at runtime. +export RUST_BACKTRACE=1 + # Run all tests. for test_ in "$DIR"/test_*; do TEST_COUNTER=$((TEST_COUNTER+1)) - "$test_" + TEST_PULL_MODE="serial" "$test_" + + TEST_COUNTER=$((TEST_COUNTER+1)) + TEST_PULL_MODE="parallel" "$test_" done TEST_RUN_SUCCESS="true" diff --git a/test/test_pulls_big_files.sh b/test/test_pulls_big_files.sh new file mode 100755 index 0000000..0ed0cf8 --- /dev/null +++ b/test/test_pulls_big_files.sh @@ -0,0 +1,28 @@ +#!/bin/bash +set -e + +# You can run it from any directory. +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +# Execute common pre-setup, include test functions. +# shellcheck disable=SC1090 +source "$DIR/common.sh" + +printTestStarted + +# Create several files that should be synced to remote machine. +mkdir "$BUILD_DIR/src" +touch "$BUILD_DIR/src/file1.txt" +touch "$BUILD_DIR/src/file2.txt" +touch "$BUILD_DIR/src/file3.txt" + +# Run mainframer that creates "build" result file that should be synced back to local machine. +# shellcheck disable=SC2016 +"$MAINFRAMER_EXECUTABLE" 'mkdir build && for ((i=0;i<30;i++)); do dd if=/dev/urandom of=build/buildresult-$i.txt bs=16M count=4 iflag=fullblock; sleep 1; done; ls -la build' + +for ((i=0;i<30;i++)); do + # Make sure files exist on local machine after sync. + localFileMustMatchRemote "build/buildresult-$i.txt" "(sync problem)" +done + +printTestEnded