# Recommendations on GCP with TensorFlow and WALS with Cloud   Composer 

In [None]:
%%bash
pip install sh --upgrade pip # needed to execute shell scripts later

In [None]:
import os
PROJECT = 'PROJECT' # REPLACE WITH YOUR PROJECT ID
REGION = 'us-central1' 

# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = 'recserve_' + PROJECT
os.environ['REGION'] = REGION

In [None]:
# setting the env variable on GCP
%%bash

gcloud config set project $PROJECT
gcloud config set compute/region $REGION

In [None]:

%%bash

# create GCS bucket with recserve_PROJECT_NAME if not exists
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)
if [ -n "$exists" ]; then
   echo "Not creating recserve_bucket since it already exists."
else
   echo "Creating recserve_bucket"
   gsutil mb -l ${REGION} gs://${BUCKET}
fi

In [None]:
# run this code only when we have not created the ML Engine
# for creating the ML engine
%%bash
run app engine creation commands
gcloud app create --region ${REGION} # see: https://cloud.google.com/compute/docs/regions-zones/
gcloud app update --no-split-health-checks

In [None]:
# Copy sample data files into our bucket
%%bash

gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/ga_sessions_sample.json.gz gs://${BUCKET}/data/ga_sessions_sample.json.gz
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv data/recommendation_events.csv
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv gs://${BUCKET}/data/recommendation_events.csv


 Create empty BigQuery dataset and load sample JSON data

In [None]:
%%bash

# create BigQuery dataset if it doesn't already exist
exists=$(bq ls -d | grep -w GA360_test)
if [ -n "$exists" ]; then
   echo "Not creating GA360_test since it already exists."
else
   echo "Creating GA360_test dataset."
   bq --project_id=${PROJECT} mk GA360_test 
fi

# create the schema and load our sample Google Analytics session data
bq load --source_format=NEWLINE_DELIMITED_JSON \
 GA360_test.ga_sessions_sample \
 gs://${BUCKET}/data/ga_sessions_sample.json.gz \
 data/ga_sessions_sample_schema.json # can't load schema files from GCS
 
# this can take upto 10 minutes... 

### Install WALS model training package and model data
1. Create a distributable package. Copy the package up to the code folder in the bucket you created previously

In [None]:

%%bash

cd wals_ml_engine

echo "creating distributable package"
python setup.py sdist

echo "copying ML package to bucket"
gsutil cp dist/wals_ml_engine-0.1.tar.gz gs://${BUCKET}/code/

2. Run the WALS model on the sample data set:

In [None]:

%%bash

# view the ML train local script before running
cat wals_ml_engine/mltrain.sh

In [None]:
%%bash

cd WALS_ML_Engine

# train locally with unoptimized hyperparams
./mltrain.sh local ../data/recommendation_events.csv --data-type web_views --use-optimized

In [None]:
# View the locally trained model directory
ls wals_ml_engine/jobs

3. Copy the model files from this directory to the model folder in the project bucket:

In [None]:
%%bash
export JOB_MODEL=$(find wals_ml_engine/jobs -name "model" | tail -1)
gsutil cp ${JOB_MODEL}/* gs://${BUCKET}/model/
  
echo "Recommendation model file numpy arrays in bucket:"  
gsutil ls gs://${BUCKET}/model/

### Install the recserve endpoint


1. Prepare the deploy template for the Cloud Endpoint API:

In [None]:
%%bash
cd scripts
cat prepare_deploy_api.sh

In [None]:
%%bash
printf "\nCopy and run the deploy script generated below:\n"
cd scripts
./prepare_deploy_api.sh   


2. Run the endpoints deploy command output above:

In [None]:

%%bash
gcloud endpoints services deploy [REPLACE_WITH_TEMP_FILE_NAME.yaml]

In [None]:

%%bash
# view the app deployment script
cat scripts/prepare_deploy_app.sh

In [None]:

%%bash
# prepare to deploy 
cd scripts

./prepare_deploy_app.sh


4. Run the command above:

In [None]:
%%bash
gcloud -q app deploy app/app_template.yaml_deploy.yaml

### Query the API for Article Recommendations

In [None]:

%%bash
cd scripts
./query_api.sh          # Query the API.
#./generate_traffic.sh   # Send traffic to the API.

In [None]:

AIRFLOW_BUCKET = 'us-central1-composer-21587538-bucket' # REPLACE WITH AIRFLOW BUCKET NAME
os.environ['AIRFLOW_BUCKET'] = AIRFLOW_BUCKET

In [None]:
%%writefile airflow/dags/training.py

import airflow
from airflow import DAG

# Reference for all available airflow operators: 
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.hooks.base_hook import BaseHook
# from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
# above mlengine_operator currently doesnt support custom MasterType so we import our own plugins:

# custom plugins
from airflow.operators.app_engine_admin_plugin import AppEngineVersionOperator
from airflow.operators.ml_engine_plugin import MLEngineTrainingOperator


import datetime

def _get_project_id():
  """Get project ID from default GCP connection."""

  extras = BaseHook.get_connection('google_cloud_default').extra_dejson
  key = 'extra__google_cloud_platform__project'
  if key in extras:
    project_id = extras[key]
  else:
    raise ('Must configure project_id in google_cloud_default '
           'connection from Airflow Console')
  return project_id

PROJECT_ID = _get_project_id()


DATASET = 'GA360_test'
TABLE_NAME = 'ga_sessions_sample'
ARTICLE_CUSTOM_DIMENSION = '10'

BUCKET = 'gs://recserve_' + PROJECT_ID
REGION = 'us-east1'

PACKAGE_URI = BUCKET + '/code/wals_ml_engine-0.1.tar.gz'
JOB_DIR = BUCKET + '/jobs'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=5)
}



schedule_interval = '00 21 * * *'

dag = DAG('recommendations_training_v1', 
          default_args=default_args,
          schedule_interval=schedule_interval)

dag.doc_md = __doc__


bql='''
#legacySql
SELECT
 fullVisitorId as clientId,
 ArticleID as contentId,
 (nextTime - hits.time) as timeOnPage,
FROM(
  SELECT
    fullVisitorId,
    hits.time,
    MAX(IF(hits.customDimensions.index={0},
           hits.customDimensions.value,NULL)) WITHIN hits AS ArticleID,
    LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId, visitNumber
                             ORDER BY hits.time ASC) as nextTime
  FROM [{1}.{2}.{3}]
  WHERE hits.type = "PAGE"
) HAVING timeOnPage is not null and contentId is not null;
'''

bql = bql.format(ARTICLE_CUSTOM_DIMENSION, PROJECT_ID, DATASET, TABLE_NAME)

t1 = BigQueryOperator(
    task_id='bq_rec_training_data',
    bql=bql,
    destination_dataset_table='%s.recommendation_events' % DATASET,
    write_disposition='WRITE_TRUNCATE', # specify to truncate on writes
    dag=dag)


training_file = BUCKET + '/data/recommendation_events.csv'
t2 = BigQueryToCloudStorageOperator(
    task_id='bq_export_op',
    source_project_dataset_table='%s.recommendation_events' % DATASET,
    destination_cloud_storage_uris=[training_file],
    export_format='CSV',
    dag=dag
)



job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
output_dir = BUCKET
training_args = ['--job-dir', job_dir,
                 '--train-files', training_file,
                 '--output-dir', output_dir,
                 '--data-type', 'web_views',
                 '--use-optimized']


t3 = MLEngineTrainingOperator(
    task_id='ml_engine_training_op',
    project_id=PROJECT_ID,
    job_id=job_id,
    package_uris=[PACKAGE_URI],
    training_python_module='trainer.task',
    training_args=training_args,
    region=REGION,
    scale_tier='CUSTOM',
    master_type='complex_model_m_gpu',
    dag=dag
)

# App Engine deploy new version

t4 = AppEngineVersionOperator(
    task_id='app_engine_deploy_version',
    project_id=PROJECT_ID,
    service_id='default',
    region=REGION,
    service_spec=None,
    dag=dag
)

t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)

In [None]:
%%bash
gsutil cp airflow/dags/training.py gs://${AIRFLOW_BUCKET}/dags # overwrite if it exists
gsutil cp -r airflow/plugins gs://${AIRFLOW_BUCKET} # copy custom plugins

In [1]:
# now we can see in the Airflow dag and ensure our DAG runs successfully