<font size = 10>

# pimm依赖

## ***data系列***

<font size = 3>

### pimm.data.Dataset 与 pimm.data.create_loader

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.data import Dataset, create_loader

#### - 测试主要方法与函数运行

In [None]:
dataset_eval = Dataset("Cream/data/imagenet/val")

In [None]:
loader_eval = create_loader(
        dataset_eval,
        input_size = (3, 224, 224),
        batch_size = 4 * 32,
        is_training = True,
        num_workers = 4,
        distributed = False,
        interpolation = 'bicubic',
        crop_pct = 0.875,
        mean = (0.485, 0.456, 0.406),
        std = (0.229, 0.224, 0.225))

#### - 输出值测试

In [None]:
for _, (input, _) in enumerate(loader_eval):
    break
print(input)

## ***loss系列***

<font size = "3">

### pimm.loss.LabelSmoothingCrossEntropy

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.loss import LabelSmoothingCrossEntropy

#### - 测试方法创建

In [None]:
pd_fun = LabelSmoothingCrossEntropy()

#### - 前向对齐测试

In [None]:
import numpy as np

In [None]:
output = np.random.rand(100,20)
target = np.random.randint(low = 0, high = 20, size = (100))
for i in range(100):
    output[i][target[i]] += 10
    output[i] /= output[i].sum()

In [None]:
import paddle
pd_loss = pd_fun(
    paddle.to_tensor(output, dtype = "float32"), 
    paddle.to_tensor(target))
print(pd_loss)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class LabelSmoothingCrossEntropy(nn.Module):
    def __init__(self, smoothing=0.1):
        super(LabelSmoothingCrossEntropy, self).__init__()
        assert smoothing < 1.0
        self.smoothing = smoothing
        self.confidence = 1. - smoothing

    def forward(self, x, target):
        logprobs = F.log_softmax(x, dim=-1)
        nll_loss = -logprobs.gather(dim=-1, index=target.unsqueeze(1))
        nll_loss = nll_loss.squeeze(1)
        smooth_loss = -logprobs.mean(dim=-1)
        loss = self.confidence * nll_loss + self.smoothing * smooth_loss
        return loss.mean()

In [None]:
index = torch.from_numpy(target).type_as(torch.zeros((2,2), dtype = torch.int64))
tc_loss = LabelSmoothingCrossEntropy()(
    torch.from_numpy(output), 
    index)
print(tc_loss)

## ***models.effcientnet_blocks系列***

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models.efficientnet_blocks import ConvBnAct, DepthwiseSeparableConv, drop_path, InvertedResidual,SqueezeExcite

<font size = 3>

### pimm.models.effcientnet_blocks.ConvBnAct

#### - 网络创建

In [None]:
conv_bn_act = ConvBnAct(in_chs = 3, out_chs = 8, kernel_size = 3)

#### - 网络组网与运行测试

In [None]:
import paddle
conv_bn_act_model = paddle.Model(conv_bn_act)
conv_bn_act_model.summary(input_size = (10, 3, 224, 224))

In [None]:
class ConvBnAct_Model(paddle.nn.Layer):
    def __init__(self) -> None:
        super(ConvBnAct_Model, self).__init__()
        self.core = ConvBnAct(in_chs = 3, out_chs = 8, kernel_size = 3)
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

conv_bn_act_model = ConvBnAct_Model()
conv_bn_act_model.eval()

conv_bn_act_model(paddle.randn((10, 3, 224, 224)))

<font size = 3>

### pimm.models.effcientnet_blocks.DepthwiseSeparableConv

#### - 网络创建

In [None]:
dsc = DepthwiseSeparableConv(in_chs = 8, out_chs = 3)

#### - 网络组网与运行测试

In [None]:
import paddle
dsc_model = paddle.Model(dsc)
dsc_model.summary(input_size = (1, 8, 224, 224))

In [None]:
import paddle
class DSC_Model(paddle.nn.Layer):
    def __init__(self) -> None:
        super(DSC_Model, self).__init__()
        self.core = DepthwiseSeparableConv(in_chs = 8, out_chs = 3)
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

dsc_model = DSC_Model()
dsc_model.eval()

dsc_model(paddle.randn((10, 8, 224, 224)))

<font size = 3>

### pimm.models.effcientnet_blocks.drop_path

In [None]:
import paddle
test_Tensor = paddle.randn((16, 3, 224, 224))
drop_path(test_Tensor)

<font size = 3>

### pimm.models.effcientnet_blocks.InvertedResidual

#### - 网络创建

In [None]:
ir = InvertedResidual(in_chs = 8, out_chs = 3)

#### - 网络组网与运行测试

In [None]:
import paddle
ir_model = paddle.Model(ir)
ir_model.summary(input_size = (16, 8, 224, 224))

In [None]:
import paddle
class IR_Model(paddle.nn.Layer):
    def __init__(self) -> None:
        super(IR_Model, self).__init__()
        self.core = InvertedResidual(in_chs = 8, out_chs = 3)
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

ir_model = IR_Model()
ir_model.eval()

ir_model(paddle.randn((16, 8, 224, 224)))

<font size = 3>

### pimm.models.effcientnet_blocks.SqueezeExcite

#### - 网络创建

In [None]:
se = SqueezeExcite(in_chs = 8)

#### - 网络组网与运行测试

In [None]:
import paddle
se_model = paddle.Model(se)
se_model.summary(input_size = (16, 8, 224, 224))

In [None]:
import paddle
class SE_Model(paddle.nn.Layer):
    def __init__(self) -> None:
        super(SE_Model, self).__init__()
        self.core = SqueezeExcite(in_chs = 8)
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

se_model = SE_Model()
se_model.eval()

se_model(paddle.randn((16, 8, 224, 224)))

## ***models.activations系列***

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models.activations import hard_sigmoid, Swish

<font size = 3>

### pimm.models.activations.hard_sigmoid

In [None]:
import paddle
import numpy as np
hard_sigmoid(paddle.to_tensor(np.linspace(0., 10.)))

<font size = 3>

### pimm.models.activations.Swish

In [None]:
import paddle
import numpy as np
swish = Swish()
swish(paddle.to_tensor(np.linspace(0., 10.)))

## ***models系列（其他函数与类）***

<font size = 3>

### pimm.models.create_conv2d

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import create_conv2d

#### - 调用测试

In [None]:
Conv_2D = []
Conv_2D.append(create_conv2d(in_chs = 8, out_chs = 3, kernel_size = [3, 5, 7]))
Conv_2D.append(create_conv2d(in_chs = 8, out_chs = 3, kernel_size = 5, num_experts = 3))
Conv_2D.append(create_conv2d(in_chs = 8, out_chs = 3, kernel_size = 5))

#### - 网络结构检查

In [None]:
import paddle
for module in (Conv_2D[0], Conv_2D[2]):
    model_test = paddle.Model(module)
    model_test.summary(input_size = (16, 8, 224, 224))
    del model_test

<font size = 3>

### pimm.models.SelectAdaptivePool2D

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import SelectAdaptivePool2D

#### - 网络创建

In [None]:
pools = [
    SelectAdaptivePool2D(output_size = 10, pool_type = "avgmax"), 
    SelectAdaptivePool2D(output_size = 10, pool_type = "catavgmax"), 
    SelectAdaptivePool2D(output_size = 10, pool_type = "avg"), 
    SelectAdaptivePool2D(output_size = 10, pool_type = "max")]

#### - 网络结构检查

In [None]:
import paddle
for layer in pools:
    model = paddle.Model(layer)
    model.summary(input_size = (16, 3, 224, 224))

#### - 网络组网与运行测试

In [None]:
import paddle
test_x = paddle.randint(0, 2, (1, 2, 3, 3))
test_x = paddle.to_tensor(test_x, dtype = "float32")
print(test_x)

In [None]:
class Pool_Model_1(paddle.nn.Layer):
    def __init__(self) -> None:
        super(Pool_Model_1, self).__init__()
        self.core = SelectAdaptivePool2D(output_size = 10, pool_type = "avgmax")
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

class Pool_Model_2(paddle.nn.Layer):
    def __init__(self) -> None:
        super(Pool_Model_2, self).__init__()
        self.core = SelectAdaptivePool2D(output_size = 10, pool_type = "catavgmax")
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

class Pool_Model_3(paddle.nn.Layer):
    def __init__(self) -> None:
        super(Pool_Model_3, self).__init__()
        self.core = SelectAdaptivePool2D(output_size = 10, pool_type = "avg")
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

class Pool_Model_4(paddle.nn.Layer):
    def __init__(self) -> None:
        super(Pool_Model_4, self).__init__()
        self.core = SelectAdaptivePool2D(output_size = 10, pool_type = "max")
    
    @paddle.jit.to_static
    def forward(self, x):
        return self.core(x)

Pool_Models = [Pool_Model_1, Pool_Model_2, Pool_Model_3, Pool_Model_4]

In [None]:
for test_Model in Pool_Models:
    model = test_Model()
    model.eval()
    print(model(test_x))

<font size = 3>

### pimm.models.resume_checkpoint

测试此部分前，请先测试`pimm.utils.CheckpointSaver`以生成本部分测试所需要的存档

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import resume_checkpoint

#### - 存档载入测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import create_conv2d
test_Model = create_conv2d(in_chs = 8, out_chs = 3, kernel_size = [3, 5, 7])

resume_checkpoint(test_Model, "test/last.pdparams")

测试完成后请运行`utils.CheckpointSaver`中的“删除生成文件部分以”删除用于测试的存档

## ***utils系列***

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.utils import reduce_tensor, CheckpointSaver, ModelEma

<font size = 3>

### pimm.utils.reduce_tensor

In [None]:
import paddle
test_Tensor = paddle.randn((3, 3, 3))
print(test_Tensor)

In [None]:
reduced_test_Tensor = reduce_tensor(test_Tensor, 3.)
print(reduced_test_Tensor)

<font size = 3>

### pimm.utils.CheckpointSaver

#### - 实例化测试

In [None]:
saver = CheckpointSaver(checkpoint_dir = "test", recovery_dir = "test", max_history = 5)

#### - 存储测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import create_conv2d
import paddle

test_Model = create_conv2d(in_chs = 8, out_chs = 3, kernel_size = [3, 5, 7])
test_Optimizer = paddle.optimizer.Adam(parameters = test_Model.parameters())

class test_Args:
    def __init__(self):
        self.model = ""

test_args = test_Args()

In [None]:
import random
for i in range(20):
    saver.save_checkpoint(
    model = test_Model, optimizer = test_Optimizer, 
    args = test_args, epoch = i + 1, 
    metric = random.random())

In [None]:
for checkpoint in saver.checkpoint_files:
    print(checkpoint)

#### - 删除生成文件（若要测试resume_checkpoint则不运行）

In [None]:
import os
f_dir = os.getcwd()
l_dir = os.path.join(f_dir, "test")
if os.path.exists(l_dir):
    lis_dir = os.listdir(l_dir)
    for tar_dir in lis_dir:
        os.remove(os.path.join(l_dir, tar_dir))
    os.removedirs(l_dir)

<font size = 3>

### pimm.utils.ModelEma

#### - 实例化测试

In [None]:
from Paddle_Cream.lib.utils.pimm.models import create_conv2d
ema = ModelEma(
    create_conv2d(in_chs = 8, out_chs = 3, kernel_size = [3, 5, 7]))

#### - 更新测试

In [None]:
ema.update(
    create_conv2d(in_chs = 8, out_chs = 3, kernel_size = [3, 5, 7]))

## ***optim与scheduler系列***

<font size = 3>

### pimm.optim.create_optimizer

没有有效的测试方法

<font size = 3>

### pimm.scheduler.create_scheduler

#### - import测试

In [None]:
from Paddle_Cream.lib.utils.pimm.scheduler import create_scheduler

#### - 实例化测试

In [None]:
class test_Args:
    def __init__(self):
        self.sched = "step"
        self.lr = .01
        self.epochs = 200
        self.decay_epochs = 4
        self.decay_rate = .1
        self.warmup_lr = 1e-4
        self.warmup_epochs = 3
        self.noise_range = None
        self.lr_noise_pct = .67
        self.lr_noise_std = 1.
        self.seed = 42

In [None]:
sched, num_epochs = create_scheduler(test_Args())

In [None]:
for i in range(20):
    sched.step(i)
    print(sched.get_lr())