
Add an option to remove or mask environment variables in the KubernetesPodOperator task instance logs on failure or error events #17604

Closed
dondaum opened this issue Aug 13, 2021 · 13 comments
Labels
kind:feature Feature Requests

Comments

@dondaum
Contributor

dondaum commented Aug 13, 2021

Hi core Airflow dev team,

I would like to say a big 'thank you' first for this great platform and for all of your efforts to constantly improve and maintain it. It is a joy to work with. Our company recently switched to Airflow 2 and we love the overall improvements, such as scheduler speed and the UI, but also all your efforts in knowledge sharing and community building.

Description

The KubernetesPodOperator has no option to either hide or mask environment variable information in the task logs when a failure or error happens on a created pod. We use the KubernetesPodOperator with k8s.V1EnvVar to pass sensitive information to a pod, such as a target connection, user name, or password. If an error or failure happens, the task logs show all pod details, including the environment variables. An option, such as a new parameter on the KubernetesPodOperator that either removes environment variables completely or masks them, would prevent this sensitive information from appearing in the logs.
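A rough sketch of what such an option could do (the helper name below and a parameter like `mask_env_vars_on_failure` are purely illustrative, not an existing Airflow API): redact container env values from the pod representation before it is included in the exception message.

```python
import copy

# Illustrative-only sketch: a helper that a hypothetical
# `mask_env_vars_on_failure=True` parameter could invoke before the
# remote_pod object is formatted into the AirflowException message.
def redact_env_vars(pod_dict: dict, placeholder: str = "***") -> dict:
    """Return a deep copy of a pod dict with all container env values masked."""
    redacted = copy.deepcopy(pod_dict)
    for container in redacted.get("spec", {}).get("containers", []):
        for env in container.get("env") or []:
            env["value"] = placeholder
    return redacted

pod = {"spec": {"containers": [{"env": [{"name": "DBT_PWD", "value": "s3cret"}]}]}}
print(redact_env_vars(pod)["spec"]["containers"][0]["env"][0]["value"])  # ***
```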

Use case / motivation

When running the KubernetesPodOperator with environment variables, it would be great to have the option to remove or mask the environment variable information in the task logs when there are failures or errors.

Are you willing to submit a PR?

Yes, I would.

Currently, if final_state does not equal State.SUCCESS, we handle this by checking whether we have env variables set; if so, we mask them with **** before raising the exception that then also logs the complete remote_pod object. I realize this is rather a quick fix on our side, so it would be good to think about other use cases or pod objects that could benefit as well.

The relevant section is:

```python
launcher = self.create_pod_launcher()

if len(pod_list.items) == 1:
    try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
    final_state, remote_pod, result = self.handle_pod_overlap(
        labels, try_numbers_match, launcher, pod_list.items[0]
    )
else:
    self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
    final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
if final_state != State.SUCCESS:
    raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
```

@dondaum dondaum added the kind:feature Feature Requests label Aug 13, 2021
@boring-cyborg

boring-cyborg bot commented Aug 13, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@potiuk
Member

potiuk commented Aug 13, 2021

Which Airflow version are you running?

There is a "Secrets Masker" feature in Airflow 2.1, and from what I understand it should also mask secret values in KPO (https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/index.html#masking-sensitive-data): it masks sensitive variables and sensitive fields in connections in task logs. I believe it should also mask them in the KubernetesPodOperator log (and it would do so always, not only for errors).

@dondaum
Contributor Author

dondaum commented Aug 13, 2021

We run version v2.1.2.

So I had another look at the "Secrets Masker" feature, as I had already configured it without any positive effect on the environment variable masking.

Most likely our setup and configuration is wrong, so let me give some context:

We have a custom logging config that uses Azure Blob Storage for remote logging, where we have added the new filter on the remote task handler:

DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {"format": LOG_FORMAT},
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS
            if COLORED_LOG
            else "logging.Formatter",
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filename_template": FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",  # noqa: E501
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor"],
            "level": LOG_LEVEL,
            "propagate": False,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            "propagate": False,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handler": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}

EXTRA_LOGGER_NAMES: str = conf.get(
    "logging", "EXTRA_LOGGER_NAMES", fallback=None
)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handler": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

...

    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        REMOTE_BASE_LOG_FOLDER = TARGET_HOSTNAME
        WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",  # noqa: E501
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
                "delete_local_copy": False,
                "filters": ["mask_secrets"],
            },
        }

In our Airflow configuration we have (we are using helm charts):

AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_PWD,DBT_WAREHOUSE"

I just had a look at the Airflow docs to verify that I am not missing any other important config. I found AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS, but it is true by default and we do not change it.

In an example DAG I use an environment variable:

sec = Variable.get("testvar", deserialize_json=True)

with dag:
    k = KubernetesPodOperator(
        namespace=aks_kube_config.aks_namespace,
        in_cluster=True,
        image="python:3.6",
        cmds=["python", "-c"],
        arguments=["import sys; sys.exit(1)"],
        env_vars=[
            k8s.V1EnvVar(
                name="DBT_WAREHOUSE",
                value=sec["DBT_WAREHOUSE"],
            )
        ],
        labels={"foo": "bar"},
        name="passing-test",
        is_delete_operator_pod=True,
        task_id="passing-task",
        get_logs=True,
        resources=aks_kube_config.get_dbt_config(),
        node_selector=aks_kube_config.get_dbt_node_selector(),
    )

However, I still see the value of the env variable in the task logs:

[2021-08-13 15:21:38,366] {{taskinstance.py:1501}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
    raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
airflow.exceptions.AirflowException: Pod passing-test.786a2b1e2b3e4b60accdf901645c6af0 returned a failure: {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {'airflow_version': '2.1.2',
                         'dag_id': 'k8s_v3',
                         'execution_date': '2021-08-13T151421.6018540000-bfa4f3805',
                         'foo': 'bar',
                         'kubernetes_pod_operator': 'True',
                         'task_id': 'passing-task',
                         'try_number': '2'},
              'managed_fields': [{'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'OpenAPI-Generator',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal())},
                                 {'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'kubelet',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 15, 21, 36, tzinfo=tzlocal())}],
              'name': 'passing-test.786a2b1e2b3e4b60accdf901645c6af0',
              'namespace': 'XXXX',
              'owner_references': None,
              'resource_version': '90185518',
              'self_link': None,
              'uid': '6bb3ef39-0d53-4b65-8e4b-b0a142304ef4'},
 'spec': {'active_deadline_seconds': None,
          'affinity': {'node_affinity': None,
                       'pod_affinity': None,
                       'pod_anti_affinity': None},
          'automount_service_account_token': None,
          'containers': [{'args': ['import sys; sys.exit(1)'],
                          'command': ['python', '-c'],
                          'env': [{'name': 'DBT_WAREHOUSE',
                                   'value': 'secret',
                                   'value_from': None}],
                          'env_from': None,
                          'image': 'python:3.6',
                          'image_pull_policy': 'IfNotPresent',
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': None,
                          'readiness_probe': None,
                          'resources': {'limits': {'cpu': '1500m',
                                                   'memory': '512Mi'},
                                        'requests': {'cpu': '512m',
                                                     'memory': '256Mi'}},
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': '/dev/termination-log',
                          'termination_message_policy': 'File',
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
                                             'mount_propagation': None,
                                             'name': 'default-token-k59nb',
                                             'read_only': True,
                                             'sub_path': None,
                                             'sub_path_expr': None}],
                          'working_dir': None}],

@potiuk
Member

potiuk commented Aug 13, 2021

OK. Thanks for the context. Really helpful. I think I know the problem: the secrets masker does not mask K8s env vars, it masks Airflow Variables. Maybe it is a good idea to add that (it should be possible; @ashb WDYT?).

As a temporary workaround, you should be able to mask your variables "manually" (also to test whether your logging configuration works).

Following https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/index.html#adding-your-own-masks you could derive a custom class from KubernetesPodOperator and override its execute method with something like:

def execute(self, context):
    from airflow.utils.log.secrets_masker import mask_secret
    mask_secret(sec["DBT_WAREHOUSE"])  # 'sec' as loaded from the Airflow Variable in your DAG
    return super().execute(context)

@ashb
Member

ashb commented Aug 13, 2021

Kubernetes already has the ability to define environment variables based on Secrets - is there a reason you don't use that?

How do you define the env var in the first place? (I'm on my phone so may have missed if you said that already. Sorry)
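For context on why Secret-backed env vars help here: the pod spec then carries only a reference to the Secret, so even a full dump of remote_pod cannot leak the value. A plain-dict sketch (mirroring the shape of k8s.V1EnvVar with value_from; the names "dbt-secrets"/"dbt-warehouse" are made up):

```python
import json

# Inline value vs. value sourced from a Kubernetes Secret. Only the
# inline variant embeds the secret string in the pod spec itself.
env_inline = {"name": "DBT_WAREHOUSE", "value": "hunter2", "value_from": None}
env_from_secret = {
    "name": "DBT_WAREHOUSE",
    "value": None,
    "value_from": {"secret_key_ref": {"name": "dbt-secrets", "key": "dbt-warehouse"}},
}

# A dump of the pod spec leaks the inline value but only references the Secret:
assert "hunter2" in json.dumps(env_inline)
assert "hunter2" not in json.dumps(env_from_secret)
```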

@dondaum
Contributor Author

dondaum commented Aug 13, 2021

Thank you both for your responses and for showing me a quick workaround.

I'll implement it and test it against our custom solution. We do something similar by overriding the execute() method and applying a kind of custom masking to the remote_pod object.

Yes, we could use Kubernetes Secrets for our use cases as well. I assume there are pros and cons to using Secrets. To my knowledge it is not possible to create a Secret 'on the fly' with an instance of the KubernetesPodOperator just for the specific task and remove it afterwards. Even if you could, you would still generate a Secret per task instance, or need logic that checks whether a Secret already exists, and so on. We could simply create the Secret once and then use it, but that would be inflexible if we want to change some values here and there, as we would then always need to change the Secret.

At the moment (see the example above) we use a V1EnvVar model instance and pass it to the KubernetesPodOperator.

@dondaum
Contributor Author

dondaum commented Aug 13, 2021

So I tested the workaround with a quick-and-dirty subclass that overrides the execute method. For each env value I call mask_secret():

class KubernetesPodOperator_(KubernetesPodOperator):

    def __init__(
        self,
        *args,
        **kwargs,
    ):

        super().__init__(*args, **kwargs)

    def _get_secret_var(self) -> List[k8s.V1EnvVar]:
        try:
            sec = Variable.get(
                "testvar", deserialize_json=True
            )
        except KeyError:
            self.log.warning(
                "You have to add the variable in Airflow"
            )
            sec = {
                "DBT_WAREHOUSE": "Default",
            }
        return [
            k8s.V1EnvVar(
                name="DBT_DATABASE",
                value=sec["DBT_WAREHOUSE"],
            ),
        ]

    def execute(self, context):
        from airflow.utils.log.secrets_masker import mask_secret
        self.log.info("Custom KubernetesPodOperator runs K8S Task")
        self.env_vars = self._get_secret_var()
        for secret_env in self.env_vars:
            mask_secret(secret_env.value)
        return super().execute(context)

Afterwards I had a look at the logs, and it seems the masking now works, but only in one of the two exception messages.

Here is the detailed log message:

[2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
    raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
airflow.exceptions.AirflowException: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {'airflow_version': '2.1.2',
                         'dag_id': 'k8s_v3',
                         'execution_date': '2021-08-13T151421.6018540000-bfa4f3805',
                         'foo': 'bar',
                         'kubernetes_pod_operator': 'True',
                         'task_id': 'passing-task',
                         'try_number': '6'},
              'managed_fields': [{'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'OpenAPI-Generator',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())},
                                 {'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'kubelet',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}],
              'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
              'namespace': 'XXXXX',
              'owner_references': None,
              'resource_version': '90215538',
              'self_link': None,
              'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
 'spec': {'active_deadline_seconds': None,
          'affinity': {'node_affinity': None,
                       'pod_affinity': None,
                       'pod_anti_affinity': None},
          'automount_service_account_token': None,
          'containers': [{'args': ['import sys; sys.exit(1)'],
                          'command': ['python', '-c'],
                          'env': [{'name': 'DBT_DATABASE',
                                   'value': 'secret',
                                   'value_from': None}],
                          'env_from': None,
                          'image': 'python:3.6',
                          'image_pull_policy': 'IfNotPresent',
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': None,
                          'readiness_probe': None,
                          'resources': {'limits': {'cpu': '1500m',
                                                   'memory': '512Mi'},
                                        'requests': {'cpu': '512m',
                                                     'memory': '256Mi'}},
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': '/dev/termination-log',
                          'termination_message_policy': 'File',
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
                                             'mount_propagation': None,
                                             'name': 'default-token-k59nb',
                                             'read_only': True,
                                             'sub_path': None,
                                             'sub_path_expr': None}],
                          'working_dir': None}],
...

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/plugins/airflow_dbt/operators/dbt_k8s_operator.py", line 61, in execute
    return super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 373, in execute
    raise AirflowException(f'Pod Launching failed: {ex}')
airflow.exceptions.AirflowException: Pod Launching failed: Pod passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure: {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {'airflow_version': '2.1.2',
                         'dag_id': 'k8s_v3',
                         'execution_date': '2021-08-13T151421.6018540000-bfa4f3805',
                         'foo': 'bar',
                         'kubernetes_pod_operator': 'True',
                         'task_id': 'passing-task',
                         'try_number': '6'},
              'managed_fields': [{'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'OpenAPI-Generator',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 18, 26, 33, tzinfo=tzlocal())},
                                 {'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'kubelet',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 18, 27, tzinfo=tzlocal())}],
              'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
              'namespace': 'XXXXX',
              'owner_references': None,
              'resource_version': '90215538',
              'self_link': None,
              'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
 'spec': {'active_deadline_seconds': None,
          'affinity': {'node_affinity': None,
                       'pod_affinity': None,
                       'pod_anti_affinity': None},
          'automount_service_account_token': None,
          'containers': [{'args': ['import sys; sys.exit(1)'],
                          'command': ['python', '-c'],
                          'env': [{'name': 'DBT_DATABASE',
                                   'value': '***',
                                   'value_from': None}],
                          'env_from': None,
                          'image': 'python:3.6',
                          'image_pull_policy': 'IfNotPresent',
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': None,
                          'readiness_probe': None,
                          'resources': {'limits': {'cpu': '1500m',
                                                   'memory': '512Mi'},
                                        'requests': {'cpu': '512m',
                                                     'memory': '256Mi'}},
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': '/dev/termination-log',
                          'termination_message_policy': 'File',
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mount_path': '/var/run/***s/kubernetes.io/serviceaccount',
                                             'mount_propagation': None,
                                             'name': 'default-token-k59nb',
                                             'read_only': True,
                                             'sub_path': None,
                                             'sub_path_expr': None}],
                          'working_dir': None}],

If a pod ends with a failure, the KubernetesPodOperator seems to check the state first, and if the state does not equal State.SUCCESS it raises an AirflowException. This exception is caught and raised again a couple of statements below. However, the masking (at least as my very trivial subclass implements it) only works for the second exception message.

KubernetesPodOperator

            else:
                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
            if final_state != State.SUCCESS:
                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
            return result
        except AirflowException as ex:
            raise AirflowException(f'Pod Launching failed: {ex}')

@potiuk
Member

potiuk commented Aug 14, 2021

WHOA! What a finding!

Indeed, it looks like the secrets masker did not mask secret values in context exceptions. I just submitted PR #17618 fixing it, possibly just in time to be included in 2.1.3.
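For anyone curious what "context exceptions" means here: when an exception is raised inside an except block, Python chains the original onto `__context__` (or `__cause__` for `raise ... from`, see PEP 3134), so a redactor that only rewrites the outer exception leaves the inner message untouched. A minimal sketch (not the actual PR code) of walking the chain:

```python
# Minimal sketch of redacting a chained exception. Redacting only the
# exception being handled would miss the original on __context__.
def redact_exc(exc, secret, repl="***"):
    """Replace `secret` in the exception's args, then walk __context__/__cause__."""
    exc.args = tuple(
        a.replace(secret, repl) if isinstance(a, str) else a for a in exc.args
    )
    for linked in (exc.__context__, exc.__cause__):
        if linked is not None:
            redact_exc(linked, secret, repl)

try:
    try:
        raise ValueError("Pod returned a failure: env value hunter2")
    except ValueError:
        # re-raising a new exception chains the original onto __context__
        raise RuntimeError("Pod Launching failed: env value hunter2")
except RuntimeError as err:
    redact_exc(err, "hunter2")
    print(err.args[0])              # Pod Launching failed: env value ***
    print(err.__context__.args[0])  # Pod returned a failure: env value ***
```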

potiuk added a commit to potiuk/airflow that referenced this issue Aug 14, 2021
Secret masking did not work in context exceptions. When
there was a try/except/raise sequence, the original
exceptions were not redacted.

Related: apache#17604
@dondaum
Contributor Author

dondaum commented Aug 15, 2021

Great! Glad I could help find this issue.

So I had a look at your PR and also at the original PR that merged the masking feature. My example above should actually also work without having to change anything in our custom operator, beside your changes to the secret masking in exceptions.

Reading through the docs, whenever you use a connection or variable whose key name appears in the configuration variable AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES, it should be handled by default.

In our setup, and in the example above, I get the env variables from an Airflow Variable, so I am wondering why that does not work, beside of course the exception issue that you took care of.

So I tested a bit and found another strange behavior that I could not explain. It seems that variable names in the configuration that are in all capital letters are somehow not handled correctly, and therefore their values are not masked.

First test (masking does not work):

# Context

# Airflow variable name: testvar
# Airflow variable payload:
# {
# "DBT_WAREHOUSE": "secret"
# }

# Configuration
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_WAREHOUSE"

# Custom operator
class KubernetesPodOperator_(KubernetesPodOperator):

    def __init__(
        self,
        *args,
        **kwargs,
    ):

        super().__init__(*args, **kwargs)

    def _get_dummy_secret_var(self) -> Optional[Dict[str, str]]:
        try:
            sec = Variable.get(
                "testvar", deserialize_json=True
            )
        except KeyError:
            self.log.warning(
                "You have to add the variable in Airflow"
            )
            sec = None
        return sec

    def execute(self, context):
        dummy_var = self._get_dummy_secret_var()
        self.log.info(dummy_var)

        self.log.info("Custom KubernetesPodOperator runs K8S Task")
        return super().execute(context)
...
# Output
[2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': 'secret'}
[2021-08-15 16:00:30,237] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task

Second test (masking is working):

# Context

# Airflow variable name: testvar
# Airflow variable payload:
# {
# "DBT_WAREHOUSE": "secret"
# }

# Configuration
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "dbt_warehouse"

# Custom operator
class KubernetesPodOperator_(KubernetesPodOperator):

    def __init__(
        self,
        *args,
        **kwargs,
    ):

        super().__init__(*args, **kwargs)

    def _get_dummy_secret_var(self) -> Optional[Dict[str, str]]:
        try:
            sec = Variable.get(
                "testvar", deserialize_json=True
            )
        except KeyError:
            self.log.warning(
                "You have to add the variable in Airflow"
            )
            sec = None
        return sec

    def execute(self, context):
        dummy_var = self._get_dummy_secret_var()
        self.log.info(dummy_var)

        self.log.info(
            f"Custom KubernetesPodOperator runs K8S Task"
        )
        return super().execute(context)
...
# Output
[2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:67}} INFO - {'DBT_WAREHOUSE': '***'}
[2021-08-15 16:04:18,864] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task

Thus the masking only works if you change the variable name in the Airflow config from 'DBT_WAREHOUSE' to 'dbt_warehouse'. As our variable key names are all written in capital letters, our previous masking attempts did not work.

Any clue why that happens?
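The two test runs can be reproduced in a few lines of plain Python if one assumes (this is a guess based on the observed behaviour, not Airflow's actual source) that the masker lowercases the key it checks but compares it against the configured names verbatim:

```python
def should_hide_value_for_key(name, sensitive_names):
    # Hypothetical matching logic: the key is lowercased...
    name = name.strip().lower()
    # ...but the configured names are matched verbatim, so an
    # all-uppercase entry such as "DBT_WAREHOUSE" can never match.
    return any(s in name for s in sensitive_names)

# First test: config set to "DBT_WAREHOUSE" -> value is NOT masked
print(should_hide_value_for_key("DBT_WAREHOUSE", {"DBT_WAREHOUSE"}))  # False

# Second test: config set to "dbt_warehouse" -> value IS masked
print(should_hide_value_for_key("DBT_WAREHOUSE", {"dbt_warehouse"}))  # True
```

Under that assumption, the asymmetry between the two test runs falls out directly.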

@potiuk
Member

potiuk commented Aug 15, 2021

This is a different thing, I believe. I think this is a coincidence and you have "dbt_warehouse" (lowercase) set as an extra parameter in a connection or Airflow Variable.

@dondaum
Contributor Author

dondaum commented Aug 16, 2021

Thank you for the quick response.

I am not sure I completely understand what you mean. Are you assuming that there is another Airflow variable or connection with the name "dbt_warehouse" (lowercase) as key?

If so, I tested it on a fresh local Airflow dev instance with a clean environment, without any variable other than the one mentioned in the context section above.

But let me test it with another key name that I have not used so far, and let me list all Airflow variables and connections beforehand.

I am asking because if the masking works as expected, then with your latest changes on the exception handling this feature request, at least from my side, is no longer needed.

So I ran another test: I created a fresh local Airflow instance and listed all variables and connections:

airflow variables list -o table
key
=======
testvar

the variable structure is:

airflow variables get testvar
{
"THIS_IS_A_RANDOM_SENSITIVE_VAR": "SuperRandomString1"
}

and

airflow connections list -o table
No data found

I stuck with the same example as above, using:
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "THIS_IS_A_RANDOM_SENSITIVE_VAR"

[2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': 'SuperRandomString1'}
[2021-08-16 06:09:20,814] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
[2021-08-16 06:09:20,823] {{taskinstance.py:1501}} ERROR - Task failed with exception

Then I changed only the Airflow configuration variable AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES, switching the name to all lowercase:

AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "this_is_a_random_sensitive_var"

[2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:67}} INFO - {'THIS_IS_A_RANDOM_SENSITIVE_VAR': '***'}
[2021-08-16 06:14:19,899] {{dbt_k8s_operator.py:69}} INFO - Custom KubernetesPodOperator runs K8S Task
[2021-08-16 06:14:19,906] {{taskinstance.py:1501}} ERROR - Task failed with exception

I get the same behavior.
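If the matching really is asymmetric in this way, lowercasing both sides would make it case-insensitive. A hypothetical sketch of such a fix (not the code Airflow ships):

```python
def should_hide_value_for_key_ci(name, sensitive_names):
    # Normalise the key AND the configured names to lowercase, so
    # masking no longer depends on how the value of
    # AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES is capitalised.
    name = name.strip().lower()
    return any(s.strip().lower() in name for s in sensitive_names)

# Masked regardless of the capitalisation used in the config:
print(should_hide_value_for_key_ci(
    "THIS_IS_A_RANDOM_SENSITIVE_VAR", {"THIS_IS_A_RANDOM_SENSITIVE_VAR"}))  # True
print(should_hide_value_for_key_ci(
    "THIS_IS_A_RANDOM_SENSITIVE_VAR", {"this_is_a_random_sensitive_var"}))  # True
```

In the meantime, writing the configured names in lowercase, as in the second test, works around the issue.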

kaxil pushed a commit that referenced this issue Aug 17, 2021
* Fix redacting secrets in context exceptions.

Secret masking did not work in implicit and
explicit context exceptions (see
https://www.python.org/dev/peps/pep-3134/)
When there was a `try/except/raise` sequence,
or `raise ... from` exception - the original
exceptions were not redacted.

Related: #17604

(cherry picked from commit 6df3ee7)
jhtimmins pushed a commit that referenced this issue Aug 17, 2021
* Fix redacting secrets in context exceptions.

Secret masking did not work in implicit and
explicit context exceptions (see
https://www.python.org/dev/peps/pep-3134/)
When there was a `try/except/raise` sequence,
or `raise ... from` exception - the original
exceptions were not redacted.

Related: #17604

(cherry picked from commit 6df3ee7)
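For context, the commits above concern exception chaining (PEP 3134): when an exception is raised inside an `except` block, the original exception travels along as `__context__` (or as `__cause__` with `raise ... from`), so a secrets masker that only redacts the outermost exception misses the chained ones. A minimal demonstration of the chaining itself:

```python
# Explicit chaining: 'raise ... from' sets __cause__.
try:
    try:
        raise ValueError("conn_password=secret")
    except ValueError as original:
        raise RuntimeError("task failed") from original
except RuntimeError as exc:
    print(type(exc.__cause__).__name__, exc.__cause__)  # ValueError conn_password=secret

# Implicit chaining: raising inside an 'except' block sets __context__.
try:
    try:
        raise ValueError("conn_password=secret")
    except ValueError:
        raise RuntimeError("task failed")
except RuntimeError as exc:
    print(type(exc.__context__).__name__)  # ValueError
```

A redaction pass therefore has to walk `__cause__`/`__context__` recursively, not just the exception it is handed.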
@dondaum
Contributor Author

dondaum commented Sep 16, 2021

Anything I can do here to provide more context?

@eladkal
Contributor

eladkal commented Oct 19, 2022

To my understanding, the issue initially reported was resolved.
If there are follow-up issues, please open a new bug report and reference this one (we prefer to keep issues focused on a specific problem so we can keep track).

I'm closing this one as completed.

@eladkal eladkal closed this as completed Oct 19, 2022