In [None]:
# 関数によるデコレータ
def mydecorator(function):
    def wrapped(*args, **kwargs):
        print('called function decorator')
        result = function(*args, **kwargs)
        return result
    return wrapped

@mydecorator
def some_function():
    print('called some function')

some_function()

# クラスによるデコレータ
class DecoratorAsClass:
    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwds):
        print('called class function')
        result = self.function(*args, **kwds)
        return result

@DecoratorAsClass
def another_function():
    print('called another function')

another_function()

In [None]:
# 引数を受け取るデコレータ
def repeat(number=3):
    def actual_decorator(function):
        def wrapper(*args, **kwargs):
            result = None
            for _ in range(number):
                result = function(*args, **kwargs)
            return result
        return wrapper
    return actual_decorator

@repeat(2)
def print_my_call():
    print('print_my_call() called')

print_my_call()


# デフォルトの引数を使用するときは括弧が必要
@repeat()
def print_another_call():
    print('print_another_call() called')

print_another_call()

### メタ情報を保持するデコレータ

In [None]:
# メタ情報を保持し忘れるデコレータ
def dummy_decorator(function):
    def wrapped(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapped

@dummy_decorator
def function_with_important_docstring():
    """保持したい重要なdocstring
    """

print(function_with_important_docstring.__name__)
print(function_with_important_docstring.__doc__)


# メタ情報を保持したデコレータ
from functools import wraps

def preserving_decorator(function):
    @wraps(function)
    def wrapped(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapped

@preserving_decorator
def another_important_docstring():
    """保持したい他の重要なdocstring
    """

print(another_important_docstring.__name__)
print(another_important_docstring.__doc__)

### 活用例と便利なサンプル

In [None]:
# 引数チェック
rpc_info = {}
def xmlrpc(in_=(), out=(type(None),)):
    def _xmlrpc(function):
        # シグネチャの登録
        func_name = function.__name__
        rpc_info[func_name] = (in_, out)

        def _check_types(elements, types):
            # 型をチェックするサブ関数
            if len(elements) != len(types):
                raise TypeError('引数の個数を間違えています。')
            typed = enumerate(zip(elements, types))
            for index, couple in typed:
                arg, of_the_right_type = couple
                if isinstance(arg, of_the_right_type):
                    continue
                raise TypeError("arg #%d should be %s" % (index, of_the_right_type))
        
        # ラップする関数
        # キーワード引数は受け取れない
        def __xmlrpc(*args):
            # 入力チェック
            checkable_args = args[1:]
            _check_types(checkable_args, in_)

            # 関数の実行
            res = function(*args)
            # 出力値のチェック
            if not type(res) in (tuple, list):
                checkable_res = (res,)
            else:
                checkable_res = res
            _check_types(checkable_res, out)
            # 関数と型のチェックが成功
            return res
        return __xmlrpc
    return _xmlrpc

class RPCView:
    @xmlrpc((int, int))
    def accept_integers(self, int1, int2):
        print("received %d and %d" % (int1, int2))
    
    @xmlrpc((str,), (int,))
    def accept_phrase(self, phrase):
        print("received %s" % phrase)
        return 12

print(rpc_info)

my = RPCView()
my.accept_integers(1, 2)
my.accept_phrase(2)

In [None]:
# キャッシュ
"""
このモジュールは計算結果をメモ化するデコレータを提供する
このデコレータは指定された期間だけ、
デコレートされた関数の返した値をキャッシュする
"""

import time
import hashlib
import pickle

cache = {}
def is_obsolete(entry, duration):
    """キャッシュのエントリが期限切れでないかをチェックする
    """
    return time.time() - entry['time'] > duration

def compute_key(function, args, kwargs):
    """キャッシュのキーを計算する
    """
    key = pickle.dumps((function.__name__, args, kwargs))
    return hashlib.sha1(key).hexdigest()

def memoize(duration=10):
    """キーワード引数にも対応するメモ化デコレータ
    指定された期間だけ、関数の計算結果をキャッシュ可能にする
    """
    def _memoize(function):
        def __memoize(*args, **kwargs):
            key = compute_key(function, args, kwargs)
            if (key in cache and
                    not is_obsolete(cache[key], duration)):
                # キャッシュに値が格納されており、古くなっていなければ返す
                print("キャッシュが利用されました")
                return cache[key]['value']
            # 有効なキャッシュばなければ結果を計算する
            result = function(*args, **kwargs)

            # 後でキャッシュを利用できるようにする
            cache[key] = {
                'value': result,
                'time': time.time()
            }

            return result
        return __memoize
    return _memoize


@memoize()
def very_very_very_complex_stuff(a, b):
    M = 10 ** 7
    for _ in range(10**6):
        a += b
        a %= M
    return a

print(very_very_very_complex_stuff(2, 2))

print(very_very_very_complex_stuff(2, 2))


@memoize(1)
def very_very_very_complex_stuff(a, b):
    M = 10 ** 7
    for _ in range(10**6):
        a += b
        a %= M
    return a

print(very_very_very_complex_stuff(2, 2))
print(very_very_very_complex_stuff(2, 2))
print(cache)

time.sleep(2)
print(very_very_very_complex_stuff(2, 2))


In [None]:
# プロキシ
class User:
    def __init__(self, roles):
        self.roles = roles

class Unauthorized(Exception):
    pass

def protect(role):
    def _protect(function):
        def __protect(*args, **kwargs):
            user = globals().get('user')
            if user is None or role not in user.roles:
                raise Unauthorized('権限がありません')
            return function(*args, **kwargs)
        return __protect
    return _protect


tarek = User({'admin', 'user'})
bill = User({'user'})

class RecipeVault:
    @protect('admin')
    def get_waffle_recipe(self):
        print('ワッフルのレシピ')

my_vault = RecipeVault()
user = tarek
my_vault.get_waffle_recipe()

user = bill
my_vault.get_waffle_recipe()

In [None]:
# コンテキストプロバイダ
from threading import RLock

lock = RLock()

# 複数のスレッドからアクセスされてもデータが保護されていることを保証する
def synchronized(function):
    def _synchronized(*args, **kwargs):
        lock.acquire()
        try:
            return function(*args, **kwargs)
        finally:
            lock.release()
    return _synchronized

# リソースがロックされることを保証する
@synchronized
def thread_safe():
    pass

In [None]:
# クラスにメソッドを追加する
class MyClass:
    """my docstring
    """
    pass

my = MyClass()

def add_to(instance):
    def _addto(f):
        import types
        f = types.MethodType(f, instance)
        setattr(instance, f.__name__, f)
        return f
    return _addto

@add_to(my)
def print_docstring(self):
    print(self.__doc__)

my.print_docstring()

In [None]:
# デコレータにて関数の引数情報を出力
def print_args(function):
    def _print_args(*args, **kwargs):
        print(function.__name__, args, kwargs)
        return function(*args, **kwargs)
    return _print_args

@print_args
def my_function(a, b, c):
    print(a + b, c * 2)

my_function(1, 2, c="key")

In [1]:
# ハンドラ登録を行う
def onexit(function):
    # プログラムが正常終了する時に実行する
    import atexit
    atexit.register(function)
    return function

@onexit
def post_function():
    print('post process')

print('do process')
# exit()


do process
