Skip to content

Commit

Permalink
Turn BaseMountpointManager into an async context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel committed Jan 29, 2019
1 parent 5c7ec2b commit 052a95d
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 248 deletions.
2 changes: 1 addition & 1 deletion parsec/core/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def _run_mountpoint(config, device, mountpoint):
display_device = click.style(device.device_id, fg="yellow")
mountpoint_display = click.style(str(core.mountpoint.absolute()), fg="yellow")
click.echo(f"{display_device}'s drive mounted at {mountpoint_display}")
await core.mountpoint_manager.join()
await core.mountpoint_task.join()


@click.command(short_help="run parsec mountpoint")
Expand Down
31 changes: 12 additions & 19 deletions parsec/core/logged_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
backend_listen_events,
monitor_backend_connection,
)
from parsec.core.mountpoint import mountpoint_manager
from parsec.core.encryption_manager import EncryptionManager
from parsec.core.mountpoint import mountpoint_manager_factory
from parsec.core.beacons_monitor import monitor_beacons
from parsec.core.messages_monitor import monitor_messages
from parsec.core.sync_monitor import monitor_sync
Expand All @@ -33,14 +33,11 @@ class LoggedCore:
local_db = attr.ib()
event_bus = attr.ib()
encryption_manager = attr.ib()
mountpoint_manager = attr.ib()
mountpoint = attr.ib()
mountpoint_task = attr.ib()
backend_cmds = attr.ib()
fs = attr.ib()

@property
def mountpoint(self):
return self.mountpoint_manager.mountpoint


@asynccontextmanager
async def logged_core_factory(
Expand Down Expand Up @@ -85,27 +82,23 @@ async def logged_core_factory(
await monitor_nursery.start(monitor_messages, backend_online, fs, event_bus)
await monitor_nursery.start(monitor_sync, backend_online, fs, event_bus)

# TODO: rework mountpoint manager to avoid init/teardown
mountpoint_manager = mountpoint_manager_factory(fs, event_bus)
await mountpoint_manager.init(monitor_nursery)
if config.mountpoint_enabled:
if not mountpoint:
mountpoint = config.mountpoint_base_dir / device.device_id
await mountpoint_manager.start(mountpoint)
if not config.mountpoint_enabled:
mountpoint = None
elif mountpoint is None:
mountpoint = config.mountpoint_base_dir / device.device_id

try:
async with mountpoint_manager(
fs, event_bus, mountpoint, monitor_nursery
) as mountpoint_task:
yield LoggedCore(
config=config,
device=device,
local_db=local_db,
event_bus=event_bus,
encryption_manager=encryption_manager,
mountpoint_manager=mountpoint_manager,
mountpoint=mountpoint,
mountpoint_task=mountpoint_task,
backend_cmds=backend_cmds_pool,
fs=fs,
)
root_nursery.cancel_scope.cancel()

finally:
if config.mountpoint_enabled:
await mountpoint_manager.teardown()
20 changes: 5 additions & 15 deletions parsec/core/mountpoint/__init__.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,21 @@
from parsec.core.mountpoint.manager import (
FuseMountpointManager,
FUSE_AVAILABLE,
mountpoint_manager,
get_default_mountpoint,
FUSE_AVAILABLE,
)
from parsec.core.mountpoint.not_available import NotAvailableMountpointManager

from parsec.core.mountpoint.exceptions import (
MountpointManagerError,
MountpointManagerNotAvailable,
MountpointAlreadyStarted,
MountpointNotStarted,
MountpointConfigurationError,
)


def mountpoint_manager_factory(fs, event_bus, **kwargs):
if FUSE_AVAILABLE:
return FuseMountpointManager(fs, event_bus, **kwargs)
else:
return NotAvailableMountpointManager(fs, event_bus, **kwargs)


__all__ = (
"mountpoint_manager_factory",
"mountpoint_manager",
"get_default_mountpoint",
"MountpointManagerError",
"MountpointManagerNotAvailable",
"MountpointAlreadyStarted",
"MountpointNotStarted",
"MountpointConfigurationError",
"FUSE_AVAILABLE",
)
21 changes: 0 additions & 21 deletions parsec/core/mountpoint/base.py

This file was deleted.

8 changes: 0 additions & 8 deletions parsec/core/mountpoint/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,5 @@ class MountpointManagerNotAvailable(MountpointManagerError):
pass


class MountpointAlreadyStarted(MountpointManagerError):
pass


class MountpointNotStarted(MountpointManagerError):
pass


class MountpointConfigurationError(MountpointManagerError):
pass
136 changes: 65 additions & 71 deletions parsec/core/mountpoint/manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os
import warnings
import trio
import logging
from pathlib import Path

import trio
import attr
from async_generator import asynccontextmanager

from parsec.types import DeviceID
from parsec.core.mountpoint.exceptions import MountpointAlreadyStarted, MountpointNotStarted

try:
from fuse import FUSE
Expand All @@ -30,72 +32,64 @@ def get_default_mountpoint(device_id: DeviceID):
return Path(f"/media/{device_id}")


class FuseMountpointManager:
def __init__(self, fs, event_bus, mode="thread", debug: bool = False, nothreads: bool = False):
if not FUSE_AVAILABLE:
raise RuntimeError("Fuse is not available, is fusepy installed ?")

self.event_bus = event_bus
if mode not in ("thread", "process"):
raise ValueError("mode param must be `thread` or `process`")
self.mode = mode
self.mountpoint = None
self._lock = trio.Lock()
self._fs = fs
self._nursery = None
self._join_fuse_runner = None
self._fuse_config = {"debug": debug, "nothreads": nothreads}

def get_abs_mountpoint(self):
return str(self.mountpoint.absolute())

async def init(self, nursery):
self._nursery = nursery

def is_started(self):
return self._join_fuse_runner is not None

async def start(self, mountpoint):
"""
Raises:
MountpointAlreadyStarted
MountpointConfigurationError
"""
async with self._lock:
if self.is_started():
raise MountpointAlreadyStarted(
f"Fuse already started on mountpoint `{self.mountpoint}`"
)

self.mountpoint = Path(mountpoint)
if self.mode == "process":
fuse_runner = run_fuse_in_process
else:
fuse_runner = run_fuse_in_thread
self._join_fuse_runner = await self._nursery.start(
fuse_runner, self._fs, self.mountpoint, self._fuse_config, self.event_bus
)

async def join(self, stop=False):
"""
Raises:
MountpointNotStarted
"""
async with self._lock:
if not self.is_started():
raise MountpointNotStarted()

await self._join_fuse_runner(stop=stop)
self._join_fuse_runner = None

async def stop(self):
await self.join(stop=True)

async def teardown(self):
try:
await self.stop()
except MountpointNotStarted:
pass


MountpointManager = FuseMountpointManager
@attr.s
class TaskStatus:

cancel = attr.ib()
join = attr.ib()
value = attr.ib()

async def cancel_and_join(self):
self.cancel()
await self.join()


async def stoppable(corofn, *args, task_status=trio.TASK_STATUS_IGNORED):
finished = trio.Event()
try:
async with trio.open_nursery() as nursery:
value = await nursery.start(corofn, *args)
status = TaskStatus(cancel=nursery.cancel_scope.cancel, join=finished.wait, value=value)
task_status.started(status)
finally:
finished.set()


@asynccontextmanager
async def fuse_mountpoint_manager(
fs,
event_bus,
mountpoint,
nursery,
*,
mode="thread",
debug: bool = False,
nothreads: bool = False,
):
# No mountpoint
if mountpoint is None:
yield None
return

# Fuse not available
if not FUSE_AVAILABLE:
raise RuntimeError("Fuse is not available, is fusepy installed ?")

# Invalid mode
if mode not in ("thread", "process"):
raise ValueError("mode param must be `thread` or `process`")

fuse_config = {"debug": debug, "nothreads": nothreads}
if mode == "process":
fuse_runner = run_fuse_in_process
else:
fuse_runner = run_fuse_in_thread

task = await nursery.start(stoppable, fuse_runner, fs, mountpoint, fuse_config, event_bus)
try:
yield task
finally:
await task.cancel_and_join()


mountpoint_manager = fuse_mountpoint_manager
29 changes: 0 additions & 29 deletions parsec/core/mountpoint/not_available.py

This file was deleted.

17 changes: 8 additions & 9 deletions parsec/core/mountpoint/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ async def run_fuse_in_thread(
fs_access = ThreadFSAccess(portal, fs)
fuse_operations = FuseOperations(fs_access, abs_mountpoint)

async def _join_fuse_runner(stop=False):
await _join_fuse_thread(mountpoint, fuse_operations, fuse_thread_stopped, stop=stop)
async def _stop_fuse_runner():
await _stop_fuse_thread(mountpoint, fuse_operations, fuse_thread_stopped)

initial_st_dev = _bootstrap_mountpoint(mountpoint)

Expand All @@ -84,11 +84,11 @@ def _run_fuse_thread():

await _wait_for_fuse_ready(mountpoint, fuse_thread_started, initial_st_dev)

task_status.started(_join_fuse_runner),
event_bus.send("mountpoint.started", mountpoint=abs_mountpoint)
task_status.started(abs_mountpoint)

finally:
await _join_fuse_runner(stop=True)
await _stop_fuse_runner()
event_bus.send("mountpoint.stopped", mountpoint=abs_mountpoint)
_teardown_mountpoint(mountpoint)

Expand Down Expand Up @@ -117,7 +117,7 @@ def _wait_for_fuse_ready_thread():
need_stop = True


async def _join_fuse_thread(mountpoint, fuse_operations, fuse_thread_stopped, stop=False):
async def _stop_fuse_thread(mountpoint, fuse_operations, fuse_thread_stopped):
if fuse_thread_stopped.is_set():
return

Expand All @@ -133,10 +133,9 @@ def _wakeup_fuse():
pass

with trio.open_cancel_scope(shield=True):
if stop:
logger.info("Stopping fuse thread...")
fuse_operations.schedule_exit()
await trio.run_sync_in_worker_thread(_wakeup_fuse)
logger.info("Stopping fuse thread...")
fuse_operations.schedule_exit()
await trio.run_sync_in_worker_thread(_wakeup_fuse)
await trio.run_sync_in_worker_thread(fuse_thread_stopped.wait)
logger.info("Fuse thread stopped")

Expand Down

0 comments on commit 052a95d

Please sign in to comment.