Export info on dag/task id/run/instance for ingestion by analysis tools like Dr. Elephant
paulbramsen committed Jun 15, 2016
1 parent b7def7f commit d75b82c
Showing 30 changed files with 39 additions and 2 deletions.
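The mechanism behind the change: BaseOperator.execute now writes the task's identifiers into os.environ, so any process spawned by the task inherits them. A minimal sketch of what a task's child process could read (an illustration, not code from this commit; each variable may be unset when the task has no dag, dag_run, or start_date):

import os

# The four variables exported by BaseOperator.execute in this commit.
for key in ('AIRFLOW_DAG_ID', 'AIRFLOW_DAGRUN',
            'AIRFLOW_TASK_ID', 'AIRFLOW_TASK_INSTANCE'):
    print('{}={}'.format(key, os.environ.get(key, '<unset>')))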
9 changes: 8 additions & 1 deletion airflow/models.py
@@ -1996,7 +1996,14 @@ def execute(self, context):
         Refer to get_template_context for more context.
         """
-        raise NotImplementedError()
+        if self.dag:
+            os.environ['AIRFLOW_DAG_ID'] = self.dag.dag_id
+        dagrun = context['dag_run']
+        if dagrun and dagrun.execution_date:
+            os.environ['AIRFLOW_DAGRUN'] = dagrun.execution_date.isoformat()
+        os.environ['AIRFLOW_TASK_ID'] = self.task_id
+        if self.start_date:
+            os.environ['AIRFLOW_TASK_INSTANCE'] = self.start_date.isoformat()
 
     def post_execute(self, context):
         """
1 change: 1 addition & 0 deletions airflow/operators/bash_operator.py
@@ -51,6 +51,7 @@ def execute(self, context):
         Execute the bash command in a temporary directory
         which will be cleaned afterwards
         """
+        super(BashOperator, self).execute(context)
         bash_command = self.bash_command
         logging.info("tmp dir root location: \n" + gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
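Because BashOperator now calls super().execute(context) before spawning the shell, the exported variables are visible to the bash command itself. A hypothetical usage sketch (the DAG and task names are invented for illustration and assume the 2016-era import paths):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('env_demo', start_date=datetime(2016, 6, 1))

# The shell inherits os.environ, including the AIRFLOW_* variables set
# by BaseOperator.execute just before the command runs.
report_env = BashOperator(
    task_id='report_env',
    bash_command='echo "dag=$AIRFLOW_DAG_ID task=$AIRFLOW_TASK_ID '
                 'run=$AIRFLOW_DAGRUN instance=$AIRFLOW_TASK_INSTANCE"',
    dag=dag)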
1 change: 1 addition & 0 deletions airflow/operators/check_operator.py
@@ -57,6 +57,7 @@ def __init__(
         self.sql = sql
 
     def execute(self, context=None):
+        super(CheckOperator, self).execute(context)
         logging.info('Executing SQL check: ' + self.sql)
         records = self.get_db_hook().get_first(self.sql)
         logging.info("Record: " + str(records))
1 change: 1 addition & 0 deletions airflow/operators/dagrun_operator.py
@@ -43,6 +43,7 @@ def __init__(
         self.trigger_dag_id = trigger_dag_id
 
     def execute(self, context):
+        super(TriggerDagRunOperator, self).execute(context)
         dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
         dro = self.python_callable(context, dro)
         if dro:
1 change: 1 addition & 0 deletions airflow/operators/docker_operator.py
@@ -115,6 +115,7 @@ def __init__(
         self.container = None
 
     def execute(self, context):
+        super(DockerOperator, self).execute(context)
         logging.info('Starting docker container from image ' + self.image)
 
         tls_config = None
2 changes: 1 addition & 1 deletion airflow/operators/dummy_operator.py
@@ -16,4 +16,4 @@ def __init__(self, *args, **kwargs):
         super(DummyOperator, self).__init__(*args, **kwargs)
 
     def execute(self, context):
-        pass
+        super(DummyOperator, self).execute(context)
1 change: 1 addition & 0 deletions airflow/operators/email_operator.py
@@ -37,4 +37,5 @@ def __init__(
         self.files = files or []
 
     def execute(self, context):
+        super(EmailOperator, self).execute(context)
         send_email(self.to, self.subject, self.html_content, files=self.files)
1 change: 1 addition & 0 deletions airflow/operators/generic_transfer.py
@@ -48,6 +48,7 @@ def __init__(
         self.preoperator = preoperator
 
     def execute(self, context):
+        super(GenericTransfer, self).execute(context)
         source_hook = BaseHook.get_hook(self.source_conn_id)
 
         logging.info("Extracting data from {}".format(self.source_conn_id))
1 change: 1 addition & 0 deletions airflow/operators/hive_operator.py
@@ -60,6 +60,7 @@ def prepare_template(self):
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])

def execute(self, context):
super(HiveOperator, self).execute(context)
logging.info('Executing: ' + self.hql)
self.hook = self.get_hook()
self.hook.run_cli(hql=self.hql, schema=self.schema)
Expand Down
1 change: 1 addition & 0 deletions airflow/operators/hive_stats_operator.py
@@ -91,6 +91,7 @@ def get_default_exprs(self, col, col_type):
         return {k: v.format(col=col) for k, v in d.items()}
 
     def execute(self, context=None):
+        super(HiveStatsCollectionOperator, self).execute(context)
         metastore = HiveMetastoreHook(metastore_conn_id=self.metastore_conn_id)
         table = metastore.get_table(table_name=self.table)
         field_types = {col.name: col.type for col in table.sd.cols}
1 change: 1 addition & 0 deletions airflow/operators/hive_to_druid.py
@@ -68,6 +68,7 @@ def __init__(
         self.metastore_conn_id = metastore_conn_id
 
     def execute(self, context):
+        super(HiveToDruidTransfer, self).execute(context)
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         logging.info("Extracting data from Hive")
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
1 change: 1 addition & 0 deletions airflow/operators/hive_to_mysql.py
@@ -63,6 +63,7 @@ def __init__(
         self.bulk_load = bulk_load
 
     def execute(self, context):
+        super(HiveToMySqlTransfer, self).execute(context)
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
         logging.info("Extracting data from Hive")
         logging.info(self.sql)
1 change: 1 addition & 0 deletions airflow/operators/hive_to_samba_operator.py
@@ -37,6 +37,7 @@ def __init__(
         self.hql = hql.strip().rstrip(';')
 
     def execute(self, context):
+        super(Hive2SambaOperator, self).execute(context)
         samba = SambaHook(samba_conn_id=self.samba_conn_id)
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
         tmpfile = tempfile.NamedTemporaryFile()
1 change: 1 addition & 0 deletions airflow/operators/http_operator.py
@@ -54,6 +54,7 @@ def __init__(self,
         self.extra_options = extra_options or {}
 
     def execute(self, context):
+        super(SimpleHttpOperator, self).execute(context)
         http = HttpHook(self.method, http_conn_id=self.http_conn_id)
         logging.info("Calling HTTP method")
         response = http.run(self.endpoint,
1 change: 1 addition & 0 deletions airflow/operators/jdbc_operator.py
@@ -46,6 +46,7 @@ def __init__(
         self.autocommit = autocommit
 
     def execute(self, context):
+        super(JdbcOperator, self).execute(context)
         logging.info('Executing: ' + str(self.sql))
         self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
1 change: 1 addition & 0 deletions airflow/operators/mssql_operator.py
@@ -29,6 +29,7 @@ def __init__(
         self.parameters = parameters
 
     def execute(self, context):
+        super(MsSqlOperator, self).execute(context)
         logging.info('Executing: ' + str(self.sql))
         hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
         hook.run(self.sql, parameters=self.parameters)
1 change: 1 addition & 0 deletions airflow/operators/mssql_to_hive.py
@@ -82,6 +82,7 @@ def type_map(cls, mssql_type):
         return d[mssql_type] if mssql_type in d else 'STRING'
 
     def execute(self, context):
+        super(MsSqlToHiveTransfer, self).execute(context)
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
 
1 change: 1 addition & 0 deletions airflow/operators/mysql_operator.py
@@ -32,6 +32,7 @@ def __init__(
         self.parameters = parameters
 
     def execute(self, context):
+        super(MySqlOperator, self).execute(context)
         logging.info('Executing: ' + str(self.sql))
         hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         hook.run(
1 change: 1 addition & 0 deletions airflow/operators/mysql_to_hive.py
@@ -89,6 +89,7 @@ def type_map(cls, mysql_type):
         return d[mysql_type] if mysql_type in d else 'STRING'
 
     def execute(self, context):
+        super(MySqlToHiveTransfer, self).execute(context)
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
1 change: 1 addition & 0 deletions airflow/operators/oracle_operator.py
@@ -45,6 +45,7 @@ def __init__(
         self.parameters = parameters
 
     def execute(self, context):
+        super(OracleOperator, self).execute(context)
         logging.info('Executing: ' + str(self.sql))
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
         hook.run(
1 change: 1 addition & 0 deletions airflow/operators/pig_operator.py
@@ -47,6 +47,7 @@ def prepare_template(self):
"(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)

def execute(self, context):
super(PigOperator, self).execute(context)
logging.info('Executing: ' + self.pig)
self.hook = self.get_hook()
self.hook.run_cli(pig=self.pig)
Expand Down
1 change: 1 addition & 0 deletions airflow/operators/postgres_operator.py
@@ -34,6 +34,7 @@ def __init__(
         self.parameters = parameters
 
     def execute(self, context):
+        super(PostgresOperator, self).execute(context)
         logging.info('Executing: ' + str(self.sql))
         self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
1 change: 1 addition & 0 deletions airflow/operators/presto_to_mysql.py
@@ -48,6 +48,7 @@ def __init__(
         self.presto_conn_id = presto_conn_id
 
     def execute(self, context):
+        super(PrestoToMySqlTransfer, self).execute(context)
         presto = PrestoHook(presto_conn_id=self.presto_conn_id)
         logging.info("Extracting data from Presto")
         logging.info(self.sql)
3 changes: 3 additions & 0 deletions airflow/operators/python_operator.py
@@ -58,6 +58,7 @@ def __init__(
         self.template_ext = templates_exts
 
     def execute(self, context):
+        super(PythonOperator, self).execute(context)
         if self.provide_context:
             context.update(self.op_kwargs)
             context['templates_dict'] = self.templates_dict
@@ -88,6 +89,7 @@ class BranchPythonOperator(PythonOperator):
     ``skipped``.
     """
     def execute(self, context):
+        super(BranchPythonOperator, self).execute(context)
         branch = super(BranchPythonOperator, self).execute(context)
         logging.info("Following branch " + branch)
         logging.info("Marking other directly downstream tasks as skipped")
@@ -118,6 +120,7 @@ class ShortCircuitOperator(PythonOperator):
     The condition is determined by the result of `python_callable`.
     """
     def execute(self, context):
+        super(ShortCircuitOperator, self).execute(context)
         condition = super(ShortCircuitOperator, self).execute(context)
         logging.info("Condition result is {}".format(condition))
         if condition:
1 change: 1 addition & 0 deletions airflow/operators/s3_file_transform_operator.py
@@ -59,6 +59,7 @@ def __init__(
         self.transform_script = transform_script
 
     def execute(self, context):
+        super(S3FileTransformOperator, self).execute(context)
         source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
         dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
         logging.info("Downloading source S3 file {0}"
1 change: 1 addition & 0 deletions airflow/operators/s3_to_hive_operator.py
@@ -92,6 +92,7 @@ def __init__(
         self.s3_conn_id = s3_conn_id
 
     def execute(self, context):
+        super(S3ToHiveTransfer, self).execute(context)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
         logging.info("Downloading S3 file")
1 change: 1 addition & 0 deletions airflow/operators/sensors.py
@@ -52,6 +52,7 @@ def poke(self, context):
         raise AirflowException('Override me.')
 
     def execute(self, context):
+        super(BaseSensorOperator, self).execute(context)
         started_at = datetime.now()
         while not self.poke(context):
             sleep(self.poke_interval)
1 change: 1 addition & 0 deletions airflow/operators/slack_operator.py
@@ -47,6 +47,7 @@ def execute(self, **kwargs):
         SlackAPIOperator calls will not fail even if the call is unsuccessful.
         It should not prevent a DAG from completing successfully.
         """
+        super(SlackAPIOperator, self).execute(context)
         if not self.api_params:
             self.construct_api_call_params()
         sc = SlackClient(self.token)
1 change: 1 addition & 0 deletions airflow/operators/sqlite_operator.py
@@ -30,6 +30,7 @@ def __init__(
         self.parameters = parameters or []
 
     def execute(self, context):
+        super(SqliteOperator, self).execute(context)
         logging.info('Executing: ' + self.sql)
         hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
         hook.run(self.sql, parameters=self.parameters)
1 change: 1 addition & 0 deletions airflow/operators/subdag_operator.py
@@ -82,6 +82,7 @@ def __init__(
         self.executor = executor
 
     def execute(self, context):
+        super(SubDagOperator, self).execute(context)
         ed = context['execution_date']
         self.subdag.run(
             start_date=ed, end_date=ed, donot_pickle=True,
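This is what makes the commit useful to an external analyzer such as Dr. Elephant: a running process can be mapped back to its DAG, task, and run from outside Airflow. Dr. Elephant itself is a JVM application, so the following Linux-only Python sketch of that lookup is purely illustrative (not from this commit or from Dr. Elephant):

def airflow_coordinates(pid):
    """Return the AIRFLOW_* environment variables of a running process."""
    # /proc/<pid>/environ holds the process environment as NUL-separated
    # KEY=VALUE pairs; inspecting another user's process needs privileges.
    with open('/proc/{}/environ'.format(pid), 'rb') as f:
        pairs = f.read().split(b'\0')
    env = dict(p.split(b'=', 1) for p in pairs if b'=' in p)
    return {k.decode(): v.decode()
            for k, v in env.items() if k.startswith(b'AIRFLOW_')}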
