RFC: resolve k8s job and ops tag entirely using k8s_model_from_dict (#6205)

Summary:
This is intended to resolve the confusing patchwork state we are in, where some k8s config keys (env_from) need to be specified using underscores while others need to be specified using camelCase. It also allows us to use the full range of k8s config available when configuring a job, while still getting schema validation.

That said, there are still some back-compat issues to sort out here. I'm curious for initial thoughts, though.
gibsondan committed Mar 4, 2022
1 parent 7ea0c61 commit c52b495
Showing 15 changed files with 698 additions and 241 deletions.
Original file line number Diff line number Diff line change
@@ -1,68 +1,90 @@
---
title: Customizing Helm | Dagster
description: We go over common ways to customize your Dagster Helm deployment. This includes adding Kubernetes configuration at the job and op level, and configuring your Helm release to use external resources.
description: This section covers common ways to customize your Dagster Helm deployment.
---

# Customizing your Kubernetes Deployment

## Overview
This section covers common ways to customize your Dagster Helm deployment.

We go over common ways to customize your Dagster Helm deployment, including adding Kubernetes configuration at the job and op level, and configuring your Helm release to use external resources.
## Job or Op Kubernetes Configuration

## Walkthrough
The `dagster-k8s/config` tag allows users to pass custom configuration through to the Kubernetes Jobs and Pods created by Dagster during execution.

### Job or Op Kubernetes Configuration
`dagster-k8s/config` is a dictionary with the following keys:

The `dagster-k8s/config` allows users to pass custom configuration to the Kubernetes Job, Job metadata, JobSpec, PodSpec, and PodTemplateSpec metadata. We can specify this information in an op or job's tags.
- `container_config`: The Pod's [Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#container-v1-core).
- `pod_spec_config`: The Pod's [PodSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#podspec-v1-core).
- `pod_template_spec_metadata`: The Pod's [Metadata](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#objectmeta-v1-meta).
- `job_spec_config`: The Job's [JobSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#jobspec-v1-batch).
- `job_metadata`: The Job's [Metadata](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#objectmeta-v1-meta).
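Because the tag is a plain dictionary, a misspelled top-level key is easy to miss. The following is a minimal sketch, not part of `dagster-k8s`, of a check that compares a tag against the five keys listed above. The helper name `unknown_k8s_config_keys` and the sample `tags` value (including the `team` label) are hypothetical and exist only for illustration.

```python
# Top-level keys listed above for the "dagster-k8s/config" tag.
K8S_CONFIG_KEYS = {
    "container_config",
    "pod_spec_config",
    "pod_template_spec_metadata",
    "job_spec_config",
    "job_metadata",
}


def unknown_k8s_config_keys(tags):
    """Return, sorted, any top-level keys in the tag that are not listed above.

    Hypothetical helper for illustration; it is not part of dagster-k8s.
    """
    k8s_config = tags.get("dagster-k8s/config", {})
    return sorted(set(k8s_config) - K8S_CONFIG_KEYS)


# Sample tags; "pod_config" is a deliberate typo for "pod_spec_config".
tags = {
    "dagster-k8s/config": {
        "container_config": {"resources": {"requests": {"cpu": "250m"}}},
        "job_metadata": {"labels": {"team": "data"}},
        "pod_config": {},
    }
}

print(unknown_k8s_config_keys(tags))  # prints ['pod_config']
```

The sketch only inspects the top level; it does not validate the nested Kubernetes fields.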

```python
@op(
tags = {
'dagster-k8s/config': {
'container_config': {
'resources': {
'requests': { 'cpu': '250m', 'memory': '64Mi' },
'limits': { 'cpu': '500m', 'memory': '2560Mi' },
}
},
'pod_template_spec_metadata': {
'annotations': { "cluster-autoscaler.kubernetes.io/safe-to-evict": "true"}
},
'pod_spec_config': {
'affinity': {
'nodeAffinity': {
'requiredDuringSchedulingIgnoredDuringExecution': {
'nodeSelectorTerms': [{
'matchExpressions': [{
'key': 'beta.kubernetes.io/os', 'operator': 'In', 'values': ['windows', 'linux'],
}]
}]
}
}
}
},
The value for each of these keys is a dictionary with the YAML configuration for the underlying Kubernetes object. The Kubernetes object fields can be configured using either snake case (for example, `volume_mounts`) or camel case (`volumeMounts`).

If your instance is using the <PyObject module="dagster_k8s" object="K8sRunLauncher" /> or <PyObject module="dagster_celery_k8s" object="CeleryK8sRunLauncher" />, you can use the `dagster-k8s/config` tag on a Dagster job. For example:

```python file=/deploying/kubernetes/k8s_config_tag.py startafter=start_k8s_config_job endbefore=end_k8s_config_job
@job(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
                "volume_mounts": [
                    {"name": "volume1", "mount_path": "foo/bar", "sub_path": "file.txt"}
                ],
            },
            "pod_template_spec_metadata": {
                "annotations": {"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"}
            },
            "pod_spec_config": {
                "volumes": [{"name": "volume1", "secret": {"secret_name": "volume_secret_name"}}],
                "affinity": {
                    "node_affinity": {
                        "required_during_scheduling_ignored_during_execution": {
                            "node_selector_terms": [
                                {
                                    "match_expressions": [
                                        {
                                            "key": "beta.kubernetes.io/os",
                                            "operator": "In",
                                            "values": ["windows", "linux"],
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                },
            },
        },
    },
)
def my_op(context):
    context.log.info('running')
def my_job():
    my_op()
```

@job(
tags = {
'dagster-k8s/config': {
'container_config': {
'resources': {
'requests': { 'cpu': '200m', 'memory': '32Mi' },
If your Dagster job is configured with an executor that runs each op in its own pod, like the <PyObject module="dagster_k8s" object="k8s_job_executor" /> or <PyObject module="dagster_celery_k8s" object="celery_k8s_job_executor" />, you can also use the `dagster-k8s/config` tag on a Dagster op to control the Kubernetes configuration for that specific op. For example:

```python file=/deploying/kubernetes/k8s_config_tag.py startafter=start_k8s_config_op endbefore=end_k8s_config_op
@op(
tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "200m", "memory": "32Mi"},
}
},
}
},
}
}
)
def my_job():
my_op()
def my_op(context):
context.log.info("running")
```
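The examples above use snake case field names, while the documentation notes that camel case (`volumeMounts`) is also accepted. As a rough sketch of how the two spellings relate, the code below rewrites camel case keys to snake case. The names `camel_to_snake` and `snake_case_keys` are hypothetical, and this is not the `k8s_snake_case_dict` function from `dagster_k8s.models`, which the tests in this commit call with a Kubernetes model class as its first argument. The simplification here is that every dictionary key is converted, which would also alter free-form map keys such as labels.

```python
import re


def camel_to_snake(name):
    """Convert a camelCase field name such as volumeMounts to volume_mounts."""
    # Insert an underscore before each capital that follows a lowercase
    # letter or digit, then lowercase the whole name.
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


def snake_case_keys(value):
    """Recursively rewrite dictionary keys to snake case.

    Assumption: every key is a Kubernetes field name. A model-aware version
    would leave free-form keys (labels, annotations) untouched.
    """
    if isinstance(value, dict):
        return {camel_to_snake(key): snake_case_keys(item) for key, item in value.items()}
    if isinstance(value, list):
        return [snake_case_keys(item) for item in value]
    return value


container_config = {
    "volumeMounts": [{"name": "volume1", "mountPath": "foo/bar", "subPath": "file.txt"}]
}
print(snake_case_keys(container_config))
```

Keys that are already in snake case contain no capitals, so they pass through unchanged.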

### Configuring an External Database
## Configuring an External Database

In a real deployment, users will likely want to set up an external PostgreSQL database and configure the `postgresql` section of `values.yaml`.

@@ -88,13 +110,13 @@ global:
generatePostgresqlPasswordSecret: false
```

### Security
## Security

Users will likely want to grant a ServiceAccount, bound to a properly scoped Role, permission to launch Jobs and create other Kubernetes resources.

Users will likely want to use [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) for managing secure information such as database logins.

### Separately Deploying Dagster infrastructure and User Code
### Separately Deploying Dagster Infrastructure and User Code

It may be desirable to manage two Helm releases for your Dagster deployment: one release for the Dagster infrastructure, which consists of Dagit and the Daemon, and another release for your User Code, which contains the definitions of your pipelines written in Dagster. This way, changes to User Code can be decoupled from upgrades to core Dagster infrastructure.

@@ -128,7 +150,7 @@ Finally, the `dagster-user-deployments` subchart can now be managed in its own r

helm upgrade --install user-code dagster/dagster-user-deployments -f /path/to/values.yaml

### Kubernetes Job and Pod TTL management
## Kubernetes Job and Pod TTL management

If you use a Kubernetes distribution that supports the [TTL Controller](https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished/#ttl-controller), then `Completed` and `Failed` [Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/) (and their associated [Pods](https://kubernetes.io/docs/concepts/workloads/pods/)) will be deleted after 1 day. The TTL value can be modified in your job tags:
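As a hedged sketch of what such a tag could look like: the Kubernetes JobSpec exposes the TTL as `ttlSecondsAfterFinished`, so the example below assumes it is set under `job_spec_config` using the snake case spelling. The variable names and the two-hour value are illustrative only.

```python
# Sketch only: assumes the TTL is set through the JobSpec field
# ttlSecondsAfterFinished, written here in snake case.
ONE_HOUR_SECONDS = 60 * 60

ttl_tags = {
    "dagster-k8s/config": {
        "job_spec_config": {
            # Delete finished Jobs (and their Pods) after two hours.
            "ttl_seconds_after_finished": 2 * ONE_HOUR_SECONDS,
        }
    }
}

# With Dagster installed, this dictionary would be passed as @job(tags=ttl_tags).
print(ttl_tags["dagster-k8s/config"]["job_spec_config"]["ttl_seconds_after_finished"])  # prints 7200
```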

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dagster import job, op


# fmt: off
# start_k8s_config_op
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "200m", "memory": "32Mi"},
                }
            },
        }
    }
)
def my_op(context):
    context.log.info("running")
# end_k8s_config_op

# start_k8s_config_job
@job(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
                "volume_mounts": [
                    {"name": "volume1", "mount_path": "foo/bar", "sub_path": "file.txt"}
                ],
            },
            "pod_template_spec_metadata": {
                "annotations": {"cluster-autoscaler.kubernetes.io/safe-to-evict": "true"}
            },
            "pod_spec_config": {
                "volumes": [{"name": "volume1", "secret": {"secret_name": "volume_secret_name"}}],
                "affinity": {
                    "node_affinity": {
                        "required_during_scheduling_ignored_during_execution": {
                            "node_selector_terms": [
                                {
                                    "match_expressions": [
                                        {
                                            "key": "beta.kubernetes.io/os",
                                            "operator": "In",
                                            "values": ["windows", "linux"],
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                },
            },
        },
    },
)
def my_job():
    my_op()
# end_k8s_config_job
# fmt: on
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@

@op
def my_op():
print("foo")
print("foo") # pylint: disable=print-call


# fmt: off
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from dagster_k8s import DagsterK8sJobConfig, construct_dagster_k8s_job
from dagster_k8s.job import get_user_defined_k8s_config

from docs_snippets.deploying.kubernetes.k8s_config_tag import my_job, my_op


def test_k8s_tag_job():
    assert my_job
    user_defined_cfg = get_user_defined_k8s_config(my_job.tags)

    cfg = DagsterK8sJobConfig(
        job_image="test/foo:latest",
        dagster_home="/opt/dagster/dagster_home",
        instance_config_map="test",
    )
    job = construct_dagster_k8s_job(cfg, [], "job123", user_defined_k8s_config=user_defined_cfg)

    assert job.to_dict()["spec"]["template"]["spec"]["containers"][0]["resources"] == {
        "requests": {"cpu": "250m", "memory": "64Mi"},
        "limits": {"cpu": "500m", "memory": "2560Mi"},
    }


def test_k8s_tag_op():
    assert my_op
    user_defined_cfg = get_user_defined_k8s_config(my_op.tags)

    cfg = DagsterK8sJobConfig(
        job_image="test/foo:latest",
        dagster_home="/opt/dagster/dagster_home",
        instance_config_map="test",
    )
    job = construct_dagster_k8s_job(cfg, [], "job123", user_defined_k8s_config=user_defined_cfg)

    assert job.to_dict()["spec"]["template"]["spec"]["containers"][0]["resources"] == {
        "requests": {"cpu": "200m", "memory": "32Mi"},
        "limits": None,
    }
2 changes: 1 addition & 1 deletion examples/docs_snippets/setup.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
"Operating System :: OS Independent",
],
packages=find_packages(exclude=["test"]),
install_requires=["dagster"],
install_requires=["dagster", "dagster-k8s"],
extras_require={
"full": [
"seaborn",
1 change: 1 addition & 0 deletions examples/docs_snippets/tox.ini
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ deps =
-e ../../python_modules/libraries/dagster-slack
-e ../../python_modules/libraries/dagstermill
-e ../../python_modules/libraries/dagster-dbt
-e ../../python_modules/libraries/dagster-k8s
-e ../../python_modules/dagit
-e .[full]
usedevelop = true
12 changes: 9 additions & 3 deletions helm/dagster/schema/schema_tests/test_user_deployments.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
from typing import List

import pytest
from dagster_k8s.models import k8s_model_from_dict
from dagster_k8s.models import k8s_model_from_dict, k8s_snake_case_dict
from kubernetes import client as k8s_client
from kubernetes.client import models
from schema.charts.dagster.subschema.global_ import Global
@@ -557,13 +557,19 @@ def test_user_deployment_volumes(template: HelmTemplate):

deployed_volume_mounts = user_deployments[0].spec.template.spec.containers[0].volume_mounts
assert deployed_volume_mounts == [
k8s_model_from_dict(k8s_client.models.V1VolumeMount, volume_mount)
k8s_model_from_dict(
k8s_client.models.V1VolumeMount,
k8s_snake_case_dict(k8s_client.models.V1VolumeMount, volume_mount),
)
for volume_mount in volume_mounts
]

deployed_volumes = user_deployments[0].spec.template.spec.volumes
assert deployed_volumes == [
k8s_model_from_dict(k8s_client.models.V1Volume, volume) for volume in volumes
k8s_model_from_dict(
k8s_client.models.V1Volume, k8s_snake_case_dict(k8s_client.models.V1Volume, volume)
)
for volume in volumes
]

assert image_name == deployment.image.repository
Original file line number Diff line number Diff line change
@@ -374,7 +374,7 @@ def test_user_defined_k8s_config_in_run_tags(kubeconfig_file):
container = kwargs["body"].spec.template.spec.containers[0]

job_resources = container.resources
assert job_resources == expected_resources
assert job_resources.to_dict() == expected_resources

labels = kwargs["body"].spec.template.metadata.labels
assert labels["foo_label_key"] == "bar_label_value"