# RNN

In [5]:
import numpy as np

### RNN Cell

- 正向传播
$$
\begin{align*}
a^{<t>} &= tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b_a) \\
\hat{y}^{<t>} &= softmax(W_{ya}a^{<t>} + b_y) 
\end{align*}
$$

- 反向传播
$$
\begin{align*}
\frac{\partial a^{<t>}}{\partial W_{ax}} &= (1 - \tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b)^2)\cdot x^{<t>T} \\
\frac{\partial a^{<t>}}{\partial W_{aa}} &= (1 - \tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b)^2)\cdot a^{<t-1>T} \\
\frac{\partial a^{<t>}}{\partial b} &= \sum_{batch}(1 - \tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b)^2) \\
\frac{\partial a^{<t>}}{\partial x^{<t>}} &= W_{ax}^T \cdot (1 - \tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b)^2) \\
\frac{\partial a^{<t>}}{\partial x^{<t-1>}} &= W_{aa}^T \cdot (1 - \tanh(W_{ax}x^{<t>} + W_{aa}a^{<t-1>} + b)^2) \\
\end{align*}
$$


In [3]:
class RNNCell():
    def __init__(self, n_out, act_fn="Tanh", init="glorot_uniform", optimizer=None):
        """
        Parameters
        ----------
        n_out : int
            The dimension of a single hidden state / output on a given timestep
        act_fn : str, :doc:`Activation <numpy_ml.neural_nets.activations>` object, or None
            The activation function for computing ``A[t]``. Default is `'Tanh'`.
        init : {'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform'}
            The weight initialization strategy. Default is `'glorot_uniform'`.
        optimizer : str, :doc:`Optimizer <numpy_ml.neural_nets.optimizers>` object, or None
            The optimization strategy to use when performing gradient updates
            within the :meth:`update` method.  If None, use the :class:`SGD
            <numpy_ml.neural_nets.optimizers.SGD>` optimizer with default
            parameters. Default is None.
        """
        self.init = init
        self.n_in = None
        self.n_out = n_out
        self.n_timesteps = None
        self.act_fn = ActivationInitializer(act_fn)()
        self.parameters = {"Waa": None, "Wax": None, "ba": None, "bx": None}
        self.is_initialized = False

    def _init_params(self):
        self.X = []
        init_weights = WeightInitializer(str(self.act_fn), mode=self.init)

        Wax = init_weights((self.n_in, self.n_out))
        Waa = init_weights((self.n_out, self.n_out))
        ba = np.zeros((self.n_out, 1))
        bx = np.zeros((self.n_out, 1))

        self.parameters = {"Waa": Waa, "Wax": Wax, "ba": ba, "bx": bx}

        self.gradients = {
            "Waa": np.zeros_like(Waa),
            "Wax": np.zeros_like(Wax),
            "ba": np.zeros_like(ba),
            "bx": np.zeros_like(bx),
        }

        self.derived_variables = {
            "A": [],
            "Z": [],
            "n_timesteps": 0,
            "current_step": 0,
            "dLdA_accumulator": None,
        }

        self.is_initialized = True

    def forward(self, Xt):
        """
        Compute the network output for a single timestep.

        Parameters
        ----------
        Xt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            Input at timestep `t` consisting of `n_ex` examples each of
            dimensionality `n_in`.

        Returns
        -------
        At: :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            The value of the hidden state at timestep `t` for each of the
            `n_ex` examples.
        """
        if not self.is_initialized:
            self.n_in = Xt.shape[1]
            self._init_params()

        # increment timestep
        self.derived_variables["n_timesteps"] += 1
        self.derived_variables["current_step"] += 1

        # Retrieve parameters
        ba = self.parameters["ba"]
        bx = self.parameters["bx"]
        Wax = self.parameters["Wax"]
        Waa = self.parameters["Waa"]

        # initialize the hidden state to zero
        As = self.derived_variables["A"]
        if len(As) == 0:
            n_ex, n_in = Xt.shape
            A0 = np.zeros((n_ex, self.n_out))
            As.append(A0)

        # compute next hidden state
        Zt = As[-1] @ Waa + ba.T + Xt @ Wax + bx.T
        At = self.act_fn(Zt)

        self.derived_variables["Z"].append(Zt)
        self.derived_variables["A"].append(At)

        # store intermediate variables
        self.X.append(Xt)
        return At

    def backward(self, dLdAt):
        """
        Backprop for a single timestep.

        Parameters
        ----------
        dLdAt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            The gradient of the loss wrt. the layer outputs (ie., hidden
            states) at timestep `t`.

        Returns
        -------
        dLdXt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            The gradient of the loss wrt. the layer inputs at timestep `t`.
        """
        assert self.trainable, "Layer is frozen"

        #  decrement current step
        self.derived_variables["current_step"] -= 1

        # extract context variables
        Zs = self.derived_variables["Z"]
        As = self.derived_variables["A"]
        t = self.derived_variables["current_step"]
        dA_acc = self.derived_variables["dLdA_accumulator"]

        # initialize accumulator
        if dA_acc is None:
            dA_acc = np.zeros_like(As[0])

        # get network weights for gradient calcs
        Wax = self.parameters["Wax"]
        Waa = self.parameters["Waa"]

        # compute gradient components at timestep t
        dA = dLdAt + dA_acc
        dZ = self.act_fn.grad(Zs[t]) * dA
        dXt = dZ @ Wax.T

        # update parameter gradients with signal from current step
        self.gradients["Waa"] += As[t].T @ dZ
        self.gradients["Wax"] += self.X[t].T @ dZ
        self.gradients["ba"] += dZ.sum(axis=0, keepdims=True).T
        self.gradients["bx"] += dZ.sum(axis=0, keepdims=True).T

        # update accumulator variable for hidden state
        self.derived_variables["dLdA_accumulator"] = dZ @ Waa.T
        return dXt



### GRU

- 正向传播
$$
  \begin{align*}
  z_t &= \sigma(W_t\cdot[h_{t-1}, x_t]) \\
  r_t &= \sigma(W_t\cdot[h_{t-1}, x_t]) \\
  \tilde h_t &= \tanh(W\cdot[r_t * h_{t-1}, x_t]) \\
  h_t &= (1 - z_t) * h_{t-1} + z_t * \tilde h_t \\
  \end{align*}
$$
- 反向传播

### LTSM

- 正向传播
    $$
    \Gamma_f^{\langle t \rangle} = \sigma(W_f[a^{\langle t-1 \rangle}, x^{\langle t \rangle}] + b_f) \\
    \Gamma_u^{\langle t \rangle} = \sigma(W_u[a^{\langle t-1 \rangle}, x^{\{t\}}] + b_u) \\
    \tilde{c}^{\langle t \rangle} = \tanh(W_c[a^{\langle t-1 \rangle}, x^{\langle t \rangle}] + b_c) \\
    c^{\langle t \rangle} = \Gamma_f^{\langle t \rangle}* c^{\langle t-1 \rangle} + \Gamma_u^{\langle t \rangle} *\tilde{c}^{\langle t \rangle} \\
    \Gamma_o^{\langle t \rangle}=  \sigma(W_o[a^{\langle t-1 \rangle}, x^{\langle t \rangle}] + b_o) \\
    a^{\langle t \rangle} = \Gamma_o^{\langle t \rangle}* \tanh(c^{\langle t \rangle}) \\
    $$


In [6]:
class LSTMCell(LayerBase):
    def __init__(
        self,
        n_out,
        act_fn="Tanh",
        gate_fn="Sigmoid",
        init="glorot_uniform",
        optimizer=None,
    ):
        """
        A single step of a long short-term memory (LSTM) RNN.

        Notes
        -----
        Notation:

        - ``Z[t]``  is the input to each of the gates at timestep `t`
        - ``A[t]``  is the value of the hidden state at timestep `t`
        - ``Cc[t]`` is the value of the *candidate* cell/memory state at timestep `t`
        - ``C[t]``  is the value of the *final* cell/memory state at timestep `t`
        - ``Gf[t]`` is the output of the forget gate at timestep `t`
        - ``Gu[t]`` is the output of the update gate at timestep `t`
        - ``Go[t]`` is the output of the output gate at timestep `t`

        Equations::

            Z[t]  = stack([A[t-1], X[t]])
            Gf[t] = gate_fn(Wf @ Z[t] + bf)
            Gu[t] = gate_fn(Wu @ Z[t] + bu)
            Go[t] = gate_fn(Wo @ Z[t] + bo)
            Cc[t] = act_fn(Wc @ Z[t] + bc)
            C[t]  = Gf[t] * C[t-1] + Gu[t] * Cc[t]
            A[t]  = Go[t] * act_fn(C[t])

        where `@` indicates dot/matrix product, and '*' indicates elementwise
        multiplication.

        Parameters
        ----------
        n_out : int
            The dimension of a single hidden state / output on a given timestep.
        act_fn : str, :doc:`Activation <numpy_ml.neural_nets.activations>` object, or None
            The activation function for computing ``A[t]``. Default is
            `'Tanh'`.
        gate_fn : str, :doc:`Activation <numpy_ml.neural_nets.activations>` object, or None
            The gate function for computing the update, forget, and output
            gates. Default is `'Sigmoid'`.
        init : {'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform'}
            The weight initialization strategy. Default is `'glorot_uniform'`.
        optimizer : str, :doc:`Optimizer <numpy_ml.neural_nets.optimizers>` object, or None
            The optimization strategy to use when performing gradient updates
            within the :meth:`update` method.  If None, use the :class:`SGD
            <numpy_ml.neural_nets.optimizers.SGD>` optimizer with default
            parameters. Default is None.
        """
        super().__init__(optimizer)

        self.init = init
        self.n_in = None
        self.n_out = n_out
        self.n_timesteps = None
        self.act_fn = ActivationInitializer(act_fn)()
        self.gate_fn = ActivationInitializer(gate_fn)()
        self.parameters = {
            "Wf": None,
            "Wu": None,
            "Wc": None,
            "Wo": None,
            "bf": None,
            "bu": None,
            "bc": None,
            "bo": None,
        }
        self.is_initialized = False

    def _init_params(self):
        self.X = []
        init_weights_gate = WeightInitializer(str(self.gate_fn), mode=self.init)
        init_weights_act = WeightInitializer(str(self.act_fn), mode=self.init)

        Wf = init_weights_gate((self.n_in + self.n_out, self.n_out))
        Wu = init_weights_gate((self.n_in + self.n_out, self.n_out))
        Wc = init_weights_act((self.n_in + self.n_out, self.n_out))
        Wo = init_weights_gate((self.n_in + self.n_out, self.n_out))

        bf = np.zeros((1, self.n_out))
        bu = np.zeros((1, self.n_out))
        bc = np.zeros((1, self.n_out))
        bo = np.zeros((1, self.n_out))

        self.parameters = {
            "Wf": Wf,
            "Wu": Wu,
            "Wc": Wc,
            "Wo": Wo,
            "bf": bf,
            "bu": bu,
            "bc": bc,
            "bo": bo,
        }

        self.gradients = {
            "Wf": np.zeros_like(Wf),
            "Wu": np.zeros_like(Wu),
            "Wc": np.zeros_like(Wc),
            "Wo": np.zeros_like(Wo),
            "bf": np.zeros_like(bf),
            "bu": np.zeros_like(bu),
            "bc": np.zeros_like(bc),
            "bo": np.zeros_like(bo),
        }

        self.derived_variables = {
            "C": [],
            "A": [],
            "Gf": [],
            "Gu": [],
            "Go": [],
            "Gc": [],
            "Cc": [],
            "n_timesteps": 0,
            "current_step": 0,
            "dLdA_accumulator": None,
            "dLdC_accumulator": None,
        }

        self.is_initialized = True

    def _get_params(self):
        Wf = self.parameters["Wf"]
        Wu = self.parameters["Wu"]
        Wc = self.parameters["Wc"]
        Wo = self.parameters["Wo"]
        bf = self.parameters["bf"]
        bu = self.parameters["bu"]
        bc = self.parameters["bc"]
        bo = self.parameters["bo"]
        return Wf, Wu, Wc, Wo, bf, bu, bc, bo

    @property
    def hyperparameters(self):
        """Return a dictionary containing the layer hyperparameters."""
        return {
            "layer": "LSTMCell",
            "init": self.init,
            "n_in": self.n_in,
            "n_out": self.n_out,
            "act_fn": str(self.act_fn),
            "gate_fn": str(self.gate_fn),
            "optimizer": {
                "cache": self.optimizer.cache,
                "hyperparameters": self.optimizer.hyperparameters,
            },
        }

    def forward(self, Xt):
        """
        Compute the layer output for a single timestep.

        Parameters
        ----------
        Xt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            Input at timestep t consisting of `n_ex` examples each of
            dimensionality `n_in`.

        Returns
        -------
        At: :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            The value of the hidden state at timestep `t` for each of the `n_ex`
            examples.
        Ct: :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            The value of the cell/memory state at timestep `t` for each of the
            `n_ex` examples.
        """
        if not self.is_initialized:
            self.n_in = Xt.shape[1]
            self._init_params()

        Wf, Wu, Wc, Wo, bf, bu, bc, bo = self._get_params()

        self.derived_variables["n_timesteps"] += 1
        self.derived_variables["current_step"] += 1

        if len(self.derived_variables["A"]) == 0:
            n_ex, n_in = Xt.shape
            init = np.zeros((n_ex, self.n_out))
            self.derived_variables["A"].append(init)
            self.derived_variables["C"].append(init)

        A_prev = self.derived_variables["A"][-1]
        C_prev = self.derived_variables["C"][-1]

        # concatenate A_prev and Xt to create Zt
        Zt = np.hstack([A_prev, Xt])

        Gft = self.gate_fn(Zt @ Wf + bf)
        Gut = self.gate_fn(Zt @ Wu + bu)
        Got = self.gate_fn(Zt @ Wo + bo)
        Cct = self.act_fn(Zt @ Wc + bc)
        Ct = Gft * C_prev + Gut * Cct
        At = Got * self.act_fn(Ct)

        # bookkeeping
        self.X.append(Xt)
        self.derived_variables["A"].append(At)
        self.derived_variables["C"].append(Ct)
        self.derived_variables["Gf"].append(Gft)
        self.derived_variables["Gu"].append(Gut)
        self.derived_variables["Go"].append(Got)
        self.derived_variables["Cc"].append(Cct)
        return At, Ct

    def backward(self, dLdAt):
        """
        Backprop for a single timestep.

        Parameters
        ----------
        dLdAt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            The gradient of the loss wrt. the layer outputs (ie., hidden
            states) at timestep `t`.

        Returns
        -------
        dLdXt : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            The gradient of the loss wrt. the layer inputs at timestep `t`.
        """
        assert self.trainable, "Layer is frozen"

        Wf, Wu, Wc, Wo, bf, bu, bc, bo = self._get_params()

        self.derived_variables["current_step"] -= 1
        t = self.derived_variables["current_step"]

        Got = self.derived_variables["Go"][t]
        Gft = self.derived_variables["Gf"][t]
        Gut = self.derived_variables["Gu"][t]
        Cct = self.derived_variables["Cc"][t]
        At = self.derived_variables["A"][t + 1]
        Ct = self.derived_variables["C"][t + 1]
        C_prev = self.derived_variables["C"][t]
        A_prev = self.derived_variables["A"][t]

        Xt = self.X[t]
        Zt = np.hstack([A_prev, Xt])

        dA_acc = self.derived_variables["dLdA_accumulator"]
        dC_acc = self.derived_variables["dLdC_accumulator"]

        # initialize accumulators
        if dA_acc is None:
            dA_acc = np.zeros_like(At)

        if dC_acc is None:
            dC_acc = np.zeros_like(Ct)

        # Gradient calculations
        # ---------------------

        dA = dLdAt + dA_acc
        dC = dC_acc + dA * Got * self.act_fn.grad(Ct)

        # compute the input to the gate functions at timestep t
        _Go = Zt @ Wo + bo
        _Gf = Zt @ Wf + bo
        _Gu = Zt @ Wu + bo
        _Gc = Zt @ Wc + bc

        # compute gradients wrt the *input* to each gate
        dGot = dA * self.act_fn(Ct) * self.gate_fn.grad(_Go)
        dCct = dC * Gut * self.act_fn.grad(_Gc)
        dGut = dC * Cct * self.gate_fn.grad(_Gu)
        dGft = dC * C_prev * self.gate_fn.grad(_Gf)

        dZ = dGft @ Wf.T + dGut @ Wu.T + dCct @ Wc.T + dGot @ Wo.T
        dXt = dZ[:, self.n_out :]

        self.gradients["Wc"] += Zt.T @ dCct
        self.gradients["Wu"] += Zt.T @ dGut
        self.gradients["Wf"] += Zt.T @ dGft
        self.gradients["Wo"] += Zt.T @ dGot
        self.gradients["bo"] += dGot.sum(axis=0, keepdims=True)
        self.gradients["bu"] += dGut.sum(axis=0, keepdims=True)
        self.gradients["bf"] += dGft.sum(axis=0, keepdims=True)
        self.gradients["bc"] += dCct.sum(axis=0, keepdims=True)

        self.derived_variables["dLdA_accumulator"] = dZ[:, : self.n_out]
        self.derived_variables["dLdC_accumulator"] = Gft * dC
        return dXt

    def flush_gradients(self):
        assert self.trainable, "Layer is frozen"

        self.X = []
        for k, v in self.derived_variables.items():
            self.derived_variables[k] = []

        self.derived_variables["n_timesteps"] = 0
        self.derived_variables["current_step"] = 0

        # reset parameter gradients to 0
        for k, v in self.parameters.items():
            self.gradients[k] = np.zeros_like(v)

