In [1]:
from contextlib import contextmanager
from functools import wraps, update_wrapper, partial
import time

In [44]:
class Timed:
    
    def __init__(self, func=None):
        print('in init')
        if func:
            self.func = func
            update_wrapper(self, func)
        
    def __call__(self, *args, **kwargs):
        print('in call')
        self.__enter__()
        res = self.func(*args, **kwargs)
        self.__exit__(None, None, None)
        return res
    
    def __enter__(self):
        self.start = time.time()
        print('enter')
    
    def __exit__(self, exc_type, exc_value, traceback):
        print('exit:', time.time() - self.start)
        self.start = None

In [45]:
@Timed
def foo(a, b=1, c='c'):
    """foo docs"""
    for i in range(1, b+1):
        time.sleep(1)
        a += i
    return a

in init


In [46]:
print(foo(3))

in call
enter
exit: 1.0024421215057373
4


In [47]:
with Timed():
    print(foo(5))

in init
enter
in call
enter
exit: 1.000110149383545
6
exit: 1.000378131866455


In [48]:
with Timed():
    a = 1
    for i in range(10_000_000):
        a += 1
    print(a)

in init
enter
10000001
exit: 1.0012190341949463


In [49]:
with Timed():
    a = 1
    for i in range(10_000_000):
        a += 1
    print(a)
    print(a + 'a')

in init
enter
10000001
exit: 0.8945257663726807


TypeError: unsupported operand type(s) for +: 'int' and 'str'

## Approach 1

Issues
- non-strict mode doesn't work
- bit fuzzy re how `__call__` is working. Weird that calling exit() with None works.

In [54]:
class ContextDecorator:
    
    def __init__(self, func=None):
        print('parent init')
        if func:
            self.func = func
            update_wrapper(self, func)
    
    def __call__(self, *args, **kwargs):
        print('parent call')
        self.__enter__()
        res = self.func(*args, **kwargs)
        self.__exit__(None, None, None)
        return res

In [55]:
class NewTimed(ContextDecorator):
    
    def __init__(self, func=None):
        super().__init__(func)
        
    def __enter__(self):
        self.start = time.time()
        print('enter')
    
    def __exit__(self, exc_type, exc_value, traceback):
        print('exit:', time.time() - self.start)
        self.start = None

In [56]:
@NewTimed
def new_foo(a, b=1, c='c'):
    """foo docs"""
    for i in range(1, b+1):
        time.sleep(1)
        a += i
    return a

parent init


In [57]:
new_foo(31)

parent call
enter
exit: 1.0015151500701904


32

In [58]:
with NewTimed():
    a = 1
    for i in range(10_000_000):
        a += 1
    print(a)

parent init
enter
10000001
exit: 0.9463701248168945


### reproduce timebox

In [59]:
import signal
import warnings

In [14]:
class TimeExceededError(Exception):
    pass

def timebox_handler(time, frame):
    raise TimeExceededError('Time limit exceeded.')

@contextmanager
def timebox(time, strict=True):
    try:
        signal.signal(signal.SIGALRM, timebox_handler)
        signal.alarm(time)
        yield
    except Exception as e:
        if strict: raise
        warnings.warn(e.args[0])
    finally:
        signal.alarm(0)

def timeboxed(time, strict=True):
    def intermediate_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with timebox(time, strict) as tb:
                return func(*args, **kwargs)
        return wrapper
    return intermediate_wrapper

In [74]:
class Timebox(ContextDecorator):
    
    def __init__(self, time, strict=True):
        print('child init')
        self.time = time
        self.strict = strict
        
    def __call__(self, *args, **kwargs):
        if not hasattr(self, 'func'):
            print('child call: in if')
            super().__init__(args[0])
            return self.__call__
        
        print('child call: after if')
        return super().__call__(*args, **kwargs)
    
    def __enter__(self):
        print('enter')
        signal.signal(signal.SIGALRM, timebox_handler)
        signal.alarm(self.time)
        
    def __exit__(self, exc_type, exc_value, traceback):
        print('exit')
        signal.alarm(0)
        if exc_type:
            if self.strict: 
                print('strict')
                raise
            else:
                print('not strict')
                warnings.warn(exc_type)
                return True

In [75]:
@Timebox(1)
def bar(a, b=True, c=3, **kwargs):
    print('start bar')
    time.sleep(a)
    print('end bar')
    return a*c

child init
child call: in if
parent init


In [76]:
bar(0.5)

child call: after if
parent call
enter
start bar
end bar
exit


1.5

In [77]:
bar(2)

child call: after if
parent call
enter
start bar


TimeExceededError: Time limit exceeded.

In [78]:
with Timebox(2):
    print('start inside')
    time.sleep(1)
    print('end inside')

child init
enter
start inside
end inside
exit


In [79]:
with Timebox(2):
    print('start inside')
    time.sleep(3)
    print('end inside')

child init
enter
start inside
exit
strict


TimeExceededError: Time limit exceeded.

In [80]:
# This should NOT raise an error because strict=False. It seems like __exit__
# doesn't raise it but something does.
with Timebox(2, False):
    print('start inside')
    time.sleep(3)
    print('end inside')

child init
enter
start inside
exit
not strict




## Approach 2

Deals with decorators that accept arguments.

Issues

- Non-strict mode still doesn't work
- Still don't quite understand how call() is calling exit()

In [171]:
from abc import ABC, abstractmethod
from contextlib import contextmanager

from htools import assert_raises, wrapmethods, debug

In [189]:
class ContextDecorator(ABC):
    """Abstract class that makes it easier to define classes that can serve
    either as decorators or context managers. This is a viable option if the
    function decorator case effectively wants to execute the function inside a
    context manager. If you want to do something more complex, this may not be
    appropriate since it's not clear what would happen in the context manager
    use case.
    
    Examples
    --------
    import time
    
    class Timer(ContextDecorator):
        
        def __init__(self):
            # More complex decorators might need to store variables here.
            
        def __enter__(self):
            self.start = time.perf_counter()
            
        def __exit__(self, exc_type, exc_value, traceback):
            print('TIME:', time.perf_counter() - self.start)
            
    @Timer()
    def foo(a, *args):
        # do something
        
    with Timer():
        # do something
        
    # Both of these usage methods work!
    """
    
    def __call__(self, *args, **kwargs):
        """This method is NOT called when using child class as a context 
        manager.
        """
        # Handle case where the decorated function is implicitly passed to the
        # decorator. Return the uncalled method just like how we often
        # `return wrapper` when writing a decorator as a function.
        if not hasattr(self, 'func'):
            self._wrap_func(args[0])
            return self.__call__

        self.__enter__()
        res = self.func(*args, **kwargs)
        self.__exit__(None, None, None)
        return res
    
    def _wrap_func(self, func):
        self.func = func
        update_wrapper(self, func)
        
    @abstractmethod
    def __enter__(self):
        """Do whatever you want to happen before executing the function (or 
        the block of code inside the context manager).
        """
    
    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Do anything that happens after the function finishes executing.
        The three arguments will all be None unless an error occurs. 
        To suppress an error, this method must return True.
        """

In [181]:
class TimeboxNew(ContextDecorator):

    @debug
    def __init__(self, time, strict=True):
        self.time = time
        self.strict = strict
    
    @debug
    def __enter__(self):
        signal.signal(signal.SIGALRM, timebox_handler)
        signal.alarm(self.time)
    
    @debug
    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)
        if exc_type:
            if self.strict: 
                raise
            else:
                warnings.warn(exc_type)
                return True

In [182]:
@TimeboxNew(1)
def bar_new(a, b=True, c=3, **kwargs):
    print('start bar')
    time.sleep(a)
    print('end bar')
    return a*c


CALLING TimeboxNew.__init__(time=1, strict=True)


In [183]:
with assert_raises(TimeExceededError):
    bar_new(2)


CALLING TimeboxNew.__enter__()
start bar
As expected, got TimeExceededError(Time limit exceeded.).


In [184]:
bar_new(.5)


CALLING TimeboxNew.__enter__()
start bar
end bar

CALLING TimeboxNew.__exit__(exc_type=None, exc_value=None, traceback=None)


1.5

In [185]:
with TimeboxNew(2):
    print('start inside')
    time.sleep(1)
    print('end inside')


CALLING TimeboxNew.__init__(time=2, strict=True)

CALLING TimeboxNew.__enter__()
start inside
end inside

CALLING TimeboxNew.__exit__(exc_type=None, exc_value=None, traceback=None)


In [186]:
with assert_raises(TimeExceededError):
    with TimeboxNew(1):
        print('start inside')
        time.sleep(2)
        print('end inside')


CALLING TimeboxNew.__init__(time=1, strict=True)

CALLING TimeboxNew.__enter__()
start inside

CALLING TimeboxNew.__exit__(exc_type=<class '__main__.TimeExceededError'>, exc_value=TimeExceededError('Time limit exceeded.'), traceback=<traceback object at 0x1a1bfef048>)
As expected, got TimeExceededError(Time limit exceeded.).


In [187]:
with TimeboxNew(1, False):
    print('start inside')
    time.sleep(2)
    print('end inside')


CALLING TimeboxNew.__init__(time=1, strict=False)

CALLING TimeboxNew.__enter__()
start inside

CALLING TimeboxNew.__exit__(exc_type=<class '__main__.TimeExceededError'>, exc_value=TimeExceededError('Time limit exceeded.'), traceback=<traceback object at 0x1a1bff8408>)


