Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Permitting airflow kerberos to run in different modes #35146

Merged
merged 12 commits into from
Oct 25, 2023
2 changes: 2 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ def string_lower_type(val):
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
ARG_KERBEROS_MODE = Arg(("-m", "--mode"), help="mode to run airflow kerberos", nargs="?", default="daemon")
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
Expand Down Expand Up @@ -1887,6 +1888,7 @@ class GroupCommand(NamedTuple):
args=(
ARG_PRINCIPAL,
ARG_KEYTAB,
ARG_KERBEROS_MODE,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
Expand Down
16 changes: 14 additions & 2 deletions airflow/cli/commands/kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from airflow import settings
from airflow.security import kerberos as krb
from airflow.security.kerberos import KerberosMode
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
Expand All @@ -33,6 +34,17 @@ def kerberos(args):
"""Start a kerberos ticket renewer."""
print(settings.HEADER)

mode_mapping = {
"daemon": KerberosMode.DAEMON,
"one-time": KerberosMode.ONE_TIME,
}
if args.mode:
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
mode_enum = mode_mapping.get(args.mode)
if mode_enum is None:
raise ValueError("Invalid mode. Mode must be 'daemon' or 'one-time'.")
else:
mode_enum = KerberosMode.DAEMON

if args.daemon:
pid, stdout, stderr, _ = setup_locations(
"kerberos", args.pid, args.stdout, args.stderr, args.log_file
Expand All @@ -49,6 +61,6 @@ def kerberos(args):
)

with ctx:
krb.run(principal=args.principal, keytab=args.keytab)
krb.run(principal=args.principal, keytab=args.keytab, mode=mode_enum)
else:
krb.run(principal=args.principal, keytab=args.keytab)
krb.run(principal=args.principal, keytab=args.keytab, mode=mode_enum)
24 changes: 22 additions & 2 deletions airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations

from enum import Enum

# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -47,6 +49,17 @@
log = logging.getLogger(__name__)


class KerberosMode(Enum):
"""
Defines modes for running airflow kerberos.

:return: None.
"""

DAEMON = "daemon"
ONE_TIME = "one-time"


def get_kerberos_principle(principal: str | None) -> str:
"""Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided."""
return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname())
Expand Down Expand Up @@ -176,18 +189,25 @@ def detect_conf_var() -> bool:
return b"X-CACHECONF:" in file.read()


def run(principal: str | None, keytab: str):
def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.DAEMON):
"""
Run the kerberos renewer.

:param principal: principal name
:param keytab: keytab file
:param mode: mode to run the airflow kerberos in
:return: None
"""
if not keytab:
log.warning("Keytab renewer not starting, no keytab configured")
sys.exit(0)

while True:
log.info("Using airflow kerberos with mode: %s", mode.value)

if mode == KerberosMode.DAEMON:
while True:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
elif mode == KerberosMode.ONE_TIME:
renew_from_kt(principal, keytab)
time.sleep(conf.getint("kerberos", "reinit_frequency"))
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions docs/apache-airflow/security/kerberos.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ Launch the ticket renewer by
# run ticket renewer
airflow kerberos

To support more advanced deployment models for using kerberos in daemon or one-time fashion,
you can specify the mode while running the ``airflow kerberos`` as either "daemon" or "one-time".

* daemon: The airflow kerberos process will run forever
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
* one-time: The airflow kerberos will run once and exit. In case of failure the main task won't spin up.

Example usages:
``
airflow kerberos --mode daemon
airflow kerberos --mode one-time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some examples also here

``

Hadoop
^^^^^^

Expand Down
20 changes: 20 additions & 0 deletions tests/cli/commands/test_kerberos_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def test_run_command(self, mock_krb):
kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_one_time(self, mock_krb):
args = self.parser.parse_args(
["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab", "--mode", "one-time"]
)

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")

@mock.patch("airflow.cli.commands.kerberos_command.krb")
@conf_vars({("core", "executor"): "CeleryExecutor"})
def test_run_command_with_mode_daemon(self, mock_krb):
args = self.parser.parse_args(
["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab", "--mode", "daemon"]
)

kerberos_command.kerberos(args)
mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL")

@mock.patch("airflow.cli.commands.kerberos_command.TimeoutPIDLockFile")
@mock.patch("airflow.cli.commands.kerberos_command.setup_locations")
@mock.patch("airflow.cli.commands.kerberos_command.daemon")
Expand Down