Skip to content

Commit

Permalink
Merge 4d3a07e into 6d9b2fd
Browse files Browse the repository at this point in the history
  • Loading branch information
touilleMan committed Oct 3, 2018
2 parents 6d9b2fd + 4d3a07e commit 3cf4d42
Show file tree
Hide file tree
Showing 16 changed files with 579 additions and 803 deletions.
8 changes: 4 additions & 4 deletions parsec/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@click.command()
def core_cmd():
raise SystemExit("No available.")
raise SystemExit("Not available.")


try:
Expand All @@ -17,11 +17,11 @@ def core_cmd():

@click.command()
def backend_cmd():
raise SystemExit("No available.")
raise SystemExit("Not available.")

@click.command()
def init_cmd():
raise SystemExit("No available.")
raise SystemExit("Not available.")


try:
Expand All @@ -30,7 +30,7 @@ def init_cmd():

@click.command()
def fuse_cmd():
raise RuntimeError("No available, is fusepy installed ?")
raise RuntimeError("Not available, is fusepy installed ?")


except NameError:
Expand Down
2 changes: 1 addition & 1 deletion parsec/core/api/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from parsec.schema import BaseCmdSchema, fields
from parsec.core.app import Core, ClientContext
from parsec.core.fuse_manager import FuseNotAvailable, FuseAlreadyStarted, FuseNotStarted
from parsec.core.fuse import FuseNotAvailable, FuseAlreadyStarted, FuseNotStarted
from parsec.core.backend_connection import BackendNotAvailable
from parsec.core.devices_manager import DeviceLoadingError

Expand Down
12 changes: 7 additions & 5 deletions parsec/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from parsec.core.beacons_monitor import monitor_beacons
from parsec.core.messages_monitor import monitor_messages
from parsec.core.devices_manager import LocalDevicesManager
from parsec.core.fuse_manager import FuseManager
from parsec.core.encryption_manager import EncryptionManager
from parsec.core.backend_cmds_sender import BackendCmdsSender
from parsec.core.backend_events_manager import BackendEventsManager
from parsec.core.connection_monitor import monitor_connection
from parsec.core.fuse import FuseManager


logger = logbook.Logger("parsec.core.app")
Expand Down Expand Up @@ -134,9 +134,10 @@ def __init__(self, device, config, event_bus, nursery):
)
self.backend_cmds_sender = BackendCmdsSender(device, self.config.backend_addr)
self.encryption_manager = EncryptionManager(device, self.backend_cmds_sender)
self.fuse_manager = FuseManager(self.config.addr, self.event_bus)

self.fs = FS(device, self.backend_cmds_sender, self.encryption_manager, self.event_bus)

self.fuse_manager = FuseManager(self.fs, self.event_bus)
self.sync_monitor = SyncMonitor(self.fs, self.event_bus)

async def start(self):
Expand All @@ -149,7 +150,6 @@ async def start(self):
# Components initialization must respect dependencies
await self.backend_cmds_sender.init(self._nursery)
await self.encryption_manager.init(self._nursery)
await self.fuse_manager.init(self._nursery)
# Keep event manager last, so it will know what events the other
# modules need before connecting to the backend
await self.backend_events_manager.init(self._nursery)
Expand All @@ -166,15 +166,17 @@ async def start(self):
self._stop_monitor_sync = await self._nursery.start(taskify(self.sync_monitor.run))

async def stop(self):
# First stop monitoring coroutine
# First make sure fuse is not started
await self.fuse_manager.teardown()

# Then stop monitoring coroutine
# TODO: Only needed by old core-based hypothesis tests
if self.config.auto_sync:
await self._stop_monitor_beacons()
await self._stop_monitor_messages()
await self._stop_monitor_sync()

# Then teardown components, again while respecting dependencies
await self.fuse_manager.teardown()
await self.encryption_manager.teardown()
await self.backend_cmds_sender.teardown()
await self.backend_events_manager.teardown()
Expand Down
152 changes: 15 additions & 137 deletions parsec/core/fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,137 +1,15 @@
import math
import inspect

from parsec.core.fs.local_folder_fs import FSManifestLocalMiss, LocalFolderFS
from parsec.core.fs.local_file_fs import LocalFileFS, FSBlocksLocalMiss
from parsec.core.fs.syncer import Syncer
from parsec.core.fs.sharing import Sharing
from parsec.core.fs.remote_loader import RemoteLoader


class FS:
def __init__(self, device, backend_cmds_sender, encryption_manager, event_bus):
super().__init__()
self.event_bus = event_bus
self.device = device

self._local_folder_fs = LocalFolderFS(device, event_bus)
self._local_file_fs = LocalFileFS(device, self._local_folder_fs, event_bus)
self._remote_loader = RemoteLoader(backend_cmds_sender, encryption_manager, device.local_db)
self._syncer = Syncer(
device,
backend_cmds_sender,
encryption_manager,
self._local_folder_fs,
self._local_file_fs,
event_bus,
)
self._sharing = Sharing(
device,
backend_cmds_sender,
encryption_manager,
self._local_folder_fs,
self._syncer,
self._remote_loader,
event_bus,
)

async def _load_and_retry(self, fn, *args, **kwargs):
while True:
try:
if inspect.iscoroutinefunction(fn):
return await fn(*args, **kwargs)
else:
return fn(*args, **kwargs)

except FSManifestLocalMiss as exc:
await self._remote_loader.load_manifest(exc.access)

except FSBlocksLocalMiss as exc:
for access in exc.accesses:
await self._remote_loader.load_block(access)

async def stat(self, path):
return await self._load_and_retry(self._local_folder_fs.stat, path)

async def file_write(self, path, content, offset=0):
fd = await self.file_fd_open(path)
try:
if offset:
await self.file_fd_seek(fd, offset)
await self.file_fd_write(fd, content)
finally:
await self.file_fd_close(fd)

async def file_truncate(self, path, length):
fd = await self.file_fd_open(path)
try:
await self.file_fd_truncate(fd, length)
finally:
await self.file_fd_close(fd)

async def file_read(self, path, size=math.inf, offset=0):
fd = await self.file_fd_open(path)
try:
if offset:
await self.file_fd_seek(fd, offset)
return await self.file_fd_read(fd, size)
finally:
await self.file_fd_close(fd)

async def file_fd_open(self, path):
access = await self._load_and_retry(self._local_folder_fs.get_access, path)
return self._local_file_fs.open(access)

async def file_fd_close(self, fd):
self._local_file_fs.close(fd)

async def file_fd_seek(self, fd, offset):
self._local_file_fs.seek(fd, offset)

async def file_fd_truncate(self, fd, length):
self._local_file_fs.truncate(fd, length)

async def file_fd_write(self, fd, content):
self._local_file_fs.write(fd, content)

async def file_fd_flush(self, fd):
self._local_file_fs.flush(fd)

async def file_fd_read(self, fd, size=-1):
return await self._load_and_retry(self._local_file_fs.read, fd, size)

async def file_create(self, path):
await self._load_and_retry(self._local_folder_fs.touch, path)

async def folder_create(self, path):
await self._load_and_retry(self._local_folder_fs.mkdir, path)

async def workspace_create(self, path):
await self._load_and_retry(self._local_folder_fs.mkdir, path, workspace=True)

async def move(self, src, dst):
await self._load_and_retry(self._local_folder_fs.move, src, dst)

async def delete(self, path):
await self._load_and_retry(self._local_folder_fs.delete, path)

async def sync(self, path, recursive=True):
await self._load_and_retry(self._syncer.sync, path, recursive=recursive)

# TODO: do we really need this ? or should we provide id manipulation at this level ?
async def sync_by_id(self, entry_id):
await self._load_and_retry(self._syncer.sync_by_id, entry_id)

# TODO: do we really need this ? or should we optimize `sync(path='/')` ?
async def full_sync(self):
await self._load_and_retry(self._syncer.full_sync)

async def get_entry_path(self, id):
path, _, _ = await self._load_and_retry(self._local_folder_fs.get_entry_path, id)
return path

async def share(self, path, recipient):
await self._load_and_retry(self._sharing.share, path, recipient)

async def process_last_messages(self):
await self._sharing.process_last_messages()
from parsec.core.fs.fs import FS
from parsec.core.fs.local_folder_fs import FSManifestLocalMiss
from parsec.core.fs.local_file_fs import FSBlocksLocalMiss, FSInvalidFileDescriptor
from parsec.core.fs.syncer import SyncConcurrencyError
from parsec.core.fs.sharing import SharingError


__init__ = (
"FS",
"FSManifestLocalMiss",
"FSBlocksLocalMiss",
"FSInvalidFileDescriptor",
"SyncConcurrencyError",
"SharingError",
)
137 changes: 137 additions & 0 deletions parsec/core/fs/fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import math
import inspect

from parsec.core.fs.local_folder_fs import FSManifestLocalMiss, LocalFolderFS
from parsec.core.fs.local_file_fs import LocalFileFS, FSBlocksLocalMiss
from parsec.core.fs.syncer import Syncer
from parsec.core.fs.sharing import Sharing
from parsec.core.fs.remote_loader import RemoteLoader


class FS:
def __init__(self, device, backend_cmds_sender, encryption_manager, event_bus):
super().__init__()
self.event_bus = event_bus
self.device = device

self._local_folder_fs = LocalFolderFS(device, event_bus)
self._local_file_fs = LocalFileFS(device, self._local_folder_fs, event_bus)
self._remote_loader = RemoteLoader(backend_cmds_sender, encryption_manager, device.local_db)
self._syncer = Syncer(
device,
backend_cmds_sender,
encryption_manager,
self._local_folder_fs,
self._local_file_fs,
event_bus,
)
self._sharing = Sharing(
device,
backend_cmds_sender,
encryption_manager,
self._local_folder_fs,
self._syncer,
self._remote_loader,
event_bus,
)

async def _load_and_retry(self, fn, *args, **kwargs):
while True:
try:
if inspect.iscoroutinefunction(fn):
return await fn(*args, **kwargs)
else:
return fn(*args, **kwargs)

except FSManifestLocalMiss as exc:
await self._remote_loader.load_manifest(exc.access)

except FSBlocksLocalMiss as exc:
for access in exc.accesses:
await self._remote_loader.load_block(access)

async def stat(self, path):
return await self._load_and_retry(self._local_folder_fs.stat, path)

async def file_write(self, path, content, offset=0):
fd = await self.file_fd_open(path)
try:
if offset:
await self.file_fd_seek(fd, offset)
await self.file_fd_write(fd, content)
finally:
await self.file_fd_close(fd)

async def file_truncate(self, path, length):
fd = await self.file_fd_open(path)
try:
await self.file_fd_truncate(fd, length)
finally:
await self.file_fd_close(fd)

async def file_read(self, path, size=math.inf, offset=0):
fd = await self.file_fd_open(path)
try:
if offset:
await self.file_fd_seek(fd, offset)
return await self.file_fd_read(fd, size)
finally:
await self.file_fd_close(fd)

async def file_fd_open(self, path):
access = await self._load_and_retry(self._local_folder_fs.get_access, path)
return self._local_file_fs.open(access)

async def file_fd_close(self, fd):
self._local_file_fs.close(fd)

async def file_fd_seek(self, fd, offset):
self._local_file_fs.seek(fd, offset)

async def file_fd_truncate(self, fd, length):
self._local_file_fs.truncate(fd, length)

async def file_fd_write(self, fd, content):
return self._local_file_fs.write(fd, content)

async def file_fd_flush(self, fd):
self._local_file_fs.flush(fd)

async def file_fd_read(self, fd, size=-1):
return await self._load_and_retry(self._local_file_fs.read, fd, size)

async def file_create(self, path):
await self._load_and_retry(self._local_folder_fs.touch, path)

async def folder_create(self, path):
await self._load_and_retry(self._local_folder_fs.mkdir, path)

async def workspace_create(self, path):
await self._load_and_retry(self._local_folder_fs.mkdir, path, workspace=True)

async def move(self, src, dst):
await self._load_and_retry(self._local_folder_fs.move, src, dst)

async def delete(self, path):
await self._load_and_retry(self._local_folder_fs.delete, path)

async def sync(self, path, recursive=True):
await self._load_and_retry(self._syncer.sync, path, recursive=recursive)

# TODO: do we really need this ? or should we provide id manipulation at this level ?
async def sync_by_id(self, entry_id):
await self._load_and_retry(self._syncer.sync_by_id, entry_id)

# TODO: do we really need this ? or should we optimize `sync(path='/')` ?
async def full_sync(self):
await self._load_and_retry(self._syncer.full_sync)

async def get_entry_path(self, id):
path, _, _ = await self._load_and_retry(self._local_folder_fs.get_entry_path, id)
return path

async def share(self, path, recipient):
await self._load_and_retry(self._sharing.share, path, recipient)

async def process_last_messages(self):
await self._sharing.process_last_messages()

0 comments on commit 3cf4d42

Please sign in to comment.