From 41fca1eb601139922817a17b6335a8c2d2c8db51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Sat, 22 Apr 2023 14:53:01 +0200 Subject: [PATCH] [events] Use `dataclass` (#987) * [events] Use `dataclass` - [events] `FileSystemEvent`, and subclasses, are now `dataclass`es, and their `repr()` has changed - [windows] `WinAPINativeEvent` is now a `dataclass`, and its `repr()` has changed * chore: run black with line-lentgh=120 --- changelog.rst | 3 +- src/watchdog/events.py | 94 +++---------------- src/watchdog/observers/__init__.py | 1 + src/watchdog/observers/api.py | 4 +- src/watchdog/observers/fsevents.py | 24 ++--- src/watchdog/observers/fsevents2.py | 23 +---- src/watchdog/observers/inotify.py | 4 +- src/watchdog/observers/inotify_buffer.py | 10 +- src/watchdog/observers/inotify_c.py | 38 ++------ src/watchdog/observers/kqueue.py | 20 +--- .../observers/read_directory_changes.py | 10 +- src/watchdog/observers/winapi.py | 16 +--- src/watchdog/tricks/__init__.py | 8 +- src/watchdog/utils/__init__.py | 8 +- src/watchdog/utils/dirsnapshot.py | 8 +- src/watchdog/utils/echo.py | 10 +- src/watchdog/utils/patterns.py | 16 +--- src/watchdog/watchmedo.py | 42 +++------ tests/conftest.py | 4 +- tests/markers.py | 4 +- tests/test_0_watchmedo.py | 13 +-- tests/test_emitter.py | 55 +++-------- tests/test_events.py | 57 +---------- tests/test_fsevents.py | 20 +--- tests/test_inotify_buffer.py | 6 +- tests/test_inotify_c.py | 20 +--- tests/test_pattern_matching_event_handler.py | 8 +- tests/test_patterns.py | 22 +---- tests/test_regex_matching_event_handler.py | 8 +- tests/utils.py | 5 +- 30 files changed, 121 insertions(+), 440 deletions(-) diff --git a/changelog.rst b/changelog.rst index 4425b225..5d964990 100644 --- a/changelog.rst +++ b/changelog.rst @@ -8,12 +8,13 @@ Changelog 2023-xx-xx • `full history `__ +- [events] ``FileSystemEvent``, and subclasses, are now ``dataclass``es, and their ``repr()`` has changed +- [windows] ``WinAPINativeEvent`` is now a ``dataclass``, and its ``repr()`` has changed - [events] Log ``FileOpenedEvent``, and ``FileClosedEvent``, events in ``LoggingEventHandler`` - [tests] Improve ``FileSystemEvent`` coverage - [watchmedo] Log all events in ``LoggerTrick`` - Thanks to our beloved contributors: @BoboTiG - 3.0.0 ~~~~~ diff --git a/src/watchdog/events.py b/src/watchdog/events.py index d5010a30..51c6c5bf 100755 --- a/src/watchdog/events.py +++ b/src/watchdog/events.py @@ -96,6 +96,7 @@ import logging import os.path import re +from dataclasses import dataclass, field from typing import Optional from watchdog.utils.patterns import match_any_paths @@ -108,6 +109,7 @@ EVENT_TYPE_OPENED = "opened" +@dataclass(unsafe_hash=True) class FileSystemEvent: """ Immutable type that represents a file system event that is triggered @@ -117,76 +119,24 @@ class FileSystemEvent: can be used as keys in dictionaries or be added to sets. """ - event_type = "" - """The type of the event as a string.""" + src_path: str + dest_path: str = "" + event_type: str = field(default="", init=False) + is_directory: bool = field(default=False, init=False) - is_directory = False - """True if event was emitted for a directory; False otherwise.""" - - is_synthetic = False """ True if event was synthesized; False otherwise. - These are events that weren't actually broadcast by the OS, but are presumed to have happened based on other, actual events. """ - - def __init__(self, src_path): - self._src_path = src_path - - @property - def src_path(self): - """Source path of the file system object that triggered this event.""" - return self._src_path - - def __str__(self): - return self.__repr__() - - def __repr__(self): - return ( - f"<{type(self).__name__}: event_type={self.event_type}, " - f"src_path={self.src_path!r}, is_directory={self.is_directory}>" - ) - - # Used for comparison of events. - @property - def key(self): - return (self.event_type, self.src_path, self.is_directory) - - def __eq__(self, event): - return self.key == event.key - - def __hash__(self): - return hash(self.key) + is_synthetic: bool = field(default=False) class FileSystemMovedEvent(FileSystemEvent): - """ - File system event representing any kind of file system movement. - """ + """File system event representing any kind of file system movement.""" event_type = EVENT_TYPE_MOVED - def __init__(self, src_path, dest_path): - super().__init__(src_path) - self._dest_path = dest_path - - @property - def dest_path(self): - """The destination path of the move event.""" - return self._dest_path - - # Used for hashing this as an immutable object. - @property - def key(self): - return (self.event_type, self.src_path, self.dest_path, self.is_directory) - - def __repr__(self): - return ( - f"<{type(self).__name__}: src_path={self.src_path!r}, " - f"dest_path={self.dest_path!r}, is_directory={self.is_directory}>" - ) - # File events. @@ -519,9 +469,7 @@ def on_moved(self, event: FileSystemEvent) -> None: super().on_moved(event) what = "directory" if event.is_directory else "file" - self.logger.info( - "Moved %s: from %s to %s", what, event.src_path, event.dest_path # type: ignore[attr-defined] - ) + self.logger.info("Moved %s: from %s to %s", what, event.src_path, event.dest_path) def on_created(self, event: FileSystemEvent) -> None: super().on_created(event) @@ -568,20 +516,12 @@ def generate_sub_moved_events(src_dir_path, dest_dir_path): for root, directories, filenames in os.walk(dest_dir_path): for directory in directories: full_path = os.path.join(root, directory) - renamed_path = ( - full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else None - ) - dir_moved_event = DirMovedEvent(renamed_path, full_path) - dir_moved_event.is_synthetic = True - yield dir_moved_event + renamed_path = full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else "" + yield DirMovedEvent(renamed_path, full_path, is_synthetic=True) for filename in filenames: full_path = os.path.join(root, filename) - renamed_path = ( - full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else None - ) - file_moved_event = FileMovedEvent(renamed_path, full_path) - file_moved_event.is_synthetic = True - yield file_moved_event + renamed_path = full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else "" + yield FileMovedEvent(renamed_path, full_path, is_synthetic=True) def generate_sub_created_events(src_dir_path): @@ -597,10 +537,6 @@ def generate_sub_created_events(src_dir_path): """ for root, directories, filenames in os.walk(src_dir_path): for directory in directories: - dir_created_event = DirCreatedEvent(os.path.join(root, directory)) - dir_created_event.is_synthetic = True - yield dir_created_event + yield DirCreatedEvent(os.path.join(root, directory), is_synthetic=True) for filename in filenames: - file_created_event = FileCreatedEvent(os.path.join(root, filename)) - file_created_event.is_synthetic = True - yield file_created_event + yield FileCreatedEvent(os.path.join(root, filename), is_synthetic=True) diff --git a/src/watchdog/observers/__init__.py b/src/watchdog/observers/__init__.py index 339f9417..ab92c618 100644 --- a/src/watchdog/observers/__init__.py +++ b/src/watchdog/observers/__init__.py @@ -74,6 +74,7 @@ except Exception: try: from .kqueue import KqueueObserver as Observer + warnings.warn("Failed to import fsevents. Fall back to kqueue") except Exception: from .polling import PollingObserver as Observer diff --git a/src/watchdog/observers/api.py b/src/watchdog/observers/api.py index 2612f79a..98b46de1 100644 --- a/src/watchdog/observers/api.py +++ b/src/watchdog/observers/api.py @@ -294,9 +294,7 @@ def schedule(self, event_handler, path, recursive=False): # If we don't have an emitter for this watch already, create it. if self._emitter_for_watch.get(watch) is None: - emitter = self._emitter_class( - event_queue=self.event_queue, watch=watch, timeout=self.timeout - ) + emitter = self._emitter_class(event_queue=self.event_queue, watch=watch, timeout=self.timeout) if self.is_alive(): emitter.start() self._add_emitter(emitter) diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index 0e022588..d62765a3 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -85,9 +85,7 @@ def __init__( self._start_time = 0.0 self._starting_state = None self._lock = threading.Lock() - self._absolute_watch_path = os.path.realpath( - os.path.abspath(os.path.expanduser(self.watch.path)) - ) + self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path))) def on_thread_stop(self): _fsevents.remove_watch(self.watch) @@ -107,9 +105,7 @@ def queue_event(self, event): logger.debug("drop event %s", event) def _is_recursive_event(self, event): - src_path = ( - event.src_path if event.is_directory else os.path.dirname(event.src_path) - ) + src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path) if src_path == self._absolute_watch_path: return False @@ -136,9 +132,7 @@ def _queue_modified_event(self, event, src_path, dirname): cls = DirModifiedEvent if event.is_directory else FileModifiedEvent self.queue_event(cls(src_path)) - def _queue_renamed_event( - self, src_event, src_path, dst_path, src_dirname, dst_dirname - ): + def _queue_renamed_event(self, src_event, src_path, dst_path, src_dirname, dst_dirname): cls = DirMovedEvent if src_event.is_directory else FileMovedEvent dst_path = self._encode_path(dst_path) self.queue_event(cls(src_path, dst_path)) @@ -170,9 +164,7 @@ def _is_meta_mod(event): def queue_events(self, timeout, events): if logger.getEffectiveLevel() <= logging.DEBUG: for event in events: - flags = ", ".join( - attr for attr in dir(event) if getattr(event, attr) is True - ) + flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True) logger.debug(f"{event}: {flags}") if time.monotonic() - self._start_time > 60: @@ -238,9 +230,7 @@ def queue_events(self, timeout, events): if event.is_renamed: # Check if we have a corresponding destination event in the watched path. dst_event = next( - iter( - e for e in events if e.is_renamed and e.inode == event.inode - ), + iter(e for e in events if e.is_renamed and e.inode == event.inode), None, ) @@ -251,9 +241,7 @@ def queue_events(self, timeout, events): dst_path = self._encode_path(dst_event.path) dst_dirname = os.path.dirname(dst_path) - self._queue_renamed_event( - event, src_path, dst_path, src_dirname, dst_dirname - ) + self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname) self._fs_view.add(event.inode) for sub_event in generate_sub_moved_events(src_path, dst_path): diff --git a/src/watchdog/observers/fsevents2.py b/src/watchdog/observers/fsevents2.py index 5710b759..49cfc4b2 100644 --- a/src/watchdog/observers/fsevents2.py +++ b/src/watchdog/observers/fsevents2.py @@ -106,9 +106,7 @@ def __init__(self, path): def run(self): pool = AppKit.NSAutoreleasePool.alloc().init() self._run_loop = CFRunLoopGetCurrent() - FSEventStreamScheduleWithRunLoop( - self._stream_ref, self._run_loop, kCFRunLoopDefaultMode - ) + FSEventStreamScheduleWithRunLoop(self._stream_ref, self._run_loop, kCFRunLoopDefaultMode) if not FSEventStreamStart(self._stream_ref): FSEventStreamInvalidate(self._stream_ref) FSEventStreamRelease(self._stream_ref) @@ -126,13 +124,8 @@ def stop(self): if self._run_loop is not None: CFRunLoopStop(self._run_loop) - def _callback( - self, streamRef, clientCallBackInfo, numEvents, eventPaths, eventFlags, eventIDs - ): - events = [ - NativeEvent(path, flags, _id) - for path, flags, _id in zip(eventPaths, eventFlags, eventIDs) - ] + def _callback(self, streamRef, clientCallBackInfo, numEvents, eventPaths, eventFlags, eventIDs): + events = [NativeEvent(path, flags, _id) for path, flags, _id in zip(eventPaths, eventFlags, eventIDs)] logger.debug(f"FSEvents callback. Got {numEvents} events:") for e in events: logger.debug(e) @@ -219,17 +212,11 @@ def queue_events(self, timeout): # from a single move operation. (None of this is documented!) # Otherwise, guess whether file was moved in or out. # TODO: handle id wrapping - if ( - i + 1 < len(events) - and events[i + 1].is_renamed - and events[i + 1].event_id == event.event_id + 1 - ): + if i + 1 < len(events) and events[i + 1].is_renamed and events[i + 1].event_id == event.event_id + 1: cls = DirMovedEvent if event.is_directory else FileMovedEvent self.queue_event(cls(event.path, events[i + 1].path)) self.queue_event(DirModifiedEvent(os.path.dirname(event.path))) - self.queue_event( - DirModifiedEvent(os.path.dirname(events[i + 1].path)) - ) + self.queue_event(DirModifiedEvent(os.path.dirname(events[i + 1].path))) i += 1 elif os.path.exists(event.path): cls = DirCreatedEvent if event.is_directory else FileCreatedEvent diff --git a/src/watchdog/observers/inotify.py b/src/watchdog/observers/inotify.py index 4fdeaa28..e4b6a678 100644 --- a/src/watchdog/observers/inotify.py +++ b/src/watchdog/observers/inotify.py @@ -155,7 +155,7 @@ def queue_events(self, timeout, full_events=False): if event.is_moved_to: if full_events: cls = DirMovedEvent if event.is_directory else FileMovedEvent - self.queue_event(cls(None, src_path)) + self.queue_event(cls("", src_path)) else: cls = DirCreatedEvent if event.is_directory else FileCreatedEvent self.queue_event(cls(src_path)) @@ -175,7 +175,7 @@ def queue_events(self, timeout, full_events=False): self.queue_event(DirModifiedEvent(os.path.dirname(src_path))) elif event.is_moved_from and full_events: cls = DirMovedEvent if event.is_directory else FileMovedEvent - self.queue_event(cls(src_path, None)) + self.queue_event(cls(src_path, "")) self.queue_event(DirModifiedEvent(os.path.dirname(src_path))) elif event.is_create: cls = DirCreatedEvent if event.is_directory else FileCreatedEvent diff --git a/src/watchdog/observers/inotify_buffer.py b/src/watchdog/observers/inotify_buffer.py index 80303daa..719a8798 100644 --- a/src/watchdog/observers/inotify_buffer.py +++ b/src/watchdog/observers/inotify_buffer.py @@ -59,11 +59,7 @@ def _group_events(self, event_list): logger.debug("in-event %s", inotify_event) def matching_from_event(event): - return ( - not isinstance(event, tuple) - and event.is_moved_from - and event.cookie == inotify_event.cookie - ) + return not isinstance(event, tuple) and event.is_moved_from and event.cookie == inotify_event.cookie if inotify_event.is_moved_to: # Check if move_from is already in the buffer @@ -104,9 +100,7 @@ def run(self): continue # Only add delay for unmatched move_from events - delay = ( - not isinstance(inotify_event, tuple) and inotify_event.is_moved_from - ) + delay = not isinstance(inotify_event, tuple) and inotify_event.is_moved_from self._queue.put(inotify_event, delay) if ( diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index bec6690d..1a42a6ba 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -28,20 +28,12 @@ libc = ctypes.CDLL(None) -if ( - not hasattr(libc, "inotify_init") - or not hasattr(libc, "inotify_add_watch") - or not hasattr(libc, "inotify_rm_watch") -): +if not hasattr(libc, "inotify_init") or not hasattr(libc, "inotify_add_watch") or not hasattr(libc, "inotify_rm_watch"): raise UnsupportedLibc(f"Unsupported libc version found: {libc._name}") -inotify_add_watch = ctypes.CFUNCTYPE(c_int, c_int, c_char_p, c_uint32, use_errno=True)( - ("inotify_add_watch", libc) -) +inotify_add_watch = ctypes.CFUNCTYPE(c_int, c_int, c_char_p, c_uint32, use_errno=True)(("inotify_add_watch", libc)) -inotify_rm_watch = ctypes.CFUNCTYPE(c_int, c_int, c_uint32, use_errno=True)( - ("inotify_rm_watch", libc) -) +inotify_rm_watch = ctypes.CFUNCTYPE(c_int, c_int, c_uint32, use_errno=True)(("inotify_rm_watch", libc)) inotify_init = ctypes.CFUNCTYPE(c_int, use_errno=True)(("inotify_init", libc)) @@ -320,9 +312,7 @@ def _recursive_simulate(src_path): if wd == -1: continue wd_path = self._path_for_wd[wd] - src_path = ( - os.path.join(wd_path, name) if name else wd_path - ) # avoid trailing slash + src_path = os.path.join(wd_path, name) if name else wd_path # avoid trailing slash inotify_event = InotifyEvent(wd, mask, cookie, name, src_path) if inotify_event.is_moved_from: @@ -336,13 +326,9 @@ def _recursive_simulate(src_path): self._path_for_wd[moved_wd] = inotify_event.src_path if self.is_recursive: for _path, _wd in self._wd_for_path.copy().items(): - if _path.startswith( - move_src_path + os.path.sep.encode() - ): + if _path.startswith(move_src_path + os.path.sep.encode()): moved_wd = self._wd_for_path.pop(_path) - _move_to_path = _path.replace( - move_src_path, inotify_event.src_path - ) + _move_to_path = _path.replace(move_src_path, inotify_event.src_path) self._wd_for_path[_move_to_path] = moved_wd self._path_for_wd[moved_wd] = _move_to_path src_path = os.path.join(wd_path, name) @@ -356,11 +342,7 @@ def _recursive_simulate(src_path): event_list.append(inotify_event) - if ( - self.is_recursive - and inotify_event.is_directory - and inotify_event.is_create - ): + if self.is_recursive and inotify_event.is_directory and inotify_event.is_create: # TODO: When a directory from another part of the # filesystem is moved into a watched directory, this # will not generate events for the directory tree. @@ -561,11 +543,7 @@ def is_directory(self): # It looks like the kernel does not provide this information for # IN_DELETE_SELF and IN_MOVE_SELF. In this case, assume it's a dir. # See also: https://github.com/seb-m/pyinotify/blob/2c7e8f8/python2/pyinotify.py#L897 - return ( - self.is_delete_self - or self.is_move_self - or self._mask & InotifyConstants.IN_ISDIR > 0 - ) + return self.is_delete_self or self.is_move_self or self._mask & InotifyConstants.IN_ISDIR > 0 @property def key(self): diff --git a/src/watchdog/observers/kqueue.py b/src/watchdog/observers/kqueue.py index 152ba619..070b9332 100644 --- a/src/watchdog/observers/kqueue.py +++ b/src/watchdog/observers/kqueue.py @@ -431,9 +431,7 @@ class KqueueEmitter(EventEmitter): :param stat: stat function. See ``os.stat`` for details. """ - def __init__( - self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, stat=os.stat - ): + def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, stat=os.stat): super().__init__(event_queue, watch, timeout) self._kq = select.kqueue() @@ -447,9 +445,7 @@ def custom_stat(path, self=self): self._register_kevent(path, S_ISDIR(stat_info.st_mode)) return stat_info - self._snapshot = DirectorySnapshot( - watch.path, recursive=watch.is_recursive, stat=custom_stat - ) + self._snapshot = DirectorySnapshot(watch.path, recursive=watch.is_recursive, stat=custom_stat) def _register_kevent(self, path, is_directory): """ @@ -543,9 +539,7 @@ def _gen_kqueue_events(self, kev, ref_snapshot, new_snapshot): # Kqueue does not specify the destination names for renames # to, so we have to process these using the a snapshot # of the directory. - for event in self._gen_renamed_events( - src_path, descriptor.is_directory, ref_snapshot, new_snapshot - ): + for event in self._gen_renamed_events(src_path, descriptor.is_directory, ref_snapshot, new_snapshot): yield event elif is_attrib_modified(kev): if descriptor.is_directory: @@ -656,9 +650,7 @@ def queue_events(self, timeout): # Take a fresh snapshot of the directory and update the # saved snapshot. - new_snapshot = DirectorySnapshot( - self.watch.path, self.watch.is_recursive - ) + new_snapshot = DirectorySnapshot(self.watch.path, self.watch.is_recursive) ref_snapshot = self._snapshot self._snapshot = new_snapshot diff_events = new_snapshot - ref_snapshot @@ -672,9 +664,7 @@ def queue_events(self, timeout): self.queue_event(FileModifiedEvent(file_modified)) for kev in event_list: - for event in self._gen_kqueue_events( - kev, ref_snapshot, new_snapshot - ): + for event in self._gen_kqueue_events(kev, ref_snapshot, new_snapshot): self.queue_event(event) except OSError as e: diff --git a/src/watchdog/observers/read_directory_changes.py b/src/watchdog/observers/read_directory_changes.py index e0bb073e..8183c182 100644 --- a/src/watchdog/observers/read_directory_changes.py +++ b/src/watchdog/observers/read_directory_changes.py @@ -99,19 +99,13 @@ def queue_events(self, timeout): # TODO: Come up with a better solution, possibly # a way to wait for I/O to complete before # queuing events. - for sub_moved_event in generate_sub_moved_events( - src_path, dest_path - ): + for sub_moved_event in generate_sub_moved_events(src_path, dest_path): self.queue_event(sub_moved_event) self.queue_event(event) else: self.queue_event(FileMovedEvent(src_path, dest_path)) elif winapi_event.is_modified: - cls = ( - DirModifiedEvent - if os.path.isdir(src_path) - else FileModifiedEvent - ) + cls = DirModifiedEvent if os.path.isdir(src_path) else FileModifiedEvent self.queue_event(cls(src_path)) elif winapi_event.is_added: isdir = os.path.isdir(src_path) diff --git a/src/watchdog/observers/winapi.py b/src/watchdog/observers/winapi.py index 13cc0329..a4956c11 100644 --- a/src/watchdog/observers/winapi.py +++ b/src/watchdog/observers/winapi.py @@ -37,6 +37,7 @@ from __future__ import annotations import sys +from dataclasses import dataclass from functools import reduce assert sys.platform.startswith("win"), f"{__name__} requires Windows" @@ -311,9 +312,7 @@ def _is_observed_path_deleted(handle, path): def _generate_observed_path_deleted_event(): # Create synthetic event for notify that observed directory is deleted path = ctypes.create_unicode_buffer(".") - event = FILE_NOTIFY_INFORMATION( - 0, FILE_ACTION_DELETED_SELF, len(path), path.value.encode("utf-8") - ) + event = FILE_NOTIFY_INFORMATION(0, FILE_ACTION_DELETED_SELF, len(path), path.value.encode("utf-8")) event_size = ctypes.sizeof(event) buff = ctypes.create_string_buffer(PATH_BUFFER_SIZE) ctypes.memmove(buff, ctypes.addressof(event), event_size) @@ -375,10 +374,10 @@ def read_directory_changes(handle, path, recursive): return event_buffer.raw, int(nbytes.value) +@dataclass(unsafe_hash=True) class WinAPINativeEvent: - def __init__(self, action, src_path): - self.action = action - self.src_path = src_path + action: int + src_path: str @property def is_added(self): @@ -404,11 +403,6 @@ def is_renamed_new(self): def is_removed_self(self): return self.action == FILE_ACTION_REMOVED_SELF - def __repr__(self): - return ( - f"<{type(self).__name__}: action={self.action}, src_path={self.src_path!r}>" - ) - def read_events(handle, path, recursive): buf, nbytes = read_directory_changes(handle, path, recursive) diff --git a/src/watchdog/tricks/__init__.py b/src/watchdog/tricks/__init__.py index 90bb8a50..65bf074a 100644 --- a/src/watchdog/tricks/__init__.py +++ b/src/watchdog/tricks/__init__.py @@ -155,9 +155,7 @@ def on_any_event(self, event): process_watcher.start() def is_process_running(self): - return self._process_watchers or ( - self.process is not None and self.process.poll() is None - ) + return self._process_watchers or (self.process is not None and self.process.poll() is None) class AutoRestartTrick(Trick): @@ -240,9 +238,7 @@ def _start_process(self): return # windows doesn't have setsid - self.process = subprocess.Popen( - self.command, preexec_fn=getattr(os, "setsid", None) - ) + self.process = subprocess.Popen(self.command, preexec_fn=getattr(os, "setsid", None)) if self.restart_on_command_exit: self.process_watcher = ProcessWatcher(self.process, self._restart_process) self.process_watcher.start() diff --git a/src/watchdog/utils/__init__.py b/src/watchdog/utils/__init__.py index b2ad79f2..aea5f4e0 100644 --- a/src/watchdog/utils/__init__.py +++ b/src/watchdog/utils/__init__.py @@ -122,9 +122,7 @@ def load_class(dotted_path): """ dotted_path_split = dotted_path.split(".") if len(dotted_path_split) <= 1: - raise ValueError( - f"Dotted module path {dotted_path} must contain a module name and a classname" - ) + raise ValueError(f"Dotted module path {dotted_path} must contain a module name and a classname") klass_name = dotted_path_split[-1] module_name = ".".join(dotted_path_split[:-1]) @@ -134,9 +132,7 @@ def load_class(dotted_path): # Finally create and return an instance of the class # return klass(*args, **kwargs) else: - raise AttributeError( - f"Module {module_name} does not have class attribute {klass_name}" - ) + raise AttributeError(f"Module {module_name} does not have class attribute {klass_name}") if TYPE_CHECKING or sys.version_info >= (3, 8): diff --git a/src/watchdog/utils/dirsnapshot.py b/src/watchdog/utils/dirsnapshot.py index b8ec5906..c1ff0331 100644 --- a/src/watchdog/utils/dirsnapshot.py +++ b/src/watchdog/utils/dirsnapshot.py @@ -121,15 +121,11 @@ def get_inode(directory, full_path): modified = set() for path in ref.paths & snapshot.paths: if get_inode(ref, path) == get_inode(snapshot, path): - if ref.mtime(path) != snapshot.mtime(path) or ref.size( - path - ) != snapshot.size(path): + if ref.mtime(path) != snapshot.mtime(path) or ref.size(path) != snapshot.size(path): modified.add(path) for old_path, new_path in moved: - if ref.mtime(old_path) != snapshot.mtime(new_path) or ref.size( - old_path - ) != snapshot.size(new_path): + if ref.mtime(old_path) != snapshot.mtime(new_path) or ref.size(old_path) != snapshot.size(new_path): modified.add(old_path) self._dirs_created = [path for path in created if snapshot.isdir(path)] diff --git a/src/watchdog/utils/echo.py b/src/watchdog/utils/echo.py index ab34134d..2e4ade99 100644 --- a/src/watchdog/utils/echo.py +++ b/src/watchdog/utils/echo.py @@ -49,11 +49,7 @@ def is_classmethod(instancemethod, klass): def is_static_method(method, klass): """Returns True if method is an instance method of klass.""" return next( - ( - isinstance(c.__dict__[name(method)], staticmethod) - for c in klass.mro() - if name(method) in c.__dict__ - ), + (isinstance(c.__dict__[name(method)], staticmethod) for c in klass.mro() if name(method) in c.__dict__), False, ) @@ -107,9 +103,7 @@ def wrapped(*v, **k): # Collect function arguments by chaining together positional, # defaulted, extra positional and keyword arguments. positional = list(map(format_arg_value, list(zip(argnames, v)))) - defaulted = [ - format_arg_value((a, argdefs[a])) for a in argnames[len(v) :] if a not in k - ] + defaulted = [format_arg_value((a, argdefs[a])) for a in argnames[len(v) :] if a not in k] nameless = list(map(repr, v[argcount:])) keyword = list(map(format_arg_value, list(k.items()))) args = positional + defaulted + nameless + keyword diff --git a/src/watchdog/utils/patterns.py b/src/watchdog/utils/patterns.py index a13baf1d..0785c5cd 100644 --- a/src/watchdog/utils/patterns.py +++ b/src/watchdog/utils/patterns.py @@ -27,17 +27,11 @@ def _match_path(path, included_patterns, excluded_patterns, case_sensitive): common_patterns = included_patterns & excluded_patterns if common_patterns: - raise ValueError( - "conflicting patterns `{}` included and excluded".format(common_patterns) - ) - return any(path.match(p) for p in included_patterns) and not any( - path.match(p) for p in excluded_patterns - ) + raise ValueError("conflicting patterns `{}` included and excluded".format(common_patterns)) + return any(path.match(p) for p in included_patterns) and not any(path.match(p) for p in excluded_patterns) -def filter_paths( - paths, included_patterns=None, excluded_patterns=None, case_sensitive=True -): +def filter_paths(paths, included_patterns=None, excluded_patterns=None, case_sensitive=True): """ Filters from a set of paths based on acceptable patterns and ignorable patterns. @@ -65,9 +59,7 @@ def filter_paths( yield path -def match_any_paths( - paths, included_patterns=None, excluded_patterns=None, case_sensitive=True -): +def match_any_paths(paths, included_patterns=None, excluded_patterns=None, case_sensitive=True): """ Matches from a set of paths based on acceptable patterns and ignorable patterns. diff --git a/src/watchdog/watchmedo.py b/src/watchdog/watchmedo.py index 589235cf..997de393 100755 --- a/src/watchdog/watchmedo.py +++ b/src/watchdog/watchmedo.py @@ -97,17 +97,11 @@ def command(args=[], parent=subparsers, cmd_aliases=[]): def decorator(func): name = func.__name__.replace("_", "-") desc = dedent(func.__doc__) - parser = parent.add_parser( - name, description=desc, aliases=cmd_aliases, formatter_class=HelpFormatter - ) + parser = parent.add_parser(name, description=desc, aliases=cmd_aliases, formatter_class=HelpFormatter) command_parsers[name] = parser verbosity_group = parser.add_mutually_exclusive_group() - verbosity_group.add_argument( - "-q", "--quiet", dest="verbosity", action="append_const", const=-1 - ) - verbosity_group.add_argument( - "-v", "--verbose", dest="verbosity", action="append_const", const=1 - ) + verbosity_group.add_argument("-q", "--quiet", dest="verbosity", action="append_const", const=-1) + verbosity_group.add_argument("-v", "--verbose", dest="verbosity", action="append_const", const=1) for arg in args: parser.add_argument(*arg[0], **arg[1]) parser.set_defaults(func=func) @@ -237,9 +231,7 @@ def schedule_tricks(observer, tricks, pathname, recursive): default=True, help="Recursively monitor paths (defaults to True).", ), - argument( - "--debug-force-polling", action="store_true", help="[debug] Forces polling." - ), + argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), argument( "--debug-force-kqueue", action="store_true", @@ -296,9 +288,7 @@ def tricks_from(args): try: tricks = config[CONFIG_KEY_TRICKS] except KeyError: - raise KeyError( - f"No {CONFIG_KEY_TRICKS!r} key specified in {tricks_file!r}." - ) + raise KeyError(f"No {CONFIG_KEY_TRICKS!r} key specified in {tricks_file!r}.") if CONFIG_KEY_PYTHON_PATH in config: add_to_sys_path(config[CONFIG_KEY_PYTHON_PATH]) @@ -429,12 +419,8 @@ def tricks_generate_yaml(args): type=float, help="Use this as the polling interval/blocking timeout.", ), - argument( - "--trace", action="store_true", help="Dumps complete dispatching trace." - ), - argument( - "--debug-force-polling", action="store_true", help="[debug] Forces polling." - ), + argument("--trace", action="store_true", help="Dumps complete dispatching trace."), + argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), argument( "--debug-force-kqueue", action="store_true", @@ -576,9 +562,7 @@ def log(args): help="Ignore events that occur while command is still being" " executed to avoid multiple simultaneous instances.", ), - argument( - "--debug-force-polling", action="store_true", help="[debug] Forces polling." - ), + argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), ] ) def shell_command(args): @@ -629,8 +613,7 @@ def shell_command(args): dest="directories", metavar="DIRECTORY", action="append", - help="Directory to watch. Use another -d or --directory option " - "for each directory.", + help="Directory to watch. Use another -d or --directory option " "for each directory.", ), argument( "-p", @@ -677,16 +660,13 @@ def shell_command(args): default="SIGINT", help="Stop the subprocess with this signal (default SIGINT).", ), - argument( - "--debug-force-polling", action="store_true", help="[debug] Forces polling." - ), + argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), argument( "--kill-after", dest="kill_after", default=10.0, type=float, - help="When stopping, kill the subprocess after the specified timeout " - "in seconds (default 10.0).", + help="When stopping, kill the subprocess after the specified timeout " "in seconds (default 10.0).", ), argument( "--debounce-interval", diff --git a/tests/conftest.py b/tests/conftest.py index 1011efd6..b79f7ebc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,9 +29,7 @@ def no_thread_leaks(): old_thread_count = threading.active_count() yield gc.collect() # Clear the stuff from other function-level fixtures - assert ( - threading.active_count() == old_thread_count - ) # Only previously existing threads + assert threading.active_count() == old_thread_count # Only previously existing threads @pytest.fixture(autouse=True) diff --git a/tests/markers.py b/tests/markers.py index 88e19fe5..66291fca 100644 --- a/tests/markers.py +++ b/tests/markers.py @@ -4,6 +4,4 @@ import pytest -cpython_only = pytest.mark.skipif( - python_implementation() != "CPython", reason="CPython only." -) +cpython_only = pytest.mark.skipif(python_implementation() != "CPython", reason="CPython only.") diff --git a/tests/test_0_watchmedo.py b/tests/test_0_watchmedo.py index a38f7696..c9fc5369 100644 --- a/tests/test_0_watchmedo.py +++ b/tests/test_0_watchmedo.py @@ -58,10 +58,7 @@ def test_load_config_invalid(tmpdir): def make_dummy_script(tmpdir, n=10): script = os.path.join(tmpdir, f"auto-test-{n}.py") with open(script, "w") as f: - f.write( - 'import time\nfor i in range(%d):\n\tprint("+++++ %%d" %% i, flush=True)\n\ttime.sleep(1)\n' - % n - ) + f.write('import time\nfor i in range(%d):\n\tprint("+++++ %%d" %% i, flush=True)\n\ttime.sleep(1)\n' % n) return script @@ -196,9 +193,7 @@ def test_auto_restart_subprocess_termination(tmpdir, capfd, restart_on_command_e After 5 seconds, expect it to have been restarted at least once. """ script = make_dummy_script(tmpdir, n=2) - trick = AutoRestartTrick( - [sys.executable, script], restart_on_command_exit=restart_on_command_exit - ) + trick = AutoRestartTrick([sys.executable, script], restart_on_command_exit=restart_on_command_exit) trick.start() time.sleep(5) trick.stop() @@ -212,9 +207,7 @@ def test_auto_restart_subprocess_termination(tmpdir, capfd, restart_on_command_e def test_auto_restart_arg_parsing_basic(): - args = watchmedo.cli.parse_args( - ["auto-restart", "-d", ".", "--recursive", "--debug-force-polling", "cmd"] - ) + args = watchmedo.cli.parse_args(["auto-restart", "-d", ".", "--recursive", "--debug-force-polling", "cmd"]) assert args.func is watchmedo.auto_restart assert args.command == "cmd" assert args.directories == ["."] diff --git a/tests/test_emitter.py b/tests/test_emitter.py index 6b3f54f4..ef29182b 100644 --- a/tests/test_emitter.py +++ b/tests/test_emitter.py @@ -77,9 +77,7 @@ def test_create(p: P, event_queue: TestEventQueue, start_watching: StartWatching @pytest.mark.xfail(reason="known to be problematic") -@pytest.mark.skipif( - not platform.is_linux(), reason="FileCloseEvent only supported in GNU/Linux" -) +@pytest.mark.skipif(not platform.is_linux(), reason="FileCloseEvent only supported in GNU/Linux") @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) def test_close(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: f_d = open(p("a"), "a") @@ -246,9 +244,7 @@ def test_move_to(p: P, start_watching: StartWatching, expect_event: ExpectEvent) expect_event(DirModifiedEvent(p("dir2"))) -@pytest.mark.skipif( - not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux" -) +@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux") def test_move_to_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: mkdir(p("dir1")) mkdir(p("dir2")) @@ -259,7 +255,7 @@ def test_move_to_full(p: P, event_queue: TestEventQueue, start_watching: StartWa event = event_queue.get(timeout=5)[0] assert isinstance(event, FileMovedEvent) assert event.dest_path == p("dir2", "b") - assert event.src_path is None # Should equal None since the path was not watched + assert event.src_path == "" # Should be blank since the path was not watched @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) @@ -277,9 +273,7 @@ def test_move_from(p: P, start_watching: StartWatching, expect_event: ExpectEven expect_event(DirModifiedEvent(p("dir1"))) -@pytest.mark.skipif( - not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux" -) +@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux") def test_move_from_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: mkdir(p("dir1")) mkdir(p("dir2")) @@ -290,7 +284,7 @@ def test_move_from_full(p: P, event_queue: TestEventQueue, start_watching: Start event = event_queue.get(timeout=5)[0] assert isinstance(event, FileMovedEvent) assert event.src_path == p("dir1", "a") - assert event.dest_path is None # Should equal None since path not watched + assert event.dest_path == "" # Should be blank since path not watched @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) @@ -321,9 +315,7 @@ def test_separate_consecutive_moves(p: P, start_watching: StartWatching, expect_ @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -@pytest.mark.skipif( - platform.is_bsd(), reason="BSD create another set of events for this test" -) +@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test") def test_delete_self(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) emitter = start_watching(p("dir1")) @@ -360,9 +352,7 @@ def test_fast_subdirectory_creation_deletion(p: P, event_queue: TestEventQueue, count[etype] += 1 assert event.src_path == etype_for_dir[etype] assert count[DirCreatedEvent] >= count[DirDeletedEvent] - assert ( - count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent] - ) + assert count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent] assert count == { DirCreatedEvent: times, DirModifiedEvent: times * 2, @@ -448,9 +438,7 @@ def test_recursive_off( event_queue.get(timeout=5) mkdir(p("dir3")) - expect_event( - DirModifiedEvent(p()) - ) # the contents of the parent directory changed + expect_event(DirModifiedEvent(p())) # the contents of the parent directory changed mv(p("dir1", "dir2", "somefile"), p("somefile")) expect_event(FileMovedEvent(p("dir1", "dir2", "somefile"), p("somefile"))) @@ -461,9 +449,7 @@ def test_recursive_off( expect_event(DirModifiedEvent(p())) -@pytest.mark.skipif( - platform.is_windows(), reason="Windows create another set of events for this test" -) +@pytest.mark.skipif(platform.is_windows(), reason="Windows create another set of events for this test") def test_renaming_top_level_directory( p: P, event_queue: TestEventQueue, @@ -485,7 +471,7 @@ def test_renaming_top_level_directory( expect_event(DirModifiedEvent(p())) expect_event(DirModifiedEvent(p())) - expect_event(DirMovedEvent(p("a", "b"), p("a2", "b"))) + expect_event(DirMovedEvent(p("a", "b"), p("a2", "b"), is_synthetic=True)) if platform.is_bsd(): expect_event(DirModifiedEvent(p())) @@ -568,14 +554,7 @@ def test_renaming_top_level_directory_on_windows( if event_queue.empty(): break - assert all( - [ - isinstance( - e, (FileCreatedEvent, FileMovedEvent, DirMovedEvent, DirModifiedEvent) - ) - for e in events - ] - ) + assert all(isinstance(e, (FileCreatedEvent, FileMovedEvent, DirMovedEvent, DirModifiedEvent)) for e in events) for event in events: if isinstance(event, FileCreatedEvent): @@ -590,9 +569,7 @@ def test_renaming_top_level_directory_on_windows( assert event.src_path == p("a2", "b") -@pytest.mark.skipif( - platform.is_windows(), reason="Windows create another set of events for this test" -) +@pytest.mark.skipif(platform.is_windows(), reason="Windows create another set of events for this test") def test_move_nested_subdirectories( p: P, event_queue: TestEventQueue, @@ -608,8 +585,8 @@ def test_move_nested_subdirectories( expect_event(DirModifiedEvent(p("dir1"))) expect_event(DirModifiedEvent(p())) - expect_event(DirMovedEvent(p("dir1", "dir2", "dir3"), p("dir2", "dir3"))) - expect_event(FileMovedEvent(p("dir1", "dir2", "dir3", "a"), p("dir2", "dir3", "a"))) + expect_event(DirMovedEvent(p("dir1", "dir2", "dir3"), p("dir2", "dir3"), is_synthetic=True)) + expect_event(FileMovedEvent(p("dir1", "dir2", "dir3", "a"), p("dir2", "dir3", "a"), is_synthetic=True)) if platform.is_bsd(): event = event_queue.get(timeout=5)[0] @@ -681,9 +658,7 @@ def test_move_nested_subdirectories_on_windows( @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -@pytest.mark.skipif( - platform.is_bsd(), reason="BSD create another set of events for this test" -) +@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test") def test_file_lifecyle(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: start_watching() diff --git a/tests/test_events.py b/tests/test_events.py index 516aaf2a..3afe5b2e 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -15,8 +15,6 @@ from __future__ import annotations -import pytest - from watchdog.events import ( EVENT_TYPE_CLOSED, EVENT_TYPE_CREATED, @@ -34,7 +32,6 @@ FileModifiedEvent, FileMovedEvent, FileOpenedEvent, - FileSystemEvent, FileSystemEventHandler, ) @@ -183,58 +180,6 @@ def on_opened(self, event): handler.dispatch(event) -@pytest.mark.parametrize( - "event, prez", - [ - # Files - ( - FileDeletedEvent("a"), - "", - ), - ( - FileModifiedEvent("a"), - "", - ), - ( - FileCreatedEvent("a"), - "", - ), - ( - FileMovedEvent("a", "b"), - "", - ), - ( - FileClosedEvent("a"), - "", - ), - ( - FileOpenedEvent("a"), - "", - ), - # Folders - ( - DirDeletedEvent("a"), - "", - ), - ( - DirModifiedEvent("a"), - "", - ), - ( - DirCreatedEvent("a"), - "", - ), - ( - DirMovedEvent("a", "b"), - "", - ), - ], -) -def test_event_repr_and_str(event: FileSystemEvent, prez: str) -> None: - assert repr(event) == prez - assert str(event) == prez - - def test_event_comparison(): creation1 = FileCreatedEvent("foo") creation2 = FileCreatedEvent("foo") @@ -247,7 +192,7 @@ def test_event_comparison(): move2 = FileMovedEvent("a", "b") move3 = FileMovedEvent("a", "c") move4 = FileMovedEvent("b", "a") - assert creation1 != move1 + assert creation1 != move1 # type: ignore[comparison-overlap] assert move1 == move2 assert move1 != move3 assert move1 != move4 diff --git a/tests/test_fsevents.py b/tests/test_fsevents.py index bcec61a7..01e9fee2 100644 --- a/tests/test_fsevents.py +++ b/tests/test_fsevents.py @@ -55,16 +55,12 @@ def observer(): (_fsevents.NativeEvent("", 0, 0x00000800 | 0x00000200 | 0x00000100, 0), True), # renamed, removed, created, itemfindermod ( - _fsevents.NativeEvent( - "", 0, 0x00000800 | 0x00000200 | 0x00000100 | 0x00002000, 0 - ), + _fsevents.NativeEvent("", 0, 0x00000800 | 0x00000200 | 0x00000100 | 0x00002000, 0), True, ), # xattr, removed, modified, itemfindermod ( - _fsevents.NativeEvent( - "", 0, 0x00008000 | 0x00000200 | 0x00001000 | 0x00002000, 0 - ), + _fsevents.NativeEvent("", 0, 0x00008000 | 0x00000200 | 0x00001000 | 0x00002000, 0), False, ), ], @@ -119,9 +115,7 @@ def cb(*args): FSEventsEmitter.stop(emitter) orig(*args) - with caplog.at_level(logging.ERROR), patch.object( - FSEventsEmitter, "events_callback", new=cb - ): + with caplog.at_level(logging.ERROR), patch.object(FSEventsEmitter, "events_callback", new=cb): emitter = start_watching(tmpdir) # Less than 100 is not enough events to trigger the error for n in range(100): @@ -276,9 +270,7 @@ def done(self): cwd = os.getcwd() os.chdir(p()) - event_handler = TestEventHandler( - patterns=["*.json"], ignore_patterns=[], ignore_directories=True - ) + event_handler = TestEventHandler(patterns=["*.json"], ignore_patterns=[], ignore_directories=True) observer = Observer() observer.schedule(event_handler, ".") observer.start() @@ -329,9 +321,7 @@ def on_any_event(self, event): while not expected.issubset(handler.changes) and time.time() < timeout_at: time.sleep(0.2) - assert expected.issubset( - handler.changes - ), f"Did not find expected changes. Found: {handler.changes}" + assert expected.issubset(handler.changes), f"Did not find expected changes. Found: {handler.changes}" finally: for watch in watches: observer.unschedule(watch) diff --git a/tests/test_inotify_buffer.py b/tests/test_inotify_buffer.py index 6d2c1569..56738b60 100644 --- a/tests/test_inotify_buffer.py +++ b/tests/test_inotify_buffer.py @@ -119,7 +119,7 @@ def test_delete_watched_directory(p): @pytest.mark.timeout(5) -@pytest.mark.skipIf("GITHUB_REF" not in os.environ) +@pytest.mark.skipif("GITHUB_REF" not in os.environ, reason="sudo password prompt") def test_unmount_watched_directory_filesystem(p): mkdir(p("dir1")) mount_tmpfs(p("dir1")) @@ -156,9 +156,7 @@ def run(self, *args, **kwargs): return super().run(*args, **kwargs) -@pytest.mark.parametrize( - argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead] -) +@pytest.mark.parametrize(argnames="cls", argvalues=[InotifyBuffer, InotifyBufferDelayedRead]) def test_close_should_terminate_thread(p, cls): inotify = cls(p("").encode(), recursive=True) diff --git a/tests/test_inotify_c.py b/tests/test_inotify_c.py index e143fb50..f835db7c 100644 --- a/tests/test_inotify_c.py +++ b/tests/test_inotify_c.py @@ -45,12 +45,8 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, # CREATE DELETE CREATE DELETE DELETE_SELF IGNORE DELETE_SELF IGNORE inotify_fd.buf = ( - struct_inotify( - wd=1, mask=const.IN_CREATE | const.IN_ISDIR, length=16, name=b"subdir1" - ) - + struct_inotify( - wd=1, mask=const.IN_DELETE | const.IN_ISDIR, length=16, name=b"subdir1" - ) + struct_inotify(wd=1, mask=const.IN_CREATE | const.IN_ISDIR, length=16, name=b"subdir1") + + struct_inotify(wd=1, mask=const.IN_DELETE | const.IN_ISDIR, length=16, name=b"subdir1") ) * 2 + ( struct_inotify(wd=2, mask=const.IN_DELETE_SELF) + struct_inotify(wd=2, mask=const.IN_IGNORED) @@ -159,15 +155,9 @@ def test_event_equality(p: P) -> None: wd_parent_dir = 42 filename = "file.ext" full_path = p(filename) - event1 = InotifyEvent( - wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path - ) - event2 = InotifyEvent( - wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path - ) - event3 = InotifyEvent( - wd_parent_dir, InotifyConstants.IN_ACCESS, 0, filename, full_path - ) + event1 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path) + event2 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path) + event3 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_ACCESS, 0, filename, full_path) assert event1 == event2 assert event1 != event3 assert event2 != event3 diff --git a/tests/test_pattern_matching_event_handler.py b/tests/test_pattern_matching_event_handler.py index 03fbea60..db87f68b 100644 --- a/tests/test_pattern_matching_event_handler.py +++ b/tests/test_pattern_matching_event_handler.py @@ -142,12 +142,8 @@ def on_created(self, event): assert event.event_type == EVENT_TYPE_CREATED assert_patterns(event) - no_dirs_handler = TestableEventHandler( - patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=True - ) - handler = TestableEventHandler( - patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=False - ) + no_dirs_handler = TestableEventHandler(patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=True) + handler = TestableEventHandler(patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=False) for event in all_events: no_dirs_handler.dispatch(event) diff --git a/tests/test_patterns.py b/tests/test_patterns.py index f4e746ac..5ab46933 100644 --- a/tests/test_patterns.py +++ b/tests/test_patterns.py @@ -17,17 +17,12 @@ ("/users/gorakhargosh/foobar.py", {"*.py"}, {"*.PY"}, False, ValueError), ], ) -def test_match_path( - input, included_patterns, excluded_patterns, case_sensitive, expected -): +def test_match_path(input, included_patterns, excluded_patterns, case_sensitive, expected): if expected == ValueError: with pytest.raises(expected): _match_path(input, included_patterns, excluded_patterns, case_sensitive) else: - assert ( - _match_path(input, included_patterns, excluded_patterns, case_sensitive) - is expected - ) + assert _match_path(input, included_patterns, excluded_patterns, case_sensitive) is expected @pytest.mark.parametrize( @@ -50,9 +45,7 @@ def test_filter_paths(included_patterns, excluded_patterns, case_sensitive, expe "/etc/pdnsd.conf", "/usr/local/bin/python", } - actual = set( - filter_paths(pathnames, included_patterns, excluded_patterns, case_sensitive) - ) + actual = set(filter_paths(pathnames, included_patterns, excluded_patterns, case_sensitive)) assert actual == expected if expected else pathnames @@ -66,16 +59,11 @@ def test_filter_paths(included_patterns, excluded_patterns, case_sensitive, expe (["*.txt"], None, True, False), ], ) -def test_match_any_paths( - included_patterns, excluded_patterns, case_sensitive, expected -): +def test_match_any_paths(included_patterns, excluded_patterns, case_sensitive, expected): pathnames = { "/users/gorakhargosh/foobar.py", "/var/cache/pdnsd.status", "/etc/pdnsd.conf", "/usr/local/bin/python", } - assert ( - match_any_paths(pathnames, included_patterns, excluded_patterns, case_sensitive) - == expected - ) + assert match_any_paths(pathnames, included_patterns, excluded_patterns, case_sensitive) == expected diff --git a/tests/test_regex_matching_event_handler.py b/tests/test_regex_matching_event_handler.py index 66c4f942..f6ae68aa 100644 --- a/tests/test_regex_matching_event_handler.py +++ b/tests/test_regex_matching_event_handler.py @@ -140,12 +140,8 @@ def on_created(self, event): assert event.event_type == EVENT_TYPE_CREATED assert_regexes(self, event) - no_dirs_handler = TestableEventHandler( - regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=True - ) - handler = TestableEventHandler( - regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=False - ) + no_dirs_handler = TestableEventHandler(regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=True) + handler = TestableEventHandler(regexes=regexes, ignore_regexes=ignore_regexes, ignore_directories=False) for event in all_events: no_dirs_handler.dispatch(event) diff --git a/tests/utils.py b/tests/utils.py index 00dcf401..c70ad91b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -66,9 +66,7 @@ def start_watching( emitter: EventEmitter if sys.platform.startswith("linux") and use_full_emitter: - emitter = InotifyFullEmitter( - self.event_queue, ObservedWatch(path, recursive=recursive) - ) + emitter = InotifyFullEmitter(self.event_queue, ObservedWatch(path, recursive=recursive)) else: emitter = Emitter(self.event_queue, ObservedWatch(path, recursive=recursive)) @@ -78,6 +76,7 @@ def start_watching( # TODO: I think this could be better... .suppress_history should maybe # become a common attribute. from watchdog.observers.fsevents import FSEventsEmitter + assert isinstance(emitter, FSEventsEmitter) emitter.suppress_history = True