Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose libuv uv_fs_event functionality #474

Merged
merged 18 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
'flake8~=3.9.2',
'psutil',
'pycodestyle~=2.7.0',
'pyOpenSSL~=19.0.0',
'pyOpenSSL~=22.0.0',
'mypy>=0.800',
CYTHON_DEPENDENCY,
]
Expand Down
100 changes: 100 additions & 0 deletions tests/test_fs_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import asyncio
import os.path
import tempfile

from uvloop import _testbase as tb
from uvloop.loop import FileSystemEvent


class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return

def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FileSystemEvent.CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)

def test_fs_event_change(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop._monitor_fs(tf.name, self.event_cb)
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(self.change_event_count, 4)


class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()

def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)

def test_fs_event_rename(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()

with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
f.write('hello!')
f.close()
h = self.loop._monitor_fs(td_name, self.event_cb)
self.assertFalse(h.cancelled())

self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.cancel()
self.assertTrue(h.cancelled())

self.assertEqual(len(self.changed_set), 0)
12 changes: 12 additions & 0 deletions uvloop/handles/fsevent.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
cdef class UVFSEvent(UVHandle):
cdef:
object callback
bint running

cdef _init(self, Loop loop, object callback, object context)
cdef _close(self)
cdef start(self, char* path, int flags)
cdef stop(self)

@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context)
112 changes: 112 additions & 0 deletions uvloop/handles/fsevent.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import enum


class FileSystemEvent(enum.IntEnum):
RENAME = uv.UV_RENAME
CHANGE = uv.UV_CHANGE
RENAME_CHANGE = RENAME | CHANGE


@cython.no_gc_clear
cdef class UVFSEvent(UVHandle):
cdef _init(self, Loop loop, object callback, object context):
cdef int err

self._start_init(loop)

self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
sizeof(uv.uv_fs_event_t)
)
if self._handle is NULL:
self._abort_init()
raise MemoryError()

err = uv.uv_fs_event_init(
self._loop.uvloop, <uv.uv_fs_event_t*>self._handle
)
if err < 0:
self._abort_init()
raise convert_error(err)

self._finish_init()

self.running = 0
self.callback = callback
if context is None:
context = Context_CopyCurrent()
self.context = context

cdef start(self, char* path, int flags):
cdef int err

self._ensure_alive()

if self.running == 0:
err = uv.uv_fs_event_start(
<uv.uv_fs_event_t*>self._handle,
__uvfsevent_callback,
path,
flags,
)
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return
self.running = 1

cdef stop(self):
cdef int err

if not self._is_alive():
self.running = 0
return

if self.running == 1:
err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle)
self.running = 0
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return

cdef _close(self):
try:
self.stop()
finally:
UVHandle._close(<UVHandle>self)

def cancel(self):
self._close()

def cancelled(self):
return self.running == 0

@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context):
cdef UVFSEvent handle
handle = UVFSEvent.__new__(UVFSEvent)
handle._init(loop, callback, context)
return handle


cdef void __uvfsevent_callback(uv.uv_fs_event_t* handle, const char *filename,
int events, int status) with gil:
if __ensure_handle_data(
<uv.uv_handle_t*>handle, "UVFSEvent callback"
) == 0:
return

cdef:
UVFSEvent fs_event = <UVFSEvent> handle.data
Handle h

try:
h = new_Handle(
fs_event._loop,
fs_event.callback,
(filename, FileSystemEvent(events)),
fs_event.context,
)
h._run()
except BaseException as ex:
fs_event._error(ex, False)
19 changes: 19 additions & 0 deletions uvloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ cdef extern from "uv.h" nogil:
int pid
# ...

ctypedef struct uv_fs_event_t:
void* data
# ...

ctypedef enum uv_req_type:
UV_UNKNOWN_REQ = 0,
UV_REQ,
Expand Down Expand Up @@ -215,6 +219,10 @@ cdef extern from "uv.h" nogil:
UV_LEAVE_GROUP = 0,
UV_JOIN_GROUP

cpdef enum uv_fs_event:
UV_RENAME = 1,
UV_CHANGE = 2

const char* uv_strerror(int err)
const char* uv_err_name(int err)

Expand Down Expand Up @@ -253,6 +261,10 @@ cdef extern from "uv.h" nogil:
const uv_buf_t* buf,
const system.sockaddr* addr,
unsigned flags) with gil
ctypedef void (*uv_fs_event_cb)(uv_fs_event_t* handle,
const char *filename,
int events,
int status) with gil

# Generic request functions
int uv_cancel(uv_req_t* req)
Expand Down Expand Up @@ -397,6 +409,13 @@ cdef extern from "uv.h" nogil:
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb)
int uv_poll_stop(uv_poll_t* poll)

# FS Event

int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle)
int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb,
const char *path, unsigned int flags)
int uv_fs_event_stop(uv_fs_event_t *handle)

# Misc

ctypedef struct uv_timeval_t:
Expand Down
1 change: 1 addition & 0 deletions uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ include "handles/streamserver.pxd"
include "handles/tcp.pxd"
include "handles/pipe.pxd"
include "handles/process.pxd"
include "handles/fsevent.pxd"

include "request.pxd"
include "sslproto.pxd"
Expand Down
14 changes: 14 additions & 0 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3149,6 +3149,19 @@ cdef class Loop:
await waiter
return udp, protocol

def _monitor_fs(self, path: str, callback) -> asyncio.Handle:
cdef:
UVFSEvent fs_handle
char* c_str_path

self._check_closed()
fs_handle = UVFSEvent.new(self, callback, None)
p_bytes = path.encode('UTF-8')
c_str_path = p_bytes
fantix marked this conversation as resolved.
Show resolved Hide resolved
flags = 0
fs_handle.start(c_str_path, flags)
return fs_handle

def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')
Expand Down Expand Up @@ -3301,6 +3314,7 @@ include "handles/streamserver.pyx"
include "handles/tcp.pyx"
include "handles/pipe.pyx"
include "handles/process.pyx"
include "handles/fsevent.pyx"

include "request.pyx"
include "dns.pyx"
Expand Down