Skip to content

Commit

Permalink
CR Fixes #1
Browse files Browse the repository at this point in the history
  • Loading branch information
Niv Sluzki committed May 4, 2020
1 parent 06e6e80 commit 68638ab
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,9 @@ def get_airflow_tracking_manager(airflow_task_context):
class AirflowTrackingManager(object):
def __init__(self, af_context):
# type: (AirflowTaskContext) -> None
# self.run_uid = get_job_run_uid(
# dag_id=af_context.root_dag_id, execution_date=af_context.execution_date
# )
self.dag_id = af_context.dag_id
# self.run_uid = get_uuid()
# # this is the real operator uid, we need to connect to it with our "tracked" task,
# # so the moment monitor is on -> we can sync
af_runtime_op_task_id = af_context.task_id
# self.af_operator_sync__task_run_uid = get_task_run_uid(
# self.run_uid, af_context.dag_id, af_runtime_op_task_id
# )

# 1. create proper DatabandContext so we can create other objects
set_tracking_config_overide(
use_dbnd_log=override_airflow_log_system_for_tracking()
Expand All @@ -296,29 +288,6 @@ def __init__(self, af_context):
task_version="%s:%s"
% (af_runtime_op_task_id, af_context.execution_date),
)

# this is the real operator uid, we need to connect to it with our "tracked" task,
# so the moment monitor is on -> we can sync
# af_db_op_task_run_uid = get_task_run_uid(
# self.run_uid, af_context.dag_id, af_runtime_op_task_id
# )
# af_runtime_op.task_meta.extra_parents_task_run_uids.add(
# af_db_op_task_run_uid
# )
# af_runtime_op.ctrl.force_task_run_uid = TaskRunUidGen_TaskAfId(
# af_context.dag_id
# )

# self.af_operator_runtime__task = af_runtime_op
# AIRFLOW DAG RUNTIME
# self.af_dag_runtime__task = AirflowDagRuntimeTask(
# task_name=task_name_for_runtime(DAG_SPECIAL_TASK_ID),
# dag_id=af_context.root_dag_id, # <- ROOT DAG!
# execution_date=af_context.execution_date,
# task_target_date=task_target_date,
# )
# _add_child(self.af_dag_runtime__task, self.af_operator_runtime__task)

# this will create databand run with driver and root tasks.
# we need the "root" task to be the same between different airflow tasks invocations
# since in dbnd we must have single root task, so we create "dummy" task with dag_id name
Expand All @@ -337,9 +306,6 @@ def __init__(self, af_context):
self.dr = dr
dr._init_without_run()
self.airflow_operator__task_run = dr.root_task_run
# self.airflow_operator__task_run = dr.get_task_run_by_id(
# self.af_runtime_op.task_id
# )

def run_airflow_dynamic_task(self, func_call):
# type: (FuncCall) -> Any
Expand Down Expand Up @@ -395,31 +361,3 @@ def _handle_tracking_error(func_call, msg):
global _CURRENT_AIRFLOW_TRACKING_MANAGER
_CURRENT_AIRFLOW_TRACKING_MANAGER = False
return func_call.invoke()


# def _add_child(parent, child):
# parent.set_upstream(child)
# parent.task_meta.add_child(child.task_id)


# class TaskRunUidGen_TaskAfId(TaskRunUidGen):
# def __init__(self, dag_id):
# super(TaskRunUidGen_TaskAfId, self).__init__()
# self.dag_id = dag_id
#
# def generate_task_run_uid(self, run, task, task_af_id):
# return get_task_run_uid(run.run_uid, self.dag_id, task_af_id)
#
#
# class TaskRunUidGen_TaskAfId_Runtime(TaskRunUidGen):
# def __init__(self, dag_id):
# super(TaskRunUidGen_TaskAfId_Runtime, self).__init__()
# self.dag_id = dag_id
#
# def generate_task_run_uid(self, run, task, task_af_id):
# runtime_af = (
# _CURRENT_AIRFLOW_TRACKING_MANAGER.airflow_operator__task_run.task_af_id
# )
# return get_task_run_uid(
# run.run_uid, self.dag_id, "%s.%s" % (runtime_af, task_af_id)
# )
4 changes: 1 addition & 3 deletions modules/dbnd/src/dbnd/_core/run/run_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def set_run_state(self, state):
def add_task_runs(self, task_runs):
if not self.run.is_tracked:
return
self.tracking_store.add_task_runs(
run=self.run, task_runs=task_runs, source=self.run.source
)
self.tracking_store.add_task_runs(run=self.run, task_runs=task_runs)

def set_task_run_states(self, task_runs):
# type: (List[TaskRun]) -> None
Expand Down
11 changes: 7 additions & 4 deletions modules/dbnd/src/dbnd/_core/tracking/tracking_info_convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
TaskRunParamInfo,
)
from dbnd._core.tracking.tracking_info_run import RunInfo
from dbnd._core.utils.string_utils import safe_short_string
from dbnd._core.utils.string_utils import is_task_name_for_runtime, safe_short_string
from dbnd._core.utils.timezone import utcnow
from dbnd._core.utils.traversing import traverse
from dbnd.api.tracking_api import InitRunArgs, TaskRunsInfo
Expand Down Expand Up @@ -123,13 +123,16 @@ def _add_rel(rel_map, t_id_1, t_id_2):
# set children/upstreams maps
upstreams_map = set()
parent_child_map = set()
parent_task_ids = set()
runtime_children = set()

for task_run in run.task_runs:
task = task_run.task
for t_id in task.task_meta.children:
_add_rel(parent_child_map, task.task_id, t_id)
parent_task_ids.add(run.get_task_run_by_id(t_id).task_af_id)
if is_task_name_for_runtime(task.task_id):
# saving children of '__runtime' operators, so we'll know to create special relationships for them
# in the webserver
runtime_children.add(t_id)

task_dag = task.ctrl.task_dag
for upstream in task_dag.upstream:
Expand All @@ -151,7 +154,7 @@ def _add_rel(rel_map, t_id_1, t_id_2):
parent_child_map=parent_child_map,
upstreams_map=upstreams_map,
dynamic_task_run_update=dynamic_task_run_update,
parent_task_ids=parent_task_ids,
runtime_children=runtime_children,
af_context=run.af_context,
)

Expand Down
2 changes: 1 addition & 1 deletion modules/dbnd/src/dbnd/_core/tracking/tracking_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def log_metric(self, task_run, metric, source=None):
def log_artifact(self, task_run, name, artifact, artifact_target):
pass

def add_task_runs(self, run, task_runs, source):
def add_task_runs(self, run, task_runs):
pass

def heartbeat(self, run_uid):
Expand Down
4 changes: 2 additions & 2 deletions modules/dbnd/src/dbnd/_core/tracking/tracking_store_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def init_run(self, run):
def init_run_from_args(self, init_args):
return self._m(self.channel.init_run, init_run_schema, init_args=init_args)

def add_task_runs(self, run, task_runs, source):
def add_task_runs(self, run, task_runs):
from dbnd._core.tracking.tracking_info_convertor import TrackingInfoBuilder

task_runs_info = TrackingInfoBuilder(run).build_task_runs_info(
Expand All @@ -67,7 +67,7 @@ def add_task_runs(self, run, task_runs, source):
self.channel.add_task_runs,
add_task_runs_schema,
task_runs_info=task_runs_info,
source=source,
source=run.source,
)

def set_run_state(self, run, state, error=None, timestamp=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def set_task_run_state(self, task_run, state, error=None, timestamp=None):
except Exception as ex:
logger.log(level, "%s \nfailed to create banner: %s" % (task_msg, ex))

def add_task_runs(self, run, task_runs, source):
def add_task_runs(self, run, task_runs):
pass

def save_external_links(self, task_run, external_links_dict):
Expand Down
4 changes: 2 additions & 2 deletions modules/dbnd/src/dbnd/api/tracking_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class TaskRunsInfoSchema(ApiObjectSchema):
TaskRunInfoSchema, many=True, exclude=("task_signature_source",)
)
upstreams_map = fields.List(fields.List(fields.UUID()))
parent_task_ids = fields.List(fields.Str())
runtime_children = fields.List(fields.Str())

dynamic_task_run_update = fields.Boolean()

Expand Down Expand Up @@ -396,7 +396,7 @@ class TaskRunsInfo(object):

parent_child_map = attr.ib(default=None)
upstreams_map = attr.ib(default=None)
parent_task_ids = attr.ib(default=None)
runtime_children = attr.ib(default=None)
dynamic_task_run_update = attr.ib(default=False)
af_context = attr.ib(default=None) # type: Optional[AirflowTaskContext]

Expand Down

0 comments on commit 68638ab

Please sign in to comment.