Skip to content

Commit

Permalink
Merge pull request #38 from BingoWon/feat/enhance-fade-brightness
Browse files Browse the repository at this point in the history
Make `fade_brightness()` thread stoppable for concurrent requests
  • Loading branch information
Crozzers committed Apr 3, 2024
2 parents 2e86484 + c56f64a commit 1577d52
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 14 deletions.
88 changes: 75 additions & 13 deletions screen_brightness_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import warnings
from dataclasses import dataclass, field, fields
from types import ModuleType
from typing import Callable, Any, Dict, List, Optional, Tuple, Type, Union

from typing import Callable, Any, Dict, List, Optional, Tuple, Type, Union, FrozenSet, ClassVar
from ._version import __author__, __version__ # noqa: F401
from .exceptions import NoValidDisplayError, format_exc
from .helpers import (BrightnessMethod, ScreenBrightnessError,
Expand Down Expand Up @@ -151,6 +150,7 @@ def fade_brightness(
blocking: bool = True,
force: bool = False,
logarithmic: bool = True,
stoppable: bool = True,
**kwargs
) -> Union[List[threading.Thread], List[Union[IntPercentage, None]]]:
'''
Expand All @@ -168,6 +168,7 @@ def fade_brightness(
This is because on most displays a brightness of 0 will turn off the backlight.
If True, this check is bypassed
logarithmic: follow a logarithmic brightness curve when adjusting the brightness
stoppable: whether the fade can be stopped by starting a new fade on the same display
**kwargs: passed through to `filter_monitors` for display selection.
Will also be passed to `get_brightness` if `blocking is True`
Expand Down Expand Up @@ -209,12 +210,13 @@ def fade_brightness(
for i in available_monitors:
display = Display.from_dict(i)

thread = threading.Thread(target=display.fade_brightness, args=(finish,), kwargs={
thread = threading.Thread(target=display._fade_brightness, args=(finish,), kwargs={
'start': start,
'interval': interval,
'increment': increment,
'force': force,
'logarithmic': logarithmic
'logarithmic': logarithmic,
'stoppable': stoppable
})
thread.start()
threads.append(thread)
Expand Down Expand Up @@ -358,6 +360,8 @@ class Display():
'''The serial number of the display or (if serial is not available) an ID assigned by the OS'''

_logger: logging.Logger = field(init=False, repr=False)
_fade_thread_dict: ClassVar[Dict[FrozenSet[Tuple[Any, Any]], threading.Thread]] = {}
'''A dictionary mapping display identifiers to latest fade threads for stopping fades.'''

def __post_init__(self):
self._logger = _logger.getChild(self.__class__.__name__).getChild(
Expand All @@ -370,12 +374,15 @@ def fade_brightness(
interval: float = 0.01,
increment: int = 1,
force: bool = False,
logarithmic: bool = True
) -> IntPercentage:
logarithmic: bool = True,
blocking: bool = True,
stoppable: bool = True
) -> Union[threading.Thread, IntPercentage]:
'''
Gradually change the brightness of this display to a set value.
This works by incrementally changing the brightness until the desired
value is reached.
Can execute in the current thread, blocking until completion,
or in a separate thread, allowing concurrent operations.
When set as non-blocking and stoppable, a new fade can halt the this operation.
Args:
finish (.types.Percentage): the brightness level to end up on
Expand All @@ -388,14 +395,67 @@ def fade_brightness(
often turns off the backlight
logarithmic: follow a logarithmic curve when setting brightness values.
See `logarithmic_range` for rationale
blocking: run this function in the current thread and block until it completes
stoppable: whether this fade will be stopped by starting a new fade on the same display
Returns:
The brightness of the display after the fade is complete.
See `.types.IntPercentage`
If `blocking` is `False`, returns a `threading.Thread` object representing the
thread in which the fade operation is running.
If `blocking` is `True`, returns the current brightness level after the fade operation completes.
.. warning:: Deprecated
This function will return `None` in v0.23.0 and later.
'''
thread = threading.Thread(target=self._fade_brightness, args=(finish,), kwargs={
'start': start,
'interval': interval,
'increment': increment,
'force': force,
'logarithmic': logarithmic,
'stoppable': stoppable
})
thread.start()

if not blocking:
return thread
else:
thread.join()
return self.get_brightness()

def _fade_brightness(
self,
finish: Percentage,
start: Optional[Percentage] = None,
interval: float = 0.01,
increment: int = 1,
force: bool = False,
logarithmic: bool = True,
stoppable: bool = True
) -> None:
'''
Gradually change the brightness of this display to a set value.
This works by incrementally changing the brightness until the desired
value is reached.
Args:
finish (.types.Percentage): the brightness level to end up on
start (.types.Percentage): where the fade should start from. Defaults
to whatever the current brightness level for the display is
interval: time delay between each change in brightness
increment: amount to change the brightness by each time (as a percentage)
force: [*Linux only*] allow the brightness to be set to 0. By default,
brightness values will never be set lower than 1, since setting them to 0
often turns off the backlight
logarithmic: follow a logarithmic curve when setting brightness values.
See `logarithmic_range` for rationale
stoppable: whether the fade can be stopped by starting a new fade on the same display
Returns:
None
'''
# Record the latest thread for this display so that other stoppable threads can be stopped
display_key = frozenset((self.method, self.index))
self._fade_thread_dict[display_key] = threading.current_thread()
# minimum brightness value
if platform.system() == 'Linux' and not force:
lower_bound = 1
Expand All @@ -420,6 +480,9 @@ def fade_brightness(
# Record the time when the next brightness change should start
next_change_start_time = time.time()
for value in range_func(start, finish, increment):
if stoppable and threading.current_thread() != self._fade_thread_dict[display_key]:
# If the current thread is stoppable and it's not the latest thread, stop fading
break
# `value` is ensured not to hit `finish` in loop, this will be handled in the final step.
self.set_brightness(value, force=force)

Expand All @@ -432,9 +495,8 @@ def fade_brightness(
else:
# As `value` doesn't hit `finish` in loop, we explicitly set brightness to `finish`.
# This also avoids an unnecessary sleep in the last iteration.
self.set_brightness(finish, force=force)

return self.get_brightness()
if not stoppable or threading.current_thread() == self._fade_thread_dict[display_key]:
self.set_brightness(finish, force=force)

@classmethod
def from_dict(cls, display: dict) -> 'Display':
Expand Down
47 changes: 46 additions & 1 deletion tests/test_init.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
import threading
import time
from copy import deepcopy
from timeit import timeit
from typing import Any, Dict, List
from typing import Any, Dict, List, cast
from unittest.mock import Mock, call

import pytest
Expand Down Expand Up @@ -316,6 +317,50 @@ def test_end_of_fade_correction(self, display: sbc.Display, mocker: MockerFixtur
# it should have also passed the `force` kwarg along to the final call
assert 'force' in setter.mock_calls[-1].kwargs, 'force kwarg should be propagated'

def test_stoppable_kwarg(self, display: sbc.Display, mocker: MockerFixture):
start = 1
finish = 20 # smaller value could introduce errors; greater value will extend the test.
interval = 0.05 # same as above
steps = int((finish - start) / 2) # half the steps to ensure the thread is still active when checking.
duration = interval * (steps - 1) # -1 because the first step is immediate.

mocker.patch.object(display, 'get_brightness', Mock(return_value=start))
setter = mocker.patch.object(display, 'set_brightness', autospec=True)

def fade_brightness_thread(stoppable: bool):
'''mainly for Mypy to stop complaining about the return type of `display.fade_brightness`'''
return cast(threading.Thread,
display.fade_brightness(finish=finish, start=start, interval=interval,
logarithmic=False, blocking=False, stoppable=stoppable))

thread_0 = fade_brightness_thread(stoppable=True)
thread_1 = fade_brightness_thread(stoppable=True)
time.sleep(duration) # block the main thread to allow non-blocking fades to occur.
# The second fade should have stopped the first one.
assert not thread_0.is_alive() and thread_1.is_alive()
call_count = len(setter.mock_calls)
# *1 because only the second (latest) fade should run and the first should be stopped.
# Extra increment (+1) is added due to the immediate setting of the start brightness in the first thread,
# which occurs right after the first thread starts and before the second thread can signal it to stop.
expected_call_count = steps * 1 + 1
# Allow for a small margin of error due to one incomplete last step
assert 0 <= expected_call_count - call_count <= 1

# The fades below can't be stopped but they will halt the two above, which is essential for call count.
thread_2 = fade_brightness_thread(stoppable=False)
thread_3 = fade_brightness_thread(stoppable=False)
time.sleep(duration)
# Both two new threads should run without stopping.
assert thread_2.is_alive() and thread_3.is_alive()
call_count = len(setter.mock_calls) - call_count
expected_call_count = steps * 2
# Allow for a small margin of error due to two incomplete last steps
assert 0 <= expected_call_count - call_count <= 2

# Ensure all threads complete to prevent interference with subsequent tests.
while threading.active_count() > 1:
time.sleep(interval)

class TestFromDict:
def test_returns_valid_instance(self, subtests):
info = sbc.list_monitors_info()[0]
Expand Down

0 comments on commit 1577d52

Please sign in to comment.