Skip to content

Commit

Permalink
open the file already on creation of the file attacher
Browse files Browse the repository at this point in the history
otherwise it races against to_disk (the file will be gone / moved)
  • Loading branch information
Frens Jan Rumph committed Apr 18, 2017
1 parent 3978958 commit c8e7b16
Showing 1 changed file with 65 additions and 24 deletions.
89 changes: 65 additions & 24 deletions bndl/net/sendfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
Adapted from and generalized into a utility outside the web context:
https://github.com/KeepSafe/aiohttp/blob/72e615b508dc2def975419da1bddc2e3a0970203/aiohttp/web_urldispatcher.py#L439
'''

from functools import partial
import asyncio
import contextlib
import logging
import os

from bndl.net import aio
from bndl.util.exceptions import catch


logger = logging.getLogger(__name__)


CHUNK_SIZE = 8 * 1024
Expand All @@ -34,32 +39,68 @@ def is_remote(data):


def file_attachment(filename, offset, size, maybe_local=True):
assert hasattr(os, "sendfile")
attacher = FileAttacher(filename, offset, size, maybe_local)
return attacher.key, attacher


filename = filename.encode('utf-8')
class FileAttacher(object):
def __init__(self, filename, offset, size, maybe_local=True):
assert hasattr(os, "sendfile")
self.key = filename.encode('utf-8')
self.offset = offset
self.size = size
self.maybe_local = maybe_local
self.fd = os.open(filename, os.O_RDONLY)

@contextlib.contextmanager
def _attacher(loop, writer):
socket = writer.get_extra_info('socket')
if maybe_local and socket.getpeername()[0] in ('::1', '127.0.0.1', socket.getsockname()[0]):
@asyncio.coroutine
def sender():
writer.write(_LOCAL)
yield 1, sender

def __call__(self, loop, writer):
self.loop = loop
self.writer = writer
return self


def __enter__(self):
socket = self.writer.get_extra_info('socket')
if self.maybe_local and \
socket.getpeername()[0] in ('::1', '127.0.0.1', socket.getsockname()[0]):
return 1, self.send_local
else:
@asyncio.coroutine
def sender():
socket = writer.get_extra_info('socket')
with open(filename, 'rb') as file:
if maybe_local:
writer.write(_REMOTE)
yield from aio.drain(writer)
socket = socket.dup()
socket.setblocking(False)
yield from sendfile(socket.fileno(), file.fileno(), offset, size, loop)
yield size + int(maybe_local), sender

return filename, _attacher
return self.size + int(self.maybe_local), self.send_remote


def __exit__(self, exc_type, exc_value, traceback):
self.close()


def close(self):
with catch():
os.close(self.fd)


def __del__(self):
self.close()


@asyncio.coroutine
def send_local(self):
self.writer.write(_LOCAL)


@asyncio.coroutine
def send_remote(self):
try:
socket = self.writer.get_extra_info('socket')
if self.maybe_local:
self.writer.write(_REMOTE)
yield from aio.drain(self.writer)
socket = socket.dup()
socket.setblocking(False)
yield from sendfile(socket.fileno(), self.fd,
self.offset, self.size,
self.loop)
except:
logger.exception('Unable to send file %r', self.key)



def _sendfile_cb_system(loop, fut, out_fd, in_fd, offset, nbytes, registered):
Expand Down

0 comments on commit c8e7b16

Please sign in to comment.