# "Hallelujah Effect" Analysis

In [None]:
%bash
pip uninstall -y google-cloud-dataflow
pip install --upgrade --force tensorflow_transform==0.6.0 apache-beam[gcp]

<b>Restart the kernel</b> after you do a pip install (click on the <b>Reset</b> button in Datalab)

In [1]:
%bash
pip freeze | grep -e 'flow\|beam'

apache-airflow==1.9.0
apache-beam==2.4.0
tensorflow==1.8.0
tensorflow-transform==0.6.0


In [1]:
import tensorflow as tf
import tensorflow_transform as tft
import shutil
print(tf.__version__)

1.8.0


  from ._conv import register_converters as _register_converters


In [2]:
# Set bucket, project, and region
BUCKET = 'eim-muse'
PROJECT = 'eim-muse'
REGION = 'us-central1'

In [3]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [4]:
%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


## Retrieve and Subset Datasource

Get data from BigQuery but defer filtering, etc. to Beam. Data in BigQuery has been pre-processed with Dataprep.

In [5]:
import google.datalab.bigquery as bq
def create_query(phase, EVERY_N):
  """
  phase: 1=train 2=valid
  """
  base_query = """
SELECT *
FROM
  `eim-muse.hallelujah_effect.full_hallelujah_trials_cleaned`
  """

  if EVERY_N == None:
    if phase < 2:
      # Training
      query = "{0} WHERE MOD(FARM_FINGERPRINT(id), 10) < 7".format(base_query)
    else:
      # Validation
      query = "{0} WHERE MOD(FARM_FINGERPRINT(id), 10) >= 8".format(base_query)
  else:
      query = "{0} WHERE MOD(FARM_FINGERPRINT(id), {1}) = {2}".format(base_query, EVERY_N, phase)
    
  return query

query = create_query(2, None)

In [7]:
df_valid = bq.Query(query).execute().result().to_dataframe()
df_valid.head()
df_valid.describe()

Unnamed: 0,age,concentration,musical_expertise,artistic,fault,imagination,lazy,nervous,outgoing,reserved,...,music_pref_none,music_pref_hiphop,music_pref_dance,music_pref_world,music_pref_rock,music_pref_pop,music_pref_classical,music_pref_jazz,music_pref_folk,music_pref_traditional_irish
count,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0,...,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0,43.0
mean,23.465116,4.043465,2.473455,2.518737,3.05049,3.752754,3.602112,3.626683,3.172583,3.157408,...,0.023256,0.069767,0.209302,0.093023,0.534884,0.790698,0.255814,0.209302,0.046512,0.046512
std,11.033173,0.754667,1.088144,0.916852,0.953747,0.897524,1.034324,0.793569,1.085615,1.025196,...,0.152499,0.25777,0.411625,0.293903,0.504685,0.411625,0.441481,0.411625,0.213083,0.213083
min,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,16.0,3.991266,2.0,2.0,2.5,3.412281,3.659389,3.0,3.0,3.0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
50%,21.0,4.0,2.52988,2.353712,3.144737,3.824561,3.659389,3.596491,3.22807,3.117904,...,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0
75%,30.0,4.0,3.0,3.0,4.0,4.0,4.0,4.0,4.0,4.0,...,0.0,0.0,0.0,0.0,1.0,1.0,0.5,0.0,0.0,0.0
max,45.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


## Create ML dataset using tf.transform and Dataflow

Let's use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well.

In [8]:
%writefile requirements.txt
tensorflow-transform==0.6.0

Overwriting requirements.txt


In [32]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl

def is_valid(inputs):
  try:
    return True
  except:
    return False
  
def preprocess_tft(inputs):
      import datetime
      result = {}
      result['age'] = tft.scale_to_0_1(inputs['age'])      
      result['activity'] = tft.scale_to_0_1(inputs['activity'])
      result['hallelujah_reaction'] = tf.cast(inputs['hallelujah_reaction'], tf.int64)
#       result['concentration'] = tft.scale_to_0_1(inputs['concentration'])
      result['hearing_impairments'] = tf.cast(inputs['hearing_impairments'], tf.int64)
      result['nationality'] = tf.identity(inputs['nationality'])
      result['engagement'] = tft.scale_to_0_1(inputs['engagement'])
      result['familiarity'] = tft.scale_to_0_1(inputs['familiarity'])
      result['like_dislike'] = tft.scale_to_0_1(inputs['like_dislike'])
      result['positivity'] = tft.scale_to_0_1(inputs['positivity'])
      result['tension'] = tft.scale_to_0_1(inputs['tension'])
      result['sex'] = tf.identity(inputs['sex'])
      result['location'] = tf.identity(inputs['location'])
      result['language'] = tf.identity(inputs['language'])
      return result

def preprocess(in_test_mode, EVERY_N=None):
  import os
  import os.path
  import tempfile
  from apache_beam.io import tfrecordio
  from tensorflow_transform.coders import example_proto_coder
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.beam import tft_beam_io
  from tensorflow_transform.beam.tft_beam_io import transform_fn_io

  job_name = 'hallelujah-effect-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    
  if in_test_mode:
    import shutil
    print 'Launching local job ... hang on'
    OUTPUT_DIR = './preproc_tft'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    
  else:
    print 'Launching Dataflow job {} ... hang on'.format(job_name)
    OUTPUT_DIR = 'gs://{0}/analysis/hallelujah-effect/preproc_tft/'.format(BUCKET)
    import subprocess
    subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
  
  # Configure Beam pipeline options
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'max_num_workers': 24,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True,
    'requirements_file': 'requirements.txt'
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  # Setup metadata
  raw_data_schema = {
    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'nationality,sex,location,language'.split(',')
  }
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'age,activity,engagement,familiarity,like_dislike,positivity,tension'.split(',')
    })
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.bool, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'hallelujah_reaction,hearing_impairments'.split(',')
    })
  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
  
  # run Beam  
  with beam.Pipeline(RUNNER, options=opts) as p:
    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
      
      # Write the raw data metadata to disk
      # Without the overloaded operators: raw_data_metadata.apply(tft_beam_io.WriteMetadata(os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'), pipeline=p, label='WriteInputMetadata')
      _ = (raw_data_metadata
        | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
            os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
            pipeline=p))
           
      # Analyze and transform training data
      this_query = create_query(1, EVERY_N)
      
      print('Query:')
      print(this_query)
      
      def debug_print(p_collection_as_list):
        print(p_collection_as_list)
      
      # Read in training data from BigQuery table
      raw_data = (p
        # Get raw training data from BigQuery
        | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=this_query, use_standard_sql=True))
        # Use our is_valid function to only retain valid examples from training data
        | 'train_filter' >> beam.Filter(is_valid))

      # Package raw training data and its metadata into a 'dataset'
      raw_dataset = (raw_data, raw_data_metadata)
      
      # Using the preprocessing function `preprocess_tft`, preprocess the training data
      # and produce a transformed training dataset and a function to transform other data later
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
      
      # Break out the transformed training data and its metadata
      transformed_data, transformed_metadata = transformed_dataset
      
      # Write the transformed training data to files
      _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'train'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      
      # Read in test data from BigQuery table and filter as we did with training data
      raw_test_data = (p 
        | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(2, EVERY_N), use_standard_sql=True))
        | 'eval_filter' >> beam.Filter(is_valid))
      
      # Package test data and metadata into a dataset
      raw_test_dataset = (raw_test_data, raw_data_metadata)
      
      # Using the same transformation function that was calculated above, transform the test dataset
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      
      # Write the transformed test data to files
      transformed_test_data, _ = transformed_test_dataset
      _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'eval'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      
      # Write the transformation function to a file, as well
      _ = (transform_fn
           | 'WriteTransformFn' >>
           transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))

# Preprocess the training/test data
preprocess(in_test_mode=True, EVERY_N=None)

Launching local job ... hang on
Query:

SELECT *
FROM
  `eim-muse.hallelujah_effect.full_hallelujah_trials_cleaned`
   WHERE MOD(FARM_FINGERPRINT(id), 10) < 7
INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/d70a639cc65f426193b0ba3baf25fd02/saved_model.pb


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/d70a639cc65f426193b0ba3baf25fd02/saved_model.pb


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/66bfb6d19f5042a98f6d569f7864bff2/saved_model.pb


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/66bfb6d19f5042a98f6d569f7864bff2/saved_model.pb


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/9ce85aa98a61418bba52571aee90b7b1/saved_model.pb


INFO:tensorflow:SavedModel written to: ./preproc_tft/tmp/tftransform_tmp/9ce85aa98a61418bba52571aee90b7b1/saved_model.pb


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore
  chunks = self.iterencode(o, _one_shot=True)


In [None]:
%bash
# ls -l preproc_tft
# ls preproc_tft/metadata
gsutil ls -l gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/
gsutil ls gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata

<h2> Train off preprocessed data </h2>

In [None]:
MODEL_NAME = 'feature_engineering'
os.environ['MODEL_NAME'] = MODEL_NAME

In [None]:
%bash
rm -rf ${PWD}/models/${MODEL_NAME}
export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft
python -m trainer.task \
   --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
   --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*"  \
   --train_batch_size=128 \
   --output_dir=${PWD}/models/${MODEL_NAME} \
   --train_steps=50000 --eval_steps=1 --job-dir=/tmp \
   --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata
   --hidden_units="16 16 16"

In [None]:
from google.datalab.ml import TensorBoard
TensorBoard().start('gs://eim-muse/analysis/hallelujah-effect/models')

In [None]:
TensorBoard.stop(20767)

In [None]:
%bash
rm -rf ${PWD}/models/local-ml
gcloud ml-engine local train \
   --module-name=trainer.task \
   --package-path=${PWD}/taxifare_tft/trainer \
   --job-dir=${PWD}/models/local-ml \
   -- \
   --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
   --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*" \
   --train_steps=1000 \
   --train_batch_size=10 \
   --eval_steps=100 \
   --output_dir=${PWD}/models/local-ml \
   --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata/

# %%bash
# OUTDIR=gs://${BUCKET}/analysis/hallelujah-effect/models/hallelujah-effect_trained
# JOBNAME=hallelujah_effect$(date -u +%y%m%d_%H%M%S)
# echo $OUTDIR $REGION $JOBNAME
# gsutil -m rm -rf $OUTDIR
# gcloud ml-engine jobs submit training $JOBNAME \
#    --region=$REGION \
#    --package-path=${PWD}/taxifare/trainer \
#    --module-name=trainer.task \
#    --job-dir=$OUTDIR \
#    --scale-tier=STANDARD_1 \
#    --runtime-version=1.4 \
#    -- \
#    --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
#    --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*" \
#    --output_dir=$OUTDIR \
#    --train_steps=1000
#    --train_batch_size=10 \
#    --eval_steps=100
#    --config=hyperparam.yaml \

# --staging-bucket=gs://eim-muse-staging \

# export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft
# python -m trainer.task \
#    --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
#    --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*"  \
#    --train_batch_size=10 \
#    --output_dir="gs://${BUCKET}/analysis/hallelujah-effect/models/hallelujah-effect_trained" \
#    --train_steps=5000 --eval_steps=1 --job-dir=/tmp \
#    --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata

In [None]:
%bash
# rm -rf ${PWD}/models/local-ml
# gcloud ml-engine local train \
#    --module-name=trainer.task \
#    --package-path=${PWD}/taxifare_tft/trainer \
#    --job-dir=${PWD}/models/local-ml \
#    -- \
#    --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
#    --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*" \
#    --train_steps=1000 \
#    --train_batch_size=10 \
#    --eval_steps=100 \
#    --output_dir=${PWD}/models/local-ml \
#    --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata/

OUTDIR=gs://${BUCKET}/analysis/hallelujah-effect/models/hallelujah-effect_trained
JOBNAME=hallelujah_effect$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --package-path=${PWD}/taxifare_tft/trainer \
   --module-name=trainer.task \
   --job-dir=$OUTDIR \
   --scale-tier=STANDARD_1 \
   --runtime-version=1.4 \
   -- \
   --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
   --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*" \
   --output_dir=$OUTDIR \
   --train_steps=50000 \
   --train_batch_size=64 \
   --eval_steps=1 \
   --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata/

#    --config=hyperparam.yaml
# --staging-bucket=gs://eim-muse-staging \

# export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft
# python -m trainer.task \
#    --train_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/train*" \
#    --eval_data_paths="gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/eval*"  \
#    --train_batch_size=10 \
#    --output_dir="gs://${BUCKET}/analysis/hallelujah-effect/models/hallelujah-effect_trained" \
#    --train_steps=5000 --eval_steps=1 --job-dir=/tmp \
#    --metadata_path=gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/metadata

In [None]:
TensorBoard().start('./models/local-ml')

In [None]:
%bash
gsutil ls gs://eim-muse/analysis/hallelujah-effect/preproc_tft/metadata

In [None]:
%bash
gsutil ls -l gs://${BUCKET}/analysis/hallelujah-effect/preproc_tft/

In [None]:
%writefile /tmp/test.json
{"age":"29.0","activity":3.0}

In [None]:
%bash
model_dir=$(ls $PWD/hallelujah-effect_trained/export/exporter/)
gcloud ml-engine local predict \
    --model-dir=./hallelujah-effect_trained/export/exporter/${model_dir} \
    --json-instances=/tmp/test.json

# To Do

- LASSO to identify important features
- Hyperparameter search
- More plots and statistics from the dataset with which I'm working here
- Bring in rows with missing values
- Feature engineering (physiological signals, MIR, feature crosses, variable-width binning)
- Include signals with good quality only in reaction range
- Customize estimator to add additional metrics

In [None]:
%bash
cat ${PWD}/taxifare_tft/trainer/setup.py