diff --git a/deeppavlov/core/commands/infer.py b/deeppavlov/core/commands/infer.py index c20769ad02..b643ff7beb 100644 --- a/deeppavlov/core/commands/infer.py +++ b/deeppavlov/core/commands/infer.py @@ -30,10 +30,9 @@ def build_model(config: Union[str, Path, dict], mode: str = 'infer', load_trained: bool = False, download: bool = False, - serialized: Optional[bytes] = None) -> Chainer: + serialized: Optional[bytes] = None, buckets: Optional[list] = None) -> Chainer: """Build and return the model described in corresponding configuration file.""" config = parse_config(config) - if serialized: serialized: list = pickle.loads(serialized) @@ -43,9 +42,7 @@ def build_model(config: Union[str, Path, dict], mode: str = 'infer', import_packages(config.get('metadata', {}).get('imports', [])) model_config = config['chainer'] - - model = Chainer(model_config['in'], model_config['out'], model_config.get('in_y')) - + model = Chainer(model_config['in'], model_config['out'], model_config.get('in_y'), model_config.get('buckets')) for component_config in model_config['pipe']: if load_trained and ('fit_on' in component_config or 'in_y' in component_config): try: @@ -106,7 +103,7 @@ def predict_on_stream(config: Union[str, Path, dict], else: f = open(file_path, encoding='utf8') - model: Chainer = build_model(config) + model: Chainer = build_model(config, hist_name = "predict") args_count = len(model.in_x) while True: diff --git a/deeppavlov/core/common/chainer.py b/deeppavlov/core/common/chainer.py index b3f78d13e3..8f76108533 100644 --- a/deeppavlov/core/common/chainer.py +++ b/deeppavlov/core/common/chainer.py @@ -12,11 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import pickle +import random +import string from itertools import islice from logging import getLogger from types import FunctionType from typing import Union, Tuple, List, Optional, Hashable, Reversible +from prometheus_client import Histogram from deeppavlov.core.common.errors import ConfigError from deeppavlov.core.models.component import Component @@ -48,7 +52,7 @@ class Chainer(Component): """ def __init__(self, in_x: Union[str, list] = None, out_params: Union[str, list] = None, - in_y: Union[str, list] = None, *args, **kwargs) -> None: + in_y: Union[str, list] = None, buckets: Optional[list] = None, *args, **kwargs) -> None: self.pipe: List[Tuple[Tuple[List[str], List[str]], List[str], Component]] = [] self.train_pipe = [] if isinstance(in_x, str): @@ -68,6 +72,23 @@ def __init__(self, in_x: Union[str, list] = None, out_params: Union[str, list] = self.main = None + self.hist_name = ''.join(random.choice(string.ascii_uppercase) for _ in range(5)) + if buckets is not None: + self.buckets = buckets + else: + self.buckets = [.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 20, 50, 100, 200, 500] + self.hist = Histogram(self.hist_name, "response latency (seconds)", ["component"], buckets = self.buckets) + def print_hist(): + intervals = [str(i) for i in self.hist._upper_bounds] + components = [k[0]for k in list(self.hist._metrics.keys())] + components_ = [i.split('.')[-1].split(' ')[0] for i in components] + values = [[str(self.hist.labels(component=i)._buckets[j].get()) for j in range(len(self.hist._upper_bounds))] for i in components] + format = "{:>45} "+"{:>10}" *len(intervals) + print(format.format("", *intervals)) + for component, value in zip(components_, values): + print(format.format(component, *value)) + self.print_hist = print_hist + def __getitem__(self, item): if isinstance(item, int): in_params, out_params, component = self.train_pipe[item] @@ -201,13 +222,13 @@ def compute(self, x, y=None, targets=None): args += list(zip(*y)) in_params += self.in_y - return self._compute(*args, pipe=pipe, param_names=in_params, targets=targets) + return self._compute(*args, pipe=pipe, param_names=in_params, targets=targets, hist = self.hist) def __call__(self, *args): - return self._compute(*args, param_names=self.in_x, pipe=self.pipe, targets=self.out_params) + return self._compute(*args, param_names=self.in_x, pipe=self.pipe, targets=self.out_params, hist = self.hist) @staticmethod - def _compute(*args, param_names, pipe, targets): + def _compute(*args, param_names, pipe, targets, hist: Optional[Histogram] = None): expected = set(targets) final_pipe = [] for (in_keys, in_params), out_params, component in reversed(pipe): @@ -224,10 +245,14 @@ def _compute(*args, param_names, pipe, targets): for (in_keys, in_params), out_params, component in pipe: x = [mem[k] for k in in_params] + start_time = time.perf_counter() if in_keys: res = component.__call__(**dict(zip(in_keys, x))) else: res = component.__call__(*x) + duration = time.perf_counter() - start_time + if hist is not None: + hist.labels(component = component).observe(duration) if len(out_params) == 1: mem[out_params[0]] = res else: diff --git a/deeppavlov/core/trainers/fit_trainer.py b/deeppavlov/core/trainers/fit_trainer.py index 0378560564..c88241af7f 100644 --- a/deeppavlov/core/trainers/fit_trainer.py +++ b/deeppavlov/core/trainers/fit_trainer.py @@ -67,7 +67,8 @@ def __init__(self, chainer_config: dict, *, batch_size: int = -1, if kwargs: log.info(f'{self.__class__.__name__} got additional init parameters {list(kwargs)} that will be ignored:') self.chainer_config = chainer_config - self._chainer = Chainer(chainer_config['in'], chainer_config['out'], chainer_config.get('in_y')) + self.buckets = chainer_config.get('buckets') + self._chainer = Chainer(chainer_config['in'], chainer_config['out'], chainer_config.get('in_y'), self.buckets) self.batch_size = batch_size self.metrics = parse_metrics(metrics, self._chainer.in_y, self._chainer.out_params) self.evaluation_targets = tuple(evaluation_targets) @@ -155,7 +156,7 @@ def fit_chainer(self, iterator: Union[DataFittingIterator, DataLearningIterator] def _load(self) -> None: if not self._loaded: self._chainer.destroy() - self._chainer = build_model({'chainer': self.chainer_config}, load_trained=self._saved) + self._chainer = build_model({'chainer': self.chainer_config}, load_trained=self._saved, buckets = self.buckets) self._loaded = True def get_chainer(self) -> Chainer: @@ -265,5 +266,6 @@ def evaluate(self, iterator: DataLearningIterator, evaluation_targets: Optional[ res[data_type] = report if print_reports: print(json.dumps({data_type: report}, ensure_ascii=False, cls=NumpyArrayEncoder)) - + if self._chainer.hist_name is not None: + self._chainer.print_hist() return res diff --git a/deeppavlov/core/trainers/nn_trainer.py b/deeppavlov/core/trainers/nn_trainer.py index 6f6fd8b4bf..a8c9406eb9 100644 --- a/deeppavlov/core/trainers/nn_trainer.py +++ b/deeppavlov/core/trainers/nn_trainer.py @@ -23,6 +23,7 @@ from deeppavlov.core.common.errors import ConfigError from deeppavlov.core.common.registry import register from deeppavlov.core.data.data_learning_iterator import DataLearningIterator +from deeppavlov.core.models import component from deeppavlov.core.trainers.fit_trainer import FitTrainer from deeppavlov.core.trainers.utils import parse_metrics, NumpyArrayEncoder @@ -293,6 +294,9 @@ def train_on_batches(self, iterator: DataLearningIterator) -> None: if self.log_every_n_batches > 0 and self.train_batches_seen % self.log_every_n_batches == 0: self._log(iterator, tensorboard_tag='every_n_batches', tensorboard_index=self.train_batches_seen) + if self._chainer.hist_name is not None: + self._chainer.print_hist() + if self.val_every_n_batches > 0 and self.train_batches_seen % self.val_every_n_batches == 0: self._validate(iterator,