Skip to content

Commit

Permalink
Add resources to K8sContainerContext (#7619)
Browse files Browse the repository at this point in the history
Summary: The next step after this would be to add these as fields on the instance/helm chart and in the executor so that they can be configured on the instance and per-op, but this gives us what we need as a foundation to vary them per-location and have that config threaded all the way through the launched k8s jobs for that location.
  • Loading branch information
gibsondan committed Apr 28, 2022
1 parent 00d8e1c commit 4cb262c
Show file tree
Hide file tree
Showing 18 changed files with 261 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ dagster-user-deployments:
- name: test-volume
mountPath: /opt/dagster/test_folder
subPath: test_file.yaml
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
includeConfigInLaunchedRuns:
enabled: true
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,12 @@ dagster-user-deployments:
- name: test-volume
mountPath: /opt/dagster/test_folder
subPath: test_file.yaml
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
includeConfigInLaunchedRuns:
enabled: true
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ DAGSTER_K8S_PIPELINE_RUN_ENV_CONFIGMAP: "{{ template "dagster.fullname" . }}-pip
{{- if .labels }}
labels: {{- .labels | toYaml | nindent 6 }}
{{- end }}
{{- if .resources }}
resources: {{- .resources | toYaml | nindent 6 }}
{{- end }}
namespace: {{ $.Release.Namespace }}
service_account_name: {{ include "dagsterUserDeployments.serviceAccountName" $ }}
{{- end }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class K8sRunLauncherConfig(BaseModel):
volumes: List[kubernetes.Volume]
labels: Optional[Dict[str, str]]
failPodOnRunFailure: Optional[bool]
resources: Optional[kubernetes.ResourceRequirements]

class Config:
extra = Extra.forbid
Expand Down
5 changes: 5 additions & 0 deletions helm/dagster/schema/schema/charts/utils/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,8 @@ class Config:
# Placeholder model for a Kubernetes Volume in the chart's values schema.
# It declares no fields of its own.
class Volume(BaseModel):
class Config:
# The schema for this model is a "$ref" produced by create_definition_ref
# for the "io.k8s.api.core.v1.Volume" definition.
schema_extra = {"$ref": create_definition_ref("io.k8s.api.core.v1.Volume")}


# Placeholder model for Kubernetes container resource requirements
# (the "limits" / "requests" mapping) in the chart's values schema.
# It declares no fields of its own.
class ResourceRequirements(BaseModel):
class Config:
# The schema for this model is a "$ref" produced by create_definition_ref
# for the "io.k8s.api.core.v1.ResourceRequirements" definition.
schema_extra = {"$ref": create_definition_ref("io.k8s.api.core.v1.ResourceRequirements")}
30 changes: 30 additions & 0 deletions helm/dagster/schema/schema_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,36 @@ def test_k8s_run_launcher_fail_pod_on_run_failure(template: HelmTemplate):
assert run_launcher_config["config"]["fail_pod_on_run_failure"]


def test_k8s_run_launcher_resources(template: HelmTemplate):
    """Check that run launcher ``resources`` from Helm values reach ``dagster.yaml``.

    Renders the chart with a K8s run launcher whose config carries a
    ``resources`` mapping, loads the ``dagster.yaml`` entry of the first
    rendered object, and asserts that the run launcher config contains the
    same mapping unchanged.
    """
    resources = {
        "requests": {"memory": "64Mi", "cpu": "250m"},
        "limits": {"memory": "128Mi", "cpu": "500m"},
    }

    # Build the values from the innermost object outwards so each layer is named.
    k8s_launcher_config = K8sRunLauncherConfig.construct(
        imagePullPolicy="Always",
        loadInclusterConfig=True,
        envConfigMaps=[],
        envSecrets=[],
        envVars=[],
        volumeMounts=[],
        volumes=[],
        resources=resources,
    )
    run_launcher = RunLauncher.construct(
        type=RunLauncherType.K8S,
        config=RunLauncherConfig.construct(k8sRunLauncher=k8s_launcher_config),
    )
    helm_values = DagsterHelmValues.construct(runLauncher=run_launcher)

    rendered = template.render(helm_values)
    dagster_yaml = yaml.full_load(rendered[0].data["dagster.yaml"])
    launcher_section = dagster_yaml["run_launcher"]

    assert launcher_section["config"]["resources"] == resources


def test_celery_k8s_run_launcher_config(template: HelmTemplate):
image = {"repository": "test_repo", "tag": "test_tag", "pullPolicy": "Always"}

Expand Down
48 changes: 48 additions & 0 deletions helm/dagster/schema/schema_tests/test_user_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,54 @@ def test_user_deployment_labels(template: HelmTemplate, include_config_in_launch
_assert_no_container_context(user_deployments[0])


@pytest.mark.parametrize("include_config_in_launched_runs", [False, True])
def test_user_deployment_resources(template: HelmTemplate, include_config_in_launched_runs: bool):
    """Check how a user deployment's ``resources`` appear in its container context.

    Renders a single user deployment that sets ``resources``. When
    ``includeConfigInLaunchedRuns`` is enabled, the third env var of the first
    container must be ``DAGSTER_CLI_API_GRPC_CONTAINER_CONTEXT`` and its JSON
    value must include the same ``resources`` mapping under ``k8s``. When it is
    disabled, the deployment must carry no container context.
    """
    name = "foo"

    resources = {
        "requests": {"memory": "64Mi", "cpu": "250m"},
        "limits": {"memory": "128Mi", "cpu": "500m"},
    }

    helm_values = DagsterHelmValues.construct(
        dagsterUserDeployments=UserDeployments(
            enabled=True,
            enableSubchart=True,
            deployments=[
                UserDeployment.construct(
                    name=name,
                    image={"repository": f"repo/{name}", "tag": "tag1", "pullPolicy": "Always"},
                    dagsterApiGrpcArgs=["-m", name],
                    port=3030,
                    resources=resources,
                    includeConfigInLaunchedRuns={"enabled": include_config_in_launched_runs},
                )
            ],
        )
    )

    rendered = template.render(helm_values)
    assert len(rendered) == 1
    rendered_deployment = rendered[0]

    # Disabled case: nothing about the container context should be rendered.
    if not include_config_in_launched_runs:
        _assert_no_container_context(rendered_deployment)
        return

    context_env_var = rendered_deployment.spec.template.spec.containers[0].env[2]
    assert context_env_var.name == "DAGSTER_CLI_API_GRPC_CONTAINER_CONTEXT"

    actual_context = json.loads(context_env_var.value)
    assert actual_context == {
        "k8s": {
            "image_pull_policy": "Always",
            "env_config_maps": [
                "release-name-dagster-user-deployments-foo-user-env",
            ],
            "resources": resources,
            "namespace": "default",
            "service_account_name": "release-name-dagster-user-deployments-user-deployments",
        }
    }


def test_subchart_image_pull_secrets(subchart_template: HelmTemplate):
image_pull_secrets = [{"name": "super-duper-secret"}]
deployment_values = DagsterUserDeploymentsHelmValues.construct(
Expand Down
4 changes: 4 additions & 0 deletions helm/dagster/templates/helpers/instance/_run-launcher.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ config:
labels: {{- $k8sRunLauncherConfig.labels | toYaml | nindent 4 }}
{{- end }}

{{- if $k8sRunLauncherConfig.resources }}
resources: {{- $k8sRunLauncherConfig.resources | toYaml | nindent 4 }}
{{- end }}

{{- if $k8sRunLauncherConfig.failPodOnRunFailure }}
fail_pod_on_run_failure: true
{{- end }}
Expand Down
17 changes: 17 additions & 0 deletions helm/dagster/values.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions helm/dagster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,19 @@ runLauncher:
# my_label_key: my_label_value
labels: {}

# Default compute resource requirements for the container in the Job's Pod. See:
# https://kubernetes.io/docs/concepts/configuration/manage-resources-containers
#
# Example:
# resources:
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
resources: {}

# Whether the launched Kubernetes Jobs and Pods should fail if the Dagster run fails.
failPodOnRunFailure: false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class K8sContainerContext(
("volumes", List[Dict[str, Any]]),
("labels", Dict[str, str]),
("namespace", Optional[str]),
("resources", Dict[str, Any]),
],
)
):
Expand All @@ -53,6 +54,7 @@ def __new__(
volumes: Optional[List[Dict[str, Any]]] = None,
labels: Optional[Dict[str, str]] = None,
namespace: Optional[str] = None,
resources: Optional[Dict[str, Any]] = None,
):
return super(K8sContainerContext, cls).__new__(
cls,
Expand All @@ -72,6 +74,7 @@ def __new__(
],
labels=check.opt_dict_param(labels, "labels"),
namespace=check.opt_str_param(namespace, "namespace"),
resources=check.opt_dict_param(resources, "resources"),
)

def merge(self, other: "K8sContainerContext") -> "K8sContainerContext":
Expand All @@ -94,6 +97,7 @@ def merge(self, other: "K8sContainerContext") -> "K8sContainerContext":
volumes=_dedupe_list(other.volumes + self.volumes),
labels=merge_dicts(other.labels, self.labels),
namespace=other.namespace if other.namespace else self.namespace,
resources=other.resources if other.resources else self.resources,
)

@staticmethod
Expand All @@ -115,6 +119,7 @@ def create_for_run(
volumes=run_launcher.volumes,
labels=run_launcher.labels,
namespace=run_launcher.job_namespace,
resources=run_launcher.resources,
)
)

Expand Down Expand Up @@ -162,6 +167,7 @@ def create_from_config(run_container_context) -> "K8sContainerContext":
volumes=processed_context_value.get("volumes"),
labels=processed_context_value.get("labels"),
namespace=processed_context_value.get("namespace"),
resources=processed_context_value.get("resources"),
)

def get_k8s_job_config(self, job_image, run_launcher) -> DagsterK8sJobConfig:
Expand All @@ -179,4 +185,5 @@ def get_k8s_job_config(self, job_image, run_launcher) -> DagsterK8sJobConfig:
volume_mounts=self.volume_mounts,
volumes=self.volumes,
labels=self.labels,
resources=self.resources,
)
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def k8s_job_executor(init_context: InitExecutorContext) -> Executor:
volumes=exc_cfg.get("volumes"),
labels=exc_cfg.get("labels"),
namespace=exc_cfg.get("job_namespace"),
resources=exc_cfg.get("resources"),
)

return StepDelegatingExecutor(
Expand Down
22 changes: 21 additions & 1 deletion python_modules/libraries/dagster-k8s/dagster_k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class DagsterK8sJobConfig(
"_K8sJobTaskConfig",
"job_image dagster_home image_pull_policy image_pull_secrets service_account_name "
"instance_config_map postgres_password_secret env_config_maps env_secrets env_vars "
"volume_mounts volumes labels",
"volume_mounts volumes labels resources",
)
):
"""Configuration parameters for launching Dagster Jobs on Kubernetes.
Expand Down Expand Up @@ -255,6 +255,8 @@ class DagsterK8sJobConfig(
https://v1-18.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#volume-v1-core
labels (Optional[Dict[str, str]]): Additional labels that should be included in the Job's Pod. See:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels
resources (Optional[Dict[str, Any]]): Compute resource requirements for the container. See:
https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
"""

def __new__(
Expand All @@ -272,6 +274,7 @@ def __new__(
volume_mounts=None,
volumes=None,
labels=None,
resources=None,
):
return super(DagsterK8sJobConfig, cls).__new__(
cls,
Expand Down Expand Up @@ -300,6 +303,7 @@ def __new__(
for volume in check.opt_list_param(volumes, "volumes")
],
labels=check.opt_dict_param(labels, "labels", key_type=str, value_type=str),
resources=check.opt_dict_param(resources, "resources", key_type=str),
)

@classmethod
Expand Down Expand Up @@ -456,6 +460,17 @@ def config_type_container(cls):
description="Labels to apply to all created pods. See: "
"https://kubernetes.io/docs/concepts/overview/working-with-objects/labels",
),
"resources": Field(
Noneable(
{
"limits": Field(dict, is_required=False),
"requests": Field(dict, is_required=False),
}
),
is_required=False,
description="Compute resource requirements for the container. See: "
"https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/",
),
}

@classmethod
Expand Down Expand Up @@ -607,6 +622,8 @@ def construct_dagster_k8s_job(

user_defined_k8s_volume_mounts = container_config.pop("volume_mounts", [])

user_defined_resources = container_config.pop("resources", {})

volume_mounts = (
[
{
Expand All @@ -621,6 +638,8 @@ def construct_dagster_k8s_job(
+ user_defined_k8s_volume_mounts
)

resources = user_defined_resources if user_defined_resources else job_config.resources

container_config = merge_dicts(
container_config,
{
Expand All @@ -631,6 +650,7 @@ def construct_dagster_k8s_job(
"env": env + job_config.env + additional_k8s_env_vars,
"env_from": job_config.env_from_sources + additional_k8s_env_from,
"volume_mounts": volume_mounts,
"resources": resources,
},
)

Expand Down
6 changes: 6 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
volumes=None,
labels=None,
fail_pod_on_run_failure=None,
resources=None,
):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self.job_namespace = check.str_param(job_namespace, "job_namespace")
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(
self._fail_pod_on_run_failure = check.opt_bool_param(
fail_pod_on_run_failure, "fail_pod_on_run_failure"
)
self._resources = check.opt_dict_param(resources, "resources")

super().__init__()

Expand Down Expand Up @@ -144,6 +146,10 @@ def volume_mounts(self):
def volumes(self):
return self._volumes

# Read-only accessor for the "resources" dict given to the constructor
# (stored as self._resources after check.opt_dict_param).
@property
def resources(self):
return self._resources

@property
def env_vars(self):
return self._env_vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ def kubeconfig_file(tmp_path):
return str(config_path)


# Resource requests/limits supplied as the run launcher's "resources" config
# by the k8s_run_launcher_instance fixture.
LAUNCHER_RESOURCES = {
"requests": {"cpu": "128m", "memory": "64Mi"},
"limits": {"cpu": "500m", "memory": "1000Mi"},
}


@pytest.fixture
def k8s_run_launcher_instance(kubeconfig_file): # pylint: disable=redefined-outer-name
with environ({"BAR_TEST": "bar"}):
Expand All @@ -46,6 +52,7 @@ def k8s_run_launcher_instance(kubeconfig_file): # pylint: disable=redefined-out
"load_incluster_config": False,
"kubeconfig_file": kubeconfig_file,
"env_vars": ["BAR_TEST"],
"resources": LAUNCHER_RESOURCES,
},
},
}
Expand Down

1 comment on commit 4cb262c

@vercel
Copy link

@vercel vercel bot commented on 4cb262c Apr 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.