Skip to content

Commit

Permalink
Expose libuv uv_fs_event functionality (#474)
Browse files Browse the repository at this point in the history
* Also updated pyOpenSSL to 22.x

Co-authored-by: Jens Jorgensen <jens@consiliumb.sg>
Co-authored-by: Fantix King <fantix.king@gmail.com>
  • Loading branch information
3 people committed Sep 9, 2022
1 parent 637a77a commit 74d381e
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 1 deletion.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -34,7 +34,7 @@
'flake8~=3.9.2',
'psutil',
'pycodestyle~=2.7.0',
'pyOpenSSL~=19.0.0',
'pyOpenSSL~=22.0.0',
'mypy>=0.800',
CYTHON_DEPENDENCY,
]
Expand Down
100 changes: 100 additions & 0 deletions tests/test_fs_event.py
@@ -0,0 +1,100 @@
import asyncio
import os.path
import tempfile

from uvloop import _testbase as tb
from uvloop.loop import FileSystemEvent


class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return

def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FileSystemEvent.CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)

def test_fs_event_change(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop._monitor_fs(tf.name, self.event_cb)
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(self.change_event_count, 4)


class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()

def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)

def test_fs_event_rename(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
f.write('hello!')
f.close()
h = self.loop._monitor_fs(td_name, self.event_cb)
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(len(self.changed_set), 0)
12 changes: 12 additions & 0 deletions uvloop/handles/fsevent.pxd
@@ -0,0 +1,12 @@
cdef class UVFSEvent(UVHandle):
cdef:
object callback
bint running

cdef _init(self, Loop loop, object callback, object context)
cdef _close(self)
cdef start(self, char* path, int flags)
cdef stop(self)

@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context)
112 changes: 112 additions & 0 deletions uvloop/handles/fsevent.pyx
@@ -0,0 +1,112 @@
import enum


class FileSystemEvent(enum.IntEnum):
RENAME = uv.UV_RENAME
CHANGE = uv.UV_CHANGE
RENAME_CHANGE = RENAME | CHANGE


@cython.no_gc_clear
cdef class UVFSEvent(UVHandle):
cdef _init(self, Loop loop, object callback, object context):
cdef int err

self._start_init(loop)

self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
sizeof(uv.uv_fs_event_t)
)
if self._handle is NULL:
self._abort_init()
raise MemoryError()

err = uv.uv_fs_event_init(
self._loop.uvloop, <uv.uv_fs_event_t*>self._handle
)
if err < 0:
self._abort_init()
raise convert_error(err)

self._finish_init()

self.running = 0
self.callback = callback
if context is None:
context = Context_CopyCurrent()
self.context = context

cdef start(self, char* path, int flags):
cdef int err

self._ensure_alive()

if self.running == 0:
err = uv.uv_fs_event_start(
<uv.uv_fs_event_t*>self._handle,
__uvfsevent_callback,
path,
flags,
)
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return
self.running = 1

cdef stop(self):
cdef int err

if not self._is_alive():
self.running = 0
return

if self.running == 1:
err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle)
self.running = 0
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return

cdef _close(self):
try:
self.stop()
finally:
UVHandle._close(<UVHandle>self)

def cancel(self):
self._close()

def cancelled(self):
return self.running == 0

@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context):
cdef UVFSEvent handle
handle = UVFSEvent.__new__(UVFSEvent)
handle._init(loop, callback, context)
return handle


cdef void __uvfsevent_callback(uv.uv_fs_event_t* handle, const char *filename,
int events, int status) with gil:
if __ensure_handle_data(
<uv.uv_handle_t*>handle, "UVFSEvent callback"
) == 0:
return

cdef:
UVFSEvent fs_event = <UVFSEvent> handle.data
Handle h

try:
h = new_Handle(
fs_event._loop,
fs_event.callback,
(filename, FileSystemEvent(events)),
fs_event.context,
)
h._run()
except BaseException as ex:
fs_event._error(ex, False)
19 changes: 19 additions & 0 deletions uvloop/includes/uv.pxd
Expand Up @@ -183,6 +183,10 @@ cdef extern from "uv.h" nogil:
int pid
# ...

ctypedef struct uv_fs_event_t:
void* data
# ...

ctypedef enum uv_req_type:
UV_UNKNOWN_REQ = 0,
UV_REQ,
Expand Down Expand Up @@ -215,6 +219,10 @@ cdef extern from "uv.h" nogil:
UV_LEAVE_GROUP = 0,
UV_JOIN_GROUP

cpdef enum uv_fs_event:
UV_RENAME = 1,
UV_CHANGE = 2

const char* uv_strerror(int err)
const char* uv_err_name(int err)

Expand Down Expand Up @@ -253,6 +261,10 @@ cdef extern from "uv.h" nogil:
const uv_buf_t* buf,
const system.sockaddr* addr,
unsigned flags) with gil
ctypedef void (*uv_fs_event_cb)(uv_fs_event_t* handle,
const char *filename,
int events,
int status) with gil

# Generic request functions
int uv_cancel(uv_req_t* req)
Expand Down Expand Up @@ -397,6 +409,13 @@ cdef extern from "uv.h" nogil:
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb)
int uv_poll_stop(uv_poll_t* poll)

# FS Event

int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle)
int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb,
const char *path, unsigned int flags)
int uv_fs_event_stop(uv_fs_event_t *handle)

# Misc

ctypedef struct uv_timeval_t:
Expand Down
1 change: 1 addition & 0 deletions uvloop/loop.pxd
Expand Up @@ -219,6 +219,7 @@ include "handles/streamserver.pxd"
include "handles/tcp.pxd"
include "handles/pipe.pxd"
include "handles/process.pxd"
include "handles/fsevent.pxd"

include "request.pxd"
include "sslproto.pxd"
Expand Down
14 changes: 14 additions & 0 deletions uvloop/loop.pyx
Expand Up @@ -3149,6 +3149,19 @@ cdef class Loop:
await waiter
return udp, protocol

def _monitor_fs(self, path: str, callback) -> asyncio.Handle:
cdef:
UVFSEvent fs_handle
char* c_str_path

self._check_closed()
fs_handle = UVFSEvent.new(self, callback, None)
p_bytes = path.encode('UTF-8')
c_str_path = p_bytes
flags = 0
fs_handle.start(c_str_path, flags)
return fs_handle

def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')
Expand Down Expand Up @@ -3301,6 +3314,7 @@ include "handles/streamserver.pyx"
include "handles/tcp.pyx"
include "handles/pipe.pyx"
include "handles/process.pyx"
include "handles/fsevent.pyx"

include "request.pyx"
include "dns.pyx"
Expand Down

0 comments on commit 74d381e

Please sign in to comment.