Skip to content

Commit

Permalink
[DSPDC-1891] Flip load_incluster_config to true by default and extern…
Browse files Browse the repository at this point in the history
…alize (#27)

Why

The load_incluster_config param was flipped to False while I tested locally, and I failed to flip it back to True. This will break downstream k8s runners.

This PR

Flips back to True by default, and allows the config to be set by downstream clients to avoid this situation in the future.
  • Loading branch information
aherbst-broad committed Sep 3, 2021
1 parent 7c2c818 commit 5ffa523
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
12 changes: 8 additions & 4 deletions dagster_utils/resources/beam/k8s_beam_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from uuid import uuid4

import kubernetes
from dagster import DagsterLogManager, Field, IntSource, resource, StringSource, String, Noneable
from dagster import DagsterLogManager, Field, IntSource, resource, StringSource, String, Noneable, BoolSource
from dagster.core.execution.context.init import InitResourceContext
from dagster_k8s.client import DagsterKubernetesClient
from kubernetes.client.models.v1_job import V1Job
Expand All @@ -22,6 +22,7 @@ class K8sDataflowCloudConfig:
worker_machine_type: str
starting_workers: int
max_workers: int
load_incluster_config: bool

def subnetwork(self) -> Optional[str]:
if not self.subnet_name:
Expand Down Expand Up @@ -79,7 +80,8 @@ def run(
]

image_name = f"{self.image_name}:{self.image_version}" # {context.solid_config['version']}"
job = self.dispatch_k8s_job(image_name, job_name, args, command=command)
job = self.dispatch_k8s_job(image_name, job_name, args, command=command,
load_incluster_config=self.cloud_config.load_incluster_config)
self.logger.info("Dataflow job started")

client = DagsterKubernetesClient.production_client()
Expand All @@ -90,7 +92,7 @@ def dispatch_k8s_job(
image_name: str,
job_name_prefix: Optional[str],
args: List[str],
load_incluster_config: bool = False,
load_incluster_config: bool = True,
command: Optional[list[str]] = None
) -> V1Job:
# we will need to poll the pod/job status on creation
Expand Down Expand Up @@ -156,6 +158,7 @@ def dispatch_k8s_job(
"image_name": Field(StringSource),
"image_version": Field(StringSource),
"namespace": Field(StringSource),
"load_incluster_config": Field(BoolSource)
})
def k8s_dataflow_beam_runner(init_context: InitResourceContext) -> K8sDataflowBeamRunner:
cloud_config = K8sDataflowCloudConfig(
Expand All @@ -165,7 +168,8 @@ def k8s_dataflow_beam_runner(init_context: InitResourceContext) -> K8sDataflowBe
region=init_context.resource_config['region'],
worker_machine_type=init_context.resource_config['worker_machine_type'],
starting_workers=init_context.resource_config['starting_workers'],
max_workers=init_context.resource_config['max_workers']
max_workers=init_context.resource_config['max_workers'],
load_incluster_config=init_context.resource_config["load_incluster_config"]
)
return K8sDataflowBeamRunner(
cloud_config=cloud_config,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "broad_dagster_utils"
license = "BSD-3-Clause"
readme = "README.md"
repository = "https://github.com/broadinstitute/dagster-utils"
version = "0.6.2"
version = "0.6.3"

description = "Common utilities and objects for building Dagster pipelines"
authors = ["Monster Dev <monsterdev@broadinstitute.org>"]
Expand Down

0 comments on commit 5ffa523

Please sign in to comment.