Skip to content

Commit

Permalink
Add an async variant of AtomicWriteFile
Browse files Browse the repository at this point in the history
  • Loading branch information
andreacorbellini committed Apr 25, 2024
1 parent 3f7a41b commit 588dab7
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ categories = ["filesystem"]
[dependencies]
rand = { version = "0.8" }

async-std = { version = "1.12", optional = true }

[target.'cfg(unix)'.dependencies]
nix = { version = "0.28", features = ["fs", "user"] }

Expand Down
207 changes: 207 additions & 0 deletions src/future/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use crate::imp;
use crate::OpenOptions;
use async_std::fs::File;
use async_std::io::IoSlice;
use async_std::io::IoSliceMut;
use async_std::io::Read;
use async_std::io::Seek;
use async_std::io::SeekFrom;
use async_std::io::Write;
use async_std::sync::Arc;
use async_std::task::block_on;
use async_std::task::spawn_blocking;
use async_std::task::Context;
use async_std::task::Poll;
use std::io::Result;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::path::Path;
use std::pin::pin;
use std::pin::Pin;
use std::ptr;

#[cfg(test)]
mod tests;

#[derive(Clone, Debug)]
pub struct AtomicWriteFile {
temporary_file: Arc<imp::TemporaryFile<File>>,
finalized: bool,
}

impl AtomicWriteFile {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let file = spawn_blocking(move || OpenOptions::new().open(path)).await?;

// Take the `temporary_file` out of the blocking `AtomicWriteFile`, so that we can convert
// it to an async file. This requires unsafe code because `AtomicWriteFile` has a
// destructor, and we want to avoid running it now
let file = ManuallyDrop::new(file);
// SAFETY: we're taking ownership of the `temporary_file`, and disposing of `file` without
// running its destructor
let temporary_file = unsafe { ptr::read(&(*file).temporary_file) };

Ok(Self {
temporary_file: Arc::new(temporary_file.into()),
finalized: false,
})
}

#[inline]
pub fn as_file(&self) -> &File {
&self.temporary_file.file
}

pub async fn commit(mut self) -> Result<()> {
self._commit().await
}

async fn _commit(&mut self) -> Result<()> {
if self.finalized {
return Ok(());
}
self.finalized = true;
self.sync_all().await?;
let temporary_file = Arc::clone(&self.temporary_file);
spawn_blocking(move || temporary_file.rename_file()).await
}

pub async fn discard(mut self) -> Result<()> {
self._discard().await
}

async fn _discard(&mut self) -> Result<()> {
if self.finalized {
return Ok(());
}
self.finalized = true;
let temporary_file = Arc::clone(&self.temporary_file);
spawn_blocking(move || temporary_file.remove_file()).await
}
}

impl Drop for AtomicWriteFile {
#[inline]
fn drop(&mut self) {
if !self.finalized {
// Ignore all errors
let _ = block_on(self._discard());
}
}
}

impl Deref for AtomicWriteFile {
type Target = File;

#[inline]
fn deref(&self) -> &Self::Target {
self.as_file()
}
}

impl Read for AtomicWriteFile {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_read(cx, buf)
}

#[inline]
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_read_vectored(cx, bufs)
}
}

impl Read for &AtomicWriteFile {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_read(cx, buf)
}

#[inline]
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_read_vectored(cx, bufs)
}
}

impl Write for AtomicWriteFile {
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_write(cx, buf)
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
pin!(&(*self.temporary_file).file).poll_flush(cx)
}

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
pin!(&(*self.temporary_file).file).poll_close(cx)
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_write_vectored(cx, bufs)
}
}

impl Write for &AtomicWriteFile {
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_write(cx, buf)
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
pin!(&(*self.temporary_file).file).poll_flush(cx)
}

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
pin!(&(*self.temporary_file).file).poll_close(cx)
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
pin!(&(*self.temporary_file).file).poll_write_vectored(cx, bufs)
}
}

impl Seek for AtomicWriteFile {
#[inline]
fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
pin!(&(*self.temporary_file).file).poll_seek(cx, pos)
}
}

impl Seek for &AtomicWriteFile {
#[inline]
fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
pin!(&(*self.temporary_file).file).poll_seek(cx, pos)
}
}
65 changes: 65 additions & 0 deletions src/future/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::future::AtomicWriteFile;
use crate::tests::test_file;
use crate::tests::verify_no_leftovers;
use async_std::fs;
use async_std::io::WriteExt;
use async_std::task::block_on;
use std::io::Result;

#[test]
fn create_new() -> Result<()> {
block_on(async {
let path = test_file("async-new");
assert!(!path.exists());

let mut file = AtomicWriteFile::open(&path).await?;
assert!(!path.exists());

file.write_all(b"hello ").await?;
assert!(!path.exists());
file.flush().await?;
assert!(!path.exists());
file.write_all(b"world\n").await?;
assert!(!path.exists());
file.flush().await?;
assert!(!path.exists());

file.commit().await?;

assert!(path.exists());
assert_eq!(fs::read(&path).await?, b"hello world\n");

verify_no_leftovers(path);

Ok(())
})
}

#[test]
fn overwrite_existing() -> Result<()> {
block_on(async {
let path = test_file("async-existing");
fs::write(&path, b"initial contents\n").await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");

let mut file = AtomicWriteFile::open(&path).await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");

file.write_all(b"hello ").await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");
file.flush().await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");
file.write_all(b"world\n").await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");
file.flush().await?;
assert_eq!(fs::read(&path).await?, b"initial contents\n");

file.commit().await?;

assert_eq!(fs::read(&path).await?, b"hello world\n");

verify_no_leftovers(path);

Ok(())
})
}
21 changes: 19 additions & 2 deletions src/imp/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ impl Default for OpenOptions {
}

#[derive(Debug)]
pub(crate) struct TemporaryFile {
pub(crate) struct TemporaryFile<F = File> {
pub(crate) temp_path: PathBuf,
pub(crate) dest_path: PathBuf,
pub(crate) file: File,
pub(crate) file: F,
}

impl TemporaryFile {
Expand Down Expand Up @@ -69,7 +69,9 @@ impl TemporaryFile {
file,
})
}
}

impl<F> TemporaryFile<F> {
pub(crate) fn rename_file(&self) -> Result<()> {
fs::rename(&self.temp_path, &self.dest_path)
}
Expand All @@ -84,6 +86,21 @@ impl TemporaryFile {
}
}

#[cfg(feature = "async-std")]
impl<F> TemporaryFile<F> {
#[inline]
pub(crate) fn into<G>(self) -> TemporaryFile<G>
where
G: From<F>,
{
TemporaryFile::<G> {
temp_path: self.temp_path,
dest_path: self.dest_path,
file: self.file.into(),
}
}
}

// An enum without variants, so that it can never be constructed
#[derive(Debug)]
pub(crate) enum Dir {}
Expand Down
22 changes: 20 additions & 2 deletions src/imp/unix/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use std::io::Result;
use std::path::Path;

#[derive(Debug)]
pub(crate) struct TemporaryFile {
pub(crate) struct TemporaryFile<F = File> {
pub(crate) dir: Dir,
pub(crate) file: File,
pub(crate) file: F,
pub(crate) name: OsString,
pub(crate) temporary_name: OsString,
}
Expand Down Expand Up @@ -42,7 +42,9 @@ impl TemporaryFile {
temporary_name,
})
}
}

impl<F> TemporaryFile<F> {
pub(crate) fn rename_file(&self) -> Result<()> {
rename_temporary_file(&self.dir, &self.temporary_name, &self.name)?;
Ok(())
Expand All @@ -58,3 +60,19 @@ impl TemporaryFile {
Some(&self.dir)
}
}

#[cfg(feature = "async-std")]
impl<F> TemporaryFile<F> {
#[inline]
pub(crate) fn into<G>(self) -> TemporaryFile<G>
where
G: From<F>,
{
TemporaryFile::<G> {
dir: self.dir,
file: self.file.into(),
name: self.name,
temporary_name: self.temporary_name,
}
}
}

0 comments on commit 588dab7

Please sign in to comment.