Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redacting the sensitive env variables in env_vars for KPO #39003

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

amoghrajesh
Copy link
Contributor

env_vars in the KPO can have secret values sometimes and this will be blindly rendered by the rendered template.

To fix this, I am passing the env_vars through the redact filter in secrets masker so that it is hidden from the rendered template as well as from the task logs.

The example DAG i tried and the results are here:

from pendulum import datetime, duration
from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import V1EnvVar


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}
with DAG(
    dag_id="example_kubernetes_pod", schedule="@once", default_args=default_args
) as dag:
    t1 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-1",
        config_file="~/Downloads/ECS\ Kubeconfigs/shared-rke-01",
        get_logs=True,
        env_vars=[
            V1EnvVar(
                name="password",
                value="mypassword",
            ),
            V1EnvVar(
                name="normal",
                value="normalstuff",
            ),
            V1EnvVar(
                name="secret",
                value="ok",
            ),
            V1EnvVar(
                name="thisisok",
                value="password",
            ),
        ],
    )

    t2 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-2",
        config_file="~/Downloads/ECS\ Kubeconfigs/shared-rke-01",
        get_logs=True,
        env_vars= {
            "password": "password1",
            "good": "thisisgood",
            "secret": "normal_stuff",
            "normal": "secret"
        }
    )

    [t1, t2]

image

image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@eladkal
Copy link
Contributor

eladkal commented Apr 14, 2024

It reminds me about #28086
cc @RoyNoymanW

Comment on lines +338 to +339
mask_secret(self.env_vars)
self.env_vars = redact(self.env_vars)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test it to make sure values are masked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have added the testing results in the PR description. @eladkal

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test cases are needed for this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, looking at it.

Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I override the KPO to create a secret object and mount it to the pod, then I use the KPO callbacks to make the pod its owner (for auto-cleanup). I think we should resolve the issue @eladkal mentioned, but there is no harm in protecting environment vars, but we need to add some tests for it.

@amoghrajesh
Copy link
Contributor Author

@hussein-awala yea some test coverage would be nice. Just wanted to understand how the portion of interacting with a DB can be tested here. I was trying something along these lines:

    @pytest.mark.parametrize(
        "input",
        [
            [
                V1EnvVar(
                    name="password",
                    value="mypassword",
                ),
                V1EnvVar(
                    name="normal",
                    value="normalstuff",
                ),
                V1EnvVar(
                    name="secret",
                    value="ok",
                ),
                V1EnvVar(
                    name="thisisok",
                    value="password",
                ),
            ],
            {
                "password": "password1",
                "good": "thisisgood",
                "secret": "normal_stuff",
                "normal": "secret"
            }
        ],
    )
    def test_env_vars_redaction(self, input):
        k = KubernetesPodOperator(
            task_id="task-1",
            env_vars=input,
        )

        ctx = create_context(k, persist_to_db=True)
        pod = k.build_pod_request_obj(ctx)
        print(pod)

But I constantly get this:

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x115594550>
cursor = <sqlite3.Cursor object at 0x12545f030>
statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, ... next_dagrun_create_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
parameters = ('dag', None, 0, 0, 0, None, ...)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x1253d6d90>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlite3.OperationalError: table dag has no column named dag_display_name

The pod object also contains the un protected env_vars, so we need a DB interaction for sure. Any suggestions?

@amoghrajesh
Copy link
Contributor Author

@hussein-awala @potiuk @eladkal I need some help in adding the unit tests here. I tried, but looks like I am unable to store the results to a DB and check it. Also, static check issues.

@potiuk
Copy link
Member

potiuk commented Apr 18, 2024

@hussein-awala @potiuk @eladkal I need some help in adding the unit tests here. I tried, but looks like I am unable to store the results to a DB and check it. Also, static check issues.

I think your problems were with not recreated DB -> the error indicates you need to reset db with latest changes - this column has been added recently.

@amoghrajesh
Copy link
Contributor Author

@ashb @potiuk @eladkal Need some help with fixing the unit tests here on this change also want to ask what kind of testing needs to be performed here to mark this task complete.

So I am trying to write a UT here which calls KPO init and then checks the DB for the RenderedTaskInstanceFields table and validates the redaction. Didn't get too far with it, so far I have this:

    @provide_session
    @pytest.mark.parametrize(
        "input",
        [
            [
                V1EnvVar(
                    name="password",
                    value="mypassword",
                ),
                V1EnvVar(
                    name="normal",
                    value="normalstuff",
                ),
                V1EnvVar(
                    name="secret",
                    value="ok",
                ),
                V1EnvVar(
                    name="thisisok",
                    value="password",
                ),
            ],
            {
                "password": "password1",
                "good": "thisisgood",
                "secret": "normal_stuff",
                "normal": "secret"
            }
        ],
    )
    def test_env_vars_redaction(self, dag_maker, input, session=None):
        from airflow.models.renderedtifields import RenderedTaskInstanceFields
        # k = KubernetesPodOperator(
        #     task_id="task-1",
        #     env_vars=input,
        # )
        #
        # ctx = create_context(k, persist_to_db=True)
        #
        # pod = k.build_pod_request_obj(ctx)

        # session = Session()
        # all_rows = session.query(RenderedTaskInstanceFields).all()
        #
        # print(all_rows)

        with dag_maker("test-dag", session=session) as dag:
            task = KubernetesPodOperator(
                task_id="task-1",
                env_vars=input,
            )
        ti = dag_maker.create_dagrun().task_instances[0]
        ti.task = task

        session.add(RenderedTaskInstanceFields(ti))
        session.flush()

        y = ti.get_rendered_template_fields(session=session)
        print(y)

Basically i want to validate this:

{"cmds": [], "image": "hello-world", "labels": {}, "volumes": [], "env_from": [], "env_vars": [{"name": "password", "value": "***", "value_from": null}, {"name": "normal", "value": "normalstuff", "value_from": null}, {"name": "secret", "value": "***", "value_from": null}, {"name": "thisisok", "value": "password", "value_from": null}], "arguments": [], "namespace": "default", "annotations": {}, "config_file": "~/Downloads/ECS\\ Kubeconfigs/shared-rke-01", "volume_mounts": [], "cluster_context": null, "pod_template_dict": null, "pod_template_file": null, "container_resources": null}

Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jun 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:cncf-kubernetes Kubernetes provider related issues stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants