# 元编程

## 在函数上添加包装器 

In [1]:
import time
from functools import wraps

def timethis(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(func.__name__, end-start)
        return result
    return wrapper

@timethis
def countdown(n: int):
    '''
    Counts down
    '''
    while n > 0:
        n -= 1

countdown(1000000)
countdown(10000000)

countdown 0.23600029945373535
countdown 1.5932979583740234


## 创建装饰器时保留函数元信息

In [2]:
## 定义装饰器时，都应该使用functools库中的@wraps装饰器来注解底层包装函数，此时会保留函数元信息

print(countdown.__name__)
print(countdown.__doc__)
print(countdown.__annotations__)

countdown

    Counts down
    
{'n': <class 'int'>}


In [3]:
# @wraps 有一个重要特征是它能通过属性 __wrapped__ 直接访问被包装函数。
countdown.__wrapped__(10000)

In [4]:
## __wrapped__ 属性还能让被装饰函数正确暴露底层的参数签名信息。
from inspect import signature

print(signature(countdown))

(n: int)


## 解除一个装饰器

In [5]:
# 通过属性 __wrapped__ 直接访问被包装函数
countdown.__wrapped__(10000)

## 定义一个带参数的装饰器

In [6]:
from functools import wraps
import logging

def logged(level, name=None, message=None):
    def decorate(func):
        logname = name if name else func.__module__
        log = logging.getLogger(logname)
        logmsg = message if message else func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            log.log(level, logmsg)
            return func(*args, **kwargs)
        return wrapper
    return decorate

@logged(logging.WARNING)
def add(x, y):
    return x + y

@logged(logging.CRITICAL, message='example')
def spam():
    print('Spam!')


In [7]:
add(2, 3)

add


5

In [8]:
spam()

example
Spam!


## 可自定义属性的装饰器

In [9]:
from functools import partial, wraps
import logging

def attach_wrapper(obj, func=None):
    if func is None:
        return partial(attach_wrapper, obj)
    setattr(obj, func.__name__, func)
    return func

def logged(level, name=None, message=None):
    def decorate(func):
        logname = name if name else func.__module__
        log = logging.getLogger(logname)
        logmsg = message if message else func.__name__
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            log.log(level, logmsg)
            return func(*args, **kwargs)

        @attach_wrapper(wrapper)
        def set_level(newlevel):
            nonlocal level
            level = newlevel

        @attach_wrapper(wrapper)
        def set_message(newmsg):
            nonlocal logmsg
            logmsg = newmsg

        return wrapper
    return decorate

@logged(logging.WARNING)
def add(x, y):
    return x + y

@logged(logging.CRITICAL, 'example')
def spam():
    print('Spam!')

In [10]:
spam.set_level(logging.WARNING)
spam.set_message('Spam!!!!!!')
spam()

Spam!!!!!!
Spam!


## 带可选参数的装饰器

In [11]:
from functools import wraps, partial
import logging

def logged(func=None, *, level=logging.DEBUG, name=None, message=None):
    if func is None:
        return partial(logged, level=level, name=name, message=message)
    logname = name if name else func.__module__
    log = logging.getLogger(logname)
    logmsg = message if message else func.__name__

    @wraps(func)
    def wrapper(*args, **kwargs):
        log.log(level, logmsg)
        return func(*args, **kwargs)
    return wrapper

@logged
def add(x, y):
    return x + y

@logged(level=logging.CRITICAL, name='example')
def spam():
    print('Spam!')

## 利用装饰器强制函数上的类型检查

In [12]:
from inspect import signature
from functools import wraps

def typeassert(*ty_args, **ty_kwargs):
    def decorate(func):
        if not __debug__:
            return func
        sig = signature(func)
        bound_types = sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kwargs):
            bound_values = sig.bind(*args, **kwargs)
            for name, value in bound_values.arguments.items():
                if name in bound_types:
                    if not isinstance(value, bound_types[name]):
                        raise TypeError('Arguement {} must be {}'.format(name, bound_types[name]))
            return func(*args, **kwargs)
        return wrapper
    return decorate


@typeassert(int, int)
def add(x, y):
    return x + y

add(2, '3')

TypeError: Arguement y must be <class 'int'>

## 将装饰器定义为类的一部分

In [13]:
from functools import wraps

class A:
    def decorator1(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print('Decorator 1')
            return func(*args, **kwargs)
        return wrapper

    @classmethod
    def decorator2(cls, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            print('Decorator 2')
            return func(*args, **kwargs)
        return wrapper

a = A()
@a.decorator1
def spam():
    pass

@A.decorator2
def grok():
    pass

spam()
grok()

Decorator 1
Decorator 2


 ## 将装饰器定义为类

In [14]:
import types
from functools import wraps

class Profiled:
    def __init__(self, func):
        wraps(func)(self)
        self.ncalls = 0

    def __call__(self, *args, **kwargs):
        self.ncalls += 1
        return self.__wrapped__(*args, **kwargs)

    def __get__(self, instance, cls):
        if instance is None:
            return self
        else:
            return types.MethodType(self, instance)

In [15]:
@Profiled
def add(x, y):
    return x + y

add(1, 2)
add(2, 3)
add(3, 4)
print('add calls {} times'.format(add.ncalls))

add calls 3 times


## 为类和静态方法提供装饰器

In [16]:
import time
from functools import wraps

def timethis(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        r = func(*args, **kwargs)
        end = time.time()
        print(end-start)
        return r
    return wrapper

class Spam:
    @timethis
    def instance_method(self, n):
        print(self, n)
        while n > 0:
            n -= 1
    
    @classmethod
    @timethis
    def class_method(cls, n):
        print(cls, n)
        while n > 0:
            n -= 1

    @staticmethod
    @timethis
    def static_method(n):
        print(n)
        while n > 0:
            n -= 1

In [17]:
s = Spam()
s.instance_method(1000000)
Spam.class_method(1000000)
Spam.static_method(1000000)

<__main__.Spam object at 0x000001F24E257D88> 1000000
0.1230003833770752
<class '__main__.Spam'> 1000000
0.12700152397155762
1000000
0.13099932670593262


## 装饰器为被包装函数增加参数

In [18]:
from functools import wraps

def optional_debug(func):
    @wraps(func)
    def wrapper(*args, debug=False, **kwargs):
        if debug:
            print('Calling', func.__name__)
        return func(*args, **kwargs)
    return wrapper

@optional_debug
def spam(a, b, c):
    print(a, b, c)

spam(1, 2, 3)

spam(1, 2, 3, debug=True)

1 2 3
Calling spam
1 2 3


## 使用装饰器扩充类的功能

In [19]:
def log_getattribute(cls):
    orig_getattribute = cls.__getattribute__

    def new_getattribute(self, name):
        print('getting:', name)
        return orig_getattribute(self, name)
    cls.__getattribute__ = new_getattribute
    return cls

@log_getattribute
class A:
    def __init__(self, x):
        self.x = x
    def spam(self):
        pass

In [20]:
a = A(42)
a.x

getting: x


42

In [21]:
a.spam()

getting: spam


## 使用元类控制实例的创建

In [22]:
class NoInstance(type):
    def __call__(self, *args, **kwargs):
        raise TypeError("Can't instantiate directly")

class Spam(metaclass=NoInstance):
    @staticmethod
    def grok(x):
        print('Spam.grok')

In [23]:
Spam.grok(42)

Spam.grok


In [24]:
s = Spam()

TypeError: Can't instantiate directly

In [25]:
class Singletion(type):
    def __init__(self, *args, **kwargs):
        self.__instance = None
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        if self.__instance is None:
            self.__instance = super().__call__(*args, **kwargs)
            return self.__instance
        else:
            return self.__instance

class Spam(metaclass=Singletion):
    def __init__(self):
        print('Creating Spam')

In [26]:
a = Spam()
b = Spam()
print(a is b)

Creating Spam
True


In [27]:
import weakref

class Cached(type):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__cache = weakref.WeakValueDictionary()

    def __call__(self, *args):
        if args in self.__cache:
            return self.__cache[args]
        else:
            obj = super().__call__(*args)
            self.__cache[args] = obj
            return obj

class Spam(metaclass=Cached):
    def __init__(self, name):
        print('Creating Spam({!r})'.format(name))
        self.name = name

In [28]:
a = Spam('Guido')
b = Spam('Diana')
c = Spam('Guido')
print(a is b)
print(a is c)

Creating Spam('Guido')
Creating Spam('Diana')
False
True


## 捕获类的属性定义顺序

In [29]:
from collections import OrderedDict

class Typed:
    _expected_type = type(None)

    def __init__(self, name=None):
        self._name = name

    def __set__(self, instance, value):
        if not isinstance(value, self._expected_type):
            raise TypeError('Expected ' + str(self._expected_type))
        instance.__dict__[self._name] = value

class Integer(Typed):
    _expected_type = int

class Float(Typed):
    _expected_type = float

class String(Typed):
    _expected_type = str

class OrderedMeta(type):
    def __new__(cls, clsname, bases, clsdict):
        d = dict(clsdict)
        order = []
        for name, value in clsdict.items():
            if isinstance(value, Typed):
                value._name = name
                order.append(name)
        d['_order'] = order
        return type.__new__(cls, clsname, bases, d)

    @classmethod
    def __prepare__(cls, clsname, bases):
        return OrderedDict()

class Structure(metaclass=OrderedMeta):
    def as_csv(self):
        return ','.join(str(getattr(self,name)) for name in self._order)

class Stock(Structure):
    name = String()
    shares = Integer()
    price = Float()
    def __init__(self, name, shares, price):
        self.name = name
        self.shares = shares
        self.price = price

In [30]:
s = Stock('GOOG', 100, 490.1)
print(s.name)
print(s.as_csv())

GOOG
GOOG,100,490.1


## 定义有可选参数的元类

In [31]:
class MyMeta(type):
    @classmethod
    def __prepare__(cls, name, bases, *, debug=False, synchronize=False):
        pass
        return super().__prepare__(name, bases)

    def __new__(cls, name, bases, ns, *, debug=False, synchronize=False):
        pass
        return super().__new__(cls, name, bases, ns)

    def __init__(self, name, bases, ns, *, debug=False, synchronize=False):
        pass
        super().__init__(name, bases, ns)

class Spam(metaclass=MyMeta, debug=True, synchronize=True):
    pass

## *args和**kwargs的强制参数签名

In [32]:
from inspect import Signature, Parameter

parms = [Parameter('x', Parameter.POSITIONAL_OR_KEYWORD),
        Parameter('y', Parameter.POSITIONAL_OR_KEYWORD, default=42),
        Parameter('z', Parameter.KEYWORD_ONLY, default=None)]

sig = Signature(parms)
print(sig)

(x, y=42, *, z=None)


In [33]:
def func(*args, **kwargs):
    bound_values = sig.bind(*args, **kwargs)
    for name, value in bound_values.arguments.items():
        print(name, value)

func(1, 2, z=3)

x 1
y 2
z 3


In [34]:
from inspect import Signature, Parameter

def make_sig(*names):
    parms = [Parameter(name, Parameter.POSITIONAL_OR_KEYWORD) for name in names]
    return Signature(parms)

class Structure:
    __signature__ = make_sig()

    def __init(self, *args, **kwargs):
        bound_values = self.__signature__.bind(*args, **kwargs)
        for name, value in bound_values.arguments.items():
            setattr(self, name, value)

class Stock(Structure):
    __signature__ = make_sig('name', 'shares', 'price')

class Point(Structure):
    __signature__ = make_sig('x', 'y')

In [35]:
import inspect
print(inspect.signature(Stock))
print(inspect.signature(Point))

(name, shares, price)
(x, y)


## 在类上强制使用编程规约

In [36]:
class MyMeta(type):
    def __new__(self, clsname, bases, clsdict):
        return super().__new__(cls, clsname, bases, clsdict)


# 另一种是，定义 __init__() 方法
# class MyMeta(type):
#     def __init__(self, clsname, bases, clsdict):
#         super().__init__(clsname, bases, clsdict)

In [37]:
class NoMixedCaseMeta(type):
    def __new__(cls, clsname, bases, clsdict):
        for name in clsdict:
            if name.lower() != name:
                raise TypeError('Bad attribute name: ' + name)
        return super().__new__(cls, clsname, bases, clsdict)

class Root(metaclass=NoMixedCaseMeta):
    pass

class A(Root):
    def foo_bar(self): # Ok
        pass

class B(Root):
    def fooBar(self): # TypeError
        pass

TypeError: Bad attribute name: fooBar

In [38]:
from inspect import signature
import logging

class MatchSignaturesMeta(type):
    def __init__(self, clsname, bases, clsdict):
        super().__init__(clsname, bases, clsdict)
        sup = super(self, self)
        for name, value in clsdict.items():
            if name.startswith('_') or not callable(value):
                continue
            # Get the previous definition (if any) and compare the signatures
            prev_dfn = getattr(sup,name,None)
            if prev_dfn:
                prev_sig = signature(prev_dfn)
                val_sig = signature(value)
                if prev_sig != val_sig:
                    logging.warning('Signature mismatch in %s. %s != %s',
                                    value.__qualname__, prev_sig, val_sig)

In [39]:
class Root(metaclass=MatchSignaturesMeta):
    pass

class A(Root):
    def foo(self, x, y):
        pass
    def spam(self, x, *, z):
        pass

# Class with redefined methods, but slightly different signatures
class B(A):
    def foo(self, a, b):
        pass
    def spam(self,x,z):
        pass



## 在定义的时候初始化类的成员

In [40]:
import operator

class StructTupleMeta(type):
    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        for n, name in enumerate(cls._fields):
            setattr(cls, name, property(operator.itemgetter(n)))
    
class StructTuple(tuple, metaclass=StructTupleMeta):
    _fields = []
    def __new__(cls, *args):
        if len(args) != len(cls._fields):
            raise ValueError('{} arguments required'.format(len(cls._fields)))
        return super().__new__(cls, args)


class Stock(StructTuple):
    _fields = ['name', 'shares', 'price']

class Point(StructTuple):
    _fields = ['x', 'y']

In [41]:
s = Stock('ACME', 50, 91.1)
print(s)

('ACME', 50, 91.1)


In [42]:
s.shares = 23

AttributeError: can't set attribute

## 利用函数注解实现方法重载

In [43]:
import inspect
import types

class MultiMethod:
    '''
    Represents a single multimethod.
    '''
    def __init__(self, name):
        self._methods = {}
        self.__name__ = name
    
    def register(self, meth):
        '''
        Register a new method as a multimethod
        '''
        sig = inspect.signature(meth)
        # Build a type signature from the method's annotations
        types = []
        for name, parm in sig.parameters.items():
            if name == 'self':
                continue
            if parm.annotation is inspect.Parameter.empty:
                raise TypeError('Argument {} must be annotated with a type'.format(name))
            if not isinstance(parm.annotation, type):
                raise TypeError('Argument {} annotation must be a type'.format(name))
            if parm.default is not inspect.Parameter.empty:
                self._methods[tuple(types)] = meth
            types.append(parm.annotation)
        self._methods[tuple(types)] = meth

    def __call__(self, *args):
        '''
        Call a method based on type signature of the arguments
        '''
        types = tuple(type(arg) for arg in args[1:])
        meth = self._methods.get(types, None)
        if meth:
            return meth(*args)
        else:
            raise TypeError('No matching method for types {}'.format(types))

    def __get__(self, instance, cls):
        '''
        Descriptor method needed to make calls work in a class
        '''
        if instance is not None:
            return types.MethodType(self, instance)
        else:
            return self

class MultiDict(dict):
    '''
    Special dictionary to build multimethods in a metaclass
    '''
    def __setitem__(self, key, value):
        if key in self:
            # If key already exists, it must be a multimethod or callable
            current_value = self[key]
            if isinstance(current_value, MultiMethod):
                current_value.register(value)
            else:
                mvalue = MultiMethod(key)
                mvalue.register(current_value)
                mvalue.register(value)
                super().__setitem__(key, mvalue)
        else:
            super().__setitem__(key, value)

class MultipleMeta(type):
    '''
    Metaclass that allows multiple dispatch of methods
    '''
    def __new__(cls, clsname, bases, clsdict):
        return type.__new__(cls, clsname, bases, dict(clsdict))

    @classmethod
    def __prepare__(cls, clsname, bases):
        return MultiDict()

In [44]:
class Spam(metaclass=MultipleMeta):
    def bar(self, x:int, y:int):
        print('Bar 1:', x, y)

    def bar(self, s:str, n:int = 0):
        print('Bar 2:', s, n)

In [45]:
s = Spam()
s.bar(2, 3)
s.bar('hello')

Bar 1: 2 3
Bar 2: hello 0


## 避免重复的属性方法

In [46]:
def typed_property(name, expected_type):
    storage_name = '_' + name
    @property
    def prop(self):
        return getattr(self, storage_name)

    @prop.setter
    def prop(self, value):
        if not isinstance(value, expected_type):
            raise TypeError('{} must be a {}'.format(name, expected_type))
        setattr(self, storage_name, value)
        
    return prop

In [47]:
class Person:
    name = typed_property('name', str)
    age = typed_property('age', int)
    def __init__(self, name, age):
        self.name = name
        self.age = age

In [48]:
p = Person('Tom', '12')

TypeError: age must be a <class 'int'>

## 定义上下文管理器的简单方法

In [49]:
import time
from contextlib import contextmanager

@contextmanager
def timethis(label):
    start = time.time()
    try:
        yield
    finally:
        end = time.time()
    print('{}: {}'.format(label, end - start))


with timethis('counting'):
    n = 10000000
    while n > 0:
        n -= 1

counting: 1.865997314453125


In [50]:
@contextmanager
def list_transaction(orig_list):
    working = list(orig_list)
    yield working
    orig_list[:] = working

In [51]:
items = [1, 2, 3]

with list_transaction(items) as working:
    working.append(4)
    working.append(5)

print(items)

[1, 2, 3, 4, 5]


## 在局部变量域中执行代码

In [52]:
# 全局变量域
a = 13
exec('b = a + 1')
print(b)

14


In [53]:
# 局部变量域
def test():
    a = 13
    loc = locals()
    exec('b = a + 1')
    b = loc['b']
    print(b)

y=test()

14


##  解析与分析 Python 源码

In [54]:
import ast

class CodeAnalyzer(ast.NodeVisitor):
    def __init__(self):
        self.loaded = set()
        self.stored = set()
        self.deleted = set()

    def visit_Name(self, node):
        if isinstance(node.ctx, ast.Load):
            self.loaded.add(node.id)
        elif isinstance(node.ctx, ast.Store):
            self.stored.add(node.id)
        elif isinstance(node.ctx, ast.Del):
            self.deleted.add(node.id)

code = '''
for i in range(10):
    print(i)
del i
'''

top = ast.parse(code, mode='exec')

c = CodeAnalyzer()
c.visit(top)

print('Loaded:', c.loaded)
print('Stored:', c.stored)
print('Deleted:', c.deleted)

Loaded: {'i', 'range', 'print'}
Stored: {'i'}
Deleted: {'i'}


In [55]:
exec(compile(top, '<stdin>', 'exec'))

0
1
2
3
4
5
6
7
8
9


## 拆解 Python 字节码

In [56]:
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
    print('Blastoff!')

import dis
dis.dis(countdown)

2           0 SETUP_LOOP              30 (to 32)
        >>    2 LOAD_FAST                0 (n)
              4 LOAD_CONST               1 (0)
              6 COMPARE_OP               4 (>)
              8 POP_JUMP_IF_FALSE       30

  3          10 LOAD_GLOBAL              0 (print)
             12 LOAD_CONST               2 ('T-minus')
             14 LOAD_FAST                0 (n)
             16 CALL_FUNCTION            2
             18 POP_TOP

  4          20 LOAD_FAST                0 (n)
             22 LOAD_CONST               3 (1)
             24 INPLACE_SUBTRACT
             26 STORE_FAST               0 (n)
             28 JUMP_ABSOLUTE            2
        >>   30 POP_BLOCK

  5     >>   32 LOAD_GLOBAL              0 (print)
             34 LOAD_CONST               4 ('Blastoff!')
             36 CALL_FUNCTION            1
             38 POP_TOP
             40 LOAD_CONST               0 (None)
             42 RETURN_VALUE
