Skip to content

Commit

Permalink
Switch to cleaned up non-async fuser fork
Browse files Browse the repository at this point in the history
Our current fuser fork has a messy commit history and does two big
changes at once: (1) concurrent filesystems and (2) async filesystems.
We don't actually need the async version right now -- it was incomplete
and we were just running it on blocking threads anyway.

This change redirects the fuser fork to our `fuser/fork` branch, which
is a cleaned up history of the fork but without async support. Instead,
we push the async support down into our implementation via `block_on`.
This performs the same, but with a smaller change to fuser.

In the future we'll still want to async-ify fuser so the entire stack
can run on top of (the CRT's?) async runtime. But there's more work
involved in correctly async-ifying fuser, like doing async IO on the
FUSE device, so we can defer that for now.

This change temporarily breaks the `--thread-count` command-line
argument and so will tank concurrent performance tests. I'll fix that in
a followup as I don't want this diff to get any bigger, and will fix
unmount support (#93) at the same time.

Signed-off-by: James Bornholt <bornholt@amazon.com>
  • Loading branch information
jamesbornholt committed Mar 13, 2023
1 parent cdc757a commit 82d8e03
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 74 deletions.
30 changes: 1 addition & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion mountpoint-s3/Cargo.toml
Expand Up @@ -13,7 +13,6 @@ mountpoint-s3-crt = { path = "../mountpoint-s3-crt" }
anyhow = { version = "1.0.64", features = ["backtrace"] }
async-channel = "1.8.0"
async-lock = "2.6.0"
async-trait = "0.1.57"
bytes = "1.2.1"
clap = { version = "3.2.12", features = ["derive"] }
ctrlc = "3.2.3"
Expand Down
68 changes: 32 additions & 36 deletions mountpoint-s3/src/fuse.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use futures::executor::block_on;
use futures::task::Spawn;
use std::ffi::OsStr;
use std::time::Duration;
use tracing::instrument;
use tracing::{instrument, Instrument};

use crate::fs::{DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig};
use fuser::{
Expand All @@ -28,43 +28,42 @@ where
}
}

#[async_trait]
impl<Client, Runtime> Filesystem for S3FuseFilesystem<Client, Runtime>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
{
#[instrument(level = "debug", skip_all)]
async fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {
self.fs.init(config).await
fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {
block_on(self.fs.init(config).in_current_span())
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, name=?name))]
async fn lookup(&self, _req: &Request<'_>, parent: InodeNo, name: &OsStr, reply: ReplyEntry) {
match self.fs.lookup(parent, name).await {
fn lookup(&self, _req: &Request<'_>, parent: InodeNo, name: &OsStr, reply: ReplyEntry) {
match block_on(self.fs.lookup(parent, name).in_current_span()) {
Ok(entry) => reply.entry(&entry.ttl, &entry.attr, entry.generation),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino))]
async fn getattr(&self, _req: &Request<'_>, ino: InodeNo, reply: ReplyAttr) {
match self.fs.getattr(ino).await {
fn getattr(&self, _req: &Request<'_>, ino: InodeNo, reply: ReplyAttr) {
match block_on(self.fs.getattr(ino).in_current_span()) {
Ok(attr) => reply.attr(&attr.ttl, &attr.attr),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino))]
async fn open(&self, _req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) {
match self.fs.open(ino, flags).await {
fn open(&self, _req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) {
match block_on(self.fs.open(ino, flags).in_current_span()) {
Ok(opened) => reply.opened(opened.fh, opened.flags),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, size=size))]
async fn read(
fn read(
&self,
_req: &Request<'_>,
ino: InodeNo,
Expand Down Expand Up @@ -103,29 +102,26 @@ where
inner: reply,
bytes_sent: &mut bytes_sent,
};
self.fs.read(ino, fh, offset, size, flags, lock, replier).await;
block_on(
self.fs
.read(ino, fh, offset, size, flags, lock, replier)
.in_current_span(),
);
// return value of read is proof a reply was sent

metrics::counter!("fuse.bytes_read", bytes_sent as u64);
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent))]
async fn opendir(&self, _req: &Request<'_>, parent: InodeNo, flags: i32, reply: ReplyOpen) {
match self.fs.opendir(parent, flags).await {
fn opendir(&self, _req: &Request<'_>, parent: InodeNo, flags: i32, reply: ReplyOpen) {
match block_on(self.fs.opendir(parent, flags).in_current_span()) {
Ok(opened) => reply.opened(opened.fh, opened.flags),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, fh=fh, offset=offset))]
async fn readdir(
&self,
_req: &Request<'_>,
parent: InodeNo,
fh: u64,
offset: i64,
mut reply: fuser::ReplyDirectory,
) {
fn readdir(&self, _req: &Request<'_>, parent: InodeNo, fh: u64, offset: i64, mut reply: fuser::ReplyDirectory) {
struct ReplyDirectory<'a> {
inner: &'a mut fuser::ReplyDirectory,
}
Expand All @@ -146,14 +142,14 @@ where

let replier = ReplyDirectory { inner: &mut reply };

match self.fs.readdir(parent, fh, offset, replier).await {
match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) {
Ok(_) => reply.ok(),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, fh=fh, offset=offset))]
async fn readdirplus(
fn readdirplus(
&self,
_req: &Request<'_>,
parent: InodeNo,
Expand Down Expand Up @@ -181,14 +177,14 @@ where

let replier = ReplyDirectoryPlus { inner: &mut reply };

match self.fs.readdir(parent, fh, offset, replier).await {
match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) {
Ok(_) => reply.ok(),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh))]
async fn release(
fn release(
&self,
_req: &Request<'_>,
ino: InodeNo,
Expand All @@ -198,14 +194,14 @@ where
flush: bool,
reply: ReplyEmpty,
) {
match self.fs.release(ino, fh, flags, lock_owner, flush).await {
match block_on(self.fs.release(ino, fh, flags, lock_owner, flush).in_current_span()) {
Ok(()) => reply.ok(),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), parent=parent, name=?name))]
async fn mknod(
fn mknod(
&self,
_req: &Request<'_>,
parent: InodeNo,
Expand All @@ -218,14 +214,14 @@ where
// mode_t is u32 on Linux but u16 on macOS, so cast it here
let mode = mode as libc::mode_t;

match self.fs.mknod(parent, name, mode, umask, rdev).await {
match block_on(self.fs.mknod(parent, name, mode, umask, rdev).in_current_span()) {
Ok(entry) => reply.entry(&entry.ttl, &entry.attr, entry.generation),
Err(e) => reply.error(e),
}
}

#[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, length=data.len()))]
async fn write(
fn write(
&self,
_req: &Request<'_>,
ino: InodeNo,
Expand All @@ -237,11 +233,11 @@ where
lock_owner: Option<u64>,
reply: ReplyWrite,
) {
match self
.fs
.write(ino, fh, offset, data, write_flags, flags, lock_owner)
.await
{
match block_on(
self.fs
.write(ino, fh, offset, data, write_flags, flags, lock_owner)
.in_current_span(),
) {
Ok(bytes_written) => reply.written(bytes_written),
Err(e) => reply.error(e),
}
Expand Down
8 changes: 2 additions & 6 deletions mountpoint-s3/src/main.rs
Expand Up @@ -355,12 +355,8 @@ fn mount(args: CliArgs) -> anyhow::Result<BackgroundSession> {

let session = Session::new(fs, &args.mount_point, &options).context("Failed to create FUSE session")?;

let session = if let Some(thread_count) = args.thread_count {
BackgroundSession::new_multi_thread(session, thread_count as usize)
} else {
BackgroundSession::new(session)
};
let session = session.context("Failed to start FUSE session")?;
// TODO correctly handle multi-threading and unmounting
let session = BackgroundSession::new(session).context("Failed to start FUSE session")?;

tracing::info!("successfully mounted {:?}", args.mount_point);

Expand Down
4 changes: 2 additions & 2 deletions vendor-fuser.sh
Expand Up @@ -13,11 +13,11 @@ fi

rm -rf $FUSER_FULL_PATH

git clone --branch fuser/async ssh://git@github.com/awslabs/mountpoint-s3.git $FUSER_FULL_PATH
git clone --branch fuser/fork ssh://git@github.com/awslabs/mountpoint-s3.git $FUSER_FULL_PATH
COMMIT=$(git -C $FUSER_FULL_PATH rev-parse --short HEAD)

rm -rf $FUSER_FULL_PATH/.git

git add $FUSER_FULL_PATH

git commit -m "Update vendored fuser to $COMMIT"
git commit -m "Update vendored fuser to $COMMIT" -s

1 comment on commit 82d8e03

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 82d8e03 Previous: 869811c Ratio
random_read_four_threads 1.791015625 MiB/s 7.392578125 MiB/s 4.13
random_read_four_threads_direct_io 2.3525390625 MiB/s 10.7265625 MiB/s 4.56
random_read_four_threads_direct_io_small_file 4.876953125 MiB/s 33.859375 MiB/s 6.94
random_read_four_threads_small_file 8.3798828125 MiB/s 25.984375 MiB/s 3.10
sequential_read_four_threads_direct_io 2590.3466796875 MiB/s 6652.4375 MiB/s 2.57
sequential_read_four_threads_direct_io_small_file 32.2451171875 MiB/s 167.0576171875 MiB/s 5.18
sequential_read_four_threads_small_file 2.8779296875 MiB/s 9.431640625 MiB/s 3.28

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.