# 8.2 异步计算

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

# 安装 TensorFlow
try:
  # Colab only
  %tensorflow_version 2.x
except Exception:
    pass

TensorFlow 2.x selected.


In [0]:
import tensorflow as tf
import tensorflow.keras as keras
import os
import subprocess
import time

## 8.2.1 Tensorflow 中的异步计算

In [3]:
a = tf.ones((1, 2))
b = tf.ones((1, 2))
c = a * b + 2
c

<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[3., 3.]], dtype=float32)>

In [0]:
class Benchmark(object):
  def __init__(self, prefix=None):
    self.prefix = prefix + ' ' if prefix else ''

  def __enter__(self):
    self.start = time.time()

  def __exit__(self, *args):
    print('%stime: %.4f sec' % (self.prefix, time.time() - self.start))

In [5]:
with Benchmark('Workloads are queued.'):
  x = tf.random.uniform(shape=(2000, 2000))
  y = tf.keras.backend.sum(tf.transpose(x) * x)

with Benchmark('Workloads are finished.'):
  print('sum =', y)

Workloads are queued. time: 0.1726 sec
sum = tf.Tensor(1000411.9, shape=(), dtype=float32)
Workloads are finished. time: 0.0006 sec


## 8.2.2 用同步函数让前端等待计算结果

In [6]:
with Benchmark():
  y = tf.keras.backend.sum(tf.transpose(x) * x)

time: 0.0341 sec


In [7]:
with Benchmark():
  y = tf.keras.backend.sum(tf.transpose(x) * x)
  z = tf.keras.backend.sum(tf.transpose(x) * x)

time: 0.0386 sec


In [8]:
with Benchmark():
  y = tf.keras.backend.sum(tf.transpose(x) * x)
  y.numpy()

time: 0.0223 sec


In [9]:
with Benchmark():
  y = tf.keras.backend.sum(tf.transpose(x) * x)
  print(tf.norm(y).numpy())

1000411.9
time: 0.0463 sec


## 8.2.3 使用异步计算提升计算性能

In [10]:
with Benchmark('synchronous.'):
  for _ in range(1000):
    y = x + 1

@tf.function
def loop():
  for _ in range(1000):
    y = x + 1
  return y

with Benchmark('asynchronous.'):
  y = loop()

synchronous. time: 3.1374 sec
asynchronous. time: 1.1313 sec


## 8.2.4 异步计算对内存的影响

In [0]:
def data_iter():
  start = time.time()
  num_batches, batch_size = 100, 1024
  for i in range(num_batches):
    X = tf.random.normal(shape=(batch_size, 512))
    y = tf.ones((batch_size,))
    yield X, y
    if (i + 1) % 50 == 0:
      print('batch %d, time %f sec' % (i+1, time.time()-start))

In [0]:
net = keras.Sequential()
net.add(keras.layers.Dense(2048, activation='relu'))
net.add(keras.layers.Dense(512, activation='relu'))
net.add(keras.layers.Dense(1))
optimizer=keras.optimizers.SGD(0.05)
loss = keras.losses.MeanSquaredError()

In [0]:
def get_mem():
  res = subprocess.check_output(['ps', 'u', '-p', str(os.getpid())])
  return int(str(res).split()[15]) / 1e3

In [14]:
for X, y in data_iter():
  break
loss(y, net(X))

<tf.Tensor: shape=(), dtype=float32, numpy=0.49068463>

In [15]:
l_sum, mem = 0, get_mem()
dense_1 = keras.layers.Dense(2048, activation='relu')
dense_2 = keras.layers.Dense(512, activation='relu')
dense_3 = keras.layers.Dense(1)
trainable_variables = (dense_1.trainable_variables + 
                       dense_2.trainable_variables +
                       dense_3.trainable_variables)
for X, y in data_iter():
  with tf.GradientTape() as tape:
    logits = net(X)
    loss_value = loss(y, logits)

  grads = tape.gradient(loss_value, trainable_variables)
  optimizer.apply_gradients(zip(grads, trainable_variables))

print('increased memory: %f MB' % (get_mem() - mem))

batch 50, time 7.880550 sec
batch 100, time 15.700529 sec
increased memory: 14.336000 MB


In [16]:
l_sum, mem = 0, get_mem()
for X, y in data_iter():
  with tf.GradientTape() as tape:
    logits = net(X)
    loss_value = loss(y, logits)

  grads = tape.gradient(loss_value, net.trainable_weights)
  optimizer.apply_gradients(zip(grads, net.trainable_weights))

print('increased memory: %f MB' % (get_mem() - mem))

batch 50, time 7.976524 sec
batch 100, time 15.683179 sec
increased memory: 12.268000 MB
