From 36a21b30e62e5b732c4488455da5e823f4d6c1da Mon Sep 17 00:00:00 2001
From: Reza
Date: Fri, 15 Oct 2021 11:29:16 -0700
Subject: [PATCH] add skip-mapping to configuration and code

---
 Processes/dlpredictor/conf/config.yml        |  24 +--
 .../dlpredictor/main_build_ipl_dist.py       | 144 ++++++++++++++++++
 .../dlpredictor/dlpredictor/main_spark_es.py | 120 +++++++++++++--
 Processes/dlpredictor/run.sh                 |   6 +
 4 files changed, 270 insertions(+), 24 deletions(-)
 create mode 100644 Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py

diff --git a/Processes/dlpredictor/conf/config.yml b/Processes/dlpredictor/conf/config.yml
index 4a61e9f..dcfff92 100644
--- a/Processes/dlpredictor/conf/config.yml
+++ b/Processes/dlpredictor/conf/config.yml
@@ -1,17 +1,23 @@
-log_level: 'WARN'
+log_level: 'info'
 product_tag: 'dlpredictor'
-pipeline_tag: '06242021_1500'
+pipeline_tag: 'dlpm_10052021_1400_reza'
 #input tables from dlpm pipeline
-factdata_table: 'dlpm_06242021_1500_tmp_area_map' # this raw data, with filtered si, remapped r and ipl and partitioned by bucket-id
-distribution_table: 'dlpm_06242021_1500_tmp_distribution'
-norm_table: 'dlpm_06242021_1500_trainready'
-model_stat_table: 'dlpm_06242021_1500_model_stat'
-bucket_size: 10
+factdata_table: 'factdata_hq_09222020'
+area_map_table: 'dlpm_10052021_1400_tmp_area_map' # raw data with filtered si, remapped r and ipl, partitioned by bucket_id
+distribution_table: 'dlpm_10052021_1400_tmp_distribution'
+norm_table: 'dlpm_10052021_1400_trainready'
+model_stat_table: 'dlpm_10052021_1400_model_stat'
+region_mapping_table: 'region_mapping'
+bucket_size: 1
 bucket_step: 1
+ipl_dist_table: '{product_tag}_{pipeline_tag}_ipl_dist_map'
+unique_original_uckey_table: '{product_tag}_{pipeline_tag}_unique_original_uckey'
+skip_ipl_reverse_mapping: true # makes main_spark_es process mapped uckeys and skip the reverse mapping
+condition: ''
 
-yesterday: '2020-06-20'
-serving_url: 'http://10.193.217.105:8502/v1/models/dl_20210609:predict'
+yesterday: '2021-07-31'
+serving_url: 'http://10.193.217.105:8508/v1/models/dl_20210706:predict'
 config_table: '{product_tag}_{pipeline_tag}_config'
 
diff --git a/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py b/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py
new file mode 100644
index 0000000..c086c36
--- /dev/null
+++ b/Processes/dlpredictor/dlpredictor/main_build_ipl_dist.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0.html
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
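+
+# main_build_ipl_dist.py builds two helper tables that main_spark_es.py
+# consumes when skip_ipl_reverse_mapping is false:
+#   * ipl_dist_table: for each original ipl found in factdata, its impression
+#     share (ratio) within the region it maps to (per region_mapping_table).
+#   * unique_original_uckey_table: the distinct original uckeys in factdata.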
+
+import math
+import pickle
+import yaml
+import argparse
+
+from pyspark import SparkContext, SparkConf, Row
+from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list, explode, sum, array, split
+from pyspark.sql.types import BooleanType, IntegerType, StringType, FloatType
+from pyspark.sql import HiveContext
+from pyspark.sql.window import Window
+from dlpredictor.configutil import *
+import hashlib
+
+'''
+spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5G dlpredictor/main_build_ipl_dist.py conf/config.yml
+'''
+
+
+def __save_as_table(df, table_name, hive_context, create_table):
+    if create_table:
+        command = """
+        DROP TABLE IF EXISTS {}
+        """.format(table_name)
+
+        hive_context.sql(command)
+
+        df.createOrReplaceTempView("r900_temp_table")
+
+        command = """
+        CREATE TABLE IF NOT EXISTS {} AS SELECT * FROM r900_temp_table
+        """.format(table_name)
+
+        hive_context.sql(command)
+
+
+def run(hive_context, conditions, factdata_table, ipl_dist_table, unique_original_uckey_table, region_mapping_table, bucket_size, bucket_step):
+
+    # Read the region mapping (original ipl -> mapped region)
+    mapping_df = hive_context.sql('SELECT old,new FROM {}'.format(region_mapping_table))
+
+    start_bucket = 0
+    df_union = None
+    df_distinct_uckey = None
+
+    while True:
+
+        end_bucket = min(bucket_size, start_bucket + bucket_step)
+
+        if start_bucket > end_bucket:
+            break
+
+        # Read factdata table
+        command = """
+        SELECT count_array,uckey,bucket_id FROM {} WHERE bucket_id BETWEEN {} AND {}
+        """.format(factdata_table, str(start_bucket), str(end_bucket))
+
+        if len(conditions) > 0:
+            command = command + " and {}".format(' and '.join(conditions))
+
+        start_bucket = end_bucket + 1
+
+        df = hive_context.sql(command)
+        # [Row(count_array=[u'0:0', u'1:0', u'2:0', u'3:0'], day=u'2018-03-09', hour=0, uckey=u'banner,1,3G,g_f,1,pt,1002,icc')]
+
+        # extract ipl
+        df = df.withColumn('ipl', split(df['uckey'], ',').getItem(7).cast(StringType()))
+
+        def _udf_helper(count_arrays):
+            result = 0
+            for count_array in count_arrays:
+                for item in count_array:
+                    imp = int(item.split(':')[1])
+                    result += imp
+            return result
+
+        df_uckey = df.select('uckey')
+        if df_distinct_uckey is None:
+            df_distinct_uckey = df_uckey.select('uckey').distinct()
+        else:
+            df_distinct_uckey = df_distinct_uckey.union(df_uckey)
+            df_distinct_uckey = df_distinct_uckey.select('uckey').distinct()
+
+        df = df.groupby('ipl').agg(udf(_udf_helper, IntegerType())(collect_list('count_array')).alias('imp'))
+        if df_union is None:
+            df_union = df
+        else:
+            df_union = df_union.union(df)
+
+    # ratio = an ipl's impressions divided by the total impressions of its mapped region
+    df = df_union.groupby('ipl').agg(sum('imp').alias('region_imp'))
+    df = mapping_df.join(df, mapping_df.old == df.ipl, 'outer')
+    df = df.withColumn('region_total_imp', sum('region_imp').over(Window.partitionBy('new')))
+    df = df.withColumn('ratio', udf(lambda x, y: float(x)/y if x and y else float(0), FloatType())('region_imp', 'region_total_imp'))
+
+    __save_as_table(df=df, table_name=ipl_dist_table, hive_context=hive_context, create_table=True)
+
+    __save_as_table(df=df_distinct_uckey, table_name=unique_original_uckey_table, hive_context=hive_context, create_table=True)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Prepare data')
+    parser.add_argument('config_file')
+    args = parser.parse_args()
+
+    # Load config file
+    with open(args.config_file, 'r') as ymlfile:
+        cfg = yaml.load(ymlfile, Loader=yaml.FullLoader)
+
resolve_placeholder(cfg) + + sc = SparkContext() + hive_context = HiveContext(sc) + sc.setLogLevel(cfg['log_level']) + + hive_context.setConf("hive.exec.dynamic.partition", "true") + hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict") + + factdata_table = cfg['factdata_table'] + region_mapping_table = cfg['region_mapping_table'] + bucket_size = cfg['bucket_size'] + bucket_step = cfg['bucket_step'] + conditions = cfg['condition'] + ipl_dist_table = cfg['ipl_dist_table'] + unique_original_uckey_table = cfg['unique_original_uckey_table'] + + run(hive_context=hive_context, conditions=conditions, factdata_table=factdata_table, + ipl_dist_table=ipl_dist_table, unique_original_uckey_table=unique_original_uckey_table, region_mapping_table=region_mapping_table, + bucket_size=bucket_size, bucket_step=bucket_step) + + sc.stop() diff --git a/Processes/dlpredictor/dlpredictor/main_spark_es.py b/Processes/dlpredictor/dlpredictor/main_spark_es.py index 779d2ae..f2fa82a 100644 --- a/Processes/dlpredictor/dlpredictor/main_spark_es.py +++ b/Processes/dlpredictor/dlpredictor/main_spark_es.py @@ -5,7 +5,7 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at - + # http://www.apache.org/licenses/LICENSE-2.0.html # Unless required by applicable law or agreed to in writing, software @@ -23,8 +23,8 @@ from pyspark import SparkContext from pyspark.sql import HiveContext -from pyspark.sql.functions import udf, expr, collect_list, struct -from pyspark.sql.types import StringType, ArrayType, MapType, FloatType, StructField, StructType +from pyspark.sql.functions import udf, expr, collect_list, struct, split, explode +from pyspark.sql.types import StringType, ArrayType, MapType, FloatType, StructField, StructType, IntegerType, BooleanType from dlpredictor import transform from dlpredictor.configutil import * @@ -33,6 +33,11 @@ from dlpredictor.util.sparkesutil import * +''' +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg --conf spark.driver.maxResultSize=5G dlpredictor/main_spark_es.py conf/config.yml +''' + + def sum_count_array(hour_counts): ''' [{14: [u'1:3']}, {13: [u'1:3']}, {11: [u'1:3']}, {15: [u'1:5']}, {22: [u'1:8']}, {23: [u'1:6']}, {19: [u'1:1']}, {18: [u'1:1']}, {12: [u'1:5']}, {17: [u'1:5']}, {20: [u'1:3']}, {21: [u'1:21']}] @@ -103,6 +108,51 @@ def __save_as_table(df, table_name, hive_context, create_table): hive_context.sql(command) +def ipl_revrse_mapping(df, ipl_dist_map_brodcast, df_uckey_distinct): + + df = df.withColumn('ipl', split(df['uckey'], ',').getItem(7).cast(StringType())) + df = df.filter(udf(lambda ipl: ipl in ipl_dist_map_brodcast.value, BooleanType())(df.ipl)) + df = df.withColumn('real_ipl_ratio_map', udf(lambda ipl: ipl_dist_map_brodcast.value[ipl], MapType(StringType(), FloatType(), False))(df.ipl)) + + # +-------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------------------------+---+------------------+ + # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl_ratio_map| + # 
+-------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------------------------+---+------------------+ + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0] |0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40 |40 |[40 -> 1.0] | + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0] |0.010742836 |native,z041bf6g4s,WIFI,g_m,3,CPM,30,30 |30 |[30 -> 1.0] | + + df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map', 'ratio', 'uckey', 'ipl', explode('real_ipl_ratio_map')).withColumnRenamed( + "key", "real_ipl").withColumnRenamed("value", "ipl_ratio") + + # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+ + # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl|ipl_ratio | + # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+ + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.033795446|native,z041bf6g4s,WIFI,g_f,4,CPM,57,57|57 |57 |1.0 | + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40|40 |40 |1.0 | + + # change uckey with new ipl, this for ipl fix not region + def __fix_uckey_ipl(uckey, ipl): + l = uckey.split(',') + l[7] = str(ipl) + return ','.join(l) + df = df.withColumn('uckey', udf(__fix_uckey_ipl, StringType())(df.uckey, df.real_ipl)) + + # filter uckeys to make sure we predict for valid uckeys + df = df.join(df_uckey_distinct, on='uckey', how='inner') + + # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+ + # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey |ipl|real_ipl|ipl_ratio | + # +-------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------------------------+---+--------+-----------+ + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 
2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.010742836|native,z041bf6g4s,WIFI,g_m,3,CPM,30,30|30 |30 |1.0 | + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.00800733 |native,z041bf6g4s,WIFI,g_f,5,CPM,40,40|40 |40 |1.0 | + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.042944785|native,z041bf6g4s,4G,g_m,4,CPM,3,3 |3 |3 |1.0 | + # |1009 |2 |[2020-06-26 -> 89.0, 2020-06-27 -> 91.0, 2020-06-24 -> 78.0, 2020-06-25 -> 81.0, 2020-06-28 -> 77.0, 2020-06-29 -> 63.0, 2020-06-30 -> 62.0, 2020-06-22 -> 83.0, 2020-06-23 -> 79.0, 2020-06-21 -> 101.0]|0.011951239|native,z041bf6g4s,4G,g_f,5,CPM,71,101 |71 |101 |0.08843476 | + + # update ratio + df = df.withColumn('ratio', udf(lambda r1, r2: float(r1*r2) if r1 and r2 else float(0), FloatType())(df.ratio, df.ipl_ratio)) + + return df + + def run(cfg): # os.environ[ @@ -126,11 +176,15 @@ def run(cfg): # Reading the max bucket_id bucket_size = cfg['bucket_size'] bucket_step = cfg['bucket_step'] - factdata = cfg['factdata_table'] + factdata_area_map = cfg['area_map_table'] distribution_table = cfg['distribution_table'] norm_table = cfg['norm_table'] traffic_dist = cfg['traffic_dist'] model_stat_table = cfg['model_stat_table'] + ipl_dist_table = cfg['ipl_dist_table'] + unique_original_uckey_table = cfg['unique_original_uckey_table'] + skip_ipl_reverse_mapping = bool(cfg['skip_ipl_reverse_mapping']) + prediction_table_name = cfg['es_predictions_index'] yesterday = cfg['yesterday'] serving_url = cfg['serving_url'] @@ -151,6 +205,35 @@ def run(cfg): df_dist.cache() df_dist.count() + if not skip_ipl_reverse_mapping: + command = """ + SELECT + DIST.old as mapped_ipl, + DIST.ipl as real_ipl, + DIST.ratio + FROM {} AS DIST + """.format(ipl_dist_table) + df = hive_context.sql(command) + ipl_dist_list = df.collect() + ipl_dist_map = {} + for _ in ipl_dist_list: + mapped_ipl = _['mapped_ipl'] + if not mapped_ipl: + continue + mapped_ipl = str(mapped_ipl) + real_ipl = _['real_ipl'] + ratio = float(0) + if _['ratio']: + ratio = float(_['ratio']) + if mapped_ipl not in ipl_dist_map: + ipl_dist_map[mapped_ipl] = {} + ipl_dist_map[mapped_ipl][real_ipl] = ratio + + ipl_dist_map_brodcast = sc.broadcast(ipl_dist_map) + + # Get original uckeys + df_uckey_distinct = hive_context.sql('SELECT uckey FROM {}'.format(unique_original_uckey_table)) + # Read norm table # DataFrame[uckey: string, ts: array, p: float, a__n: float, a_1_n: float, a_2_n: float, a_3_n: float, a_4_n: float, a_5_n: float, a_6_n: float, t_UNKNOWN_n: float, t_3G_n: float, t_4G_n: float, t_WIFI_n: float, t_2G_n: float, g__n: float, g_g_f_n: float, g_g_m_n: float, g_g_x_n: float, price_cat_1_n: float, price_cat_2_n: float, price_cat_3_n: float, si_vec_n: array, r_vec_n: array, p_n: float, ts_n: array] command = """ @@ -197,14 +280,14 @@ def run(cfg): FACTDATA.uckey FROM {} AS FACTDATA WHERE FACTDATA.bucket_id BETWEEN {} AND {} - """.format(factdata, str(start_bucket), str(end_bucket)) + """.format(factdata_area_map, str(start_bucket), str(end_bucket)) start_bucket = end_bucket + 1 df = hive_context.sql(command) - # add partition_group - df = df.repartition("uckey") + # decrease partitions + df = 
df.coalesce(200) # [Row(count_array=[u'1:504'], day=u'2019-11-02', hour=2, uckey=u'magazinelock,04,WIFI,g_m,1,CPM,78', hour_price_imp_map={2: [u'1:504']})] df = df.withColumn('hour_price_imp_map', expr("map(hour, count_array)")) @@ -213,8 +296,7 @@ def run(cfg): df = df.groupBy('uckey', 'day').agg(collect_list('hour_price_imp_map').alias('hour_price_imp_map_list')) # [Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19', day=u'2019-11-02', day_price_imp=[u'3:58'])] - df = df.withColumn('day_price_imp', udf( - sum_count_array, ArrayType(StringType()))(df.hour_price_imp_map_list)).drop('hour_price_imp_map_list') + df = df.withColumn('day_price_imp', udf(sum_count_array, ArrayType(StringType()))(df.hour_price_imp_map_list)).drop('hour_price_imp_map_list') # [Row(uckey=u'native,68bcd2720e5011e79bc8fa163e05184e,4G,g_m,2,CPM,19', day=u'2019-11-02', day_price_imp=[u'3:58'], day_price_imp_map={u'2019-11-02': [u'3:58']})] df = df.withColumn('day_price_imp_map', expr("map(day, day_price_imp)")) @@ -269,24 +351,32 @@ def run(cfg): # [Row(cluster_uckey=u'1119', price_cat=u'2', day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}, ratio=0.11989551782608032, uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')] df = df.select('cluster_uckey', 'price_cat', 'day_prediction_map', 'ratio', 'uckey') + # ------------------------------+----------+--------------------------------------+ + # |cluster_uckey|price_cat|day_prediction_map |ratio |uckey | + # +-------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------------+ + # |1009 |2 |[2020-06-26 -> 169.0, 2020-06-27 -> 170.0, 2020-06-24 -> 158.0, 2020-06-25 -> 155.0, 2020-06-28 -> 146.0, 2020-06-29 -> 127.0, 2020-06-30 -> 127.0, 2020-06-22 -> 171.0, 2020-06-23 -> 159.0, 2020-06-21 -> 227.0]|0.00800733|native,z041bf6g4s,WIFI,g_f,5,CPM,40,40| + # +-------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------------+ + + if not skip_ipl_reverse_mapping: + df = ipl_revrse_mapping(df, ipl_dist_map_brodcast=ipl_dist_map_brodcast, df_uckey_distinct=df_uckey_distinct) + # [Row(ucdoc_elements=Row(price_cat=u'2', ratio=0.11989551782608032, day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0}), uckey=u'native,66bcd2720e5011e79bc8fa163e05184e,WIFI,g_m,5,CPC,5')] ucdoc_elements_type = StructType([StructField('price_cat', StringType(), False), StructField( 'ratio', FloatType(), False), StructField('day_prediction_map', MapType(StringType(), FloatType()), False)]) df = df.withColumn('ucdoc_elements_pre_price_cat', udf(lambda price_cat, ratio, day_prediction_map: (price_cat, ratio, day_prediction_map), ucdoc_elements_type)(df.price_cat, df.ratio, df.day_prediction_map)).select('ucdoc_elements_pre_price_cat', 'uckey') + df.write.option("header", "true").option("encoding", "UTF-8").mode('overwrite').format('hive').saveAsTable(prediction_table_name + '_details') + # [Row(uckey=u'splash,d971z9825e,WIFI,g_m,1,CPT,74', ucdoc_elements=[Row(price_cat=u'1', ratio=0.5007790923118591, day_prediction_map={u'2019-11-02': 220.0, u'2019-11-03': 305.0})])] - df = df.groupBy('uckey').agg(collect_list( - 
'ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
+    df = df.groupBy('uckey').agg(collect_list('ucdoc_elements_pre_price_cat').alias('ucdoc_elements'))
 
-    df = df.withColumn('prediction_output', udf(transform.generate_ucdoc(traffic_dist), StringType())(
-        df.uckey, df.ucdoc_elements))
+    df = df.withColumn('prediction_output', udf(transform.generate_ucdoc(traffic_dist), StringType())(df.uckey, df.ucdoc_elements))
 
     df_predictions_doc = df.select('uckey', 'prediction_output')
 
     # Save the predictions to Hive.
-    table_name = cfg['es_predictions_index']
-    df_predictions_doc.write.option("header", "true").option("encoding", "UTF-8").mode('overwrite').format('hive').saveAsTable(table_name)
+    df_predictions_doc.write.option("header", "true").option("encoding", "UTF-8").mode('overwrite').format('hive').saveAsTable(prediction_table_name)
 
     # rdd = df_predictions_doc.rdd.map(lambda x: transform.format_data(x, 'ucdoc'))
     # rdd.saveAsNewAPIHadoopFile(
diff --git a/Processes/dlpredictor/run.sh b/Processes/dlpredictor/run.sh
index a0051f9..004b52e 100644
--- a/Processes/dlpredictor/run.sh
+++ b/Processes/dlpredictor/run.sh
@@ -8,6 +8,12 @@ then
   spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --py-files $SCRIPTPATH/dist/dlpredictor-1.6.0-py2.7.egg $SCRIPTPATH/dlpredictor/show_config.py $SCRIPTPATH/conf/config.yml
 fi
 
+# Build the ipl_dist_map and unique_original_uckey tables
+if true
+then
+  spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 32G --driver-memory 32G --py-files dist/dlpredictor-1.6.0-py2.7.egg,lib/imscommon-2.0.0-py2.7.egg,lib/predictor_dl_model-1.6.0-py2.7.egg --conf spark.driver.maxResultSize=5G dlpredictor/main_build_ipl_dist.py conf/config.yml
+fi
+
 # Start the predictor
 if true
 then
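
A minimal plain-Python sketch of the per-row arithmetic that ipl_revrse_mapping() in main_spark_es.py applies: the uckey's ipl field (index 7) is rewritten with each real ipl, and the uckey's cluster ratio is scaled by that ipl's share of its region (the real step additionally keeps only uckeys present in unique_original_uckey_table). The map, uckey and numbers below are made-up illustration values, not taken from the pipeline:

    # mapped region ipl -> {real ipl: that ipl's share of the region's impressions},
    # i.e. the shape of the broadcast map built from ipl_dist_table (toy values)
    ipl_dist_map = {'71': {'101': 0.088, '102': 0.912}}

    uckey = 'native,z041bf6g4s,4G,g_f,5,CPM,71,71'  # hypothetical mapped uckey; field 7 is the ipl
    cluster_ratio = 0.0120                          # this uckey's share of its cluster's prediction

    expanded = []
    for real_ipl, share in sorted(ipl_dist_map[uckey.split(',')[7]].items()):
        parts = uckey.split(',')
        parts[7] = real_ipl                         # rewrite the ipl field with the real ipl
        expanded.append((','.join(parts), cluster_ratio * share))

    # expanded -> [('native,z041bf6g4s,4G,g_f,5,CPM,71,101', ~0.00106),
    #              ('native,z041bf6g4s,4G,g_f,5,CPM,71,102', ~0.01094)]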