Source: [training-data-analyst, stable-link (not-master)](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/f3b838007a0e1fb3cc7880ad0343f46678926f36/courses/machine_learning/deepdive/04_advanced_preprocessing/a_dataflow.ipynb) 

# Data Preprocessing for Machine Learning

**Learning Objectives**
* Understand the different approaches for data preprocessing in developing ML models
* Use Dataflow to perform data preprocessing steps

## Introduction

Preprocessing data for a machine learning model involves both data engineering and feature engineering. During data engineering, we convert raw data into the prepared data the model needs. Feature engineering then takes that prepared data and creates the features expected by the model. We have already seen various ways to engineer new features for a machine learning model and where those steps take place. We also have flexibility as to where data preprocessing steps take place, for example in BigQuery, Cloud Dataflow or TensorFlow. In this lab, we'll explore different data preprocessing strategies and see how they can be accomplished with Cloud Dataflow.

One way to categorize data preprocessing operations is by their granularity. Here, we will consider the following three types of operations:
1. Instance-level transformations
2. Full-pass transformations
3. Time-windowed aggregations

Cloud Dataflow can perform each of these types of operations and is particularly useful when performing computationally expensive operations as it is an autoscaling service for batch and streaming data processing pipelines. We'll say a few words about each of these below. For more information, have a look at this article about [data preprocessing for machine learning from Google Cloud](https://cloud.google.com/solutions/machine-learning/data-preprocessing-for-ml-with-tf-transform-pt1).

**1. Instance-level transformations**
These are transformations which take place during training and prediction, looking only at values from a single data point. For example, they might include clipping the value of a feature, polynomially expanding a feature, multiplying two features, or comparing two features to create a Boolean flag.

It is necessary to apply the same transformations at training time and at prediction time. Failure to do this results in training/serving skew and will negatively affect the performance of the model.
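A minimal plain-Python sketch of the four instance-level transformations listed above. It assumes a row is a dict with two hypothetical numeric fields `x1` and `x2` and an arbitrary clipping bound; `instance_transform` is an illustrative helper, not part of this lab. The point is that one function is reused at training and at serving time, which is what prevents skew.

```python
# Hypothetical instance-level transform: it only looks at one row (a dict),
# so the exact same function can be called at training and prediction time.
def instance_transform(row, clip_max=100.0):
    x1 = row["x1"]
    x2 = row["x2"]
    x1_clipped = min(max(x1, 0.0), clip_max)   # clip to [0, clip_max]
    return {
        "x1_clipped": x1_clipped,
        "x1_squared": x1_clipped ** 2,           # polynomial expansion
        "x1_times_x2": x1_clipped * x2,          # product of two features
        "x1_gt_x2": x1_clipped > x2,             # Boolean comparison flag
    }

# Training: applied row by row over the training set.
train_rows = [{"x1": 150.0, "x2": 2.0}, {"x1": 3.0, "x2": 5.0}]
train_features = [instance_transform(r) for r in train_rows]

# Serving: the same function is applied to a single incoming request.
serving_features = instance_transform({"x1": 3.0, "x2": 5.0})
print(serving_features == train_features[1])  # True: identical features, no skew
```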

**2. Full-pass transformations**
These transformations occur during training, but occur as instance-level operations during prediction. That is, during training you must analyze the entirety of the training data to compute quantities such as maximum, minimum, mean or variance while at prediction time you need only use those values to rescale or normalize a single data point. 

A good example to keep in mind is standard scaling (z-score normalization) of features for training. You need to compute the mean and standard deviation of that feature across the whole training data set, thus it is called a full-pass transformation. At prediction time you use those previously computed values to appropriately normalize the new data point. Failure to do so results in training/serving skew.
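A small sketch of z-score normalization as a full-pass transformation, using only the Python standard library and made-up values. Using the population standard deviation (`pstdev`) rather than the sample one is a design choice of this sketch.

```python
import statistics

# Full pass (training only): analyze the whole training column once.
train_values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
mean = statistics.mean(train_values)     # 5.0
std = statistics.pstdev(train_values)    # 2.0 (population standard deviation)

def zscore(x, mean=mean, std=std):
    """Instance-level at prediction time: reuse the stored training statistics."""
    return (x - mean) / std

# Prediction: a single new data point, normalized with the training statistics.
print(zscore(9.0))  # 2.0
```

In a real pipeline, `mean` and `std` would have to be saved alongside the model so that serving uses exactly the same values; this is the book-keeping that tf.transform automates.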

**3. Time-windowed aggregations**
These types of transformations occur during training and at prediction time. They involve creating a feature by summarizing real-time values, aggregating them over a temporal window. For example, if we wanted our model to estimate the taxi trip time based on the traffic metrics for the route in the last 5 minutes, the last 10 minutes, or the last 30 minutes, we would want to create time windows to aggregate these values.

At prediction time these aggregations have to be computed in real-time from a data stream.
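A plain-Python sketch of a time-windowed aggregation: a rolling mean over the last N seconds, the kind of feature that would be computed from a stream at prediction time. `SlidingWindowMean` is a hypothetical helper, not Beam's windowing API (in a Beam pipeline you would use windowing transforms such as `beam.window.SlidingWindows`), and it assumes events arrive in non-decreasing timestamp order.

```python
from collections import deque

class SlidingWindowMean:
    """Mean of the values observed in the last `window_s` seconds."""

    def __init__(self, window_s):
        self.window_s = window_s
        self.events = deque()   # (timestamp, value) pairs inside the window
        self.total = 0.0

    def add(self, ts, value):
        self.events.append((ts, value))
        self.total += value
        self._evict(ts)

    def mean(self, now):
        self._evict(now)
        return self.total / len(self.events) if self.events else None

    def _evict(self, now):
        # Drop events that fell out of the window (ts <= now - window_s).
        while self.events and self.events[0][0] <= now - self.window_s:
            _, old = self.events.popleft()
            self.total -= old

# Hypothetical traffic-speed readings (timestamp in seconds, speed).
w5 = SlidingWindowMean(window_s=300)   # "last 5 minutes"
for ts, speed in [(0, 50.0), (120, 20.0), (250, 40.0), (400, 30.0)]:
    w5.add(ts, speed)
print(w5.mean(now=400))  # 30.0: the reading at t=0 has left the window
```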

### Set environment variables and load necessary libraries

Apache Beam only works in Python 2 at the moment, so switch to the Python 2 kernel in the upper right hand side. Then execute the following cells to install the necessary libraries if they have not been installed already.

In [None]:
# Ensure that Apache Beam 2.10 is installed (install it only if missing).
!pip freeze | grep apache-beam==2.10.0 || pip install 'apache-beam[gcp]==2.10.0'

Check that your Jupyter server is running in the correct environment:
- I had problems when it was not running Python 2.7, even when the correct kernel was selected

In [32]:
import tensorflow as tf
import apache_beam as beam
import shutil
import os
print("TF-Version: {}".format(tf.__version__))
import sys
print("Python-Version: {}".format(sys.version))

TF-Version: 1.13.1
Python-Version: 2.7.13 (default, Sep 26 2018, 18:42:22) 
[GCC 6.3.0 20170516]


In [33]:
assert sys.version_info.major == 2, "Beam currently only runs properly with Python 2"

Next, set the environment variables related to your GCP Project.

In [34]:
from utils import chdir_
pwd = chdir_()

import yaml
with open('config.yaml', 'r') as f:
    #cfg = yaml.load(f, Loader=yaml.BaseLoader)
    cfg = yaml.safe_load(f)

Current Working direcotory:	/home/jupyter/proj_DL_models_and_pipelines_with_GCP


In [47]:
PROJECT = cfg["project-id"]  # Replace with your PROJECT
BUCKET = cfg["bucket"]  # Replace with your BUCKET
REGION = cfg["region"]           # Choose an available region for Cloud MLE
REGION = 'europe-west1'  # override: europe-west6 was not working at the time of writing (2019-04-23)
TFVERSION = str(cfg["tf-version"])                # TF version for CMLE to use

In [36]:
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = TFVERSION 

## ensure we're using the python2 env
os.environ["CLOUDSDK_PYTHON"] = "python2"

In [44]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`

Updated property [core/project].
Updated property [compute/region].
Updated property [ml_engine/local_python].


## Create data preprocessing job with Cloud Dataflow

The following code reads from BigQuery and saves the data to Google Cloud Storage. The feature columns hold pixel intensities in the range 0 to 255 and are scaled to the range [0, 1] by dividing them by 255. Remember to repeat this preprocessing at prediction time to avoid training/serving skew.
In general, it is better to use tf.transform, which does this book-keeping for you, or to do preprocessing within your TensorFlow model. We will look at how tf.transform works in another notebook. For now, we are simply moving data from BigQuery to CSV using Dataflow.

It's worth noting that while we could read from [BQ directly from TensorFlow](https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is quite convenient to export to CSV and do the training off CSV. We can do this at scale with Cloud Dataflow. Furthermore, because we are running this on the cloud, you should go to the [GCP Console](https://console.cloud.google.com/dataflow) to view the status of the job. It will take several minutes for the preprocessing job to launch.

### Define our query and pipeline functions

To start we'll copy over the `create_query` function we created in the `01_bigquery/c_extract_and_benchmark` notebook. 

In [38]:
def create_query(phase, sample_size):
    basequery = """
    SELECT *
    FROM
        `{project}.test.DATA`
    WHERE
        MOD(ID, EVERY_N) = 0
    """.format(project=PROJECT)

    if phase == 'TRAIN':
        subsample = """
        AND MOD(ID, EVERY_N * 100) >= (EVERY_N * 0)
        AND MOD(ID, EVERY_N * 100) <  (EVERY_N * 85)
        """
    elif phase == 'TEST':
        subsample = """
        AND MOD(ID, EVERY_N * 100) >= (EVERY_N * 85)
        AND MOD(ID, EVERY_N * 100) <  (EVERY_N * 100)
        """
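    else:
        raise ValueError("phase must be 'TRAIN' or 'TEST', got {!r}".format(phase))

    query = basequery + subsample
    # EVERY_N is substituted as text, so sample_size may be an expression such as "50*1000"
    return query.replace("EVERY_N", str(sample_size))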
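To see what the split does, the `WHERE` clauses of `create_query` can be mirrored in plain Python. `assign_phase` is a hypothetical helper for illustration only; it takes `every_n` as an integer (the query receives it as text) and assumes `ID` is a non-negative integer, since SQL `MOD` and Python's `%` can differ for negative values.

```python
from collections import Counter

def assign_phase(row_id, every_n):
    """Mirror create_query's WHERE clauses for a single integer ID."""
    if row_id % every_n != 0:
        return None                      # not in the 1-in-every_n sample
    bucket = row_id % (every_n * 100)
    if bucket < every_n * 85:
        return "TRAIN"                   # 0 <= bucket < every_n*85
    return "TEST"                        # every_n*85 <= bucket < every_n*100

counts = Counter(assign_phase(i, 10) for i in range(100000))
print(counts["TRAIN"], counts["TEST"], counts[None])  # 8500 1500 90000
```

Because the assignment depends only on `ID`, rerunning the query always yields the same 85/15 train/test split of the sampled rows.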

Then, we'll write the CSV files we create to a Cloud Storage bucket. First, we check whether the output location is empty and, if it is not, clear out its contents.

In [11]:
%env BUCKET_FOLDER gs://$BUCKET/mnist/bq/

env: BUCKET_FOLDER=gs://presentation-38388/mnist/bq/


In [None]:
%%bash
echo ${BUCKET_FOLDER}
gsutil -m rm -rf ${BUCKET_FOLDER}

In [13]:
from google.cloud import bigquery
client = bigquery.Client(project=PROJECT)

sql = """
    SELECT *
    FROM `{project}.test.DATA`
    LIMIT 15
""".format(project=PROJECT)
df = client.query(sql).to_dataframe()
df.head()

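|   | ID | feat_1 | feat_2 | feat_3 | feat_4 | feat_5 | feat_6 | feat_7 | feat_8 | feat_9 | ... | feat_776 | feat_777 | feat_778 | feat_779 | feat_780 | feat_781 | feat_782 | feat_783 | feat_784 | label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 51 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 68 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 2 | 75 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 3 | 118 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 4 | 121 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |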


Extract the column names from the sampled data:
1. `CSV_COLUMNS`: all columns in the table
2. `FEAT_COLUMNS`: all feature columns in the table (this filter has to be adapted for each table!)

In [14]:
CSV_COLUMNS = list(df.columns)
FEAT_COLUMNS = [x for x in CSV_COLUMNS if 'feat' in x]

Beam processes each row separately. After reading the data, each row is a dictionary of `Col_Name`: `Value` pairs:

In [15]:
#example for row_dict
rowdict= {'ID': 32, 'feat_1': 4, 'feat_45': 0, 'feat_458': 134}
for k in FEAT_COLUMNS: 
    rowdict[k] = rowdict.get(k, 0) / 255.0
rowdict['ID'], rowdict['feat_1'], rowdict['feat_45'], rowdict['feat_458']

(32, 0.01568627450980392, 0.0, 0.5254901960784314)

In [16]:
def make_parse_features(featcolumns):
    def parse_features(rowdict):
        """
        Arguments:
            -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
            which each row is represented as a python dictionary
        Returns:
            -rowdict: A transformed rowdict
        """
        rowdict = dict(rowdict)  # copy: Beam elements should not be mutated in place
        for k in featcolumns:
            rowdict[k] = rowdict.get(k, 0) / 255.0
        return rowdict
    return parse_features
parse_features = make_parse_features(featcolumns=FEAT_COLUMNS)

Next, we'll create a function and pipeline for preprocessing the data. First, we'll define a `to_csv` function, which takes a row dictionary (a dictionary created by the BigQuery reader representing one row of the dataset) and returns a comma-separated string for each record.

In [17]:
def make_to_csv(columns):
    def to_csv(rowdict):
        """
        Arguments:
            -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
            which each row is represented as a python dictionary
        Returns:
            -rowstring: a comma separated string representation of the record
        """
        rowstring = ','.join([str(rowdict[k]) for k in columns])
        return rowstring
    return to_csv
to_csv = make_to_csv(columns=CSV_COLUMNS)
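Since every element is processed independently, the two per-element steps can be checked without Beam. The sketch below re-implements them on a hypothetical four-column schema (the `_DEMO` names are made up for illustration; the real table has `feat_1` to `feat_784`) and chains them the way the two `beam.Map` steps do. It copies the row before modifying it, since Beam's programming guide advises against mutating input elements.

```python
# Hypothetical miniature schema; the real table has feat_1 ... feat_784.
CSV_COLUMNS_DEMO = ["ID", "feat_1", "feat_2", "label"]
FEAT_COLUMNS_DEMO = [c for c in CSV_COLUMNS_DEMO if "feat" in c]

def parse_features_demo(rowdict):
    # Copy first so the input element is left untouched.
    out = dict(rowdict)
    for k in FEAT_COLUMNS_DEMO:
        out[k] = out.get(k, 0) / 255.0   # scale pixel intensity to [0, 1]
    return out

def to_csv_demo(rowdict):
    return ",".join(str(rowdict[k]) for k in CSV_COLUMNS_DEMO)

# One element through the pipeline: BigQuery row -> scaled dict -> CSV line.
row = {"ID": 5001, "feat_1": 0, "feat_2": 255, "label": 3}
line = to_csv_demo(parse_features_demo(row))
print(line)  # 5001,0.0,1.0,3
```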

Next, we define our primary preprocessing function. It creates a pipeline that reads data from BigQuery, scales the features with `parse_features`, uses our `to_csv` function above to make a comma-separated string, and then writes the result to files in Google Cloud Storage.

In [48]:
import datetime

def preprocess(EVERY_N, RUNNER):
    """
    Arguments:
        -EVERY_N: String. Sample one out of every N rows from the full dataset.
        It is substituted into the SQL query as text, so an expression such as
        "50*1000" is allowed. Larger values yield a smaller sample.
        -RUNNER: "DirectRunner" or "DataflowRunner". Specify whether to run the
        pipeline locally or on Google Cloud, respectively.
    Side-effects:
        -Creates and executes a Dataflow pipeline.
        See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
    """
    job_name = "preprocess-mnist" + "-" + datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    print("Launching Dataflow job {} ... hang on".format(job_name))
    OUTPUT_DIR = "gs://{0}/mnist/bq/".format(BUCKET)

    #dictionary of pipeline options
    options = {
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "job_name": job_name,
        "project": PROJECT,
        "runner": RUNNER,
        "region": REGION
    }
  
    #instantiate PipelineOptions object using options dictionary
    opts = beam.pipeline.PipelineOptions(flags = [], **options)

    #instantiate Pipeline object using PipelineOptions
    with beam.Pipeline(options=opts) as p:
        for phase in ["TRAIN", "TEST"]:
            query = create_query(phase, EVERY_N)
            outfile = os.path.join(OUTPUT_DIR, "{}.csv".format(phase))
            (
                p | "read_{}".format(phase) >> beam.io.Read(beam.io.BigQuerySource(query = query, use_standard_sql = True))
                  | "parsefeat_{}".format(phase) >> beam.Map(parse_features)
                  | "tocsv_{}".format(phase) >> beam.Map(to_csv)
                  | "write_{}".format(phase) >> beam.io.WriteToText(outfile)
            )
    print("Done")

In [40]:
OUTPUT_DIR = "gs://{0}/mnist/bq".format(BUCKET)
%env OUTPUT_DIR $OUTPUT_DIR
!gsutil ls $OUTPUT_DIR

env: OUTPUT_DIR=gs://presentation-38388/mnist/bq
gs://presentation-38388/mnist/bq/TEST.csv-00000-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00001-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00002-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00003-of-00004
gs://presentation-38388/mnist/bq/TRAIN.csv-00000-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00001-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00002-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00003-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00004-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00005-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00006-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00007-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00008-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00009-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00010-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00011-of-00014
gs://presentation-38388/mni

Now that we have the preprocessing pipeline function, we can execute the pipeline locally or on the cloud. To run our pipeline locally, we specify the `RUNNER` variable as `DirectRunner`. To run our pipeline in the cloud, we set `RUNNER` to be `DataflowRunner`. In either case, this variable is passed to the options dictionary that we use to instantiate the pipeline. 

As with training a model, it is good practice to test your preprocessing pipeline locally with a subset of your data before running it against your entire dataset.

### Run Beam pipeline locally

We'll start by testing our pipeline locally. This takes up to 5 minutes. You will see the message "Done" when it has finished.

In [None]:
preprocess("50*1000", "DirectRunner")

### Run Beam pipeline on Cloud Dataflow

Again, we'll clear out our output directory on GCS to ensure a fresh run.

In [41]:
%%bash
if gsutil ls -r $OUTPUT_DIR | grep csv; then
    gsutil -m rm -rf $OUTPUT_DIR
fi

gs://presentation-38388/mnist/bq/TEST.csv-00000-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00001-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00002-of-00004
gs://presentation-38388/mnist/bq/TEST.csv-00003-of-00004
gs://presentation-38388/mnist/bq/TRAIN.csv-00000-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00001-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00002-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00003-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00004-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00005-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00006-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00007-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00008-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00009-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00010-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00011-of-00014
gs://presentation-38388/mnist/bq/TRAIN.csv-00012-of-00014
gs://presentation-

Removing gs://presentation-38388/mnist/bq/TEST.csv-00000-of-00004#1556029063188338...
Removing gs://presentation-38388/mnist/bq/TEST.csv-00001-of-00004#1556029063082529...
Removing gs://presentation-38388/mnist/bq/TEST.csv-00002-of-00004#1556029063094932...
Removing gs://presentation-38388/mnist/bq/TEST.csv-00003-of-00004#1556029063154076...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00000-of-00014#1556029111419574...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00001-of-00014#1556029111443460...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00002-of-00014#1556029111453702...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00003-of-00014#1556029111408919...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00004-of-00014#1556029111414916...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00005-of-00014#1556029111488396...
Removing gs://presentation-38388/mnist/bq/TRAIN.csv-00006-of-00014#1556029111437015...
Removing gs://presentation-38388/mnist/bq/TRAIN

The following step will take **15-20 minutes**. Monitor job progress in the Dataflow section of the [GCP Console](https://console.cloud.google.com/dataflow). Passing "1" as the first argument processes the full dataset; a larger value such as "50*1000" processes only a sample.

> The job fails with an error if the Compute Engine default service account [`[PROJECT_NUMBER]-compute@developer.gserviceaccount.com`](https://cloud.google.com/compute/docs/access/service-accounts#compute_engine_default_service_account) is missing. It is created automatically together with the project.
>
> NEVER delete it!

In [49]:
# Process full dataset
preprocess("1", "DataflowRunner")

Launching Dataflow job preprocess-mnist-190423-150433 ... hang on
Done


Once the job finishes, we can look at the files that have been created and inspect their contents. You will notice that the output has been sharded into multiple CSV files.

In [62]:
%%bash
gsutil cat "$OUTPUT_DIR/TRAIN.csv-00000-of-*" | head -1

5001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.121568627451,0.517647058824,0.996078431373,0.992156862745,0.996078431373,0.835294117647,0.321568627451,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0823529411765,0.556862745098,0.913725490196,0.988235294118,0.992156862745,0.988235294118,0.992156862745,0.988235294118,0.874509803922,0.078431372549,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
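A sketch for reading the sharded output back, assuming the shards were first copied to a local directory (for example with `gsutil cp "$OUTPUT_DIR/TRAIN.csv-*" .`). It assumes the column order produced by `to_csv` (ID first, label last, matching the sampled DataFrame above) and an integer label; `read_shards` is a hypothetical helper and the demo shards contain made-up values.

```python
import csv
import glob
import os
import tempfile

def read_shards(pattern):
    """Yield (id, features, label) from every shard file matching `pattern`."""
    for path in sorted(glob.glob(pattern)):
        with open(path) as f:
            for record in csv.reader(f):
                row_id = int(record[0])
                features = [float(v) for v in record[1:-1]]
                label = int(record[-1])
                yield row_id, features, label

# Demo with two fake local shards (hypothetical data, 3 features per row).
tmp = tempfile.mkdtemp()
shards = {"TRAIN.csv-00000-of-00002": "1,0.0,0.5,1.0,7\n",
          "TRAIN.csv-00001-of-00002": "2,1.0,0.0,0.0,3\n"}
for name, text in shards.items():
    with open(os.path.join(tmp, name), "w") as f:
        f.write(text)

rows = list(read_shards(os.path.join(tmp, "TRAIN.csv-*-of-*")))
print(rows)  # [(1, [0.0, 0.5, 1.0], 7), (2, [1.0, 0.0, 0.0], 3)]
```

Sorting the glob results keeps the shard order deterministic; the shards themselves carry no ordering guarantee from Dataflow, so any shuffling for training should happen downstream.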
