## PaddlePaddle Basics

### Core concepts

#### data reader

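PaddlePaddle reads data through data readers. There are three main pieces: the reader, the reader_creator, and more complex creators.

 - **reader**: a reader returns data items one at a time from a data source. The source can be a plain file, a binary file, a file on a distributed file system, or even a random number generator, as long as it can return one or more data items;
 - **reader_creator**: a function that processes and wraps a reader; common everyday operations are packaged in the creator;
 - **more complex creators**: wrap creators with further basic operations, such as mapping functions, firstN, and merging.

The reader, reader_creator, and more complex creators build on one another, abstracting step by step from the low-level data source up to the layer that serves user needs. Some code is pasted below for analysis.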

**reader**

Reading from a plain text file:

    def text_file(path):
        def reader():
            f = open(path, "r")
            for l in f:
                yield l.rstrip('\n')
            f.close()

        return reader
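This simply returns a generator that reads the file line by line, with basic processing such as stripping the trailing newline. A reader can also yield from an `np.array`: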

    def np_array(x):
        def reader():
            if x.ndim < 1:
                yield x

            for e in x:
                yield e

        return reader
Or from a binary file:

    def reader():
        if isinstance(paths, basestring):
            path = paths
        else:
            path = ",".join(paths)
        f = rec.reader(path)
        while True:
            r = f.read()
            if r is None:
                break
            yield pickle.loads(r)
        f.close()
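Or from the network (this is the inner function of `cloud_reader`, which is shown in full in the reader_creator part):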

    def reader():
        global pass_num
        c.paddle_start_get_records(pass_num)
        pass_num += 1

        while True:
            r, e = c.next_record()
            if not r:
                if e != -2:
                    print "get record error: ", e
                break
            yield pickle.loads(r)

    return reader
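The reader pattern above can be tried without PaddlePaddle installed. The following is a minimal Python 3 sketch (the quoted snippets are Python 2). `list_reader_creator` and `first_n` are hypothetical helper names used only for illustration; they are not claimed to be part of the PaddlePaddle API.

```python
import itertools


def list_reader_creator(items):
    """Hypothetical reader creator: wraps an in-memory list."""

    def reader():
        # A reader is a zero-argument callable that yields one item at a time.
        for item in items:
            yield item

    return reader


def first_n(reader, n):
    """Hypothetical decorator: keep only the first n items of a reader."""

    def new_reader():
        # islice consumes the underlying generator lazily.
        return itertools.islice(reader(), n)

    return new_reader


lines = list_reader_creator(["a", "b", "c", "d"])
head = first_n(lines, 2)

print(list(lines()))  # each call to the reader starts a fresh pass
print(list(head()))
```

Because a reader is a callable rather than a one-shot iterator, it can be called once per training pass and the data is read again from the start.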
        
**reader_creator**
To understand creators in detail, read [Paddle/python/paddle/v2/reader/creator.py](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/reader/creator.py) in PaddlePaddle. The code of `cloud_reader` is pasted here for analysis:

    def cloud_reader(paths, etcd_endpoints, timeout_sec=5, buf_size=64):
        """
        Create a data reader that yield a record one by one from
            the paths:
        :paths: path of recordio files, can be a string or a string list.
        :etcd_endpoints: the endpoints for etcd cluster
        :returns: data reader of recordio files.

        ..  code-block:: python
            from paddle.v2.reader.creator import cloud_reader
            etcd_endpoints = "http://127.0.0.1:2379"
            trainer.train(
                reader=cloud_reader(["/work/dataset/uci_housing/uci_housing*"], etcd_endpoints),
            )
        """
        import os
        import cPickle as pickle
        import paddle.v2.master as master
        c = master.client(etcd_endpoints, timeout_sec, buf_size)

        if isinstance(paths, basestring):
            path = [paths]
        else:
            path = paths
        c.set_dataset(path)

        def reader():
            global pass_num
            c.paddle_start_get_records(pass_num)
            pass_num += 1

            while True:
                r, e = c.next_record()
                if not r:
                    if e != -2:
                        print "get record error: ", e
                    break
                yield pickle.loads(r)

        return reader

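In `cloud_reader`, the client first sends requests to the cluster (through a `paddle.v2.master` client created from the etcd endpoints, with the dataset paths registered via `set_dataset`), and then builds a data iterator over the files under the given paths. If you are interested, have a look at the code in client.py, which mainly calls functions in `libpaddle_master.so` to carry out these operations.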


**More complex creators**
    
The more complex creators can be found in [decorator.py](https://github.com/PaddlePaddle/Paddle/blob/b4302bbbb85bbfd984cb2825887c133120699775/python/paddle/v2/reader/decorator.py), including `map_readers`, `chain`, `compose`, and so on.

    def map_readers(func, *readers):
        """
        Creates a data reader that outputs return value of function using
        output of each data readers as arguments.

        :param func: function to use. The type of func should be (Sample) => Sample
        :type: callable
        :param readers: readers whose outputs will be used as arguments of func.
        :return: the created data reader.
        :rtype: callable
        """

        def reader():
            rs = []
            for r in readers:
                rs.append(r())
            for e in itertools.imap(func, *rs):
                yield e

        return reader
        
    def compose(*readers, **kwargs):
        """
        Creates a data reader whose output is the combination of input readers.

        If input readers output following data entries:
        (1, 2)    3    (4, 5)
        The composed reader will output:
        (1, 2, 3, 4, 5)

        :param readers: readers that will be composed together.
        :param check_alignment: if True, will check if input readers are aligned
            correctly. If False, will not check alignment and trailing outputs
            will be discarded. Defaults to True.
        :type check_alignment: bool

        :return: the new data reader.

        :raises ComposeNotAligned: outputs of readers are not aligned.
            Will not raise when check_alignment is set to False.
        """
        check_alignment = kwargs.pop('check_alignment', True)

        def make_tuple(x):
            if isinstance(x, tuple):
                return x
            else:
                return (x, )

        def reader():
            rs = []
            for r in readers:
                rs.append(r())
            if not check_alignment:
                for outputs in itertools.izip(*rs):
                    yield sum(map(make_tuple, outputs), ())
            else:
                for outputs in itertools.izip_longest(*rs):
                    for o in outputs:
                        if o is None:
                            # None will be not be present if compose is aligned
                            raise ComposeNotAligned(
                                "outputs of readers are not aligned.")
                    yield sum(map(make_tuple, outputs), ())

        return reader
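Here, `map_readers` applies a function to every item of the data stream, and `compose` merges feature data coming from several readers. This helps a lot during feature engineering. Imagine trying out different feature combinations: preparing every combination at the data-source stage is tedious, but if features from different sources can be combined directly while the data is being read, things become much easier. For example, (1,2) 3 (4,5) is combined into (1,2,3,4,5), and only the reader that actually changes has to be modified.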
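The behaviour of `map_readers` and `compose` can be sketched in Python 3 without PaddlePaddle. This is a simplified re-implementation for illustration: it uses the built-in `map` and `zip` in place of `itertools.imap` and `itertools.izip`, omits the alignment check, and `from_list` is a hypothetical helper that is not part of the library.

```python
def map_readers(func, *readers):
    def reader():
        # Python 3's built-in map is lazy, like itertools.imap in Python 2.
        return map(func, *[r() for r in readers])

    return reader


def compose(*readers):
    def as_tuple(x):
        return x if isinstance(x, tuple) else (x,)

    def reader():
        # zip stops at the shortest reader (no alignment check in this sketch).
        for outputs in zip(*[r() for r in readers]):
            # Flatten the per-reader outputs into one tuple.
            yield sum(map(as_tuple, outputs), ())

    return reader


def from_list(items):
    # Hypothetical helper: turn a list into a reader.
    def reader():
        return iter(items)

    return reader


pairs = from_list([(1, 2), (10, 20)])
singles = from_list([3, 30])
more_pairs = from_list([(4, 5), (40, 50)])

combined = compose(pairs, singles, more_pairs)
doubled = map_readers(lambda x: x * 2, singles)

print(list(combined()))
print(list(doubled()))
```

Swapping one feature source then only means passing a different reader to `compose`; the other readers stay untouched.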

#### layer

##### data layer

As usual, a neural network is built by combining different layers. PaddlePaddle uses dedicated data layers as the inputs of the network:

    x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(2))
    y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))

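PaddlePaddle supports the following data types:

 - dense_vector: dense vector of floats;
 - sparse_binary_vector: sparse binary vector;
 - sparse_float_vector: sparse vector of floats;
 - integer_value: integer label.

Each of these is further divided by sequence type into three kinds: sequence (`SEQUENCE`), non-sequence (`NO_SEQUENCE`), and sub-sequence (`SUB_SEQUENCE`).

The declarations of the data types: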

    class InputType(object):
        """
        InputType is the base class for paddle input types.

        ..  note::

            this is a base class, and should never be used by user.

        :param dim: dimension of input. If the input is an integer, it means the
                    value range. Otherwise, it means the size of layer.
        :type dim: int
        :param seq_type: sequence type of input. 0 means it is not a sequence. 1
                         means it is a variable length sequence. 2 means it is a
                         nested sequence.
        :type seq_type: int
        :param type: data type of input.
        :type type: int
        """
        __slots__ = ['dim', 'seq_type', 'type']

        def __init__(self, dim, seq_type, tp):
            self.dim = dim
            self.seq_type = seq_type
            self.type = tp

        def __repr__(self):
            """
            Return a human readable representation like 'InputType(dim=25921, 
                seq_type=SequenceType.NO_SEQUENCE, type=DataType.Dense)'
            """
            repr_str = type(self).__name__
            repr_str += '('
            serialize_func_map = {
                'dim': repr,
                'seq_type': SequenceType.tostring,
                'type': DataType.tostring
            }
            for idx, k in enumerate(self.__slots__):
                if idx != 0:
                    repr_str += ', '
                repr_str += (
                    k + '=' + serialize_func_map.get(k, repr)(getattr(self, k)))
            repr_str += ')'
            return repr_str

    def dense_slot(dim, seq_type=SequenceType.NO_SEQUENCE):
        """
        Dense Array. It means the input feature is dense array with float type.
        For example, if the input is an image with 28*28 pixels, the input of
        Paddle neural network could be a dense vector with dimension 784 or a
        numpy array with shape (28, 28).

        For the 2-D convolution operation, each sample in one mini-batch must have
        the similarly size in PaddlePaddle now. But, it supports variable-dimension
        feature across mini-batch. For the variable-dimension, the param dim is not
        used. While the data reader must yield numpy array and the data feeder will
        set the data shape correctly.

        :param dim: dimension of this vector.
        :type dim: int
        :param seq_type: sequence type of input.
        :type seq_type: int
        :return: An input type object.
        :rtype: InputType
        """
        return InputType(dim, seq_type, DataType.Dense)

    def sparse_non_value_slot(dim, seq_type=SequenceType.NO_SEQUENCE):
        """
        Sparse binary vector. It means the input feature is a sparse vector and the
        every element in this vector is either zero or one.

        :param dim: dimension of this vector.
        :type dim: int
        :param seq_type: sequence type of this input.
        :type seq_type: int
        :return: An input type object.
        :rtype: InputType
        """
        return InputType(dim, seq_type, DataType.SparseNonValue)

    def sparse_value_slot(dim, seq_type=SequenceType.NO_SEQUENCE):
        """
        Sparse vector. It means the input feature is a sparse vector. Most of the
        elements in this vector are zero, others could be any float value.

        :param dim: dimension of this vector.
        :type dim: int
        :param seq_type: sequence type of this input.
        :type seq_type: int
        :return: An input type object.
        :rtype: InputType
        """
        return InputType(dim, seq_type, DataType.SparseValue)

    def index_slot(value_range, seq_type=SequenceType.NO_SEQUENCE):
        """
        Data type of integer.

        :param seq_type: sequence type of this input.
        :type seq_type: int
        :param value_range: range of this integer.
        :type value_range: int
        :return: An input type object
        :rtype: InputType
        """
        return InputType(value_range, seq_type, DataType.Index)

    dense_vector = dense_slot
    sparse_binary_vector = sparse_non_value_slot
    sparse_float_vector = sparse_value_slot
    integer_value = index_slot

The `SequenceType` and `DataType` parts:

    class SequenceType(object):
        NO_SEQUENCE = 0
        SEQUENCE = 1
        SUB_SEQUENCE = 2

        @classmethod
        def tostring(cls, value):
            for k in cls.__dict__:
                if not k.startswith('__'):
                    if getattr(cls, k) == value:
                        return cls.__name__ + '.' + k
            return 'INVALID(' + str(value) + ')'


    # TODO(yuyang18): Add string data type here.
    class DataType(object):
        Dense = 0
        SparseNonValue = 1
        SparseValue = 2
        Index = 3

        @classmethod
        def tostring(cls, value):
            for k in cls.__dict__:
                if not k.startswith('__'):
                    if getattr(cls, k) == value:
                        return cls.__name__ + '.' + k
            return 'INVALID(' + str(value) + ')'
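The reverse lookup done by `tostring` and the resulting `InputType` representation can be reproduced in a few lines. This is a standalone Python 3 sketch, not the library code; `constant_name` and `describe_input` are hypothetical names introduced here.

```python
class SequenceType:
    NO_SEQUENCE = 0
    SEQUENCE = 1
    SUB_SEQUENCE = 2


class DataType:
    Dense = 0
    SparseNonValue = 1
    SparseValue = 2
    Index = 3


def constant_name(cls, value):
    """Reverse lookup: find the class constant whose value matches."""
    for key, val in vars(cls).items():
        if not key.startswith('__') and val == value:
            return cls.__name__ + '.' + key
    return 'INVALID(' + str(value) + ')'


def describe_input(dim, seq_type, data_type):
    """Build a string in the same style as InputType.__repr__."""
    return 'InputType(dim=%r, seq_type=%s, type=%s)' % (
        dim,
        constant_name(SequenceType, seq_type),
        constant_name(DataType, data_type))


print(describe_input(784, SequenceType.NO_SEQUENCE, DataType.Dense))
print(constant_name(DataType, 9))
```

A value that matches no constant falls through to the `INVALID(...)` branch, which is what makes a wrongly built input type easy to spot when it is printed.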

##### fully connected
A typical call to the fc layer: `hidden1 = paddle.layer.fc(name="fc1",input=contextemb, size=128, act=paddle.activation.Sigmoid(), layer_attr=paddle.attr.Extra(drop_rate=0.5), bias_attr=paddle.attr.Param(learning_rate=2), param_attr=paddle.attr.Param( initial_std=1. / math.sqrt(5 * 8), learning_rate=1, l2_rate=6e-4))`. Let's look for it in `paddle/python/paddle/v2/layer.py`.

At first, `layer.py` only has `__all__ = ['data', 'parse_network']`, that is, two names. Then:

    for name in v1_layers.__all__:
        obj = getattr(v1_layers, name)
        new_name = __convert_name__(name)
        if callable(obj) and __need_to_wrap__(name):
            globals()[new_name] = __convert_to_v2__(obj, new_name, __name__)
        else:
            globals()[new_name] = obj
        __all__.append(new_name)
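The re-export loop above can be illustrated with a small standalone Python 3 sketch. Everything here is hypothetical: `convert_name` only strips a trailing `_layer` (the real `__convert_name__` may apply other rules), the two layer functions are dummies, and a plain dict stands in for `globals()`.

```python
def convert_name(name):
    # Hypothetical rule for illustration: drop a trailing "_layer".
    suffix = "_layer"
    return name[:-len(suffix)] if name.endswith(suffix) else name


def fc_layer(size):
    return "fc(size=%d)" % size


def pooling_layer(kind):
    return "pooling(kind=%s)" % kind


# Stand-in for the names exported by v1_layers.__all__.
v1_exports = {"fc_layer": fc_layer, "pooling_layer": pooling_layer}

v2_namespace = {}   # plays the role of globals() in layer.py
exported = []       # plays the role of __all__

for name, obj in v1_exports.items():
    new_name = convert_name(name)
    v2_namespace[new_name] = obj
    exported.append(new_name)

print(sorted(exported))
print(v2_namespace["fc"](128))
```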
        
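After some basic processing (`__convert_name__`, `__convert_to_v2__`), every layer supported in `v1_layers` is converted into a layer supported in v2. `v1_layers` lives in `paddle/python/paddle/trainer_config_helpers/layers.py`, where the definition of `fc_layer` is easy to find: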

    def fc_layer(input,
                 size,
                 act=None,
                 name=None,
                 param_attr=None,
                 bias_attr=None,
                 layer_attr=None):
        if isinstance(input, LayerOutput):
            input = [input]
            assert not isinstance(param_attr, collections.Sequence)
            param_attr = [param_attr]
        else:
            if isinstance(param_attr, collections.Sequence):
                assert len(input) == len(param_attr)
            else:
                if "parameter_name" in param_attr.attr and len(input) > 1:
                    logger.fatal(
                        "When the name field of param_attr is manually specified "
                        "and the input is a list, the param_attr should also be a "
                        "list with each item being the param_attr for each input "
                        "item. If only one named param_attr is provided, all the "
                        "input items would share this parameter.")
                param_attr = [copy.deepcopy(param_attr) for _ in range(len(input))]

        assert isinstance(input, collections.Sequence)

        Layer(
            inputs=[
                Input(ipt.name, **attr.attr) for ipt, attr in zip(input, param_attr)
            ],
            name=name,
            type=LayerType.FC_LAYER,
            size=size,
            bias=ParamAttr.to_bias(bias_attr),
            active_type=act.name,
            **ExtraLayerAttribute.to_kwargs(layer_attr))
        return LayerOutput(
            name, LayerType.FC_LAYER, input, activation=act, size=size)
            
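The function first does some basic processing on the inputs, such as type checks, then `Layer` configures the layer, and finally a `LayerOutput` is returned. The `Layer` part is as follows: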

    def Layer(name, type, **xargs):
        layers = {}
        layers.update(g_cost_map)
        layers.update(g_layer_type_map)
        layer_func = layers.get(type)
        config_assert(layer_func, "layer type '%s' not supported." % type)
        return layer_func(name, **xargs)
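`Layer` is essentially a lookup in a type-to-function map followed by a call. A minimal Python 3 sketch of that dispatch pattern follows; `layer_registry`, `register_layer`, `build_fc`, and `make_layer` are hypothetical names, not PaddlePaddle APIs.

```python
layer_registry = {}


def register_layer(type_name):
    """Hypothetical decorator that records a layer builder under a type name."""

    def decorator(func):
        layer_registry[type_name] = func
        return func

    return decorator


@register_layer("fc")
def build_fc(name, size):
    return {"name": name, "type": "fc", "size": size}


def make_layer(name, type, **kwargs):
    # Look up the builder by type name and fail loudly if it is unknown.
    builder = layer_registry.get(type)
    if builder is None:
        raise ValueError("layer type '%s' not supported." % type)
    return builder(name, **kwargs)


print(make_layer("fc1", "fc", size=128))
```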
        
The `LayerOutput` class is as follows:

    class LayerOutput(object):
        def __init__(self,
                     name,
                     layer_type,
                     parents=None,
                     activation=None,
                     num_filters=None,
                     img_norm_type=None,
                     size=None,
                     outputs=None,
                     reverse=None):
            assert isinstance(name, basestring)
            assert isinstance(layer_type, basestring)
            assert size is not None
            assert LayerType.is_layer_type(layer_type)
            self.name = name
            self.full_name = MakeLayerNameInSubmodel(name)
            self.layer_type = layer_type
            if parents is not None and type(parents) != list:
                parents = [parents]
            self.parents = [] if parents is None else parents
            self.activation = activation
            self.num_filters = num_filters
            self.img_norm_type = img_norm_type
            self.size = size
            if outputs is None:
                outputs = ['default']
            self.outputs = outputs
            self.reverse = reverse

        @property
        def width(self):
            return cp.g_layer_map[self.full_name].width

        @property
        def height(self):
            return cp.g_layer_map[self.full_name].height

        @property
        def depth(self):
            return cp.g_layer_map[self.full_name].depth

        def set_input(self, input):
            assert isinstance(input, LayerOutput)
            assert self.layer_type == LayerType.MEMORY
            SetMemoryInput(self.name, input.name)


#### parameters

The `Parameters` class manages all the parameters of a model. It mainly consists of `__param_conf__`, `__gradient_machines__`, and `__tmp_params__`:

    class Parameters(object):
        def __init__(self):
            self.__param_conf__ = OrderedDict()
            self.__gradient_machines__ = []
            self.__tmp_params__ = dict()

 - `__param_conf__`: an OrderedDict that stores all learnable parameters in the order they were created, so iterating over `__param_conf__` visits every parameter from first to last;
 - `__gradient_machines__`: holds the gradient machines that PaddlePaddle uses internally to copy parameter values between the Python side and the C++ side;
 - `__tmp_params__`: a dict that stores dummy parameters.

`Parameters` implements `__append_config__()`, `update_param_conf()`, `keys()`, `names()`, `has_key()`, `__iter__()`, `__getter_inner()`, `__getitem__()`, `get_shape()`, `__setitem__()`, `get()`, `get_grad()`, `set()`, `append_gradient_machine()`, `serialize()`, `deserialize()`, `to_tar()`, `from_tar()`, and `init_from_tar()`. These perform the basic operations on model parameters; for details see [https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/parameters.py](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/parameters.py). This code is not difficult: it mostly consists of basic operations on the internal members of `Parameters`.

How to use `Parameters`:

    data = paddle.layer.data(...)
    ...
    out = paddle.layer.fc(...)
    parameters = paddle.parameters.create(out)
    parameter_names = parameters.names()
    fc_mat = parameters.get('fc')
    print fc_mat
        
    def create(layers):
        """
        Create parameter pool by topology.
        :param layers:
        :return:
        """
        topology = Topology(layers)
        pool = Parameters()
        initializers = cp.g_parameter_initializer_map
        for param in topology.proto().parameters:
            pool.__append_config__(param)
            if param.name in initializers:
                pool[param.name] = initializers[param.name](param.name)
        return pool
        
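`create()` first builds the topology of the network, then iterates over the parameters in `topology.proto()` and appends each `paddle.proto.ParameterConfig` to the `Parameters` object. If the parameter name appears in `g_parameter_initializer_map`, the corresponding initialization is performed. My personal guess is that this covers parameter-initialization operations, for example initializing model parameters from a required mean and std.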
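The flow of `create()` (append every parameter config in order, then apply an initializer when one is registered for that name) can be sketched with a simplified stand-in. `ParameterPool`, its methods, and the `(name, shape)` spec format are assumptions made for this Python 3 illustration; the real class stores protobuf configs and talks to gradient machines.

```python
from collections import OrderedDict


class ParameterPool:
    """Hypothetical, simplified stand-in for Parameters."""

    def __init__(self):
        self._conf = OrderedDict()   # name -> shape, in creation order
        self._values = {}            # name -> flat list of floats

    def append_config(self, name, shape):
        if name in self._conf:
            raise ValueError("duplicated parameter %s" % name)
        self._conf[name] = shape

    def names(self):
        return list(self._conf.keys())

    def get_shape(self, name):
        return self._conf[name]

    def set(self, name, values):
        rows, cols = self._conf[name]
        if len(values) != rows * cols:
            raise ValueError("value size does not match shape")
        self._values[name] = list(values)

    def get(self, name):
        return self._values[name]


def create(param_specs, initializers):
    pool = ParameterPool()
    for name, shape in param_specs:
        pool.append_config(name, shape)
        # Only parameters with a registered initializer get a value here.
        if name in initializers:
            pool.set(name, initializers[name](shape))
    return pool


pool = create(
    [("fc.w", (2, 3)), ("fc.b", (1, 3))],
    {"fc.b": lambda shape: [0.0] * (shape[0] * shape[1])},
)
print(pool.names())
print(pool.get("fc.b"))
```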

#### optimizer

PaddlePaddle provides `__all__ = ['Momentum', 'Adam', 'Adamax', 'AdaGrad', 'DecayedAdaGrad', 'AdaDelta','RMSProp', 'ModelAverage', 'L2Regularization']`. The optimizers among these share the parent class `Optimizer`, whose code handles both local and remote modes:

    class Optimizer(object):
        def __init__(self, **kwargs):
            import py_paddle.swig_paddle as swig_api
            if 'batch_size' in kwargs:
                del kwargs['batch_size']  # not important for python library.

            def __impl__():
                v1_optimizers.settings(batch_size=1, **kwargs)

            self.__opt_conf_proto__ = config_parser_utils.parse_optimizer_config(
                __impl__)
            self.__opt_conf__ = swig_api.OptimizationConfig.createFromProto(
                self.__opt_conf_proto__)

        def enable_types(self):
            import py_paddle.swig_paddle as swig_api
            tmp = swig_api.ParameterOptimizer.create(self.__opt_conf__)
            assert isinstance(tmp, swig_api.ParameterOptimizer)
            return tmp.getParameterTypes()

        def __create_local_updater__(self):
            import py_paddle.swig_paddle as swig_api
            return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__)

        def __create_remote_updater__(self, pass_num, use_sparse_updater):
            import py_paddle.swig_paddle as swig_api
            return swig_api.ParameterUpdater.createRemoteUpdater(
                self.__opt_conf__, pass_num, use_sparse_updater)

        def __create_new_remote_updater__(self, pserver_spec, use_etcd):
            import py_paddle.swig_paddle as swig_api
            return swig_api.ParameterUpdater.createNewRemoteUpdater(
                self.__opt_conf__, pserver_spec, use_etcd)

        def create_updater(self, is_local, num_passes, use_sparse_updater,
                           pserver_spec, use_etcd):
            if is_local:
                parameter_updater = self.__create_local_updater__()
            else:
                if pserver_spec is None:
                    parameter_updater = self.__create_remote_updater__(
                        num_passes, use_sparse_updater)
                else:
                    parameter_updater = self.__create_new_remote_updater__(
                        pserver_spec, use_etcd)
            return parameter_updater
            
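`__init__()` mainly initializes the optimizer configuration. `__opt_conf_proto__` holds the basic settings parsed into a protobuf message, and `__opt_conf__` is the `swig_api.OptimizationConfig` object created from `__opt_conf_proto__`, which binds the optimizer configuration to the generated protobuf class. For details, see `OptimizerConfig.proto`: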
 
    syntax = "proto2";

    option optimize_for = LITE_RUNTIME;

    package paddle;

    message SGDConfig {
      // SGD
      // momentum: float >= 0. Parameter updates momentum.
      // decay: float >= 0. Learning rate decay over each update.
      // nesterov: boolean. Whether to apply Nesterov momentum.
      optional double momentum = 21 [ default = 0.0 ];
      optional double decay = 23 [ default = 0.0 ];
      optional bool nesterov = 24 [ default = false ];
    }

    message AdadeltaConfig {
      // Adadelta
      // It is recommended to leave it at the default value.
      // rho: float >= 0.
      // epsilon: float >= 0. Fuzz factor.
      // decay: float >= 0. Learning rate decay over each update.

      // reference : [Adadelta - an adaptive learning rate
      // method](http://arxiv.org/abs/1212.5701)
      optional double rho = 33 [ default = 0.90 ];
      optional double epsilon = 31 [ default = 1e-5 ];
      optional double decay = 32 [ default = 0.0 ];
    }

    message AdagradConfig {
      // Adagrad
      // epsilon: float >= 0.
      // decay: float >= 0. Learning rate decay over each update.

      // reference : [Adaptive Subgradient Methods for Online Learning and
      // Stochastic
      // Optimization](http://www.jmlr.org/papers/volume12/duchi11a/duchi11a.pdf)
      optional double epsilon = 41 [ default = 1e-5 ];
      optional double decay = 42 [ default = 0.0 ];
    }

    message AdamConfig {
      // Adam
      // beta_1: float, 0 < beta < 1. Generally close to 1.
      // beta_2: float, 0 < beta < 1. Generally close to 1.
      // epsilon: float >= 0. Fuzz factor.
      // decay: float >= 0. Learning rate decay over each update.
      // reference : [Adam - A Method for Stochastic
      // Optimization](http://arxiv.org/abs/1412.6980v8)
      optional double beta_1 = 41;
      optional double beta_2 = 42;
      optional double epsilon = 43;
      optional double decay = 44;
    }

    message ConstLrConfig {
      // learning rate policy
      optional double learning_rate = 1 [ default = 1.0 ];
    }

    message LinearLrConfig {
      // learning rate policy
      optional double learning_rate = 1 [ default = 1.0 ];
      optional double lr_decay_a = 2;
      optional double lr_decay_b = 3;
    }

    message TensorProto {
      enum DataType {
        PADDLE_ELEMENT_TYPE_INT32 = 0;
        PADDLE_ELEMENT_TYPE_UINT32 = 1;
        PADDLE_ELEMENT_TYPE_INT64 = 2;
        PADDLE_ELEMENT_TYPE_UINT64 = 3;
        PADDLE_ELEMENT_TYPE_FLOAT32 = 4;
        PADDLE_ELEMENT_TYPE_FLOAT64 = 5;
      }
      optional DataType data_type = 1;
      repeated bytes content = 2;
    }

    message LrPolicyState {
      // learning rate policy
      optional double learning_rate = 1 [ default = 1.0 ];
      optional double lr_decay_a = 2;
      optional double lr_decay_b = 3;
    }

    message SGDOptimizerState {
      optional LrPolicyState lr_state = 101;
      optional double num_sample_passed = 104;
      // state
      optional TensorProto parameter = 1;
      optional TensorProto momentums = 2;
    }

    message AdadeltaOptimizerState {
      // learning rate policy
      optional LrPolicyState lr_state = 101;
      optional double num_sample_passed = 104;
      // state
      optional TensorProto parameter = 1;
      optional TensorProto accum_gradient = 2;
      optional TensorProto accum_delta = 3;
      optional TensorProto update_delta = 4;
    }

    message AdagradOptimizerState {
      optional LrPolicyState lr_state = 101;
      optional double num_sample_passed = 104;
      // state
      optional TensorProto parameter = 1;
      optional TensorProto accum_gradient = 2;
    }

    message AdamOptimizerState {
      optional LrPolicyState lr_state = 101;
      optional double num_sample_passed = 104;
      // state
      optional TensorProto parameter = 1;
      optional TensorProto momentums = 2;
      optional TensorProto velocitys = 3;
    }

    message OptimizerConfig {
      enum Optimizer {
        SGD = 1;
        Adadelta = 2;
        Adagrad = 3;
        Adam = 4;
      }
      optional Optimizer optimizer = 1;
      optional SGDConfig sgd = 3;
      optional AdadeltaConfig adadelta = 4;
      optional AdagradConfig adagrad = 5;
      optional AdamConfig adam = 6;

      enum LrPolicy {
        Const = 0;
        Linear = 1;
      }
      optional LrPolicy lr_policy = 11;
      optional ConstLrConfig const_lr = 12;
      optional LinearLrConfig linear_lr = 13;

      // common config of optimizer
      // gradient clip when L2 exceeding value
      optional double clip_norm = 101;
      // gradient clip when L1 exceeding value
      optional double clip_value = 102;
    }
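In addition, the optimizer creates its updater differently for the local and the remote case, and in the remote case it further distinguishes whether sparse updates are used. The `ParameterUpdater` class is defined as follows: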

    class ParameterUpdater {
    private:
      ParameterUpdater();

    public:
      static ParameterUpdater* createLocalUpdater(OptimizationConfig* config);
      static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config,
                                                   int passCount,
                                                   bool useSparseUpdater);
      static ParameterUpdater* createNewRemoteUpdater(
          OptimizationConfig* config,
          const std::string pserverSpec,
          const bool useEtcd) throw(UnsupportError);
      ~ParameterUpdater();

      /**
       * @brief initialize Parameter Updater by GradientMachine.
       * @param gm
       */
      void init(const GradientMachine& gm);

      /**
       * @brief begin of a training/testing of one pass.
       */
      void startPass();

      /**
       * @brief end of a training/testing of one pass.
       */
      void finishPass();

      /**
       * @brief begin of a training/testing of one batch.
       * @param data batch's size
       * @return PassType, mostly will be training.
       */
      PassType startBatch(size_t batchSize);

      /**
       * @brief end of a training/testing of one batch
       * @param cost current batch cost.
       */
      void finishBatch(float cost);

      /**
       * @brief update a parameter (by local optimizer or by cluster pserver)
       * @param param
       */
      void update(Parameter* param);

      /**
       * @brief only get required sparse rows by default.
       * @param fullSize: get full matrix parameter if *fullSize* set
       * @param apply: get PARAMETER_APPLY on pserver if *apply* set
       */
      void getParametersRemote(bool fullSize = false, bool apply = false);

      /**
       * @brief restore the average parameter.
       * @note It is only used in AverageOptimizer. Restore will get the current
       * PARAMETER_VALUE back.
       */
      void restore();

      /**
       * @brief apply. Store the average parameter.
       * @note It is only used in AverageOptimizer. Apply will store the current
       * PARAMETER_VALUE to buffer, calculate current Average Parameter, and save
       * it to PARAMETER_VALUE.
       */
      void apply();

      /**
       * @brief catchUpWith The Regularization will be delayed in many situations(
       * pserver, local sparse). Catch Up means catch the regularization up, apply
       * regularization to all params.
       */
      void catchUpWith();

    private:
      ParameterUpdaterPrivate* m;
    };

The implementations of `createLocalUpdater`, `createRemoteUpdater`, and `createNewRemoteUpdater` are as follows:

    ParameterUpdater *ParameterUpdater::createLocalUpdater(
        OptimizationConfig *config) {
      auto updater = new ParameterUpdater();
      updater->m->updater.reset(
          new paddle::SgdThreadUpdater(config->m->getConfig()));
      return updater;
    }

    ParameterUpdater *ParameterUpdater::createNewRemoteUpdater(
        OptimizationConfig *config,
        const std::string pserverSpec,
        const bool useEtcd) throw(UnsupportError) {
    #ifndef PADDLE_WITHOUT_GOLANG
      auto updater = new ParameterUpdater();
      updater->m->updater.reset(new paddle::NewRemoteParameterUpdater(
          config->m->getConfig(), pserverSpec, useEtcd));
      return updater;
    #else
      throw UnsupportError("not compiled with WITH_GOLANG");
    #endif
    }

    ParameterUpdater *ParameterUpdater::createRemoteUpdater(
        OptimizationConfig *config, int passCount, bool useSparseUpdater) {
      auto updater = new ParameterUpdater();
      auto remoteUpdater = new paddle::RemoteParameterUpdater(
          config->m->getConfig(), passCount, nullptr);
      if (useSparseUpdater) {
        std::unique_ptr<paddle::ParameterUpdater> remoteUpdaterPtr(remoteUpdater);
        auto sparseRemoteUpdater =
            new paddle::SparseRemoteParameterUpdaterComposite(
                config->m->getConfig(),
                passCount,
                false,
                std::move(remoteUpdaterPtr));
        updater->m->updater.reset(sparseRemoteUpdater);
      } else {
        updater->m->updater.reset(remoteUpdater);
      }
      return updater;
    }
Let's take Momentum as an example to see how an optimizer is used:

    class Momentum(Optimizer):
        def __init__(self, momentum=None, sparse=False, **kwargs):
            learning_method = v1_optimizers.MomentumOptimizer(
                momentum=momentum, sparse=sparse)
            super(Momentum, self).__init__(
                learning_method=learning_method, **kwargs)
                
    class MomentumOptimizer(BaseSGDOptimizer):
        def extra_settings(self):
            default_momentum(self.momentum)

        def to_setting_kwargs(self):
            if self.sparse:
                return {'learning_method': 'sparse_momentum'}
            else:
                return {'learning_method': 'momentum'}

        def __init__(self, momentum=None, sparse=False):
            self.momentum = momentum
            self.sparse = sparse

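The two snippets above are the configuration side of Momentum. As is well known, frameworks like PaddlePaddle build a graph, just as TensorFlow does, and this code configures Momentum within that graph. At this point, how to use Momentum and how to set its parameters should be familiar. Next, let's dig deeper into how the Momentum logic is implemented. All PaddlePaddle ops live under the `paddle/fluid/operators/` directory, where we find: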

    class MomentumOp : public framework::OperatorWithKernel {
     public:
      using framework::OperatorWithKernel::OperatorWithKernel;

     protected:
      void InferShape(framework::InferShapeContext *ctx) const override {
        PADDLE_ENFORCE(ctx->HasInput("Param"),
                       "Input(param) of Momentum should not be null.");
        PADDLE_ENFORCE(ctx->HasInput("Grad"),
                       "Input(grad) of Momentum should not be null.");
        PADDLE_ENFORCE(ctx->HasInput("Velocity"),
                       "Input(velocity) of Momentum should not be null.");
        PADDLE_ENFORCE(ctx->HasInput("LearningRate"),
                       "Input(LearningRate) of Momentum should not be null.");

        PADDLE_ENFORCE(ctx->HasOutput("ParamOut"),
                       "Output(ParamOut) of Momentum should not be null.");
        PADDLE_ENFORCE(ctx->HasOutput("VelocityOut"),
                       "Output(VelocityOut) of Momentum should not be null.");

        auto param_dim = ctx->GetInputDim("Param");
        PADDLE_ENFORCE_EQ(
            param_dim, ctx->GetInputDim("Grad"),
            "Param and Grad input of MomentumOp should have the same dimension.");
        PADDLE_ENFORCE_EQ(
            param_dim, ctx->GetInputDim("Velocity"),
            "Param and Velocity of MomentumOp should have the same dimension.");
        PADDLE_ENFORCE_EQ(framework::product(ctx->GetInputDim("LearningRate")), 1,
                          "Learning_rate should be a scalar");

        ctx->SetOutputDim("ParamOut", param_dim);
        ctx->SetOutputDim("VelocityOut", param_dim);
      }
    };

    class MomentumOpMaker : public framework::OpProtoAndCheckerMaker {
     public:
      MomentumOpMaker(OpProto *proto, OpAttrChecker *op_checker)
          : OpProtoAndCheckerMaker(proto, op_checker) {
        AddInput("Param",
                 "(Tensor, default Tensor<float>) "
                 "Input parameter that has to be updated");
        AddInput("Grad",
                 "(Tensor, default Tensor<float>) "
                 "Input gradient of the parameter");
        AddInput("Velocity",
                 "(Tensor, default Tensor<float>) "
                 "Input velocity (corresponding to the parameter) "
                 "that has to be updated");
        AddInput("LearningRate",
                 "(Tensor, default Tensor<float>) "
                 "Input learning rate");

        AddOutput("ParamOut",
                  "(Tensor) This output is updated parameter. "
                  "It shared memory with Input(Param).");
        AddOutput("VelocityOut",
                  "(Tensor) This output is updated velocity. "
                  "It shared memory with Input(Velocity).");

        AddAttr<float>("mu", "(float) Momentum coefficient");
        AddAttr<bool>("use_nesterov",
                      "(bool, default false) "
                      "Use Nesterov Momentum")
            .SetDefault(false);
        AddComment(R"DOC(
    Momentum Optimizer.

    This optimizer has a flag for Nestrov Momentum.
    The update equations are as follows:

    $$
    velocity = mu * velocity + gradient \\
    if (use\_nesterov):   \\
      param = param - gradient * learning\_rate + mu * velocity * learning\_rate \\
    else:   \\
      param = param - learning\_rate * velocity. \\
    $$

    )DOC");
      }
    };
    }  // namespace operators
    }  // namespace paddle

    namespace ops = paddle::operators;
    REGISTER_OP_WITHOUT_GRADIENT(momentum, ops::MomentumOp, ops::MomentumOpMaker);
    REGISTER_OP_CPU_KERNEL(momentum, ops::MomentumOpKernel<float>,
                           ops::MomentumOpKernel<double>);
 
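The code above, from `momentum_op.cc`, declares the op's inputs and outputs along with a few basic attributes. The Momentum logic itself is in `momentum_op.h`. That may look a little odd: normally the basic declarations go in the .h file and the algorithm goes in the .cc file, and here it seems to be exactly the reverse. (One likely reason is that the kernel is a class template, and template definitions generally have to be visible in a header.) Either way, the logic is quite clear: fetch the inputs and outputs, convert them into the required form, then perform the computation: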

    template <typename T>
    class MomentumOpKernel : public framework::OpKernel<T> {
     public:
      void Compute(const framework::ExecutionContext& ctx) const override {
        auto param_out = ctx.Output<framework::Tensor>("ParamOut");
        auto velocity_out = ctx.Output<framework::Tensor>("VelocityOut");
        auto param = ctx.Input<framework::Tensor>("Param");
        auto velocity = ctx.Input<framework::Tensor>("Velocity");
        auto grad = ctx.Input<framework::Tensor>("Grad");
        auto learning_rate = ctx.Input<framework::Tensor>("LearningRate");

        param_out->mutable_data<T>(ctx.GetPlace());
        velocity_out->mutable_data<T>(ctx.GetPlace());

        T mu = static_cast<T>(ctx.Attr<float>("mu"));
        bool use_nesterov = ctx.Attr<bool>("use_nesterov");

        auto p_out = framework::EigenVector<T>::Flatten(*param_out);
        auto v_out = framework::EigenVector<T>::Flatten(*velocity_out);

        auto p = framework::EigenVector<T>::Flatten(*param);
        auto v = framework::EigenVector<T>::Flatten(*velocity);
        auto g = framework::EigenVector<T>::Flatten(*grad);
        auto* lr = learning_rate->data<T>();

        v_out = v * mu + g;
        if (use_nesterov) {
          p_out = p - (g - v_out * mu) * lr[0];
        } else {
          p_out = p - lr[0] * v_out;
        }
      }
    };
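
To make the update rule in the kernel easier to follow, here is a minimal pure-Python sketch of the same arithmetic on flat lists. `momentum_step` is a hypothetical helper, not part of paddlepaddle, and the gradient values are made up. The Nesterov branch transcribes the quoted kernel line literally; the more common textbook form uses `g + mu * v_out` instead, so treat that branch as a mirror of the excerpt rather than a reference implementation.

```python
def momentum_step(param, grad, velocity, lr, mu, use_nesterov=False):
    """One momentum update on flat lists of floats (hypothetical helper)."""
    # Mirrors: v_out = v * mu + g
    v_out = [v * mu + g for v, g in zip(velocity, grad)]
    if use_nesterov:
        # Literal transcription of: p_out = p - (g - v_out * mu) * lr[0]
        p_out = [p - (g - vo * mu) * lr for p, g, vo in zip(param, grad, v_out)]
    else:
        # Mirrors: p_out = p - lr[0] * v_out
        p_out = [p - lr * vo for p, vo in zip(param, v_out)]
    return p_out, v_out


param, velocity = [1.0, 2.0], [0.0, 0.0]
grad = [0.5, -1.0]  # made-up gradient, held constant for illustration
for _ in range(2):
    param, velocity = momentum_step(param, grad, velocity, lr=0.1, mu=0.9)
print(param, velocity)
```

With a constant gradient the velocity grows from step to step, so the second update moves the parameters further than the first, which is the accumulation effect that `mu` controls.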

          
#### event handler
An event handler is used to process events raised during training. Searching for test_train.py turns up a usage example:

    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 100 == 0:
                print "Pass %d, Batch %d, Cost %f" % (
                    event.pass_id, event.batch_id, event.cost)

        if isinstance(event, paddle.event.EndPass):
            if (event.pass_id + 1) % 10 == 0:
                result = trainer.test(
                    reader=paddle.batch(
                        uci_housing.test(), batch_size=2),
                    feeding={'x': 0,
                             'y': 1})
                print "Test %d, %.2f" % (event.pass_id, result.cost)

Every 100 batches it prints pass_id (epoch_id), batch_id, and cost; every 10 passes it runs an evaluation on the test set.

    trainer.train(
        reader=paddle.batch(
            paddle.reader.shuffle(
                cloud_reader(
                    ["/pfs/dlnel/public/dataset/uci_housing/uci_housing*"],
                    etcd_endpoints),
                buf_size=500),
            batch_size=2),
        feeding={'x': 0,
                 'y': 1},
        event_handler=event_handler,
        num_passes=30)
        
With trainer.train configured this way, let's look at the corresponding logic:
    
        for pass_id in xrange(num_passes):
            event_handler(v2_event.BeginPass(pass_id))
            pass_evaluator.start()
            self.__parameter_updater__.startPass()
            for batch_id, data_batch in enumerate(reader()):
                batch_evaluator.start()
                event_handler(
                    v2_event.BeginIteration(
                        pass_id=pass_id, batch_id=batch_id))
                pass_type = self.__parameter_updater__.startBatch(
                    len(data_batch))
                in_args = feeder(data_batch)
                self.__prepare_parameter__(in_args)
                self.__gradient_machine__.forwardBackward(in_args, out_args,
                                                          pass_type)
                self.__gradient_machine__.eval(pass_evaluator)
                self.__gradient_machine__.eval(batch_evaluator)
                event_handler(
                    v2_event.EndForwardBackward(
                        pass_id=pass_id,
                        batch_id=batch_id,
                        gm=self.__gradient_machine__))
                for each_param in self.__gradient_machine__.getNonStaticParameters(
                ):
                    self.__parameter_updater__.update(each_param)
                cost_sum = out_args.sum()
                cost = cost_sum / len(data_batch)
                self.__parameter_updater__.finishBatch(cost)
                batch_evaluator.finish()
                event_handler(
                    v2_event.EndIteration(
                        pass_id=pass_id,
                        batch_id=batch_id,
                        cost=cost,
                        evaluator=batch_evaluator,
                        gm=self.__gradient_machine__))

            self.__parameter_updater__.finishPass()
            pass_evaluator.finish()
            event_handler(
                v2_event.EndPass(
                    pass_id,
                    evaluator=pass_evaluator,
                    gm=self.__gradient_machine__))

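The trainer calls event_handler at BeginPass, BeginIteration, EndForwardBackward, EndIteration, and EndPass, passing an event object that carries the current pass_id, batch_id, and related information. In the event_handler above, information is printed only when the event is an EndIteration or EndPass object.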
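
The dispatch pattern in the training loop can be sketched without paddlepaddle at all. The following is a simplified illustration, not the library's implementation: the event types are plain namedtuples, and `train`, `step`, and the toy `reader` are hypothetical stand-ins for the trainer, the forward/backward pass, and a batched data reader.

```python
from collections import namedtuple

# Stand-ins for the v2_event classes (assumed shapes, for illustration only).
BeginPass = namedtuple("BeginPass", "pass_id")
BeginIteration = namedtuple("BeginIteration", "pass_id batch_id")
EndIteration = namedtuple("EndIteration", "pass_id batch_id cost")
EndPass = namedtuple("EndPass", "pass_id")


def train(reader, num_passes, event_handler, step):
    """Run the loop and notify event_handler at each stage."""
    for pass_id in range(num_passes):
        event_handler(BeginPass(pass_id))
        for batch_id, data_batch in enumerate(reader()):
            event_handler(BeginIteration(pass_id, batch_id))
            cost = step(data_batch)  # placeholder for forward/backward + update
            event_handler(EndIteration(pass_id, batch_id, cost))
        event_handler(EndPass(pass_id))


log = []


def event_handler(event):
    # React only to the event types we care about; ignore the rest.
    if isinstance(event, EndIteration):
        log.append("Pass %d, Batch %d, Cost %f"
                   % (event.pass_id, event.batch_id, event.cost))
    if isinstance(event, EndPass):
        log.append("End of pass %d" % event.pass_id)


def reader():
    # Two toy batches per pass.
    yield [1.0, 2.0]
    yield [3.0]


train(reader, num_passes=2, event_handler=event_handler,
      step=lambda batch: sum(batch) / len(batch))
print("\n".join(log))
```

The handler receives every event but filters by type with `isinstance`, which is why unhandled events such as BeginPass pass through silently.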

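### Classic examples
The earlier sections covered the basics of paddlepaddle and looked, at the code level, at how the framework implements basic logic, parameter configuration, and so on. Next come two examples: linear regression and logistic regression.
#### Linear regression

#### Logistic regression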


In [None]:
import paddle.v2 as paddle
import numpy as np

paddle.init(use_gpu=False)

x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(2))
y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
y_predict = paddle.layer.fc(input=x, size=1, act=paddle.activation.Linear())
cost = paddle.layer.square_error_cost(input=y_predict, label=y)

parameters = paddle.parameters.create(cost)
optimizer = paddle.optimizer.Momentum(momentum=0)
trainer = paddle.trainer.SGD(cost=cost,
                             parameters=parameters,
                             update_equation=optimizer)

def event_handler(event):
    if isinstance(event, paddle.event.EndIteration):
        if event.batch_id % 1 == 0:
            print "Pass %d, Batch %d, Cost %f" % (event.pass_id, event.batch_id,
                                                  event.cost)
    # save the model parameters every 10 passes
    if isinstance(event, paddle.event.EndPass):
        if event.pass_id % 10 == 0:
            with open('params_pass_%d.tar' % event.pass_id, 'wb') as f:
                trainer.save_parameter_to_tar(f)

def train_reader():
    train_x = np.array([[1, 1], [1, 2], [3, 4], [5, 2]])
    train_y = np.array([[-2], [-3], [-7], [-7]])

    def reader():
        for i in xrange(train_y.shape[0]):
            yield train_x[i], train_y[i]

    return reader

feeding = {'x': 0, 'y': 1}

trainer.train(
    reader=paddle.batch(
        train_reader(), batch_size=1),
    feeding=feeding,
    event_handler=event_handler,
    num_passes=100)
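
As a sanity check on what this network is fitting, the same four samples can be trained with a hand-written per-sample gradient descent loop in plain Python. This is only a sketch under assumptions: the learning rate, the number of passes, and the zero initialization are choices made here (paddlepaddle initializes parameters differently), and a momentum of 0 is treated as plain SGD. The data satisfy y = -x1 - x2 exactly, so the weights should approach [-1, -1] with a bias near 0.

```python
train_x = [[1, 1], [1, 2], [3, 4], [5, 2]]
train_y = [-2, -3, -7, -7]

w = [0.0, 0.0]     # weights of the single linear unit (zero init is an assumption)
b = 0.0            # bias
lr = 0.01          # assumed learning rate
num_passes = 5000  # assumed; far more than the 100 passes used above

for pass_id in range(num_passes):
    for x, y in zip(train_x, train_y):  # batch_size=1, no shuffling
        y_predict = w[0] * x[0] + w[1] * x[1] + b
        err = y_predict - y
        # Gradient of the squared error err**2 with respect to w and b.
        w[0] -= lr * 2 * err * x[0]
        w[1] -= lr * 2 * err * x[1]
        b -= lr * 2 * err

print(w, b)
```

Because the targets are an exact linear function of the inputs, the per-sample updates settle on the exact solution instead of hovering around it.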

In [1]:
import paddle.v2 as paddle