From 4d29e7b71fdcc1b95845cb100898d680fff716cb Mon Sep 17 00:00:00 2001 From: Maxime Date: Wed, 22 Jul 2015 22:17:53 +0000 Subject: [PATCH] Improving the HiveToDruidOperator --- airflow/hooks/druid_hook.py | 25 ++++++++++++------------- airflow/operators/hive_to_druid.py | 22 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index ec3def70d9778..e79f4edba9fdb 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -27,7 +27,7 @@ def __init__( druid_ingest_conn_id='druid_ingest_default'): self.druid_query_conn_id = druid_query_conn_id self.druid_ingest_conn_id = druid_ingest_conn_id - self.header = {'content-type': 'application/json'} + self.header = {'content-type': 'application/json'} def get_conn(self): """ @@ -41,11 +41,11 @@ def get_conn(self): @property def ingest_post_url(self): - conn = self.get_connection(self.druid_ingest_conn_id) + conn = self.get_connection(self.druid_ingest_conn_id) host = conn.host port = conn.port endpoint = conn.extra_dejson.get('endpoint', '') - return "http://{host}:{port}/{endpoint}".format(**locals()) + return "http://{host}:{port}/{endpoint}".format(**locals()) def get_ingest_status_url(self, task_id): post_url = self.ingest_post_url @@ -63,7 +63,7 @@ def construct_ingest_query( metric_names = [ m['fieldName'] for m in metric_spec if m['type'] != 'count'] dimensions = [c for c in columns if c not in metric_names] - ingest_query_dict = { + ingest_query_dict = { "type": "index_hadoop", "spec": { "dataSchema": { @@ -105,32 +105,30 @@ def construct_ingest_query( } } - return json.dumps(ingest_query_dict, indent=4) - + return json.dumps(ingest_query_dict, indent=4) def send_ingest_query( self, datasource, static_path, ts_dim, columns, metric_spec, intervals): - query = self.construct_ingest_query( + query = self.construct_ingest_query( datasource, static_path, ts_dim, columns, metric_spec, intervals) - r = requests.post( + r = requests.post( self.ingest_post_url, headers=self.header, data=query) - print(self.ingest_post_url) - print(query) - print(r.text) + logging.info(self.ingest_post_url) + logging.info(query) + logging.info(r.text) d = json.loads(r.text) if "task" not in d: raise AirflowDruidLoadException( "[Error]: Ingesting data to druid failed.") return d["task"] - def load_from_hdfs( self, datasource, static_path, ts_dim, columns, intervals, metric_spec=None): """ - load data to druid from hdfs + load data to druid from hdfs :params ts_dim: The column name to use as a timestamp :params metric_spec: A list of dictionaries """ @@ -142,6 +140,7 @@ def load_from_hdfs( r = requests.get(status_url) d = json.loads(r.text) if d['status']['status'] == 'FAILED': + logging.error(d) raise AirflowDruidLoadException( "[Error]: Ingesting data to druid failed.") elif d['status']['status'] == 'SUCCESS': diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index 7068b11e2aef3..06749e803168d 100644 --- a/airflow/operators/hive_to_druid.py +++ b/airflow/operators/hive_to_druid.py @@ -68,10 +68,18 @@ def execute(self, context): DROP TABLE IF EXISTS {hive_table}; CREATE TABLE {hive_table} ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' - STORED AS TEXTFILE AS - {sql}; + STORED AS TEXTFILE + TBLPROPERTIES ('serialization.null.format' = '') + AS + {sql} """.format(**locals()) hive.run_cli(hql) + #hqls = hql.split(';') + #logging.info(str(hqls)) + #from airflow.hooks import HiveServer2Hook + #hive = HiveServer2Hook(hiveserver2_conn_id="hiveserver2_silver") + #hive.get_results(hqls) + m = HiveMetastoreHook(self.metastore_conn_id) t = m.get_table(hive_table) @@ -82,11 +90,21 @@ def execute(self, context): pos = hdfs_uri.find('/user') static_path = hdfs_uri[pos:] + schema, table = hive_table.split('.') + druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id) logging.info("Inserting rows into Druid") + logging.info("HDFS path: " + static_path) + druid.load_from_hdfs( datasource=self.druid_datasource, intervals=self.intervals, static_path=static_path, ts_dim=self.ts_dim, columns=columns, metric_spec=self.metric_spec) logging.info("Load seems to have succeeded!") + + logging.info( + "Cleaning up by dropping the temp " + "Hive table {}".format(hive_table)) + hql = "DROP TABLE IF EXISTS {}".format(hive_table) + #hive.run_cli(hql)