-
Notifications
You must be signed in to change notification settings - Fork 11
/
test_optimizer_wrapper.py
113 lines (86 loc) · 3 KB
/
test_optimizer_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import random as python_random
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.keras.models import load_model
from gradient_accumulator import GradientAccumulateOptimizer
from .utils import get_opt
from .utils import normalize_img
from .utils import reset
tf_version = int(tf.version.VERSION.split(".")[1])
def run_experiment(bs=16, accum_steps=4, epochs=1):
# load dataset
(ds_train, ds_test), ds_info = tfds.load(
"mnist",
split=["train", "test"],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
# build train pipeline
ds_train = ds_train.map(normalize_img)
ds_train = ds_train.batch(bs)
ds_train = ds_train.prefetch(1)
# build test pipeline
ds_test = ds_test.map(normalize_img)
ds_test = ds_test.batch(bs)
ds_test = ds_test.prefetch(1)
# create model
model = tf.keras.models.Sequential(
[
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(32, activation="relu"),
tf.keras.layers.Dense(10),
]
)
# wrap optimizer to add gradient accumulation support
opt = get_opt("SGD")
opt = GradientAccumulateOptimizer(
optimizer=opt, accum_steps=accum_steps, reduction="MEAN"
) # MEAN REDUCTION IMPORTANT!!!
# compile model
model.compile(
optimizer=opt,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)
# train model
model.fit(
ds_train,
epochs=epochs,
validation_data=ds_test,
verbose=0,
)
model.save("./trained_model")
# load trained model and test
del model
trained_model = load_model("./trained_model", compile=True)
result = trained_model.evaluate(ds_test, verbose=1)
print(result)
return result[1]
def test_expected_result():
# set seed
reset()
# run once
result1 = run_experiment(
bs=500, accum_steps=1, epochs=3
) # NOTE: AS TO BE DIVISIBLE BY TRAIN SET SIZE = 50000 (!)
# reset before second run to get identical results
reset()
# run again with different batch size and number of accumulations
result2 = run_experiment(bs=250, accum_steps=2, epochs=3)
# reset before second run to get identical results
reset()
# run again with different batch size and number of accumulations
result3 = run_experiment(bs=125, accum_steps=4, epochs=3)
# reset before second run to get identical results
# reset()
# run again with different batch size and number of accumulations
# result4 = run_experiment(bs=1, accum_steps=500, epochs=2)
# results should be identical (theoretically, even in practice on CPU)
# assert result1 == result2
# assert result1 == result3
# reduced constraint for temporarily
np.testing.assert_almost_equal(result1, result2, decimal=2)
np.testing.assert_almost_equal(result1, result3, decimal=2)