Skip to content

Commit

Permalink
[AIRFLOW-2770] kubernetes: add support for dag folder in the docker i…
Browse files Browse the repository at this point in the history
…mage (#3683)
  • Loading branch information
yeluolei authored and Tao Feng committed Dec 12, 2018
1 parent 4c1fec1 commit e9a09d4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 23 deletions.
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,10 @@ namespace = default
# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file)
airflow_configmap =

# If the docker image already contains the DAGs, set this to `True` and the worker will search for DAGs in dags_folder;
# otherwise use git sync or a DAGs volume claim to mount the DAGs
dags_in_docker = FALSE

# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
dags_volume_subpath =

Expand Down
10 changes: 8 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def __init__(self):
self.kubernetes_section, 'worker_service_account_name')
self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')

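# NOTE: users can build the DAGs directly into the docker image;
# this option is set to True in that case
# NOTE(review): if conf.get returns the raw config string, a value such as
# 'False' is truthy here -- confirm whether conf.getboolean is needed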
self.dags_in_docker = conf.get(self.kubernetes_section, 'dags_in_docker')

# NOTE: `git_repo` and `git_branch` must be specified together as a pair
# The http URL of the git repository to clone from
self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
Expand Down Expand Up @@ -204,10 +208,12 @@ def __init__(self):
self._validate()

def _validate(self):
if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
if not self.dags_volume_claim and not self.dags_in_docker \
and (not self.git_repo or not self.git_branch):
raise AirflowConfigException(
'In kubernetes mode the following must be set in the `kubernetes` '
'config section: `dags_volume_claim` or `git_repo and git_branch`')
'config section: `dags_volume_claim` or `git_repo and git_branch` '
'or `dags_in_docker`')


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
Expand Down
47 changes: 26 additions & 21 deletions airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, kube_config):
def _get_init_containers(self, volume_mounts):
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
# If the dags come from a volume claim or are already in the docker image, no init container is needed
if self.kube_config.dags_volume_claim:
if self.kube_config.dags_volume_claim or self.kube_config.dags_in_docker:
return []

# Otherwise, define a git-sync init container
Expand Down Expand Up @@ -128,32 +128,19 @@ def _construct_volume(name, claim):
return volume

volumes = [
_construct_volume(
dags_volume_name,
self.kube_config.dags_volume_claim
),
_construct_volume(
logs_volume_name,
self.kube_config.logs_volume_claim
)
]

dag_volume_mount_path = ""

if self.kube_config.dags_volume_claim:
dag_volume_mount_path = self.worker_airflow_dags
else:
dag_volume_mount_path = os.path.join(
self.worker_airflow_dags,
self.kube_config.git_subpath
if not self.kube_config.dags_in_docker:
volumes.append(
_construct_volume(
dags_volume_name,
self.kube_config.dags_volume_claim
)
)
dags_volume_mount = {
'name': dags_volume_name,
'mountPath': dag_volume_mount_path,
'readOnly': True,
}
if self.kube_config.dags_volume_subpath:
dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath

logs_volume_mount = {
'name': logs_volume_name,
Expand All @@ -163,10 +150,28 @@ def _construct_volume(name, claim):
logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath

volume_mounts = [
dags_volume_mount,
logs_volume_mount
]

if not self.kube_config.dags_in_docker:
dag_volume_mount_path = ""

if self.kube_config.dags_volume_claim:
dag_volume_mount_path = self.worker_airflow_dags
else:
dag_volume_mount_path = os.path.join(
self.worker_airflow_dags,
self.kube_config.git_subpath
)
dags_volume_mount = {
'name': dags_volume_name,
'mountPath': dag_volume_mount_path,
'readOnly': True,
}
if self.kube_config.dags_volume_subpath:
dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath
volume_mounts.append(dags_volume_mount)

# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = 'airflow-config'
Expand Down
1 change: 1 addition & 0 deletions scripts/ci/kubernetes/kube/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ data:
worker_container_image_pull_policy = IfNotPresent
worker_dags_folder = /tmp/dags
delete_worker_pods = True
dags_in_docker = False
git_repo = https://github.com/apache/incubator-airflow.git
git_branch = master
git_subpath = airflow/example_dags/
Expand Down

0 comments on commit e9a09d4

Please sign in to comment.