# Decorator

## 1. Wrapped function

### 1.1. Simple wrapper

In [None]:
from typing import Callable
import json


def tag(func: Callable) -> Callable:
    """
    Decorator that returns the result of 'func' as a JSON document

    The wrapper calls 'func' with the arguments it receives and returns
    'json.dumps({"wrapped": True, "result": <return value of func>})'

    :param func: function to wrap, its return value must be JSON serializable
    :return: the wrapper function, which returns the JSON text
    """
    def wrapper(*args, **kwargs) -> str:
        result = {
            "wrapped": True,
            # forward the call arguments, otherwise only functions without
            # parameters could be decorated
            "result": func(*args, **kwargs)
        }
        return json.dumps(result)

    return wrapper


# '@tag' rebinds the name 'demo' to the wrapper returned by 'tag(demo)'
@tag
def demo():
    return "demo worked"


# The wrapper returns JSON text, so the message contains the serialized
# dictionary: {"wrapped": true, "result": "demo worked"}
print("* Call 'demo()' function result is: '{}'".format(demo()))

### 1.2. Pass arguments into decorator

In [None]:
from typing import Callable
import io


def html_tag(tag_name: str, **kwargs) -> Callable:
    """
    Decorator factory that wraps the text returned by a function in an HTML tag

    Keyword arguments become tag attributes, written in sorted key order.
    The key 'clazz' is written as 'class', because 'class' is a reserved word
    and cannot be used as a keyword argument name.

    :param tag_name: name of the tag, for example 'div'
    :param kwargs: attributes of the tag, values are converted with 'str'
    :return: decorator for a function which returns the nested text
    """
    # keep the attributes under another name, the '**kwargs' of the inner
    # wrapper would otherwise shadow them
    attribute = kwargs

    def decorator(func: Callable) -> Callable:
        def wrapper(*args, **kwargs) -> str:
            sio = io.StringIO()
            sio.write("<")
            sio.write(tag_name)

            for key in sorted(attribute):
                value = str(attribute[key])

                if "\"" not in value:
                    quote = "\""
                elif "'" not in value:
                    quote = "'"
                else:
                    # both quote characters are present, no delimiter is safe
                    # as is, so escape the double quotes inside the value
                    quote = "\""
                    value = value.replace("\"", "&quot;")

                sio.write(" ")
                sio.write("class" if key == "clazz" else key)
                sio.write("=")
                sio.write(quote)
                sio.write(value)
                sio.write(quote)

            # forward the call arguments to the decorated function
            nested = func(*args, **kwargs)
            if not nested:
                # empty (or missing) body is written as self-closing tag
                sio.write("/>")
            else:
                sio.write(">")
                sio.write(nested)
                sio.write("</")
                sio.write(tag_name)
                sio.write(">")

            return sio.getvalue()
        return wrapper
    return decorator


# Attributes are written in sorted key order and 'clazz' is written as
# 'class', so the call returns:
# <div class="col-md-2" style="display:block">Hello</div>
@html_tag(tag_name="div", style="display:block", clazz="col-md-2")
def demo1() -> str:
    return "Hello"


# The function returns an empty string, so a self-closing tag is produced:
# <div click="alter('ok')"/>
@html_tag(tag_name="div", click="alter('ok')")
def demo2() -> str:
    return ""


print("* Call 'demo1()' return: '{}'".format(demo1()))
print("* Call 'demo2()' return: '{}'".format(demo2()))

### 1.3. Class as decorator

In [None]:
from typing import Callable
import io


class HtmlTag:
    """
    Class based decorator that wraps the text returned by a function in an
    HTML tag

    The instance stores the tag name and attributes, calling the instance
    with a function returns the wrapper. Attributes are written in sorted key
    order and the key 'clazz' is written as 'class'.
    """

    def __init__(self, tag_name: str, **kwargs):
        """
        :param tag_name: name of the tag, for example 'div'
        :param kwargs: attributes of the tag, values are converted with 'str'
        """
        self._kwargs = kwargs
        self._tag_name = tag_name

    def __call__(self, func: Callable) -> Callable:
        """
        :param func: function which returns the nested text of the tag
        :return: wrapper which returns the complete tag as string
        """
        def wrapper(*args, **kwargs) -> str:
            sio = io.StringIO()
            sio.write("<")
            sio.write(self._tag_name)

            for key in sorted(self._kwargs):
                value = str(self._kwargs[key])

                if "\"" not in value:
                    quote = "\""
                elif "'" not in value:
                    quote = "'"
                else:
                    # both quote characters are present, no delimiter is safe
                    # as is, so escape the double quotes inside the value
                    quote = "\""
                    value = value.replace("\"", "&quot;")

                sio.write(" ")
                sio.write("class" if key == "clazz" else key)
                sio.write("=")
                sio.write(quote)
                sio.write(value)
                sio.write(quote)

            # forward the call arguments to the decorated function
            nested = func(*args, **kwargs)
            if not nested:
                # empty (or missing) body is written as self-closing tag
                sio.write("/>")
            else:
                sio.write(">")
                sio.write(nested)
                sio.write("</")
                sio.write(self._tag_name)
                sio.write(">")

            return sio.getvalue()

        return wrapper


# 'HtmlTag(...)' creates an instance first, then the instance is called with
# the function ('__call__') and returns the wrapper. The call returns:
# <div class="col-md-2" style="display:block">Hello</div>
@HtmlTag(tag_name="div", style="display:block", clazz="col-md-2")
def demo1():
    return "Hello"


# The function returns an empty string, so a self-closing tag is produced:
# <div click="alter('ok')"/>
@HtmlTag(tag_name="div", click="alter('ok')")
def demo2():
    return ""


print("* Call 'demo1()' return: '{}'".format(demo1()))
print("* Call 'demo2()' return: '{}'".format(demo2()))

### 1.4. Pass argument to wrapped function

In [None]:
from typing import Callable, Any, Tuple, Dict


def decorator(func: Callable) -> Callable:
    """
    Decorator that injects extra arguments into every call of 'func'

    The wrapper appends the positional argument 0 and sets the keyword
    argument 'name' to "Alvin" (replacing a value given by the caller)
    """
    def wrapper(*args, **kwargs) -> Any:
        # 'name' keeps its position if the caller already passed it
        merged = {**kwargs, "name": "Alvin"}
        return func(*args, 0, **merged)

    return wrapper


# 'demo' just echoes what it receives, so the arguments injected by the
# wrapper become visible in the return value
@decorator
def demo(*args, **kwargs) -> Tuple[Tuple, Dict]:
    return (args, kwargs)


# returns ((100, 0), {'name': 'Alvin'})
r = demo(100)
print("* Call 'demo(100)', return: '{}'".format(r))

# returns ((100, 0), {'no': 1001, 'name': 'Alvin'})
r = demo(100, no=1001)
print("* Call 'demo(100, no=1001)', return: '{}'".format(r))

## 2. Advanced

### 2.1. Keep the name of the wrapped function

In [None]:
from typing import Any, Callable
from functools import wraps


def d1(func) -> Callable:
    """
    Pass-through decorator which does not copy the metadata of 'func'

    The returned function keeps its own name 'wrapper'
    """
    def wrapper(*args, **kwargs) -> Any:
        outcome = func(*args, **kwargs)
        return outcome

    return wrapper


def d2(func) -> Callable:
    """
    Pass-through decorator which copies the metadata of 'func' (name,
    docstring, ...) onto the returned function by 'functools.wraps'
    """
    def wrapper(*args, **kwargs) -> Any:
        outcome = func(*args, **kwargs)
        return outcome

    # same as decorating 'wrapper' with '@wraps(func)'
    return wraps(func)(wrapper)


# decorated without 'functools.wraps': '__name__' is the name of the inner
# function of 'd1', which is 'wrapper'
@d1
def demo1():
    pass


# decorated with 'functools.wraps': '__name__' stays 'demo2'
@d2
def demo2():
    pass


print("* Unwraped function name is: '{}'".format(demo1.__name__))
print("* Wraped function name is: '{}'".format(demo2.__name__))

### 2.2. Cache function result

In [None]:
from typing import Any, Callable
from functools import wraps
from time import process_time


def memo(fn) -> Callable:
    """
    Decorator that caches the results of 'fn' by its positional arguments

    The cache lives as long as the decorated function and is never evicted.
    Only positional arguments are supported and they must be hashable.

    :param fn: function to cache
    :return: wrapper which calls 'fn' only for arguments not seen before
    """
    cache = {}
    # private sentinel, so that every value 'fn' may return (including 'None'
    # and '...') can be told apart from "not cached yet"
    miss = object()

    @wraps(fn)
    def wrapper(*args) -> Any:
        result = cache.get(args, miss)
        # identity check: '==' would call '__eq__' of the cached value, which
        # may be overridden and return anything
        if result is miss:
            result = fn(*args)
            cache[args] = result
        return result

    return wrapper


# The recursive calls go through the name 'fib', which is bound to the
# memoized wrapper, so they are looked up in the cache as well
@memo
def fib(n) -> int:
    if n < 2:
        return n

    return fib(n - 1) + fib(n - 2)


# first call, the cache is filled during the recursion
t = process_time()
r = fib(100)
print("* Call 'fib(100)' once, result is: {}, "
      "and cost time: {}".format(r, round(process_time() - t, 6)))

# second call with the same argument, the result comes from the cache
t = process_time()
r = fib(100)
print("* Call 'fib(100)' again, result is: {}, "
      "and cost time: {}".format(r, round(process_time() - t, 6)))

### 2.3. Class method as decorator

In [None]:
from typing import Any, Callable


class App:
    """
    Minimal dispatcher which maps url strings to registered functions

    Functions are registered with the 'register' method used as decorator
    and invoked by url with 'execute'
    """

    def __init__(self):
        # url -> registered function
        self._method_map = {}

    def register(self, url: str) -> Callable:
        """
        Decorator factory that registers the decorated function under 'url'

        The function itself is returned unchanged. Registering the same url
        again replaces the previous function.

        :param url: key under which the function is stored
        :return: the decorator
        """
        def wrapper(func: Callable) -> Callable:
            self._method_map[url] = func
            return func

        return wrapper

    def execute(self, url: str, *args, **kwargs) -> Any:
        """
        Call the function registered under 'url'

        :param url: key of the function
        :param args: positional arguments forwarded to the function
        :param kwargs: keyword arguments forwarded to the function
        :return: the return value of the function
        :raise LookupError: if no function is registered under 'url'
        """
        fn = self._method_map.get(url)
        # compare with 'None', a registered callable object may be falsy
        if fn is None:
            raise LookupError("function '{:}' not register".format(str(url)))
        return fn(*args, **kwargs)


app = App()
print("* After create object 'app = App()'")


# 'app.register("/")' returns the decorator, which stores 'main_page' under
# the url "/" and returns the function unchanged
@app.register("/")
def main_page():
    return "The main page"


@app.register("/next")
def next_page():
    return "The next page"


# dispatch by url, returns "The main page" and "The next page"
print("  call 'app.execute(\"/\")' return: {}".format(app.execute("/")))
print("  call 'app.execute(\"/next\")' return: {}".format(app.execute("/next")))

# nothing is registered under "/prev", so 'execute' raises an exception
try:
    app.execute("/prev")
except Exception as e:
    print("* call 'app.execute(\"/prev\")', cause error: '{}'".format(e))

### 2.4. Object as decorator

In [None]:
from typing import Any, Callable
import io
from functools import wraps
from time import process_time


class Logger:
    """
    Decorator object which records every call of the decorated functions

    An instance is used as decorator ('@logger'). Name, arguments, return
    value and consumed process time of each call are appended to an in-memory
    text buffer, which is read with 'str(logger)'.
    """

    def __init__(self):
        # in-memory text buffer which collects the log lines
        self._io = io.StringIO()

    def __call__(self, fn: Callable) -> Callable:
        """
        :param fn: function whose calls should be logged
        :return: wrapper which logs the call and returns the result of 'fn'
        """
        @wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            start_time = process_time()
            result = fn(*args, **kwargs)
            time_cost = process_time() - start_time

            self._io.write("\tlog function '{}' is call: \n".format(fn.__name__))
            self._io.write("\t  function={}\n".format(fn.__name__))
            self._io.write("\t  arguments={} {}\n".format(args, kwargs))
            self._io.write("\t  return={}\n".format(result))
            self._io.write("\t  time={:.6f} sec\n".format(time_cost))
            return result

        return wrapper

    def reset(self):
        """
        Discard everything logged so far
        """
        # 'seek(0)' alone keeps the old content in the buffer, it has to be
        # truncated as well, otherwise 'getvalue()' still returns the old log
        self._io.seek(0)
        self._io.truncate(0)

    def __str__(self) -> str:
        """
        :return: the complete log text
        """
        return self._io.getvalue()


logger = Logger()
print("* After create object 'logger = Logger()'")


# the object itself is the decorator, 'logger(multiply)' returns the wrapper
# which writes one log entry per call
@logger
def multiply(x, y):
    return x * y


# two calls, so the log contains two entries
multiply(10, 20)
multiply(30, 40)
print("  after call 'multiply' function, the log is: ")
print(str(logger))

### 2.5. Decorator for class method

In [None]:
from typing import Any, Callable
from functools import wraps


class EmptyError(Exception):
    """
    Raised if an empty string is passed to a method guarded by
    'not_contains_empty_str'
    """


def not_contains_empty_str(func: Callable) -> Callable:
    """
    Decorator for methods which rejects empty string arguments

    Every positional and keyword argument which is a 'str' must not be empty,
    values of other types (including 'None') are not checked

    :param func: method to guard, its first argument must be the object self
    :return: wrapper which validates the arguments and then calls 'func'
    :raise EmptyError: (from the wrapper) if a string argument is empty
    """
    @wraps(func)
    def wrapper(self_: Any, *args, **kwargs) -> Any:
        # the first argument must be the object self, it is not validated
        for arg in (*args, *kwargs.values()):
            if isinstance(arg, str) and not arg:
                raise EmptyError("Arguments cannot be empty")
        return func(self_, *args, **kwargs)
    return wrapper


class A:
    # the decorator is applied to the plain function inside the class body,
    # so the wrapper receives the instance as its first argument
    @not_contains_empty_str
    def test(self, name):
        return True


a = A()
print("* After create object 'a = A()'")

# non-empty string and 'None' both pass the check, 'test' returns True
print("  call 'a.test(\"Alvin\")' return: {}".format(a.test("Alvin")))
print("  call 'a.test(None)' return: {}".format(a.test(None)))

# empty string is rejected with 'EmptyError' before 'test' is executed
try:
    print(a.test(""))
except EmptyError as e:
    print("  call 'a.test(\"\")', error: '{}'".format(e))

## 3. Use wrapt

In [None]:
from typing import (
    Any,
    Dict,
    Callable,
    Optional,
    Tuple
)
import wrapt

- Define simple memory cache class

In [None]:
class LocalCache:
    """
    Simple key/value cache kept in a dictionary in memory
    """

    def __init__(self) -> None:
        self._cache: Dict[str, Any] = dict()

    def set(self, key: str, value: Any) -> None:
        """
        Store 'value' under 'key', an existing entry is replaced
        """
        self._cache.update({key: value})

    def get(self, key: str) -> Any:
        """
        Return the value stored under 'key', or 'None' if there is no entry
        """
        try:
            return self._cache[key]
        except KeyError:
            return None

    def exist(self, key: str) -> bool:
        """
        Tell whether an entry is stored under 'key'
        """
        return key in self._cache.keys()

    def clear(self):
        """
        Drop all entries by replacing the dictionary with a new one
        """
        self._cache = dict()


# shared cache instance of this module
_local_cache: LocalCache = LocalCache()

- Define a helper that interpolates the call arguments into the cache key template

In [None]:
import inspect
from inspect import FullArgSpec

# argument specification per inspected function
_cached_arg_names: Dict[Callable, FullArgSpec] = {}
# default argument values (name -> value) per inspected function
_cached_default_args: Dict[Callable, Any] = {}


def interpolate_str(
    fmt: str,
    func: Callable,
    instance: Any,
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any]
) -> str:
    """
    Format 'fmt' with the arguments of one call of 'func'

    The format context maps every argument name of 'func' to its value in
    this call: default values first, overridden by the positional arguments,
    overridden by the keyword arguments. So 'fmt' may contain fields like
    '{id_}' or '{self._id}'.

    :param fmt: template in 'str.format' syntax
    :param func: the called function or bound method
    :param instance: object the method is bound to, 'None' for functions
    :param args: positional arguments of the call (without the instance)
    :param kwargs: keyword arguments of the call
    :return: the formatted string
    :raise KeyError: if 'fmt' uses a name which is no argument of the call
    """
    # compare with 'None': an instance may be falsy (for example a container
    # with '__len__' returning 0) and still has to fill the first argument
    if instance is not None:
        arg_values = (instance,) + args
    else:
        arg_values = args

    # A bound method is a separate key for every instance it is bound to, so
    # use the underlying function as key: one entry per function, and the
    # caches do not keep the instances alive
    spec_key = getattr(func, "__func__", func)

    if spec_key in _cached_arg_names:
        arg_names = _cached_arg_names[spec_key]
    else:
        arg_names = inspect.getfullargspec(func)
        _cached_arg_names[spec_key] = arg_names

    print(f'  >> name of args of function "{func.__name__}" is: "{arg_names.args}"')

    if spec_key in _cached_default_args:
        default_args = _cached_default_args[spec_key]
    else:
        default_args = {}
        if arg_names.defaults:
            # defaults belong to the last 'len(defaults)' positional arguments
            offset = len(arg_names.args) - len(arg_names.defaults)

            for idx, default_value in enumerate(arg_names.defaults):
                arg_name = arg_names.args[idx + offset]
                default_args[arg_name] = default_value

        # defaults of keyword-only arguments are reported separately
        if arg_names.kwonlydefaults:
            default_args.update(arg_names.kwonlydefaults)

        _cached_default_args[spec_key] = default_args

    print(f'  >> default args of function "{func.__name__}" is: "{default_args}"')

    context = {**default_args, **dict(zip(arg_names.args, arg_values)), **kwargs}
    print(f'  >> the full args of function "{func.__name__}" is: "{context}"')

    return fmt.format(**context)

- Define cache decorator

In [None]:
import re


# canonical forms of all cache key templates used so far
_cache_key_names = set()


def local_cache(key: str) -> Callable:
    """
    Decorator factory which caches the return value of a function or method
    in '_local_cache'

    The cache key of a call is 'key' formatted with the call arguments by
    'interpolate_str', for example "function:test_key:{id_}". Calls which
    produce the same cache key share one cache entry.

    :param key: template of the cache key in 'str.format' syntax
    :return: the decorator
    :raise ValueError: if a template which differs only in its '{...}'
        fields was already used by another decorator
    """
    # check cache key if it is duplicated, all '{...}' fields are replaced
    # by the same placeholder before the comparison
    canonical_key = re.sub(r"{[^}]*}", "{__exp__}", key)
    if canonical_key in _cache_key_names:
        raise ValueError(f'Duplicate cache key "{key}"')

    _cache_key_names.add(canonical_key)
    print(f'  >> the "canonical_key" is "{canonical_key}"')

    @wrapt.decorator
    def wrapper(
        wrapped: Callable,
        instance: Any,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any]
    ) -> Any:
        cache_key = interpolate_str(key, wrapped, instance, args, kwargs)
        # the decorated callable is 'wrapped', there is no name 'func' in
        # this scope
        print(f'  >> the cache key of function "{wrapped.__name__}" is: "{cache_key}"')
        # 'exist' is checked first, because 'get' returns 'None' for a
        # missing key as well as for a cached 'None'
        if _local_cache.exist(cache_key):
            return _local_cache.get(cache_key)

        return_value = wrapped(*args, **kwargs)
        _local_cache.set(cache_key, return_value)

        return return_value

    return wrapper

- Test cache for function

In [None]:
# The key template only references 'id_', so the argument 'name' does not
# take part in the cache key
@local_cache("function:test_key:{id_}")
def test_key_1(id_: str, name: str = "Alvin"):
    print("  >> cache not hit, function executed")
    return {
        "id": id_,
        "name": name
    }


# the cache key of this call is "function:test_key:user-101"
ret = test_key_1("user-101", name="Alvin")
print(f'* Run function "{test_key_1.__name__}" first time and return: "{ret}"')

# same 'id_' as the first call, so the cache key is identical although
# 'name' is different
ret = test_key_1("user-101", name="Emma")
print(f'* Run function "{test_key_1.__name__}" again and return return: "{ret}"')

# different 'id_', so the cache key is "function:test_key:user-102"
ret = test_key_1("user-102", name="Emma")
print(f'* Run function "{test_key_1.__name__}" again with other argument, and return return: "{ret}"')

- Test cache for class method

In [None]:
class User:
    def __init__(self, id_: str, name: str):
        self._id = id_
        self._name = name

    # The field '{self._id}' is resolved by 'str.format' as attribute lookup
    # on the argument 'self', so the cache key depends on the id of the user
    # only, not on the name
    @local_cache("method:test_key:{self._id}")
    def execute(self):
        print("  >> cache not hit, method executed")
        return {
            "id": self._id,
            "name": self._name
        }


# the cache key of this call is "method:test_key:user-101"
user = User("user-101", "Alvin")
ret = user.execute()
print(f'* Run method "{User.execute.__name__}" first time, and return return: "{ret}"')

# another object with the same id, so the cache key is identical although
# the name is different
user = User("user-101", "Emma")
ret = user.execute()
print(f'* Run method "{User.execute.__name__}" again, and return return: "{ret}"')

# different id, so the cache key is "method:test_key:user-102"
user = User("user-102", "Emma")
ret = user.execute()
print(f'* Run method "{User.execute.__name__}" again with other arguments, and return return: "{ret}"')

- Finish test, clear all context

In [None]:
# Reset the module level state of the cells above. The registered key
# templates are dropped as well, otherwise running the decorated definitions
# again would raise 'ValueError' for the duplicated cache key
_cache_key_names = set()
_cached_arg_names = {}
_cached_default_args = {}
_local_cache.clear()