Skip to content

Commit

Permalink
Merge pull request #160 from sandreas/fix-deprecations
Browse files Browse the repository at this point in the history
WIP: Fix deprecations and some general improvements
  • Loading branch information
dpc committed Dec 13, 2019
2 parents e4ed864 + 4997bca commit 1c32a8a
Show file tree
Hide file tree
Showing 25 changed files with 173 additions and 175 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: rust
cache: cargo
rust:
- stable
- 1.27.0
- 1.39.0
- beta
- nightly

Expand Down Expand Up @@ -39,4 +39,4 @@ notifications:
webhooks:
on_success: change # options: [always|never|change] default: always
on_failure: always # options: [always|never|change] default: always
on_start: false # default: false
on_start: never # options: [always|never|change] default: never
2 changes: 1 addition & 1 deletion README.tpl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<!-- README.md is auto-generated from README.tpl with `cargo readme` -->

<p align="center">
<p style="text-align:center;">
<a href="https://travis-ci.org/dpc/{{crate}}">
<img src="https://img.shields.io/travis/dpc/{{crate}}/master.svg?style=flat-square" alt="Travis CI Build Status">
</a>
Expand Down
32 changes: 16 additions & 16 deletions lib/src/aio/b2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct Lock {

impl Lock {
fn new(path: PathBuf) -> Self {
Lock { path: path }
Lock { path }
}
}

Expand Down Expand Up @@ -129,9 +129,9 @@ impl B2Thread {

let mut i = B2Thread {
cred: cred.clone(),
client: client,
client,
auth: RefCell::new(None),
bucket: bucket,
bucket,
};

i.reauth()?;
Expand All @@ -141,19 +141,19 @@ impl B2Thread {
}

impl Backend for B2 {
fn new_thread(&self) -> io::Result<Box<BackendThread>> {
Ok(Box::new(B2Thread::new_from_cred(
&self.cred,
self.bucket.clone(),
)?))
fn lock_exclusive(&self) -> io::Result<Box<dyn aio::Lock>> {
Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
}

fn lock_exclusive(&self) -> io::Result<Box<aio::Lock>> {
fn lock_shared(&self) -> io::Result<Box<dyn aio::Lock>> {
Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
}

fn lock_shared(&self) -> io::Result<Box<aio::Lock>> {
Ok(Box::new(Lock::new(PathBuf::from(config::LOCK_FILE))))
fn new_thread(&self) -> io::Result<Box<dyn BackendThread>> {
Ok(Box::new(B2Thread::new_from_cred(
&self.cred,
self.bucket.clone(),
)?))
}
}

Expand All @@ -165,13 +165,17 @@ impl B2 {
};

B2 {
cred: cred,
cred,
bucket: bucket.into(),
}
}
}

impl BackendThread for B2Thread {
fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
fs::remove_dir_all(&path)
}

fn rename(
&mut self,
src_path: PathBuf,
Expand All @@ -186,10 +190,6 @@ impl BackendThread for B2Thread {
}
}

fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
fs::remove_dir_all(&path)
}

fn write(
&mut self,
path: PathBuf,
Expand Down
6 changes: 3 additions & 3 deletions lib/src/aio/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ pub(crate) trait Backend: Send + Sync {
///
/// Use to protect operations that are potentially destructive,
/// like GC.
fn lock_exclusive(&self) -> io::Result<Box<Lock>>;
fn lock_exclusive(&self) -> io::Result<Box<dyn Lock>>;
/// Lock the repository in shared mode
///
/// This will only prevent anyone from grabing exclusive lock.
/// Use to protect operations that only add new data, like `write`.
fn lock_shared(&self) -> io::Result<Box<Lock>>;
fn lock_shared(&self) -> io::Result<Box<dyn Lock>>;

/// Spawn a new thread object of the backend.
fn new_thread(&self) -> io::Result<Box<BackendThread>>;
fn new_thread(&self) -> io::Result<Box<dyn BackendThread>>;
}

pub(crate) trait BackendThread: Send {
Expand Down
36 changes: 18 additions & 18 deletions lib/src/aio/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,7 @@ struct LocalThread {
}

impl Backend for Local {
fn new_thread(&self) -> io::Result<Box<BackendThread>> {
Ok(Box::new(LocalThread {
path: self.path.clone(),
rand_ext: rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(20)
.collect::<String>(),
}))
}

fn lock_exclusive(&self) -> io::Result<Box<Lock>> {
fn lock_exclusive(&self) -> io::Result<Box<dyn Lock>> {
let lock_path = lock_file_path(&self.path);

let file = fs::File::create(&lock_path)?;
Expand All @@ -54,23 +44,38 @@ impl Backend for Local {
Ok(Box::new(file))
}

fn lock_shared(&self) -> io::Result<Box<Lock>> {
fn lock_shared(&self) -> io::Result<Box<dyn Lock>> {
let lock_path = lock_file_path(&self.path);

let file = fs::File::create(&lock_path)?;
file.lock_shared()?;

Ok(Box::new(file))
}

fn new_thread(&self) -> io::Result<Box<dyn BackendThread>> {
Ok(Box::new(LocalThread {
path: self.path.clone(),
rand_ext: rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(20)
.collect::<String>(),
}))
}
}

impl Local {
pub(crate) fn new(path: PathBuf) -> Self {
Local { path: path }
Local { path }
}
}

impl BackendThread for LocalThread {
fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
let path = self.path.join(path);
fs::remove_dir_all(&path)
}

fn rename(
&mut self,
src_path: PathBuf,
Expand All @@ -88,11 +93,6 @@ impl BackendThread for LocalThread {
}
}

fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()> {
let path = self.path.join(path);
fs::remove_dir_all(&path)
}

fn write(
&mut self,
path: PathBuf,
Expand Down
52 changes: 26 additions & 26 deletions lib/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub struct AsyncIO {

impl AsyncIO {
pub(crate) fn new(
backend: Box<Backend + Send + Sync>,
backend: Box<dyn Backend + Send + Sync>,
log: Logger,
) -> io::Result<Self> {
let thread_num = 4 * num_cpus::get();
Expand Down Expand Up @@ -125,10 +125,10 @@ impl AsyncIO {
}

let shared = AsyncIOShared {
join: join,
join,
log: log.clone(),
stats: shared.clone(),
backend: backend,
backend,
};

Ok(AsyncIO {
Expand All @@ -137,11 +137,11 @@ impl AsyncIO {
})
}

pub(crate) fn lock_exclusive(&self) -> io::Result<Box<Lock>> {
pub(crate) fn lock_exclusive(&self) -> io::Result<Box<dyn Lock>> {
self.shared.backend.lock_exclusive()
}

pub(crate) fn lock_shared(&self) -> io::Result<Box<Lock>> {
pub(crate) fn lock_shared(&self) -> io::Result<Box<dyn Lock>> {
self.shared.backend.lock_shared()
}

Expand All @@ -152,36 +152,36 @@ impl AsyncIO {
pub fn list(&self, path: PathBuf) -> AsyncIOResult<Vec<PathBuf>> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::List(path, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

// TODO: No need for it anymore?
#[allow(dead_code)]
pub fn list_recursively(
&self,
path: PathBuf,
) -> Box<Iterator<Item = io::Result<PathBuf>>> {
) -> Box<dyn Iterator<Item = io::Result<PathBuf>>> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::ListRecursively(path, tx));

let iter = rx.into_iter().flat_map(|batch| match batch {
Ok(batch) => Box::new(batch.into_iter().map(Ok))
as Box<Iterator<Item = io::Result<PathBuf>>>,
as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
Err(e) => Box::new(Some(Err(e)).into_iter())
as Box<Iterator<Item = io::Result<PathBuf>>>,
as Box<dyn Iterator<Item = io::Result<PathBuf>>>,
});
Box::new(iter)
}

pub fn write(&self, path: PathBuf, sg: SGData) -> AsyncIOResult<()> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::Write(WriteArgs {
path: path,
path,
data: sg,
idempotent: false,
complete_tx: Some(tx),
}));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

// TODO: No need for it anymore
Expand All @@ -193,12 +193,12 @@ impl AsyncIO {
) -> AsyncIOResult<()> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::Write(WriteArgs {
path: path,
path,
data: sg,
idempotent: true,
complete_tx: Some(tx),
}));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

/// Will panic the worker thread if fails, but does not require
Expand All @@ -207,7 +207,7 @@ impl AsyncIO {
#[allow(dead_code)]
pub fn write_checked(&self, path: PathBuf, sg: SGData) {
self.tx.send(Message::Write(WriteArgs {
path: path,
path,
data: sg,
idempotent: false,
complete_tx: None,
Expand All @@ -216,7 +216,7 @@ impl AsyncIO {

pub fn write_checked_idempotent(&self, path: PathBuf, sg: SGData) {
self.tx.send(Message::Write(WriteArgs {
path: path,
path,
data: sg,
idempotent: true,
complete_tx: None,
Expand All @@ -226,7 +226,7 @@ impl AsyncIO {
pub fn read(&self, path: PathBuf) -> AsyncIOResult<SGData> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::Read(path, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

pub(crate) fn read_metadata(
Expand All @@ -235,25 +235,25 @@ impl AsyncIO {
) -> AsyncIOResult<Metadata> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::ReadMetadata(path, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

pub fn remove(&self, path: PathBuf) -> AsyncIOResult<()> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::Remove(path, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

pub fn remove_dir_all(&self, path: PathBuf) -> AsyncIOResult<()> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::RemoveDirAll(path, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}

pub fn rename(&self, src: PathBuf, dst: PathBuf) -> AsyncIOResult<()> {
let (tx, rx) = mpsc::channel();
self.tx.send(Message::Rename(src, dst, tx));
AsyncIOResult { rx: rx }
AsyncIOResult { rx }
}
}

Expand All @@ -275,7 +275,7 @@ pub struct AsyncIOShared {
join: Vec<thread::JoinHandle<()>>,
log: slog::Logger,
stats: AsyncIOThreadShared,
backend: Box<Backend + Send + Sync>,
backend: Box<dyn Backend + Send + Sync>,
}

impl Drop for AsyncIOShared {
Expand Down Expand Up @@ -335,7 +335,7 @@ struct AsyncIOThread {
rx: crossbeam_channel::Receiver<Message>,
log: Logger,
time_reporter: TimeReporter,
backend: RefCell<Box<BackendThread>>,
backend: RefCell<Box<dyn BackendThread>>,
}

/// Guard that removes entry from the pending paths on drop
Expand All @@ -352,7 +352,7 @@ impl AsyncIOThread {
fn new(
shared: AsyncIOThreadShared,
rx: crossbeam_channel::Receiver<Message>,
backend: Box<BackendThread>,
backend: Box<dyn BackendThread>,
log: Logger,
) -> Self {
let t = TimeReporter::new_with_level(
Expand All @@ -362,8 +362,8 @@ impl AsyncIOThread {
);
AsyncIOThread {
log: log.new(o!("module" => "asyncio")),
shared: shared,
rx: rx,
shared,
rx,
time_reporter: t,
backend: RefCell::new(backend),
}
Expand Down Expand Up @@ -593,7 +593,7 @@ impl AsyncIOThread {
// ```
pub(crate) fn backend_from_url(
u: &Url,
) -> io::Result<Box<Backend + Send + Sync>> {
) -> io::Result<Box<dyn Backend + Send + Sync>> {
if u.scheme() == "file" {
return Ok(Box::new(Local::new(PathBuf::from(u.path()))));
} else if u.scheme() == "b2" {
Expand Down

0 comments on commit 1c32a8a

Please sign in to comment.