|
| 1 | +""" |
| 2 | +Listeners for aggregated keyboard and mouse events. |
| 3 | +
|
| 4 | +This is used for AFK detection on Linux, as well as used in aw-watcher-input to track input activity in general. |
| 5 | +
|
| 6 | +NOTE: Logging usage should be commented out before committed, for performance reasons. |
| 7 | +""" |
| 8 | + |
1 | 9 | import logging |
2 | | -from datetime import datetime, timedelta |
3 | 10 | import threading |
4 | | -from time import sleep |
5 | | - |
6 | | -from pykeyboard import PyKeyboardEvent |
7 | | -from pymouse import PyMouseEvent |
| 11 | +from abc import ABCMeta, abstractmethod |
| 12 | +from collections import defaultdict |
| 13 | +from typing import Dict, Any |
8 | 14 |
|
9 | 15 | logger = logging.getLogger(__name__) |
| 16 | +# logger.setLevel(logging.DEBUG) |
| 17 | + |
10 | 18 |
|
| 19 | +class EventFactory(metaclass=ABCMeta): |
| 20 | + def __init__(self) -> None: |
| 21 | + self.new_event = threading.Event() |
| 22 | + self._reset_data() |
| 23 | + |
| 24 | + @abstractmethod |
| 25 | + def _reset_data(self) -> None: |
| 26 | + self.event_data: Dict[str, Any] = {} |
11 | 27 |
|
12 | | -class EventFactory: |
13 | | - def next_event(self): |
| 28 | + def next_event(self) -> dict: |
14 | 29 | """Returns an event and prepares the internal state so that it can start to build a new event""" |
15 | | - raise NotImplementedError |
| 30 | + self.new_event.clear() |
| 31 | + data = self.event_data |
| 32 | + # self.logger.debug(f"Event: {data}") |
| 33 | + self._reset_data() |
| 34 | + return data |
16 | 35 |
|
17 | | - def has_new_event(self): |
18 | | - raise NotImplementedError |
| 36 | + def has_new_event(self) -> bool: |
| 37 | + return self.new_event.is_set() |
19 | 38 |
|
20 | 39 |
|
21 | | -class KeyboardListener(PyKeyboardEvent, EventFactory): |
| 40 | +class KeyboardListener(EventFactory): |
22 | 41 | def __init__(self): |
23 | | - PyKeyboardEvent.__init__(self) |
| 42 | + EventFactory.__init__(self) |
24 | 43 | self.logger = logger.getChild("keyboard") |
25 | | - # self.logger.setLevel(logging.DEBUG) |
26 | | - self.new_event = threading.Event() |
27 | | - self._reset_data() |
| 44 | + |
| 45 | + def start(self): |
| 46 | + from pynput import keyboard |
| 47 | + |
| 48 | + listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release) |
| 49 | + listener.start() |
28 | 50 |
|
29 | 51 | def _reset_data(self): |
30 | | - self.event_data = { |
31 | | - "presses": 0 |
32 | | - } |
| 52 | + self.event_data = {"presses": 0} |
33 | 53 |
|
34 | | - def tap(self, keycode, character, press): |
35 | | - # logging.debug("Clicked keycode: {}".format(keycode)) |
36 | | - self.logger.debug("Input received") |
| 54 | + def on_press(self, key): |
| 55 | + # self.logger.debug(f"Press: {key}") |
37 | 56 | self.event_data["presses"] += 1 |
38 | 57 | self.new_event.set() |
39 | 58 |
|
40 | | - def escape(self, event): |
41 | | - # Always returns False so that listening is never stopped |
42 | | - return False |
43 | | - |
44 | | - def next_event(self): |
45 | | - """Returns an event and prepares the internal state so that it can start to build a new event""" |
46 | | - self.new_event.clear() |
47 | | - data = self.event_data |
48 | | - self._reset_data() |
49 | | - return data |
50 | | - |
51 | | - def has_new_event(self): |
52 | | - return self.new_event.is_set() |
| 59 | + def on_release(self, key): |
| 60 | + # Don't count releases, only clicks |
| 61 | + # self.logger.debug(f"Release: {key}") |
| 62 | + pass |
53 | 63 |
|
54 | 64 |
|
55 | | -class MouseListener(PyMouseEvent, EventFactory): |
| 65 | +class MouseListener(EventFactory): |
56 | 66 | def __init__(self): |
57 | | - PyMouseEvent.__init__(self) |
| 67 | + EventFactory.__init__(self) |
58 | 68 | self.logger = logger.getChild("mouse") |
59 | | - self.logger.setLevel(logging.INFO) |
60 | | - self.new_event = threading.Event() |
61 | 69 | self.pos = None |
62 | | - self._reset_data() |
63 | 70 |
|
64 | 71 | def _reset_data(self): |
65 | | - self.event_data = { |
66 | | - "clicks": 0, |
67 | | - "deltaX": 0, |
68 | | - "deltaY": 0 |
69 | | - } |
70 | | - |
71 | | - def click(self, x, y, button, press): |
72 | | - # TODO: Differentiate between leftclick and rightclick? |
73 | | - if press: |
74 | | - self.logger.debug("Clicked mousebutton") |
75 | | - self.event_data["clicks"] += 1 |
76 | | - self.new_event.set() |
| 72 | + self.event_data = defaultdict(int) |
| 73 | + self.event_data.update( |
| 74 | + {"clicks": 0, "deltaX": 0, "deltaY": 0, "scrollX": 0, "scrollY": 0} |
| 75 | + ) |
77 | 76 |
|
78 | | - def move(self, x, y): |
| 77 | + def start(self): |
| 78 | + from pynput import mouse |
| 79 | + |
| 80 | + listener = mouse.Listener( |
| 81 | + on_move=self.on_move, on_click=self.on_click, on_scroll=self.on_scroll |
| 82 | + ) |
| 83 | + listener.start() |
| 84 | + |
| 85 | + def on_move(self, x, y): |
79 | 86 | newpos = (x, y) |
80 | | - #self.logger.debug("Moved mouse to: {},{}".format(x, y)) |
| 87 | + # self.logger.debug("Moved mouse to: {},{}".format(x, y)) |
81 | 88 | if not self.pos: |
82 | 89 | self.pos = newpos |
83 | 90 |
|
84 | | - delta = tuple(abs(self.pos[i] - newpos[i]) for i in range(2)) |
85 | | - self.event_data["deltaX"] += delta[0] |
86 | | - self.event_data["deltaY"] += delta[1] |
| 91 | + delta = tuple(self.pos[i] - newpos[i] for i in range(2)) |
| 92 | + self.event_data["deltaX"] += abs(delta[0]) |
| 93 | + self.event_data["deltaY"] += abs(delta[1]) |
87 | 94 |
|
88 | 95 | self.pos = newpos |
89 | 96 | self.new_event.set() |
90 | 97 |
|
91 | | - def has_new_event(self): |
92 | | - answer = self.new_event.is_set() |
93 | | - self.new_event.clear() |
94 | | - return answer |
| 98 | + def on_click(self, x, y, button, down): |
| 99 | + # self.logger.debug(f"Click: {button} at {(x, y)}") |
| 100 | + # Only count presses, not releases |
| 101 | + if down: |
| 102 | + self.event_data["clicks"] += 1 |
| 103 | + self.new_event.set() |
95 | 104 |
|
96 | | - def next_event(self): |
97 | | - self.new_event.clear() |
98 | | - data = self.event_data |
99 | | - self._reset_data() |
100 | | - return data |
| 105 | + def on_scroll(self, x, y, scrollx, scrolly): |
| 106 | + # self.logger.debug(f"Scroll: {scrollx}, {scrolly} at {(x, y)}") |
| 107 | + self.event_data["scrollX"] += abs(scrollx) |
| 108 | + self.event_data["scrollY"] += abs(scrolly) |
| 109 | + self.new_event.set() |
0 commit comments