# 1. 理解 Python 函数

In [1]:
def foo():
    return "I am Le0v1n"

print(f"foo(): {foo()}")


fn = foo  # 这里 foo 后面没有小括号，不是函数调用，而是将 foo 函数赋值给变量 fn
print(f"fn(): {fn()}")

foo(): I am Le0v1n
fn(): I am Le0v1n


In [2]:
def foo():
    print("foo 函数正在运行...")
    
    # 定义函数中的函数
    def bar():
        return "foo.bar 函数正在运行..."
    
    def bam():
        return "foo.bam 函数正在运行..."
        
    # 调用函数中的函数
    print(bar())
    print(bam())
    print("foo 函数即将结束!")
    

if __name__ == "__main__":
    foo()
    
    # 如果我们调用函数中的函数
    try:
        bar()
    except Exception as e:
        print(f"报错啦: {e}")
        
    try:
        bam()
    except Exception as e:
        print(f"报错啦: {e}")

foo 函数正在运行...
foo.bar 函数正在运行...
foo.bam 函数正在运行...
foo 函数即将结束!
报错啦: name 'bar' is not defined
报错啦: name 'bam' is not defined


In [3]:
def foo(choice='bar'):
    print("foo 函数正在运行...")

    # 定义函数中的函数
    def bar():
        return "foo.bar 函数正在运行..."

    def bam():
        return "foo.bam 函数正在运行..."

    print("foo 函数即将结束!")

    if choice == 'bar':
        return bar
    elif choice == 'bam':
        return bam
    else:
        raise NotImplementedError("choice 必须是 bar 或 bam !")


if __name__ == "__main__":
    fn1 = foo(choice='bar')
    fn2 = foo(choice='bam')
    print(fn1)
    print(fn2)
    print(fn1())
    print(fn2())


foo 函数正在运行...
foo 函数即将结束!
foo 函数正在运行...
foo 函数即将结束!
<function foo.<locals>.bar at 0x0000016BED4D25E0>
<function foo.<locals>.bam at 0x0000016BED4D2820>
foo.bar 函数正在运行...
foo.bam 函数正在运行...


In [4]:
def foo():
    return "I am foo"

def bar(fn):
    print("I am bar")
    print(fn())
    
    
if __name__ == "__main__":
    bar(foo)
    print()
    
    try:
        bar(foo())
    except Exception as e:
        print(f"报错啦: {e}")

I am bar
I am foo

I am bar
报错啦: 'str' object is not callable


In [5]:
def decorator(fn):
    def wrapper():
        print("---------- 函数调用前 ----------")
        fn()  # 调用函数
        print("---------- 函数调用后 ----------")
    return wrapper


def foo():
    print("I am foo!")
    
    
if __name__ == "__main__":
    # 直接调用函数
    print("直接调用函数: ", end="")
    foo()
    print()

    # 调用装饰器包装后函数
    fn = decorator(foo)  # 将foo函数用装饰器包装 -> fn
    print("调用包装后的foo函数: ")
    fn()  # 调用包装后的foo函数
    print()

直接调用函数: I am foo!

调用包装后的foo函数: 
---------- 函数调用前 ----------
I am foo!
---------- 函数调用后 ----------



# 2. 理解Python装饰器

In [6]:
def decorator(fn):
    def wrapper():
        print("---------- 函数调用前 ----------")
        fn()
        print("---------- 函数调用后 ----------")

    return wrapper
    

@decorator  # @装饰器名称
def foo():
    print("I am foo")
    
    
if __name__ == "__main__":
    # 直接调用函数
    print("直接调用函数: ")
    foo()
    print()

    print(f"函数的名称: {foo.__name__}")

直接调用函数: 
---------- 函数调用前 ----------
I am foo
---------- 函数调用后 ----------

函数的名称: wrapper


In [7]:
from functools import wraps


def decorator(fn):
    @wraps(fn)
    def wrapper():
        print("---------- 函数调用前 ----------")
        fn()
        print("---------- 函数调用后 ----------")
    return wrapper


@decorator
def foo():
    print("I am foo")


if __name__ == "__main__":
    print("直接调用函数: ")
    foo()
    print()

    print(f"函数的名称: {foo.__name__}")

直接调用函数: 
---------- 函数调用前 ----------
I am foo
---------- 函数调用后 ----------

函数的名称: foo


In [8]:
from functools import wraps


class Decorate:
    def __init__(self, fn) -> None:
        self.fn = fn

    def __call__(self):
        @wraps(self.fn)
        def wrapper(*args, **kwargs):
            print("---------- 函数调用前 ----------")
            self.fn(*args, **kwargs)
            print("---------- 函数调用后 ----------")
        return wrapper


@Decorate  # 用类来装饰函数，那么函数也变为了类
def foo(param1, param2):
    print(f"I am foo. \n"
          f"My parameters are: \n"
          f"param1: {param1} | param2: {param2}")


if __name__ == "__main__":
    # 实例化类对象
    obj = foo()

    # 调用对象的方法
    obj("参数1", "参数2")


---------- 函数调用前 ----------
I am foo. 
My parameters are: 
param1: 参数1 | param2: 参数2
---------- 函数调用后 ----------


# 3. Python 注册器 Registry

In [9]:
def foo():
    ...


def fn(x): return x**2


class ExampleClass:
    ...


if __name__ == "__main__":
    # 创建注册字典
    register_obj = dict()

    # 开始为函数和类进行注册
    register_obj[foo.__name__] = foo
    register_obj[fn.__name__] = fn
    register_obj[ExampleClass.__name__] = ExampleClass

    print(register_obj)


{'foo': <function foo at 0x0000016BED4D2430>, 'fn': <function fn at 0x0000016BED4D2C10>, 'ExampleClass': <class '__main__.ExampleClass'>}


In [10]:
class Register(dict):
    def __init__(self, *args, **kwargs):
        super(Register, self).__init__(*args, **kwargs)
        self._dict = dict()  # 创建一个字典用于保存注册的可调用对象

    def register(self, target):
        def add_item(key, value):
            if key in self._dict:  # 如果 key 已经存在
                print(f"\033[34m"
                      f"WARNING: {value.__name__} 已经存在!"
                      f"\033[0m")

            # 进行注册，将 key 和 value 添加到字典中
            self[key] = value
            return value

        # 传入的 target 可调用 --> 没有给注册名 --> 传入的函数名或类名作为注册名
        if callable(target):  # key 为函数/类的名称; value 为函数/类本体
            return add_item(key=target.__name__, value=target)
        else:  # 传入的 target 不可调用 --> 抛出异常
            raise TypeError("\033[31mOnly support callable object, e.g. function or class\033[0m")

    def __setitem__(self, key, value):  # 将键值对添加到 _dict 字典中
        self._dict[key] = value

    def __getitem__(self, key):  # 从 _dict 字典中获取注册的可调用对象
        return self._dict[key]

    def __contains__(self, key):  # 检查给定的注册名是否存在于 _dict 字典中
        return key in self._dict

    def __str__(self):  # 返回 _dict 字典的字符串表示
        return str(self._dict)

    def keys(self):  # 返回 _dict 字典中的所有键
        return self._dict.keys()

    def values(self):  # 返回 _dict 字典中的所有值
        return self._dict.values()

    def items(self):  # 返回 _dict 字典中的所有键值对
        return self._dict.items()


if __name__ == "__main__":
    register_obj = Register()
    
    @register_obj.register
    def fn1_add(a, b):
        return a + b
    
    @register_obj.register
    def fn2_subject(a, b):
        return a - b
    
    @register_obj.register
    def fn3_multiply(a, b):
        return a * b
    
    @register_obj.register
    def fn4_divide(a, b):
        return a / b
    
    # 我们再重复定义一个函数
    @register_obj.register
    def fn2_subject(a, b):
        return b - a
    
    # 尝试使用 register 方法注册不可调用的对象
    try:
        register_obj.register("传入字符串，它是不可调用的")
    except Exception as e:
        print(f"报错啦: {e}")

    print("\n所有函数均已注册!\n")
    
    # 我们查看一个注册器中有哪些元素
    print(f"\033[34mkey\t\tvalue\033[0m")
    for k, v in register_obj.items():  # <=> for k, v in register_obj._dict.items()
        print(f"{k}: \t{v}")

报错啦: [31mOnly support callable object, e.g. function or class[0m

所有函数均已注册!

[34mkey		value[0m
fn1_add: 	<function fn1_add at 0x0000016BED4D2B80>
fn2_subject: 	<function fn2_subject at 0x0000016BED5233A0>
fn3_multiply: 	<function fn3_multiply at 0x0000016BED523280>
fn4_divide: 	<function fn4_divide at 0x0000016BED523310>


In [11]:
class Register(dict):
    def __init__(self, *args, **kwargs):
        super(Register, self).__init__(*args, **kwargs)
        self._dict = dict()  # 创建一个字典用于保存注册的可调用对象

    def register(self, target):
        def add_item(key, value):
            if key in self._dict:  # 如果 key 已经存在
                print(f"\033[34m"
                      f"WARNING: {value.__name__} 已经存在!"
                      f"\033[0m")

            # 进行注册，将 key 和 value 添加到字典中
            self[key] = value
            return value

        # 传入的 target 可调用 --> 没有给注册名 --> 传入的函数名或类名作为注册名
        if callable(target):  # key 为函数/类的名称; value 为函数/类本体
            return add_item(key=target.__name__, value=target)
        else:  # 传入的 target 不可调用 --> 抛出异常
            raise TypeError("\033[31mOnly support callable object, e.g. function or class\033[0m")
        
    def __call__(self, target):
        return self.register(target)

    def __setitem__(self, key, value):  # 将键值对添加到 _dict 字典中
        self._dict[key] = value

    def __getitem__(self, key):  # 从 _dict 字典中获取注册的可调用对象
        return self._dict[key]

    def __contains__(self, key):  # 检查给定的注册名是否存在于 _dict 字典中
        return key in self._dict

    def __str__(self):  # 返回 _dict 字典的字符串表示
        return str(self._dict)

    def keys(self):  # 返回 _dict 字典中的所有键
        return self._dict.keys()

    def values(self):  # 返回 _dict 字典中的所有值
        return self._dict.values()

    def items(self):  # 返回 _dict 字典中的所有键值对
        return self._dict.items()


if __name__ == "__main__":
    register_obj = Register()
    
    @register_obj  # 不用再 register_obj.register 了
    def fn1_add(a, b):
        return a + b
    
    @register_obj  # 不用再 register_obj.register 了
    def fn2_subject(a, b):
        return a - b
    
    @register_obj  # 不用再 register_obj.register 了
    def fn3_multiply(a, b):
        return a * b
    
    @register_obj  # 不用再 register_obj.register 了
    def fn4_divide(a, b):
        return a / b
    
    # 我们再重复定义一个函数
    @register_obj  # 不用再 register_obj.register 了
    def fn2_subject(a, b):
        return b - a
    
    # 尝试使用 register 方法注册不可调用的对象
    try:
        register_obj("传入字符串，它是不可调用的")
    except Exception as e:
        print(f"报错啦: {e}")

    print("\n所有函数均已注册!\n")
    
    # 我们查看一个注册器中有哪些元素
    print(f"\033[34mkey\t\tvalue\033[0m")
    for k, v in register_obj.items():  # <=> for k, v in register_obj._dict.items()
        print(f"{k}: \t{v}")

报错啦: [31mOnly support callable object, e.g. function or class[0m

所有函数均已注册!

[34mkey		value[0m
fn1_add: 	<function fn1_add at 0x0000016BED5231F0>
fn2_subject: 	<function fn2_subject at 0x0000016BED523430>
fn3_multiply: 	<function fn3_multiply at 0x0000016BED5233A0>
fn4_divide: 	<function fn4_divide at 0x0000016BED523280>


# 4. Python 注册器在深度学习中的应用

In [28]:
import torch.nn as nn

# 实现一个注册器
class LayerRegistry:
    def __init__(self):
        self.layers = dict()

    def register(self, layer_name):
        # 让装饰器接受 layer 参数
        def decorator(layer):
            # 开始注册
            self.layers[layer_name] = layer
            return layer  # 返回注册的层
        return decorator

    def get_layer(self, layer_name):
        if layer_name in self.layers:
            return self.layers[layer_name]
        else:
            raise KeyError(f"未注册的层 '{layer_name}'.")

# 实例化自定义层注册器
layer_register = LayerRegistry()

# 自定义层类
@layer_register.register("ConvBNReLU")
class ConvBNReLU(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, *args, **kwargs):
        super(ConvBNReLU, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, *args, **kwargs),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        return self.layers(x)

if __name__ == "__main__":
    # 在创建层的使用可以使用注册器中的层
    example_layer = layer_register.get_layer("ConvBNReLU")
    
    # 创建具体的层实例
    specific_layer = example_layer(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1)
    
    # 打印具体层的信息
    print(specific_layer)



ConvBNReLU(
  (layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
)


In [40]:
import torch.nn as nn


class LayerRegistry:  # 实现一个注册器
    def __init__(self):
        self.layers = dict()

    def register(self, layer_name):
        # 让装饰器接受 layer 参数
        def decorator(layer):
            # 开始注册
            self.layers[layer_name] = layer
            return layer  # 返回注册的层
        return decorator

    def get_layer(self, layer_name):
        if layer_name in self.layers:
            return self.layers[layer_name]
        else:
            raise KeyError(f"未注册的层 '{layer_name}'.")


# 实例化自定义层注册器
layer_register = LayerRegistry()


@layer_register.register("ConvBNReLU")
class ConvBNReLU(nn.Module):  # 自定义层类
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(ConvBNReLU, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        return self.layers(x)


# 继续注册其他模块
@layer_register.register("BatchNorm2d")
class BatchNorm2d(nn.Module):
    def __init__(self, num_features, *args, **kwargs):
        super(BatchNorm2d, self).__init__()
        self.bn = nn.BatchNorm2d(num_features, *args, **kwargs)

    def forward(self, x):
        return self.bn(x)


@layer_register.register("ReLU")
class ReLU(nn.Module):
    def __init__(self, *args, **kwargs):
        super(ReLU, self).__init__()
        self.relu = nn.ReLU(*args, **kwargs)

    def forward(self, x):
        return self.relu(x)


@layer_register.register("MaxPooling")
class MaxPooling(nn.Module):
    def __init__(self, kernel_size, stride=1, padding=0):
        super(MaxPooling, self).__init__()
        self.maxpool = nn.MaxPool2d(kernel_size, stride=stride, padding=padding)

    def forward(self, x):
        return self.maxpool(x)


@layer_register.register("AvgPooling")
class AvgPooling(nn.Module):
    def __init__(self, kernel_size, stride=1, padding=0):
        super(AvgPooling, self).__init__()
        self.avgpool = nn.AvgPool2d(kernel_size, stride=stride, padding=padding)

    def forward(self, x):
        return self.avgpool(x)


# 定义网络配置(cfg)来构建完整的网络
cfg = [
    ('ConvBNReLU', 3, 64, 3, 1),  # 传递4个参数
    ('MaxPooling', 2, 2, 0),
    ('ConvBNReLU', 64, 128, 3, 1),
    ('MaxPooling', 2, 2, 0),
    ('ConvBNReLU', 128, 256, 3, 1),
    ('AvgPooling', 4, 1, 0),
]


# 构建网络
class CustomNet(nn.Module):
    def __init__(self, cfg):
        super(CustomNet, self).__init__()
        self.layers = nn.ModuleList()
        in_channels = 3  # 输入通道数

        for layer_cfg in cfg:
            layer_name, *layer_params = layer_cfg
            layer = layer_register.get_layer(layer_name)

            if layer_name in ['ConvBNReLU', 'BatchNorm2d']:
                self.layers.append(layer(in_channels, *layer_params))
                in_channels = layer_params[1]
            else:
                self.layers.append(layer(*layer_params))



# 创建完整的网络实例
custom_net = CustomNet(cfg)

# 打印网络结构
print(custom_net)


CustomNet(
  (layers): ModuleList(
    (0): ConvBNReLU(
      (layers): Sequential(
        (0): Conv2d(3, 3, kernel_size=(64, 64), stride=(3, 3), padding=(1, 1))
        (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU()
      )
    )
    (1): MaxPooling(
      (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (2): ConvBNReLU(
      (layers): Sequential(
        (0): Conv2d(64, 64, kernel_size=(128, 128), stride=(3, 3), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU()
      )
    )
    (3): MaxPooling(
      (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (4): ConvBNReLU(
      (layers): Sequential(
        (0): Conv2d(128, 128, kernel_size=(256, 256), stride=(3, 3), padding=(1, 1))
        (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_runnin

In [46]:
class LossRegistry:
    def __init__(self):
        self.losses = dict()

    def register(self, loss_name):
        def decorator(loss_fn):
            self.losses[loss_name] = loss_fn
            return loss_fn
        return decorator

    def get_loss(self, loss_name):
        if loss_name in self.losses:
            return self.losses[loss_name]
        else:
            raise KeyError(f"未注册的损失函数 '{loss_name}'.")

# 实例化自定义损失函数注册器
loss_register = LossRegistry()


@loss_register.register("MSE")
class MeanSquaredErrorLoss(nn.Module):
    def forward(self, input, target):
        return nn.functional.mse_loss(input, target)

@loss_register.register("CE")
class CrossEntropyLoss(nn.Module):
    def forward(self, input, target):
        return nn.functional.cross_entropy(input, target)


loss_config = [
    ('MSE', None),  # 使用默认参数
    ('CE', None),   # 使用默认参数
]

# 通过配置文件构建损失函数列表
loss_functions = [loss_register.get_loss(loss_name) for loss_name, _ in loss_config]

loss_fn_1 = loss_functions[0]()
loss_fn_2 = loss_functions[1]()
print(loss_fn_1)
print(loss_fn_2)

MeanSquaredErrorLoss()
CrossEntropyLoss()


In [54]:
import torch.optim as optim

class OptimizerRegistry:
    def __init__(self):
        self.optimizers = dict()

    def register(self, optimizer_name):
        def decorator(optimizer_fn):
            self.optimizers[optimizer_name] = optimizer_fn
            return optimizer_fn
        return decorator

    def get_optimizer(self, optimizer_name, model_parameters, *args, **kwargs):
        if optimizer_name in self.optimizers:
            return self.optimizers[optimizer_name](model_parameters, *args, **kwargs)
        else:
            raise KeyError(f"未注册的优化器 '{optimizer_name}'.")

# 实例化自定义优化器注册器
optimizer_register = OptimizerRegistry()


@optimizer_register.register("SGD")
class SGDOptimizer:
    def __init__(self, model_parameters, lr, momentum):
        self.optimizer = optim.SGD(model_parameters, lr=lr, momentum=momentum)

    def step(self):
        self.optimizer.step()

    def zero_grad(self):
        self.optimizer.zero_grad()

@optimizer_register.register("Adam")
class AdamOptimizer:
    def __init__(self, model_parameters, lr, betas):
        self.optimizer = optim.Adam(model_parameters, lr=lr, betas=betas)

    def step(self):
        self.optimizer.step()

    def zero_grad(self):
        self.optimizer.zero_grad()


optimizer_config = [
    ('SGD', {'lr': 0.01, 'momentum': 0.9}),
    ('Adam', {'lr': 0.001, 'betas': (0.9, 0.999)})
]

# 通过配置文件构建优化器列表
optimizers = [optimizer_register.get_optimizer(optimizer_name, custom_net.parameters(), **params) for optimizer_name, params in optimizer_config]

for optimizer in optimizers:
    optimizer.zero_grad()  # 清空梯度
    optimizer.step()  # 下一步


In [65]:
import numpy as np
import cv2

class PreprocessingRegistry:
    def __init__(self):
        self.preprocessing_steps = dict()

    def register(self, step_name):
        def decorator(preprocessing_fn):
            self.preprocessing_steps[step_name] = preprocessing_fn
            return preprocessing_fn
        return decorator

    def get_preprocessing_step(self, step_name, *args, **kwargs):
        if step_name in self.preprocessing_steps:
            return self.preprocessing_steps[step_name](*args, **kwargs)
        else:
            raise KeyError(f"未注册的数据预处理步骤 '{step_name}'.")

# 实例化自定义数据预处理步骤注册器
preprocessing_register = PreprocessingRegistry()

@preprocessing_register.register("Normalize")
class Normalize:
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, data):
        return (data - self.mean) / self.std

@preprocessing_register.register("RandomCrop")
class RandomCrop:
    def __init__(self, crop_size):
        self.crop_size = crop_size

    def __call__(self, data):
        h, w, c = data.shape
        x = np.random.randint(0, h - self.crop_size)
        y = np.random.randint(0, w - self.crop_size)
        return data[x:x+self.crop_size, y:y+self.crop_size, :]

@preprocessing_register.register("Resize")
class Resize:
    def __init__(self, target_size):
        self.target_size = target_size

    def __call__(self, data):
        return cv2.resize(data, (self.target_size, self.target_size))


preprocessing_config = [
    ('Normalize', {'mean': [0.485, 0.456, 0.406], 'std': [0.229, 0.224, 0.225]}),
    ('RandomCrop', {'crop_size': 224}),
    ('Resize', {'target_size': 256})
]

# 通过配置文件构建数据预处理步骤列表
preprocessing_steps = [preprocessing_register.get_preprocessing_step(step_name, **params) for step_name, params in preprocessing_config]

# 假设我们有一张原始图像
original_image = cv2.imread('./lena.png')  # 读取原始图像

# 应用数据预处理步骤
preprocessed_data = original_image.copy()  # 创建副本以保存经过预处理的数据

for preprocessing_step in preprocessing_steps:
    preprocessed_data = preprocessing_step(preprocessed_data)

# preprocessed_data 现在包含了经过预处理的数据
print(preprocessed_data.shape)  # (256, 256, 3)

# 现在可以将 preprocessed_data 用于深度学习模型的训练或推理

(256, 256, 3)
