From 6cd760f5bb264acccd91fe2f7d7f63c02e4c3d57 Mon Sep 17 00:00:00 2001
From: unknown
Date: Fri, 26 Jun 2020 18:55:00 -0500
Subject: [PATCH 01/12] iteration averaging

---
 src/gluonts/mx/trainer/__init__.py             |   3 +-
 src/gluonts/mx/trainer/_base.py                |  46 +++-
 .../mx/trainer/model_iteration_averaging.py    | 257 ++++++++++++++++++
 src/gluonts/trainer.py                         |   1 +
 .../trainer/test_model_iteration_averaging.py  | 182 +++++++++++++
 5 files changed, 483 insertions(+), 6 deletions(-)
 create mode 100644 src/gluonts/mx/trainer/model_iteration_averaging.py
 create mode 100644 test/trainer/test_model_iteration_averaging.py

diff --git a/src/gluonts/mx/trainer/__init__.py b/src/gluonts/mx/trainer/__init__.py
index 8eec15d116..5bb425c535 100644
--- a/src/gluonts/mx/trainer/__init__.py
+++ b/src/gluonts/mx/trainer/__init__.py
@@ -14,9 +14,10 @@
 # Relative imports
 from . import learning_rate_scheduler as lrs
 from . import model_averaging
+from . import model_iteration_averaging
 from ._base import Trainer
 
-__all__ = ["lrs", "Trainer", "model_averaging"]
+__all__ = ["lrs", "Trainer", "model_averaging", "model_iteration_averaging"]
 
 # fix Sphinx issues, see https://bit.ly/2K2eptM
 for item in __all__:
diff --git a/src/gluonts/mx/trainer/_base.py b/src/gluonts/mx/trainer/_base.py
index dd23c7ec46..ca3988cd31 100644
--- a/src/gluonts/mx/trainer/_base.py
+++ b/src/gluonts/mx/trainer/_base.py
@@ -39,6 +39,13 @@
     SelectNBestMean,
     save_epoch_info,
 )
+# iteration averaging
+from .model_iteration_averaging import (
+    IterationAveragingStrategy,
+    NTA_V1,
+    NTA_V2,
+    Alpha_Suffix,
+)
 
 logger = logging.getLogger("gluonts").getChild("trainer")
 
@@ -113,7 +120,7 @@ def __init__(
         weight_decay: float = 1e-8,
         init: Union[str, mx.initializer.Initializer] = "xavier",
         hybridize: bool = True,
-        avg_strategy: AveragingStrategy = SelectNBestMean(num_models=1),
+        avg_strategy: Union[AveragingStrategy,IterationAveragingStrategy] = SelectNBestMean(num_models=1),
     ) -> None:
 
         assert (
@@ -228,6 +235,10 @@ def loop(
 
             epoch_loss = mx.metric.Loss()
 
+            # use averaged model for validation
+            if not is_training and isinstance(self.avg_strategy, IterationAveragingStrategy):
+                self.avg_strategy.load_averaged_model(net)
+
             with tqdm(batch_iter) as it:
                 for batch_no, data_entry in enumerate(it, start=1):
                     if self.halt:
@@ -251,6 +262,10 @@
                         loss.backward()
                         trainer.step(batch_size)
 
+                        # iteration averaging in training
+                        if isinstance(self.avg_strategy, IterationAveragingStrategy):
+                            self.avg_strategy.apply(net)
+
                     epoch_loss.update(None, preds=loss)
                     lv = loss_value(epoch_loss)
 
@@ -289,6 +304,11 @@
                     ("" if is_training else "validation_") + "epoch_loss",
                     lv,
                 )
+
+                if not is_training and isinstance(self.avg_strategy, IterationAveragingStrategy):
+                    # bring back the cached model
+                    self.avg_strategy.load_cached_model(net)
+
                 return epoch_loss
 
             for epoch_no in range(self.epochs):
@@ -307,6 +327,17 @@
                         epoch_no, validation_iter, is_training=False
                     )
 
+                # update average trigger
+                if isinstance(self.avg_strategy, IterationAveragingStrategy):
+                    if isinstance(self.avg_strategy, Alpha_Suffix):
+                        # alpha suffix
+                        self.avg_strategy.update_average_trigger(epoch_no+1)
+                    else:
+                        # NTA
+                        self.avg_strategy.update_average_trigger(loss_value(epoch_loss))
+                    # once triggered, update the average immediately
+                    self.avg_strategy.apply(net)
+
                 should_continue = lr_scheduler.step(loss_value(epoch_loss))
                 if not should_continue:
                     logger.info("Stopping training")
@@ -344,10 +375,15 @@
                     best_epoch_info["params_path"], self.ctx
                 )
 
-            logging.info("Computing averaged parameters.")
-            averaged_params_path = self.avg_strategy.apply(gluonts_temp)
+            if isinstance(self.avg_strategy, AveragingStrategy):
+                logging.info("Computing averaged parameters.")
+                averaged_params_path = self.avg_strategy.apply(gluonts_temp)
+
+                logging.info("Loading averaged parameters.")
+                net.load_parameters(averaged_params_path, self.ctx)
 
-            logging.info("Loading averaged parameters.")
-            net.load_parameters(averaged_params_path, self.ctx)
+            if isinstance(self.avg_strategy, IterationAveragingStrategy):
+                logging.info("Loading averaged parameters.")
+                self.avg_strategy.load_averaged_model(net)
 
             logger.info("End model training")
diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
new file mode 100644
index 0000000000..e1dbb456b1
--- /dev/null
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -0,0 +1,257 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# or in the "license" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+# Standard library imports
+from typing import Any, Dict, Optional
+
+import mxnet as mx
+import mxnet.gluon.nn as nn
+
+# First-party imports
+from gluonts.core.component import validated
+
+
+class IterationAveragingStrategy:
+    @validated()
+    def __init__(self):
+        r"""
+        Parameters
+        ----------
+        averaged_model
+            Dict that maintains the averaged model parameters.
+        cached_model
+            Temporarily save the current model, so that the averaged model can be used for validation.
+        average_counter
+            The number of models accumulated in the average.
+        averaging_started
+            Indicate whether the model averaging has started.
+        """
+
+        self.averaged_model = None
+        self.cached_model = None
+        self.average_counter = 0
+        self.averaging_started = False
+
+    def update_average_trigger(self, average_trigger: Any):
+        r"""
+        Parameters
+        ----------
+        average_trigger
+            The criteria to trigger averaging.
+
+        Returns
+        -------
+        """
+        # implement a naive strategy, use average_trigger as boolean
+        self.averaging_started = average_trigger
+        # raise NotImplementedError()
+
+    def apply(self, model: nn.HybridBlock) -> Optional[Dict]:
+        r"""
+        Parameters
+        ----------
+        model
+            The model of the current iteration.
+
+        Returns
+        -------
+        The averaged model, None if the averaging hasn't started.
+        """
+
+        if self.averaging_started:
+            self.update_average(model)
+
+        return self.averaged_model
+
+    def update_average(self, model: nn.HybridBlock):
+        r"""
+        Parameters
+        ----------
+        model
+            The model to update the average.
+        """
+        self.average_counter += 1
+        if self.averaged_model is None:
+            self.averaged_model = {k: v.list_data()[0].copy() \
+                for k, v in model.collect_params().items()}
+        else:
+            alpha = 1. / self.average_counter
+            # moving average
+            for name, param_avg in self.averaged_model.items():
+                param_avg[:] += alpha * (model.collect_params()[name].list_data()[0] - param_avg)
+
+    def load_averaged_model(self, model:nn.HybridBlock):
+        r"""
+        When validating/evaluating the averaged model in the half way of training,
+        use load_averaged_model first to load the averaged model and overwrite the current model,
+        do the evaluation, and then use load_cached_model to load the current model back.
+
+        Parameters
+        ----------
+        model
+            The model that the averaged model is loaded to.
+        """
+        if self.averaged_model is not None:
+            # cache the current model
+            if self.cached_model is None:
+                self.cached_model = {k: v.list_data()[0].copy() \
+                    for k, v in model.collect_params().items()}
+            else:
+                for name, param_cached in self.cached_model.items():
+                    param_cached[:] = model.collect_params()[name].list_data()[0]
+            # load the averaged model
+            for name, param_avg in self.averaged_model.items():
+                model.collect_params()[name].set_data(param_avg)
+
+    def load_cached_model(self, model:nn.HybridBlock):
+        r"""
+        Parameters
+        ----------
+        model
+            The model that the cached model is loaded to.
+        """
+        if self.cached_model is not None:
+            # load the cached model
+            for name, param_cached in self.cached_model.items():
+                model.collect_params()[name].set_data(param_cached)
+
+class NTA_V1(IterationAveragingStrategy):
+    @validated()
+    def __init__(self, n: int = 5, maximize: bool = False):
+        r"""
+        Depending on the choice of metrics, the users may want to minimize or maximize the metrics.
+        Thus, set maximize = True to maximize, otherwise minimize.
+
+        Parameters
+        ----------
+        n
+            The non-monotone interval.
+        maximize
+            Whether to maximize or minimize the validation metric.
+        val_logs
+            Historical validation metrics.
+        """
+
+        super().__init__()
+
+        self.n = n
+        self.maximize = maximize
+        self.val_logs = []
+
+    def update_average_trigger(self, average_trigger: Any):
+        r"""
+        Parameters
+        ----------
+        average_trigger
+            The criteria to trigger averaging, evaluation metrics in this case.
+
+        Returns
+        -------
+        """
+
+        # implement NTA (salesforce)
+        # this is the implementation from the iclr (and salesforce github) version, which mismatches the arxiv (and gluonnlp) version
+        if not self.averaging_started and self.n > 0:
+            if self.maximize:
+                if len(self.val_logs) > self.n and average_trigger < max(self.val_logs[:-self.n]):
+                    self.averaging_started = True
+            else:
+                if len(self.val_logs) > self.n and average_trigger > min(self.val_logs[:-self.n]):
+                    self.averaging_started = True
+        self.val_logs.append(average_trigger)
+
+class NTA_V2(IterationAveragingStrategy):
+    @validated()
+    def __init__(self, n: int = 5, maximize: bool = False):
+        r"""
+        Parameters
+        ----------
+        n
+            The non-montone interval
+        maximize
+            Whether to maximize or minimize the validation metric
+        val_logs
+            Historical validation metrics
+        """
+
+        super().__init__()
+
+        self.n = n
+        self.maximize = maximize
+        self.val_logs = []
+
+    def update_average_trigger(self, average_trigger: Any):
+        r"""
+        Parameters
+        ----------
+        average_trigger
+            The criteria to trigger averaging, evaluation metrics in this case.
+
+        Returns
+        -------
+        """
+
+        # implement NTA (gluonnlp)
+        if not self.averaging_started and self.n > 0:
+            if self.maximize:
+                # in gluonnlp awd-lstm, "len(self.val_logs) > self.n" is used, but I think it should be ">=" instead
+                if len(self.val_logs) >= self.n and average_trigger < max(self.val_logs[-self.n:]):
+                    self.averaging_started = True
+            else:
+                if len(self.val_logs) >= self.n and average_trigger > min(self.val_logs[-self.n:]):
+                    self.averaging_started = True
+        self.val_logs.append(average_trigger)
+
+class Alpha_Suffix(IterationAveragingStrategy):
+    @validated()
+    def __init__(self, epochs: int, alpha: float = 0.75):
+        r"""
+        Taking iteration average for the last epoch*alpha epochs
+
+        Parameters
+        ----------
+        epochs
+            The total number of epochs.
+        alpha
+            Propotion of averaging.
+        alpha_suffix
+            The epoch where iteration averaging starts.
+        """
+
+        super().__init__()
+
+        assert (alpha >= 0 and alpha <= 1)
+
+        self.alpha_suffix = epochs * (1. - alpha)
+
+    def update_average_trigger(self, average_trigger: Any):
+        r"""
+        Parameters
+        ----------
+        average_trigger
+            The current number of epoch.
+
+        Returns
+        -------
+        """
+
+        # implement NTA (gluonnlp)
+        if not self.averaging_started:
+            if average_trigger >= self.alpha_suffix:
+                self.averaging_started = True
+
+
+
+
+
\ No newline at end of file
diff --git a/src/gluonts/trainer.py b/src/gluonts/trainer.py
index 19784c74fe..5cf366b383 100644
--- a/src/gluonts/trainer.py
+++ b/src/gluonts/trainer.py
@@ -30,6 +30,7 @@
     "_base",
     "learning_rate_scheduler",
     "model_averaging",
+    "model_iteration_averaging",
 ):
     sys.modules[f"gluonts.trainer.{submodule}"] = importlib.import_module(
         f"gluonts.mx.trainer.{submodule}"
diff --git a/test/trainer/test_model_iteration_averaging.py b/test/trainer/test_model_iteration_averaging.py
new file mode 100644
index 0000000000..71f548b8bb
--- /dev/null
+++ b/test/trainer/test_model_iteration_averaging.py
@@ -0,0 +1,182 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the License.
+# A copy of the License is located at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# or in the "license" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+# Third-party imports
+import mxnet as mx
+import mxnet.gluon.nn as nn
+import numpy as np
+import pytest
+import pandas as pd
+import math
+
+# First-party imports
+from gluonts.dataset.common import ListDataset
+from gluonts.model.simple_feedforward import SimpleFeedForwardEstimator
+from gluonts.mx.trainer import Trainer
+from gluonts.mx.trainer.model_iteration_averaging import (
+    IterationAveragingStrategy,
+    NTA_V1,
+    NTA_V2,
+    Alpha_Suffix,
+)
+
+def initialize_model() -> nn.HybridBlock:
+    # dummy training data
+    N = 10  # number of time series
+    T = 100  # number of timesteps
+    prediction_length = 24
+    freq = "1H"
+    custom_dataset = np.random.normal(size=(N, T))
+    start = pd.Timestamp("01-01-2019", freq=freq)  # can be different for each time series
+    train_ds = ListDataset([{'target': x, 'start': start}
+                            for x in custom_dataset[:, :-prediction_length]],
+                           freq=freq)
+    # create a simple model
+    estimator = SimpleFeedForwardEstimator(
+        num_hidden_dimensions=[10],
+        prediction_length=prediction_length,
+        context_length=T,
+        freq=freq,
+        trainer=Trainer(ctx="cpu",
+                        epochs=1,
+                        learning_rate=1e-3,
+                        num_batches_per_epoch=1
+                       )
+    )
+    # train model
+    predictor = estimator.train(train_ds)
+
+    return predictor.prediction_net
+
+@pytest.mark.parametrize("n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20])
+def test_NTA_V1(n: int):
+    model = initialize_model()
+    params = model.collect_params()
+    avg_strategy = NTA_V1(n=n)
+    loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3]
+    for i, loss in enumerate(loss_list):
+        for k,v in params.items():
+            for arr in v.list_data():
+                arr[:] = i
+        avg_strategy.update_average_trigger(loss)
+        avg_strategy.apply(model)
+    # nothing is cached yet, thus load_cached_model won't change anything
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    # test averaged model
+    avg_strategy.load_averaged_model(model)
+    if n <= 0 or n > 6:
+        # average never happends, model is not changed
+        for k,v in params.items():
+            for arr in v.list_data():
+                # the last model should have 10 in all coordinates
+                assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    else:
+        for k,v in params.items():
+            for arr in v.list_data():
+                # NTA_V1 takes the average on the last 7-n iterations
+                assert mx.nd.norm(arr - (4+n+10)/2.).asscalar() < 1e-30
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+
+@pytest.mark.parametrize("n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20])
+def test_NTA_V2(n: int):
+    model = initialize_model()
+    params = model.collect_params()
+    avg_strategy = NTA_V2(n=n)
+    loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3]
+    for i, loss in enumerate(loss_list):
+        for k,v in params.items():
+            for arr in v.list_data():
+                arr[:] = i
+        avg_strategy.update_average_trigger(loss)
+        avg_strategy.apply(model)
+    # nothing is cached yet, thus load_cached_model won't change anything
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    # test averaged model
+    avg_strategy.load_averaged_model(model)
+    if n <= 0 or n >= len(loss_list):
+        # average never happends, model is not changed
+        for k,v in params.items():
+            for arr in v.list_data():
+                # the last model should have 10 in all coordinates
+                assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    else:
+        for k,v in params.items():
+            for arr in v.list_data():
+                # NTA_V2 takes the average once the loss increases, no matter what n is taken
+                # (the first n iterations are ignored)
+                if n <= 4:
+                    val = 7
+                else:
+                    val = (n+10) / 2.
+                assert mx.nd.norm(arr - val).asscalar() < 1e-30
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+
+@pytest.mark.parametrize("alpha", [0.0, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0])
+def test_Alpha_Suffix(alpha: float):
+    model = initialize_model()
+    params = model.collect_params()
+    loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3]
+    avg_strategy = Alpha_Suffix(epochs=len(loss_list), alpha=alpha)
+    for i, loss in enumerate(loss_list):
+        for k,v in params.items():
+            for arr in v.list_data():
+                arr[:] = i
+        avg_strategy.update_average_trigger(i+1)
+        avg_strategy.apply(model)
+    # nothing is cached yet, thus load_cached_model won't change anything
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    # test averaged model
+    avg_strategy.load_averaged_model(model)
+    n = max(int(math.ceil(len(loss_list)*(1-alpha))), 1)
+    if n > len(loss_list):
+        # average never happends, model is not changed
+        for k,v in params.items():
+            for arr in v.list_data():
+                # the last model should have 10 in all coordinates
+                assert mx.nd.norm(arr - 10).asscalar() < 1e-30
+    else:
+        for k,v in params.items():
+            for arr in v.list_data():
+                val = (n+9) / 2.
+                assert mx.nd.norm(arr - val).asscalar() < 1e-30
+    # test cached model
+    avg_strategy.load_cached_model(model)
+    for k,v in params.items():
+        for arr in v.list_data():
+            # the last model should have 10 in all coordinates
+            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
\ No newline at end of file

From 630bf3496b3bb70e7d281812af6d3f883a0bf566 Mon Sep 17 00:00:00 2001
From: Zheng
Date: Fri, 26 Jun 2020 18:33:58 -0700
Subject: [PATCH 02/12] fix and clean

---
 src/gluonts/mx/trainer/_base.py                      |  4 +++-
 src/gluonts/mx/trainer/model_iteration_averaging.py  | 11 ++++++-----
 test/trainer/test_model_iteration_averaging.py       |  6 +++---
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/gluonts/mx/trainer/_base.py b/src/gluonts/mx/trainer/_base.py
index ca3988cd31..fd84607e5e 100644
--- a/src/gluonts/mx/trainer/_base.py
+++ b/src/gluonts/mx/trainer/_base.py
@@ -332,9 +332,11 @@ def loop(
                     if isinstance(self.avg_strategy, Alpha_Suffix):
                         # alpha suffix
                         self.avg_strategy.update_average_trigger(epoch_no+1)
-                    else:
+                    elif isinstance(self.avg_strategy, (NTA_V1, NTA_V2)):
                         # NTA
                         self.avg_strategy.update_average_trigger(loss_value(epoch_loss))
+                    else:
+                        raise NotImplementedError
                     # once triggered, update the average immediately
                     self.avg_strategy.apply(net)
 
diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
index e1dbb456b1..4fccc94112 100644
--- a/src/gluonts/mx/trainer/model_iteration_averaging.py
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -14,7 +14,6 @@
 # Standard library imports
 from typing import Any, Dict, Optional
 
-import mxnet as mx
 import mxnet.gluon.nn as nn
 
 # First-party imports
 from gluonts.core.component import validated
@@ -125,6 +124,7 @@ def load_cached_model(self, model:nn.HybridBlock):
             for name, param_cached in self.cached_model.items():
                 model.collect_params()[name].set_data(param_cached)
 
+
 class NTA_V1(IterationAveragingStrategy):
     @validated()
     def __init__(self, n: int = 5, maximize: bool = False):
@@ -170,6 +170,7 @@ def update_average_trigger(self, average_trigger: Any):
                     self.averaging_started = True
         self.val_logs.append(average_trigger)
 
+
 class NTA_V2(IterationAveragingStrategy):
     @validated()
     def __init__(self, n: int = 5, maximize: bool = False):
@@ -177,7 +178,7 @@ def __init__(self, n: int = 5, maximize: bool = False):
         Parameters
         ----------
         n
-            The non-montone interval
+            The non-monotone interval
         maximize
             Whether to maximize or minimize the validation metric
         val_logs
             Historical validation metrics
@@ -212,6 +213,7 @@ def update_average_trigger(self, average_trigger: Any):
                     self.averaging_started = True
         self.val_logs.append(average_trigger)
 
+
 class Alpha_Suffix(IterationAveragingStrategy):
     @validated()
     def __init__(self, epochs: int, alpha: float = 0.75):
@@ -223,7 +225,7 @@ def __init__(self, epochs: int, alpha: float = 0.75):
         epochs
             The total number of epochs.
         alpha
-            Propotion of averaging.
+            Proportion of averaging.
         alpha_suffix
             The epoch where iteration averaging starts.
         """
@@ -244,8 +246,7 @@ def update_average_trigger(self, average_trigger: Any):
         Returns
         -------
         """
-
-        # implement NTA (gluonnlp)
+
         if not self.averaging_started:
             if average_trigger >= self.alpha_suffix:
                 self.averaging_started = True
diff --git a/test/trainer/test_model_iteration_averaging.py b/test/trainer/test_model_iteration_averaging.py
index 71f548b8bb..9163429089 100644
--- a/test/trainer/test_model_iteration_averaging.py
+++ b/test/trainer/test_model_iteration_averaging.py
@@ -80,7 +80,7 @@ def test_NTA_V1(n: int):
     # test averaged model
     avg_strategy.load_averaged_model(model)
     if n <= 0 or n > 6:
-        # average never happends, model is not changed
+        # average never happens, model is not changed
         for k,v in params.items():
             for arr in v.list_data():
                 # the last model should have 10 in all coordinates
@@ -119,7 +119,7 @@ def test_NTA_V2(n: int):
     # test averaged model
     avg_strategy.load_averaged_model(model)
     if n <= 0 or n >= len(loss_list):
-        # average never happends, model is not changed
+        # average never happens, model is not changed
         for k,v in params.items():
             for arr in v.list_data():
                 # the last model should have 10 in all coordinates
@@ -164,7 +164,7 @@ def test_Alpha_Suffix(alpha: float):
     avg_strategy.load_averaged_model(model)
     n = max(int(math.ceil(len(loss_list)*(1-alpha))), 1)
     if n > len(loss_list):
-        # average never happends, model is not changed
+        # average never happens, model is not changed
         for k,v in params.items():
             for arr in v.list_data():
                 # the last model should have 10 in all coordinates
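The non-monotonic trigger that patches 01-02 settle on can be checked in isolation. A minimal sketch (the helper name is illustrative, not part of the patch; no MXNet dependency) of the NTA_V1-style rule, which starts averaging once the loss stops improving on the best value seen at least n steps earlier:

# Toy illustration of the NTA trigger on the loss sequence used in the tests.
def nta_start_step(losses, n=5):
    val_logs = []
    for t, loss in enumerate(losses):
        # trigger once the current loss is worse than the best loss
        # observed at least n steps ago (the [:-n] window)
        if len(val_logs) > n and loss > min(val_logs[:-n]):
            return t
        val_logs.append(loss)
    return None

print(nta_start_step([5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3], n=2))  # prints 6

With n=2 the trigger fires at step 6, so steps 6..10 are averaged; their mean (4 + n + 10) / 2 = 8 is exactly the value asserted in test_NTA_V1.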
""" @@ -244,8 +246,7 @@ def update_average_trigger(self, average_trigger: Any): Returns ------- """ - - # implement NTA (gluonnlp) + if not self.averaging_started: if average_trigger >= self.alpha_suffix: self.averaging_started = True diff --git a/test/trainer/test_model_iteration_averaging.py b/test/trainer/test_model_iteration_averaging.py index 71f548b8bb..9163429089 100644 --- a/test/trainer/test_model_iteration_averaging.py +++ b/test/trainer/test_model_iteration_averaging.py @@ -80,7 +80,7 @@ def test_NTA_V1(n: int): # test averaged model avg_strategy.load_averaged_model(model) if n <= 0 or n > 6: - # average never happends, model is not changed + # average never happens, model is not changed for k,v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates @@ -119,7 +119,7 @@ def test_NTA_V2(n: int): # test averaged model avg_strategy.load_averaged_model(model) if n <= 0 or n >= len(loss_list): - # average never happends, model is not changed + # average never happens, model is not changed for k,v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates @@ -164,7 +164,7 @@ def test_Alpha_Suffix(alpha: float): avg_strategy.load_averaged_model(model) n = max(int(math.ceil(len(loss_list)*(1-alpha))), 1) if n > len(loss_list): - # average never happends, model is not changed + # average never happens, model is not changed for k,v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates From 5cc3d69d1f71c073c7da931f5d75f001f1207905 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 29 Jun 2020 20:26:01 +0000 Subject: [PATCH 03/12] reformated by black --- src/gluonts/mx/trainer/_base.py | 34 ++++++-- .../mx/trainer/model_iteration_averaging.py | 64 ++++++++------- .../trainer/test_model_iteration_averaging.py | 78 +++++++++++-------- 3 files changed, 108 insertions(+), 68 deletions(-) diff --git a/src/gluonts/mx/trainer/_base.py b/src/gluonts/mx/trainer/_base.py index fd84607e5e..aea2f6a28f 100644 --- a/src/gluonts/mx/trainer/_base.py +++ b/src/gluonts/mx/trainer/_base.py @@ -39,6 +39,7 @@ SelectNBestMean, save_epoch_info, ) + # iteration averaging from .model_iteration_averaging import ( IterationAveragingStrategy, @@ -120,7 +121,9 @@ def __init__( weight_decay: float = 1e-8, init: Union[str, mx.initializer.Initializer] = "xavier", hybridize: bool = True, - avg_strategy: Union[AveragingStrategy,IterationAveragingStrategy] = SelectNBestMean(num_models=1), + avg_strategy: Union[ + AveragingStrategy, IterationAveragingStrategy + ] = SelectNBestMean(num_models=1), ) -> None: assert ( @@ -236,7 +239,9 @@ def loop( epoch_loss = mx.metric.Loss() # use averaged model for validation - if not is_training and isinstance(self.avg_strategy, IterationAveragingStrategy): + if not is_training and isinstance( + self.avg_strategy, IterationAveragingStrategy + ): self.avg_strategy.load_averaged_model(net) with tqdm(batch_iter) as it: @@ -263,7 +268,10 @@ def loop( trainer.step(batch_size) # iteration averaging in training - if isinstance(self.avg_strategy, IterationAveragingStrategy): + if isinstance( + self.avg_strategy, + IterationAveragingStrategy, + ): self.avg_strategy.apply(net) epoch_loss.update(None, preds=loss) @@ -305,7 +313,9 @@ def loop( lv, ) - if not is_training and isinstance(self.avg_strategy, IterationAveragingStrategy): + if not is_training and isinstance( + self.avg_strategy, IterationAveragingStrategy + ): # bring back the cached model 
self.avg_strategy.load_cached_model(net) @@ -328,13 +338,19 @@ def loop( ) # update average trigger - if isinstance(self.avg_strategy, IterationAveragingStrategy): + if isinstance( + self.avg_strategy, IterationAveragingStrategy + ): if isinstance(self.avg_strategy, Alpha_Suffix): # alpha suffix - self.avg_strategy.update_average_trigger(epoch_no+1) + self.avg_strategy.update_average_trigger( + epoch_no + 1 + ) elif isinstance(self.avg_strategy, (NTA_V1, NTA_V2)): # NTA - self.avg_strategy.update_average_trigger(loss_value(epoch_loss)) + self.avg_strategy.update_average_trigger( + loss_value(epoch_loss) + ) else: raise NotImplementedError # once triggered, update the average immediately @@ -379,7 +395,9 @@ def loop( if isinstance(self.avg_strategy, AveragingStrategy): logging.info("Computing averaged parameters.") - averaged_params_path = self.avg_strategy.apply(gluonts_temp) + averaged_params_path = self.avg_strategy.apply( + gluonts_temp + ) logging.info("Loading averaged parameters.") net.load_parameters(averaged_params_path, self.ctx) diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py index 4fccc94112..53007815d5 100644 --- a/src/gluonts/mx/trainer/model_iteration_averaging.py +++ b/src/gluonts/mx/trainer/model_iteration_averaging.py @@ -66,7 +66,7 @@ def apply(self, model: nn.HybridBlock) -> Optional[Dict]: ------- The averaged model, None if the averaging hasn't started. """ - + if self.averaging_started: self.update_average(model) @@ -81,15 +81,19 @@ def update_average(self, model: nn.HybridBlock): """ self.average_counter += 1 if self.averaged_model is None: - self.averaged_model = {k: v.list_data()[0].copy() \ - for k, v in model.collect_params().items()} + self.averaged_model = { + k: v.list_data()[0].copy() + for k, v in model.collect_params().items() + } else: - alpha = 1. 
/ self.average_counter + alpha = 1.0 / self.average_counter # moving average for name, param_avg in self.averaged_model.items(): - param_avg[:] += alpha * (model.collect_params()[name].list_data()[0] - param_avg) + param_avg[:] += alpha * ( + model.collect_params()[name].list_data()[0] - param_avg + ) - def load_averaged_model(self, model:nn.HybridBlock): + def load_averaged_model(self, model: nn.HybridBlock): r""" When validating/evaluating the averaged model in the half way of training, use load_averaged_model first to load the averaged model and overwrite the current model, @@ -103,16 +107,20 @@ def load_averaged_model(self, model:nn.HybridBlock): if self.averaged_model is not None: # cache the current model if self.cached_model is None: - self.cached_model = {k: v.list_data()[0].copy() \ - for k, v in model.collect_params().items()} + self.cached_model = { + k: v.list_data()[0].copy() + for k, v in model.collect_params().items() + } else: for name, param_cached in self.cached_model.items(): - param_cached[:] = model.collect_params()[name].list_data()[0] + param_cached[:] = model.collect_params()[name].list_data()[ + 0 + ] # load the averaged model for name, param_avg in self.averaged_model.items(): model.collect_params()[name].set_data(param_avg) - def load_cached_model(self, model:nn.HybridBlock): + def load_cached_model(self, model: nn.HybridBlock): r""" Parameters ---------- @@ -147,7 +155,7 @@ def __init__(self, n: int = 5, maximize: bool = False): self.n = n self.maximize = maximize self.val_logs = [] - + def update_average_trigger(self, average_trigger: Any): r""" Parameters @@ -158,15 +166,19 @@ def update_average_trigger(self, average_trigger: Any): Returns ------- """ - + # implement NTA (salesforce) # this is the implementation from the iclr (and salesforce github) version, which mismatches the arxiv (and gluonnlp) version if not self.averaging_started and self.n > 0: if self.maximize: - if len(self.val_logs) > self.n and average_trigger < max(self.val_logs[:-self.n]): + if len(self.val_logs) > self.n and average_trigger < max( + self.val_logs[: -self.n] + ): self.averaging_started = True else: - if len(self.val_logs) > self.n and average_trigger > min(self.val_logs[:-self.n]): + if len(self.val_logs) > self.n and average_trigger > min( + self.val_logs[: -self.n] + ): self.averaging_started = True self.val_logs.append(average_trigger) @@ -190,7 +202,7 @@ def __init__(self, n: int = 5, maximize: bool = False): self.n = n self.maximize = maximize self.val_logs = [] - + def update_average_trigger(self, average_trigger: Any): r""" Parameters @@ -201,15 +213,19 @@ def update_average_trigger(self, average_trigger: Any): Returns ------- """ - + # implement NTA (gluonnlp) if not self.averaging_started and self.n > 0: if self.maximize: # in gluonnlp awd-lstm, "len(self.val_logs) > self.n" is used, but I think it should be ">=" instead - if len(self.val_logs) >= self.n and average_trigger < max(self.val_logs[-self.n:]): + if len(self.val_logs) >= self.n and average_trigger < max( + self.val_logs[-self.n :] + ): self.averaging_started = True else: - if len(self.val_logs) >= self.n and average_trigger > min(self.val_logs[-self.n:]): + if len(self.val_logs) >= self.n and average_trigger > min( + self.val_logs[-self.n :] + ): self.averaging_started = True self.val_logs.append(average_trigger) @@ -232,10 +248,10 @@ def __init__(self, epochs: int, alpha: float = 0.75): super().__init__() - assert (alpha >= 0 and alpha <= 1) + assert alpha >= 0 and alpha <= 1 + + self.alpha_suffix = epochs * 
(1.0 - alpha) - self.alpha_suffix = epochs * (1. - alpha) - def update_average_trigger(self, average_trigger: Any): r""" Parameters @@ -250,9 +266,3 @@ def update_average_trigger(self, average_trigger: Any): if not self.averaging_started: if average_trigger >= self.alpha_suffix: self.averaging_started = True - - - - - - \ No newline at end of file diff --git a/test/trainer/test_model_iteration_averaging.py b/test/trainer/test_model_iteration_averaging.py index 9163429089..1c1a45d17a 100644 --- a/test/trainer/test_model_iteration_averaging.py +++ b/test/trainer/test_model_iteration_averaging.py @@ -30,6 +30,7 @@ Alpha_Suffix, ) + def initialize_model() -> nn.HybridBlock: # dummy training data N = 10 # number of time series @@ -37,35 +38,42 @@ def initialize_model() -> nn.HybridBlock: prediction_length = 24 freq = "1H" custom_dataset = np.random.normal(size=(N, T)) - start = pd.Timestamp("01-01-2019", freq=freq) # can be different for each time series - train_ds = ListDataset([{'target': x, 'start': start} - for x in custom_dataset[:, :-prediction_length]], - freq=freq) + start = pd.Timestamp( + "01-01-2019", freq=freq + ) # can be different for each time series + train_ds = ListDataset( + [ + {"target": x, "start": start} + for x in custom_dataset[:, :-prediction_length] + ], + freq=freq, + ) # create a simple model estimator = SimpleFeedForwardEstimator( num_hidden_dimensions=[10], prediction_length=prediction_length, context_length=T, freq=freq, - trainer=Trainer(ctx="cpu", - epochs=1, - learning_rate=1e-3, - num_batches_per_epoch=1 - ) + trainer=Trainer( + ctx="cpu", epochs=1, learning_rate=1e-3, num_batches_per_epoch=1 + ), ) # train model predictor = estimator.train(train_ds) return predictor.prediction_net -@pytest.mark.parametrize("n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20]) + +@pytest.mark.parametrize( + "n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20] +) def test_NTA_V1(n: int): model = initialize_model() params = model.collect_params() avg_strategy = NTA_V1(n=n) loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3] for i, loss in enumerate(loss_list): - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): arr[:] = i avg_strategy.update_average_trigger(loss) @@ -73,7 +81,7 @@ def test_NTA_V1(n: int): # nothing is cached yet, thus load_cached_model won't change anything # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 @@ -81,30 +89,33 @@ def test_NTA_V1(n: int): avg_strategy.load_averaged_model(model) if n <= 0 or n > 6: # average never happens, model is not changed - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 else: - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # NTA_V1 takes the average on the last 7-n iterations - assert mx.nd.norm(arr - (4+n+10)/2.).asscalar() < 1e-30 + assert mx.nd.norm(arr - (4 + n + 10) / 2.0).asscalar() < 1e-30 # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 -@pytest.mark.parametrize("n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20]) + 
+@pytest.mark.parametrize( + "n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20] +) def test_NTA_V2(n: int): model = initialize_model() params = model.collect_params() avg_strategy = NTA_V2(n=n) loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3] for i, loss in enumerate(loss_list): - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): arr[:] = i avg_strategy.update_average_trigger(loss) @@ -112,7 +123,7 @@ def test_NTA_V2(n: int): # nothing is cached yet, thus load_cached_model won't change anything # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 @@ -120,27 +131,28 @@ def test_NTA_V2(n: int): avg_strategy.load_averaged_model(model) if n <= 0 or n >= len(loss_list): # average never happens, model is not changed - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 else: - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): - # NTA_V2 takes the average once the loss increases, no matter what n is taken + # NTA_V2 takes the average once the loss increases, no matter what n is taken # (the first n iterations are ignored) if n <= 4: val = 7 else: - val = (n+10) / 2. + val = (n + 10) / 2.0 assert mx.nd.norm(arr - val).asscalar() < 1e-30 # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 + @pytest.mark.parametrize("alpha", [0.0, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0]) def test_Alpha_Suffix(alpha: float): model = initialize_model() @@ -148,35 +160,35 @@ def test_Alpha_Suffix(alpha: float): loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3] avg_strategy = Alpha_Suffix(epochs=len(loss_list), alpha=alpha) for i, loss in enumerate(loss_list): - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): arr[:] = i - avg_strategy.update_average_trigger(i+1) + avg_strategy.update_average_trigger(i + 1) avg_strategy.apply(model) # nothing is cached yet, thus load_cached_model won't change anything # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 # test averaged model avg_strategy.load_averaged_model(model) - n = max(int(math.ceil(len(loss_list)*(1-alpha))), 1) + n = max(int(math.ceil(len(loss_list) * (1 - alpha))), 1) if n > len(loss_list): # average never happens, model is not changed - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates assert mx.nd.norm(arr - 10).asscalar() < 1e-30 else: - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): - val = (n+9) / 2. 
+ val = (n + 9) / 2.0 assert mx.nd.norm(arr - val).asscalar() < 1e-30 # test cached model avg_strategy.load_cached_model(model) - for k,v in params.items(): + for k, v in params.items(): for arr in v.list_data(): # the last model should have 10 in all coordinates - assert mx.nd.norm(arr - 10).asscalar() < 1e-30 \ No newline at end of file + assert mx.nd.norm(arr - 10).asscalar() < 1e-30 From 5eec9e9eaa0567a3a1508dc5b31d1d3e60e0a4e7 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 29 Jun 2020 15:50:14 -0500 Subject: [PATCH 04/12] add type annotation for 'val_logs' --- src/gluonts/mx/trainer/model_iteration_averaging.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py index 53007815d5..8e6fa4cfd5 100644 --- a/src/gluonts/mx/trainer/model_iteration_averaging.py +++ b/src/gluonts/mx/trainer/model_iteration_averaging.py @@ -12,7 +12,7 @@ # permissions and limitations under the License. # Standard library imports -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List import mxnet.gluon.nn as nn @@ -134,6 +134,8 @@ def load_cached_model(self, model: nn.HybridBlock): class NTA_V1(IterationAveragingStrategy): + val_logs: List[Any] + @validated() def __init__(self, n: int = 5, maximize: bool = False): r""" @@ -184,6 +186,8 @@ def update_average_trigger(self, average_trigger: Any): class NTA_V2(IterationAveragingStrategy): + val_logs: List[Any] + @validated() def __init__(self, n: int = 5, maximize: bool = False): r""" From 4713f38e51d3fc4d833f5c4f84278b7d3af082d5 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 29 Jun 2020 20:53:32 +0000 Subject: [PATCH 05/12] reformated by balck --- src/gluonts/mx/trainer/model_iteration_averaging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py index 8e6fa4cfd5..b9ccaf3a32 100644 --- a/src/gluonts/mx/trainer/model_iteration_averaging.py +++ b/src/gluonts/mx/trainer/model_iteration_averaging.py @@ -187,7 +187,7 @@ def update_average_trigger(self, average_trigger: Any): class NTA_V2(IterationAveragingStrategy): val_logs: List[Any] - + @validated() def __init__(self, n: int = 5, maximize: bool = False): r""" From 3f813565a3fe0a67a4572cde0bd87a3c95b4a401 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 30 Jun 2020 16:22:16 -0500 Subject: [PATCH 06/12] resolved some comments, add new parameter eta for polynomial-decay averaging --- .../mx/trainer/model_iteration_averaging.py | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py index 8e6fa4cfd5..3af42368ff 100644 --- a/src/gluonts/mx/trainer/model_iteration_averaging.py +++ b/src/gluonts/mx/trainer/model_iteration_averaging.py @@ -14,6 +14,7 @@ # Standard library imports from typing import Any, Dict, Optional, List +# Third-party imports import mxnet.gluon.nn as nn # First-party imports @@ -21,24 +22,31 @@ class IterationAveragingStrategy: + + r""" + The model averaging is based on paper + "Stochastic Gradient Descent for Non-smooth Optimization: Convergence Results and Optimal Averaging Schemes", + (http://proceedings.mlr.press/v28/shamir13.pdf), parameterized by eta. + When eta = 0, it is equivalent to simple average over all iterations with same weights. 
From 3f813565a3fe0a67a4572cde0bd87a3c95b4a401 Mon Sep 17 00:00:00 2001
From: unknown
Date: Tue, 30 Jun 2020 16:22:16 -0500
Subject: [PATCH 06/12] resolved some comments, add new parameter eta for
 polynomial-decay averaging

---
 src/gluonts/mx/trainer/model_iteration_averaging.py | 83 ++++++++++++-------
 1 file changed, 53 insertions(+), 30 deletions(-)

diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
index 8e6fa4cfd5..3af42368ff 100644
--- a/src/gluonts/mx/trainer/model_iteration_averaging.py
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -14,6 +14,7 @@
 # Standard library imports
 from typing import Any, Dict, Optional, List
 
+# Third-party imports
 import mxnet.gluon.nn as nn
 
 # First-party imports
 from gluonts.core.component import validated
@@ -21,24 +22,31 @@
 
 
 class IterationAveragingStrategy:
+
+    r"""
+    The model averaging is based on paper
+    "Stochastic Gradient Descent for Non-smooth Optimization: Convergence Results and Optimal Averaging Schemes",
+    (http://proceedings.mlr.press/v28/shamir13.pdf), parameterized by eta.
+    When eta = 0, it is equivalent to simple average over all iterations with same weights.
+    """
+
     @validated()
-    def __init__(self):
+    def __init__(self, eta: float = 0):
         r"""
         Parameters
         ----------
-        averaged_model
-            Dict that maintains the averaged model parameters.
-        cached_model
-            Temporarily save the current model, so that the averaged model can be used for validation.
-        average_counter
-            The number of models accumulated in the average.
-        averaging_started
-            Indicate whether the model averaging has started.
+        eta
+            Parameter of polynomial-decay averaging.
         """
 
+        self.eta = eta
+        # Dict that maintains the averaged model parameters.
         self.averaged_model = None
+        # Temporarily save the current model, so that the averaged model can be used for validation.
         self.cached_model = None
+        # The number of models accumulated in the average.
         self.average_counter = 0
+        # Indicate whether the model averaging has started.
         self.averaging_started = False
 
     def update_average_trigger(self, average_trigger: Any):
@@ -86,7 +94,7 @@ def update_average(self, model: nn.HybridBlock):
                 for k, v in model.collect_params().items()
             }
         else:
-            alpha = 1.0 / self.average_counter
+            alpha = (self.eta + 1.0) / (self.eta + self.average_counter)
             # moving average
             for name, param_avg in self.averaged_model.items():
                 param_avg[:] += alpha * (
@@ -134,6 +142,14 @@
 
 
 class NTA_V1(IterationAveragingStrategy):
+    r"""
+    Implement Non-monotonically Triggered AvSGD (NTA)
+    This method is based on paper "Regularizing and Optimizing LSTM Language Models",
+    (https://openreview.net/pdf?id=SyyGPP0TZ), and an implementation is available in Salesforce GitHub
+    (https://github.com/salesforce/awd-lstm-lm/blob/master/main.py)
+    Note that it mismatches the arxiv (and gluonnlp) version, which is referred to as NTA_V2 below
+    """
+
     val_logs: List[Any]
 
     @validated()
-    def __init__(self, n: int = 5, maximize: bool = False):
+    def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
         r"""
         Depending on the choice of metrics, the users may want to minimize or maximize the metrics.
         Thus, set maximize = True to maximize, otherwise minimize.
@@ -148,14 +164,15 @@ def __init__(self, n: int = 5, maximize: bool = False):
         n
             The non-monotone interval.
         maximize
             Whether to maximize or minimize the validation metric.
-        val_logs
-            Historical validation metrics.
+        eta
+            Parameter of polynomial-decay averaging.
         """
 
-        super().__init__()
+        super().__init__(eta = eta)
 
         self.n = n
         self.maximize = maximize
+        # Historical validation metrics.
         self.val_logs = []
 
     def update_average_trigger(self, average_trigger: Any):
@@ -169,8 +186,6 @@ def update_average_trigger(self, average_trigger: Any):
         Returns
         -------
         """
 
-        # implement NTA (salesforce)
-        # this is the implementation from the iclr (and salesforce github) version, which mismatches the arxiv (and gluonnlp) version
         if not self.averaging_started and self.n > 0:
             if self.maximize:
                 if len(self.val_logs) > self.n and average_trigger < max(
@@ -186,25 +201,34 @@
 
 
 class NTA_V2(IterationAveragingStrategy):
+
+    r"""
+    Implement Non-monotonically Triggered AvSGD (NTA)
+    This method is based on paper "Regularizing and Optimizing LSTM Language Models",
+    (https://arxiv.org/pdf/1708.02182.pdf), and an implementation is available in GluonNLP GitHub
+    (https://github.com/dmlc/gluon-nlp/blob/v0.9.x/scripts/language_model/word_language_model.py)
+    """
+
     val_logs: List[Any]
-    
+
     @validated()
-    def __init__(self, n: int = 5, maximize: bool = False):
+    def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
         r"""
         Parameters
         ----------
         n
-            The non-monotone interval
+            The non-monotone interval.
         maximize
-            Whether to maximize or minimize the validation metric
-        val_logs
-            Historical validation metrics
+            Whether to maximize or minimize the validation metric.
+        eta
+            Parameter of polynomial-decay averaging.
         """
 
-        super().__init__()
+        super().__init__(eta = eta)
 
         self.n = n
         self.maximize = maximize
+        # Historical validation metrics.
         self.val_logs = []
 
     def update_average_trigger(self, average_trigger: Any):
@@ -215,10 +239,8 @@ def update_average_trigger(self, average_trigger: Any):
         Returns
         -------
         """
 
-        # implement NTA (gluonnlp)
         if not self.averaging_started and self.n > 0:
             if self.maximize:
-                # in gluonnlp awd-lstm, "len(self.val_logs) > self.n" is used, but I think it should be ">=" instead
                 if len(self.val_logs) >= self.n and average_trigger < max(
                     self.val_logs[-self.n :]
                 ):
@@ -246,7 +268,7 @@
 
 class Alpha_Suffix(IterationAveragingStrategy):
     @validated()
-    def __init__(self, epochs: int, alpha: float = 0.75):
+    def __init__(self, epochs: int, alpha: float = 0.75, eta: float = 0):
         r"""
         Taking iteration average for the last epoch*alpha epochs
 
@@ -256,14 +278,15 @@ def __init__(self, epochs: int, alpha: float = 0.75):
         epochs
             The total number of epochs.
         alpha
             Proportion of averaging.
-        alpha_suffix
-            The epoch where iteration averaging starts.
+        eta
+            Parameter of polynomial-decay averaging.
         """
 
-        super().__init__()
+        super().__init__(eta = eta)
 
-        assert alpha >= 0 and alpha <= 1
+        assert 0 <= alpha <= 1
 
+        # The epoch where iteration averaging starts.
         self.alpha_suffix = epochs * (1.0 - alpha)
 
     def update_average_trigger(self, average_trigger: Any):

From cfb23b106c7f2bfdc259345bd538d6ed2c54b8ca Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Tue, 30 Jun 2020 21:24:57 +0000
Subject: [PATCH 07/12] reformatted by black

---
 src/gluonts/mx/trainer/model_iteration_averaging.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
index 3af42368ff..01fcf921c0 100644
--- a/src/gluonts/mx/trainer/model_iteration_averaging.py
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -168,7 +168,7 @@ def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta = eta)
+        super().__init__(eta=eta)
 
         self.n = n
         self.maximize = maximize
@@ -224,7 +224,7 @@ def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta = eta)
+        super().__init__(eta=eta)
 
         self.n = n
         self.maximize = maximize
@@ -272,7 +272,7 @@ def __init__(self, epochs: int, alpha: float = 0.75, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta = eta)
+        super().__init__(eta=eta)
 
         assert 0 <= alpha <= 1
 

From 8f6d2f26136bed24759baf521d352305c11b6808 Mon Sep 17 00:00:00 2001
From: unknown
Date: Tue, 30 Jun 2020 16:45:39 -0500
Subject: [PATCH 08/12] add some comments, fix a type-check issue

---
 src/gluonts/mx/trainer/model_iteration_averaging.py | 28 +++++++++++++++----
 1 file changed, 22 insertions(+), 6 deletions(-)

diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
index 01fcf921c0..8e9828a90c 100644
--- a/src/gluonts/mx/trainer/model_iteration_averaging.py
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -15,6 +15,7 @@
 from typing import Any, Dict, Optional, List
 
 # Third-party imports
+import mxnet as mx
 import mxnet.gluon.nn as nn
 
 # First-party imports
@@ -26,10 +27,16 @@ class IterationAveragingStrategy:
     r"""
     The model averaging is based on paper
     "Stochastic Gradient Descent for Non-smooth Optimization: Convergence Results and Optimal Averaging Schemes",
-    (http://proceedings.mlr.press/v28/shamir13.pdf), parameterized by eta.
+    (http://proceedings.mlr.press/v28/shamir13.pdf),
+    which implements polynomial-decay averaging, parameterized by eta.
     When eta = 0, it is equivalent to simple average over all iterations with same weights.
     """
 
+    averaged_model: Optional[Dict[str, mx.nd.NDArray]]
+    cached_model: Optional[Dict[str, mx.nd.NDArray]]
+    average_counter: int
+    averaging_started: bool
+
     @validated()
     def __init__(self, eta: float = 0):
         r"""
@@ -143,7 +150,7 @@
 
 class NTA_V1(IterationAveragingStrategy):
     r"""
-    Implement Non-monotonically Triggered AvSGD (NTA)
+    Implement Non-monotonically Triggered AvSGD (NTA).
     This method is based on paper "Regularizing and Optimizing LSTM Language Models",
     (https://openreview.net/pdf?id=SyyGPP0TZ), and an implementation is available in Salesforce GitHub
     (https://github.com/salesforce/awd-lstm-lm/blob/master/main.py)
@@ -168,7 +175,7 @@ def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta=eta)
+        super().__init__(eta = eta)
 
         self.n = n
         self.maximize = maximize
@@ -203,7 +210,7 @@ class NTA_V2(IterationAveragingStrategy):
 
     r"""
-    Implement Non-monotonically Triggered AvSGD (NTA)
+    Implement Non-monotonically Triggered AvSGD (NTA).
     This method is based on paper "Regularizing and Optimizing LSTM Language Models",
     (https://arxiv.org/pdf/1708.02182.pdf), and an implementation is available in GluonNLP GitHub
     (https://github.com/dmlc/gluon-nlp/blob/v0.9.x/scripts/language_model/word_language_model.py)
@@ -224,7 +231,7 @@ def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta=eta)
+        super().__init__(eta = eta)
 
         self.n = n
         self.maximize = maximize
@@ -257,6 +264,15 @@
 
 class Alpha_Suffix(IterationAveragingStrategy):
+
+    r"""
+    Implement Alpha Suffix model averaging.
+    This method is based on paper "Making Gradient Descent Optimal for Strongly Convex Stochastic Optimization",
+    (https://arxiv.org/pdf/1109.5647.pdf).
+    """
+
+    alpha_suffix: float
+
     @validated()
     def __init__(self, epochs: int, alpha: float = 0.75, eta: float = 0):
         r"""
@@ -272,7 +288,7 @@ def __init__(self, epochs: int, alpha: float = 0.75, eta: float = 0):
             Parameter of polynomial-decay averaging.
         """
 
-        super().__init__(eta=eta)
+        super().__init__(eta = eta)
 
         assert 0 <= alpha <= 1
 
""" - super().__init__(eta = eta) + super().__init__(eta=eta) assert 0 <= alpha <= 1 From 0a591f7172141d261de35e92f1367ba51275b87e Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 15 Jul 2020 15:55:07 -0500 Subject: [PATCH 10/12] merge NTA_V1 and NTA_V2 into a single class, --- src/gluonts/mx/trainer/_base.py | 18 +-- .../mx/trainer/model_iteration_averaging.py | 135 +++++++----------- .../trainer/test_model_iteration_averaging.py | 18 +-- 3 files changed, 68 insertions(+), 103 deletions(-) diff --git a/src/gluonts/mx/trainer/_base.py b/src/gluonts/mx/trainer/_base.py index aea2f6a28f..33a357a349 100644 --- a/src/gluonts/mx/trainer/_base.py +++ b/src/gluonts/mx/trainer/_base.py @@ -43,8 +43,7 @@ # iteration averaging from .model_iteration_averaging import ( IterationAveragingStrategy, - NTA_V1, - NTA_V2, + NTA, Alpha_Suffix, ) @@ -341,18 +340,9 @@ def loop( if isinstance( self.avg_strategy, IterationAveragingStrategy ): - if isinstance(self.avg_strategy, Alpha_Suffix): - # alpha suffix - self.avg_strategy.update_average_trigger( - epoch_no + 1 - ) - elif isinstance(self.avg_strategy, (NTA_V1, NTA_V2)): - # NTA - self.avg_strategy.update_average_trigger( - loss_value(epoch_loss) - ) - else: - raise NotImplementedError + self.avg_strategy.update_average_trigger( + metric=loss_value(epoch_loss), epoch=epoch_no + 1 + ) # once triggered, update the average immediately self.avg_strategy.apply(net) diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py index 7a198269af..6665b182ad 100644 --- a/src/gluonts/mx/trainer/model_iteration_averaging.py +++ b/src/gluonts/mx/trainer/model_iteration_averaging.py @@ -56,19 +56,21 @@ def __init__(self, eta: float = 0): # Indicate whether the model averaging has started. self.averaging_started = False - def update_average_trigger(self, average_trigger: Any): + def update_average_trigger( + self, metric: Any = None, epoch: int = 0, **kwargs + ): r""" Parameters ---------- - average_trigger + metric The criteria to trigger averaging. + epoch + The epoch to start averaging. Returns ------- """ - # implement a naive strategy, use average_trigger as boolean - self.averaging_started = average_trigger - # raise NotImplementedError() + raise NotImplementedError() def apply(self, model: nn.HybridBlock) -> Optional[Dict]: r""" @@ -148,7 +150,7 @@ def load_cached_model(self, model: nn.HybridBlock): model.collect_params()[name].set_data(param_cached) -class NTA_V1(IterationAveragingStrategy): +class NTA(IterationAveragingStrategy): r""" Implement Non-monotonically Triggered AvSGD (NTA). This method is based on paper "Regularizing and Optimizing LSTM Language Models", @@ -160,7 +162,13 @@ class NTA_V1(IterationAveragingStrategy): val_logs: List[Any] @validated() - def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0): + def __init__( + self, + n: int = 5, + maximize: bool = False, + last_n_trigger: bool = False, + eta: float = 0, + ): r""" Depending on the choice of metrics, the users may want to minimize or maximize the metrics. Thus, set maximize = True to maximize, otherwise minimize. @@ -173,94 +181,57 @@ def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0): Whether to maximize or minimize the validation metric. eta Parameter of polynomial-decay averaging. + last_n_trigger + If True, use [-n:] in average trigger, otherwise use [:-n] """ super().__init__(eta=eta) self.n = n self.maximize = maximize + self.last_n_trigger = last_n_trigger # Historical validation metrics. 
self.val_logs = [] - def update_average_trigger(self, average_trigger: Any): + def update_average_trigger( + self, metric: Any = None, epoch: int = 0, **kwargs + ): r""" Parameters ---------- - average_trigger - The criteria to trigger averaging, evaluation metrics in this case. - - Returns - ------- - """ - - if not self.averaging_started and self.n > 0: - if self.maximize: - if len(self.val_logs) > self.n and average_trigger < max( - self.val_logs[: -self.n] - ): - self.averaging_started = True - else: - if len(self.val_logs) > self.n and average_trigger > min( - self.val_logs[: -self.n] - ): - self.averaging_started = True - self.val_logs.append(average_trigger) - - -class NTA_V2(IterationAveragingStrategy): - - r""" - Implement Non-monotonically Triggered AvSGD (NTA). - This method is based on paper "Regularizing and Optimizing LSTM Language Models", - (https://arxiv.org/pdf/1708.02182.pdf), and an implementation is available in GluonNLP GitHub - (https://github.com/dmlc/gluon-nlp/blob/v0.9.x/scripts/language_model/word_language_model.py) - """ - - val_logs: List[Any] - - @validated() - def __init__(self, n: int = 5, maximize: bool = False, eta: float = 0): - r""" - Parameters - ---------- - n - The non-monotone interval. - maximize - Whether to maximize or minimize the validation metric. - eta - Parameter of polynomial-decay averaging. - """ - - super().__init__(eta=eta) - - self.n = n - self.maximize = maximize - # Historical validation metrics. - self.val_logs = [] - - def update_average_trigger(self, average_trigger: Any): - r""" - Parameters - ---------- - average_trigger - The criteria to trigger averaging, evaluation metrics in this case. + metric + The criteria to trigger averaging. + epoch + The epoch to start averaging, not used in NTA Returns ------- """ if not self.averaging_started and self.n > 0: - if self.maximize: - if len(self.val_logs) >= self.n and average_trigger < max( - self.val_logs[-self.n :] - ): - self.averaging_started = True + if self.last_n_trigger: + if self.maximize: + if len(self.val_logs) >= self.n and metric < max( + self.val_logs[-self.n :] + ): + self.averaging_started = True + else: + if len(self.val_logs) >= self.n and metric > min( + self.val_logs[-self.n :] + ): + self.averaging_started = True else: - if len(self.val_logs) >= self.n and average_trigger > min( - self.val_logs[-self.n :] - ): - self.averaging_started = True - self.val_logs.append(average_trigger) + if self.maximize: + if len(self.val_logs) > self.n and metric < max( + self.val_logs[: -self.n] + ): + self.averaging_started = True + else: + if len(self.val_logs) > self.n and metric > min( + self.val_logs[: -self.n] + ): + self.averaging_started = True + self.val_logs.append(metric) class Alpha_Suffix(IterationAveragingStrategy): @@ -295,17 +266,21 @@ def __init__(self, epochs: int, alpha: float = 0.75, eta: float = 0): # The epoch where iteration averaging starts. self.alpha_suffix = epochs * (1.0 - alpha) - def update_average_trigger(self, average_trigger: Any): + def update_average_trigger( + self, metric: Any = None, epoch: int = 0, **kwargs + ): r""" Parameters ---------- - average_trigger - The current number of epoch. + metric + The criteria to trigger averaging, not used in Alpha Suffix. + epoch + The epoch to start averaging. 
From 512b457fc22d4d5b819104f3b57e5d8964f5c5c7 Mon Sep 17 00:00:00 2001
From: unknown
Date: Wed, 15 Jul 2020 16:31:53 -0500
Subject: [PATCH 11/12] restart CI check

From 31fdd9b177be8fcaf7c7c794835a9bcfc81f2815 Mon Sep 17 00:00:00 2001
From: unknown
Date: Thu, 16 Jul 2020 11:11:09 -0500
Subject: [PATCH 12/12] refine the merge of NTA_V1 and NTA_V2

---
 src/gluonts/mx/trainer/_base.py               |  2 -
 .../mx/trainer/model_iteration_averaging.py   | 36 +++++------
 .../trainer/test_model_iteration_averaging.py | 61 ++++---------
 3 files changed, 26 insertions(+), 73 deletions(-)

diff --git a/src/gluonts/mx/trainer/_base.py b/src/gluonts/mx/trainer/_base.py
index 33a357a349..1cd09805e2 100644
--- a/src/gluonts/mx/trainer/_base.py
+++ b/src/gluonts/mx/trainer/_base.py
@@ -39,8 +39,6 @@
     SelectNBestMean,
     save_epoch_info,
 )
-
-# iteration averaging
 from .model_iteration_averaging import (
     IterationAveragingStrategy,
     NTA,
diff --git a/src/gluonts/mx/trainer/model_iteration_averaging.py b/src/gluonts/mx/trainer/model_iteration_averaging.py
index 6665b182ad..e68b6e9ff0 100644
--- a/src/gluonts/mx/trainer/model_iteration_averaging.py
+++ b/src/gluonts/mx/trainer/model_iteration_averaging.py
@@ -209,28 +209,22 @@ def update_average_trigger(
         """

         if not self.averaging_started and self.n > 0:
-            if self.last_n_trigger:
-                if self.maximize:
-                    if len(self.val_logs) >= self.n and metric < max(
-                        self.val_logs[-self.n :]
-                    ):
-                        self.averaging_started = True
-                else:
-                    if len(self.val_logs) >= self.n and metric > min(
-                        self.val_logs[-self.n :]
-                    ):
-                        self.averaging_started = True
+            min_len = self.n if self.last_n_trigger else (self.n + 1)
+            sliced_val_logs = (
+                self.val_logs[-self.n :]
+                if self.last_n_trigger
+                else self.val_logs[: -self.n]
+            )
+            if self.maximize:
+                if len(self.val_logs) >= min_len and metric < max(
+                    sliced_val_logs
+                ):
+                    self.averaging_started = True
             else:
-                if self.maximize:
-                    if len(self.val_logs) > self.n and metric < max(
-                        self.val_logs[: -self.n]
-                    ):
-                        self.averaging_started = True
-                else:
-                    if len(self.val_logs) > self.n and metric > min(
-                        self.val_logs[: -self.n]
-                    ):
-                        self.averaging_started = True
+                if len(self.val_logs) >= min_len and metric > min(
+                    sliced_val_logs
+                ):
+                    self.averaging_started = True
             self.val_logs.append(metric)

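The refactoring above collapses the four trigger branches into a shared window and length bound: with last_n_trigger the window is the last n logged metrics and at least n of them must exist; otherwise the window excludes the last n entries and strictly more than n (i.e. at least n + 1) must exist. A quick brute-force check that the two formulations agree; the helper functions are restatements written only for this comparison, not part of the patch:

    import itertools
    import random

    def old_trigger(val_logs, metric, n, maximize, last_n_trigger):
        # branching form introduced in PATCH 10
        if last_n_trigger:
            if maximize:
                return len(val_logs) >= n and metric < max(val_logs[-n:])
            return len(val_logs) >= n and metric > min(val_logs[-n:])
        if maximize:
            return len(val_logs) > n and metric < max(val_logs[:-n])
        return len(val_logs) > n and metric > min(val_logs[:-n])

    def new_trigger(val_logs, metric, n, maximize, last_n_trigger):
        # refactored form from this patch
        min_len = n if last_n_trigger else (n + 1)
        window = val_logs[-n:] if last_n_trigger else val_logs[:-n]
        if len(val_logs) < min_len:
            return False
        best = max(window) if maximize else min(window)
        return (metric < best) if maximize else (metric > best)

    random.seed(0)
    for _ in range(10000):
        logs = [random.randint(0, 5) for _ in range(random.randint(0, 8))]
        metric, n = random.randint(0, 5), random.randint(1, 4)
        for maximize, last_n in itertools.product((False, True), repeat=2):
            assert old_trigger(logs, metric, n, maximize, last_n) == new_trigger(
                logs, metric, n, maximize, last_n
            )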
diff --git a/test/trainer/test_model_iteration_averaging.py b/test/trainer/test_model_iteration_averaging.py
index 82ede0f5e6..9e46b828c0 100644
--- a/test/trainer/test_model_iteration_averaging.py
+++ b/test/trainer/test_model_iteration_averaging.py
@@ -67,10 +67,11 @@ def initialize_model() -> nn.HybridBlock:
 @pytest.mark.parametrize(
     "n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20]
 )
-def test_NTA_V1(n: int):
+@pytest.mark.parametrize("last_n_trigger", [True, False])
+def test_NTA_V1(n: int, last_n_trigger: bool):
     model = initialize_model()
     params = model.collect_params()
-    avg_strategy = NTA(n=n, last_n_trigger=False)
+    avg_strategy = NTA(n=n, last_n_trigger=last_n_trigger)
     loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3]
     for i, loss in enumerate(loss_list):
         for k, v in params.items():
@@ -87,7 +88,8 @@ def test_NTA_V1(n: int):
             assert mx.nd.norm(arr - 10).asscalar() < 1e-30
     # test averaged model
     avg_strategy.load_averaged_model(model)
-    if n <= 0 or n > 6:
+    len_limit = (len(loss_list) - 1) if last_n_trigger else 6
+    if n <= 0 or n > len_limit:
         # average never happens, model is not changed
         for k, v in params.items():
             for arr in v.list_data():
@@ -96,54 +98,13 @@ def test_NTA_V1(n: int):
     else:
         for k, v in params.items():
             for arr in v.list_data():
-                # NTA_V1 takes the average on the last 7-n iterations
-                assert mx.nd.norm(arr - (4 + n + 10) / 2.0).asscalar() < 1e-30
-    # test cached model
-    avg_strategy.load_cached_model(model)
-    for k, v in params.items():
-        for arr in v.list_data():
-            # the last model should have 10 in all coordinates
-            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
-
-
-@pytest.mark.parametrize(
-    "n", [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 20]
-)
-def test_NTA_V2(n: int):
-    model = initialize_model()
-    params = model.collect_params()
-    avg_strategy = NTA(n=n, last_n_trigger=True)
-    loss_list = [5, 4, 3, 2, 3, 3, 3, 3, 3, 3, 3]
-    for i, loss in enumerate(loss_list):
-        for k, v in params.items():
-            for arr in v.list_data():
-                arr[:] = i
-        avg_strategy.update_average_trigger(metric=loss)
-        avg_strategy.apply(model)
-    # nothing is cached yet, thus load_cached_model won't change anything
-    # test cached model
-    avg_strategy.load_cached_model(model)
-    for k, v in params.items():
-        for arr in v.list_data():
-            # the last model should have 10 in all coordinates
-            assert mx.nd.norm(arr - 10).asscalar() < 1e-30
-    # test averaged model
-    avg_strategy.load_averaged_model(model)
-    if n <= 0 or n >= len(loss_list):
-        # average never happens, model is not changed
-        for k, v in params.items():
-            for arr in v.list_data():
-                # the last model should have 10 in all coordinates
-                assert mx.nd.norm(arr - 10).asscalar() < 1e-30
-    else:
-        for k, v in params.items():
-            for arr in v.list_data():
-                # NTA_V2 takes the average once the loss increases, no matter what n is taken
-                # (the first n iterations are ignored)
-                if n <= 4:
-                    val = 7
+                if last_n_trigger:
+                    # takes the average once the loss increases, regardless of n
+                    # (the first n iterations are ignored)
+                    val = 7 if n <= 4 else ((n + 10) / 2.0)
                 else:
-                    val = (n + 10) / 2.0
+                    # takes the average on the last 7-n iterations
+                    val = (4 + n + 10) / 2.0
                 assert mx.nd.norm(arr - val).asscalar() < 1e-30
     # test cached model
     avg_strategy.load_cached_model(model)
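
With the full series applied, iteration averaging is enabled by passing one of the strategies as the Trainer's avg_strategy. A usage sketch; the epoch counts are arbitrary and all other Trainer arguments are left at their defaults:

    from gluonts.mx.trainer import Trainer
    from gluonts.mx.trainer.model_iteration_averaging import NTA, Alpha_Suffix

    # Non-monotonic trigger: start averaging once the epoch loss stops
    # improving on the best value observed more than n=5 epochs earlier.
    trainer = Trainer(epochs=100, avg_strategy=NTA(n=5))

    # Alpha suffix: average over roughly the last alpha fraction of epochs
    # (here epochs 25..100). Note the total epoch count is passed twice.
    trainer = Trainer(
        epochs=100, avg_strategy=Alpha_Suffix(epochs=100, alpha=0.75)
    )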