# LAB 3. Data preprocesing

This is data pre processing.
It pulls training data from bigquery and convert it into CSV file by using apache beam python.
It also seperate the data into training and evaluation data

In [1]:
# change these to try this notebook out
BUCKET = 'terrycho-training-demos-ml'
PROJECT = 'terrycho-ml'
REGION = 'us-central1'

In [2]:
import apache_beam as beam
import datetime
import os

def to_csv(rowdict):
    # pull columns from BQ and create a line
    import hashlib
    import copy
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,mother_race,plurality,gestation_weeks,mother_married,cigarette_use,alcohol_use'.split(',')
    # modify opaque numeric race code into human-readable data
    races = dict(zip([1,2,3,4,5,6,7,18,28,39,48],
                     ['White', 'Black', 'American Indian', 'Chinese', 
                      'Japanese', 'Hawaiian', 'Filipino',
                      'Asian Indian', 'Korean', 'Samaon', 'Vietnamese']))
    result = copy.deepcopy(rowdict)
    if 'mother_race' in rowdict and rowdict['mother_race'] in races:
      result['mother_race'] = races[rowdict['mother_race']]
    else:
      result['mother_race'] = 'Unknown'
    
    data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
    key = hashlib.sha224(data).hexdigest()  # hash the columns to form a key
    return str('{},{}'.format(data, key))
  
def preprocess(in_test_mode):
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    
  if in_test_mode:
    print 'Launching local job ... hang on'
    OUTPUT_DIR = './preproc'
  else:
    print 'Launching Dataflow job {} ... hang on'.format(job_name)
    OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
    
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options=opts)
  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  mother_race,
  plurality,
  gestation_weeks,
  mother_married,
  ever_born,
  cigarette_use,
  alcohol_use,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """
  
  if in_test_mode:
    query = query + ' LIMIT 100' 
  
  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE MOD(hashmonth,4) < 3'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE MOD(hashmonth,4) = 3'.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
     | '{}_csv'.format(step) >> beam.Map(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )
 
  job = p.run()
  
preprocess(in_test_mode=True)

Launching local job ... hang on


