# Simple Stochastic Gradient Descent on Two Nodes with TensorFlow, Horovod and IPCMagic

Here we visualize the minimization of the loss with the SGD algorithm.
For this, we consider a linear model with only two weights (the slope and the offset).
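
As a point of reference, the same two-weight problem can be minimized with plain NumPy. The block below is a minimal sketch, not the TensorFlow code used in this notebook: the function name `sgd_linear`, the zero starting point and the fixed seed are assumptions made for illustration, while the learning rate, batch size and number of epochs mirror the values used later on.

```python
import numpy as np

rng = np.random.default_rng(0)  # fixed seed so the sketch is reproducible

# Synthetic data: y = 2 * x + 0 + noise, with x and noise centered around 0
nsamples = 1000
x = rng.random(nsamples) - 0.5
y = 2.0 * x + 0.0 + (rng.random(nsamples) - 0.5)


def sgd_linear(x, y, lr=0.5, batch_size=100, epochs=150):
    """Mini-batch SGD for y ~ m * x + n with a mean-squared-error loss."""
    m, n = 0.0, 0.0  # arbitrary starting point (an assumption of this sketch)
    for _ in range(epochs):
        order = rng.permutation(len(x))  # reshuffle the samples every epoch
        for start in range(0, len(x), batch_size):
            idx = order[start:start + batch_size]
            residual = m * x[idx] + n - y[idx]
            # Gradients of mean(residual**2) with respect to m and n
            grad_m = 2.0 * np.mean(residual * x[idx])
            grad_n = 2.0 * np.mean(residual)
            m -= lr * grad_m
            n -= lr * grad_n
    return m, n


slope, offset = sgd_linear(x, y)
print(f"slope = {slope:.2f}, offset = {offset:.2f}")
```

The recovered slope and offset should land close to the reference values 2 and 0, up to the noise in the data.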

With this example we show how to use IPCMagic to run TensorFlow on two nodes from a Jupyter notebook.
We consider blocking and non-blocking execution of code on the IPCluster.
We also show how to combine non-blocking execution and the function `ipcmagic.utilities.watch_asyncresult` to have real-time logging during training.

In [None]:
import ipcmagic
from ipcmagic import utilities

In [None]:
%ipcluster --version

In [None]:
%ipcluster start -n 2 --launcher srun

In [None]:
# In cells that take some time, IPyParallel shows a progress bar.
# That can be disabled by passing `--progress-after -1` to `%%px`,
# or for all cells at once with `%pxconfig`, as done here.
%pxconfig --progress-after -1

In [None]:
%%px
import socket
socket.gethostname()

In [None]:
%%px
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import tensorflow as tf
import horovod.tensorflow.keras as hvd

In [None]:
%%px
hvd.init()

In [None]:
%%px --target 0
tf.version.VERSION

In [None]:
%%px
# Create a linear function with noise as our data
nsamples = 1000
ref_slope = 2.0
ref_offset = 0.0
noise = np.random.random((nsamples, 1)) - 0.5    # -0.5 to center the noise
x_train = np.random.random((nsamples, 1)) - 0.5  # -0.5 to center x around 0
y_train = ref_slope * x_train + ref_offset + noise

In [None]:
%%px
dataset = tf.data.Dataset.from_tensor_slices((x_train.astype(np.float32),
                                              y_train.astype(np.float32)))
dataset = dataset.shuffle(1000)
dataset = dataset.batch(100)
dataset = dataset.repeat(150)

In [None]:
%%px
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(1, input_shape=(1,), activation='linear'),
])

# `learning_rate` replaces the deprecated `lr` argument
opt = tf.keras.optimizers.SGD(learning_rate=0.5)
opt = hvd.DistributedOptimizer(opt)

model.compile(optimizer=opt,
              loss='mse')
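
`hvd.DistributedOptimizer` wraps the optimizer so that the gradients computed by each worker are averaged across all workers before the weights are updated. The NumPy sketch below only illustrates that averaging and is not Horovod code: the two "workers" are hypothetical halves of a single batch, and `mse_gradient` is a helper introduced here for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random(200) - 0.5
y = 2.0 * x + (rng.random(200) - 0.5)


def mse_gradient(m, n, x, y):
    """Gradient of mean((m * x + n - y)**2) with respect to (m, n)."""
    residual = m * x + n - y
    return np.array([2.0 * np.mean(residual * x), 2.0 * np.mean(residual)])


m, n = 0.0, 0.0

# Two hypothetical workers, each holding one half of the batch
grad_worker0 = mse_gradient(m, n, x[:100], y[:100])
grad_worker1 = mse_gradient(m, n, x[100:], y[100:])

# Averaging step: every worker ends up with the same averaged gradient
averaged = (grad_worker0 + grad_worker1) / 2.0

# For equally sized shards this equals the gradient of the combined batch
full_batch = mse_gradient(m, n, x, y)
print(np.allclose(averaged, full_batch))
```

With equally sized shards, one averaged step on two workers is therefore equivalent to one step on a batch twice as large.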

In [None]:
%%px
class TrainHistory(tf.keras.callbacks.Callback):
    """Record the model variables and the loss after every training batch."""
    def on_train_begin(self, logs=None):
        self.vars = []
        self.loss = []

    def on_batch_end(self, batch, logs=None):
        self.vars.append([v.numpy() for v in self.model.variables])
        self.loss.append((logs or {}).get('loss'))

history = TrainHistory()

In [None]:
%%px
initial_sync = hvd.callbacks.BroadcastGlobalVariablesCallback(0)

In the previous cells we have been using the default execution mode of IPyParallel, which is blocking.
That means that the cell will be busy until the execution is over and the output of the cell will be visible only when the run finishes.

In the next cell, which takes a few seconds to finish, we run `model.fit(...)`, which shows an animated progress bar as the training advances.
In blocking mode, we won't see the progress, only the final result.
To see the progress as it happens, we can use non-blocking execution together with `ipcmagic.utilities.watch_asyncresult`, which lets us see the output as it's produced.
Non-blocking execution can be specified by passing the option `--noblock` to `%%px`.

In non-blocking execution, the cell exits immediately, returning an `AsyncResult` object.
That object is needed by the `ipcmagic.utilities.watch_asyncresult` function.
To store the `AsyncResult` in a Python variable (let's call it `my_async_result`), we need to pass the option `-o my_async_result` to `%%px`.
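
The difference between the two modes can be illustrated with the Python standard library alone. The block below is an analogy, not IPyParallel's API: a `concurrent.futures.Future` plays the role of the `AsyncResult`, and `slow_task` is a hypothetical stand-in for a long-running training call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(nsteps):
    """Hypothetical stand-in for a long-running training call."""
    for _ in range(nsteps):
        time.sleep(0.01)
    return nsteps


with ThreadPoolExecutor(max_workers=1) as pool:
    # Non-blocking: submit() returns a future right away,
    # much like `%%px --noblock` returns an `AsyncResult`
    future = pool.submit(slow_task, 20)

    polls = 0
    while not future.done():
        # The caller stays responsive here and could, for instance,
        # display partial output while the task is still running
        polls += 1
        time.sleep(0.01)

    # Blocking: result() waits until the task has finished
    result = future.result()

print(f"task returned {result} after {polls} polls")
```

The polling loop is where a watcher function would fetch and display the output produced so far.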

In [None]:
%%px --noblock -o training_async_result

# The non-blocking execution (`%%px --noblock`) returns an `AsyncResult` object immediately.
# The `AsyncResult` object can be accessed from Python with the option `-o <variable>`.
# By doing that we can fetch information while the code is running.

fit = model.fit(dataset, callbacks=[initial_sync, history])

In [None]:
# watch the output in real time
utilities.watch_asyncresult(training_async_result)

In [None]:
%%px --target 0
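# Each entry of `history.vars` is [kernel, bias] with shapes (1, 1) and (1,).
# Extract scalars explicitly, since recent NumPy versions refuse to build
# an array from such ragged lists.
slope_hist = np.array([v[0].item() for v in history.vars])
offset_hist = np.array([v[1].item() for v in history.vars])
loss_hist = np.array(history.loss)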

matplotlib.rcParams['figure.figsize'] = (8, 3)

plt.subplot(1, 2, 1)
plt.plot(loss_hist[10:], 'r.-')
plt.xlabel('Training steps')
plt.ylabel('Loss')
plt.grid()

plt.subplot(1, 2, 2)
plt.plot(x_train, y_train, '.')
plt.plot(x_train, slope_hist[-1] * x_train + offset_hist[-1], 'r-')
plt.xlabel('x')
plt.ylabel('y')
plt.grid()

plt.tight_layout()
plt.show()

matplotlib.rcParams['figure.figsize'] = (6, 4)

In [None]:
%%px --target 0
def loss_function_field(m, n, xref, yref):
    '''Utility function for plotting the loss'''
    return np.mean(np.square(yref - m * xref - n))

_m = np.arange(0.0, 4.01, 0.1)
_n = np.arange(-0.5, 0.51, 0.1)
M, N = np.meshgrid(_m, _n)

Z = np.zeros(M.shape)
for i in range(M.shape[0]):
    for j in range(M.shape[1]):
        Z[i, j] = loss_function_field(M[i, j], N[i, j],
                                      x_train, y_train)

matplotlib.rcParams['figure.figsize'] = (10, 7)

cp = plt.contour(M, N, Z, 15, vmin=Z.min(), vmax=Z.max(), alpha=0.99, colors='k', linestyles='--')
plt.contourf(M, N, Z, vmin=Z.min(), vmax=Z.max(), alpha=0.8, cmap=plt.cm.RdYlBu_r)
plt.clabel(cp, cp.levels[:6])
plt.colorbar()
m = slope_hist[-1]
n = offset_hist[-1]
plt.plot(slope_hist, offset_hist, '.-', lw=2, c='k')
plt.plot([ref_slope], [ref_offset], 'rx', ms=10)
plt.xlim([_m.min(), _m.max()])
plt.ylim([_n.min(), _n.max()])
plt.xlabel('Slope')
plt.ylabel('Offset')
plt.show()

matplotlib.rcParams['figure.figsize'] = (6, 4)
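
The double loop that fills `Z` in the cell above can also be written with NumPy broadcasting. The block below is a sketch on freshly generated data (the seed and the names `Z_vectorized` and `Z_loop` are assumptions of this example) and checks that both approaches give the same loss field.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random((1000, 1)) - 0.5
y = 2.0 * x + (rng.random((1000, 1)) - 0.5)

_m = np.arange(0.0, 4.01, 0.1)
_n = np.arange(-0.5, 0.51, 0.1)
M, N = np.meshgrid(_m, _n)

# Broadcast the (nsamples, 1, 1) data against the (1, rows, cols) grid,
# then average over the sample axis
residual = y.reshape(-1, 1, 1) - M[None, :, :] * x.reshape(-1, 1, 1) - N[None, :, :]
Z_vectorized = np.mean(np.square(residual), axis=0)

# Reference: the explicit double loop over the grid
Z_loop = np.zeros(M.shape)
for i in range(M.shape[0]):
    for j in range(M.shape[1]):
        Z_loop[i, j] = np.mean(np.square(y - M[i, j] * x - N[i, j]))

print(np.allclose(Z_vectorized, Z_loop))
```

For a grid of this size the loop is fast enough, so the vectorized form mainly pays off on finer grids.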

In [None]:
%ipcluster stop