Skip to content

Commit

Permalink
refactor: Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
abdulrahman1s committed Sep 7, 2022
1 parent e280c80 commit 46e92be
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ anyhow = "1.0.62"
dirs = "4.0.0"
lazy_static = "1.4.0"
notify = "5.0.0"
rust-s3 = { version = "0.32.3", default-features = false, features = ["tokio-rustls-tls", "fail-on-err"] }
rust-s3 = { version = "0.32.3", default-features = false, features = ["tokio-rustls-tls"] }
online = { git = "https://github.com/abdulrahman1s/online", branch = "tokio", default-features = false, features = ["tokio-runtime"] }
log = "0.4.17"
env_logger = "0.9.0"
Expand Down
19 changes: 11 additions & 8 deletions src/cloud/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::config::CloudOptions;
pub use anyhow::Result;
pub use std::path::Path;
use tokio::fs;
use dashmap::DashSet;
pub use std::path::{Path, PathBuf};

pub enum Operation {
Write(PathBuf), // Load file from cloud
WriteEmpty(PathBuf),
Save(PathBuf), // Save local file to cloud
}

#[async_trait]
pub trait CloudAdapter {
fn new(options: CloudOptions) -> Self;
async fn sync(&self) -> Result<u32>;
async fn init(options: CloudOptions) -> Self;
async fn sync(&self) -> Result<(DashSet<PathBuf>, Vec<Operation>)>;
async fn get(&self, path: &Path) -> Result<Vec<u8>>;
async fn exists(&self, path: &Path) -> Result<bool>;
async fn delete(&self, path: &Path) -> Result<()>;
async fn save(&self, path: &Path) -> Result<()>;
async fn save(&self, path: &Path, content: &[u8]) -> Result<()>;
async fn rename(&self, oldpath: &Path, path: &Path) -> Result<()>;
async fn read_file(path: &Path) -> Result<Vec<u8>> {
Ok(fs::read(path).await?)
}
fn kind(&self) -> &'static str;
}
4 changes: 2 additions & 2 deletions src/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crate::config::CloudOptions;
pub use adapter::*;
use providers::*;

pub fn cloud_storage(options: CloudOptions) -> impl CloudAdapter {
pub async fn select_provider(options: CloudOptions) -> impl CloudAdapter {
match options {
CloudOptions::S3 { .. } => S3Storage::new(options),
CloudOptions::S3 { .. } => S3Storage::init(options).await,
// TODO: Support more providers
// CloudOptions::GoogleDrive { .. } => GoogleDrive:new(options),
// CloudOptions::Dropbox { .. } => Dropbox::new(options),
Expand Down
75 changes: 27 additions & 48 deletions src/cloud/providers/s3.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use crate::cloud::CloudAdapter;
use crate::cloud::Operation;
use crate::config::*;
use crate::util::*;
use crate::SYNCED_PATHS;
use dashmap::DashSet;
use s3::{creds::Credentials, Bucket, Region};
use std::path::Path;
use std::path::PathBuf;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::fs;

const TRASH_PATH: &str = ".trash/";

pub struct S3Storage {
bucket: Bucket,
}

#[async_trait]
impl CloudAdapter for S3Storage {
fn new(options: CloudOptions) -> Self {
async fn init(options: CloudOptions) -> Self {
let CloudOptions::S3 {
key,
secret,
Expand All @@ -37,28 +40,28 @@ impl CloudAdapter for S3Storage {
}
}

async fn sync(&self) -> Result<u32> {
let mut synced = 0;
async fn sync(&self) -> Result<(DashSet<PathBuf>, Vec<Operation>)> {
let objects = DashSet::new();
let mut operations = vec![];

for list in self.bucket.list("/".to_owned(), None).await? {
for obj in list.contents {
if obj.key.starts_with(TRASH_PATH) {
continue;
}

let path = key_to_path(&obj.key);

objects.insert(path.clone());

if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}

let (size, last_modified) = fs::metadata(&path)
.await
.map(|m| (m.len(), m.modified().ok()))
.unwrap_or((0, None));
let (exists, size, last_modified) = metadata_of(&path).await;

SYNCED_PATHS.0.insert(stringify_path(&path));

if size == obj.size {
if obj.size == 0 && !exists {
operations.push(Operation::WriteEmpty(path));
}
continue;
}

Expand All @@ -73,7 +76,7 @@ impl CloudAdapter for S3Storage {
if let Some(last_modified) = last_modified {
let cloud_last_modified =
OffsetDateTime::parse(&obj.last_modified, &Rfc3339).unwrap();
let local_last_modified = OffsetDateTime::from(last_modified);
let local_last_modified = last_modified;
log::debug!("{path:?} last modified: local({local_last_modified}) > cloud({cloud_last_modified}) = {}", local_last_modified > cloud_last_modified);
return obj.size == 0 || local_last_modified > cloud_last_modified;
}
Expand All @@ -83,44 +86,14 @@ impl CloudAdapter for S3Storage {

if prefer_local() {
log::debug!("Preferring local {path:?} instead of cloud version");
self.save(&path).await?;
} else {
let buffer = if obj.size == 0 {
vec![]
} else {
self.get(&path).await?
};
fs::write(&path, &buffer).await?;
}

synced += 1;
}
}

for entry in walk_dir(&CONFIG.path)? {
let path = entry.path();

if !objects.contains(&path) {
if SYNCED_PATHS.0.remove(&stringify_path(&path)).is_some() {
if path.is_dir() {
fs::remove_dir(&path).await?;
} else if path.is_file() {
fs::remove_file(&path).await?;
} else {
unreachable!()
}
operations.push(Operation::Save(path));
} else {
log::debug!("{:?} not synced, Saving to cloud...", path);
self.save(&path).await?;
SYNCED_PATHS.0.insert(stringify_path(&path));
synced += 1;
operations.push(Operation::Write(path));
}
}
}

SYNCED_PATHS.save()?;

Ok(synced)
Ok((objects, operations))
}

async fn exists(&self, path: &Path) -> Result<bool> {
Expand All @@ -134,13 +107,19 @@ impl CloudAdapter for S3Storage {
}

async fn delete(&self, path: &Path) -> Result<()> {
self.bucket
.copy_object_internal(
normalize_path(path),
TRASH_PATH.to_owned() + &normalize_path(path),
)
.await?;
self.bucket.delete_object(normalize_path(path)).await?;
Ok(())
}

async fn save(&self, path: &Path) -> Result<()> {
async fn save(&self, path: &Path, content: &[u8]) -> Result<()> {
self.bucket
.put_object(normalize_path(path), &Self::read_file(path).await?)
.put_object(normalize_path(path), content)
.await?;
Ok(())
}
Expand Down
Loading

0 comments on commit 46e92be

Please sign in to comment.