-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make namespace optional for KPO #27116
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan of using k
as a variable name, but I know you did it to maintain consistency. Other than that, looks like a nice reasonable addition. 👍
102bea1
to
846646f
Compare
self.create_pod_patch = mock.patch(f"{POD_MANAGER_CLASS}.create_pod") | ||
self.await_pod_patch = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start") | ||
self.await_pod_completion_patch = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion") | ||
self.hook_patch = mock.patch(HOOK_CLASS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we remove the global hook patch because we rely on it for namespace test
if not isinstance(self.hook_mock.return_value.is_in_cluster, bool): | ||
self.hook_mock.return_value.is_in_cluster = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of doing this, i just read the value from the actual hook (since we're no longer patching it globally)
operator.execute(context=context) | ||
return self.await_start_mock.call_args[1]['pod'] | ||
|
||
def sanitize_for_serialization(self, obj): | ||
return ApiClient().sanitize_for_serialization(obj) | ||
|
||
def test_config_path(self): | ||
@patch(HOOK_CLASS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you see this added in many places because we're not patching it globally
@@ -169,7 +172,7 @@ def test_security_context(self): | |||
in_cluster=False, | |||
do_xcom_push=False, | |||
) | |||
pod = self.run_pod(k) | |||
pod = k.build_pod_request_obj(create_context(k)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we're just building the pod object we don't need the full "run" method
@@ -574,6 +581,11 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: | |||
if self.random_name_suffix: | |||
pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name) | |||
|
|||
if not pod.metadata.namespace: | |||
hook_namespace = self.hook.conn_extras.get('extra__kubernetes__namespace') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a get_namespace
method on the hook but it's problematic. i may resolve that in future PR and then can update this. i'll resolve that and update this in a followup PR
@@ -123,20 +123,18 @@ def test_config_path(self, hook_mock): | |||
labels={"foo": "bar"}, | |||
name="test", | |||
task_id="task", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed so much boilerplate. 😍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now if only i can get the tests to pass...
53a1f5a
to
230a255
Compare
@@ -35,7 +35,9 @@ Previously KubernetesPodOperator considered some settings from the Airflow confi | |||
Features | |||
~~~~~~~~ | |||
|
|||
Previously, ``name`` was a required argument for KubernetesPodOperator (when also not supplying pod template or full pod spec). Now, if ``name`` is not supplied, ``task_id`` will be used. | |||
* Previously, ``name`` was a required argument for KubernetesPodOperator (when also not supplying pod template or full pod spec). Now, if ``name`` is not supplied, ``task_id`` will be used. | |||
* KubernetsPodOperator argument ``namespace`` is now optional. If not supplied, we'll check the airflow conn, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we document this priority?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
192b65e
to
dd22bea
Compare
In KPO namespace is currently required, either through pod template file, airflow connection, or KPO arg. If we're in the in_cluster scenario though, we should be able to derive the current namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace. This seems like a reasonable thing to do as a default.
1395e13
to
d0bb221
Compare
In KPO, currently, if you do not provide namespace as a KPO arg (and it's not otherwise specified through pod template or full pod spec) the task will fail when trying to create the pod, because the kube client does not fill it in for you like e.g. kubectl does.
This PR makes it optional.
If it's not specified through KPO arg, or full pod spec, or pod template, then first we'll check the hook to see if you've configured a namespace there. And if that is unspecified, if we're in a cluster already, we'll check /var/run/secrets/kubernetes.io/serviceaccount/namespace for the value.