From 0ec3decdadba4de06d507dfcdcc04d31fd816021 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 1 Jul 2021 16:20:36 +0100 Subject: [PATCH] Only allow webserver to request from the worker log server (#16754) Logs _shouldn't_ contain any sensitive info, but they often do by mistake. As an extra level of protection we shouldn't allow anything other than the webserver to access the logs. (We can't change the bind IP form 0.0.0.0 as for it to be useful it needs to be accessed from different hosts -- i.e. the webserver will almost always be on a different node) (cherry picked from commit 2285ee9f71a004d5c013119271873182fb431d8f) (cherry picked from commit d4a09f25a186223c104f7480d78c599b14481746) (cherry picked from commit 815dcd5b4e86a861fc1cd4e6a470bcd9920a81fa) --- airflow/bin/cli.py | 69 +++++++++++++++++++++----- airflow/utils/log/file_task_handler.py | 13 ++++- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 473a980ebae5e..4dee324ab27cc 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -1235,21 +1235,64 @@ def scheduler(args): @cli_utils.action_logging def serve_logs(args): - print("Starting flask") - import flask - flask_app = flask.Flask(__name__) - - @flask_app.route('/log/') - def serve_logs(filename): # noqa - log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - return flask.send_from_directory( - log, - filename, - mimetype="application/json", - as_attachment=False) + from flask import Flask, abort, request, send_from_directory + from itsdangerous import TimedJSONWebSignatureSerializer + + def flask_app(): # noqa: D103 + flask_app = Flask(__name__) + max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30) + log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER')) + + signer = TimedJSONWebSignatureSerializer( + secret_key=conf.get('webserver', 'secret_key'), + algorithm_name='HS512', + expires_in=max_request_age, + # This isn't really a "salt", more of a signing context + salt='task-instance-logs', + ) + + # Prevent direct access to the logs port + @flask_app.before_request + def validate_pre_signed_url(): + try: + auth = request.headers['Authorization'] + + # We don't actually care about the payload, just that the signature + # was valid and the `exp` claim is correct + filename, headers = signer.loads(auth, return_header=True) + + issued_at = int(headers['iat']) + expires_at = int(headers['exp']) + except Exception: + abort(403) + + if filename != request.view_args['filename']: + abort(403) + + # Validate the `iat` and `exp` are within `max_request_age` of now. + now = int(time.time()) + if abs(now - issued_at) > max_request_age: + abort(403) + if abs(now - expires_at) > max_request_age: + abort(403) + if issued_at > expires_at or expires_at - issued_at > max_request_age: + abort(403) + + @flask_app.route('/log/') + def serve_logs(filename): # noqa + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) + return flask.send_from_directory( + log, + filename, + mimetype="application/json", + as_attachment=False) + + return flask_app + + app = flask_app() worker_log_server_port = int(conf.get('celery', 'WORKER_LOG_SERVER_PORT')) - flask_app.run(host='0.0.0.0', port=worker_log_server_port) + app.run(host='0.0.0.0', port=worker_log_server_port) def _serve_logs(env, skip_serve_logs=False): diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index d424ac2236e22..f6650bf25eec7 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -22,6 +22,7 @@ from typing import Optional import requests +from itsdangerous import TimedJSONWebSignatureSerializer from airflow.configuration import conf from airflow.configuration import AirflowConfigException @@ -148,7 +149,17 @@ def _read(self, ti, try_number, metadata=None): except (AirflowConfigException, ValueError): pass - response = requests.get(url, timeout=timeout) + signer = TimedJSONWebSignatureSerializer( + secret_key=conf.get('webserver', 'secret_key'), + algorithm_name='HS512', + expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30), + # This isn't really a "salt", more of a signing context + salt='task-instance-logs', + ) + + response = requests.get( + url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)} + ) response.encoding = "utf-8" # Check if the resource was properly fetched