This repository has been archived by the owner on Nov 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 77
/
callback.py
150 lines (122 loc) 路 6.51 KB
/
callback.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import inspect
import os
import time
from typing import Any, Dict, List, Optional, Union
import numpy
import onnxruntime
import torch
from pl_bolts.callbacks import SparseMLCallback
from pytorch_lightning import Callback
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.utilities import rank_zero_info
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from sparseml.pytorch.utils import ModuleExporter
from sparseml.pytorch.utils.logger import WANDBLogger
from torch import Tensor
class LightningBoltsSparseMLCallback(SparseMLCallback):
def __init__(self, output_dir, recipe_path):
self.output_dir = output_dir
super().__init__(recipe_path=recipe_path)
def on_init_end(self, trainer: "pl.Trainer") -> None:
if isinstance(trainer.logger, WANDBLogger):
trainer.logger.__init__(init_kwargs={"project": "lightning-transformers"})
def on_fit_start(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
optimizer = trainer.optimizers
if len(optimizer) > 1:
raise MisconfigurationException("SparseML only supports training with one optimizer.")
optimizer = optimizer[0]
loggers = trainer.logger
if not isinstance(loggers, list):
loggers = [loggers]
self.manager.initialize(pl_module, epoch=0.0, logger=loggers)
self.manager.initialize_loggers(loggers)
optimizer = self.manager.modify(
pl_module, optimizer, steps_per_epoch=self._num_training_steps_per_epoch(trainer), epoch=0
)
trainer.optimizers = [optimizer]
@staticmethod
def export_to_sparse_onnx(
model: "LightningModule", output_dir: str, sample_batch: Optional[Tensor] = None, **kwargs
) -> None:
"""Exports the model to ONNX format."""
with model._prevent_trainer_and_dataloaders_deepcopy():
exporter = ModuleExporter(model.model, output_dir=output_dir)
sample_batch = sample_batch if sample_batch is not None else model.example_input_array
if sample_batch is None:
raise MisconfigurationException(
"To export the model, a sample batch must be passed via "
"``SparseMLCallback.export_to_sparse_onnx(model, output_dir, sample_batch=sample_batch)`` "
"or an ``example_input_array`` property within the LightningModule"
)
# the following is adapted from @natuan and @spacemanidol
sess = None
num_samples = 0
sample_inputs = os.path.join(output_dir, "sample-inputs")
sample_outputs = os.path.join(output_dir, "sample-outputs")
os.makedirs(sample_inputs, exist_ok=True)
os.makedirs(sample_outputs, exist_ok=True)
if sess is None:
forward_args_spec = inspect.getfullargspec(exporter._module.__class__.forward)
one_sample_input = collections.OrderedDict([(f, sample_batch[f][0].long().reshape(1, -1))
for f in forward_args_spec.args if f in sample_batch])
try:
exporter.export_onnx(sample_batch=one_sample_input, convert_qat=True, **kwargs)
exporter.export_onnx(
sample_batch=one_sample_input,
name="small_model.onnx",
convert_qat=True,
export_params=False,
**kwargs,
)
onnx_file = os.path.join(output_dir, "model.onnx")
except Exception:
raise RuntimeError("Error exporting ONNX models and/or inputs/outputs")
sess = onnxruntime.InferenceSession(onnx_file)
# add additional files for testing since this feature is very new
input_names = list(sample_batch.keys())
output_names = [o.name for o in sess.get_outputs()]
for input_vals in zip(*sample_batch.values()):
input_feed = {k: v.long().numpy() for k, v in zip(input_names, input_vals)}
output_vals = sess.run(output_names, {k: input_feed[k].reshape(1, -1) for k in input_feed})
output_dict = {name: numpy.squeeze(val) for name, val in zip(output_names, output_vals)}
file_idx = f"{num_samples}".zfill(4)
numpy.savez(f"{sample_inputs}/inp-{file_idx}.npz", **input_feed)
numpy.savez(f"{sample_outputs}/out-{file_idx}.npz", **output_dict)
num_samples += 1
def teardown(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule", stage: Optional[str] = None) -> None:
sample_batch = next(iter(trainer.train_dataloader))
# if asked for output names, bert's ModelOutput gives two names
# but when run, this the model only gives one output
# workaround is just to force onnx to realize there is only one output
output_names = ["logits"]
self.export_to_sparse_onnx(
output_dir=self.output_dir, model=pl_module, sample_batch=sample_batch, output_names=output_names
)
class CUDACallback(Callback):
def on_train_epoch_start(self, trainer, pl_module):
# Reset the memory use counter
torch.cuda.reset_peak_memory_stats(trainer.root_gpu)
torch.cuda.synchronize(trainer.root_gpu)
self.start_time = time.time()
def on_train_epoch_end(self, trainer, pl_module, outputs):
torch.cuda.synchronize(trainer.root_gpu)
max_memory = torch.cuda.max_memory_allocated(trainer.root_gpu) / 2**20
epoch_time = time.time() - self.start_time
max_memory = trainer.training_type_plugin.reduce(max_memory)
epoch_time = trainer.training_type_plugin.reduce(epoch_time)
rank_zero_info(f"Average Epoch time: {epoch_time:.2f} seconds")
rank_zero_info(f"Average Peak memory {max_memory:.2f}MiB")