In [1]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, StringIndexerModel, QuantileDiscretizer, VectorAssembler
import hyspark
from my_utils.misc import *
from my_utils.spark_utils import *
 
 
def get_dataset_for_train(hc, use_three_folds=True, use_time_based=True, database=None):
    """Load the (train, evaluation) DataFrame pair from the FM feature table.

    With ``use_three_folds`` the pair is ('train', 'valid'); otherwise the first
    frame holds both 'train' and 'valid' rows and the second holds 'test'.

    Args:
        hc: Hive/SQL context exposing ``sql(query)``.
        use_three_folds: Select the fold layout described above.
        use_time_based: Read the time-based table instead of the user-based one.
        database: Hive database that holds the feature table.

    Returns:
        Tuple ``(train_df, test_df)``.
    """
    if use_time_based:
        table_name = 'fm_time_based_feature_for_train'
    else:
        table_name = 'fm_user_based_feature_for_train'

    # Each group lists the dataset_type values that make up one returned frame.
    if use_three_folds:
        fold_groups = (('train',), ('valid',))
    else:
        fold_groups = (('train', 'valid'), ('test',))

    frames = []
    for group in fold_groups:
        in_list = ', '.join("'{}'".format(fold) for fold in group)
        sql = 'SELECT * FROM {}.{} WHERE dataset_type IN ({})'.format(database, table_name, in_list)
        frames.append(hc.sql(get_cleaned_hql(sql)))
    train_frame, test_frame = frames
    return train_frame, test_frame
 
 
def get_feature_name(hc, category=None, stat_data_type=None, database=None):
    """Return the set of feature names listed in ``<database>.fm_feature_metadata``.

    Args:
        hc: Hive/SQL context exposing ``sql(query)``.
        category: Optional iterable of ``category`` values to keep.
        stat_data_type: Optional iterable of ``stat_data_type`` values to keep.
        database: Hive database that holds the metadata table.

    Returns:
        Set of values from the ``name`` column of the filtered metadata.
    """
    metadata = hc.sql(get_cleaned_hql('SELECT * FROM %s.fm_feature_metadata' % database))

    # ``None`` means "no filter on this column".
    for column_name, allowed in (('category', category), ('stat_data_type', stat_data_type)):
        if allowed is not None:
            metadata = metadata.filter(F.col(column_name).isin(allowed))

    return set(metadata.select('name').rdd.flatMap(lambda record: record).collect())
 
 
def set_feature_name(key_names, target_name, cat_feature_names, num_feature_names, feature_name_to_index,
                     is_training=True, use_field=False, decimals=6):
    """Build a row parser that renders one record as an xLearn libsvm/libffm line.

    Args:
        key_names: Columns joined with ',' into the row key.
        target_name: Label column; its value starts the line when ``is_training``.
        cat_feature_names: Indexed categorical columns.
        num_feature_names: Numerical feature names, in the same order as the
            entries of the row's ``scaled_num_features`` vector.
        feature_name_to_index: Output feature index per feature name (with
            ``use_field``) or per ``<feature>_<level>`` name (without it).
        is_training: Prefix each line with the label value.
        use_field: Emit ``field:index:value`` triples instead of ``index:value``.
        decimals: Rounding applied to the scaled numerical values.

    Returns:
        Function mapping a row to ``(key, line)``. Features or levels missing
        from ``feature_name_to_index`` are skipped.
    """
    # Freeze the iteration order here, on the driver. Iterating the sets inside
    # ``_parse_row`` would re-hash them in each executor process, whose
    # PYTHONHASHSEED may differ from the driver's. The position ``i`` used to
    # index ``scaled_num_features`` could then stop matching the feature name.
    cat_names = list(cat_feature_names)
    num_names = list(num_feature_names)

    def _parse_row(row):
        key = ','.join([str(row[key_name]) for key_name in key_names])
        label = str(row[target_name]) if is_training else ''
        tokens = []
        for feature_name in cat_names:
            try:
                if use_field:
                    feature_index = feature_name_to_index[feature_name]
                    tokens.append(''.join([str(feature_index), ':', str(int(row[feature_name])), ':1']))
                else:
                    level_name = ''.join([feature_name, '_', str(int(row[feature_name]))])
                    tokens.append(''.join([str(feature_name_to_index[level_name]), ':1']))
            except KeyError:
                continue
        for i, feature_name in enumerate(num_names):
            try:
                feature_index = feature_name_to_index[feature_name]
                field = str(feature_index) + ':' if use_field else ''
                scaled_value = round(row['scaled_num_features'][i], decimals)
                tokens.append(''.join([str(feature_index), ':', field, str(scaled_value)]))
            except KeyError:
                continue
        # One join instead of re-concatenating the growing line per feature.
        return key, ' '.join([label] + tokens).lstrip()

    return _parse_row
 
 
# Spark application settings passed to hyspark.HySpark.
APP_NAME = 'building_xlearn_dataset_for_train'
MEM_PER_CORE = 10
INSTANCE = 'full'
 
# True: build the 'train' fold and evaluate on 'valid';
# False: build 'train' + 'valid' together and evaluate on 'test'.
USE_THREE_FOLDS = True
# Read fm_time_based_feature_for_train (True) or fm_user_based_feature_for_train (False).
USE_TIME_BASED = True
# Write field-aware (FFM) lines instead of plain FM lines.
USE_FIELD = False
# Fraction of rows sampled from each split; also encoded in output names as
# basis points (int(10000 * SAMPLING_RATE)).
SAMPLING_RATE = 0.1
# Training split only: keep every positive row and sample only the negatives.
IS_NEGATIVE_DOWN_SAMPLING = False
# Feature-metadata categories to use; sorting keeps the '+'-joined name tag independent of list order.
FEATURE_CATEGORIES = sorted(['user_demo', 'user_dtag', 'item_profile', 'item_context'])
 
# HDFS directory that receives the pipeline, datasets and metadata.
HADOOP_DATA_DIR_NAME = 'data'
# Hive database holding the feature and feature-metadata tables.
DATABASE = 'hcc_big_brain_lv2'
 
 
def _count_and_positive_ratio(df, target_name, positive_value=1):
    """Return ``(n_rows, n_positive_rows / n_rows)`` for ``df``.

    ``positive_value`` is the label counted as positive. The ratio is 0.0 for
    an empty DataFrame instead of raising ZeroDivisionError.
    """
    n_rows = df.count()
    if n_rows == 0:
        return n_rows, 0.0
    n_positive = df.filter(F.col(target_name) == positive_value).count()
    return n_rows, n_positive / n_rows


def _cast_and_fill(df, columns, cat_feature_names, num_feature_names):
    """Select ``columns``, cast the features and fill their nulls.

    Categorical features become strings with nulls replaced by 'unknown';
    numerical features become doubles with nulls replaced by 0.0.
    """
    df = df.select(columns)
    df = cast_data_type(df, cat_feature_names, 'string')
    df = cast_data_type(df, num_feature_names, 'double')
    return df.fillna('unknown', list(cat_feature_names)).fillna(0.0, list(num_feature_names))


def build_xlearn_dataset_for_train():
    """Build xLearn train/evaluation text datasets, the fitted pipeline and metadata on HDFS.

    Steps:
      1. Load and sample the train/evaluation splits.
      2. Drop features whose null ratio on the sampled training set exceeds 0.9.
      3. Fit a StringIndexer/StandardScaler pipeline on the training sample.
      4. Render every row as an xLearn line.
      5. Save the pipeline, both datasets and a pickled metadata tuple under
         ``HADOOP_DATA_DIR_NAME``.

    Output names follow ``<model>_<source>_based_<sampling>_<split>_<kind>(<categories>)``,
    the same pattern ``prepare_for_xlearn_train``, ``train_xlearn_model`` and
    ``build_xlearn_dataset_for_service`` build when reading them back.
    """
    sc = hyspark.HySpark(APP_NAME, MEM_PER_CORE, instance=INSTANCE)
    hc = sc.hive_context

    train_df, test_df = get_dataset_for_train(hc, use_three_folds=USE_THREE_FOLDS, use_time_based=USE_TIME_BASED,
                                              database=DATABASE)
    train_sampling_rate = SAMPLING_RATE
    test_sampling_rate = SAMPLING_RATE
    target_name = 'click_yn'
    sampled_train_df = sample_dataset(
        train_df, train_sampling_rate, is_negative_down_sampling=IS_NEGATIVE_DOWN_SAMPLING,
        target_name=target_name).persist()
    sampled_test_df = sample_dataset(
        test_df, test_sampling_rate, is_negative_down_sampling=False).persist()

    with get_elapsed_time():
        # Count the positive class explicitly. groupBy().count().collect() returns
        # groups in no guaranteed order, so element [0] could be either label.
        train_count, train_target_ratio = _count_and_positive_ratio(sampled_train_df, target_name)
        test_count, test_target_ratio = _count_and_positive_ratio(sampled_test_df, target_name)

    print('Num of obs. in training set: {}\nTarget Ratio in training set: {:.4%}\n'
          'Num of obs. in test set: {}\nTarget Ratio in test set: {:.4%}'.format(
              train_count, train_target_ratio, test_count, test_target_ratio))

    cat_feature_names = get_feature_name(
        hc, category=FEATURE_CATEGORIES, stat_data_type=['binary', 'categorical'], database=DATABASE)
    num_feature_names = get_feature_name(
        hc, category=FEATURE_CATEGORIES, stat_data_type=['numerical'], database=DATABASE)
    all_feature_names = cat_feature_names | num_feature_names

    print('Num of cat features:', len(cat_feature_names), '\nNum of num features:', len(num_feature_names))

    with get_elapsed_time():
        # Features that are mostly null in the sampled training set are dropped.
        null_ratios = pd.Series(get_null_ratio(sampled_train_df, all_feature_names))
        null_ratio_thr = 0.9
        dropped_feature_names = set(null_ratios[null_ratios > null_ratio_thr].index)

    print('Num of features to be removed:', len(dropped_feature_names))

    cat_feature_names = cat_feature_names.difference(dropped_feature_names)
    num_feature_names = num_feature_names.difference(dropped_feature_names)
    all_feature_names = all_feature_names.difference(dropped_feature_names)

    print('Num of cat features:', len(cat_feature_names), '\nNum of num features:', len(num_feature_names))

    key_names = ['csno', 'cntn_id', 'event_dttm']
    raw_columns = key_names + [target_name] + list(all_feature_names)
    sampled_train_df = _cast_and_fill(sampled_train_df, raw_columns, cat_feature_names, num_feature_names)
    sampled_test_df = _cast_and_fill(sampled_test_df, raw_columns, cat_feature_names, num_feature_names)

    transformers = []
    for feature_name in cat_feature_names:
        indexer = StringIndexer(handleInvalid='keep').setInputCol(feature_name).setOutputCol(
            '_'.join(['indexed', feature_name]))
        transformers.append(indexer)

    discretize = False
    if discretize:
        n_buckets = 4
        for feature_name in num_feature_names:
            discretizer = QuantileDiscretizer(numBuckets=n_buckets, handleInvalid='keep').setInputCol(
                feature_name).setOutputCol('_'.join(['discretized', feature_name]))
            transformers.append(discretizer)
            indexer = StringIndexer(handleInvalid='keep').setInputCol(
                '_'.join(['discretized', feature_name])).setOutputCol(
                '_'.join(['indexed', feature_name]))
            transformers.append(indexer)
    else:
        transformers.extend([VectorAssembler().setInputCols(list(num_feature_names)).setOutputCol('num_features'),
                             AsDenseTransformer().setInputCol('num_features').setOutputCol('num_features'),
                             StandardScaler(withMean=True, withStd=True).setInputCol(
                                 'num_features').setOutputCol('scaled_num_features')])

    with get_elapsed_time():
        pipeline = Pipeline().setStages(transformers)
        fitted_pipeline = pipeline.fit(sampled_train_df)
        sampled_train_df = fitted_pipeline.transform(sampled_train_df)
        sampled_test_df = fitted_pipeline.transform(sampled_test_df)

    all_transformed_feature_names = pd.Series(sampled_train_df.columns)
    all_transformed_feature_names = set(
        all_transformed_feature_names[all_transformed_feature_names.str.startswith('indexed_')].tolist())
    cat_transformed_feature_names = all_transformed_feature_names
    num_transformed_feature_names = set() if discretize else num_feature_names
    all_transformed_feature_names = all_transformed_feature_names if discretize else \
        all_transformed_feature_names | {'scaled_num_features'}
    sampled_train_df = sampled_train_df.select(key_names + [target_name] + list(all_transformed_feature_names))
    sampled_test_df = sampled_test_df.select(key_names + [target_name] + list(all_transformed_feature_names))

    feature_name_to_index = {}
    if USE_FIELD:
        for i, feature_name in enumerate(list(cat_transformed_feature_names | num_transformed_feature_names)):
            feature_name_to_index[feature_name] = i
    else:
        # One index per (indexed column, level) pair, then one per numerical feature.
        indexed_labels_of_cat_features = [(x._java_obj.getOutputCol(), list(range(len(x.labels))))
                                          for x in fitted_pipeline.stages if isinstance(x, StringIndexerModel)]
        i = 0
        for indexed_labels in indexed_labels_of_cat_features:
            for indexed_label in indexed_labels[1]:
                feature_name = '_'.join([indexed_labels[0], str(indexed_label)])
                feature_name_to_index[feature_name] = i
                i += 1
        for j, feature_name in enumerate(list(num_feature_names)):
            feature_name_to_index[feature_name] = i + j

    print('Num of features:', len(feature_name_to_index))

    parse_row = set_feature_name(key_names, target_name, cat_transformed_feature_names, num_transformed_feature_names,
                                 feature_name_to_index, use_field=USE_FIELD, decimals=6)

    sampled_train_dataset = sampled_train_df.rdd.map(lambda x: parse_row(x)[1])
    sampled_test_dataset = sampled_test_df.rdd.map(lambda x: parse_row(x)[1])

    train_set_type = 'train' if USE_THREE_FOLDS else 'train+valid'
    test_set_type = 'valid' if USE_THREE_FOLDS else 'test'
    source_table = 'time' if USE_TIME_BASED else 'user'
    model_type = 'ffm' if USE_FIELD else 'fm'
    sampling_bp = int(10000 * SAMPLING_RATE)
    # Tags carry no trailing '_' because the name template already separates parts
    # with '_'. The evaluation split is always sampled at SAMPLING_RATE, so it is
    # always tagged '<bp>bp_sampled'.
    train_sampling_type = ('%dbp_neg_sampled' if IS_NEGATIVE_DOWN_SAMPLING else '%dbp_sampled') % sampling_bp
    test_sampling_type = '%dbp_sampled' % sampling_bp
    feature_categories = ['all'] if FEATURE_CATEGORIES is None else FEATURE_CATEGORIES
    categories_tag = '+'.join(feature_categories)

    _ = mkdir_in_hdfs(HADOOP_DATA_DIR_NAME)

    # <model>_<source>_based_<sampling>_<split>_<kind>(<categories>)
    name_template = '{}_{}_based_{}_{}_{}({})'
    fitted_pipeline_name = os.path.join(HADOOP_DATA_DIR_NAME, name_template.format(
        model_type, source_table, train_sampling_type, train_set_type, 'pipeline', categories_tag))
    sampled_train_dataset_name = os.path.join(HADOOP_DATA_DIR_NAME, name_template.format(
        model_type, source_table, train_sampling_type, train_set_type, 'dataset', categories_tag))
    sampled_test_dataset_name = os.path.join(HADOOP_DATA_DIR_NAME, name_template.format(
        model_type, source_table, test_sampling_type, test_set_type, 'dataset', categories_tag))

    with get_elapsed_time():
        n_partitions = 3 * get_total_n_executors_core(sc.spark_context)

        _ = delete_file_in_hdfs(fitted_pipeline_name)
        _ = delete_file_in_hdfs(sampled_train_dataset_name)
        _ = delete_file_in_hdfs(sampled_test_dataset_name)

        fitted_pipeline.write().save(fitted_pipeline_name)
        sampled_train_dataset.coalesce(n_partitions).saveAsTextFile(sampled_train_dataset_name)
        sampled_test_dataset.coalesce(n_partitions).saveAsTextFile(sampled_test_dataset_name)

    metadata_name = os.path.join(HADOOP_DATA_DIR_NAME, name_template.format(
        model_type, source_table, train_sampling_type, train_set_type, 'metadata', categories_tag) + '.pkl')

    dump_pickle('temp.pkl', (key_names, target_name, all_feature_names, cat_feature_names, num_feature_names,
                             all_transformed_feature_names, cat_transformed_feature_names,
                             num_transformed_feature_names, feature_name_to_index, train_target_ratio))

    _ = delete_file_in_hdfs(metadata_name)
    _ = copy_local_to_hdfs('temp.pkl', metadata_name)
    _ = delete_dir_or_file('temp.pkl')

    sc.stop()


if __name__ == '__main__':
    build_xlearn_dataset_for_train()

ModuleNotFoundError: No module named 'pyspark'

In [None]:
import glob
from my_utils.misc import *
 
 
# These options only select which dataset names are fetched below; they should
# match the values used when the datasets were written to HDFS.
USE_THREE_FOLDS = True
USE_TIME_BASED = True
USE_FIELD = False
SAMPLING_RATE = 0.1
IS_NEGATIVE_DOWN_SAMPLING = False
FEATURE_CATEGORIES = sorted(['user_demo', 'user_dtag', 'item_profile', 'item_context'])
 
# Local directory that receives the merged .libsvm files.
DATA_DIR_NAME = 'data'
# HDFS directory the dataset directories are copied from.
HADOOP_DATA_DIR_NAME = 'data'
 
 
def prepare_for_xlearn_train():
    """Fetch the xLearn train/evaluation datasets from HDFS into local ``.libsvm`` files.

    For each split:
      1. Remove stale local copies under ``DATA_DIR_NAME`` (the downloaded
         directory and the merged ``.libsvm`` file).
      2. Copy the HDFS directory ``HADOOP_DATA_DIR_NAME/<name>`` down.
      3. Concatenate its ``part-*`` files, in sorted order, into ``<name>.libsvm``.
      4. Delete the downloaded directory.
    """
    train_set_type = 'train' if USE_THREE_FOLDS else 'train+valid'
    test_set_type = 'valid' if USE_THREE_FOLDS else 'test'
    sampling_bp = int(10000 * SAMPLING_RATE)
    train_sampling_type = ('%dbp_neg_sampled' if IS_NEGATIVE_DOWN_SAMPLING else '%dbp_sampled') % sampling_bp
    # The evaluation split is tagged '<bp>bp_sampled' whatever IS_NEGATIVE_DOWN_SAMPLING is.
    test_sampling_type = '%dbp_sampled' % sampling_bp
    source_table = 'time' if USE_TIME_BASED else 'user'
    model_type = 'ffm' if USE_FIELD else 'fm'
    feature_categories = ['all'] if FEATURE_CATEGORIES is None else FEATURE_CATEGORIES
    categories_tag = '+'.join(feature_categories)

    set_names = ['{}_{}_based_{}_{}_dataset({})'.format(model_type, source_table, sampling_type, set_type,
                                                        categories_tag)
                 for sampling_type, set_type in ((train_sampling_type, train_set_type),
                                                 (test_sampling_type, test_set_type))]

    with get_elapsed_time():
        if not os.path.exists(DATA_DIR_NAME):
            os.mkdir(DATA_DIR_NAME)

        for set_name in set_names:
            local_dir = os.path.join(DATA_DIR_NAME, set_name)
            local_file = local_dir + '.libsvm'

            _ = delete_dir_or_file(local_dir)
            _ = delete_dir_or_file(local_file)
            _ = copy_hdfs_to_local(os.path.join(HADOOP_DATA_DIR_NAME, set_name), local_dir)
            # Zero-padded part-NNNNN names sort in partition order.
            concat_text_file(sorted(glob.glob(os.path.join(local_dir, 'part-*'))), local_file)
            _ = delete_dir_or_file(local_dir)


if __name__ == '__main__':
    prepare_for_xlearn_train()

In [None]:
import uuid
import pyspark.sql.functions as F
from pyspark.ml import PipelineModel
import hyspark
from my_utils.misc import *
from my_utils.spark_utils import *
 
 
def delete_dir_or_file(path):
    """Remove ``path`` on a best-effort basis, whether it is a directory tree or a file.

    Filesystem errors (e.g. the path does not exist) are ignored, so this is
    safe to call for cleanup of files that may be absent. Unlike a bare
    ``except``, non-filesystem errors such as KeyboardInterrupt still propagate.
    """
    try:
        shutil.rmtree(path)
    except OSError:
        # Not a directory (or a symlink to one): try removing it as a file.
        try:
            os.remove(path)
        except OSError:
            pass
 
 
def get_dataset_for_service(hc, is_training=True, use_time_based=True, database=None):
    """Load the rows to score from the FM feature table.

    Only the labelled evaluation path is implemented: with ``is_training`` the
    'test' split of the time- or user-based training feature table is returned.

    Args:
        hc: Hive/SQL context exposing ``sql(query)``.
        is_training: Must be True; see Raises.
        use_time_based: Read the time-based table instead of the user-based one.
        database: Hive database that holds the feature table.

    Raises:
        NotImplementedError: if ``is_training`` is False, since no serving data
            source is defined (this used to surface as an UnboundLocalError).
    """
    if not is_training:
        raise NotImplementedError('get_dataset_for_service only supports is_training=True')

    dataset_table = 'fm_time_based_feature_for_train' if use_time_based else 'fm_user_based_feature_for_train'
    dataset_type = 'test'
    query = 'SELECT * FROM {}.{} WHERE dataset_type = \'{}\''.format(database, dataset_table, dataset_type)
    return hc.sql(get_cleaned_hql(query))
 
 
def set_feature_name(key_names, target_name, cat_feature_names, num_feature_names, feature_name_to_index,
                     is_training=True, use_field=False, decimals=6):
    """Build a row parser that renders one record as an xLearn libsvm/libffm line.

    Args:
        key_names: Columns joined with ',' into the row key.
        target_name: Label column; its value starts the line when ``is_training``.
        cat_feature_names: Indexed categorical columns.
        num_feature_names: Numerical feature names, in the same order as the
            entries of the row's ``scaled_num_features`` vector.
        feature_name_to_index: Output feature index per feature name (with
            ``use_field``) or per ``<feature>_<level>`` name (without it).
        is_training: Prefix each line with the label value.
        use_field: Emit ``field:index:value`` triples instead of ``index:value``.
        decimals: Rounding applied to the scaled numerical values.

    Returns:
        Function mapping a row to ``(key, line)``. Features or levels missing
        from ``feature_name_to_index`` are skipped.
    """
    # Freeze the iteration order here, on the driver. Iterating the sets inside
    # ``_parse_row`` would re-hash them in each executor process, whose
    # PYTHONHASHSEED may differ from the driver's. The position ``i`` used to
    # index ``scaled_num_features`` could then stop matching the feature name.
    cat_names = list(cat_feature_names)
    num_names = list(num_feature_names)

    def _parse_row(row):
        key = ','.join([str(row[key_name]) for key_name in key_names])
        label = str(row[target_name]) if is_training else ''
        tokens = []
        for feature_name in cat_names:
            try:
                if use_field:
                    feature_index = feature_name_to_index[feature_name]
                    tokens.append(''.join([str(feature_index), ':', str(int(row[feature_name])), ':1']))
                else:
                    level_name = ''.join([feature_name, '_', str(int(row[feature_name]))])
                    tokens.append(''.join([str(feature_name_to_index[level_name]), ':1']))
            except KeyError:
                continue
        for i, feature_name in enumerate(num_names):
            try:
                feature_index = feature_name_to_index[feature_name]
                field = str(feature_index) + ':' if use_field else ''
                scaled_value = round(row['scaled_num_features'][i], decimals)
                tokens.append(''.join([str(feature_index), ':', field, str(scaled_value)]))
            except KeyError:
                continue
        # One join instead of re-concatenating the growing line per feature.
        return key, ' '.join([label] + tokens).lstrip()

    return _parse_row
 
 
def set_model_conf(is_training=True, use_field=False, disable_norm=True):
    """Build a ``mapPartitions`` function that scores ``(key, line)`` pairs with xLearn.

    The returned function:
      - writes the partition's lines to a uniquely named temporary libsvm file;
      - predicts with the model file ``temp.out``, opened by relative path, so
        it must be present in the worker's working directory;
      - returns an iterator of ``(user_id, item_id, event_time, score, label)``
        tuples.

    Args:
        is_training: Lines start with a label; it is parsed from the first character.
        use_field: Use a field-aware (FFM) model instead of a plain FM model.
        disable_norm: Call ``disableNorm()`` on the model.
    """
    def _predict(rows):
        user_ids, item_ids, event_times, values, labels = [], [], [], [], []
        for key, value in rows:
            user_id, item_id, event_time = key.split(',')
            user_ids.append(user_id)
            item_ids.append(item_id)
            event_times.append(event_time)
            values.append(value)
            # Assumes a single-character label such as 0/1 at the start of the line.
            labels.append(int(value[0]) if is_training else None)

        # Nothing to score: skip building the model and the temp files.
        if not values:
            return iter(())

        import xlearn as xl

        # The original built an FM model in both branches.
        model = xl.create_ffm() if use_field else xl.create_fm()
        if disable_norm:
            model.disableNorm()
        model.setSigmoid()

        uid = str(uuid.uuid4())
        data_file_name = 'temp-%s.libsvm' % uid
        score_file_name = 'temp-%s.txt' % uid
        try:
            with open(data_file_name, mode='wt') as f:
                f.write('\n'.join(values))
            model.setTest(data_file_name)
            model.predict('temp.out', score_file_name)
            with open(score_file_name, mode='rt') as f:
                scores = [float(line) for line in f]
        finally:
            # Remove the per-partition temp files even when prediction fails.
            delete_dir_or_file(data_file_name)
            delete_dir_or_file(score_file_name)
        return zip(user_ids, item_ids, event_times, scores, labels)

    return _predict
 
 
# Spark application settings passed to hyspark.HySpark.
APP_NAME = 'building_xlearn_dataset_for_service'
MEM_PER_CORE = 10
INSTANCE = 'full'
 
# True scores the labelled 'test' split; this is the only mode get_dataset_for_service implements.
IS_TRAINING = True
# Value written to the part_dt partition column of the score table.
BASE_DATE = '20191001'
 
# The options below identify which training run's metadata, pipeline and model are loaded.
USE_TIME_BASED = True
USE_FIELD = False
TRAIN_SAMPLING_RATE = 0.1
IS_NEGATIVE_DOWN_SAMPLING = False
FEATURE_CATEGORIES = sorted(['user_demo', 'user_dtag', 'item_profile', 'item_context'])
 
# HDFS directories holding the dataset artifacts and the trained model files.
HADOOP_DATA_DIR_NAME = 'data'
HADOOP_MODEL_DIR_NAME = 'model'
# Append to (True) or overwrite (False) the score table.
IS_APPENDING = False
DATABASE = 'hcc_big_brain_lv2'
 
 
def build_xlearn_dataset_for_service():
    """Score a dataset with the trained xLearn model and save the scores to Hive.

    Steps:
      1. Load the metadata pickle and fitted pipeline written for the
         'train+valid' training run.
      2. Apply the same selection, casting, null filling and pipeline to the
         rows from ``get_dataset_for_service``.
      3. Score each partition with the model copied from ``HADOOP_MODEL_DIR_NAME``.
      4. Write ``(csno, cntn_id, event_dttm, score, label, part_dt)`` rows to
         ``<DATABASE>.fm_score_for_service``, partitioned by ``part_dt``.
    """
    sc = hyspark.HySpark(APP_NAME, MEM_PER_CORE, instance=INSTANCE)
    hc = sc.hive_context
    hc.setConf("hive.exec.dynamic.partition", "true")
    hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    service_df = get_dataset_for_service(hc, is_training=IS_TRAINING, use_time_based=USE_TIME_BASED, database=DATABASE)

    train_set_type = 'train+valid'
    source_table = 'time' if USE_TIME_BASED else 'user'
    model_type = 'ffm' if USE_FIELD else 'fm'
    train_sampling_type = '%dbp_neg_sampled' % int(
        10000 * TRAIN_SAMPLING_RATE) if IS_NEGATIVE_DOWN_SAMPLING else '%dbp_sampled' % int(10000 * TRAIN_SAMPLING_RATE)
    feature_categories = ['all'] if FEATURE_CATEGORIES is None else FEATURE_CATEGORIES
    categories_tag = '+'.join(feature_categories)

    metadata_name = os.path.join(HADOOP_DATA_DIR_NAME, '{}_{}_based_{}_{}_metadata({}).pkl'.format(
        model_type, source_table, train_sampling_type, train_set_type, categories_tag))
    # 'hdfs dfs -copyToLocal' will not overwrite an existing local file, and
    # cmd_executor does not raise on failure. A temp.pkl left by a crashed run
    # would otherwise be loaded silently, so remove it first.
    _ = delete_dir_or_file('temp.pkl')
    _ = copy_hdfs_to_local(metadata_name, 'temp.pkl')
    key_names, target_name, all_feature_names, cat_feature_names, num_feature_names, all_transformed_feature_names, \
        cat_transformed_feature_names, num_transformed_feature_names, feature_name_to_index, train_target_ratio = \
        load_pickle('temp.pkl')
    _ = delete_dir_or_file('temp.pkl')

    with get_elapsed_time():
        fitted_pipeline_name = os.path.join(HADOOP_DATA_DIR_NAME, '{}_{}_based_{}_{}_pipeline({})'.format(
            model_type, source_table, train_sampling_type, train_set_type, categories_tag))
        fitted_pipeline = PipelineModel.load(fitted_pipeline_name)

    # Same selection / casting / null filling as applied to the training data.
    service_df = service_df.select(key_names + [target_name] + list(all_feature_names))
    service_df = cast_data_type(service_df, cat_feature_names, 'string')
    service_df = cast_data_type(service_df, num_feature_names, 'double')
    service_df = service_df.fillna('unknown', list(cat_feature_names)).fillna(0.0, list(num_feature_names))
    service_df = fitted_pipeline.transform(service_df)
    service_df = service_df.select(key_names + [target_name] + list(all_transformed_feature_names))

    parse_row = set_feature_name(key_names, target_name, cat_transformed_feature_names, num_transformed_feature_names,
                                 feature_name_to_index, is_training=IS_TRAINING, use_field=USE_FIELD, decimals=6)

    service_dataset = service_df.rdd.map(parse_row)

    model_name = '{}_{}_based_{}_{}_model({})'.format(
        model_type, source_table, train_sampling_type, train_set_type, categories_tag)
    # Same stale-file concern as temp.pkl above.
    _ = delete_dir_or_file('temp.out')
    _ = copy_hdfs_to_local(os.path.join(HADOOP_MODEL_DIR_NAME, model_name + '.out'), 'temp.out')
    # Ship the model file to the executors; the predictor opens 'temp.out'.
    sc.spark_context.addFile('temp.out')

    with get_elapsed_time():
        predict = set_model_conf(is_training=IS_TRAINING, use_field=USE_FIELD)

        columns = ['csno', 'cntn_id', 'event_dttm', 'score', 'label']
        score_df = service_dataset.mapPartitions(predict).toDF(columns)
        score_df = score_df.withColumn('part_dt', F.lit(BASE_DATE))

    with get_elapsed_time():
        mode = 'append' if IS_APPENDING else 'overwrite'
        score_df.write.saveAsTable('%s.fm_score_for_service' % DATABASE, mode=mode, partitionBy='part_dt')
        _ = delete_dir_or_file('temp.out')

    sc.stop()


if __name__ == '__main__':
    build_xlearn_dataset_for_service()

In [None]:
import glob
import multiprocessing
import numpy as np
import xlearn as xl
from sklearn.metrics import log_loss, roc_auc_score, average_precision_score
from sklearn.model_selection import ParameterSampler
from my_utils.misc import *
 
 
def print_result(i, res, verbose=True):
    """Format, and optionally print, the summary of the ``i``-th hyper-parameter trial.

    Args:
        i: Zero-based trial index; the result is looked up as ``res[str(i)]``.
        res: Mapping from ``str(index)`` to dicts with 'param', 'eval_metric'
            (fractions, shown as percentages) and 'exec_time' (seconds).
        verbose: Print the summary when True.

    Returns:
        The formatted summary string.
    """
    rank = i + 1
    # English ordinal suffix: 1st, 2nd, 3rd, 4th, ..., 11th-13th, 21st, ...
    if 10 <= rank % 100 <= 20:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(rank % 10, 'th')

    result = res[str(i)]
    rule = '-' * 94
    param_line = ' '.join(['[parameter]'] + ['{}: {}'.format(k, v) for k, v in result['param'].items()])
    metric_line = ' '.join(['[evaluation metric]'] +
                           ['{}: {:.4%}'.format(k, v) for k, v in result['eval_metric'].items()])
    string = '\n'.join([rule, '<{}{} model\'s result>'.format(rank, suffix), param_line, metric_line,
                        '[execution time] {:.4f} sec'.format(result['exec_time']), rule])
    if verbose:
        print(string)
    return string
 
 
# True: tune parameters on the train/valid files; False: refit on train+valid
# with the best parameters stored on HDFS.
USE_THREE_FOLDS = True
USE_TIME_BASED = True
USE_FIELD = False
SAMPLING_RATE = 0.1
IS_NEGATIVE_DOWN_SAMPLING = False
FEATURE_CATEGORIES = sorted(['user_demo', 'user_dtag', 'item_profile', 'item_context'])
 
# Fraction of CPU cores handed to xLearn as 'nthread'.
USING_THREAD_RATIO = 0.5
# Number of random parameter draws tried in addition to the default parameters.
N_ITER = 30
 
# Local directory with the .libsvm datasets; score and result files are written here too.
DATA_DIR_NAME = 'data'
# HDFS directory that receives the best parameters and model files.
HADOOP_MODEL_DIR_NAME = 'model'
 
 
def train_xlearn_model():
    """Tune or refit an xLearn FM/FFM model on the local libsvm files and publish it to HDFS.

    With ``USE_THREE_FOLDS``:
      - The default parameters, plus ``N_ITER`` random draws from an
        (lr, lambda, k) grid, are each trained on the 'train' file and scored on
        the 'valid' file.
      - Whenever a trial is at least as good as every earlier trial (by negated
        log loss), its parameters and model are uploaded to
        ``HADOOP_MODEL_DIR_NAME`` and its scores copied to ``DATA_DIR_NAME``.
      - All trial results are pickled locally at the end.

    Otherwise the stored best parameters are downloaded and the model is refit
    on the 'train+valid' file and uploaded.
    """
    import shutil

    train_set_type = 'train' if USE_THREE_FOLDS else 'train+valid'
    test_set_type = 'valid' if USE_THREE_FOLDS else 'test'
    train_sampling_type = '%dbp_neg_sampled' % int(
        10000 * SAMPLING_RATE) if IS_NEGATIVE_DOWN_SAMPLING else '%dbp_sampled' % int(10000 * SAMPLING_RATE)
    test_sampling_type = '%dbp_sampled' % int(10000 * SAMPLING_RATE)
    source_table = 'time' if USE_TIME_BASED else 'user'
    model_type = 'ffm' if USE_FIELD else 'fm'
    feature_categories = ['all'] if FEATURE_CATEGORIES is None else FEATURE_CATEGORIES

    _ = mkdir_in_hdfs(HADOOP_MODEL_DIR_NAME)

    train_set_name = os.path.join(DATA_DIR_NAME, '{}_{}_based_{}_{}_dataset({}).libsvm'.format(
        model_type, source_table, train_sampling_type, train_set_type, '+'.join(feature_categories)))
    test_set_name = os.path.join(DATA_DIR_NAME, '{}_{}_based_{}_{}_dataset({}).libsvm'.format(
        model_type, source_table, test_sampling_type, test_set_type, '+'.join(feature_categories)))
    param_file_name = 'param_of_{}_{}_based_{}_{}_model({}).pkl'.format(
        model_type, source_table, train_sampling_type, train_set_type, '+'.join(feature_categories))
    model_name = '{}_{}_based_{}_{}_model({})'.format(
        model_type, source_table, train_sampling_type, train_set_type, '+'.join(feature_categories))
    best_param_path = os.path.join(HADOOP_MODEL_DIR_NAME, 'best_' + param_file_name)
    hdfs_model_path = os.path.join(HADOOP_MODEL_DIR_NAME, model_name + '.out')
    local_score_path = os.path.join(DATA_DIR_NAME, model_name + '.txt')

    model = xl.create_ffm() if USE_FIELD else xl.create_fm()
    model.setOnDisk()
    disable_norm = True
    if disable_norm:
        model.disableNorm()
    model.setTrain(train_set_name)
    model.setValidate(test_set_name)
    model.setSigmoid()

    # Never ask xLearn for 0 threads (e.g. one core with USING_THREAD_RATIO 0.5).
    n_threads = max(1, int(USING_THREAD_RATIO * multiprocessing.cpu_count()))
    default_param = {'task': 'binary', 'nthread': n_threads,
                     'opt': 'adagrad', 'epoch': 10, 'stop_window': 3, 'metric': 'auc'}

    if USE_THREE_FOLDS:
        param_grid = {'lr': 0.2 * 2 ** np.linspace(-3, 1, 5), 'lambda': 0.002 * 2 ** np.linspace(-1, 3, 5),
                      'k': 2 * 2 ** np.linspace(0, 4, 5, dtype=int)}
        param_list = [{'lr': 0.2, 'lambda': 0.002, 'k': 4}]
        if N_ITER > 0:
            param_list += list(ParameterSampler(param_grid, n_iter=N_ITER, random_state=42))

        with open(test_set_name, 'r') as file:
            # The first character of each line is the label.
            y_actual = np.array([int(line[0]) for line in file])

        res = {}
        # log_loss is stored negated so that, like auroc/auprc, larger is better.
        criteria = 'log_loss'
        max_val = None

        for i, param in enumerate(param_list):
            start_time = time.perf_counter()

            default_param.update(param)
            model.fit(default_param, 'temp.out')
            model.setTest(test_set_name)
            model.predict('temp.out', 'temp.txt')

            with open('temp.txt', 'r') as file:
                score = np.array([float(line) for line in file])

            res[str(i)] = {'param': {'lr': default_param['lr'], 'lambda': default_param['lambda'],
                                     'k': default_param['k']},
                           'eval_metric': {'log_loss': -1.0 * log_loss(y_actual, score),
                                           'auroc': roc_auc_score(y_actual, score),
                                           'auprc': average_precision_score(y_actual, score)},
                           'exec_time': time.perf_counter() - start_time}

            current_val = res[str(i)]['eval_metric'][criteria]
            # Compare against the running best, not only the first trial, so a
            # model worse than an earlier one never replaces the saved best.
            if max_val is None or current_val >= max_val:
                max_val = current_val
                dump_pickle('temp.pkl', res[str(i)])

                _ = delete_file_in_hdfs(best_param_path)
                _ = delete_file_in_hdfs(hdfs_model_path)
                _ = delete_dir_or_file(local_score_path)

                _ = copy_local_to_hdfs('temp.pkl', best_param_path)
                _ = copy_local_to_hdfs('temp.out', hdfs_model_path)
                shutil.copyfile('temp.txt', local_score_path)

                _ = delete_dir_or_file('temp.pkl')

            _ = delete_dir_or_file('temp.out')
            _ = delete_dir_or_file('temp.txt')

            print_result(i, res, verbose=True)

        dump_pickle(os.path.join(DATA_DIR_NAME, 'all_' + param_file_name), res)

    else:
        with get_elapsed_time():
            # 'hdfs dfs -copyToLocal' will not overwrite an existing file, so
            # clear any leftover temp.pkl before downloading the parameters.
            _ = delete_dir_or_file('temp.pkl')
            _ = copy_hdfs_to_local(best_param_path, 'temp.pkl')
            param = load_pickle('temp.pkl')['param']

            default_param.update(param)
            model.fit(default_param, 'temp.out')

            _ = delete_file_in_hdfs(hdfs_model_path)
            _ = copy_local_to_hdfs('temp.out', hdfs_model_path)

            _ = delete_dir_or_file('temp.pkl')
            _ = delete_dir_or_file('temp.out')


if __name__ == '__main__':
    train_xlearn_model()

In [None]:
import os
import pickle
import re
import shutil
import time
from contextlib import contextmanager
from subprocess import PIPE, Popen
 
 
def cmd_executor(args, delimit=' '):
    """Run a command (without a shell) and return its combined output text.

    Args:
        args: Argument list, or a string split on ``delimit``. No shell is
            involved, so quoting is not interpreted.
        delimit: Separator used when ``args`` is a string.

    Returns:
        Decoded stdout followed by stderr, each stripped and joined by a
        newline when both are non-empty. The exit status is not checked.
    """
    if not isinstance(args, list):
        args = args.split(delimit)
    # communicate() drains stdout and stderr together, so a full stderr pipe
    # cannot block the child while only stdout is being read.
    process = Popen(args, stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate()
    parts = [stream.decode(errors='replace').strip() for stream in (stdout, stderr)]
    return '\n'.join(part for part in parts if part)
 
 
def concat_text_file(source_files_path, target_file_path, n_chunks=1):
    """Concatenate files byte-for-byte into one or more output files.

    The sources are split into ``n_chunks`` consecutive groups of
    ``len(source_files_path) // n_chunks`` files; the last group also takes any
    remainder. With one chunk the output is ``target_file_path`` itself;
    otherwise chunk ``i`` is written to ``<stem>-<i:05d><ext>``.

    Raises:
        ValueError: if ``n_chunks`` is less than 1.
    """
    if n_chunks < 1:
        raise ValueError('n_chunks must be >= 1, got %r' % (n_chunks,))
    n_files = len(source_files_path) // n_chunks
    # splitext only inspects the last path component. A dot in a directory
    # name, or no extension at all, no longer mangles the chunk names.
    stem, ext = os.path.splitext(target_file_path)

    for i in range(n_chunks):
        file_name = '%s-%05d%s' % (stem, i, ext) if n_chunks > 1 else target_file_path
        start_num = i * n_files
        end_num = (i + 1) * n_files if i < (n_chunks - 1) else len(source_files_path)
        with open(file_name, 'wb') as wfd:
            for f in source_files_path[start_num:end_num]:
                with open(f, 'rb') as fd:
                    shutil.copyfileobj(fd, wfd)
 
 
def copy_hdfs_to_local(source_path, target_path='.'):
    """Copy ``source_path`` from HDFS to local ``target_path`` (``hdfs dfs -copyToLocal``).

    Returns the command's output text.
    """
    command = 'hdfs dfs -copyToLocal {} {}'.format(source_path, target_path)
    return cmd_executor(command)
 
 
def copy_local_to_hdfs(source_path, target_path='.'):
    """Upload local ``source_path`` to HDFS ``target_path`` (``hdfs dfs -put``).

    Returns the command's output text.
    """
    command = 'hdfs dfs -put {} {}'.format(source_path, target_path)
    return cmd_executor(command)
 
 
def delete_dir_or_file(path):
    """Remove ``path`` on a best-effort basis, whether it is a directory tree or a file.

    Filesystem errors (e.g. the path does not exist) are ignored, so this is
    safe to call for cleanup of files that may be absent. Unlike a bare
    ``except``, non-filesystem errors such as KeyboardInterrupt still propagate.
    """
    try:
        shutil.rmtree(path)
    except OSError:
        # Not a directory (or a symlink to one): try removing it as a file.
        try:
            os.remove(path)
        except OSError:
            pass
 
 
def delete_file_in_hdfs(file_path, skip_trash=True):
    """Recursively delete ``file_path`` on HDFS (``hdfs dfs -rm -r``).

    Args:
        file_path: HDFS path to remove.
        skip_trash: Add ``-skipTrash`` so the data bypasses the HDFS trash.

    Returns:
        The command's output text.
    """
    flags = '-r -skipTrash' if skip_trash else '-r'
    return cmd_executor('hdfs dfs -rm {} {}'.format(flags, file_path))
 
 
def dump_pickle(file_path, obj):
    """Serialize ``obj`` into ``file_path`` with :mod:`pickle` (default protocol)."""
    with open(file_path, mode='wb') as handle:
        pickle.dump(obj, handle)
 
 
def get_cleaned_hql(query):
    """Strip ``--`` comments from an HQL query and fold it onto a single line.

    Each comment, up to the end of its line or of the query, is replaced by a
    line break before whitespace folding. Tokens on either side of a comment
    are therefore never glued together. Runs of tabs, carriage returns and
    newlines become one space, and the result is stripped.

    Note: ``--`` inside a string literal is also treated as a comment.
    """
    query = re.sub(r'--[^\n]*(?:\n+|$)', '\n', query)
    return re.sub(r'[\t\r\n]+', ' ', query).strip()
 
 
@contextmanager
def get_elapsed_time(format_string='Elapsed time: %.4f sec', verbose=True):
    """Context manager that prints the wall-clock duration of its ``with`` body.

    Args:
        format_string: %-style template that receives the elapsed seconds.
        verbose: When False, nothing is printed.

    Nothing is printed if the body raises; the exception propagates.
    """
    started = time.perf_counter()
    yield
    seconds = time.perf_counter() - started
    if not verbose:
        return
    print(format_string % seconds)
 
 
def get_employee_id():
    """Return the right-most six consecutive digits in the current working directory path.

    Raises:
        ValueError: if the path has no six consecutive digits (after its first
            character). This used to fail with an AttributeError on ``None``.
    """
    cwd = os.getcwd()
    match = re.search(r'.+(\d{6}).?', cwd)
    if match is None:
        raise ValueError('no 6-digit employee id found in working directory %r' % cwd)
    return str(match.group(1))
 
 
def load_pickle(file_path):
    """Load and return the object pickled in ``file_path``.

    Only use on trusted files: unpickling can execute arbitrary code.
    """
    with open(file_path, mode='rb') as handle:
        return pickle.load(handle)
 
 
def mkdir_in_hdfs(dir_path, create_parents=True):
    """Create ``dir_path`` on HDFS via ``hdfs dfs -mkdir``, adding ``-p`` when ``create_parents``.

    Returns the command's output text.
    """
    parts = ['hdfs dfs -mkdir']
    if create_parents:
        parts.append('-p')
    parts.append(str(dir_path))
    return cmd_executor(' '.join(parts))

In [None]:
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.pipeline import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
 
 
class AsDenseTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that rewrites a vector column as ``DenseVector`` values.

    Reads ``inputCol`` and writes the dense vectors to ``outputCol``, which may
    be the same column. Persistence comes from the DefaultParams reader/writer
    mixins.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(AsDenseTransformer, self).__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set ``inputCol``/``outputCol``; only the keywords actually passed are applied."""
        return self._set(**self._input_kwargs)

    def _transform(self, dataset):
        def _to_dense(vector):
            return DenseVector(vector.toArray())

        densify = F.udf(_to_dense, VectorUDT())
        source_column = dataset[self.getInputCol()]
        return dataset.withColumn(self.getOutputCol(), densify(source_column))
 
 
def adjust_date_format(column_name, is_monthly=False, adding_periods=0):
    """Return a Column expression that shifts a date-string column by ``adding_periods``.

    Monthly mode:
      - parses the first six characters as ``yyyyMM``;
      - adds ``adding_periods`` months;
      - formats the result as ``yyyyMM``.

    Daily mode:
      - parses the value as ``yyyyMMdd``;
      - adds ``adding_periods`` days;
      - formats the result as ``yyyyMMdd``.
    """
    if not is_monthly:
        shifted_day = F.date_add(F.to_date(F.col(column_name), 'yyyyMMdd'), adding_periods)
        return F.date_format(shifted_day, 'yyyyMMdd')
    parsed_month = F.to_date(F.substring(F.col(column_name), 1, 6), 'yyyyMM')
    return F.date_format(F.add_months(parsed_month, adding_periods), 'yyyyMM')
 
 
def cast_data_type(df, column_names, data_type):
    """Return ``df`` with every column in ``column_names`` cast to ``data_type``.

    Args:
        df: Spark DataFrame.
        column_names: Iterable of column names to cast in place.
        data_type: Target type accepted by ``Column.cast`` (e.g. 'string', 'double').
    """
    result = df
    for name in list(column_names):
        casted = result[name].cast(data_type)
        result = result.withColumn(name, casted)
    return result
 
 
def get_null_ratio(df, feature_names):
    """Return ``{feature_name: fraction of rows where that column is null}``.

    The row count and all null counts are computed in a single aggregation job
    instead of one ``count()`` job per feature.

    Raises:
        ZeroDivisionError: if ``df`` has no rows, since the ratio is undefined.
    """
    names = list(feature_names)
    aggregates = [F.count(F.lit(1))] + [F.sum(F.col(name).isNull().cast('long')) for name in names]
    result_row = df.agg(*aggregates).first()
    tot_count = result_row[0]
    if tot_count == 0:
        raise ZeroDivisionError('cannot compute null ratios of an empty DataFrame')
    # Positional access: aggregate i + 1 belongs to names[i].
    return {name: result_row[idx + 1] / tot_count for idx, name in enumerate(names)}
 
 
def get_spark_conf(spark_context, conf):
    """Return the value stored under configuration key ``conf`` in the context's SparkConf."""
    spark_conf = spark_context._conf
    return spark_conf.get(conf)
 
 
def get_total_n_executors_core(spark_context):
    """Return the total executor core count (executor instances x cores per executor).

    If either ``spark.executor.instances`` or ``spark.executor.cores`` is
    unset, e.g. under dynamic allocation, ``spark_context.defaultParallelism``
    is returned instead of failing on ``int(None)``.
    """
    n_instances = get_spark_conf(spark_context, 'spark.executor.instances')
    n_cores_per_executor = get_spark_conf(spark_context, 'spark.executor.cores')
    if n_instances is None or n_cores_per_executor is None:
        return spark_context.defaultParallelism
    return int(n_instances) * int(n_cores_per_executor)
 
 
def sample_dataset(df, fraction, seed=42, is_negative_down_sampling=False, target_name=None, target_value=(0, 1)):
    """Sample rows of ``df`` without replacement.

    Args:
        df: Spark DataFrame.
        fraction: Sampling fraction; exactly 1.0 returns ``df`` itself.
        seed: Seed passed to ``DataFrame.sample``.
        is_negative_down_sampling: When True and ``target_name`` is given, keep
            every row whose target equals ``target_value[1]`` and sample only
            the rows whose target equals ``target_value[0]``.
        target_name: Label column used for negative down-sampling.
        target_value: ``(negative, positive)`` label values.
    """
    if fraction == 1.0:
        return df
    if is_negative_down_sampling and target_name is not None:
        negative_value = target_value[0]
        positive_value = target_value[1]
        positives = df.filter(F.col(target_name) == positive_value)
        negatives = df.filter(F.col(target_name) == negative_value).sample(False, fraction, seed=seed)
        return positives.unionAll(negatives)
    return df.sample(False, fraction, seed=seed)