Skip to content

[AIRFLOW-922] Update PrestoHook to enable synchronous execution#2206

Closed
patrickmckenna wants to merge 8 commits intoapache:masterfrom
patrickmckenna:update-presto-hook
Closed

[AIRFLOW-922] Update PrestoHook to enable synchronous execution#2206
patrickmckenna wants to merge 8 commits intoapache:masterfrom
patrickmckenna:update-presto-hook

Conversation

@patrickmckenna
Copy link
Contributor

JIRA

https://issues.apache.org/jira/browse/AIRFLOW-922

Description

This updates PrestoHook so that it can block until a statement finishes executing. Currently, PrestoHook.run returns as soon as it sends a statement, so Operators that use it won't run synchronously.

There are other, minor changes too that seemed worth making (hence the separate commits), though they're unrelated to the primary goal of this PR. Of course if you'd rather jettison those, or put them in a separate PR, that's fine by me.

Tests

I added some under tests/contrib/hooks, though I wasn't sure if that was the proper location. (PrestoHook is built in, not user-contributed, but AFAICT there are no existing tests for it.)

Commits

I haven't squashed commits yet, because I wanted to keep the history easy to see until the code is in a satisfactory state. Once it is, I'm happy to rewrite the commit history (unless you plan to just squash and merge to take care of that?).

@mention-bot
Copy link

@patrickmckenna, thanks for your PR! By analyzing the history of the files in this pull request, we identified @mistercrunch, @artwr and @smarden1 to be potential reviewers.

@patrickmckenna
Copy link
Contributor Author

Sorry for letting this PR languish with broken tests! I had assumed using pytest was acceptable, since it was employed in an existing test. However, it looks like that test has been modified to no longer use pytest. I'll make similar updates to this PR.

The previous version of this hook included a fair amount of error message reformatting. But there was no logic to actually recover from the errors; they were just re-raised with the original error hidden. I thought it might be better to remove that reformatting (in ead2f33). However, I see that 6dd4b3b added more error message reformatting (though the new method doesn't appear to be called anywhere).

@artwr @mistercrunch Would you prefer then I that I revert ead2f33b9c6051e65c147f8d34c7ec92c11d4544 and incorporate 6dd4b3b?

@patrickmckenna
Copy link
Contributor Author

The tests are passing.

@SamWildmo
Copy link

@patrickmckenna are you still working on this PR?

@Rotemlofer
Copy link

As presto user I would love to see this one it. I have several use cases for this.
I wonder why this didn't get any comments/code review?

@patrickmckenna
Copy link
Contributor Author

I think giving PrestoHook.run the ability to execute synchronously (and possibly even doing so by default) is still desired, missing functionality. If that isn't the case, please LMK and I'll close this PR.

For now, I've updated it to incorporate (most of) the latest commits on apache:master, and removed some of the earlier, stylistic changes that weren't strictly related to AIRFLOW-922.

I'm a bit confused by the partial test failures, which occur only on some Python 2 builds. Anyone else have insights here? (The test suite does seem a bit flaky—e.g. this is recent successful build of apache/master, but a different build of the same commit partially failed—but I'm unsure if that's the root cause.)

/cc @SamWildmo @Rotemlofer (as interested users)
/cc @Fokko @kaxil @feng-tao (as maintainers who've touched presto_hook the most recently)

return cursor.poll() is None
except Exception as ex:
msg = "Couldn't determine statement execution status: ".format(ex)
self.log.error(msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should pass unformatted text to logger.

Suggested change
self.log.error(msg)
self.log.error("Couldn't determine statement execution status: %s", ex)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mik-laj ah, is that the agreed upon style preference? Happy to change it, just wasn't aware (didn't see anything in the docs or linting tests enforcing that, but may very well have missed it 😄).

Copy link
Member

@mik-laj mik-laj Feb 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the style of the code, but the principle of using loggers. If you format before pass to the logger, you create an unnecessary object that the logger can ignore when the level is too low. One of the ways to increase application performance is to reduce the number of logs collected.

In special cases, the logger does not format the text. It can save the message separately and separate data to make analysis easier. If you format data before pass it to the logger, this functionality disappears.

These notes applies to any programming language.

@Fokko
Copy link
Contributor

Fokko commented Jan 29, 2019

It looks like Python2 isn't passing on the tests:


======================================================================
66) FAIL: test_run_continues_polling_if_execution_status_unknown (tests.hooks.test_presto_hook.TestPrestoHook)
----------------------------------------------------------------------
   Traceback (most recent call last):
    .tox/py27-backend_sqlite-env_docker/lib/python2.7/site-packages/mock/mock.py line 1305 in patched
      return func(*args, **keywargs)
    tests/hooks/test_presto_hook.py line 87 in test_run_continues_polling_if_execution_status_unknown
      mock_sleep.assert_called_once_with(POLL_INTERVAL)
    .tox/py27-backend_sqlite-env_docker/lib/python2.7/site-packages/mock/mock.py line 947 in assert_called_once_with
      raise AssertionError(msg)
   AssertionError: Expected 'sleep' to be called once. Called 0 times.
   -------------------- >> begin captured stdout << ---------------------
   [2019-01-24 21:08:46,786] {base_hook.py:83} INFO - Using connection to: id: presto_default. Host: localhost, Port: 3400, Schema: hive, Login: None, Password: None, extra: {}
   
   --------------------- >> end captured stdout << ----------------------
   -------------------- >> begin captured logging << --------------------
   airflow.utils.log.logging_mixin.LoggingMixin: INFO: Using connection to: id: presto_default. Host: localhost, Port: 3400, Schema: hive, Login: None, Password: None, extra: {}
   --------------------- >> end captured logging << ---------------------
======================================================================
67) FAIL: test_run_optionally_blocks_while_statement_executes (tests.hooks.test_presto_hook.TestPrestoHook)
----------------------------------------------------------------------
   Traceback (most recent call last):
    .tox/py27-backend_sqlite-env_docker/lib/python2.7/site-packages/mock/mock.py line 1305 in patched
      return func(*args, **keywargs)
    tests/hooks/test_presto_hook.py line 74 in test_run_optionally_blocks_while_statement_executes
      mock_sleep.assert_called_once_with(POLL_INTERVAL)
    .tox/py27-backend_sqlite-env_docker/lib/python2.7/site-packages/mock/mock.py line 947 in assert_called_once_with
      raise AssertionError(msg)
   AssertionError: Expected 'sleep' to be called once. Called 0 times.

@patrickmckenna
Copy link
Contributor Author

Apologies for my slow reply! All of the currently failing builds show the same 2 errors, which appear unrelated to this PR:

failing tests
======================================================================
84) FAIL: test_scheduler_sla_miss_callback_exception (tests.test_jobs.SchedulerJobTest)
----------------------------------------------------------------------
   Traceback (most recent call last):
    tests/test_jobs.py line 3025 in test_scheduler_sla_miss_callback_exception
      sla_callback.assert_called()
    .tox/py35-backend_postgres-env_docker/lib/python3.5/site-packages/mock/mock.py line 906 in assert_called
      raise AssertionError(msg)
   AssertionError: Expected 'None' to have been called.
   -------------------- >> begin captured stdout << ---------------------
   [2019-02-01 00:36:45,730] {test_task_view_type_check.py:52} INFO - class_instance type: <class 'unusual_prefix_61c0ab525b75d060ea41c0aa11a98c88efc72c26_test_task_view_type_check.CallableClass'>
   
   --------------------- >> end captured stdout << ----------------------
   -------------------- >> begin captured logging << --------------------
   root: INFO: class_instance type: <class 'unusual_prefix_61c0ab525b75d060ea41c0aa11a98c88efc72c26_test_task_view_type_check.CallableClass'>
   --------------------- >> end captured logging << ---------------------
======================================================================
85) FAIL: test_scheduler_sla_miss_email_exception (tests.test_jobs.SchedulerJobTest)
----------------------------------------------------------------------
   Traceback (most recent call last):
    /usr/lib/python3.5/unittest/mock.py line 1157 in patched
      return func(*args, **keywargs)
    tests/test_jobs.py line 3069 in test_scheduler_sla_miss_email_exception
      'test_sla_miss')
    .tox/py35-backend_postgres-env_docker/lib/python3.5/site-packages/mock/mock.py line 925 in assert_called_with
      raise AssertionError('Expected call: %s\nNot called' % (expected,))
   AssertionError: Expected call: exception('Could not send SLA Miss email notification for DAG %s', 'test_sla_miss')
   Not called
   -------------------- >> begin captured stdout << ---------------------
   [2019-02-01 00:36:45,820] {test_task_view_type_check.py:52} INFO - class_instance type: <class 'unusual_prefix_61c0ab525b75d060ea41c0aa11a98c88efc72c26_test_task_view_type_check.CallableClass'>
   
   --------------------- >> end captured stdout << ----------------------
   -------------------- >> begin captured logging << --------------------
   root: INFO: class_instance type: <class 'unusual_prefix_61c0ab525b75d060ea41c0aa11a98c88efc72c26_test_task_view_type_check.CallableClass'>
   --------------------- >> end captured logging << ---------------------

I'm not sure how to understand this (and am all the more confused because the latest build of apache/master is passing). I'll try rebasing this branch to generate the same tree as the currently failing build but trigger a new test...

This will allow PrestoHook to be used by Operators derived from BaseOperator,
which (https://git.io/fhamQ)

  should perform or trigger certain tasks synchronously (wait for completion)

Notes on the other differences between this and DbApiHook.run:
  - no need for utf-8 encoding (https://git.io/vD9LI) b/c PyHive does it
    automatically (https://git.io/vD9Lm)
  - no closing/commiting the cursor/conn (https://git.io/vD9Lc), because those
    are no-ops w/ PyHive (https://git.io/vD9L6, https://git.io/vD9L1)

presto.Cursor does have a _poll_interval attribute, but it has no public
accessor, so it seemed safer to make that value a parameter to pass to PrestoHook.run.
Catch only network-related exceptions when polling Presto. And make str
handling work in Python 2, too.
@codecov-io
Copy link

codecov-io commented Feb 1, 2019

Codecov Report

Merging #2206 into master will increase coverage by 0.03%.
The diff coverage is 95.65%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2206      +/-   ##
==========================================
+ Coverage    74.3%   74.34%   +0.03%     
==========================================
  Files         426      426              
  Lines       27867    27888      +21     
==========================================
+ Hits        20706    20732      +26     
+ Misses       7161     7156       -5
Impacted Files Coverage Δ
airflow/hooks/presto_hook.py 63.38% <95.65%> (+25.38%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ee5b8c2...bd1c4d6. Read the comment docs.

cursor.execute(stmt, parameters)

if poll_interval is not None:
while not self.execution_finished(cursor):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's definitely A Bad Idea™️ to poll indefinitely, but I'm assuming all the timeout logic is expected to live in the operators using this hook. @Fokko please LMK if that's assumption's inaccurate, and if there ought to be some minimal safeguards here, e.g. an upper bound on the number of pings to send or time to wait.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other operators there are similar constructions that define an upper bound to thrown an exception when the maximum time is exceeded: https://github.com/apache/airflow/blob/master/airflow/hooks/druid_hook.py#L90

@Fokko
Copy link
Contributor

Fokko commented Feb 12, 2019

@patrickmckenna Can you pick up the latest suggestions from the PR?

@r39132
Copy link
Contributor

r39132 commented Apr 23, 2019

@patrickmckenna Closing this for lack of activity.

@r39132 r39132 closed this Apr 23, 2019
@HaloKo4
Copy link

HaloKo4 commented Jun 20, 2019

@patrickmckenna is there a chance you will continue to work on that? It's shame that this amazing work will go to waste. This PR is important without it we can not schedule Presto jobs on Airflow as everything is considered success... we can not set dependencies.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants