Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ assists users migrating to a new version.

## Airflow Master

### Simplification of CLI commands

Some commands have been grouped to improve UX of CLI. New commands are available according to the following
table:

| Old command | New command |
|---------------------------|------------------------------------|
| airflow kerberos | airflow run kerberos |
| airflow webserver | airflow run webserver |
| airflow scheduler | airflow run scheduler |
| airflow worker | airflow run worker |
| airflow flower | airflow run flower |
| airflow config | airflow config show |
| airflow rotate_fernet_key | airflow config rotate_fernet_key |

### Remove serve_logs command from CLI

The ``serve_logs`` command has been deleted. This command should be run only by internal application mechanisms
Expand Down
116 changes: 61 additions & 55 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,47 +847,44 @@ class CLIFactory:
},
),
}, {
'name': 'kerberos',
'func': lazy_load_command('airflow.cli.commands.kerberos_command.kerberos'),
'help': "Start a kerberos ticket renewer",
'args': ('principal', 'keytab', 'pid',
'daemon', 'stdout', 'stderr', 'log_file'),
}, {
'name': 'serve_logs',
'func': lazy_load_command('airflow.cli.commands.serve_logs_command.serve_logs'),
'help': "Serve logs generate by worker",
'args': tuple(),
}, {
'name': 'webserver',
'func': lazy_load_command('airflow.cli.commands.webserver_command.webserver'),
'help': "Start a Airflow webserver instance",
'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
}, {
'name': 'scheduler',
'func': lazy_load_command('airflow.cli.commands.scheduler_command.scheduler'),
'help': "Start a scheduler instance",
'args': ('dag_id_opt', 'subdir', 'num_runs',
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
}, {
'name': 'worker',
'func': lazy_load_command('airflow.cli.commands.worker_command.worker'),
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale', 'skip_serve_logs'),
}, {
'name': 'flower',
'func': lazy_load_command('airflow.cli.commands.flower_command.flower'),
'help': "Start a Celery Flower",
'args': ('flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix',
'flower_basic_auth', 'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
}, {
'name': 'version',
'func': lazy_load_command('airflow.cli.commands.version_command.version'),
'help': "Show the version",
'args': tuple(),
'help': "Run application component",
'name': 'run',
'subcommands': (
{
'name': 'kerberos',
'func': lazy_load_command('airflow.cli.commands.run_command.kerberos_command.kerberos'),
'help': "Start a kerberos ticket renewer",
'args': ('principal', 'keytab', 'pid',
'daemon', 'stdout', 'stderr', 'log_file'),
}, {
'name': 'webserver',
'func': lazy_load_command('airflow.cli.commands.run_command.webserver_command.webserver'),
'help': "Start a Airflow webserver instance",
'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
}, {
'name': 'scheduler',
'func': lazy_load_command('airflow.cli.commands.run_command.scheduler_command.scheduler'),
'help': "Start a scheduler instance",
'args': ('dag_id_opt', 'subdir', 'num_runs',
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
}, {
'name': 'worker',
'func': lazy_load_command('airflow.cli.commands.run_command.worker_command.worker'),
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale', 'skip_serve_logs'),
}, {
'name': 'flower',
'func': lazy_load_command('airflow.cli.commands.run_command.flower_command.flower'),
'help': "Start a Celery Flower",
'args': ('flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix',
'flower_basic_auth', 'broker_api', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
}
)
}, {
'help': "List/Add/Delete connections",
'name': 'connections',
Expand Down Expand Up @@ -981,21 +978,30 @@ class CLIFactory:
'func': lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'),
'help': "Update permissions for existing roles and DAGs.",
'args': tuple(),
},
{
'name': 'rotate_fernet_key',
'func': lazy_load_command('airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key'),
'help': 'Rotate all encrypted connection credentials and variables; see '
'https://airflow.readthedocs.io/en/stable/howto/secure-connections.html'
'#rotating-encryption-keys.',
'args': (),
},
{
}, {
'help': 'Manage configuration',
'name': 'config',
'func': lazy_load_command('airflow.cli.commands.config_command.show_config'),
'help': 'Show current application configuration.',
'args': (),
},
'subcommands': (
{
'func': lazy_load_command('airflow.cli.commands.config_command.show_config'),
'name': 'show',
'help': 'Show current application configuration.',
'args': (),
}, {
'func': lazy_load_command('airflow.cli.commands.config_command.rotate_fernet_key'),
'name': 'rotate_fernet_key',
'help': 'Rotate all encrypted connection credentials and variables; see '
'https://airflow.readthedocs.io/en/stable/howto/secure-connections.html'
'#rotating-encryption-keys.',
'args': (),
},
),
}, {
'name': 'version',
'func': lazy_load_command('airflow.cli.commands.version_command.version'),
'help': "Show the version",
'args': tuple(),
}
)
subparsers_dict = {sp.get('name') or sp['func'].__name__: sp for sp in subparsers} # type: ignore
dag_subparsers = (
Expand Down
13 changes: 13 additions & 0 deletions airflow/cli/commands/config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
"""Config sub-commands"""
from airflow.configuration import conf
from airflow.models import Connection, Variable
from airflow.utils import cli as cli_utils, db


def show_config(args):
Expand All @@ -27,3 +29,14 @@ def show_config(args):
for parameter_key, value in sorted(parameters.items()):
print(f"{parameter_key}={value}")
print()


@cli_utils.action_logging
def rotate_fernet_key(args):
"""Rotates all encrypted connection credentials and variables"""
with db.create_session() as session:
for conn in session.query(Connection).filter(
Connection.is_encrypted | Connection.is_extra_encrypted):
conn.rotate_fernet_key()
for var in session.query(Variable).filter(Variable.is_encrypted):
var.rotate_fernet_key()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -14,17 +16,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Rotate Fernet key command"""
from airflow.models import Connection, Variable
from airflow.utils import cli as cli_utils, db


@cli_utils.action_logging
def rotate_fernet_key(args):
"""Rotates all encrypted connection credentials and variables"""
with db.create_session() as session:
for conn in session.query(Connection).filter(
Connection.is_encrypted | Connection.is_extra_encrypted):
conn.rotate_fernet_key()
for var in session.query(Variable).filter(Variable.is_encrypted):
var.rotate_fernet_key()
2 changes: 1 addition & 1 deletion airflow/utils/serve_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def serve_logs():
flask_app = flask.Flask(__name__)

@flask_app.route('/log/<path:filename>')
def serve_logs_view(filename): # pylint: disable=unused-variable, redefined-outer-name
def serve_logs_view(filename): # pylint: disable=unused-variable
log_directory = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
return flask.send_from_directory(
log_directory,
Expand Down
19 changes: 19 additions & 0 deletions tests/cli/commands/run_command/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

from airflow import settings
from airflow.bin import cli
from airflow.cli.commands import webserver_command
from airflow.cli.commands.webserver_command import get_num_ready_workers_running
from airflow.cli.commands.run_command import webserver_command
from airflow.cli.commands.run_command.webserver_command import get_num_ready_workers_running
from airflow.models import DagBag
from airflow.utils.cli import setup_locations
from tests.test_utils.config import conf_vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import airflow
from airflow.bin import cli
from airflow.cli.commands import worker_command
from airflow.cli.commands.run_command import worker_command
from tests.compat import mock, patch
from tests.test_utils.config import conf_vars

Expand Down Expand Up @@ -68,19 +68,19 @@ def setUpClass(cls):
cls.parser = cli.CLIFactory.get_parser()

def test_serve_logs_on_worker_start(self):
with patch('airflow.cli.commands.worker_command.Process') as mock_popen:
with patch('airflow.cli.commands.worker_command.Process') as mock_process:
args = self.parser.parse_args(['worker', '-c', '-1'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
worker_command.worker(args)
mock_popen.assert_called()
mock_process.assert_called()

def test_skip_serve_logs_on_worker_start(self):
with patch('airflow.cli.commands.worker_command.Process') as mock_popen:
with patch('airflow.cli.commands.worker_command.Process') as mock_process:
args = self.parser.parse_args(['worker', '-c', '-1', '-s'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
worker_command.worker(args)
mock_popen.assert_not_called()
mock_process.assert_not_called()