Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify hive client connection #37043

Merged
merged 10 commits into from
Feb 4, 2024
Merged
11 changes: 11 additions & 0 deletions airflow/providers/apache/hive/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@
Changelog
---------

7.0.0
.....


Breaking changes
~~~~~~~~~~~~~~~~

* The ``use_beeline``, ``proxy_user`` and ``principal`` hive client connection options are moved from the extra field to the ``Use Beeline``, ``Proxy User`` and ``Principal`` parameters in the Hook.
romsharon98 marked this conversation as resolved.
Show resolved Hide resolved
* Remove the ability to specify a proxy user as an ``owner`` or ``login`` or ``as_param`` in the connection. Now, setting the user in the ``Proxy User`` connection parameter or passing ``proxy_user`` to HiveHook will do the job.


6.4.2
.....

Expand Down
35 changes: 25 additions & 10 deletions airflow/providers/apache/hive/hooks/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class HiveCliHook(BaseHook):
def __init__(
self,
hive_cli_conn_id: str = default_conn_name,
run_as: str | None = None,
mapred_queue: str | None = None,
mapred_queue_priority: str | None = None,
mapred_job_name: str | None = None,
Expand All @@ -105,7 +104,6 @@ def __init__(
self.use_beeline: bool = conn.extra_dejson.get("use_beeline", False)
self.auth = auth
self.conn = conn
self.run_as = run_as
self.sub_process: Any = None
if mapred_queue_priority:
mapred_queue_priority = mapred_queue_priority.upper()
Expand All @@ -119,20 +117,37 @@ def __init__(
self.mapred_job_name = mapred_job_name
self.proxy_user = proxy_user

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
    """Build the extra widgets displayed on the connection form.

    The returned mapping is keyed by field name: a ``use_beeline`` checkbox
    (unchecked by default), a ``proxy_user`` text box (empty by default) and
    a ``principal`` text box pre-filled with ``hive/_HOST@EXAMPLE.COM``.
    """
    # Imported lazily, inside the method, exactly as the original does.
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import BooleanField, StringField

    beeline_toggle = BooleanField(lazy_gettext("Use Beeline"), default=False)
    proxy_user_box = StringField(lazy_gettext("Proxy User"), widget=BS3TextFieldWidget(), default="")
    principal_box = StringField(
        lazy_gettext("Principal"),
        widget=BS3TextFieldWidget(),
        default="hive/_HOST@EXAMPLE.COM",
    )

    widgets: dict[str, Any] = {
        "use_beeline": beeline_toggle,
        "proxy_user": proxy_user_box,
        "principal": principal_box,
    }
    return widgets

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """Describe how the standard connection form fields are customised.

    The ``extra`` field is hidden and no field is relabelled.
    """
    concealed = ["extra"]
    renamed: dict[str, Any] = {}
    return dict(hidden_fields=concealed, relabeling=renamed)

def _get_proxy_user(self) -> str:
"""Return the ``hive.server2.proxy.user=<user>`` setting, in case the user overwrites the default.

The ``proxy_user`` value is read from the connection extras
(``conn.extra_dejson``) and defaults to an empty string when absent.
"""
# NOTE(review): this listing looks like a flattened diff view, so removed and
# added lines appear interleaved and unindented below; confirm the actual
# branch order against the real file before relying on it.
conn = self.conn

proxy_user_value: str = conn.extra_dejson.get("proxy_user", "")
if proxy_user_value == "login" and conn.login:
return f"hive.server2.proxy.user={conn.login}"
if proxy_user_value == "owner" and self.run_as:
return f"hive.server2.proxy.user={self.run_as}"
if proxy_user_value == "as_param" and self.proxy_user:
return f"hive.server2.proxy.user={self.proxy_user}"
if proxy_user_value != "": # There is a custom proxy user
if proxy_user_value != "":
return f"hive.server2.proxy.user={proxy_user_value}"
return proxy_user_value # The default proxy user (undefined)
return f"hive.server2.proxy.user={self.proxy_user}"

def _prepare_cli_cmd(self) -> list[Any]:
"""Create the command list from available information."""
Expand Down
6 changes: 0 additions & 6 deletions airflow/providers/apache/hive/operators/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class HiveOperator(BaseOperator):
object documentation for more details.
:param script_begin_tag: If defined, the operator will get rid of the
part of the script before the first occurrence of `script_begin_tag`
:param run_as_owner: Run HQL code as a DAG's owner.
:param mapred_queue: queue used by the Hadoop CapacityScheduler. (templated)
:param mapred_queue_priority: priority within CapacityScheduler queue.
Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
Expand Down Expand Up @@ -91,7 +90,6 @@ def __init__(
hiveconfs: dict[Any, Any] | None = None,
hiveconf_jinja_translate: bool = False,
script_begin_tag: str | None = None,
run_as_owner: bool = False,
mapred_queue: str | None = None,
mapred_queue_priority: str | None = None,
mapred_job_name: str | None = None,
Expand All @@ -107,9 +105,6 @@ def __init__(
self.hiveconfs = hiveconfs or {}
self.hiveconf_jinja_translate = hiveconf_jinja_translate
self.script_begin_tag = script_begin_tag
self.run_as = None
if run_as_owner:
self.run_as = self.dag.owner
self.mapred_queue = mapred_queue
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
Expand All @@ -128,7 +123,6 @@ def hook(self) -> HiveCliHook:
"""Get Hive cli hook."""
return HiveCliHook(
hive_cli_conn_id=self.hive_cli_conn_id,
run_as=self.run_as,
mapred_queue=self.mapred_queue,
mapred_queue_priority=self.mapred_queue_priority,
mapred_job_name=self.mapred_job_name,
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/apache/hive/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ description: |
state: ready
source-date-epoch: 1705911912
versions:
- 7.0.0
- 6.4.2
- 6.4.1
- 6.4.0
Expand Down
21 changes: 8 additions & 13 deletions docs/apache-airflow-providers-apache-hive/connections/hive_cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,14 @@ Schema (optional)
Specify your JDBC Hive database that you want to connect to with Beeline
or specify a schema for an HQL statement to run with the Hive CLI.

Extra (optional)
Specify the extra parameters (as json dictionary) that can be used in Hive CLI connection.
The following parameters are all optional:

* ``use_beeline``
Specify as ``True`` if using the Beeline CLI. Default is ``False``.
* ``proxy_user``
Specify a proxy user as an ``owner`` or ``login`` or ``as_param`` keep blank if using a
custom proxy user.
When using ``owner`` you will want to pass the operator ``run_as_owner=True`` if you don't you will run the hql as user="owner"
When using ``as_param`` you will want to pass the operator ``proxy_user=<some_user>`` if you don't you will run the hql as user="as_param"
* ``principal``
Specify the JDBC Hive principal to be used with Hive Beeline.
Use Beeline (optional)
romsharon98 marked this conversation as resolved.
Show resolved Hide resolved
Specify as ``True`` if using the Beeline CLI. Default is ``False``.

Proxy User (optional)
Specify a proxy user to run HQL code as this user.

Principal (optional)
Specify the JDBC Hive principal to be used with Hive Beeline.


When specifying the connection in environment variable you should specify
Expand Down
8 changes: 0 additions & 8 deletions tests/providers/apache/hive/hooks/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,14 +883,6 @@ def setup_method(self):
"extra_dejson, correct_proxy_user, run_as, proxy_user",
[
({"proxy_user": "a_user_proxy"}, "hive.server2.proxy.user=a_user_proxy", None, None),
({"proxy_user": "owner"}, "hive.server2.proxy.user=dummy_dag_owner", "dummy_dag_owner", None),
({"proxy_user": "login"}, "hive.server2.proxy.user=admin", None, None),
(
{"proxy_user": "as_param"},
"hive.server2.proxy.user=param_proxy_user",
None,
"param_proxy_user",
),
],
)
def test_get_proxy_user_value(self, extra_dejson, correct_proxy_user, run_as, proxy_user):
Expand Down