Skip to content

Commit

Permalink
Simplify hive client connection (#37043)
Browse files Browse the repository at this point in the history
* change hive proxy user

* remove extra from connection

* change documentation

* add breaking change and change version

* fix spell check

* fix docs

* Update airflow/providers/apache/hive/CHANGELOG.rst

* fix typo

* proxy user sent in operator will override proxy user in connection

* fix change log

---------

Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com>
  • Loading branch information
romsharon98 and eladkal committed Feb 4, 2024
1 parent 44ba900 commit caec4c7
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 42 deletions.
10 changes: 10 additions & 0 deletions airflow/providers/apache/hive/CHANGELOG.rst
Expand Up @@ -27,6 +27,16 @@
Changelog
---------

7.0.0
.....


Breaking changes
~~~~~~~~~~~~~~~~

* Remove the ability of specify a proxy user as an ``owner`` or ``login`` or ``as_param`` in the connection. Now, setting the user in ``Proxy User`` connection parameter or passing ``proxy_user`` to HiveHook will do the job.


6.4.2
.....

Expand Down
38 changes: 27 additions & 11 deletions airflow/providers/apache/hive/hooks/hive.py
Expand Up @@ -91,7 +91,6 @@ class HiveCliHook(BaseHook):
def __init__(
self,
hive_cli_conn_id: str = default_conn_name,
run_as: str | None = None,
mapred_queue: str | None = None,
mapred_queue_priority: str | None = None,
mapred_job_name: str | None = None,
Expand All @@ -105,7 +104,6 @@ def __init__(
self.use_beeline: bool = conn.extra_dejson.get("use_beeline", False)
self.auth = auth
self.conn = conn
self.run_as = run_as
self.sub_process: Any = None
if mapred_queue_priority:
mapred_queue_priority = mapred_queue_priority.upper()
Expand All @@ -119,20 +117,38 @@ def __init__(
self.mapred_job_name = mapred_job_name
self.proxy_user = proxy_user

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Returns connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import BooleanField, StringField

return {
"use_beeline": BooleanField(lazy_gettext("Use Beeline"), default=False),
"proxy_user": StringField(lazy_gettext("Proxy User"), widget=BS3TextFieldWidget(), default=""),
"principal": StringField(
lazy_gettext("Principal"), widget=BS3TextFieldWidget(), default="hive/_HOST@EXAMPLE.COM"
),
}

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Returns custom field behaviour."""
return {
"hidden_fields": ["extra"],
"relabeling": {},
}

def _get_proxy_user(self) -> str:
"""Set the proper proxy_user value in case the user overwrite the default."""
conn = self.conn

proxy_user_value: str = conn.extra_dejson.get("proxy_user", "")
if proxy_user_value == "login" and conn.login:
return f"hive.server2.proxy.user={conn.login}"
if proxy_user_value == "owner" and self.run_as:
return f"hive.server2.proxy.user={self.run_as}"
if proxy_user_value == "as_param" and self.proxy_user:
if self.proxy_user is not None:
return f"hive.server2.proxy.user={self.proxy_user}"
if proxy_user_value != "": # There is a custom proxy user
proxy_user_value: str = conn.extra_dejson.get("proxy_user", "")
if proxy_user_value != "":
return f"hive.server2.proxy.user={proxy_user_value}"
return proxy_user_value # The default proxy user (undefined)
return ""

def _prepare_cli_cmd(self) -> list[Any]:
"""Create the command list from available information."""
Expand Down
6 changes: 0 additions & 6 deletions airflow/providers/apache/hive/operators/hive.py
Expand Up @@ -54,7 +54,6 @@ class HiveOperator(BaseOperator):
object documentation for more details.
:param script_begin_tag: If defined, the operator will get rid of the
part of the script before the first occurrence of `script_begin_tag`
:param run_as_owner: Run HQL code as a DAG's owner.
:param mapred_queue: queue used by the Hadoop CapacityScheduler. (templated)
:param mapred_queue_priority: priority within CapacityScheduler queue.
Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
Expand Down Expand Up @@ -91,7 +90,6 @@ def __init__(
hiveconfs: dict[Any, Any] | None = None,
hiveconf_jinja_translate: bool = False,
script_begin_tag: str | None = None,
run_as_owner: bool = False,
mapred_queue: str | None = None,
mapred_queue_priority: str | None = None,
mapred_job_name: str | None = None,
Expand All @@ -107,9 +105,6 @@ def __init__(
self.hiveconfs = hiveconfs or {}
self.hiveconf_jinja_translate = hiveconf_jinja_translate
self.script_begin_tag = script_begin_tag
self.run_as = None
if run_as_owner:
self.run_as = self.dag.owner
self.mapred_queue = mapred_queue
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
Expand All @@ -128,7 +123,6 @@ def hook(self) -> HiveCliHook:
"""Get Hive cli hook."""
return HiveCliHook(
hive_cli_conn_id=self.hive_cli_conn_id,
run_as=self.run_as,
mapred_queue=self.mapred_queue,
mapred_queue_priority=self.mapred_queue_priority,
mapred_job_name=self.mapred_job_name,
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/apache/hive/provider.yaml
Expand Up @@ -24,6 +24,7 @@ description: |
state: ready
source-date-epoch: 1705911912
versions:
- 7.0.0
- 6.4.2
- 6.4.1
- 6.4.0
Expand Down
21 changes: 8 additions & 13 deletions docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst
Expand Up @@ -64,19 +64,14 @@ Schema (optional)
Specify your JDBC Hive database that you want to connect to with Beeline
or specify a schema for an HQL statement to run with the Hive CLI.

Extra (optional)
Specify the extra parameters (as json dictionary) that can be used in Hive CLI connection.
The following parameters are all optional:

* ``use_beeline``
Specify as ``True`` if using the Beeline CLI. Default is ``False``.
* ``proxy_user``
Specify a proxy user as an ``owner`` or ``login`` or ``as_param`` keep blank if using a
custom proxy user.
When using ``owner`` you will want to pass the operator ``run_as_owner=True`` if you don't you will run the hql as user="owner"
When using ``as_param`` you will want to pass the operator ``proxy_user=<some_user>`` if you don't you will run the hql as user="as_param"
* ``principal``
Specify the JDBC Hive principal to be used with Hive Beeline.
Use Beeline (optional)
Specify as ``True`` if using the Beeline CLI. Default is ``False``.

Proxy User (optional)
Specify a proxy user to run HQL code as this user.

Principal (optional)
Specify the JDBC Hive principal to be used with Hive Beeline.


When specifying the connection in environment variable you should specify
Expand Down
15 changes: 3 additions & 12 deletions tests/providers/apache/hive/hooks/test_hive.py
Expand Up @@ -880,28 +880,19 @@ def setup_method(self):
self.nondefault_schema = "nondefault"

@pytest.mark.parametrize(
"extra_dejson, correct_proxy_user, run_as, proxy_user",
"extra_dejson, correct_proxy_user, proxy_user",
[
({"proxy_user": "a_user_proxy"}, "hive.server2.proxy.user=a_user_proxy", None, None),
({"proxy_user": "owner"}, "hive.server2.proxy.user=dummy_dag_owner", "dummy_dag_owner", None),
({"proxy_user": "login"}, "hive.server2.proxy.user=admin", None, None),
(
{"proxy_user": "as_param"},
"hive.server2.proxy.user=param_proxy_user",
None,
"param_proxy_user",
),
({"proxy_user": "a_user_proxy"}, "hive.server2.proxy.user=a_user_proxy", None),
],
)
def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, run_as, proxy_user):
def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, proxy_user):
hook = MockHiveCliHook()
returner = mock.MagicMock()
returner.extra_dejson = extra_dejson
returner.login = "admin"
hook.use_beeline = True
hook.conn = returner
hook.proxy_user = proxy_user
hook.run_as = run_as

# Run
result = hook._prepare_cli_cmd()
Expand Down

0 comments on commit caec4c7

Please sign in to comment.