Skip to content

Commit

Permalink
added relationships to real af ops, small fixes, small ui fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Niv Sluzki committed May 3, 2020
1 parent 9a79c8f commit 06e6e80
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ def _add_rel(rel_map, t_id_1, t_id_2):
# set children/upstreams maps
upstreams_map = set()
parent_child_map = set()
parent_task_ids = set()

for task_run in run.task_runs:
task = task_run.task
for t_id in task.task_meta.children:
_add_rel(parent_child_map, task.task_id, t_id)
parent_task_ids.add(run.get_task_run_by_id(t_id).task_af_id)

task_dag = task.ctrl.task_dag
for upstream in task_dag.upstream:
Expand All @@ -149,6 +151,8 @@ def _add_rel(rel_map, t_id_1, t_id_2):
parent_child_map=parent_child_map,
upstreams_map=upstreams_map,
dynamic_task_run_update=dynamic_task_run_update,
parent_task_ids=parent_task_ids,
af_context=run.af_context,
)

def task_to_targets(self, task, targets):
Expand Down
26 changes: 15 additions & 11 deletions modules/dbnd/src/dbnd/api/tracking_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@
)


class AirflowTaskContextSchema(ApiObjectSchema):
dag_id = fields.String()
execution_date = fields.String()
task_id = fields.String()
try_number = fields.Integer(allow_none=True)

@post_load
def make_run_info(self, data, **kwargs):
return AirflowTaskContext(**data)


class TaskRunsInfoSchema(ApiObjectSchema):
run_uid = fields.UUID()
root_run_uid = fields.UUID()
Expand All @@ -58,11 +69,13 @@ class TaskRunsInfoSchema(ApiObjectSchema):
TaskRunInfoSchema, many=True, exclude=("task_signature_source",)
)
upstreams_map = fields.List(fields.List(fields.UUID()))
parent_task_ids = fields.List(fields.Str())

dynamic_task_run_update = fields.Boolean()

targets = fields.Nested(TargetInfoSchema, many=True)
task_definitions = fields.Nested(TaskDefinitionInfoSchema, many=True)
af_context = fields.Nested(AirflowTaskContextSchema, allow_none=True)

@post_load
def make_run_info(self, data, **kwargs):
Expand Down Expand Up @@ -90,17 +103,6 @@ def asdict(self):
return attr.asdict(self, recurse=False)


class AirflowTaskContextSchema(ApiObjectSchema):
dag_id = fields.String()
execution_date = fields.String()
task_id = fields.String()
try_number = fields.Integer(allow_none=True)

@post_load
def make_run_info(self, data, **kwargs):
return AirflowTaskContext(**data)


class InitRunArgsSchema(ApiObjectSchema):
run_uid = fields.UUID()
root_run_uid = fields.UUID()
Expand Down Expand Up @@ -394,7 +396,9 @@ class TaskRunsInfo(object):

parent_child_map = attr.ib(default=None)
upstreams_map = attr.ib(default=None)
parent_task_ids = attr.ib(default=None)
dynamic_task_run_update = attr.ib(default=False)
af_context = attr.ib(default=None) # type: Optional[AirflowTaskContext]


@attr.s
Expand Down

0 comments on commit 06e6e80

Please sign in to comment.