# Creating a Keras DNN model

**Objectives**

1. Create input layers for raw features
1. Create feature columns for inputs
1. Create DNN dense hidden layers and output layer
1. Build DNN model tying all of the pieces together
1. Train and evaluate


## Set up environment variables and load necessary libraries

In [None]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [None]:
!pip install --user google-cloud-bigquery==1.25.0

**Note**: Restart your kernel to use the updated packages.

You can ignore any deprecation warnings and incompatibility errors related to google-cloud-storage.

Import necessary libraries.

In [None]:
from google.cloud import bigquery
import pandas as pd
import datetime
import os
import shutil
import matplotlib.pyplot as plt
import tensorflow as tf
print(tf.__version__)

Set environment variables so that we can use them throughout the notebook.

In [None]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "$PROJECT

In [None]:
PROJECT = "cloud-training-demos"  # Replace with your PROJECT

## Create ML datasets by sampling using BigQuery

We'll begin by sampling the BigQuery data to create smaller datasets. Let's create a BigQuery client that we'll use throughout the lab.

In [None]:
bq = bigquery.Client(project = PROJECT)

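We need to choose how to divide the hash values to get the desired splits. To do that, define the modulus (`hashing_mod`) that the hash values are reduced by, along with the percentage of buckets to assign to training and to evaluation; the remaining buckets form the test split. Feel free to play around with these values to get the right combination.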

In [None]:
hashing_mod = 100
train_pc = 80.0
eval_pc = 10.0

train_buckets = int(hashing_mod * train_pc / 100.0)
eval_buckets = int(hashing_mod * eval_pc / 100.0)
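
With the defaults above (`hashing_mod = 100`, `train_pc = 80.0`, `eval_pc = 10.0`), buckets 0 to 79 go to training, 80 to 89 to evaluation, and 90 to 99 to test. The plain-Python sketch below spells out that mapping; the helper `split_for_bucket` is hypothetical and only mirrors the `WHERE` clauses of the queries that follow.

```python
def split_for_bucket(bucket_index, hashing_mod=100, train_pc=80.0, eval_pc=10.0):
    """Hypothetical helper: returns the split that a bucket index belongs to."""
    train_buckets = int(hashing_mod * train_pc / 100.0)  # 80 with the defaults
    eval_buckets = int(hashing_mod * eval_pc / 100.0)    # 10 with the defaults
    if not 0 <= bucket_index < hashing_mod:
        raise ValueError("bucket_index must be in [0, hashing_mod)")
    if bucket_index < train_buckets:
        return "train"
    if bucket_index < train_buckets + eval_buckets:
        return "eval"
    return "test"


# Count how many of the 100 buckets each split receives
split_counts = {}
for bucket in range(100):
    name = split_for_bucket(bucket)
    split_counts[name] = split_counts.get(name, 0) + 1
print(split_counts)  # {'train': 80, 'eval': 10, 'test': 10}
```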

We can run a series of queries to check whether our bucketing values produce the intended size for each dataset split, and then adjust them accordingly. To keep that code compact and reusable, let's define a function that returns the first few rows of the dataframe produced by a query.

In [None]:
def get_df_head(query, count=10):
    """Displays count rows from dataframe head from query.
    
    Args:
        query: str, query to be run on BigQuery, results stored in dataframe.
        count: int, number of results from head of dataframe to display.
    Returns:
        Dataframe head with count number of results.
    """
    df = bq.query(
        query + " LIMIT {limit}".format(
            limit=count)).to_dataframe()

    return df.head(count)

For our first query, select the label, the features, and the columns that will be combined into the hash used for repeatable splitting. Because the dataset contains only a limited number of distinct years, months, days (possibly), and states, the hash can take only a limited number of distinct values. Try including fewer or more columns in the hash and see how the results change.

In [None]:
# Get label, features, and columns to hash and split into buckets
cleanup_query = """
SELECT
    feature_1,
    feature_2,
    feature_3,
    feature_n,
    year,
    month,
    IFNULL(state, "Unknown") AS state,
FROM
    samples.mydata
WHERE
    year > 2000
    AND feature_1 > 0
    AND feature_2 > 0
    AND feature_3 > 0
    AND feature_n > 0
"""

get_df_head(cleanup_query)

The next query combines the hash columns into a single value with `FARM_FINGERPRINT`, leaving us with just the label, the features, and the hash values.

In [None]:
farm_fingerprint_query = """
SELECT
    feature_1,
    feature_2,
    feature_3,
    feature_n,
    FARM_FINGERPRINT(
        CONCAT(
            CAST(year AS STRING),
            CAST(month AS STRING),
            CAST(state AS STRING)
        )
    ) AS hash_values
FROM
    ({cleanup_query})
""".format(cleanup_query=cleanup_query)

get_df_head(farm_fingerprint_query)
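
`FARM_FINGERPRINT` returns the same signed 64-bit integer every time it sees the same input string, which is what makes the split repeatable from run to run. The sketch below imitates that property in plain Python. It is a stand-in under stated assumptions: it uses SHA-256 from `hashlib` instead of BigQuery's FarmHash, so its numbers differ from BigQuery's and will not reproduce the real bucket assignments, and the helper `fingerprint` is hypothetical.

```python
import hashlib


def fingerprint(year, month, state):
    """Hypothetical stand-in for FARM_FINGERPRINT(CONCAT(year, month, state)).

    SHA-256 replaces FarmHash here, so the values differ from BigQuery's, but the
    key property is shared: the same input always yields the same signed 64-bit int.
    """
    key = "{}{}{}".format(year, month, state)  # mirrors CONCAT of the CAST columns
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Read the first 8 bytes as a signed 64-bit integer, like BigQuery's INT64
    return int.from_bytes(digest[:8], byteorder="big", signed=True)


h1 = fingerprint(2005, 3, "CA")
h2 = fingerprint(2005, 3, "CA")
print(h1 == h2)  # True: the same key always lands in the same bucket
```

Every row that shares a (year, month, state) key gets the same hash, so the number of distinct hash values is bounded by the number of key combinations.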

The next query counts the records for each unique value of `hash_values`. This `GROUP BY` is our first step toward building the actual hash buckets for the split.

In [None]:
# Count of records for each hash value generated in the previous step
first_bucket = """
SELECT
    hash_values,
    COUNT(*) AS num_records
FROM
    ({farm_fingerprint_query})
GROUP BY
    hash_values
""".format(farm_fingerprint_query=farm_fingerprint_query)

get_df_head(first_bucket)

The query below performs a second level of bucketing: it maps each hash value to a bucket index with `ABS(MOD(hash_values, hashing_mod))` and then counts the number of records in each bucket.

In [None]:
# Second time bucketing operation - expected to create roughly 100 buckets for default hashing_mod
second_bucket = """
SELECT
    ABS(MOD(hash_values, {hashing_mod})) AS bucket_index,
    SUM(num_records) AS num_records
FROM
    ({first_bucket})
GROUP BY
    ABS(MOD(hash_values, {hashing_mod}))
""".format(
    first_bucket=first_bucket, hashing_mod=hashing_mod)

get_df_head(second_bucket)
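
If you ever reproduce this bucketing outside BigQuery, note that BigQuery's `MOD` returns a remainder with the same sign as the dividend, while Python's `%` follows the sign of the divisor. For a positive modulus, `ABS(MOD(x, m))` therefore corresponds to `abs(x) % m` in Python, not `x % m`. A minimal sketch (the helper name `bq_abs_mod` is hypothetical):

```python
def bq_abs_mod(hash_value, hashing_mod):
    """Python equivalent of BigQuery's ABS(MOD(hash_value, hashing_mod)) for hashing_mod > 0.

    BigQuery's MOD keeps the sign of the dividend (a truncated remainder), so taking
    ABS of it gives the same result as abs(x) % m. Python's own % follows the sign
    of the divisor, so x % m would put negative hashes in different buckets.
    """
    return abs(hash_value) % hashing_mod


print(bq_abs_mod(-7, 100))     # 7
print(-7 % 100)                # 93, which is NOT what BigQuery computes
print(bq_abs_mod(12345, 100))  # 45
```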

Raw record counts make it hard to judge the split, so the next query normalizes each count into the fraction of all records that falls in each hash bucket.

In [None]:
# The query below computes what fraction of all records belongs to each bucket
percentages = """
SELECT
    bucket_index,
    num_records,
    CAST(num_records AS FLOAT64) / (
    SELECT
        SUM(num_records)
    FROM
        ({second_bucket})) AS percent_records
FROM
    ({second_bucket})
""".format(second_bucket=second_bucket)

get_df_head(percentages)
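
The normalization in this query is each bucket's count divided by the total count, so `percent_records` is a fraction between 0 and 1 rather than a number out of 100. A plain-Python sketch with made-up counts (the helper name `bucket_percentages` is hypothetical):

```python
def bucket_percentages(bucket_counts):
    """Normalizes {bucket_index: num_records} into {bucket_index: fraction of all records}.

    Mirrors the percentages query: each bucket's count divided by the total count.
    """
    total = sum(bucket_counts.values())
    return {bucket: count / total for bucket, count in bucket_counts.items()}


# Hypothetical counts for four buckets (not real data)
example_counts = {0: 50, 1: 30, 2: 15, 3: 5}
print(bucket_percentages(example_counts))  # {0: 0.5, 1: 0.3, 2: 0.15, 3: 0.05}
```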

Select the range of buckets to be used in training.

In [None]:
training = """
SELECT
    *,
    "train" AS dataset_name
FROM
    ({percentages})
WHERE
    bucket_index >= 0
    AND bucket_index < {train_buckets}
""".format(
    percentages=percentages,
    train_buckets=train_buckets)

get_df_head(training)

Select the range of buckets to be used for evaluation.

In [None]:
# Below, we pick the buckets for evaluation which will be used periodically during training
evaluation = """
SELECT
    *,
    "eval" AS dataset_name
FROM
    ({percentages})
WHERE
    bucket_index >= {train_buckets}
    AND bucket_index < {cum_eval_buckets}
""".format(
    percentages=percentages,
    train_buckets=train_buckets,
    cum_eval_buckets=train_buckets + eval_buckets)

get_df_head(evaluation)

Select the hash buckets to be used for the test split.

In [None]:
# Finally, query for the hash buckets for our testing set
testing = """
SELECT
    *,
    "test" AS dataset_name
FROM
    ({percentages})
WHERE
    bucket_index >= {cum_eval_buckets}
    AND bucket_index < {hashing_mod}
""".format(
    percentages=percentages,
    cum_eval_buckets=train_buckets + eval_buckets,
    hashing_mod=hashing_mod)

get_df_head(testing)

In the query below, `UNION ALL` combines the three datasets so that all three sets of hash buckets are in one table. A `dataset_id` column is added so that the next query can sort on it.

In [None]:
# Union the training, validation (a.k.a. evaluation), and testing dataset statistics
union = """
SELECT
    0 AS dataset_id,
    *
FROM
    ({training})
UNION ALL
SELECT
    1 AS dataset_id,
    *
FROM
    ({evaluation})
UNION ALL
SELECT
    2 AS dataset_id,
    *
FROM
    ({testing})
""".format(training=training, evaluation=evaluation, testing=testing)

get_df_head(union)

Lastly, show the final split between the train, eval, and test sets.

In [None]:
# Show final splitting according to percentages and other info
split = """
SELECT
    dataset_id,
    dataset_name,
    SUM(num_records) AS num_records,
    SUM(percent_records) AS percent_records
FROM
    ({union})
GROUP BY
    dataset_id,
    dataset_name
ORDER BY
    dataset_id
""".format(union=union)

get_df_head(split)
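
On perfectly uniform data, the final split would come out at exactly 80/10/10. On real data the proportions drift, because every row with the same year, month, and state shares one hash and therefore one bucket. The sketch below reproduces the final `GROUP BY` on a hypothetical, uniform set of bucket counts; `summarize_split` is an illustrative name, and the bucket boundaries assume the notebook's default values.

```python
def summarize_split(bucket_counts, train_buckets=80, eval_buckets=10):
    """Sums records per split and each split's fraction of all records.

    bucket_counts maps bucket_index (0 to 99 with the default hashing_mod) to num_records.
    """
    total = sum(bucket_counts.values())
    per_split = {"train": 0, "eval": 0, "test": 0}
    for bucket, count in bucket_counts.items():
        if bucket < train_buckets:
            per_split["train"] += count
        elif bucket < train_buckets + eval_buckets:
            per_split["eval"] += count
        else:
            per_split["test"] += count
    return {name: (n, n / total) for name, n in per_split.items()}


# Hypothetical, perfectly uniform data: 10 records in each of 100 buckets
uniform = {bucket: 10 for bucket in range(100)}
print(summarize_split(uniform))
# {'train': (800, 0.8), 'eval': (100, 0.1), 'test': (100, 0.1)}
```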

Here's a way to take a well-distributed subsample of the data such that the train, eval, and test sets do not overlap and each one is a subsample of its global split.

In [None]:
# We are subsampling from each of the hash values
# The multiple chosen will depend on how much data we have

subsample_multiple = 1000

splitting_string = "ABS(MOD(hash_values, {0} * {1}))".format(subsample_multiple, hashing_mod)

def create_data_split_sample(query_string, splitting_string, lo, up):
    """Creates a dataframe with a sample of a data split.
    Args:
        query_string: str, query to run to generate splits.
        splitting_string: str, modulo string to split by.
        lo: float, lower bound for bucket filtering for split.
        up: float, upper bound for bucket filtering for split.
    Returns:
        Dataframe containing data split sample.
    """
    query = "SELECT * FROM ({0}) WHERE {1} >= {2} and {1} < {3}".format(
        query_string, splitting_string, int(lo), int(up))

    df = bq.query(query).to_dataframe()

    return df

train_df = create_data_split_sample(
    farm_fingerprint_query, splitting_string, lo=0, up=train_buckets)

eval_df = create_data_split_sample(
    farm_fingerprint_query, splitting_string,
    lo=train_buckets, up=train_buckets + eval_buckets)

test_df = create_data_split_sample(
    farm_fingerprint_query, splitting_string,
    lo=train_buckets + eval_buckets, up=hashing_mod)

print("There are {} examples in the train dataset.".format(len(train_df)))
print("There are {} examples in the validation dataset.".format(len(eval_df)))
print("There are {} examples in the test dataset.".format(len(test_df)))

## Preprocess data using Pandas

In [None]:
train_df.head()

Notice that some numeric fields may be missing in some rows: the `count` that Pandas reports in `describe()` excludes missing values.

In [None]:
train_df.describe()

It is crucial to clean raw data before using it for machine learning, so add a preprocessing step. Define a `preprocess` function below.

In [None]:
def preprocess(df):
    """ Preprocess pandas dataframe for augmented babyweight data.
    
    Args:
        df: Dataframe containing raw data.
    Returns:
        Pandas dataframe containing preprocessed raw data as well
            as simulated no data masking some of the original data. Also, any other
            cleaning such as one hot encoding and embedding can be performed before 
            the processed dataframe is returned by this method 
    """
    # Clean up raw data
    # Filter out what we don"t want to use for training
    df = df[df.feature_1 > 0]
    ...
    # Any other operations such as one hot encoding categorical data
    # Any embeddings needed to be applied to categorical data can be applied here
    # Any other preprocessing steps such as filling absent values with "Unknown"
    # Assigning default numerical values to missing data in a way that doesn't skew the data
    # Any data augmentation related operations

    return df
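
The `preprocess` body above is mostly a placeholder. As a hypothetical illustration, here are two typical steps (dropping rows with an invalid `feature_1` and filling a missing categorical value with `"Unknown"`) written in plain Python over a list of row dictionaries, so the sketch runs without pandas; the `state` column is an assumption. In the notebook itself, the pandas equivalents would be `df[df.feature_1 > 0]` and `df["state"].fillna("Unknown")`.

```python
def preprocess_rows(rows):
    """Hypothetical plain-Python version of two typical preprocessing steps.

    Drops rows whose feature_1 is missing or not positive, and fills a missing
    value in the (assumed) categorical column "state" with "Unknown".
    """
    cleaned = []
    for row in rows:
        value = row.get("feature_1")
        if value is None or value <= 0:
            continue  # filter out rows we don't want to use for training
        row = dict(row)  # copy so the caller's data is left unchanged
        if not row.get("state"):
            row["state"] = "Unknown"
        cleaned.append(row)
    return cleaned


raw = [
    {"feature_1": 3.2, "state": "CA"},
    {"feature_1": -1.0, "state": "NY"},  # dropped: not positive
    {"feature_1": None, "state": "TX"},  # dropped: missing
    {"feature_1": 1.5, "state": None},   # kept, with state filled in
]
print(preprocess_rows(raw))
# [{'feature_1': 3.2, 'state': 'CA'}, {'feature_1': 1.5, 'state': 'Unknown'}]
```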

In [None]:
train_df = preprocess(train_df)
eval_df = preprocess(eval_df)
test_df = preprocess(test_df)

In [None]:
train_df.head()

In [None]:
train_df.tail()

In [None]:
train_df.describe()

## Write to .csv files 

In the final version, we want to read from files, not Pandas dataframes, so write the dataframes out as CSV files. Reading from CSV files lets us shuffle the data as it is read. This is important for distributed training because some workers might be slower than others, and shuffling helps prevent the same data from always being assigned to the slow workers.
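
Because `to_csv` is called with `header=False`, the files contain only data rows. Whatever reads them must be given the column names and must not treat the first line as a header. The stdlib sketch below shows this round trip on a small in-memory CSV with made-up values.

```python
import csv
import io

example_columns = ["feature_1", "feature_2", "feature_3", "feature_n"]
example_rows = [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]  # made-up values

# Write rows without a header line, like to_csv(header=False, index=False)
buffer = io.StringIO()
csv.writer(buffer).writerows(example_rows)
text = buffer.getvalue()
print(text.splitlines()[0])  # 1.0,2.0,3.0,4.0  (the first line is data)

# Reading back: the column names must be supplied because the file has none
records = list(csv.DictReader(io.StringIO(text), fieldnames=example_columns))
print(len(records))  # 2
```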

In [None]:
# Define columns
columns = ["feature_1",
           "feature_2",
           "feature_3",
           "feature_n"]

# Write out CSV files
train_df.to_csv(
    path_or_buf="train.csv", columns=columns, header=False, index=False)
eval_df.to_csv(
    path_or_buf="eval.csv", columns=columns, header=False, index=False)
test_df.to_csv(
    path_or_buf="test.csv", columns=columns, header=False, index=False)

In [None]:
%%bash
wc -l *.csv

In [None]:
%%bash
head *.csv

In [None]:
%%bash
tail *.csv

In [None]:
%%bash
ls *.csv

In [None]:
%%bash
head -5 *.csv

## Create Keras model

### Set CSV Columns, label column, and column defaults.

Now that the CSV files have been verified, set the following variables, which the input function will use:
* `CSV_COLUMNS` - the column names, in the same order as in the CSV files.
* `LABEL_COLUMN` - the header name of the column that is the label. The input function pops it from the features dictionary.
* `DEFAULTS` - a list with the same length as `CSV_COLUMNS`, i.e., one default for each column in our CSVs. Each element is itself a list holding the default value for that CSV column.

In [None]:
# Create list of string column headers, in the same order as in the CSV files
CSV_COLUMNS = ["feature_1",
               "feature_2",
               "feature_3",
               # ...
               "feature_n"]

# Name of the label column, for example feature_4 (short for feature column 4)
LABEL_COLUMN = "feature_4"

# Set a default value for each CSV column as a list of lists, for example:

DEFAULTS = [[0.0], ["Unknown"], [0.0], ["Unknown"], [0.0]]
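
Column defaults play two roles: an empty field is replaced by the default, and the type of the default (float or string) determines how the column is parsed. The sketch below is a simplified, plain-Python imitation of that behavior for five hypothetical columns; it is not TensorFlow's parser, and it uses `EXAMPLE_COLUMNS` and `EXAMPLE_DEFAULTS` so that it does not overwrite the notebook's own variables.

```python
EXAMPLE_COLUMNS = ["feature_1", "feature_2", "feature_3", "feature_4", "feature_5"]
EXAMPLE_DEFAULTS = [[0.0], ["Unknown"], [0.0], ["Unknown"], [0.0]]

# One default per column, matching the requirement described above
assert len(EXAMPLE_DEFAULTS) == len(EXAMPLE_COLUMNS)


def parse_line(line):
    """Simplified imitation of applying column defaults to one CSV line.

    Empty fields take the column's default; other fields are converted to the
    type of that default (float or str).
    """
    fields = line.split(",")
    parsed = {}
    for name, field, (default,) in zip(EXAMPLE_COLUMNS, fields, EXAMPLE_DEFAULTS):
        parsed[name] = default if field == "" else type(default)(field)
    return parsed


print(parse_line("1.5,,2.0,,7.25"))
# {'feature_1': 1.5, 'feature_2': 'Unknown', 'feature_3': 2.0, 'feature_4': 'Unknown', 'feature_5': 7.25}
```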

### Make dataset of features and label from CSV files.

Next, write an input function to read the data with `tf.data.experimental.make_csv_dataset`, which creates a dataset of batched CSV records. Each element is a dictionary of column tensors, so we still need to divide the columns into features and a label. Do this by applying the `map` method to the dataset and popping the label column off the dictionary of feature tensors.

In [None]:
def features_and_labels(row_data):
    """Splits features and labels from feature dictionary.

    Args:
        row_data: Dictionary of CSV column names and tensor values.
    Returns:
        Dictionary of feature tensors and label tensor.
    """
    label = row_data.pop(LABEL_COLUMN)

    return row_data, label  # features, label


def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.

    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` object.
    """
    # Make a CSV dataset. The CSV files were written without a header row,
    # so tell the reader not to skip the first line.
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        header=False,
        ignore_errors=True)

    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label

    # Shuffle and repeat for training
    if mode == 'train':
        dataset = dataset.shuffle(buffer_size=1000).repeat()

    # Prefetch batches in the background; AUTOTUNE lets tf.data pick the buffer size
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return dataset
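
Conceptually, the pipeline groups rows into batches of column values and then splits each batch into a feature dictionary and a label. The sketch below imitates those two steps with plain Python lists instead of tensors; `batch_rows` and `pop_label` are hypothetical helpers, and the label column `feature_4` follows the notebook's example.

```python
EXAMPLE_LABEL = "feature_4"  # plays the role of LABEL_COLUMN


def batch_rows(rows, batch_size):
    """Groups row dicts into column-wise batches {column: [values, ...]}."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {name: [row[name] for row in chunk] for name in chunk[0]}


def pop_label(batch):
    """Same idea as features_and_labels: remove the label from the feature dict."""
    label = batch.pop(EXAMPLE_LABEL)
    return batch, label


row_dicts = [{"feature_1": float(i), "feature_4": 10.0 * i} for i in range(5)]
results = [(sorted(features), label)
           for features, label in map(pop_label, batch_rows(row_dicts, batch_size=2))]
for item in results:
    print(item)
# (['feature_1'], [0.0, 10.0])
# (['feature_1'], [20.0, 30.0])
# (['feature_1'], [40.0])
```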

### Create input layers for raw features.

In [None]:
def create_input_layers():
    """Creates dictionary of input layers for each feature.

    Returns:
        Dictionary of `tf.keras.layers.Input` layers for each feature.
    """
    inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="float32")
        for colname in ["feature_1", "feature_2"]}

    inputs.update({
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="string")
        for colname in ["feature_3", "feature_5"]})

    return inputs

### Create feature columns for inputs.

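Next, define the feature columns. Only dense feature columns can be inputs to a DNN, so each categorical column is wrapped in an indicator (one-hot) column. The key of each feature column must match the name of one of the input layers created above.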

In [None]:

def categorical_fc(name, values):
    """Helper function to wrap categorical feature by indicator column.

    Args:
        name: str, name of feature.
        values: list, list of strings of categorical values.
    Returns:
        Indicator column of categorical feature.
    """
    cat_column = tf.feature_column.categorical_column_with_vocabulary_list(
            key=name, vocabulary_list=values)

    return tf.feature_column.indicator_column(categorical_column=cat_column)


def create_feature_columns():
    """Creates dictionary of feature columns from inputs.

    Returns:
        Dictionary of feature columns.
    """
    feature_columns = {
        colname: tf.feature_column.numeric_column(key=colname)
        for colname in ["feature_1", "feature_2"]
    }

    feature_columns["some_boolean_column"] = categorical_fc(
        "some_boolean_column", ["True", "False", "Unknown"])
    feature_columns["multi_category_feature"] = categorical_fc(
        "multi_category_feature", ["Monday(1)", "Tuesday(2)", "Wednesday(3)",
                                   "Thursday(4)", "Friday(5)", "Weekends(6+)"])

    return feature_columns
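
An indicator column turns a categorical string into a one-hot vector over its vocabulary. With the defaults of `categorical_column_with_vocabulary_list` (no out-of-vocabulary buckets), a value outside the vocabulary maps to an all-zeros vector. The plain-Python sketch below imitates that behavior; `indicator` is a hypothetical helper, not the TensorFlow API.

```python
def indicator(value, vocabulary):
    """One-hot vector for value over vocabulary; unknown values give all zeros.

    Plain-Python imitation of indicator_column(categorical_column_with_vocabulary_list(...))
    with its default settings.
    """
    return [1.0 if value == v else 0.0 for v in vocabulary]


days = ["Monday(1)", "Tuesday(2)", "Wednesday(3)",
        "Thursday(4)", "Friday(5)", "Weekends(6+)"]
print(indicator("Wednesday(3)", days))  # [0.0, 0.0, 1.0, 0.0, 0.0, 0.0]
print(indicator("Holiday", days))       # [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```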

### Create DNN dense hidden layers and output layer.

Create some hidden dense layers, starting from our inputs, and end with a dense output layer. Because this is regression, double-check that the output layer has a linear activation and a single unit, so that it produces one prediction per example.

In [None]:
def get_model_outputs(inputs):
    """Creates model architecture and returns outputs.

    Args:
        inputs: Dense tensor used as inputs to model.
    Returns:
        Dense tensor output from the model.
    """
    # Create two hidden layers of [64, 32]
    h1 = tf.keras.layers.Dense(64, activation="relu", name="h1")(inputs)
    h2 = tf.keras.layers.Dense(32, activation="relu", name="h2")(h1)

    # Final output is a linear activation because this is regression
    output = tf.keras.layers.Dense(
        units=1, activation="linear", name="weight")(h2)

    return output
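
To sanity-check `model.summary()`, you can count the parameters of the dense stack by hand: each `Dense` layer has `fan_in * units` weights plus `units` biases. Assuming the feature columns defined above, the DNN input is 2 numeric values plus a 3-way and a 6-way one-hot vector, 11 values in total; `DenseFeatures` has no trainable weights for these column types. A small helper (hypothetical name):

```python
def dense_param_count(input_dim, layer_units=(64, 32, 1)):
    """Counts trainable parameters of a stack of Dense layers (weights + biases)."""
    total = 0
    fan_in = input_dim
    for units in layer_units:
        total += fan_in * units + units  # kernel (fan_in x units) plus one bias per unit
        fan_in = units
    return total


# 2 numeric columns + 3-value and 6-value indicator columns -> 11 inputs
print(dense_param_count(2 + 3 + 6))  # 2881
```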

### Create custom evaluation metric.

This is a regression problem, so define the root mean squared error (RMSE) as a metric for evaluating the model on our evaluation dataset.

In [None]:
def rmse(y_actual, y_predicted):
    """Calculates RMSE evaluation metric.

    Args:
        y_actual: tensor, true labels.
        y_predicted: tensor, predicted labels.
    Returns:
        Tensor with value of RMSE between true and predicted labels.
    """
    return tf.sqrt(tf.reduce_mean((y_predicted - y_actual) ** 2))
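
In TF 2.x, `tf.keras` generally wraps a plain metric function such as `rmse` in a running mean over batches, so the reported value is the average of per-batch RMSEs rather than the RMSE over the whole dataset. The plain-Python sketch below (hypothetical name `rmse_py`, chosen so it does not shadow the notebook's `rmse`) shows that the two can differ.

```python
import math


def rmse_py(y_actual, y_predicted):
    """Root mean squared error over two non-empty, equal-length sequences."""
    if not y_actual or len(y_actual) != len(y_predicted):
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(p - a) ** 2 for a, p in zip(y_actual, y_predicted)]
    return math.sqrt(sum(squared) / len(squared))


# Two batches of two examples each, all true labels 0.0
batch_a = ([0.0, 0.0], [1.0, 1.0])  # RMSE 1.0
batch_b = ([0.0, 0.0], [3.0, 3.0])  # RMSE 3.0

mean_of_batch_rmse = (rmse_py(*batch_a) + rmse_py(*batch_b)) / 2
overall_rmse = rmse_py([0.0] * 4, [1.0, 1.0, 3.0, 3.0])
print(mean_of_batch_rmse)  # 2.0
print(overall_rmse)        # sqrt((1 + 1 + 9 + 9) / 4) = sqrt(5), about 2.236
```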

### Build DNN model 

In [None]:
def build_dnn_model():
    """Builds simple DNN using Keras Functional API.

    Returns:
        `tf.keras.models.Model` object.
    """
    # Create input layer
    inputs = create_input_layers()

    # Create feature columns
    feature_columns = create_feature_columns()

    # The constructor for DenseFeatures takes a list of dense feature columns
    # (numeric, indicator, embedding). The Functional API in Keras requires:
    # LayerConstructor()(inputs)
    dnn_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=feature_columns.values())(inputs)

    # Get output of model given inputs
    output = get_model_outputs(dnn_inputs)

    # Build model and compile it all together
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])

    return model

print("Here is our DNN architecture so far:\n")
model = build_dnn_model()
model.summary()

Visualize the DNN using the Keras plot_model utility.

In [None]:
tf.keras.utils.plot_model(
    model=model, to_file="dnn_model.png", show_shapes=False, rankdir="LR")

## Run and evaluate model

### Train and evaluate.

In [None]:
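TRAIN_BATCH_SIZE = 32
NUM_TRAIN_EXAMPLES = 10000 * 5  # training dataset repeats, so it will wrap around
NUM_EVALS = 5  # how many times to evaluate during training
NUM_EVAL_EXAMPLES = 10000  # number of evaluation examples to use
EVAL_BATCH_SIZE = 1000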

trainds = load_dataset(
    pattern="train*",
    batch_size=TRAIN_BATCH_SIZE,
    mode='train')

evalds = load_dataset(
    pattern="eval*",
    batch_size=EVAL_BATCH_SIZE,
    mode='eval').take(count=NUM_EVAL_EXAMPLES // EVAL_BATCH_SIZE)

steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS)

logdir = os.path.join(
    "logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=logdir, histogram_freq=1)

history = model.fit(
    trainds,
    validation_data=evalds,
    epochs=NUM_EVALS,
    steps_per_epoch=steps_per_epoch,
    callbacks=[tensorboard_callback])
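
Because the training dataset repeats indefinitely, an "epoch" here is just `steps_per_epoch` batches, and `NUM_EVALS` epochs means the model is evaluated five times. The sketch below reproduces the arithmetic of this cell with the same constants; `training_schedule` is a hypothetical helper.

```python
def training_schedule(num_train_examples, train_batch_size, num_evals,
                      num_eval_examples, eval_batch_size):
    """Reproduces the step arithmetic of the training cell."""
    steps_per_epoch = num_train_examples // (train_batch_size * num_evals)
    eval_steps = num_eval_examples // eval_batch_size  # batches kept by .take()
    examples_seen = steps_per_epoch * train_batch_size * num_evals
    return steps_per_epoch, eval_steps, examples_seen


print(training_schedule(10000 * 5, 32, 5, 10000, 1000))  # (312, 10, 49920)
```

Integer division means training sees 49,920 examples, slightly fewer than `NUM_TRAIN_EXAMPLES`.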

### Visualize loss curve

In [None]:
import matplotlib.pyplot as plt
nrows = 1
ncols = 2
fig = plt.figure(figsize=(10, 5))

for idx, key in enumerate(["loss", "rmse"]):
    ax = fig.add_subplot(nrows, ncols, idx+1)
    plt.plot(history.history[key])
    plt.plot(history.history["val_{}".format(key)])
    plt.title("model {}".format(key))
    plt.ylabel(key)
    plt.xlabel("epoch")
    plt.legend(["train", "validation"], loc="upper left");

### Save the model

In [None]:
OUTPUT_DIR = "mymodel_trained"
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
EXPORT_PATH = os.path.join(
    OUTPUT_DIR, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
tf.saved_model.save(
    obj=model, export_dir=EXPORT_PATH)  # with default serving function
print("Exported trained model to {}".format(EXPORT_PATH))

In [None]:
!ls $EXPORT_PATH

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License