<a href="https://colab.research.google.com/github/Shrey-Viradiya/HandsOnMachineLearning/blob/master/Artificial_Neural_Networks_with_Keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Artificial Neural Networks with Keras

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import pandas as pd

Let's start by loading the Fashion MNIST dataset. Keras provides a number of functions to load popular datasets in `keras.datasets`. The dataset is already split for you between a training set and a test set, but it can be useful to split the training set further to have a validation set:

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

In [None]:
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist.load_data()

In [None]:
X_train_full.shape

In [None]:
X_train_full.dtype

In [None]:
X_valid, X_train = X_train_full[:5000] / 255., X_train_full[5000:] / 255.
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_test = X_test / 255.

In [None]:
plt.imshow(X_train[0], cmap="binary")
plt.axis('off')
plt.show()

In [None]:
y_train[0]

In [None]:
y_train

In [None]:
class_names = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat",
               "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

In [None]:
class_names[y_train[0]]

In [None]:
X_valid.shape

In [None]:
X_test.shape

Let's take a look at a sample of the images in the dataset:

In [None]:
# Show the first n_rows * n_cols training images in a grid, each titled with
# its class name.
n_rows = 4
n_cols = 10

# 1.5 inches per grid cell in each direction.
plt.figure(figsize = (n_cols * 1.5, n_rows * 1.5))

for row in range(n_rows):
    for col in range(n_cols):
        # Row-major position in the grid; also the index of the image shown.
        index = n_cols * row + col
        # plt.subplot positions are 1-based, hence index + 1.
        plt.subplot(n_rows, n_cols, index + 1)
        plt.imshow(X_train[index], cmap="binary", interpolation="nearest")
        plt.axis('off')
        plt.title(class_names[y_train[index]])
plt.tight_layout()
plt.show()

## CREATING THE MODEL USING THE SEQUENTIAL API

In [None]:
model = keras.models.Sequential()
model.add(keras.layers.Flatten(input_shape=[28, 28]))
model.add(keras.layers.Dense(300, activation = 'relu'))
model.add(keras.layers.Dense(100, activation = 'relu'))
model.add(keras.layers.Dense(10, activation = 'softmax'))

In [None]:
keras.backend.clear_session()
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28]),
    keras.layers.Dense(300, activation="relu"),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])

In [None]:
model.summary()

In [None]:
model.layers

In [None]:
hidden1 = model.layers[1]

In [None]:
hidden1.name

In [None]:
model.get_layer('dense') is hidden1

In [None]:
keras.utils.plot_model(model, "my_fashion_mnist_model.png", show_shapes=True)

In [None]:
weights, biases = hidden1.get_weights()

In [None]:
weights

In [None]:
weights.shape

In [None]:
biases

In [None]:
biases.shape

In [None]:
model.compile(loss = 'sparse_categorical_crossentropy',
             optimizer='sgd',
             metrics=['accuracy'])

### TRAINING AND EVALUATING THE MODEL

In [None]:
history = model.fit(X_train, y_train, epochs=30, validation_data=(X_valid, y_valid))

In [None]:
history.history.keys()

In [None]:
pd.DataFrame(history.history).plot(figsize=(10, 6))
plt.grid(True)
plt.ylim(0,1)
plt.show()

In [None]:
model.evaluate(X_test, y_test)

### USING THE MODEL TO MAKE PREDICTIONS

In [None]:
X_new = X_test[:3]
y_proba = model.predict(X_new)
y_proba.round(2)

In [None]:
# `model.predict_classes(X_new)` is deprecated; taking the argmax over the
# predicted class probabilities (last axis) is the equivalent replacement.
y_pred = np.argmax(model.predict(X_new), axis=-1)

In [None]:
y_pred

In [None]:
np.array(class_names)[y_pred]

In [None]:
y_new = y_test[:3]
y_new

In [None]:
plt.figure(figsize=(7.2, 2.4))
for index, image in enumerate(X_new):
    plt.subplot(1, 3, index + 1)
    plt.imshow(image, cmap="binary", interpolation="nearest")
    plt.axis('off')
    plt.title(class_names[y_test[index]], fontsize=12)
plt.subplots_adjust(wspace=0.2, hspace=0.5)
plt.show()

## Building a Regression MLP Using the Sequential API

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()

In [None]:
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full)

In [None]:
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_valid = scaler.transform(X_valid)
X_test = scaler.transform(X_test)

In [None]:
X_train.shape

In [None]:
X_train.shape[1:]

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, 'relu', input_shape = X_train.shape[1:]),
    keras.layers.Dense(1)
])

model.compile(loss = 'mean_squared_error', optimizer = 'sgd')

In [None]:
keras.utils.plot_model(model, 'california_housing_regression.png', show_shapes=True)

In [None]:
# Re-compile with an explicit SGD learning rate, then train for 20 epochs.
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss="mean_squared_error",
              optimizer=keras.optimizers.SGD(learning_rate=1e-3))
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

In [None]:
mse_test = model.evaluate(X_test, y_test)
X_new = X_test[:3]
y_pred = model.predict(X_new)

In [None]:
X_new = X_test[:3] # pretend these are new instances
y_pred = model.predict(X_new)

In [None]:
y_pred

In [None]:
pd.DataFrame(history.history).plot(figsize=(10, 6))
plt.grid(True)
plt.ylim(0,1)
plt.show()

# Functional API

Not all neural network models are simply sequential. Some may have complex topologies. Some may have multiple inputs and/or multiple outputs.

In [None]:
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
input_ = keras.layers.Input(shape = X_train.shape[1:])
hidden1 = keras.layers.Dense(30, activation = 'relu')(input_)
hidden2 = keras.layers.Dense(30, activation = 'relu')(hidden1)
concat = keras.layers.concatenate([input_, hidden2])
output = keras.layers.Dense(1)(concat)

model = keras.models.Model(inputs = [input_], outputs = [output])

In [None]:
model.summary()

In [None]:
# Train the wide-and-deep functional model, then evaluate it on the test set
# and predict for the X_new instances.
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss='mean_squared_error',
              optimizer=keras.optimizers.SGD(learning_rate=1e-3))
history = model.fit(X_train, y_train, epochs=30,
                    validation_data=(X_valid, y_valid))
mse_test = model.evaluate(X_test, y_test)
y_pred = model.predict(X_new)

In [None]:
mse_test

In [None]:
y_pred

In [None]:
y_test[:3]

What if you want to send different subsets of input features through the wide or deep paths? We will send 5 features through the wide path (features 0 to 4), and 6 through the deep path (features 2 to 7). Note that 3 features will go through both (features 2, 3 and 4).

In [None]:
inputA = keras.layers.Input(shape = [5], name='wide_input')
inputB = keras.layers.Input(shape = [6], name='deep_input')
hidden1 = keras.layers.Dense(30, activation = 'relu')(inputB)
hidden2 = keras.layers.Dense(30, activation = 'relu')(hidden1)
concat = keras.layers.concatenate([inputA, hidden2])
output = keras.layers.Dense(1, name = 'Output')(concat)

model = keras.models.Model(inputs = [inputA, inputB], outputs = [output])

In [None]:
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss='mse', optimizer=keras.optimizers.SGD(learning_rate=1e-3))

# Split every feature matrix into the two model inputs: the wide input takes
# columns 0-4 and the deep input takes columns 2 onward, so columns 2-4 are
# fed to both paths.
X_train_A, X_train_B = X_train[:, :5], X_train[:, 2:]
X_valid_A, X_valid_B = X_valid[:, :5], X_valid[:, 2:]
X_test_A, X_test_B = X_test[:, :5], X_test[:, 2:]
X_new_A, X_new_B = X_new[:, :5], X_new[:, 2:]

In [None]:
history = model.fit((X_train_A, X_train_B), y_train, epochs = 20, validation_data = ((X_valid_A, X_valid_B), y_valid))

In [None]:
mse_test = model.evaluate((X_test_A, X_test_B), y_test)
y_pred = model.predict((X_new_A, X_new_B))

In [None]:
y_pred

In [None]:
y_test[:3]


Adding an auxiliary output for regularization:

In [None]:
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
input_A = keras.layers.Input(shape=[5], name="wide_input")
input_B = keras.layers.Input(shape=[6], name="deep_input")
hidden1 = keras.layers.Dense(30, activation="relu")(input_B)
hidden2 = keras.layers.Dense(30, activation="relu")(hidden1)
concat = keras.layers.concatenate([input_A, hidden2])
output = keras.layers.Dense(1, name="main_output")(concat)
aux_output = keras.layers.Dense(1, name="aux_output")(hidden2)
model = keras.models.Model(inputs=[input_A, input_B], outputs=[output, aux_output])

In [None]:
# One MSE loss per output; the main output is weighted 0.9 and the auxiliary
# output 0.1 in the total loss.
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss=['mse', 'mse'], loss_weights=[0.9, 0.1],
              optimizer=keras.optimizers.SGD(learning_rate=1e-3))

In [None]:
history = model.fit([X_train_A, X_train_B], [y_train, y_train], epochs = 30, validation_data=([X_valid_A, X_valid_B], [y_valid, y_valid]))

In [None]:
total_loss, main_loss, aux_loss = model.evaluate(
    [X_test_A, X_test_B], [y_test, y_test])

In [None]:
total_loss, main_loss, aux_loss

In [None]:
y_pred_main, y_pred_aux = model.predict([X_new_A, X_new_B])

In [None]:
y_pred_main, y_pred_aux

In [None]:
y_test[:3]

## Using the Subclassing API to Build Dynamic Models

In [None]:
class WideAndDeepModel(keras.Model):
    """Wide-and-deep regression model with a main and an auxiliary output.

    The deep input goes through two hidden Dense layers. The main output is
    computed from the wide input concatenated with the second hidden layer's
    activations; the auxiliary output is computed from the second hidden
    layer's activations alone.
    """

    def __init__(self, units=30, activation="relu", **kwargs):
        """Create the layers.

        Args:
            units: Number of units in each of the two hidden layers.
            activation: Activation used by both hidden layers.
            **kwargs: Forwarded unchanged to ``keras.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(units, activation=activation)
        self.hidden2 = keras.layers.Dense(units, activation=activation)
        self.main_output = keras.layers.Dense(1)
        self.aux_output = keras.layers.Dense(1)

    def call(self, inputs):
        """Run the forward pass.

        Args:
            inputs: A pair ``(wide_input, deep_input)``.

        Returns:
            A tuple ``(main_output, aux_output)``.
        """
        wide_in, deep_in = inputs
        # Deep path: two stacked hidden layers.
        deep_features = self.hidden2(self.hidden1(deep_in))
        # Main head sees the raw wide features next to the deep features.
        merged = keras.layers.concatenate([wide_in, deep_features])
        main_out = self.main_output(merged)
        # Auxiliary head sees the deep features only.
        aux_out = self.aux_output(deep_features)
        return main_out, aux_out

In [None]:
model = WideAndDeepModel()

In [None]:
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss='mse', loss_weights=[.9, .1],
              optimizer=keras.optimizers.SGD(learning_rate=1e-3))

In [None]:
history = model.fit([X_train_A, X_train_B], [y_train, y_train], epochs = 30, validation_data=([X_valid_A, X_valid_B], [y_valid, y_valid]))

In [None]:
total_loss, main_loss, aux_loss = model.evaluate((X_test_A, X_test_B), (y_test, y_test))
y_pred_main, y_pred_aux = model.predict((X_new_A, X_new_B))

In [None]:
y_pred_main, y_pred_aux

## Saving and Restoring

In [None]:
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation = 'relu', input_shape=[8]),
    keras.layers.Dense(30, activation = 'relu'),
    keras.layers.Dense(1)
])

In [None]:
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss='mse', optimizer=keras.optimizers.SGD(learning_rate=1e-3))
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

In [None]:
mse_test = model.evaluate(X_test,y_test)

In [None]:
model.save('my_keras_model.h5')

In [None]:
model.predict(X_new)

In [None]:
y_test[:3]

In [None]:
model.save_weights("my_keras_weights.ckpt")

In [None]:
model.load_weights("my_keras_weights.ckpt")

## Using Callbacks

In [None]:
keras.backend.clear_session()
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=[8]),
    keras.layers.Dense(30, activation="relu"),
    keras.layers.Dense(1)
])

In [None]:
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))
# Checkpoint to my_keras_model.h5 during training, keeping only the best model.
checkpoint_cb = keras.callbacks.ModelCheckpoint("my_keras_model.h5", save_best_only=True)
history = model.fit(X_train, y_train, epochs=10,
                    validation_data=(X_valid, y_valid),
                    callbacks=[checkpoint_cb])
model = keras.models.load_model("my_keras_model.h5")  # rollback to best model
mse_test = model.evaluate(X_test, y_test)

In [None]:
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss='mse', optimizer=keras.optimizers.SGD(learning_rate=1e-3))
# Stop after 10 epochs without improvement and restore the best weights seen.
early_stopping_cb = keras.callbacks.EarlyStopping(patience=10,
                                                  restore_best_weights=True)

In [None]:
history = model.fit(X_train, y_train, epochs = 100, validation_data=(X_valid, y_valid),
                   callbacks=[checkpoint_cb, early_stopping_cb])
mse_test = model.evaluate(X_test, y_test)

In [None]:
class PrintValTrainRatioCallback(keras.callbacks.Callback):
    """Print the ratio of validation loss to training loss after each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Print ``val_loss / loss`` for the epoch that just finished.

        Args:
            epoch: Index of the finished epoch (unused).
            logs: Metrics dict supplied by Keras. It may be ``None`` and it
                has no ``val_loss`` entry when ``fit`` runs without
                validation data.
        """
        logs = logs or {}
        train_loss = logs.get("loss")
        val_loss = logs.get("val_loss")
        # Nothing meaningful to print when either loss is missing or the
        # training loss is zero; skip instead of raising KeyError or
        # ZeroDivisionError in the middle of training.
        if not train_loss or val_loss is None:
            return
        print("\nval/train: {:.8f}".format(val_loss / train_loss))

In [None]:
val_train_ratio_cb = PrintValTrainRatioCallback()
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid),
                    callbacks=[val_train_ratio_cb])

# TensorBoard

In [None]:
import os
root_logdir = os.path.join(os.curdir, "my_logs")

In [None]:
def get_run_logdir(root=None):
    """Return a timestamped run directory path under a log root.

    Args:
        root: Directory that holds the run subdirectories. When omitted, the
            module-level ``root_logdir`` is used, as before.

    Returns:
        ``<root>/run_YYYY_MM_DD-HH_MM_SS`` built from the current local time.
        The name has one-second resolution, so two calls made within the same
        second return the same path.
    """
    import time
    if root is None:
        # Resolved at call time so the function keeps following the global.
        root = root_logdir
    run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
    return os.path.join(root, run_id)

In [None]:
run_logdir = get_run_logdir()

In [None]:
run_logdir

In [None]:
keras.backend.clear_session()
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
# Two hidden layers of 30 ReLU units and a single linear output unit.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=[8]),
    keras.layers.Dense(30, activation="relu"),
    keras.layers.Dense(1)
])
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=1e-3))

In [None]:
tensorboard_cb = keras.callbacks.TensorBoard(run_logdir)
history = model.fit(X_train, y_train, epochs = 30,
                   validation_data = (X_valid, y_valid),
                   callbacks=[tensorboard_cb])

In [None]:
%load_ext tensorboard
%tensorboard --logdir=./my_logs --port=6006

In [None]:
run_logdir2 = get_run_logdir()
run_logdir2

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Same architecture as the previous TensorBoard run, trained with a larger
# learning rate (0.05) so the two runs can be compared.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=[8]),
    keras.layers.Dense(30, activation="relu"),
    keras.layers.Dense(1)
])
# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=0.05))

In [None]:
tensorboard_cb = keras.callbacks.TensorBoard(run_logdir2)
history = model.fit(X_train, y_train, epochs=30,
                    validation_data=(X_valid, y_valid),
                    callbacks=[checkpoint_cb, tensorboard_cb])

In [None]:
# Write one summary of each kind (scalar, histogram, image, text, audio) per
# step into a fresh run directory.
test_logdir = get_run_logdir()
writer = tf.summary.create_file_writer(test_logdir)
with writer.as_default():
    for step in range(1, 1000 + 1):
        tf.summary.scalar("my_scalar", np.sin(step / 10), step=step)
        data = (np.random.randn(100) + 2) * step / 100 # some random data
        tf.summary.histogram("my_hist", data, buckets=50, step=step)
        images = np.random.rand(2, 32, 32, 3) # random 32×32 RGB images
        # Scale the pixel values up linearly with the step.
        tf.summary.image("my_images", images * step / 1000, step=step)
        texts = ["The step is " + str(step), "Its square is " + str(step**2)]
        tf.summary.text("my_text", texts, step=step)
        # 12000 samples at 48000 samples/s of a sine whose frequency in Hz
        # equals `step`.
        sine_wave = tf.math.sin(tf.range(12000) / 48000 * 2 * np.pi * step)
        # NOTE(review): presumably reshaped to (batch, frames, channels) for
        # tf.summary.audio; confirm against its documentation.
        audio = tf.reshape(tf.cast(sine_wave, tf.float32), [1, -1, 1])
        tf.summary.audio("my_audio", audio, sample_rate=48000, step=step)

## Fine-Tuning Neural Network Hyperparameters

In [None]:
def build_model(n_hidden=1, n_neurons=30, learning_rate=3e-3, input_shape=(8,)):
    """Build and compile a Sequential regression MLP.

    Args:
        n_hidden: Number of hidden Dense layers (0 gives a linear model).
        n_neurons: Number of units in each hidden layer.
        learning_rate: Learning rate passed to the SGD optimizer.
        input_shape: Shape of one input instance, without the batch
            dimension. An immutable tuple is used as the default so the
            default object cannot be mutated between calls.

    Returns:
        The model, compiled with MSE loss and an SGD optimizer.
    """
    model = keras.models.Sequential()
    model.add(keras.layers.InputLayer(input_shape=input_shape))
    for _ in range(n_hidden):
        model.add(keras.layers.Dense(n_neurons, activation='relu'))
    # Single linear unit: one regression target.
    model.add(keras.layers.Dense(1))
    # `learning_rate` is the supported keyword; the `lr` alias is deprecated.
    optimizer = keras.optimizers.SGD(learning_rate=learning_rate)
    model.compile(loss='mse', optimizer=optimizer)
    return model

In [None]:
keras_reg = keras.wrappers.scikit_learn.KerasRegressor(build_model)

In [None]:
keras_reg.fit(X_train, y_train, epochs = 100,
             validation_data = (X_valid, y_valid),
             callbacks = [keras.callbacks.EarlyStopping(patience=10)])
mse_test = keras_reg.score(X_test, y_test)
y_pred =  keras_reg.predict(X_new)

In [None]:
np.random.seed(259)
tf.random.set_seed(259)

In [None]:
from scipy.stats import reciprocal
from sklearn.model_selection import RandomizedSearchCV

In [None]:
param_distribs = {
    'n_hidden' : [0,1,2,3],
    'n_neurons': np.arange(1,100),
    'learning_rate': reciprocal(3e-4, 3e-2)
}

In [None]:
# Randomized search: 10 sampled hyperparameter combinations, 3-fold CV each.
rnd_search_cv = RandomizedSearchCV(keras_reg, param_distribs,
                                   n_iter=10, cv=3, verbose=2)
# The extra keyword arguments are forwarded to the underlying Keras fit call.
# `callbacks` is passed as a list, consistent with the earlier keras_reg.fit
# call, rather than as a bare callback object.
rnd_search_cv.fit(X_train, y_train, epochs=100,
                  validation_data=(X_valid, y_valid),
                  callbacks=[keras.callbacks.EarlyStopping(patience=10)])

In [None]:
rnd_search_cv.best_params_

In [None]:
rnd_search_cv.best_score_

In [None]:
rnd_search_cv.best_estimator_

In [None]:
rnd_search_cv.score(X_test, y_test)

In [None]:
model = rnd_search_cv.best_estimator_.model
model

In [None]:
model.evaluate(X_test, y_test)