Skip to content

Commit

Permalink
Permitting airflow kerberos to run in different modes (#35146)
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh committed Oct 25, 2023
1 parent 2a0cd3a commit 36c5c11
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 6 deletions.
4 changes: 4 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ def string_lower_type(val):
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
ARG_KERBEROS_ONE_TIME_MODE = Arg(
("-o", "--one-time"), help="Run airflow kerberos one time instead of forever", action="store_true"
)
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
Expand Down Expand Up @@ -1889,6 +1892,7 @@ class GroupCommand(NamedTuple):
ARG_KEYTAB,
ARG_PID,
ARG_DAEMON,
ARG_KERBEROS_ONE_TIME_MODE,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
Expand Down
7 changes: 6 additions & 1 deletion airflow/cli/commands/kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.security import kerberos as krb
from airflow.security.kerberos import KerberosMode
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

Expand All @@ -30,8 +31,12 @@ def kerberos(args):
"""Start a kerberos ticket renewer."""
print(settings.HEADER)

mode = KerberosMode.STANDARD
if args.one_time:
mode = KerberosMode.ONE_TIME

run_command_with_daemon_option(
args=args,
process_name="kerberos",
callback=lambda: krb.run(principal=args.principal, keytab=args.keytab),
callback=lambda: krb.run(principal=args.principal, keytab=args.keytab, mode=mode),
)
25 changes: 22 additions & 3 deletions airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations

from enum import Enum

# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -47,6 +49,17 @@
log = logging.getLogger(__name__)


class KerberosMode(Enum):
"""
Defines modes for running airflow kerberos.
:return: None.
"""

STANDARD = "standard"
ONE_TIME = "one-time"


def get_kerberos_principle(principal: str | None) -> str:
"""Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided."""
return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname())
Expand Down Expand Up @@ -176,18 +189,24 @@ def detect_conf_var() -> bool:
return b"X-CACHECONF:" in file.read()


def run(principal: str | None, keytab: str):
def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.STANDARD):
"""
Run the kerberos renewer.
:param principal: principal name
:param keytab: keytab file
:param mode: mode to run the airflow kerberos in
:return: None
"""
if not keytab:
log.warning("Keytab renewer not starting, no keytab configured")
sys.exit(0)

while True:
log.info("Using airflow kerberos with mode: %s", mode.value)

if mode == KerberosMode.STANDARD:
while True:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
elif mode == KerberosMode.ONE_TIME:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
23 changes: 23 additions & 0 deletions docs/apache-airflow/security/kerberos.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,29 @@ Launch the ticket renewer by
# run ticket renewer
airflow kerberos
To support more advanced deployment models for using kerberos in standard or one-time fashion,
you can specify the mode while running the ``airflow kerberos`` by using the ``--one-time`` flag.

a) standard: The airflow kerberos command will run endlessly. The ticket renewer process runs continuously every few seconds
and refreshes the ticket if it has expired.
b) one-time: The airflow kerberos will run once and exit. In case of failure the main task won't spin up.

The default mode is standard.

Example usages:

For standard mode:

.. code-block:: bash
airflow kerberos
For one time mode:

.. code-block:: bash
airflow kerberos --one-time
Hadoop
^^^^^^

Expand Down
31 changes: 29 additions & 2 deletions tests/cli/commands/test_kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from airflow.cli import cli_parser
from airflow.cli.commands import kerberos_command
from airflow.security.kerberos import KerberosMode
from tests.test_utils.config import conf_vars


Expand All @@ -34,7 +35,9 @@ def test_run_command(self, mock_krb):
args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"])

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)

@mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile")
@mock.patch("airflow.cli.commands.daemon_utils.setup_locations")
Expand Down Expand Up @@ -69,7 +72,9 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m
with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open):
kerberos_command.kerberos(args)

mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)
assert mock_daemon.mock_calls[:3] == [
mock.call.DaemonContext(
pidfile=mock_pid_file.return_value,
Expand Down Expand Up @@ -100,3 +105,25 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m
mock.call().__exit__(None, None, None),
mock.call().__exit__(None, None, None),
]

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_standard(self, mock_krb):
args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"])

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_one_time(self, mock_krb):
args = self.parser.parse_args(
["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab", "--one-time"]
)

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.ONE_TIME
)

0 comments on commit 36c5c11

Please sign in to comment.