Skip to content

Commit

Permalink
Added event filter for the emitter (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
marioga committed May 14, 2023
1 parent 363fe62 commit 48c49a1
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 23 deletions.
40 changes: 32 additions & 8 deletions src/watchdog/observers/api.py
Expand Up @@ -43,14 +43,17 @@ class ObservedWatch:
Path string.
:param recursive:
``True`` if watch is recursive; ``False`` otherwise.
:param event_filter:
Optional collection of :class:`watchdog.events.FileSystemEvent` to watch
"""

def __init__(self, path, recursive):
def __init__(self, path, recursive, event_filter=None):
if isinstance(path, Path):
self._path = str(path)
else:
self._path = path
self._is_recursive = recursive
self._event_filter = frozenset(event_filter) if event_filter is not None else None

@property
def path(self):
Expand All @@ -62,9 +65,14 @@ def is_recursive(self):
"""Determines whether subdirectories are watched for the path."""
return self._is_recursive

@property
def event_filter(self):
"""Collection of event types watched for the path"""
return self._event_filter

@property
def key(self):
return self.path, self.is_recursive
return self.path, self.is_recursive, self.event_filter

def __eq__(self, watch):
return self.key == watch.key
Expand All @@ -76,7 +84,12 @@ def __hash__(self):
return hash(self.key)

def __repr__(self):
return f"<{type(self).__name__}: path={self.path!r}, is_recursive={self.is_recursive}>"
if self.event_filter is not None:
event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter))
event_filter_str = f", event_filter={event_filter_str}"
else:
event_filter_str = ""
return f"<{type(self).__name__}: path={self.path!r}, is_recursive={self.is_recursive}{event_filter_str}>"


# Observer classes
Expand All @@ -97,13 +110,18 @@ class EventEmitter(BaseThread):
Timeout (in seconds) between successive attempts at reading events.
:type timeout:
``float``
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
super().__init__()
self._event_queue = event_queue
self._watch = watch
self._timeout = timeout
self._event_filter = frozenset(event_filter) if event_filter is not None else None

@property
def timeout(self):
Expand All @@ -129,7 +147,8 @@ def queue_event(self, event):
An instance of :class:`watchdog.events.FileSystemEvent`
or a subclass.
"""
self._event_queue.put((event, self.watch))
if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter):
self._event_queue.put((event, self.watch))

def queue_events(self, timeout):
"""Override this method to populate the event queue with events
Expand Down Expand Up @@ -264,7 +283,7 @@ def start(self):
raise
super().start()

def schedule(self, event_handler, path, recursive=False):
def schedule(self, event_handler, path, recursive=False, event_filter=None):
"""
Schedules watching a path and calls appropriate methods specified
in the given event handler in response to file system events.
Expand All @@ -284,17 +303,22 @@ def schedule(self, event_handler, path, recursive=False):
traversed recursively; ``False`` otherwise.
:type recursive:
``bool``
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
:return:
An :class:`ObservedWatch` object instance representing
a watch.
"""
with self._lock:
watch = ObservedWatch(path, recursive)
watch = ObservedWatch(path, recursive, event_filter)
self._add_handler_for_watch(event_handler, watch)

# If we don't have an emitter for this watch already, create it.
if self._emitter_for_watch.get(watch) is None:
emitter = self._emitter_class(event_queue=self.event_queue, watch=watch, timeout=self.timeout)
emitter = self._emitter_class(event_queue=self.event_queue, watch=watch, timeout=self.timeout,
event_filter=event_filter)
if self.is_alive():
emitter.start()
self._add_emitter(emitter)
Expand Down
5 changes: 4 additions & 1 deletion src/watchdog/observers/fsevents.py
Expand Up @@ -62,6 +62,8 @@ class FSEventsEmitter(EventEmitter):
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:param suppress_history:
The FSEvents API may emit historic events up to 30 sec before the watch was
started. When ``suppress_history`` is ``True``, those events will be suppressed
Expand All @@ -77,9 +79,10 @@ def __init__(
event_queue,
watch,
timeout=DEFAULT_EMITTER_TIMEOUT,
event_filter=None,
suppress_history=False,
):
super().__init__(event_queue, watch, timeout)
super().__init__(event_queue, watch, timeout, event_filter)
self._fs_view = set()
self.suppress_history = suppress_history
self._start_time = 0.0
Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/fsevents2.py
Expand Up @@ -185,8 +185,8 @@ class FSEventsEmitter(EventEmitter):
FSEvents based event emitter. Handles conversion of native events.
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
super().__init__(event_queue, watch, timeout)
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
super().__init__(event_queue, watch, timeout, event_filter)
self._fsevents = FSEventsQueue(watch.path)
self._fsevents.start()

Expand Down
38 changes: 35 additions & 3 deletions src/watchdog/observers/inotify.py
Expand Up @@ -89,6 +89,7 @@
from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter

from .inotify_buffer import InotifyBuffer
from .inotify_c import InotifyConstants

logger = logging.getLogger(__name__)

Expand All @@ -107,16 +108,21 @@ class InotifyEmitter(EventEmitter):
Read events blocking timeout (in seconds).
:type timeout:
``float``
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
super().__init__(event_queue, watch, timeout)
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
super().__init__(event_queue, watch, timeout, event_filter)
self._lock = threading.Lock()
self._inotify = None

def on_thread_start(self):
path = os.fsencode(self.watch.path)
self._inotify = InotifyBuffer(path, self.watch.is_recursive)
event_mask = self.get_event_mask_from_filter()
self._inotify = InotifyBuffer(path, self.watch.is_recursive, event_mask)

def on_thread_stop(self):
if self._inotify:
Expand Down Expand Up @@ -202,6 +208,32 @@ def _decode_path(self, path):
return path
return os.fsdecode(path)

def get_event_mask_from_filter(self):
"""Optimization: Only include events we are filtering in inotify call"""
if self._event_filter is None:
return None

# always listen to delete self
event_mask = InotifyConstants.IN_DELETE_SELF
for cls in self._event_filter:
if cls in (DirMovedEvent, FileMovedEvent):
event_mask |= InotifyConstants.IN_MOVE
elif cls in (DirCreatedEvent, FileCreatedEvent):
event_mask |= InotifyConstants.IN_MOVE | InotifyConstants.IN_CREATE
elif cls is DirModifiedEvent:
event_mask |= (InotifyConstants.IN_MOVE | InotifyConstants.IN_ATTRIB |
InotifyConstants.IN_MODIFY | InotifyConstants.IN_CREATE |
InotifyConstants.IN_CLOSE_WRITE)
elif cls is FileModifiedEvent:
event_mask |= InotifyConstants.IN_ATTRIB | InotifyConstants.IN_MODIFY
elif cls in (DirDeletedEvent, FileDeletedEvent):
event_mask |= InotifyConstants.IN_DELETE
elif cls is FileClosedEvent:
event_mask |= InotifyConstants.IN_CLOSE
elif cls is FileOpenedEvent:
event_mask |= InotifyConstants.IN_OPEN
return event_mask


class InotifyFullEmitter(InotifyEmitter):
"""
Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/inotify_buffer.py
Expand Up @@ -31,10 +31,10 @@ class InotifyBuffer(BaseThread):

delay = 0.5

def __init__(self, path, recursive=False):
def __init__(self, path, recursive=False, event_mask=None):
super().__init__()
self._queue = DelayedQueue[InotifyEvent](self.delay)
self._inotify = Inotify(path, recursive)
self._inotify = Inotify(path, recursive, event_mask)
self.start()

def read_event(self):
Expand Down
5 changes: 4 additions & 1 deletion src/watchdog/observers/inotify_c.py
Expand Up @@ -152,7 +152,7 @@ class Inotify:
``True`` if subdirectories should be monitored; ``False`` otherwise.
"""

def __init__(self, path, recursive=False, event_mask=WATCHDOG_ALL_EVENTS):
def __init__(self, path, recursive=False, event_mask=None):
# The file descriptor associated with the inotify instance.
inotify_fd = inotify_init()
if inotify_fd == -1:
Expand All @@ -165,6 +165,9 @@ def __init__(self, path, recursive=False, event_mask=WATCHDOG_ALL_EVENTS):
self._path_for_wd = {}

self._path = path
# Default to all events
if event_mask is None:
event_mask = WATCHDOG_ALL_EVENTS
self._event_mask = event_mask
self._is_recursive = recursive
if os.path.isdir(path):
Expand Down
8 changes: 6 additions & 2 deletions src/watchdog/observers/kqueue.py
Expand Up @@ -428,11 +428,15 @@ class KqueueEmitter(EventEmitter):
Read events blocking timeout (in seconds).
:type timeout:
``float``
:param event_filter:
Collection of event types to emit, or None for no filtering (default).
:type event_filter:
Optional[Iterable[:class:`watchdog.events.FileSystemEvent`]]
:param stat: stat function. See ``os.stat`` for details.
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, stat=os.stat):
super().__init__(event_queue, watch, timeout)
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None, stat=os.stat):
super().__init__(event_queue, watch, timeout, event_filter)

self._kq = select.kqueue()
self._lock = threading.RLock()
Expand Down
3 changes: 2 additions & 1 deletion src/watchdog/observers/polling.py
Expand Up @@ -63,10 +63,11 @@ def __init__(
event_queue,
watch,
timeout=DEFAULT_EMITTER_TIMEOUT,
event_filter=None,
stat=os.stat,
listdir=os.scandir,
):
super().__init__(event_queue, watch, timeout)
super().__init__(event_queue, watch, timeout, event_filter)
self._snapshot = None
self._lock = threading.Lock()
self._take_snapshot = lambda: DirectorySnapshot(
Expand Down
4 changes: 2 additions & 2 deletions src/watchdog/observers/read_directory_changes.py
Expand Up @@ -49,8 +49,8 @@ class WindowsApiEmitter(EventEmitter):
to detect file system changes for a watch.
"""

def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
super().__init__(event_queue, watch, timeout)
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None):
super().__init__(event_queue, watch, timeout, event_filter)
self._lock = threading.Lock()
self._handle = None

Expand Down
7 changes: 6 additions & 1 deletion tests/test_observers_api.py
Expand Up @@ -20,7 +20,7 @@

import pytest

from watchdog.events import FileModifiedEvent, LoggingEventHandler
from watchdog.events import FileModifiedEvent, FileOpenedEvent, LoggingEventHandler
from watchdog.observers.api import BaseObserver, EventDispatcher, EventEmitter, EventQueue, ObservedWatch


Expand Down Expand Up @@ -57,6 +57,11 @@ def test_observer__repr__():
assert observed_watch.__repr__() == repr(observed_watch)
assert repr(observed_watch) == repr_str

observed_watch = ObservedWatch("/foobar", False, [FileOpenedEvent, FileModifiedEvent])
repr_str = "<ObservedWatch: path='/foobar', is_recursive=False, event_filter=FileModifiedEvent|FileOpenedEvent>"
assert observed_watch.__repr__() == repr(observed_watch)
assert repr(observed_watch) == repr_str


def test_event_emitter():
event_queue = EventQueue()
Expand Down

0 comments on commit 48c49a1

Please sign in to comment.