No mixed precision support with GradientAccumulateOptimizer? #113

Open
dPys opened this issue Sep 8, 2023 · 7 comments

@dPys

dPys commented Sep 8, 2023

Love this package!

Curious: is the lack of mixed-precision support for GradientAccumulateOptimizer intentional (e.g. because of observed stability issues), or is it already on the to-do list and just not implemented yet? If it's the latter, I've put together a rough draft of how this might look (assuming the input optimizer is either SGD or Adam and has already been wrapped in a LossScaleOptimizer):

@tf.keras.utils.register_keras_serializable("gradient-accumulator")
class GradientAccumulateOptimizer(opt):
    """Optimizer wrapper for gradient accumulation.
    """

    def __init__(
        self,
        optimizer="SGD",
        accum_steps=1,
        reduction: str = "MEAN",
        mixed_precision: bool = False,
        name: str = "GradientAccumulateOptimizer",
        **kwargs
    ):
        """Construct a new GradientAccumulateOptimizer optimizer.

        Adding support for sparse tensors was tricky, but this resource was
        helpful. Note that you need to implement both _resource_apply_sparse()
        and _resource_apply_sparse_duplicate_indices() for it to work as
        intended.

        See here for more information regarding implementation:
        * https://github.com/tensorflow/addons/blob/master/tensorflow_addons/optimizers/average_wrapper.py#L93  # noqa

        Args:
            optimizer: str or `tf.keras.optimizers.Optimizer` that will be
                used to compute and apply gradients.
            accum_steps: int > 0. Number of steps to accumulate gradients
                over before applying an update.
            reduction: str. Which gradient reduction method to use. Defaults
                to 'MEAN'.
            mixed_precision: bool. Whether to use mixed precision training.
            name: Optional name for the operations created when applying
                gradients. Defaults to "GradientAccumulateOptimizer".
            **kwargs: keyword arguments. Allowed to be {`clipnorm`,
                `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by
                norm; `clipvalue` is clip gradients by value, `decay` is
                included for backward compatibility to allow time inverse
                decay of learning rate. `lr` is included for backward
                compatibility, recommended to use `learning_rate` instead.
        """
        self._optimizer = tf.keras.optimizers.get(optimizer)
        self._accum_steps = accum_steps
        self._reduction = reduction
        self._step = None
        self.mixed_precision = mixed_precision
        super().__init__(name, **kwargs)

    def get_slot(self, *args, **kwargs):
        return self._optimizer.get_slot(*args, **kwargs)

    def add_slot(self, *args, **kwargs):
        return self._optimizer.add_slot(*args, **kwargs)

    def _create_slots(self, var_list):
        """Creates slots for optimizer gradients.

        Args:
            var_list: list of trainable variables.
        """
        self._optimizer._create_slots(var_list=var_list)
        for var in var_list:
            self.add_slot(var, "ga")

        self._gradients = [self.get_slot(var, "ga") for var in var_list]

        if self.mixed_precision:
            # Update this line
            self._optimizer.inner_optimizer._create_slots(var_list=var_list)

            for var in var_list:
                self.add_slot(var, "ga", initializer=tf.zeros(var.shape, var.dtype))

            # Initialize Adam optimizer slots if necessary
            if isinstance(
                self._optimizer.inner_optimizer, tf.keras.optimizers.Adam
            ):  # change this line as well
                for var in var_list:
                    self.add_slot(var, "m")
                    self.add_slot(var, "v")

            self._gradients = [self.get_slot(var, "ga") for var in var_list]

    @property
    def step(self):
        """The number of training steps this Optimizer has run.
        Initializes step variable if None.

        Returns:
            Current number of optimizer steps.
        """
        if self._step is None:
            with self._distribution_strategy_scope():
                self._step = self.add_weight(
                    "iter",
                    shape=[],
                    initializer="ones",
                    dtype=tf.int64,
                    trainable=False,
                    aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA,
                )
            self._weights.append(self._step)
        return self._step

    @step.setter
    def step(self, variable):  # pragma: no cover
        """Sets the step value."""
        if self._step is not None:
            raise RuntimeError(
                "Cannot set `step` to a new Variable after "
                "the Optimizer weights have been created"
            )
        self._step = variable
        self._weights.append(self._step)

    @property
    def gradients(self):  # pragma: no cover
        """The accumulated gradients on the current replica.

        Returns:
            Current gradients in optimizer.
        """
        if not self._gradients:
            raise ValueError(
                "The accumulator should be called first to initialize the gradients"
            )
        return list(
            gradient.read_value() if gradient is not None else gradient
            for gradient in self._gradients
        )

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        """Updates weights using gradients.

        Args:
            grads_and_vars: list of (gradient, variable) pairs.
            name: name to set when applying gradients.
            **kwargs: keyword arguments.
        Returns:
            Op that increments the accumulation step counter after the update.
        """
        train_op = super().apply_gradients(grads_and_vars, name, **kwargs)
        with tf.control_dependencies([train_op]):
            with tf.control_dependencies(
                [
                    self._optimizer.iterations.assign_add(
                        tf.cast(
                            tf.where(self.step % self._accum_steps == 0, 1, 0),
                            tf.int64,
                        ),
                        read_value=False,
                    )
                ]
            ):
                return self.step.assign_add(1, read_value=False)

    def _resource_apply_dense(self, grad, var, apply_state=None):  # pragma: no cover
        """Performs gradient update on dense tensor.

        Args:
            grad: current gradient.
            var: current variable.
            apply_state: optional dict of precomputed hyperparameter values,
                passed through to the wrapped optimizer.
        Returns:
            apply_op.
        """

        if self.mixed_precision:
            opt_to_use = self._optimizer.inner_optimizer
        else:
            opt_to_use = self._optimizer

        accum_gradient = self.get_slot(var, "ga")
        if accum_gradient is not None and grad is not None:
            accum_gradient.assign_add(
                tf.math.divide(grad, self._accum_steps),
                use_locking=self._use_locking,
                read_value=False,
            )

        def _apply(accum_gradient, var, apply_state):
            grad = tf.where(
                self.step % self._accum_steps == 0,
                accum_gradient,
                tf.zeros_like(var),
            )

            if self.mixed_precision:
                grad = self.optimizer.get_unscaled_gradients([grad])[0]

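            # Only forward apply_state if the wrapped optimizer accepts it.
            state_kwargs = (
                {"apply_state": apply_state}
                if "apply_state" in opt_to_use._dense_apply_args
                else {}
            )
            train_op = opt_to_use._resource_apply_dense(grad, var, **state_kwargs)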

            reset_val = tf.where(
                grad == accum_gradient,
                tf.zeros_like(accum_gradient),
                accum_gradient,
            )
            reset_op = accum_gradient.assign(
                reset_val,
                use_locking=self._use_locking,
                read_value=False,
            )

            return tf.group(train_op, reset_op)

        return _apply(accum_gradient, var, apply_state)

    def _resource_apply_sparse(
        self, grad, var, indices, apply_state=None
    ):  # pragma: no cover
        """Performs gradient update on sparse tensor.

        Args:
            grad: current gradient.
            var: current variable.
            indices: relevant indices to be used for masking the sparse tensor
                during update.
        Returns:
            apply_op.
        """

        if self.mixed_precision:
            opt_to_use = self._optimizer.inner_optimizer
        else:
            opt_to_use = self._optimizer

        accum_gradient = self.get_slot(var, "ga")

        if accum_gradient is not None and grad is not None:
            self._resource_scatter_add(
                accum_gradient, indices, tf.math.divide(grad, self._accum_steps)
            )

        def _apply(accum_gradient, var, apply_state):
            grad = tf.where(
                self.step % self._accum_steps == 0,
                accum_gradient,
                tf.zeros_like(var),
            )

            if self.mixed_precision:
                grad = self.optimizer.get_unscaled_gradients([grad])[0]

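            # Only forward apply_state if the wrapped optimizer accepts it.
            state_kwargs = (
                {"apply_state": apply_state}
                if "apply_state" in opt_to_use._sparse_apply_args
                else {}
            )
            # `indices` must be passed explicitly to the sparse update.
            train_op = opt_to_use._resource_apply_sparse(
                accum_gradient.sparse_read(indices), var, indices, **state_kwargs
            )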

            reset_val = tf.where(
                grad == accum_gradient,
                tf.zeros_like(accum_gradient),
                accum_gradient,
            )
            reset_op = accum_gradient.assign(
                reset_val,
                use_locking=self._use_locking,
                read_value=False,
            )

            return tf.group(train_op, reset_op)

        return _apply(accum_gradient, var, apply_state)

    # TODO: needs to be updated and tested
    def _resource_apply_sparse_duplicate_indices(
        self, grad, var, indices, apply_state=None
    ):  # pragma: no cover
        """Performs gradient update on sparse tensor.

        Args:
            grad: current gradient.
            var: current variable.
            indices: relevant indices to be used for masking the sparse tensor
                during update.
        Returns:
            apply_op.
        """
        if self.mixed_precision:
            opt_to_use = self._optimizer.inner_optimizer
        else:
            opt_to_use = self._optimizer

        accum_gradient = self.get_slot(var, "ga")

        if accum_gradient is not None and grad is not None:
            self._resource_scatter_add(
                accum_gradient, indices, tf.math.divide(grad, self._accum_steps)
            )

        def _apply(accum_gradient, var, apply_state):
            grad = tf.where(
                self.step % self._accum_steps == 0,
                accum_gradient,
                tf.zeros_like(var),
            )

            if self.mixed_precision:
                grad = self.optimizer.get_unscaled_gradients([grad])[0]

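            # Only forward apply_state if the wrapped optimizer accepts it.
            state_kwargs = (
                {"apply_state": apply_state}
                if "apply_state" in opt_to_use._sparse_apply_args
                else {}
            )
            # `indices` must be passed explicitly to the sparse update.
            train_op = opt_to_use._resource_apply_sparse_duplicate_indices(
                accum_gradient.sparse_read(indices), var, indices, **state_kwargs
            )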

            reset_val = tf.where(
                grad == accum_gradient,
                tf.zeros_like(accum_gradient),
                accum_gradient,
            )
            reset_op = accum_gradient.assign(
                reset_val,
                use_locking=self._use_locking,
                read_value=False,
            )

            return tf.group(train_op, reset_op)

        return _apply(accum_gradient, var, apply_state)

    def reset(self):  # pragma: no cover
        """Resets the accumulated gradients on the current replica."""
        assign_ops = []
        if not self._gradients:
            return assign_ops

        for gradient in self._gradients:
            if gradient is not None:
                assign_ops.append(
                    gradient.assign(
                        tf.zeros_like(gradient),
                        use_locking=self._use_locking,
                        read_value=False,
                    )
                )

        return tf.group(assign_ops)

    @property
    def optimizer(self):
        """The optimizer that this AccumOptimizer is wrapping."""
        return self._optimizer

    @property
    def iterations(self):
        """Returns current iteration value of optimizer.

        Returns:
            iterations of optimizer."""
        return self._optimizer.iterations

    @iterations.setter
    def iterations(self, variable):
        """Sets the iterations value of optimizer."""
        self._optimizer.iterations = variable

    @property
    def learning_rate(self):  # pragma: no cover
        """Returns the learning rate of the optimizer.

        Returns:
            learning rate of optimizer.
        """
        return self._optimizer._get_hyper("learning_rate")

    @learning_rate.setter
    def learning_rate(self, learning_rate):  # pragma: no cover
        """Sets the learning rate of the optimizer.

        Args:
            learning_rate: which learning rate to set in the optimizer.
        """
        self._optimizer._set_hyper("learning_rate", learning_rate)

    def get_config(self):
        """Returns the configuration as dict."""
        config = {
            "optimizer": tf.keras.optimizers.serialize(self._optimizer),
            "accum_steps": self._accum_steps,
            "reduction": self._reduction,
            "mixed_precision": self.mixed_precision,
        }
        base_config = super().get_config()
        return dict(list(base_config.items()) + list(config.items()))

    @classmethod
    def from_config(cls, config, custom_objects=None):
        """Gets config of original optimizer and deserializes it."""
        optimizer = tf.keras.optimizers.deserialize(
            config.pop("optimizer"), custom_objects=custom_objects
        )
        return cls(optimizer, **config)

Curious to hear your thoughts!

@andreped
Owner

andreped commented Sep 8, 2023

@dPys I have not really used the Optimizer wrapper much myself, but I assumed that it had mixed precision support.

For LossScaleOptimizer, you should just need to wrap the GradientAccumulateOptimizer (see here).

@andreped
Owner

andreped commented Sep 9, 2023

I took another look at the source code of the LossScaleOptimizer and it seems like you are right (see here).

By wrapping the optimizer with LossScaleOptimizer, it seems we end up overriding the same Optimizer methods that GradientAccumulateOptimizer already overrides. Hence, the best solution is likely to add loss scaling directly in our wrapper, removing the need to call LossScaleOptimizer externally, since the external wrapping gives the wrong behaviour.
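
To make the idea of handling loss scaling inside the accumulation loop concrete, here is a minimal pure-Python sketch on a single scalar weight. It is not the package's API: the function name `accumulate_and_apply` and all of its parameters are hypothetical, the update is plain SGD, and a fixed loss scale is assumed. It accumulates the scaled gradients, unscales once per update, and skips the update if the result is non-finite.

```python
import math


def accumulate_and_apply(weight, micro_batch_grads, loss_scale, accum_steps, lr):
    """Toy example (hypothetical names): accumulate loss-scaled gradients,
    unscale once per update, then apply a plain SGD step."""
    accum = 0.0
    for step, grad in enumerate(micro_batch_grads, start=1):
        # With loss scaling, the backward pass yields gradients of (loss * loss_scale).
        scaled_grad = grad * loss_scale
        # Running mean of the scaled gradients over the accumulation window.
        accum += scaled_grad / accum_steps

        if step % accum_steps == 0:
            # Unscale once, right before the weight update.
            unscaled = accum / loss_scale
            if math.isfinite(unscaled):
                weight -= lr * unscaled
            # Otherwise skip the update; a dynamic loss scaler would also
            # lower the scale at this point (not modelled here).
            accum = 0.0
    return weight
```

With a finite loss scale the result matches the unscaled run, which is the property a built-in implementation would need to preserve.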

I'm quite preoccupied with finalizing my PhD atm, but if you have time, a PR would be of great value, @dPys :]

@andreped added the bug and enhancement labels Sep 9, 2023
@andreped
Owner

Oh, after a new look at this, I am not so sure we should apply the same loss scaling naively, as this may result in numerical instabilities.

I have also observed this in regular batch training with mixed precision and loss scaling, so I would expect you to see the same in this scenario.

I am not sure what the best solution for this is. Note that, as discussed in this paper, it is possible to use gradient clipping instead to avoid exploding gradients. We have already implemented a technique for this in the model wrapper, and if anyone wants to test this as an alternative way to avoid gradient explosions, you could try to add the following to your code:
https://github.com/sayakpaul/Adaptive-Gradient-Clipping/blob/main/agc.py#L33
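
For readers who just want the gist of adaptive gradient clipping, below is a simplified pure-Python sketch of the general idea, not a copy of the linked file. It assumes a 2-D parameter stored as a list of rows and treats each row as one unit; the function name `adaptive_clip` and the default values of `clip_factor` and `eps` are illustrative assumptions. A row's gradient is rescaled whenever its norm exceeds `clip_factor` times the norm of the matching weight row.

```python
import math


def adaptive_clip(grad_rows, weight_rows, clip_factor=0.01, eps=1e-3):
    """Simplified unit-wise adaptive gradient clipping (hypothetical helper).

    Each row is one unit. A row's gradient is rescaled when
    ||g|| > clip_factor * max(||w||, eps).
    """
    clipped = []
    for g_row, w_row in zip(grad_rows, weight_rows):
        g_norm = math.sqrt(sum(g * g for g in g_row))
        # eps keeps zero-initialised weights from forcing gradients to zero.
        w_norm = max(math.sqrt(sum(w * w for w in w_row)), eps)
        max_norm = clip_factor * w_norm
        if g_norm > max_norm:
            # g_norm is strictly positive here because max_norm > 0.
            scale = max_norm / g_norm
            clipped.append([g * scale for g in g_row])
        else:
            clipped.append(list(g_row))
    return clipped
```

Because the threshold depends on the weight norm rather than on a fixed constant, small gradients pass through unchanged while large ones are scaled down relative to the size of the weights they update.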

@dPys
Author

dPys commented Jan 21, 2024

@andreped - I have a working reimplementation of the optimizer (along with adaptive gradient clipping) that supports mixed precision with loss scaling, among several other key upgrades. It has also purged eager ops in favor of pure TF, and uses @tf.function where applicable. I've also done considerable benchmarking work to reduce excessive graph tracing. If you wish to share ownership of your package, I'd be happy to submit it as a PR?

@andreped
Owner

andreped commented Jan 21, 2024

Amazing, @dPys! Looking forward to it! 🎉

You will for sure be added as coauthor to the package on Zenodo, but let's first get the PR in, reviewed, and merged :]

I can review it tomorrow. I was hoping to make a new release, v1.0.0 in fact, so if all goes well, you will be a coauthor on that release :]


EDIT: Also, if you wish to discuss the PR and contribution further, feel free to reach out on my LinkedIn:
https://www.linkedin.com/in/andr%C3%A9-pedersen/

@dPys
Author

dPys commented Jan 21, 2024

Sweet, sounds good. I've at least gotten the ball rolling: #131

@andreped
Owner

Great, I made some initial remarks, which you need to address before you can run the full tests. Note that you can run most of the tests locally using pytest. We can talk more in the PR.
