diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 2b6a7ed17b121..a7d99c6b00dcd 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1331,21 +1331,68 @@ def scheduler(args):
 
 @cli_utils.action_logging
 def serve_logs(args):
-    print("Starting flask")
-    import flask
-    flask_app = flask.Flask(__name__)
-
-    @flask_app.route('/log/<path:filename>')
-    def serve_logs(filename):  # noqa
-        log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-        return flask.send_from_directory(
-            log,
-            filename,
-            mimetype="application/json",
-            as_attachment=False)
+    """Serve task log files over HTTP to callers that present a valid signed token."""
+    from flask import Flask, abort, request, send_from_directory
+    from itsdangerous import TimedJSONWebSignatureSerializer
+
+    def flask_app():
+        """Build the Flask app that serves files from BASE_LOG_FOLDER."""
+        flask_app = Flask(__name__)
+        max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
+        log_directory = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+
+        signer = TimedJSONWebSignatureSerializer(
+            secret_key=conf.get('webserver', 'secret_key'),
+            algorithm_name='HS512',
+            expires_in=max_request_age,
+            # This isn't really a "salt", more of a signing context
+            salt='task-instance-logs',
+        )
+
+        # Prevent direct access to the logs port
+        @flask_app.before_request
+        def validate_pre_signed_url():
+            """Abort with 403 unless the Authorization token is valid for the requested file."""
+            try:
+                auth = request.headers['Authorization']
+
+                # The payload is the log path the caller signed; `loads` also
+                # verifies the signature and the `exp` claim
+                filename, headers = signer.loads(auth, return_header=True)
+
+                issued_at = int(headers['iat'])
+                expires_at = int(headers['exp'])
+            except Exception:
+                abort(403)
+
+            # `view_args` is None when the URL matched no route
+            if filename != (request.view_args or {}).get('filename'):
+                abort(403)
+
+            # Validate the `iat` and `exp` are within `max_request_age` of now.
+            now = int(time.time())
+            if abs(now - issued_at) > max_request_age:
+                abort(403)
+            if abs(now - expires_at) > max_request_age:
+                abort(403)
+            if issued_at > expires_at or expires_at - issued_at > max_request_age:
+                abort(403)
+
+        @flask_app.route('/log/<path:filename>')
+        def serve_logs_view(filename):
+            """Return the requested log file from the log directory."""
+            return send_from_directory(
+                log_directory,
+                filename,
+                mimetype="application/json",
+                as_attachment=False)
+
+        return flask_app
+
+    app = flask_app()
 
     worker_log_server_port = int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
-    flask_app.run(host='0.0.0.0', port=worker_log_server_port)
+    app.run(host='0.0.0.0', port=worker_log_server_port)
 
 
 def _serve_logs(env, skip_serve_logs=False):
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index ef359110125c4..a14eba65f3063 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -22,6 +22,7 @@
 from typing import Optional
 
 import requests
+from itsdangerous import TimedJSONWebSignatureSerializer
 
 from airflow.configuration import conf
 from airflow.configuration import AirflowConfigException
@@ -160,7 +161,17 @@ def _read(self, ti, try_number, metadata=None):
                 except (AirflowConfigException, ValueError):
                     pass
 
-                response = requests.get(url, timeout=timeout)
+                signer = TimedJSONWebSignatureSerializer(
+                    secret_key=conf.get('webserver', 'secret_key'),
+                    algorithm_name='HS512',
+                    expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30),
+                    # This isn't really a "salt", more of a signing context
+                    salt='task-instance-logs',
+                )
+
+                response = requests.get(
+                    url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)}
+                )
                 response.encoding = "utf-8"
 
                 # Check if the resource was properly fetched