[AIRFLOW-1272] Google Cloud ML Batch Prediction Operator#2390
[AIRFLOW-1272] Google Cloud ML Batch Prediction Operator#2390jiwang576 wants to merge 13 commits intoapache:masterfrom
Conversation
There should be an Operator for the Google Cloud ML Batch Prediction Job, which supports executing Tensorflow neural network prediction as a service. Documentation is here: https://cloud.google.com/ml-engine/docs/how-tos/batch-predict.
|
@jiwang576, thanks for your PR! By analyzing the history of the files in this pull request, we identified @artwr, @jlowin and @criccomini to be potential reviewers. |
Codecov Report
@@ Coverage Diff @@
## master #2390 +/- ##
=======================================
Coverage 69.27% 69.27%
=======================================
Files 146 146
Lines 11224 11224
=======================================
Hits 7775 7775
Misses 3449 3449Continue to review full report at Codecov.
|
| """CloudML job operations helper class.""" | ||
|
|
||
| def __init__(self, cloudml, project_name, job_id, job_spec=None): | ||
| assert project_name is not None and project_name is not '' |
There was a problem hiding this comment.
Should compared against '' with != since logically we are checking equality, not identity. I'm not even sure that '' is guaranteed to be interned.
There was a problem hiding this comment.
Thank you! The fix was committed.
Changed precondition assertions inside the gcp_cloudml_hook.py.
|
Please fix merge conflicts |
|
Please fix all comments that are:
to be |
There should be an Operator for the Google Cloud ML Batch Prediction Job, which supports executing Tensorflow neural network prediction as a service. Documentation is here: https://cloud.google.com/ml-engine/docs/how-tos/batch-predict.
airflow/utils/db.py
Outdated
| merge_conn( | ||
| models.Connection( | ||
| conn_id='bigquery_default', conn_type='bigquery')) | ||
| merge_conn( |
There was a problem hiding this comment.
I think this was already added as part of:
There was a problem hiding this comment.
Yes these are duplicate code when PR2379 was not merged into Airflow. I will get rid of those.
Thank you!
There was a problem hiding this comment.
Still needs to get cleaned up. Same declaration is here:
| conn_id='presto_default', conn_type='presto', | ||
| host='localhost', | ||
| schema='hive', port=3400)) | ||
| merge_conn( |
There was a problem hiding this comment.
Why two google_cloud_default entries?
|
|
||
| def create_job(self, project_name, job): | ||
| """ | ||
| Creates a CloudML Job, and returns the Job object, which can be waited |
There was a problem hiding this comment.
"which can be waited upon"
Based on the return line below cloudml_job.wait_for_done(10), it looks to me like the waiting is done before create_job returns.
There was a problem hiding this comment.
Thanks for the catch! We wrote a different docstring for that. Hopefully it reveals clearly what it does.
criccomini
left a comment
There was a problem hiding this comment.
This rebase looks kind of funky. It's showing a bunch of #2379. Is everything as intended?
| """ | ||
|
|
||
| template_fields = [ | ||
| '_model', |
There was a problem hiding this comment.
Probably want project_name templated, too
There was a problem hiding this comment.
Thanks Chris! the project_name is not intended for templating on purpose.
To answer the question of the funky rebase, I changed some of the code related to PR2379 for styles only. The main code changes are on the CloudMLBatchOperator (PR2390).
There was a problem hiding this comment.
Got it, thanks for clarification.
criccomini
left a comment
There was a problem hiding this comment.
Overall, pretty good shape. Just a few nits/questions.
| job_id) | ||
| except errors.HttpError as e: | ||
| if e.resp.status == 404: | ||
| logging.error( |
There was a problem hiding this comment.
Is this really an error? Seems like info is more appropriate
There was a problem hiding this comment.
Done. Thanks for the suggestion!
| logging.error( | ||
| 'Job with job_id {} does not exist. Will create it.' | ||
| .format(job_id)) | ||
| finished_prediction_job = hook.create_job( |
There was a problem hiding this comment.
Should wait_for_job_done be called here?
There was a problem hiding this comment.
Sorry for the confusing interface. It's not needed as create_job is calling wait_for_job_done under the cover.
To make it clearer we refactored the hook interface and how the operator uses it. Now only create_job (which creates and wait until a job is finished) is "exposed" and used by the operator.
airflow/utils/db.py
Outdated
| merge_conn( | ||
| models.Connection( | ||
| conn_id='bigquery_default', conn_type='bigquery')) | ||
| merge_conn( |
There was a problem hiding this comment.
Still needs to get cleaned up. Same declaration is here:
This commit simplified the CloudMLHook class regarding job requests. Public interface of the hook now only exposes the create_job API, which contains the logic of creating the job, test for validation, and polling for the job until it's finished.
|
cc @N3da I'm going to give you a day to look at this, since it's touching some of your stuff. I'll merge tomorrow if I don't hear from you. |
|
@criccomini Thanks for the heads up. Ship it! :) |
|
@jiwang576 OK to merge this? Seems like you're still committing |
|
@criccomini Hi Chris, it's good to go. @N3da and I talked offline and I made some minor changes. Thanks for asking! |
Dear Airflow maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Here are some details about my PR, including screenshots of any UI changes:
Added CloudMLBatchPredictionOperator, which is a wrapper around the Google Cloud ML Batch Prediction API.
Augmented the CloudMLHook in PR#2379, which handles submitting job request to Google Cloud ML Engine.
Tests
My PR adds the following unit tests:
tests.contrib.operators.test_cloudml_operator:CloudMLBatchPredictionOperatorTest
tests.contrib.hooks.test_gcp_cloudml_hook:TestCloudMLHook, which builds upon the same test class in PR#2379
Commits