Modify hive operator to inject analysis data into hive conf.
paulbramsen committed Jun 21, 2016
1 parent 45b735b commit ace79c0
Showing 3 changed files with 58 additions and 3 deletions.
18 changes: 16 additions & 2 deletions airflow/hooks/hive_hooks.py
@@ -16,6 +16,7 @@
 from __future__ import print_function
 from builtins import zip
 from past.builtins import basestring
+
 import unicodecsv as csv
 import logging
 import re
@@ -41,6 +42,7 @@ class HiveCliHook(BaseHook):
     Note that you can also set default hive CLI parameters using the
     ``hive_cli_params`` to be used in your connection as in
     ``{"hive_cli_params": "-hiveconf mapred.job.tracker=some.jobtracker:444"}``
+    Parameters passed here can be overridden by run_cli's hive_conf param
     The extra connection parameter ``auth`` gets passed as in the ``jdbc``
     connection string as is.
@@ -57,9 +59,17 @@ def __init__(
         self.conn = conn
         self.run_as = run_as

-    def run_cli(self, hql, schema=None, verbose=True):
+    def run_cli(self, hql, schema=None, verbose=True, hive_conf=None):
         """
-        Run an hql statement using the hive cli
+        Run an hql statement using the hive cli. If hive_conf is specified it should be a
+        dict and the entries will be set as key/value pairs in HiveConf
+
+        :param hive_conf: if specified these key value pairs will be passed to hive as
+            ``-hiveconf "key"="value"``. Note that they will be passed after the
+            ``hive_cli_params`` and thus will override whatever values are specified in
+            the database.
+        :type hive_conf: dict

         >>> hh = HiveCliHook()
         >>> result = hh.run_cli("USE airflow;")
@@ -107,6 +117,10 @@ def run_cli(self, hql, schema=None, verbose=True):
         if conn.password:
             cmd_extra += ['-p', conn.password]

+        hive_conf = hive_conf or {}
+        for key, value in hive_conf.items():
+            cmd_extra += ['-hiveconf', '{0}={1}'.format(key, value)]
+
         hive_cmd = [hive_bin, '-f', fname] + cmd_extra

         if self.hive_cli_params:
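Taken together, the hook change means any dict passed as hive_conf is expanded into -hiveconf pairs appended after hive_cli_params, so its entries win any conflict with connection-level settings. A minimal sketch of the expansion; the dict contents are invented for illustration, not taken from the commit:

# Illustrative only: how run_cli expands a hive_conf dict into CLI arguments.
hive_conf = {
    'airflow.ctx.dag.dag_id': 'example_dag',       # hypothetical entries
    'airflow.ctx.task.task_id': 'example_task',
}
cmd_extra = []
for key, value in hive_conf.items():
    cmd_extra += ['-hiveconf', '{0}={1}'.format(key, value)]

# cmd_extra now holds, in dict iteration order:
# ['-hiveconf', 'airflow.ctx.dag.dag_id=example_dag',
#  '-hiveconf', 'airflow.ctx.task.task_id=example_task']
# Because these follow hive_cli_params on the command line, they override
# any -hiveconf values stored in the connection.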
4 changes: 3 additions & 1 deletion airflow/operators/hive_operator.py
@@ -18,6 +18,7 @@
 from airflow.hooks.hive_hooks import HiveCliHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.operator_helpers import context_to_airflow_vars


 class HiveOperator(BaseOperator):
@@ -76,7 +77,8 @@ def prepare_template(self):
     def execute(self, context):
         logging.info('Executing: ' + self.hql)
         self.hook = self.get_hook()
-        self.hook.run_cli(hql=self.hql, schema=self.schema)
+        self.hook.run_cli(hql=self.hql, schema=self.schema,
+                          hive_conf=context_to_airflow_vars(context))

     def dry_run(self):
         self.hook = self.get_hook()
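With execute forwarding the task context, every HiveOperator run now sets the airflow.ctx.* keys in HiveConf, and the HQL itself can read them back through Hive's ${hiveconf:...} variable substitution. A hypothetical usage sketch; the table name, task id, and the dag object are assumptions, not part of this commit:

# Hypothetical DAG snippet; assumes a DAG object named `dag` already exists.
from airflow.operators.hive_operator import HiveOperator

audit = HiveOperator(
    task_id='audit_run',                      # invented task id
    hql="""
        -- Hive resolves ${hiveconf:...} from the -hiveconf pairs run_cli sets
        INSERT INTO TABLE run_audit
        SELECT '${hiveconf:airflow.ctx.dag.dag_id}',
               '${hiveconf:airflow.ctx.task.task_id}',
               '${hiveconf:airflow.ctx.dag_run.execution_date}';
    """,
    dag=dag,
)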
39 changes: 39 additions & 0 deletions airflow/utils/operator_helpers.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+def context_to_airflow_vars(context):
+    """
+    Given a context, this function provides a dictionary of values that can be used to
+    externally reconstruct relations between dags, dag_runs, tasks and task_instances.
+
+    :param context: The context for the task_instance of interest
+    :type context: dict
+    """
+    params = dict()
+    dag = context['dag']
+    if dag and dag.dag_id:
+        params['airflow.ctx.dag.dag_id'] = dag.dag_id
+    dag_run = context['dag_run']
+    if dag_run and dag_run.execution_date:
+        params['airflow.ctx.dag_run.execution_date'] = dag_run.execution_date.isoformat()
+    task = context['task']
+    if task and task.task_id:
+        params['airflow.ctx.task.task_id'] = task.task_id
+    task_instance = context['task_instance']
+    if task_instance and task_instance.execution_date:
+        params['airflow.ctx.task_instance.execution_date'] = \
+            task_instance.execution_date.isoformat()
+    return params
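
A rough illustration of the mapping this helper produces, using invented namedtuple stand-ins in place of real Airflow models; every name and value below is made up for the example:

# Stand-in objects; a real context carries DAG/DagRun/Task/TaskInstance models.
from collections import namedtuple
from datetime import datetime

from airflow.utils.operator_helpers import context_to_airflow_vars

Dag = namedtuple('Dag', ['dag_id'])
Task = namedtuple('Task', ['task_id'])
Dated = namedtuple('Dated', ['execution_date'])

context = {
    'dag': Dag('example_dag'),
    'dag_run': Dated(datetime(2016, 6, 21)),
    'task': Task('example_task'),
    'task_instance': Dated(datetime(2016, 6, 21)),
}

print(context_to_airflow_vars(context))
# {'airflow.ctx.dag.dag_id': 'example_dag',
#  'airflow.ctx.dag_run.execution_date': '2016-06-21T00:00:00',
#  'airflow.ctx.task.task_id': 'example_task',
#  'airflow.ctx.task_instance.execution_date': '2016-06-21T00:00:00'}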
