<a href="https://colab.research.google.com/github/Sylar257/Google-Cloud-Platform-with-Tensorflow/blob/master/Pre_processing_data_using_apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1> Preprocessing using Dataflow </h1>

This notebook illustrates:
<ol>
<li> Creating datasets for Machine Learning using Dataflow
</ol>
<p>
While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming.

In [0]:
pip install --user apache-beam[gcp]

Run the command again if you are getting oauth2client error.

<b>Restart</b> the kernel before proceeding further.

Make sure the Dataflow API is enabled by going to this [link](https://console.developers.google.com/apis/api/dataflow.googleapis.com). Ensure that you've installed Beam by importing it and printing the version number.

In [0]:
import apache_beam as beam
print(beam.__version__)

2.16.0


You may receive a `UserWarning` about the Apache Beam SDK for Python 3 as not being yet fully supported. Don't worry about this.

In [0]:
# change these to try this notebook out
BUCKET = 'qwiklabs-gcp-00-4bd4b688d32d'
PROJECT = 'qwiklabs-gcp-00-4bd4b688d32d'
REGION = 'australia-southeast1'

In [0]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [0]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

<h2> Create ML dataset using Dataflow </h2>
Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

Instead of using Beam/Dataflow, I had three other options:

* Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls above as well!
* Read from BigQuery directly using TensorFlow.
* Use the BigQuery console (http://bigquery.cloud.google.com) to run a Query and save the result as a CSV file. For larger datasets, you may have to select the option to "allow large results" and save the result into a CSV file on Google Cloud Storage. 

<p>

However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.

In [0]:
# Pull columns from BQ and create line(s) of CSV input

def to_csv(rowdict):
  import hashlib
  import copy
  
  # hashmonth is not added here as it's not one of the feautres we incorporate for prediction
  CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')
    
  # Create synthetic data where we assume that no ultrasound has been performed
  # and so we don't know sex of the baby. Let's assume that we can tell the difference
  # between single and multiple, but that the errors rates in determining exact number
  # is difficult in the absence of an ultrasound.
  no_ultrasound = copy.deepcopy(rowdict)
  w_ultrasound = copy.deepcopy(rowdict)

  no_ultrasound['is_male'] = 'Unknown'
  if rowdict['plurality'] > 1:
    no_ultrasound['plurality'] = 'Multiple(2+)'
  else:
    no_ultrasound['plurality'] = 'Single(1)'

  # Change the plurality column to strings
  w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

  # Write out two rows for each input row, one with ultrasound and one without
  for result in [no_ultrasound, w_ultrasound]:
    data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
    yield str('{},{}'.format(data, key))

In [0]:
import datetime, os
    
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except:
        pass

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
    
  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """

  if in_test_mode:
    query = query + ' LIMIT 100' 

  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) < 3'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) = 3'.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql=True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
 
preprocess(in_test_mode = True)

Launching local job ... hang on




Done!


The above step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and <b>wait for the job to finish</b> before you run the following step.

In [0]:
!ls preproc

eval.csv-00000-of-00001  train.csv-00000-of-00001


In [0]:
!head preproc/train.csv*

6.686620406459999,Unknown,35,Single(1),35,1dbc184ff4ada8b1c4d26b84f08f3ed7c4b007ac3cf50301c766e18f
6.686620406459999,False,35,Single(1),35,f2393b23afd6982b00551393c360fe1fc6e7104ad0de32b49fd33a65
7.06361087448,Unknown,28,Single(1),35,75f3e78eb4110fb070c77616053f8f5f2e0b243787deb7b2d97ce9a2
7.06361087448,False,28,Single(1),35,58433e19ff350bcfd8fb091a17fee652d69cd8aaaa486ef70a4b073e
6.1244416383599996,Unknown,37,Single(1),36,d6ea0417a9d68de9202ee0d9ca9fb90e6237551602ce4091284fabf5
6.1244416383599996,False,37,Single(1),36,a84a90f3bce322ccfa980cc2c63f335cff4a385b25be22cd405fb247
8.12623897732,Unknown,25,Single(1),37,9675e0c377c3377fb2f1b796e308198850b4a8d159fe9192f810dcf3
8.12623897732,False,25,Single(1),37,b0c141e2355ebb0aff21847833e94b5baec2620eff84e502c56c9083
7.50012615324,Unknown,17,Single(1),37,ff0943d605b4df01e0cd9822a674edceb77fade78b4cfd48b211f042
7.50012615324,False,17,Single(1),37,3ec1d751ff20b423292e286485bbd6df22850ef2889a125def1f03dd


In [0]:
preprocess(in_test_mode = False)

In [0]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*

Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License