Commit
Add concurrent upload to file sync
touilleMan committed Dec 27, 2018
1 parent bae8851 commit 5fdf936
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions parsec/core/fs/file_syncer.py
@@ -1,3 +1,4 @@
+import trio
 import pendulum
 from itertools import count
 from typing import Union, List, Optional
@@ -69,18 +70,33 @@ def get_sync_map(manifest, block_size: int) -> List[Buffer]:
 class FileSyncerMixin(BaseSyncer):
     async def _build_data_from_contiguous_space(self, cs):
         data = bytearray(cs.size)
-        for bs in cs.buffers:
-            if isinstance(bs.buffer, BlockBuffer):
-                buff = await self._backend_block_read(bs.buffer.access)
-            elif isinstance(bs.buffer, DirtyBlockBuffer):
-                buff = self.local_file_fs.get_block(bs.buffer.access)
-            else:
-                assert isinstance(bs.buffer, NullFillerBuffer)
-                buff = bs.buffer.data
-            assert buff
-            data[bs.start - cs.start : bs.end - cs.start] = buff[
-                bs.buffer_slice_start : bs.buffer_slice_end
-            ]
+        buffers = cs.buffers.copy()
+
+        async def _process_buffer():
+            while buffers:
+                bs = buffers.pop()
+                if isinstance(bs.buffer, BlockBuffer):
+                    buff = await self._backend_block_read(bs.buffer.access)
+                elif isinstance(bs.buffer, DirtyBlockBuffer):
+                    buff = self.local_file_fs.get_block(bs.buffer.access)
+                else:
+                    assert isinstance(bs.buffer, NullFillerBuffer)
+                    buff = bs.buffer.data
+                assert buff
+                data[bs.start - cs.start : bs.end - cs.start] = buff[
+                    bs.buffer_slice_start : bs.buffer_slice_end
+                ]
+
+        if len(buffers) < 2:
+            await _process_buffer()
+
+        else:
+            async with trio.open_nursery() as nursery:
+                nursery.start_soon(_process_buffer)
+                nursery.start_soon(_process_buffer)
+                nursery.start_soon(_process_buffer)
+                nursery.start_soon(_process_buffer)
+
         return data
 
     def _sync_file_look_resolve_concurrency(
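
The hunk above replaces the sequential buffer loop with a small worker pool: four tasks started in a trio nursery pop work from the shared `buffers` list, so up to four `_backend_block_read` calls are in flight at once (with fewer than two buffers it stays sequential). Popping without a lock is safe here because trio only switches tasks at `await` points. Below is a minimal standalone sketch of the same pattern, assuming a hypothetical `fetch_block` coroutine in place of the real backend read; `fetch_all` and `max_workers` are illustrative names, not part of the commit.

import trio


async def fetch_all(items, fetch_block, max_workers=4):
    """Fetch every item with at most ``max_workers`` concurrent calls."""
    pending = list(items)
    results = []

    async def _worker():
        while pending:
            # No await between the check and the pop, so no other task can
            # steal the item in between (trio switches only at await points).
            item = pending.pop()
            results.append(await fetch_block(item))

    if len(pending) < 2:
        await _worker()
    else:
        async with trio.open_nursery() as nursery:
            for _ in range(max_workers):
                nursery.start_soon(_worker)
    return results


async def main():
    async def fetch_block(i):
        await trio.sleep(0.1)  # stand-in for a network round-trip
        return i * 2

    print(await fetch_all(range(10), fetch_block))


if __name__ == "__main__":
    trio.run(main)

Because the workers finish in arbitrary order, results come back in completion order; the commit side-steps ordering in this hunk by writing each buffer into its fixed offset inside the pre-sized `data` bytearray.
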
@@ -156,16 +172,32 @@ async def _sync_file_actual_sync(
         sync_map = get_sync_map(manifest, self.block_size)
 
         # Upload the new blocks
-        for cs in sync_map.spaces:
-            data = await self._build_data_from_contiguous_space(cs)
-            if not data:
-                # Already existing blocks taken verbatim
-                blocks += [bs.buffer.data for bs in cs.buffers]
-            else:
-                # Create a new block from existing data
-                block_access = new_block_access(data, cs.start)
-                await self._backend_block_create(block_access, data)
-                blocks.append(block_access)
+        spaces = sync_map.spaces
+        blocks = []
+
+        async def _process_spaces():
+            nonlocal blocks
+            while spaces:
+                cs = spaces.pop()
+                data = await self._build_data_from_contiguous_space(cs)
+                if not data:
+                    # Already existing blocks taken verbatim
+                    blocks += [bs.buffer.data for bs in cs.buffers]
+                else:
+                    # Create a new block from existing data
+                    block_access = new_block_access(data, cs.start)
+                    await self._backend_block_create(block_access, data)
+                    blocks.append(block_access)
+
+        if len(spaces) < 2:
+            await _process_spaces()
+
+        else:
+            async with trio.open_nursery() as nursery:
+                nursery.start_soon(_process_spaces)
+                nursery.start_soon(_process_spaces)
+                nursery.start_soon(_process_spaces)
+                nursery.start_soon(_process_spaces)
 
         to_sync_manifest["blocks"] = blocks
         to_sync_manifest["size"] = sync_map.size  # TODO: useful ?