In [22]:
import logging
import airflow
from airflow.hooks.hive_hooks import HiveCliHook, HiveServer2Hook, HiveMetastoreHook

In [23]:
# Version marker for the code in this notebook; none of the visible cells read it.
# NOTE(review): stored as a float, so a future "2.10" would be indistinguishable
# from 2.1 -- confirm whether a string was intended.
__version = 2.2

In [24]:
class HiveToVWInput(object):
    """Export a Hive table as VW-formatted text lines into an external Hive table.

    ("VW" is presumably Vowpal Wabbit -- the generated lines look like
    ``<label> 1.0 <tag>|namespace FeatureA:123 FeatureB:0.50 SomeText``.)

    On construction the source table's column list is read from the Hive
    metastore and every column (except the label column and anything in
    ``excludes``) is assigned to a namespace:

    * the value from ``custom_namespaces`` if the column is listed there;
    * otherwise, if the column name has at least three ``__``-separated
      parts, the second part (``a__geo__lat`` -> ``geo``);
    * otherwise ``'other'``.

    ``run()`` then (re)creates the destination table and fills it with one
    formatted line per source row.

    Args:
        src_table: source table as ``db.table_name``.
        dst_table: destination table as ``db.table_name``.
        dst_hdfs_location: HDFS location of the external destination table.
        label_column: column used as the example label; never emitted as a
            feature.
        tag_column: column (or SQL expression) emitted as the example tag.
        limit: optional row limit appended to the INSERT ... SELECT.
        sample_sql: optional predicate AND-ed onto the WHERE clause.
        excludes: optional collection of column names to leave out.
            ``None`` (the default) excludes nothing.
        custom_namespaces: optional ``{column_name: namespace}`` overrides.
            ``None`` (the default) means no overrides.
        hive_conn_id: Airflow connection id passed to ``HiveCliHook``.
        metastore_conn_id: Airflow connection id passed to
            ``HiveMetastoreHook``.

    Raises:
        TypeError: if ``src_table`` or ``dst_table`` is not of the form
            ``db.table_name``.
    """

    def __init__(self,
                 src_table,
                 dst_table,
                 dst_hdfs_location,
                 label_column,
                 tag_column,
                 limit=None,
                 sample_sql=None,
                 excludes=None,
                 custom_namespaces=None,
                 hive_conn_id='hive_silver',
                 metastore_conn_id='metastore_silver',
                ):
        src_parts = src_table.split('.')
        if len(src_parts) != 2:
            raise TypeError('src_table should be of the format namespace.table_name')
        (self.src_db, self.src_table_name) = src_parts

        dst_parts = dst_table.split('.')
        if len(dst_parts) != 2:
            raise TypeError('dst_table should be of the format namespace.table_name')
        (self.dst_db, self.dst_table_name) = dst_parts

        self.src_table = src_table
        self.dst_table = dst_table
        self.dst_hdfs_location = dst_hdfs_location
        self.sample_sql = sample_sql
        self.label_column = label_column
        self.limit = limit
        self.hive_conn_id = hive_conn_id
        self.metastore_conn_id = metastore_conn_id
        self.tag_column = tag_column

        # The signature advertises None as the default for both of these, so
        # treat None as "nothing excluded" / "no custom mapping" instead of
        # crashing on `in None` / `None.keys()`.
        if excludes is None:
            excludes = []
        if custom_namespaces is None:
            custom_namespaces = {}

        # Fetch the table definition from the hive metastore
        hmh = HiveMetastoreHook(metastore_conn_id)
        table_def = hmh.get_table(db=self.src_db, table_name=self.src_table_name)
        cols = table_def.sd.cols

        # Build the VW namespaces from column names.
        # Shape: {'col1': {'namespace': 'ns1', 'type': 'datatype'}, ...}
        nslookup = {}
        for c in cols:
            if c.name in excludes or c.name == self.label_column:
                continue
            name_parts = c.name.split('__')
            if c.name in custom_namespaces:
                namespace = custom_namespaces[c.name]
            elif len(name_parts) > 2:
                # prefix__namespace__rest -> the middle part is the namespace
                namespace = name_parts[1]
            else:
                namespace = 'other'
            nslookup[c.name] = {'namespace': namespace, 'type': c.type}
        self.nslookup = nslookup

        # Namespace-keyed dictionary with a list of (colname, dtype) as values
        nsgroups = {}
        for col, info in nslookup.items():
            nsgroups.setdefault(info['namespace'], []).append((col, info['type']))
        self.nsgroups = nsgroups

    def __col_sql(self, colname, coltype):
        """Return the SQL expression that renders one column as a VW feature.

        Numeric and boolean columns render as ``'<col>:<value> '`` and string
        columns as the cleaned-up text followed by a space. NULL, zero, false
        and empty values render as the empty string so that the feature is
        simply omitted from the line.

        Raises:
            NotImplementedError: for any column type other than ``double``,
                ``bigint``, ``int``, ``boolean`` or ``string``.
        """
        if coltype == 'double':
            return "CASE WHEN COALESCE({col},0.0) = 0.0 THEN '' ELSE CONCAT('{col}:', PRINTF('%.2f', {col}), ' ') END".format(col=colname)
        elif coltype in ['bigint', 'int']:
            return "CASE WHEN COALESCE({col},0) = 0 THEN '' ELSE CONCAT('{col}:', PRINTF('%d', {col}), ' ') END".format(col=colname)
        elif coltype in ['boolean']:
            return "CASE WHEN COALESCE(CAST({col} as int),0) = 0 THEN '' ELSE CONCAT('{col}:', PRINTF('%d', CAST({col} as int)), ' ') END".format(col=colname)
        elif coltype == 'string':
            # Clean up all the spurious characters
            return "CASE WHEN COALESCE({col},'') = '' THEN '' ELSE CONCAT(REGEXP_REPLACE({col}, '[\\\\x00-\\\\x2a|\\\\x2c|\\\\x2f|\\\\x3a-\\\\x40|\\\\x5b-\\\\x5e|\\\\x60|\\\\x7b-\\\\x7f]', ''), ' ') END".format(col=colname)
        else:
            # Name the offending column so the caller knows what to exclude.
            raise NotImplementedError(
                "Unsupported Hive column type '{}' for column '{}'".format(coltype, colname))

    def __col_ns(self, namespaces):
        """Return the combined SQL for all VW namespaces.

        ``namespaces`` maps a namespace name to a list of
        ``(colname, coltype)`` tuples. Each namespace renders as
        ``"|namespace FeatureA:123 FeatureB:0.0 SomeText"``; the result is a
        comma-separated list of SQL expressions meant to be spliced into a
        ``CONCAT(...)`` call.
        """
        featureset = []
        # .items() rather than .iteritems() so this runs on Python 2 and 3.
        for ns, cols in namespaces.items():
            features = "\n,".join([self.__col_sql(name, dtype) for (name, dtype) in cols])
            featureset.append("\n'|{ns} '\n, {fs}".format(ns=ns, fs=features))
        return ','.join(featureset)

    def __assemble_sql(self, cols, label_col, tag_col, src_table):
        """Build the SELECT that produces one VW line per source row.

        The query filters on ``ds > ''`` (so the source table needs a ``ds``
        column), drops rows whose label is NULL, and AND-s ``self.sample_sql``
        onto the WHERE clause when it is non-empty.
        """
        s = "AND {}".format(self.sample_sql) if self.sample_sql else ""
        sql = """
            SELECT CONCAT(PRINTF('%.4f', COALESCE({label}, 0.0)), ' 1.0 ', {tag_col}, {cols} ) 
            FROM {table_name}
            WHERE ds > ''
                AND {label} IS NOT NULL
                {sample}
            """.format(label=label_col,
                       cols=self.__col_ns(cols),
                       tag_col=tag_col,
                       table_name=src_table,
                       sample=s
                      )
        return sql

    def create_table(self, table_name, table_location):
        """Drop ``table_name`` if it exists and recreate it as an external,
        single-column (``input_line STRING``) text table at ``table_location``.
        """
        hh = HiveCliHook(self.hive_conn_id)
        create_sql = """
            DROP TABLE IF EXISTS {table_name};
            CREATE EXTERNAL TABLE {table_name} (
                input_line    STRING
            )
            STORED AS TEXTFILE
            LOCATION '{location}';
            ;
        """.format(table_name=table_name, location=table_location)
        hh.run_cli(create_sql)

    def __gen_sql(self):
        """Return the full ``INSERT OVERWRITE`` script for the destination table,
        including the session settings and the optional ``LIMIT`` clause.
        """
        select_sql = self.__assemble_sql(self.nsgroups,
                                         self.label_column,
                                         self.tag_column,
                                         self.src_table)

        limit_sql = "LIMIT {}".format(self.limit) if self.limit else ""

        insert_sql = """
            SET hive.exec.compress.output=false;
            SET mapred.reduce.tasks=10;
            INSERT OVERWRITE TABLE {dst_table}
            {select_sql}
            {limit_sql}
            ;
        """.format(dst_table=self.dst_table,
                   select_sql=select_sql,
                   limit_sql=limit_sql)

        return insert_sql

    def run(self):
        """Compile the SQL and run it on Hive.

        Recreates the destination table (any existing table of that name is
        dropped first), logs the generated script, and executes it through
        the Hive CLI hook.
        """
        hh = HiveCliHook(self.hive_conn_id)

        self.create_table(self.dst_table, self.dst_hdfs_location)

        finalsql = self.__gen_sql()
        logging.info(finalsql)
        hh.run_cli(finalsql)
        logging.info("You should be able to find your files at {}".format(self.dst_hdfs_location))

In [25]:
# Exclude these columns from the final result
EXCLUDE_COLS = [
    'ds',
    'guest_id',
    'part',
    'randnum',
    'sample01',
    'sample1',
    'sample10',
    'sample25',
    'sample35',
    'sample50',
    'target_p180',
    'target_p28',
    'target_p730',
]

# Create custom namespace mappings for columns
# Anything not given a namespace will be mapped to "other"
# NOTE(review): this literal previously repeated 'dim_year' (x2) and
# 'dim_city' (x3). Duplicate keys collapse to a single entry, so the mapping
# is unchanged -- but confirm the repeats were not meant to be other columns.
CUSTOM_NS = {
    'dow': 'time',
    'dsmonth': 'time',
    'yr': 'time',
    'dim_week_of_year': 'calendar',
    'dim_year': 'calendar',
    'dim_city': 'location',
    'dim_country': 'location',
    'dim_lat': 'location',
    'dim_lng': 'location',
}

In [None]:
# Export the training table as VW input lines.
htvw = HiveToVWInput(
    src_table='hamel.ltv_train_s1_top50',
    dst_table='hamel.vw_input_data_trainS1_top50',
    dst_hdfs_location='/user/hive/warehouse/hamel.db/vw_input_data_trainS1_top50',
    label_column='target_p365',
    tag_column='guest_id',
    # limit=1000000000000,  # uncomment to cap the number of rows read from the source table
    sample_sql="",  # optional predicate AND-ed onto the WHERE clause for sampling
    excludes=EXCLUDE_COLS,
    custom_namespaces=CUSTOM_NS,
)
htvw.run()

[2016-11-25 00:03:20,769] {base_hook.py:53} INFO - Using connection to: localhost
[2016-11-25 00:03:20,825] {hive_hooks.py:131} INFO - hive -f /tmp/airflow_hiveop_IJ3qga/tmp5ChOS8
[2016-11-25 00:03:23,215] {hive_hooks.py:145} INFO - 16/11/25 00:03:23 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
[2016-11-25 00:03:23,293] {hive_hooks.py:145} INFO - 
[2016-11-25 00:03:23,294] {hive_hooks.py:145} INFO - Logging initialized using configuration in jar:file:/mnt/var/opt/CDH-5.3.3-1.cdh5.3.3.p1174.932/jars/hive-common-0.13.1-cdh5.3.3.jar!/hive-log4j.properties
[2016-11-25 00:03:24,853] {hive_hooks.py:145} INFO - OK
[2016-11-25 00:03:24,855] {hive_hooks.py:145} INFO - Time taken: 0.788 seconds
[2016-11-25 00:03:24,978] {hive_hooks.py:145} INFO - OK
[2016-11-25 00:03:24,979] {hive_hooks.py:145} INFO - Time taken: 0.123 seconds
[2016-11-2

In [None]:
# Export the validation table as VW input lines.
htvw_val = HiveToVWInput(
    'hamel.ltv_validation_top50', # Input table name
    # Output table name. The literal previously ended with a stray space, which
    # ended up in dst_table_name and in the generated SQL.
    'hamel.vw_input_data_validation_top50',
    '/user/hive/warehouse/hamel.db/vw_input_data_validation_top50', # HDFS File location
    'target_p365', # Label column
    'guest_id', #tag column
    #limit=1000000000000, # Limit the number of rows from the source table
    sample_sql="", # This will be added to the where clause for sampling 
    excludes=EXCLUDE_COLS, # column exclude list
    custom_namespaces=CUSTOM_NS, # Custom mapping for VW namespaces
    )

htvw_val.run()