From e2a93927bef0deded139f02ece7fe2c830badad2 Mon Sep 17 00:00:00 2001 From: abdulrahman1s <61483023+abdulrahman1s@users.noreply.github.com> Date: Wed, 7 Sep 2022 12:58:40 +0000 Subject: [PATCH] refactor: Rewrite with tokio runtime --- Cargo.lock | 376 ++++++++++++++++++++++++++++++++++---- Cargo.toml | 8 +- src/cache.rs | 2 +- src/cloud/adapter.rs | 20 +- src/cloud/providers/s3.rs | 57 +++--- src/main.rs | 174 ++++++++++-------- src/util.rs | 11 +- 7 files changed, 495 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 769cace..ecdc9c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "enumflags2" version = "0.7.5" @@ -483,12 +492,48 @@ dependencies = [ "libc", ] +[[package]] +name = "futures" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +[[package]] +name = "futures-executor" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.24" @@ -510,6 +555,17 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "futures-macro" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.24" @@ -528,9 +584,13 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -557,6 +617,25 @@ dependencies = [ "wasi", ] +[[package]] +name = "h2" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -610,12 +689,72 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +dependencies = [ + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "idna" version = "0.2.3" @@ -627,6 +766,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "inlinable_string" version = "0.1.15" @@ -662,6 +811,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "itoa" version = "1.0.3" @@ -789,13 +944,10 @@ dependencies = [ ] [[package]] -name = "minidom" -version = "0.15.0" +name = "mime" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dddfe21863f8d600ed2bd1096cb9b5cd6ff984be6185cf9d563fb4a107bffc5" -dependencies = [ - "rxml", -] +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "mio" @@ -854,6 +1006,16 @@ dependencies = [ "zvariant_derive", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_threads" version = "0.1.6" @@ -900,9 +1062,11 @@ checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0" [[package]] name = "online" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7680985bd550795c0161707f51f9abada87c63a5409114ed818a8618d18ec5e5" +version = "3.0.2-alpha.0" +source = "git+https://github.com/abdulrahman1s/online?branch=tokio#f813dc082b948f6d7185c2e679cfaf22324ec684" +dependencies = [ + "tokio", +] [[package]] name = "ordered-multimap" @@ -1132,6 +1296,46 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -1152,10 +1356,12 @@ name = "rsink" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "dashmap", "dirs", "env_logger", "figment", + "futures", "lazy_static", "log", "notify", @@ -1164,6 +1370,7 @@ dependencies = [ "rust-s3", "serde", "serde_json", + "tokio", ] [[package]] @@ -1183,7 +1390,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6009d9d4cf910505534d62d380a0aa305805a2af0b5c3ad59a3024a0715b847" dependencies = [ "async-trait", - "attohttpc", "aws-creds", "aws-region", "base64", @@ -1194,14 +1400,16 @@ dependencies = [ "log", "maybe-async", "md5", - "minidom", "percent-encoding", + "reqwest", "serde", "serde-xml-rs", "serde_derive", "sha2", "thiserror", "time", + "tokio", + "tokio-stream", "url", ] @@ -1218,24 +1426,14 @@ dependencies = [ ] [[package]] -name = "rxml" -version = "0.8.1" +name = "rustls-pemfile" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7c4cd1e0a04c48f953473383a60143884515b7a8eb7ca7d9b1baa9c05dee75" +checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" dependencies = [ - "bytes", - "pin-project-lite", - "rxml_validation", - "smartstring", - "tokio", + "base64", ] -[[package]] -name = "rxml_validation" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8633dff4bb93061867c8411c6e99068c5f59d9f890c87384169004b0fbb929a" - [[package]] name = "ryu" version = "1.0.11" @@ -1321,6 +1519,18 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.6.1" @@ -1347,6 +1557,15 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.7" @@ -1362,15 +1581,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" -[[package]] -name = "smartstring" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e714dff2b33f2321fdcd475b71cec79781a692d846f37f415fb395a1d2bcd48e" -dependencies = [ - "static_assertions", -] - [[package]] name = "socket2" version = "0.4.7" @@ -1516,9 +1726,64 @@ checksum = "89797afd69d206ccd11fb0ea560a44bbb87731d020670e79416d442919257d42" dependencies = [ "autocfg", "bytes", + "libc", "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", ] [[package]] @@ -1530,6 +1795,12 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.36" @@ -1562,6 +1833,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typenum" version = "1.15.0" @@ -1655,6 +1932,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1686,6 +1973,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa76fb221a1f8acddf5b54ace85912606980ad661ac7a503b4570ffd3a624dad" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.82" @@ -1869,6 +2168,15 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "winrt-notification" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 65b2396..34ffbaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ anyhow = "1.0.62" dirs = "4.0.0" lazy_static = "1.4.0" notify = "5.0.0" -rust-s3 = { version = "0.32.3", default-features = false, features = ["sync", "tags", "sync-rustls-tls"] } -online = { version = "3.0.1", default-features = false, features = ["sync"] } +rust-s3 = { version = "0.32.3", default-features = false, features = ["tokio-rustls-tls", "fail-on-err"] } +online = { git = "https://github.com/abdulrahman1s/online", branch = "tokio", default-features = false, features = ["tokio-runtime"] } log = "0.4.17" env_logger = "0.9.0" serde = "1.0.144" @@ -18,7 +18,9 @@ figment = { version = "0.10.6", features = ["toml", "json", "env"] } serde_json = "1.0.85" dashmap = "5.4.0" notify-rust = "4.5.8" - +tokio = { version = "1.21.0", features = ["full"] } +futures = "0.3.24" +async-trait = "0.1.57" [profile.release] codegen-units = 1 diff --git a/src/cache.rs b/src/cache.rs index c9bd80e..85f525d 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -10,7 +10,7 @@ pub struct Cache(pub DashSet); impl Cache { pub fn new() -> Self { - let array: Vec = serde_json::from_slice(&fs::read(&*PATH).unwrap()).unwrap(); + let array: Vec = serde_json::from_slice(&std::fs::read(&*PATH).unwrap()).unwrap(); Self(DashSet::from_iter(array.into_iter())) } diff --git a/src/cloud/adapter.rs b/src/cloud/adapter.rs index b08ce32..0a73996 100644 --- a/src/cloud/adapter.rs +++ b/src/cloud/adapter.rs @@ -1,17 +1,19 @@ use crate::config::CloudOptions; pub use anyhow::Result; -pub use std::{fs, path::Path}; +pub use std::path::Path; +use tokio::fs; +#[async_trait] pub trait CloudAdapter { fn new(options: CloudOptions) -> Self; - fn sync(&self) -> Result; - fn get(&self, path: &Path) -> Result>; - fn exists(&self, path: &Path) -> Result; - fn delete(&self, path: &Path) -> Result<()>; - fn save(&self, path: &Path) -> Result<()>; - fn rename(&self, oldpath: &Path, path: &Path) -> Result<()>; - fn read_file(path: &Path) -> Result> { - Ok(fs::read(path)?) + async fn sync(&self) -> Result; + async fn get(&self, path: &Path) -> Result>; + async fn exists(&self, path: &Path) -> Result; + async fn delete(&self, path: &Path) -> Result<()>; + async fn save(&self, path: &Path) -> Result<()>; + async fn rename(&self, oldpath: &Path, path: &Path) -> Result<()>; + async fn read_file(path: &Path) -> Result> { + Ok(fs::read(path).await?) } fn kind(&self) -> &'static str; } diff --git a/src/cloud/providers/s3.rs b/src/cloud/providers/s3.rs index 4126f05..8b56858 100644 --- a/src/cloud/providers/s3.rs +++ b/src/cloud/providers/s3.rs @@ -2,16 +2,16 @@ use crate::cloud::CloudAdapter; use crate::config::*; use crate::util::*; use crate::SYNCED_PATHS; -use s3::{ - serde_types::ListBucketResult, - {creds::Credentials, Bucket, Region}, -}; -use std::{collections::HashSet, fs, io::Write, path::Path}; +use dashmap::DashSet; +use s3::{creds::Credentials, Bucket, Region}; +use std::path::Path; +use tokio::{fs, io::AsyncWriteExt}; pub struct S3Storage { bucket: Bucket, } +#[async_trait] impl CloudAdapter for S3Storage { fn new(options: CloudOptions) -> Self { let CloudOptions::S3 { @@ -36,33 +36,34 @@ impl CloudAdapter for S3Storage { } } - fn sync(&self) -> Result { + async fn sync(&self) -> Result { let mut synced = 0; - let mut objects = HashSet::new(); + let objects = DashSet::new(); - for list in self.bucket.list("/".to_owned(), None)? as Vec { + for list in self.bucket.list("/".to_owned(), None).await? { for obj in list.contents { let path = key_to_path(&obj.key); objects.insert(path.clone()); if let Some(parent) = path.parent() { - fs::create_dir_all(parent)?; + fs::create_dir_all(parent).await?; } let mut file = fs::OpenOptions::new() .write(true) .read(true) .create(true) - .open(&path)?; - let metadata = file.metadata()?; + .open(&path) + .await?; + let metadata = file.metadata().await?; SYNCED_PATHS.0.insert(stringify_path(&path)); if metadata.len() != obj.size - /* || !compare_date(metadata.modified()?, obj.last_modified) */ + /* || !compare_date(metadata.modified().await?, obj.last_modified) */ { - file.write_all(&self.get(&path)?)?; + file.write_all(&self.get(&path).await?).await?; synced += 1; } } @@ -74,14 +75,14 @@ impl CloudAdapter for S3Storage { if !objects.contains(&path) { if SYNCED_PATHS.0.remove(&stringify_path(&path)).is_some() { if path.is_dir() { - fs::remove_dir(&path)?; + fs::remove_dir(&path).await?; } else if path.is_file() { - fs::remove_file(&path)?; + fs::remove_file(&path).await?; } else { unreachable!() } } else { - self.save(&path)?; + self.save(&path).await?; SYNCED_PATHS.0.insert(stringify_path(&path)); synced += 1; } @@ -93,31 +94,33 @@ impl CloudAdapter for S3Storage { Ok(synced) } - fn exists(&self, path: &Path) -> Result { - let (_, code): (_, u16) = self.bucket.head_object(normalize_path(path))?; + async fn exists(&self, path: &Path) -> Result { + let (_, code): (_, u16) = self.bucket.head_object(normalize_path(path)).await?; Ok(code == 200) } - fn get(&self, path: &Path) -> Result> { - let res = self.bucket.get_object(normalize_path(path))?; + async fn get(&self, path: &Path) -> Result> { + let res = self.bucket.get_object(normalize_path(path)).await?; Ok(res.bytes().into()) } - fn delete(&self, path: &Path) -> Result<()> { - self.bucket.delete_object(normalize_path(path))?; + async fn delete(&self, path: &Path) -> Result<()> { + self.bucket.delete_object(normalize_path(path)).await?; Ok(()) } - fn save(&self, path: &Path) -> Result<()> { + async fn save(&self, path: &Path) -> Result<()> { self.bucket - .put_object(normalize_path(path), &Self::read_file(path)?)?; + .put_object(normalize_path(path), &Self::read_file(path).await?) + .await?; Ok(()) } - fn rename(&self, from: &Path, to: &Path) -> Result<()> { + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { self.bucket - .copy_object_internal(normalize_path(from), normalize_path(to))?; - self.delete(from)?; + .copy_object_internal(normalize_path(from), normalize_path(to)) + .await?; + self.delete(from).await?; Ok(()) } diff --git a/src/main.rs b/src/main.rs index 43fc016..af3a61d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate async_trait; extern crate notify; mod cache; @@ -10,16 +12,15 @@ mod util; use cache::*; use cloud::{cloud_storage, CloudAdapter}; use config::*; +use dashmap::DashSet; use log::LevelFilter; use notify::{event::*, recommended_watcher, RecursiveMode, Watcher}; use std::{ - collections::HashSet, - fs, str::FromStr, - sync::{mpsc::channel, Arc, Mutex}, - thread, + sync::{Arc, Mutex}, time::Duration, }; +use tokio::{fs, spawn, sync::mpsc::channel, time::sleep}; use util::*; lazy_static! { @@ -28,7 +29,8 @@ lazy_static! { pub static ref SYNCED_PATHS: Cache = Cache::new(); } -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { env_logger::builder() .format_timestamp(None) .filter_level(LevelFilter::from_str(&CONFIG.log).expect("Invalid log level format")) @@ -41,14 +43,16 @@ fn main() -> Result<()> { log::info!("Syncing delay: {}ms", CONFIG.interval); let cloud = cloud_ref.clone(); - let fs_task = thread::spawn(move || { - let (tx, rx) = channel(); - let mut watcher = recommended_watcher(tx)?; - let mut changes = HashSet::new(); + let fs_task = spawn(async move { + let (tx, mut rx) = channel(5); + let mut watcher = recommended_watcher(move |event| { + futures::executor::block_on(async { tx.send(event).await.unwrap() }); + })?; + let changes = Arc::new(DashSet::new()); watcher.watch(&CONFIG.path, RecursiveMode::Recursive)?; - while let Ok(event) = rx.recv() { + while let Some(event) = rx.recv().await { let event = match event { Ok(e) => e, Err(err) => { @@ -69,88 +73,112 @@ fn main() -> Result<()> { continue; } - let is_file_exists = || { - fs::metadata(&event.paths[0]) - .map(|m| m.is_file()) - .unwrap_or(false) - }; - - match event.kind { - EventKind::Create(_) if is_file_exists() => { - maybe_error(cloud.save(&event.paths[0]).and_then(|_| { - if SYNCED_PATHS.0.insert(stringify_path(&event.paths[0])) { - SYNCED_PATHS.save()?; - } - Ok(()) - })); - } - - EventKind::Remove(_) => maybe_error(cloud.delete(&event.paths[0]).and_then(|_| { - if SYNCED_PATHS - .0 - .remove(&stringify_path(&event.paths[0])) - .is_some() - { - SYNCED_PATHS.save()?; - } - Ok(()) - })), - EventKind::Access(AccessKind::Close(AccessMode::Write)) => { - if changes.remove(&event.paths[0]) && is_file_exists() { - maybe_error(cloud.save(&event.paths[0])); - } - } - EventKind::Modify(kind) => match kind { - ModifyKind::Data(_) => { - changes.insert(event.paths[0].clone()); + let cloud = cloud.clone(); + let changes = changes.clone(); + + spawn(async move { + let is_file_exists = || async { + fs::metadata(&event.paths[0]) + .await + .map(|m| m.is_file()) + .unwrap_or(false) + }; + + match event.kind { + EventKind::Create(_) if is_file_exists().await => { + cloud + .save(&event.paths[0]) + .await + .and_then(|_| { + if SYNCED_PATHS.0.insert(stringify_path(&event.paths[0])) { + SYNCED_PATHS.save()?; + } + Ok(()) + }) + .or_else(log_error)?; } - ModifyKind::Name(_) if event.paths.len() == 2 => { - maybe_error(cloud.rename(&event.paths[0], &event.paths[1]).and_then( - |_| { - SYNCED_PATHS.0.remove(&stringify_path(&event.paths[0])); - SYNCED_PATHS.0.insert(stringify_path(&event.paths[1])); - SYNCED_PATHS.save()?; + + EventKind::Remove(_) => { + cloud + .delete(&event.paths[0]) + .await + .and_then(|_| { + if SYNCED_PATHS + .0 + .remove(&stringify_path(&event.paths[0])) + .is_some() + { + SYNCED_PATHS.save()?; + } Ok(()) - }, - )); + }) + .or_else(log_error)?; } - #[cfg(target_os = "android")] - ModifyKind::Metadata(MetadataKind::WriteTime) if is_file_exists() => { - maybe_error(cloud.save(&event.paths[0])); + EventKind::Access(AccessKind::Close(AccessMode::Write)) => { + if changes.remove(&event.paths[0]).is_some() && is_file_exists().await { + cloud.save(&event.paths[0]).await.or_else(log_error)?; + } } + EventKind::Modify(kind) => match kind { + ModifyKind::Data(_) => { + changes.insert(event.paths[0].clone()); + } + ModifyKind::Name(_) if event.paths.len() == 2 => { + cloud + .rename(&event.paths[0], &event.paths[1]) + .await + .and_then(|_| { + SYNCED_PATHS.0.remove(&stringify_path(&event.paths[0])); + SYNCED_PATHS.0.insert(stringify_path(&event.paths[1])); + SYNCED_PATHS.save()?; + Ok(()) + }) + .or_else(log_error)?; + } + #[cfg(target_os = "android")] + ModifyKind::Metadata(MetadataKind::WriteTime) if is_file_exists().await => { + cloud.save(&event.paths[0]).await.or_else(log_error)?; + } + _ => {} + }, _ => {} - }, - _ => {} - } + } + + Result::<()>::Ok(()) + }); } Result::<()>::Ok(()) }); let cloud = cloud_ref; - let cloud_task = thread::spawn(move || loop { - check_connectivity(); + let cloud_task = spawn(async move { + loop { + check_connectivity().await; - if *IS_INTERNET_AVAILABLE.lock().unwrap() { - *SYNCING.lock().unwrap() = true; + if *IS_INTERNET_AVAILABLE.lock().unwrap() { + *SYNCING.lock().unwrap() = true; - maybe_error( cloud .sync() - .map(|count| log::debug!("{count:?} file has synced")), - ); + .await + .map(|count| log::debug!("{count:?} file has synced")) + .or_else(log_error)?; + + *SYNCING.lock().unwrap() = false; + } else { + log::warn!("Skip syncing.. there are no internet connection"); + } - *SYNCING.lock().unwrap() = false; - } else { - log::warn!("Skip syncing.. there are no internet connection"); + sleep(Duration::from_millis(CONFIG.interval)).await; } - thread::sleep(Duration::from_millis(CONFIG.interval)); + #[allow(unreachable_code)] + Result::<()>::Ok(()) }); - for task in [fs_task, cloud_task] { - task.join().unwrap()?; + match tokio::try_join!(fs_task, cloud_task) { + Err(err) => panic!("An unexpected error occurred: {err:?}"), + _ => unreachable!(), } - - Ok(()) } diff --git a/src/util.rs b/src/util.rs index c3abd61..5fb73c7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -53,14 +53,13 @@ pub fn settings_file_path(extension: &str) -> PathBuf { path } -pub fn check_connectivity() { - *IS_INTERNET_AVAILABLE.lock().unwrap() = online::sync::check(None).is_ok(); +pub async fn check_connectivity() { + *IS_INTERNET_AVAILABLE.lock().unwrap() = online::check(None).await.is_ok(); } -pub fn maybe_error(result: Result<()>) { - if let Err(error) = result { - log::error!("An error has occurred: {error:?}"); - } +pub fn log_error(err: anyhow::Error) -> Result<()> { + log::error!("An error has occurred: {err:?}"); + Ok(()) } pub fn stringify_path(path: &Path) -> String {