In [None]:
import time
from datetime import datetime

import pandas as pd
from google.cloud import bigquery
from googleapiclient import discovery
from googleapiclient.errors import HttpError

## Step 1: Create the training dataset in BigQuery

In [None]:
# Run configuration -- fill these in before running the notebook.
project_id = ''      # GCP project ID; also used to build the AI Platform `projects/<id>` name
dataset_id = ''      # BigQuery dataset that receives the sampled training table
model_name = ''      # AI Platform model to create and deploy to
model_version = ''   # version name; also names the job_dir under models/
bucket_name = ''     # GCS bucket the training CSV is exported to

# Timestamped table name so every run writes a fresh table instead of overwriting one.
table_name = 'train_' + datetime.utcnow().strftime('%Y%m%d%H%M%S')

In [None]:
# Training-set query for the Chicago taxi trips public table.
# The CTE derives hour/weekday/week/yearday/month features, an approximate
# speed in km/h (miles * 1.60934 over hours; the +.01 s guards against
# division by zero) and the straight-line pickup->dropoff distance.
# The outer SELECT keeps only trips with a plausible 5-90 km/h speed.
# trip_seconds is selected first; the validation cell later drops column 0
# before predicting, so it is presumably the label column -- TODO confirm.
#
# Sampling hashes unique_key (rather than using RAND()) so re-running the
# notebook selects the same rows. 112860054 is presumably the table's row
# count when this was written (TODO confirm); the sampled fraction is
# ~3000000 / 112860054, i.e. roughly 3M rows.
query = """
    WITH dataset AS( SELECT 

          EXTRACT(HOUR FROM  trip_start_timestamp) trip_start_hour
        , EXTRACT(DAYOFWEEK FROM  trip_start_timestamp) trip_start_weekday
        , EXTRACT(WEEK FROM  trip_start_timestamp) trip_start_week
        , EXTRACT(DAYOFYEAR FROM  trip_start_timestamp) trip_start_yearday
        , EXTRACT(MONTH FROM  trip_start_timestamp) trip_start_month
        , (trip_miles * 1.60934 ) / ((trip_seconds + .01) / (60 * 60)) trip_speed_kmph
        , trip_miles
        , pickup_latitude
        , pickup_longitude
        , dropoff_latitude
        , dropoff_longitude
        , pickup_community_area
        , dropoff_community_area
        , ST_DISTANCE(
          (ST_GEOGPOINT(pickup_longitude,pickup_latitude)),
          (ST_GEOGPOINT(dropoff_longitude,dropoff_latitude))) air_distance
        , CAST (trip_seconds AS FLOAT64) trip_seconds
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
        WHERE ABS(MOD(FARM_FINGERPRINT(unique_key), 112860054)) < 3000000 --deterministic sample of ~3M records
                AND  trip_start_timestamp < '2016-01-01'
                AND pickup_location IS NOT NULL
                AND dropoff_location IS NOT NULL)
    SELECT 
         trip_seconds
        , air_distance
        , pickup_latitude
        , pickup_longitude
        , dropoff_latitude
        , dropoff_longitude
        , pickup_community_area
        , dropoff_community_area
        , trip_start_hour
        , trip_start_weekday
        , trip_start_week
        , trip_start_yearday
        , trip_start_month
    FROM dataset
    WHERE trip_speed_kmph BETWEEN 5 AND 90
"""

In [None]:
# BigQuery client. `project_id or None` lets the library infer the project
# from the environment when project_id is left blank in the config cell.
client = bigquery.Client(project=project_id or None)

# Write the sampled query results into the timestamped training table.
# DatasetReference replaces the deprecated Client.dataset() helper.
table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_name)
job_config = bigquery.QueryJobConfig()
job_config.destination = table_ref

# Start the query, passing in the extra configuration.
query_job = client.query(
    query,
    location='US',
    job_config=job_config)

query_job.result()  # block until the destination table is written
print('Query results loaded to table {}'.format(table_ref.path))

## Export data to GCS 

In [None]:
# Export the training table to GCS as a single CSV *without* a header row.
destination_uri = f"gs://{bucket_name}/data/{table_name}.csv"
job_config = bigquery.ExtractJobConfig(print_header=False)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location="US",
    job_config=job_config,
)
extract_job.result()  # wait for the export job to finish

print(f"Exported {project_id}:{dataset_id}.{table_name} to {destination_uri}")

## Submit the training job to AI Platform

In [None]:
# Training input is the CSV exported above; job artifacts go under models/<version>.
# Both paths use the configured bucket so training reads what the export cell wrote.
data_dir = "gs://{}/data/{}.csv".format(bucket_name, table_name)
job_dir = "gs://{}/models/{}".format(bucket_name, model_version)

In [None]:
# AI Platform `trainingInput` for the built-in wide & deep learner (GPU image),
# run as a hyperparameter-tuning job: up to 5 trials, 2 at a time, minimizing
# the "loss" metric. Every DOUBLE/INTEGER parameter spec carries both
# minValue and maxValue, which the API requires for numeric parameters.
training_inputs = {
        "scaleTier": "CUSTOM",
        "masterType": "standard_gpu",
        "args": [
            "--preprocess",
            "--training_data_path={}".format(data_dir),
            "--validation_split=0.2",
            "--model_type=regression",
            "--hidden_units=120,60,60",
            "--batch_size=128",
            # NOTE(review): same value as --batch_size; confirm the intended eval interval (seconds).
            "--eval_frequency_secs=128",
            "--optimizer_type=ftrl",
            "--use_wide",
            "--embed_categories",
            "--dnn_learning_rate=0.001",
            "--dnn_optimizer_type=ftrl"
        ],
        "hyperparameters": {
            "goal": "MINIMIZE",
            "params": [
                {
                    "parameterName": "max_steps",
                    "minValue": 100,
                    "maxValue": 60000,
                    "type": "INTEGER",
                    "scaleType": "UNIT_LINEAR_SCALE"
                },
                {
                    "parameterName": "learning_rate",
                    "minValue": 0.0001,
                    "maxValue": 0.5,
                    "type": "DOUBLE",
                    "scaleType": "UNIT_LINEAR_SCALE"
                },
                {
                    "parameterName": "l1_regularization_strength",
                    "minValue": 0.0,
                    "maxValue": 1,
                    "type": "DOUBLE",
                    "scaleType": "UNIT_LINEAR_SCALE"
                },
                {
                    "parameterName": "l2_regularization_strength",
                    "minValue": 0.0,
                    "maxValue": 1,
                    "type": "DOUBLE",
                    "scaleType": "UNIT_LINEAR_SCALE"
                },
                {
                    "parameterName": "l2_shrinkage_regularization_strength",
                    "minValue": 0.0,
                    "maxValue": 1,
                    "type": "DOUBLE",
                    "scaleType": "UNIT_LINEAR_SCALE"
                }
            ],
            "maxTrials": 5,
            "maxParallelTrials": 2,
            "hyperparameterMetricTag": "loss",
            "enableTrialEarlyStopping": True
        },
        "region": "us-central1",
        "jobDir": job_dir,
        "masterConfig": {
            "imageUri": "gcr.io/cloud-ml-algos/wide_deep_learner_gpu:latest"
        }
    }

In [None]:
# Unique job ID (UTC timestamp suffix), the parent project resource, and the
# request body pairing the ID with the training configuration.
job_name = 'chicago_travel_time_training_' + datetime.utcnow().strftime('%Y%m%d%H%M%S')
project_name = f'projects/{project_id}'
job_spec = dict(jobId=job_name, trainingInput=training_inputs)

In [None]:
# Submit the training job through the AI Platform `ml` v1 REST API.
cloudml = discovery.build('ml', 'v1')
response = (
    cloudml.projects()
    .jobs()
    .create(parent=project_name, body=job_spec)
    .execute()
)

## Create model and deploy version

In [None]:
# Poll the training job until it reaches a terminal state.
# A freshly submitted job is typically QUEUED/PREPARING before RUNNING, so we
# wait for a terminal state rather than "while RUNNING", and only sleep when
# the job is still in progress.
POLL_INTERVAL_SECS = 5 * 60
TERMINAL_JOB_STATES = {'SUCCEEDED', 'FAILED', 'CANCELLED'}
job_resource_name = '{}/jobs/{}'.format(project_name, job_name)

while True:
    job_results = cloudml.projects().jobs().get(name=job_resource_name).execute()
    job_state = job_results.get('state')
    # trainingOutput may be absent until the job has started producing trials.
    completed_trials = job_results.get('trainingOutput', {}).get('completedTrialCount', 0)
    print(str(datetime.utcnow()),
          ': state {}, completed {} training trials'.format(job_state, completed_trials))
    if job_state in TERMINAL_JOB_STATES:
        break
    print('  Waiting for 5 minutes')
    time.sleep(POLL_INTERVAL_SECS)

# Stop here instead of failing later with a confusing KeyError on missing outputs.
if job_state != 'SUCCEEDED':
    raise RuntimeError('Training job {} ended in state {}: {}'.format(
        job_name, job_state, job_results.get('errorMessage', '')))

### Create the model 

In [None]:
# Handles to the AI Platform projects/models resources, plus the minimal
# request body (just the name) for creating the model.
operations = cloudml.projects()
models = operations.models()
create_spec = dict(name=model_name)

In [None]:
# Create the model container. An HTTP 409 means the model already exists
# (e.g. on a re-run); reuse it instead of aborting. Any other error propagates.
try:
    models.create(body=create_spec,
        parent=project_name).execute()
except HttpError as err:
    if err.resp.status != 409:
        raise
    print('Model {} already exists; reusing it'.format(model_name))

### Deploy the version 

In [None]:
# Choose the model to deploy. For a hyperparameter-tuning job, pick the best
# trial by its final objective value according to the job's goal, rather than
# trusting the order of `trials`. Otherwise, fall back to the job-level
# builtInAlgorithmOutput.
training_outputs = job_results['trainingOutput']
tuning_goal = training_inputs['hyperparameters']['goal']
scored_trials = [
    trial for trial in training_outputs.get('trials', [])
    if 'finalMetric' in trial and 'builtInAlgorithmOutput' in trial
]
if scored_trials:
    choose = min if tuning_goal == 'MINIMIZE' else max
    best_trial = choose(
        scored_trials,
        key=lambda trial: float(trial['finalMetric']['objectiveValue']))
    algorithm_output = best_trial['builtInAlgorithmOutput']
else:
    algorithm_output = training_outputs['builtInAlgorithmOutput']

# Runtime, framework, python version and model path all come from the same
# algorithm output, so they describe the same trained model.
version_spec = {
  "name": model_version,
  "isDefault": False,
  "runtimeVersion": algorithm_output['runtimeVersion'],
  "deploymentUri": algorithm_output['modelPath'],
  "framework": algorithm_output['framework'],
  "pythonVersion": algorithm_output['pythonVersion'],
  "autoScaling": {
    'minNodes': 0
  }
}

In [None]:
# Handle to the versions collection of AI Platform models.
versions = cloudml.projects().models().versions()

In [None]:
# Create the version. The API returns a long-running Operation; wait until it
# reports done so the prediction cells below don't run against a version that
# is still deploying.
create_version_op = versions.create(
    body=version_spec,
    parent='{}/models/{}'.format(project_name, model_name)).execute()

while not create_version_op.get('done', False):
    print(str(datetime.utcnow()), ': Waiting for version {} to deploy'.format(model_version))
    time.sleep(30)
    create_version_op = cloudml.projects().operations().get(
        name=create_version_op['name']).execute()

if 'error' in create_version_op:
    raise RuntimeError(create_version_op['error'])

## Validate the deployed version with online predictions

In [None]:
# Validation split from this run's job directory. The path previously held an
# unformatted '{BUCKET}' placeholder and a hard-coded 'v3' version.
# NOTE(review): assumes --preprocess writes to <job_dir>/processed_data/ (the
# layout the hard-coded path implied). Reading gs:// URLs with pandas needs gcsfs.
df_val = pd.read_csv('{}/processed_data/validation.csv'.format(job_dir))

In [None]:
# Up to 10 prediction instances, each a comma-separated string of feature
# values. Column 0 is dropped; it is presumably the label (trip_seconds is
# selected first in the training query) -- TODO confirm.
instances = [", ".join(x) for x in df_val.iloc[:10, 1:].astype(str).values.tolist()]
service = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}'.format(project_id, model_name)

# Target a specific version only when one is configured. The config default
# is '' (not None), which would otherwise yield a malformed ".../versions/" name.
if model_version:
    name += '/versions/{}'.format(model_version)

In [None]:
# Send the sample instances for online prediction. Raise if the service
# reports an error; otherwise display the predictions.
response = (
    service.projects()
    .predict(name=name, body={'instances': instances})
    .execute()
)

if 'error' in response:
    raise RuntimeError(response['error'])

response['predictions']