# 3_prepare_data_babyweight

**Learning Objectives**

1. Setup up the environment
1. Preprocess natality dataset
1. Augment natality dataset
1. Create the train and eval tables in BigQuery
1. Export data from BigQuery to GCS in CSV format


## Introduction 
In this notebook, we will prepare the babyweight dataset for model development and training to predict the weight of a baby before it is born.  We will use BigQuery to perform data augmentation and preprocessing which will be used for AutoML Tables, BigQuery ML, and Keras models trained on Cloud AI Platform.

In this lab, we will set up the environment, create the project dataset, preprocess and augment natality dataset, create the train and eval tables in BigQuery, and export data from BigQuery to GCS in CSV format.



## Set up environment variables and load necessary libraries

Import necessary libraries.

In [1]:
import os
from google.cloud import bigquery

In [2]:
PROJECT = "predict-babyweight-10142021"
BUCKET = PROJECT
REGION = "us-central1"

os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET 
os.environ["REGION"] = REGION

## Create a BigQuery Dataset

A BigQuery dataset is a container for tables, views, and models built with BigQuery ML. Let's create one called __babyweight__.

In [3]:
%%bash

# Create a BigQuery dataset for babyweight if it doesn't exist
datasetexists=$(bq ls -d | grep -w babyweight)

if [ -n "$datasetexists" ]; then
    echo -e "BigQuery dataset already exists, let's not recreate it."

else
    echo "Creating BigQuery dataset titled: babyweight"
    
    bq --location=US mk --dataset \
        --description "Babyweight" \
        $PROJECT:babyweight
    echo "Here are the current datasets:"
    bq ls
fi

BigQuery dataset already exists, let's not recreate it.


## Create the training and evaluation data tables

First we are going to create a subset of the data limiting our columns to `weight_pounds`, `is_male`, `mother_age`, `plurality`, and `gestation_weeks` as well as some simple filtering and a column to hash on for repeatable splitting.

* Note:  The dataset in the create table code below is the one created previously, e.g. "babyweight".

### Preprocess and filter dataset

We have some preprocessing and filtering we would like to do to get our data in the right format for training.

Preprocessing:
* Cast `is_male` from `BOOL` to `STRING`
* Cast `plurality` from `INTEGER` to `STRING` where `[1, 2, 3, 4, 5]` becomes `["Single(1)", "Twins(2)", "Triplets(3)", "Quadruplets(4)", "Quintuplets(5)"]`
* Cast `cigarette_use`from `BOOL` to `STRING` where `NULL` becomes `Unknown`
* Cast `alcohol_use`from `BOOL` to `STRING` where `NULL` becomes `Unknown`
* Add `hashcolumn` hashing on `year`, `month`,`COALESCE(wday, day, 0)`,`IFNULL(state, "Unknown")`, and `IFNULL(mother_birth_state, "Unknown")`

Filtering:
* Only want data for years later than `2003`
* Only want baby weights greater than `0`
* Only want mothers whose age is greater than `0`
* Only want plurality to be greater than `0`
* Only want the number of weeks of gestation to be greater than `0`

In [4]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_2003 AS
SELECT
    weight_pounds,
    CAST(is_male AS STRING) AS is_male,
    mother_age,
    CASE
        WHEN plurality = 1 THEN "Single(1)"
        WHEN plurality = 2 THEN "Twins(2)"
        WHEN plurality = 3 THEN "Triplets(3)"
        WHEN plurality = 4 THEN "Quadruplets(4)"
        WHEN plurality = 5 THEN "Quintuplets(5)"
    END AS plurality,
    gestation_weeks,
    IFNULL(CAST(cigarette_use AS STRING), "Unknown") AS cigarette_use,
    IFNULL(CAST(alcohol_use AS STRING), "Unknown") AS alcohol_use,
    ABS(FARM_FINGERPRINT(
        CONCAT(
            CAST(year AS STRING),
            CAST(month AS STRING),
            CAST(COALESCE(wday, day, 0)  AS STRING),
            CAST(IFNULL(state, "Unknown") AS STRING),
            CAST(IFNULL(mother_birth_state, "Unknown")  AS STRING)
        )
    )) AS hash_values
FROM
    publicdata.samples.natality
WHERE
    year > 2002
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0

### Augment dataset to simulate missing data

Now we want to augment our dataset with our simulated babyweight data by setting all gender information to `Unknown` and setting plurality of all non-single births to `Multiple(2+)`.

In [6]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_2003_augmented AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
    hash_values
FROM
    babyweight.babyweight_2003
UNION ALL
SELECT
    weight_pounds,
    "Unknown" AS is_male,
    mother_age,
    CASE
        WHEN plurality = "Single(1)" THEN plurality
        ELSE "Multiple(2+)"
    END AS plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
    hash_values
FROM
    babyweight.babyweight_2003

### Split augmented dataset into train and eval sets

Using ` hash_values`, apply a modulo to get approximately a 80/15/5 train/eval/test split.

#### Split augmented dataset into train dataset

In [7]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_2003_train AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
FROM
    babyweight.babyweight_2003_augmented
WHERE
    MOD(hash_values, 100) < 80

#### Split augmented dataset into eval dataset

In [9]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_2003_eval AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
FROM
    babyweight.babyweight_2003_augmented
WHERE
    MOD(hash_values, 100) >= 80
    AND MOD(hash_values, 100) < 95
    

#### Split augmented dataset into test dataset

In [10]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_2003_test AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
FROM
    babyweight.babyweight_2003_augmented
WHERE
    MOD(hash_values, 100) >= 95

## Verify table creation

Verify that you created the dataset and training data table.

In [11]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_2003_train
LIMIT 0

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use


In [12]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_2003_eval
LIMIT 0

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use


## Export from BigQuery to CSVs in GCS

Use BigQuery Python API to export our train, eval, and test tables to Google Cloud Storage in the CSV format to be used later for TensorFlow/Keras training. 

We'll want to use the dataset we've been using above as well as repeat the process for both training, evaluation, and testing data.

In [14]:
# Construct a BigQuery client object.
client = bigquery.Client()

dataset_name = "babyweight"

# Create dataset reference object
dataset_ref = client.dataset(
    dataset_id=dataset_name, project=client.project)

# Export both train and eval tables
for step in ["train", "eval", "test"]:
    destination_uri = os.path.join(
        "gs://", BUCKET, dataset_name, "data", f"{step}*.csv")
    table_name = f"babyweight_2003_{step}"
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location="US", # Location must match that of the source table.
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print(f"Exported {client.project}:{dataset_name}.{table_name} \n to {destination_uri}")

Exported predict-babyweight-10142021:babyweight.babyweight_2003_train 
 to gs://predict-babyweight-10142021/babyweight/data/train*.csv
Exported predict-babyweight-10142021:babyweight.babyweight_2003_eval 
 to gs://predict-babyweight-10142021/babyweight/data/eval*.csv
Exported predict-babyweight-10142021:babyweight.babyweight_2003_test 
 to gs://predict-babyweight-10142021/babyweight/data/test*.csv


## Verify CSV creation

Verify that we correctly created the CSV files in our bucket.

In [15]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/data/*.csv

gs://predict-babyweight-10142021/babyweight/data/eval000000000000.csv
gs://predict-babyweight-10142021/babyweight/data/test000000000000.csv
gs://predict-babyweight-10142021/babyweight/data/train000000000000.csv
gs://predict-babyweight-10142021/babyweight/data/train000000000001.csv
gs://predict-babyweight-10142021/babyweight/data/train000000000002.csv
gs://predict-babyweight-10142021/babyweight/data/train000000000003.csv
gs://predict-babyweight-10142021/babyweight/data/train000000000004.csv


In [16]:
%%bash
gsutil cat gs://predict-babyweight-10142021/babyweight/data/test000000000000.csv | head -5

weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use
1.43741394824,false,15,Single(1),22,false,false
2.12525620568,false,42,Single(1),30,Unknown,Unknown
2.18698563904,Unknown,42,Single(1),31,false,false
6.3382900325,Unknown,43,Multiple(2+),45,false,false


In [17]:
%%bash
gsutil cat gs://predict-babyweight-10142021/babyweight/data/eval000000000000.csv| head -5

weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use
5.56226287026,true,15,Single(1),31,false,false
4.629707502,true,46,Twins(2),28,false,false
2.0502990366,true,46,Twins(2),26,Unknown,Unknown
1.4991433816,true,43,Single(1),18,Unknown,Unknown


In [None]:
%%bash
gsutil cat gs://predict-babyweight-10142021/babyweight/data/test000000000000.csv| head -5

## Summary: 
In this notebook, we setup our environment, created a BigQuery dataset, preprocessed and augmented the natality dataset, created train and eval tables in BigQuery, and exported data from BigQuery to GCS in CSV format.

# 3_create_ML_dataset_using_Dataflow

In this notebook, we'll use Cloud Dataflow to preprocess the entire dataset from BigQuery and create CSV files for the training/evaluation datasets for operationalizing the ML model later.

Some benefits of using Dataflow:
- Dataflow sets itself apart as a platform for data transformations because it is a serverless, fully managed offering from Google that allows you to execute Data Processing Pipelines at scale.
- Dataflow executes our code using the Apache Beam API. Apache Beam supports both batch and streaming processing using the same pipeline code.
- Dataflow changes the amount of compute resources, the number of servers that will run your pipeline elastically, all depending on the amount of data that your pipeline needs to process.


## Import necessary libraries & Set up environment variables 

In [None]:
!pip install --user google-cloud-bigquery==1.25.0

In [None]:
!pip install --user apache-beam[interactive]==2.24.0

In [1]:
import os
import pandas
import datetime
from google.cloud import bigquery
import apache_beam as beam
print(beam.__version__)
import tensorflow as tf
print(tf.__version__)

2.24.0
2.3.4


In [2]:
import hashlib
import copy
import shutil
import subprocess

In [36]:
PROJECT = "predict-babyweight-10142021"
BUCKET = PROJECT
REGION = "us-central1"

os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET 
os.environ["REGION"] = REGION

In [5]:
%%bash
if ! gsutil ls|grep -q gs://${BUCKET}/; then
    gsutil mb -l ${REGION} gs://${BUCKET}
fi

## Call BigQuery and examine in the dataframe

In [4]:
bq = bigquery.Client()

In [5]:
query_2003 = """
SELECT
    weight_pounds,
    CAST(is_male AS STRING) AS is_male,
    mother_age,
    plurality,
    gestation_weeks,
    IFNULL(CAST(cigarette_use AS STRING), "Unknown") AS cigarette_use,
    IFNULL(CAST(alcohol_use AS STRING), "Unknown") AS alcohol_use,
    year,
    month,
    COALESCE(wday, day, 0) AS date,
    IFNULL(state, "Unknown") AS state,
    IFNULL(mother_birth_state, "Unknown") AS mother_birth_state
FROM
    publicdata.samples.natality
WHERE
    year > 2002
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0
"""

query_2003_with_hash_vals=f"""
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    cigarette_use,
    alcohol_use,
    ABS(FARM_FINGERPRINT(
        CONCAT(
            CAST(year AS STRING),
            CAST(month AS STRING),
            CAST(date AS STRING),
            CAST(state AS STRING),
            CAST(mother_birth_state AS STRING)
        )
    )) AS hash_values
FROM
    ({query_2003})
"""


Let's find how many records from the `query_2003` call

In [6]:
query_2003_count = f"SELECT COUNT(*) FROM ({query_2003})"
df_count = bq.query(query_2003_count).to_dataframe()
num_records = df_count['f0_'][0]
print('*** Number of all records found:',num_records)

*** Number of all records found: 25037335


View the `query_2003_with_hash_vals` result

In [7]:
df_limit_100 = bq.query(query_2003_with_hash_vals + "LIMIT 100").to_dataframe()
df_limit_100.head()

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use,hash_values
0,7.264232,True,22,1,39,Unknown,Unknown,8045173873969881371
1,6.563162,True,38,2,39,Unknown,Unknown,16293285635216904
2,7.209116,False,18,1,41,false,false,4931362078050829102
3,6.316244,True,25,1,38,Unknown,Unknown,7429616553140235181
4,7.603743,False,23,1,39,Unknown,Unknown,1781791737502110095


## Create ML dataset using Dataflow
Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

For the preprocessing, we'll do:

- Modify plurality field to be a string where [1, 2, 3, 4, 5] becomes ["Single(1)", "Twins(2)", "Triplets(3)", "Quadruplets(4)", "Quintuplets(5)"]
- Augment our dataset with our three simulated babyweight data by:
    - setting all gender information to Unknown and setting plurality of all non-single births to Multiple(2+),
    - setting cigarette_use information to Unknown,
    - setting alcohol_use information to Unknown.
    
Fitst, let's define a function to create line(s) of CSV input from columns called by BigQuery

In [24]:
def to_csv(rowdict):
    import hashlib
    import copy
    
    CSV_COLUMNS = ["weight_pounds",
                   "is_male",
                   "mother_age",
                   "plurality",
                   "gestation_weeks",
                   "cigarette_use",
                   "alcohol_use"]

    # Modify plurality field
    rowdict_edited = copy.deepcopy(rowdict)
    rowdict_edited['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]
    
    # Clone data and mask certain columns to simulate lack of ultrasound
    no_ultrasound = copy.deepcopy(rowdict_edited)
    no_ultrasound['is_male'] = 'Unknown'
    no_ultrasound['plurality'] = 'Multiple(2+)' if rowdict['plurality'] > 1 else 'Single(1)'
    
    # Write out rows for each input row
    for result in [rowdict_edited, no_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        #key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        #yield str(f'{data},{key}')
        yield str(f'{data}')

Dataflow job will start with a selection BigQuery, converting it to CSV, and writing the output as CSV files.

In [28]:
def preprocess(in_test_mode=True):
    import shutil, os, subprocess
    
    HEADER = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use'
    job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './datasets_preprocessed_Dataflow_test_mode'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print(f'Launching Dataflow job {job_name} ... hang on')
        OUTPUT_DIR = f'gs://{BUCKET}/datasets_preprocessed_Dataflow/'
        try:
            subprocess.check_call(f'gsutil -m rm -r {OUTPUT_DIR}'.split())
        except:
            pass

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': PROJECT,
        'region': REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
    }
    
    opts = beam.pipeline.PipelineOptions(flags = [], **options)
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'
        
    p = beam.Pipeline(RUNNER, options = opts)
    
    query = query_2003_with_hash_vals
    if in_test_mode:
        query = query + ' LIMIT 100' 
    
    for step in ['train', 'eval', 'test']:
        if step == 'train':
            selquery = f'SELECT * FROM ({query}) WHERE MOD(hash_values, 100) < 80'
        elif step == 'eval':
            selquery = f'SELECT * FROM ({query}) WHERE MOD(hash_values, 100) >= 80 AND MOD(hash_values, 100) < 90'
        else:
            selquery = f'SELECT * FROM ({query}) WHERE MOD(hash_values, 100) > 90'
        (p 
         | f'{step}_read' >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
         | f'{step}_csv' >> beam.FlatMap(to_csv)
         | f'{step}_out' >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, f'{step}.csv'),header=HEADER))
        )
    job = p.run()
    
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

First, let's test the `preprocess` function locally to see if it works correctly

In [29]:
preprocess(in_test_mode = True)

Launching local job ... hang on




Done!


In [30]:
!head -5 datasets_preprocessed_Dataflow_test_mode/train*

weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use
6.8673994613,False,27,Single(1),35,Unknown,Unknown
6.8673994613,Unknown,27,Single(1),35,Unknown,Unknown
5.68572173698,False,27,Single(1),36,false,false
5.68572173698,Unknown,27,Single(1),36,false,false


Once everything has run correctly, let's execute the job in Cloud Dataflow. We can monitor the running job at the Dataflow section in the GCP web console.

In [31]:
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-babyweight-features-211023-045809 ... hang on




After ~13 minutes, the job has been done. At this point, we now have the training and evaluation datasets created at scale. The process is also fully automated. We can simply re-run the pipeline periodically to create a new training dataset on fresher data.

In [39]:
%%bash
gsutil ls gs://${BUCKET}/datasets_preprocessed_Dataflow/

gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00000-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00001-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00002-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00003-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00004-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/eval.csv-00005-of-00006
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/test.csv-00000-of-00003
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/test.csv-00001-of-00003
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/test.csv-00002-of-00003
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/train.csv-00000-of-00020
gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/train.csv-00001-of-00020
gs://predict-babyweight-101420

In [40]:
%%bash
gsutil cat gs://predict-babyweight-10142021/datasets_preprocessed_Dataflow/train.csv-00000-of-00020 |head -5

weight_pounds,is_male,mother_age,plurality,gestation_weeks,cigarette_use,alcohol_use
6.87621795178,False,18,Single(1),42,false,false
6.87621795178,Unknown,18,Single(1),42,false,false
6.81448851842,False,18,Single(1),42,Unknown,Unknown
6.81448851842,Unknown,18,Single(1),42,Unknown,Unknown


## Summary:

- Using Dataflow, we performed multiple preproseeing, modifying, and simulating data for the entire dataset and then produced CSV files for the training/evaluation datasets. 
- These files are storaged in the Cloud bucket and ready for the training a ML model at scale.

