Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose libuv uv_fs_event functionality #474

Merged
merged 18 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 3 additions & 36 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,36 +1,3 @@
*._*
*.pyc
*.pyo
*.ymlc
*.ymlc~
*.scssc
*.so
*~
.#*
.DS_Store
.project
.pydevproject
.settings
.idea
/.ropeproject
\#*#
/pub
/test*.py
/.local
/perf.data*
/config_local.yml
/build
__pycache__/
.d8_history
/*.egg
/*.egg-info
/dist
/.cache
docs/_build
uvloop/loop.*.pyd
/.pytest_cache/
/.mypy_cache/
/.vscode
/.eggs
/.venv*
/wheelhouse
uvloop-dev
build
.eggs
jensbjorgensen marked this conversation as resolved.
Show resolved Hide resolved
102 changes: 102 additions & 0 deletions tests/test_fs_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import asyncio
import os.path
import tempfile

from uvloop import _testbase as tb
from uvloop.const import FS_EVENT_CHANGE, FS_EVENT_RENAME


class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return

def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: int):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FS_EVENT_CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)

def test_fs_event_change(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
self.loop.stop()

with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop.monitor_fs(tf.name, self.event_cb, 0)
try:
self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
h.close()
finally:
self.loop.close()
fantix marked this conversation as resolved.
Show resolved Hide resolved

self.assertEqual(self.change_event_count, 4)


class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()

def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()

def event_cb(self, ev_fname: bytes, evt: int):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FS_EVENT_RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)

def test_fs_event_rename(self):
self.fs_event_setup()

async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
self.loop.stop()

with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
f.write('hello!')
f.close()
h = self.loop.monitor_fs(td_name, self.event_cb, 0)
try:
self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.close()
finally:
self.loop.close()

self.assertEqual(len(self.changed_set), 0)
6 changes: 6 additions & 0 deletions tests/test_get_uvloop_ptr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from uvloop import _testbase as tb


class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
def test_fs_event_change(self):
self.assertGreater(self.loop.get_uvloop_ptr(), 0)
2 changes: 2 additions & 0 deletions uvloop/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FS_EVENT_CHANGE = 2
FS_EVENT_RENAME = 1
jensbjorgensen marked this conversation as resolved.
Show resolved Hide resolved
14 changes: 14 additions & 0 deletions uvloop/handles/fsevent.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
cdef class UVFSEvent(UVHandle):
cdef:
object callback

cdef _init(self, Loop loop, char* path, object callback,
int flags)

cdef _close(self)
#cdef start(self)
#cdef get_when(self)
jensbjorgensen marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
cdef UVFSEvent new(Loop loop, char* path, object callback,
int flags)
70 changes: 70 additions & 0 deletions uvloop/handles/fsevent.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
@cython.no_gc_clear
cdef class UVFSEvent(UVHandle):
cdef _init(self, Loop loop, char* path, object callback,
int flags):

cdef int err

self._start_init(loop)

self._handle = <uv.uv_handle_t*> PyMem_RawMalloc(sizeof(uv.uv_fs_event_t))
if self._handle is NULL:
self._abort_init()
raise MemoryError()

err = uv.uv_fs_event_init(self._loop.uvloop, <uv.uv_fs_event_t*>self._handle)
if err < 0:
self._abort_init()
raise convert_error(err)


err = uv.uv_fs_event_start(<uv.uv_fs_event_t*>self._handle, __uvfsevent_callback, path, flags)
if err < 0:
self._abort_init()
raise convert_error(err)

self._finish_init()

self.callback = callback


cdef _close(self):
cdef int err

if not self._is_alive():
return

err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle)
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return

UVHandle._close(<UVHandle>self)


def close(self):
self._close()

@staticmethod
cdef UVFSEvent new(Loop loop, char* path, object callback,
int flags):

cdef UVFSEvent handle
handle = UVFSEvent.__new__(UVFSEvent)
handle._init(loop, path, callback, flags)
return handle


cdef void __uvfsevent_callback(uv.uv_fs_event_t* handle, const char *filename,
int events, int status) with gil:
if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVFSEvent callback") == 0:
return

cdef:
UVFSEvent fs_event = <UVFSEvent> handle.data

try:
fs_event.callback(filename, events)
except BaseException as ex:
fs_event._error(ex, False)
20 changes: 20 additions & 0 deletions uvloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ cdef extern from "uv.h" nogil:
int pid
# ...

ctypedef struct uv_fs_event_t:
void* data
# ...

ctypedef enum uv_req_type:
UV_UNKNOWN_REQ = 0,
UV_REQ,
Expand Down Expand Up @@ -215,6 +219,15 @@ cdef extern from "uv.h" nogil:
UV_LEAVE_GROUP = 0,
UV_JOIN_GROUP

ctypedef enum uv_fs_event:
UV_RENAME = 1,
UV_CHANGE = 2

ctypedef enum uv_fs_event_flags:
UV_FS_EVENT_WATCH_ENTRY = 1,
UV_FS_EVENT_STAT = 2,
UV_FS_EVENT_RECURSIVE = 4

const char* uv_strerror(int err)
const char* uv_err_name(int err)

Expand Down Expand Up @@ -253,6 +266,7 @@ cdef extern from "uv.h" nogil:
const uv_buf_t* buf,
const system.sockaddr* addr,
unsigned flags) with gil
ctypedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char *filename, int events, int status) with gil

# Generic request functions
int uv_cancel(uv_req_t* req)
Expand Down Expand Up @@ -397,6 +411,12 @@ cdef extern from "uv.h" nogil:
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb)
int uv_poll_stop(uv_poll_t* poll)

# FS Event

int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle)
int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb, const char *path, unsigned int flags)
int uv_fs_event_stop(uv_fs_event_t *handle)

# Misc

ctypedef struct uv_timeval_t:
Expand Down
4 changes: 3 additions & 1 deletion uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .includes cimport uv
from .includes cimport system

from libc.stdint cimport uint64_t, uint32_t, int64_t
from libc.stdint cimport uint64_t, uint32_t, int64_t, uintptr_t


include "includes/consts.pxi"
Expand Down Expand Up @@ -190,6 +190,7 @@ cdef class Loop:
cdef _sock_connect(self, sock, address)
cdef _sock_connect_cb(self, fut, sock, address)

cdef _monitor_fs(self, char* path, int flags, callback)
cdef _sock_set_reuseport(self, int fd)

cdef _setup_or_resume_signals(self)
Expand Down Expand Up @@ -220,6 +221,7 @@ include "handles/streamserver.pxd"
include "handles/tcp.pxd"
include "handles/pipe.pxd"
include "handles/process.pxd"
include "handles/fsevent.pxd"

include "request.pxd"
include "sslproto.pxd"
Expand Down
20 changes: 11 additions & 9 deletions uvloop/loop.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ _T = TypeVar('_T')
_Context = Dict[str, Any]
_ExceptionHandler = Callable[[asyncio.AbstractEventLoop, _Context], Any]
_SSLContext = Union[bool, None, ssl.SSLContext]
_TransProtPair = Tuple[asyncio.transports.BaseTransport, asyncio.transports.BaseTransport]

class Loop:
def call_soon(
Expand Down Expand Up @@ -155,7 +156,7 @@ class Loop:
server_hostname: Optional[str] = ...,
ssl_handshake_timeout: Optional[float] = ...,
ssl_shutdown_timeout: Optional[float] = ...,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
@overload
async def create_connection(
self,
Expand All @@ -172,7 +173,7 @@ class Loop:
server_hostname: Optional[str] = ...,
ssl_handshake_timeout: Optional[float] = ...,
ssl_shutdown_timeout: Optional[float] = ...,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def create_unix_server(
self,
protocol_factory: asyncio.events._ProtocolFactory,
Expand All @@ -195,7 +196,7 @@ class Loop:
server_hostname: Optional[str] = ...,
ssl_handshake_timeout: Optional[float] = ...,
ssl_shutdown_timeout: Optional[float] = ...,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
def default_exception_handler(self, context: _Context) -> None: ...
def get_exception_handler(self) -> Optional[_ExceptionHandler]: ...
def set_exception_handler(self, handler: Optional[_ExceptionHandler]) -> None: ...
Expand All @@ -217,7 +218,7 @@ class Loop:
ssl: _SSLContext = ...,
ssl_handshake_timeout: Optional[float] = ...,
ssl_shutdown_timeout: Optional[float] = ...,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def run_in_executor(
self, executor: Any, func: Callable[..., _T], *args: Any
) -> _T: ...
Expand All @@ -231,7 +232,7 @@ class Loop:
stdout: Any = ...,
stderr: Any = ...,
**kwargs: Any,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def subprocess_exec(
self,
protocol_factory: asyncio.events._ProtocolFactory,
Expand All @@ -240,13 +241,13 @@ class Loop:
stdout: Any = ...,
stderr: Any = ...,
**kwargs: Any,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def connect_read_pipe(
self, protocol_factory: asyncio.events._ProtocolFactory, pipe: Any
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def connect_write_pipe(
self, protocol_factory: asyncio.events._ProtocolFactory, pipe: Any
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
def add_signal_handler(
self, sig: int, callback: Callable[..., Any], *args: Any
) -> None: ...
Expand All @@ -264,7 +265,7 @@ class Loop:
reuse_port: Optional[bool] = ...,
allow_broadcast: Optional[bool] = ...,
sock: Optional[socket] = ...,
) -> asyncio.events._TransProtPair: ...
) -> _TransProtPair: ...
async def shutdown_asyncgens(self) -> None: ...
async def shutdown_default_executor(self) -> None: ...
# Loop doesn't implement these, but since they are marked as abstract in typeshed,
Expand All @@ -287,3 +288,4 @@ class Loop:
*,
fallback: bool = ...
) -> int: ...
def monitor_fs(self, path: str, callback: Callable[..., Any], flags: int) -> asyncio.Handle: ...