Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permitting airflow kerberos to run in different modes #35146

Merged
merged 12 commits into from
Oct 25, 2023
4 changes: 4 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ def string_lower_type(val):
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
ARG_KERBEROS_ONE_TIME_MODE = Arg(
("-o", "--one-time"), help="Run airflow kerberos one time instead of forever", action="store_true"
)
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
Expand Down Expand Up @@ -1889,6 +1892,7 @@ class GroupCommand(NamedTuple):
ARG_KEYTAB,
ARG_PID,
ARG_DAEMON,
ARG_KERBEROS_ONE_TIME_MODE,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
Expand Down
7 changes: 6 additions & 1 deletion airflow/cli/commands/kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.security import kerberos as krb
from airflow.security.kerberos import KerberosMode
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

Expand All @@ -30,8 +31,12 @@ def kerberos(args):
"""Start a kerberos ticket renewer."""
print(settings.HEADER)

mode = KerberosMode.STANDARD
if args.one_time:
mode = KerberosMode.ONE_TIME

run_command_with_daemon_option(
args=args,
process_name="kerberos",
callback=lambda: krb.run(principal=args.principal, keytab=args.keytab),
callback=lambda: krb.run(principal=args.principal, keytab=args.keytab, mode=mode),
)
25 changes: 22 additions & 3 deletions airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations

from enum import Enum

# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -47,6 +49,17 @@
log = logging.getLogger(__name__)


class KerberosMode(Enum):
"""
Defines modes for running airflow kerberos.
:return: None.
"""

STANDARD = "standard"
ONE_TIME = "one-time"


def get_kerberos_principle(principal: str | None) -> str:
"""Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided."""
return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname())
Expand Down Expand Up @@ -176,18 +189,24 @@ def detect_conf_var() -> bool:
return b"X-CACHECONF:" in file.read()


def run(principal: str | None, keytab: str):
def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.STANDARD):
"""
Run the kerberos renewer.
:param principal: principal name
:param keytab: keytab file
:param mode: mode to run the airflow kerberos in
:return: None
"""
if not keytab:
log.warning("Keytab renewer not starting, no keytab configured")
sys.exit(0)

while True:
log.info("Using airflow kerberos with mode: %s", mode.value)

if mode == KerberosMode.STANDARD:
while True:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
elif mode == KerberosMode.ONE_TIME:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
23 changes: 23 additions & 0 deletions docs/apache-airflow/security/kerberos.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,29 @@ Launch the ticket renewer by
# run ticket renewer
airflow kerberos
To support more advanced deployment models for using kerberos in standard or one-time fashion,
you can specify the mode while running the ``airflow kerberos`` by using the ``--one-time`` flag.

a) standard: The airflow kerberos command will run endlessly. The ticket renewer process runs continuously every few seconds
and refreshes the ticket if it has expired.
b) one-time: The airflow kerberos will run once and exit. In case of failure the main task won't spin up.

The default mode is standard.

Example usages:

For standard mode:

.. code-block:: bash
airflow kerberos
For one time mode:

.. code-block:: bash
airflow kerberos --one-time
Hadoop
^^^^^^

Expand Down
31 changes: 29 additions & 2 deletions tests/cli/commands/test_kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from airflow.cli import cli_parser
from airflow.cli.commands import kerberos_command
from airflow.security.kerberos import KerberosMode
from tests.test_utils.config import conf_vars


Expand All @@ -34,7 +35,9 @@ def test_run_command(self, mock_krb):
args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"])

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)

@mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile")
@mock.patch("airflow.cli.commands.daemon_utils.setup_locations")
Expand Down Expand Up @@ -69,7 +72,9 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m
with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open):
kerberos_command.kerberos(args)

mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)
assert mock_daemon.mock_calls[:3] == [
mock.call.DaemonContext(
pidfile=mock_pid_file.return_value,
Expand Down Expand Up @@ -100,3 +105,25 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m
mock.call().__exit__(None, None, None),
mock.call().__exit__(None, None, None),
]

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_standard(self, mock_krb):
args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"])

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD
)

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_one_time(self, mock_krb):
args = self.parser.parse_args(
["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab", "--one-time"]
)

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.ONE_TIME
)