Skip to content

Commit

Permalink
Merge branch 'trio-sharing' into trio
Browse files Browse the repository at this point in the history
  • Loading branch information
touilleMan committed Mar 25, 2018
2 parents 386cec2 + 9f61dbe commit 62ac6e8
Show file tree
Hide file tree
Showing 23 changed files with 408 additions and 134 deletions.
1 change: 1 addition & 0 deletions parsec/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ async def _api_event_subscribe(self, client_ctx, msg):

if (event in ('user_vlob_updated', 'message_arrived', 'device_try_claim') and
subject not in (None, client_ctx.user_id)):
# TODO: is the `subject == None` valid here ?
return {
'status': 'private_event',
'reason': 'This type of event is private.'
Expand Down
6 changes: 3 additions & 3 deletions parsec/backend/drivers/memory/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def __init__(self, *args):
super().__init__(*args)
self._messages = defaultdict(list)

async def perform_message_new(self, recipient, body):
self._messages[recipient].append(body)
self._signal_message_arrived.send(recipient)
async def perform_message_new(self, sender_device_id, recipient_user_id, body):
self._messages[recipient_user_id].append((sender_device_id, body))
self._signal_message_arrived.send(recipient_user_id)

async def perform_message_get(self, id, offset):
return self._messages[id][offset:]
12 changes: 8 additions & 4 deletions parsec/backend/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ class BaseMessageComponent:
def __init__(self, signal_ns):
self._signal_message_arrived = signal_ns.signal('message_arrived')

async def perform_message_new(self, recipient, body):
async def perform_message_new(self, sender_device_id, recipient_user_id, body):
raise NotImplementedError()

async def perform_message_get(self, id, offset):
raise NotImplementedError()

async def api_message_new(self, client_ctx, msg):
msg = cmd_NEW_Schema().load_or_abort(msg)
await self.perform_message_new(**msg)
await self.perform_message_new(
sender_device_id=client_ctx.id,
recipient_user_id=msg['recipient'],
body=msg['body']
)
return {'status': 'ok'}

async def api_message_get(self, client_ctx, msg):
Expand All @@ -34,6 +38,6 @@ async def api_message_get(self, client_ctx, msg):
messages = await self.perform_message_get(client_ctx.user_id, offset)
return {
'status': 'ok',
'messages': [{'count': i, 'body': to_jsonb64(msg)}
for i, msg in enumerate(messages, offset + 1)]
'messages': [{'count': i, 'body': to_jsonb64(data[1]), 'sender_id': data[0]}
for i, data in enumerate(messages, offset + 1)]
}
74 changes: 71 additions & 3 deletions parsec/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@
from nacl.signing import SigningKey
from json import JSONDecodeError

from parsec.core.sharing import Sharing
from parsec.core.fs import fs_factory
from parsec.core.fs_api import FSApi, PathOnlySchema
from parsec.core.synchronizer import Synchronizer
from parsec.core.devices_manager import DevicesManager, DeviceLoadingError
from parsec.core.backend_connection import (
BackendConnection, BackendNotAvailable, backend_send_anonymous_cmd)
from parsec.ui import fuse
from parsec.utils import CookedSocket, ParsecError, to_jsonb64, from_jsonb64
from parsec.schema import BaseCmdSchema, fields
from parsec.utils import (
CookedSocket, ParsecError, to_jsonb64, from_jsonb64, ejson_dumps)
from parsec.schema import BaseCmdSchema, fields, validate


logger = logbook.Logger("parsec.core.app")
Expand Down Expand Up @@ -64,6 +66,11 @@ class cmd_FUSE_START_Schema(BaseCmdSchema):
mountpoint = fields.String(required=True)


class cmd_SHARE_Schema(BaseCmdSchema):
path = fields.String(required=True)
recipient = fields.String(required=True)


class CoreApp:

def __init__(self, config):
Expand All @@ -81,6 +88,7 @@ def __init__(self, config):
self.auth_events = None
self.fs = None
self.synchronizer = None
self.sharing = None
self.backend_connection = None
self.devices_manager = DevicesManager(config.base_settings_path)
self.fuse_process = None
Expand Down Expand Up @@ -122,6 +130,8 @@ def __init__(self, config):
'move': self._fs_api.move,
'delete': self._fs_api.delete,
'file_truncate': self._fs_api.file_truncate,

'share': self._api_share,
}

async def init(self, nursery):
Expand Down Expand Up @@ -166,6 +176,8 @@ async def handle_client(self, sockstream):
raise

async def login(self, device):
# TODO: create a login/logout lock to avoid concurrency crash
# during logout
self.auth_subscribed_events = {}
self.auth_events = trio.Queue(100)
self.backend_connection = BackendConnection(
Expand All @@ -186,8 +198,15 @@ async def login(self, device):
# # await blocks_manager.init()
# self.fs = FS(manifests_manager, blocks_manager)
await self.fs.init()
try:
self.sharing = Sharing(device, self.signal_ns, self.fs, self.backend_connection)
await self.sharing.init(self.nursery)
except BaseException:
await self.fs.teardown()
raise
except BaseException:
await self.synchronizer.teardown()
if self.synchronizer:
await self.synchronizer.teardown()
raise
except BaseException:
await self.backend_connection.teardown()
Expand All @@ -198,6 +217,8 @@ async def login(self, device):
self.auth_device = device

async def logout(self):
self._handle_new_message = None
await self.sharing.teardown()
await self.fs.teardown()
if self.synchronizer:
await self.synchronizer.teardown()
Expand Down Expand Up @@ -493,3 +514,50 @@ async def _api_fuse_open(self, req):
return {'status': 'fuse_not_started'}
webbrowser.open(os.path.join(self.mountpoint, msg['path'][1:]))
return {'status': 'ok'}

async def _api_share(self, req):
# TODO: super rough stuff...
if not self.auth_device:
return {'status': 'login_required'}

try:
cmd_SHARE_Schema().load_or_abort(req)
entry = await self.fs.fetch_path(req['path'])
# Cannot share a placeholder !
if entry.is_placeholder:
# TODO: use minimal_sync_if_placeholder ?
await entry.sync()
sharing_msg = {
'type': 'share',
'content': entry._access.dump(with_type=False),
'name': entry.name
}

recipient = req['recipient']
rep = await self.backend_connection.send({
'cmd': 'user_get',
'user_id': recipient,
})
if rep['status'] != 'ok':
# TODO: better cooking of the answer
return rep

from nacl.public import SealedBox, PublicKey

broadcast_key = PublicKey(from_jsonb64(rep['broadcast_key']))
box = SealedBox(broadcast_key)
sharing_msg_clear = ejson_dumps(sharing_msg).encode('utf8')
sharing_msg_signed = self.auth_device.device_signkey.sign(sharing_msg_clear)
sharing_msg_encrypted = box.encrypt(sharing_msg_signed)

rep = await self.backend_connection.send({
'cmd': 'message_new',
'recipient': recipient,
'body': to_jsonb64(sharing_msg_encrypted)
})
if rep['status'] != 'ok':
# TODO: better cooking of the answer
return rep
except BackendNotAvailable:
return {'status': 'backend_not_availabled'}
return {'status': 'ok'}
2 changes: 1 addition & 1 deletion parsec/core/backend_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def _socket_connection_factory(self):
logger.debug('connecting to backend {}:{}', self.addr.hostname, self.addr.port)
sockstream = await trio.open_tcp_stream(self.addr.hostname, self.addr.port)
try:
logger.debug('handshake has {}', self.handshake_id)
logger.debug('handshake as {}', self.handshake_id)
sock = CookedSocket(sockstream)
if self.handshake_id == 'anonymous':
ch = AnonymousClientHandshake()
Expand Down
13 changes: 13 additions & 0 deletions parsec/core/fs/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ def end(self):
def _fs(self):
raise NotImplementedError()

async def sync(self):
if not self.is_dirty:
return
if not self._data:
raise RuntimeError('Nothing to sync')
dirty_access = self._access
# TODO: add a lock ? should be more secure, but on the other hand
# a block is sync only once, really close to where is has been created.
id = await self._fs.blocks_manager.sync_new_block_with_backend(
dirty_access.key, self._data)
self._access = self._fs._block_access_cls(
id, dirty_access.key, dirty_access.offset, dirty_access.size)

def is_dirty(self):
return isinstance(self._access, BaseDirtyBlockAccess)

Expand Down
29 changes: 17 additions & 12 deletions parsec/core/fs/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ async def minimal_sync_if_placeholder(self):
self._access = self._fs._vlob_access_cls(id, rts, wts, key)

async def sync(self, recursive=False):
await self.minimal_sync_if_placeholder()
# Note recursive argument is not needed here
# Make sure we are not a placeholder
await self.minimal_sync_if_placeholder()
if not self._need_sync:
return
# Make sure we are not a placeholder
async with self.acquire_write():
if not self._need_sync:
return
Expand All @@ -213,15 +213,17 @@ async def sync(self, recursive=False):
'version': self._base_version + 1,
'created': self._created,
'updated': self._updated,
'size': self._size,
'blocks': None, # Will be computed later
}
merged_blocks = []
for block, offset, end in get_merged_blocks(
self._blocks, self._dirty_blocks, self._size):
if offset and end:
buffer = await block.fetch_data()
buffer = buffer[offset:end]
access = self._fs._dirty_block_access_cls(offset=offset, size=len(buffer))
block = self._fs._block_cls(access, data=buffer)
# TODO: block taken verbatim are rewritten
buffer = await block.fetch_data()
buffer = buffer[offset:end]
access = self._fs._dirty_block_access_cls(offset=offset, size=len(buffer))
block = self._fs._block_cls(access, data=buffer)
merged_blocks.append(block)
normalized_blocks = []
for block_group in get_normalized_blocks(merged_blocks, 4096):
Expand All @@ -234,8 +236,11 @@ async def sync(self, recursive=False):
normalized_blocks.append(block)
dirty_blocks_count = len(self._dirty_blocks)

# Flush data here given we don't want to lose the upload blocks
# TODO...
# TODO: Flush manifest here given we don't want to lose the upload blocks
# TODO: block upload in parallel ?
for normalized_block in normalized_blocks:
await normalized_block.sync()
manifest['blocks'] = [v._access.dump() for v in normalized_blocks]

# Upload the file manifest as new vlob version
await self._fs.manifests_manager.sync_with_backend(
Expand All @@ -249,7 +254,7 @@ async def sync(self, recursive=False):
# TODO: notify blocks_managers the dirty blocks are no longer useful ?
self._base_version = manifest['version']
await self.flush_no_lock()
self._need_sync = bool(self._dirty_blocks)
self._need_sync = self._updated != manifest['updated']


def get_merged_blocks(file_blocks, file_dirty_blocks, file_size):
Expand Down Expand Up @@ -296,7 +301,7 @@ def get_merged_blocks(file_blocks, file_dirty_blocks, file_size):
block_end_data = offset - block.offset
if block_start_data != block_end_data:
if block.offset == block_start_data and block.end == block_end_data:
blocks.append((block, None, None))
blocks.append((block, 0, block.size))
else:
blocks.append((block, block_start_data, block_end_data))
del start_offset[index]
Expand Down Expand Up @@ -329,7 +334,7 @@ def get_normalized_blocks(blocks, block_size=4096):
offset_splits.pop(0)
else:
if offset == block.offset and end == block.end:
block_group.append((block, None, None))
block_group.append((block, 0, block.size))
else:
block_group.append((block, offset - block.offset, end - block.offset))
if end == offset_splits[0]:
Expand Down
16 changes: 16 additions & 0 deletions parsec/core/fs/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ async def delete_child_no_lock(self, name):
entry._parent = None
return entry

async def insert_child_from_access(self, name, access):
async with self.acquire_write():
return await self.insert_child_from_access_no_lock(name, access)

async def insert_child_from_access_no_lock(self, name, access):
if name in self._children:
raise FSInvalidPath("Path `%s/%s` already exists" % (self.path, name))
child = self._fs._not_loaded_entry_cls(
access=access,
name=name,
parent=self
)
self._children[name] = child
self._modified()
return child

async def insert_child(self, name, child):
async with self.acquire_write():
await self.insert_child_no_lock(name, child)
Expand Down
79 changes: 79 additions & 0 deletions parsec/core/sharing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import trio
from nacl.public import SealedBox, PublicKey
from nacl.signing import VerifyKey

from parsec.utils import from_jsonb64, ejson_loads
from parsec.core.fs import FSInvalidPath


class Sharing:
def __init__(self, device, signal_ns, fs, backend_connection):
self.signal_ns = signal_ns
self.fs = fs
self.backend_connection = backend_connection
self.device = device
self.msg_arrived = trio.Event()

async def _message_listener_task(self, *, task_status=trio.TASK_STATUS_IGNORED):
with trio.open_cancel_scope() as cancel_scope:
task_status.started(cancel_scope)
while True:
await self.msg_arrived.wait()
self.msg_arrived.clear()
# TODO: should keep a message counter in the user manifest
# too know which message should be processed here
rep = await self.backend_connection.send({'cmd': 'message_get'})
assert rep['status'] == 'ok'
msg = rep['messages'][-1]

sender_user_id, sender_device_name = msg['sender_id'].split('@')
rep = await self.backend_connection.send({
'cmd': 'user_get', 'user_id': sender_user_id})
assert rep['status'] == 'ok'
# TODO: handle crash, handle key validity expiration

sender_verifykey = VerifyKey(
from_jsonb64(rep['devices'][sender_device_name]['verify_key']))
box = SealedBox(self.device.user_privkey)

# TODO: handle bad signature, bad encryption, bad json, bad payload...
sharing_msg_encrypted = from_jsonb64(msg['body'])
sharing_msg_signed = box.decrypt(sharing_msg_encrypted)
sharing_msg_clear = sender_verifykey.verify(sharing_msg_signed)
sharing_msg = ejson_loads(sharing_msg_clear.decode('utf8'))

# TODO: handle other type of message
assert sharing_msg['type'] == 'share'
sharing_access = self.fs._vlob_access_cls(
sharing_msg['content']['id'],
sharing_msg['content']['rts'],
sharing_msg['content']['wts'],
from_jsonb64(sharing_msg['content']['key'])
)

shared_with_folder_name = 'shared-with-%s' % sender_user_id
if shared_with_folder_name not in self.fs.root:
shared_with_folder = await self.fs.root.create_folder(shared_with_folder_name)
else:
shared_with_folder = await self.fs.root.fetch_child(shared_with_folder_name)
sharing_name = sharing_msg['name']
while True:
try:
child = await shared_with_folder.insert_child_from_access(
sharing_name, sharing_access)
break
except FSInvalidPath:
sharing_name += '-dup'

self.signal_ns.signal('new_sharing').send(child.path)

def _msg_arrived_cb(self, sender):
self.msg_arrived.set()

async def init(self, nursery):
self._message_listener_task_cancel_scope = await nursery.start(self._message_listener_task)
await self.backend_connection.subscribe_event('message_arrived', self.device.user_id)
self.signal_ns.signal('message_arrived').connect(self._msg_arrived_cb, weak=True)

async def teardown(self):
self._message_listener_task_cancel_scope.cancel()

0 comments on commit 62ac6e8

Please sign in to comment.