Skip to content

Commit

Permalink
Improved lazy loading
Browse files Browse the repository at this point in the history
  • Loading branch information
dhondta committed Jul 22, 2023
1 parent eba7027 commit 672468d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/tinyscript/VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.28.1
1.28.2
248 changes: 124 additions & 124 deletions src/tinyscript/features/handlers.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,124 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Module for defining handlers and exit hook.
"""
import sys
from signal import getsignal, signal, SIG_IGN, SIGINT, SIGTERM

from ..helpers.constants import WINDOWS
from ..helpers.inputs import user_input


__features__ = ["at_exit", "at_graceful_exit", "at_interrupt", "at_terminate", "DisableSignals"]
__all__ = ["_hooks"] + __features__


class DisableSignals(object):
""" Context manager that disable signal handlers.
:param signals: list of signal identifiers
:param fail: whether execution should fail or not when a bad signal ID is encountered
"""
def __init__(self, *signals, **kwargs):
self.__handlers = {}
for s in signals:
try:
self.__handlers[s] = getsignal(s)
except ValueError as e:
if kwargs.get('fail', False):
raise e

def __enter__(self):
for s in self.__handlers.keys():
signal(s, SIG_IGN)

def __exit__(self, exc_type, exc_val, exc_tb):
for s, h in self.__handlers.items():
signal(s, h)


# https://stackoverflow.com/questions/9741351/how-to-find-exit-code-or-reason-when-atexit-callback-is-called-in-python
class ExitHooks(object):
sigint_actions = ["confirm", "continue", "exit"]
def __init__(self):
self.__sigint_action = "exit"
self._orig_exit = sys.exit
self.code = None
self.exception = None
sys.exit = self.exit
self.resume()

def exit(self, code=0):
self.code = code
self._orig_exit(code)

def pause(self):
self.state = "PAUSED"
while self.state == "PAUSED": continue

def quit(self, code=0):
if self.__sigint_action == "confirm" and \
user_input("Do you really want to interrupt execution ?", ["(Y)es", "(N)o"], "y", style="bold") == "yes":
self.__sigint_action = "exit"
if self.state != "INTERRUPTED" or self.__sigint_action == "exit":
self.exit(code)
self.resume()

def resume(self):
self.state = "RUNNING"

@property
def sigint_action(self):
return self.__sigint_action

@sigint_action.setter
def sigint_action(self, value):
if value not in self.sigint_actions:
raise ValueError("Bad interrupt action ; should be one of {}".format("|".join(self.sigint_actions)))
self.__sigint_action = value

_hooks = ExitHooks()


def __interrupt_handler(*args):
""" Interruption handler.
:param signal: signal number
:param stack: stack frame
:param code: exit code
"""
_hooks.state = "INTERRUPTED"
_hooks.quit(0)
# bind to interrupt signal (Ctrl+C)
signal(SIGINT, __interrupt_handler)


def __pause_handler(*args):
""" Execution pause handler. """
_hooks.pause()
if not WINDOWS:
from signal import siginterrupt, SIGUSR1
# bind to user-defined signal
signal(SIGUSR1, __pause_handler)
siginterrupt(SIGUSR1, False)


def __terminate_handler(*args):
""" Termination handler.
:param signal: signal number
:param stack: stack frame
:param code: exit code
"""
_hooks.state = "TERMINATED"
_hooks.quit(0)
# bind to terminate signal
signal(SIGTERM, __terminate_handler)


at_exit = lambda: None
at_graceful_exit = lambda: None
at_interrupt = lambda: None
at_terminate = lambda: None

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Module for defining handlers and exit hook.
"""
import sys
from signal import getsignal, signal, SIG_IGN, SIGINT, SIGTERM

from ..helpers.constants import WINDOWS


__features__ = ["at_exit", "at_graceful_exit", "at_interrupt", "at_terminate", "DisableSignals"]
__all__ = ["_hooks"] + __features__


class DisableSignals(object):
""" Context manager that disable signal handlers.
:param signals: list of signal identifiers
:param fail: whether execution should fail or not when a bad signal ID is encountered
"""
def __init__(self, *signals, **kwargs):
self.__handlers = {}
for s in signals:
try:
self.__handlers[s] = getsignal(s)
except ValueError as e:
if kwargs.get('fail', False):
raise e

def __enter__(self):
for s in self.__handlers.keys():
signal(s, SIG_IGN)

def __exit__(self, exc_type, exc_val, exc_tb):
for s, h in self.__handlers.items():
signal(s, h)


# https://stackoverflow.com/questions/9741351/how-to-find-exit-code-or-reason-when-atexit-callback-is-called-in-python
class ExitHooks(object):
sigint_actions = ["confirm", "continue", "exit"]
def __init__(self):
self.__sigint_action = "exit"
self._orig_exit = sys.exit
self.code = None
self.exception = None
sys.exit = self.exit
self.resume()

def exit(self, code=0):
self.code = code
self._orig_exit(code)

def pause(self):
self.state = "PAUSED"
while self.state == "PAUSED": continue

def quit(self, code=0):
from ..helpers.inputs import user_input
if self.__sigint_action == "confirm" and \
user_input("Do you really want to interrupt execution ?", ["(Y)es", "(N)o"], "y", style="bold") == "yes":
self.__sigint_action = "exit"
if self.state != "INTERRUPTED" or self.__sigint_action == "exit":
self.exit(code)
self.resume()

def resume(self):
self.state = "RUNNING"

@property
def sigint_action(self):
return self.__sigint_action

@sigint_action.setter
def sigint_action(self, value):
if value not in self.sigint_actions:
raise ValueError("Bad interrupt action ; should be one of {}".format("|".join(self.sigint_actions)))
self.__sigint_action = value

_hooks = ExitHooks()


def __interrupt_handler(*args):
""" Interruption handler.
:param signal: signal number
:param stack: stack frame
:param code: exit code
"""
_hooks.state = "INTERRUPTED"
_hooks.quit(0)
# bind to interrupt signal (Ctrl+C)
signal(SIGINT, __interrupt_handler)


def __pause_handler(*args):
""" Execution pause handler. """
_hooks.pause()
if not WINDOWS:
from signal import siginterrupt, SIGUSR1
# bind to user-defined signal
signal(SIGUSR1, __pause_handler)
siginterrupt(SIGUSR1, False)


def __terminate_handler(*args):
""" Termination handler.
:param signal: signal number
:param stack: stack frame
:param code: exit code
"""
_hooks.state = "TERMINATED"
_hooks.quit(0)
# bind to terminate signal
signal(SIGTERM, __terminate_handler)


at_exit = lambda: None
at_graceful_exit = lambda: None
at_interrupt = lambda: None
at_terminate = lambda: None
14 changes: 8 additions & 6 deletions src/tinyscript/helpers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@
"""
import builtins
import ctypes
import lazy_object_proxy
import os
from dateparser import parse as dateparse
from functools import update_wrapper
from importlib import import_module
from inspect import currentframe
from itertools import cycle
from string import printable
from urllib.parse import urlparse, parse_qs as urlparse_query

Expand Down Expand Up @@ -40,8 +35,9 @@ def human_readable_size(size, precision=0):

def is_admin():
""" Check if the user running the script is admin. """
from os import geteuid
try:
return ctypes.windll.shell32.IsUserAnAdmin() != 0 if WINDOWS else os.geteuid() == 0
return ctypes.windll.shell32.IsUserAnAdmin() != 0 if WINDOWS else geteuid() == 0
except AttributeError:
raise NotImplementedError("Admin check is not implemented for this operating system.")

Expand Down Expand Up @@ -183,6 +179,7 @@ def xor(str1, str2, offset=0):
:param str2: second string, with length L2
:param offset: ASCII offset to be applied on each resulting character
"""
from itertools import cycle
convert = isinstance(str1[0], int) or isinstance(str2[0], int)
r = b("") if convert else ""
for c1, c2 in zip(cycle(str1) if len(str1) < len(str2) else str1, cycle(str2) if len(str2) < len(str1) else str2):
Expand Down Expand Up @@ -215,6 +212,7 @@ def xor_file(filename, key, offset=0):
# https://stackoverflow.com/questions/10875442/possible-to-change-a-functions-repr-in-python
class __reprwrapper(object):
def __init__(self, repr, func):
from functools import update_wrapper
self._repr, self._func = repr, func
update_wrapper(self, func)

Expand All @@ -231,3 +229,7 @@ def _wrapper(f):
return __reprwrapper(func_repr, f)
return _wrapper


lazy_load_module("ctypes")
lazy_load_module("dateparser.parse", alias="dateparse")

0 comments on commit 672468d

Please sign in to comment.