Release dlpredictor2
spyglass700 committed Jul 16, 2020
1 parent 7a105fe commit d99352cd12b822ee7403b2ea57741d284247c861
Showing 31 changed files with 1,654 additions and 426 deletions.
@@ -1,20 +1,47 @@
log:
level: 'WARN' # log level for spark and app
level: 'INFO' # log level for spark and app

pipeline:
factdata_table_name: "factdata3m2"
yesterday: "2018-03-31" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 90
tmp_table_name: 'trainready_tmp' # name of the hive table that keeps the cleansed and normalized data before it is written into tfrecords; overwrites the existing table
bucket_size: 10 # maximum number of buckets to process starting from 0
bucket_step: 10 # size of bucket batch that is processed in one iteration
tfrecords_hdfs_path: 'factdata.tfrecord.02212020' # HDFS location for the tfrecords; overwrites the existing files
tf_statistics_path: '/home/faezeh/tf_statistics.pkl'
holidays: ['2018-01-01', '2018-01-15', '2018-02-19', '2018-03-31', '2018-05-28', '2018-07-04', '2018-09-03',
'2018-11-22', '2018-11-23', '2018-12-25']
time_series: # this is done on the whole bucketized data
factdata_table_name: "modified_ready_factdata_06162020"
conditions: []
yesterday: "2020-02-08" # data is used for training from -<prepare_past_days> to -1(yesterday)
prepare_past_days: 90
bucket_size: 1000 # maximum number of buckets to process starting from 0
bucket_step: 200 # size of bucket batch that is processed in one iteration
output_table_name: 'pipeline_ts_tmp_06262020' # name of the hive table that keeps the cleansed and normalized data before it is written into tfrecords; overwrites the existing table
uckey_clustring: # this is done on the whole data, not sliced by buckets
pre_cluster_table_name: 'pipeline_pre_cluster_tmp_07082020'
create_pre_cluster_table: True
output_table_name: 'pipeline_cluster_tmp_07082020'
cluster_size:
number_of_virtual_clusters: 1000
datapoints_min_th: 0.15
datapoints_th_uckeys: 0.5
datapoints_th_clusters: 0.5
popularity_norm: 0.01
popularity_th: 5
median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False uckeys; calculated once
normalization: # this is done on the whole data, not sliced by buckets
output_table_name: 'trainready_tmp_07082020'
columns: {
'price_cat':['1','2','3'],
'a': ['','1','2','3','4','5','6'],
'g':['','g_f','g_m','g_x'],
't':['UNKNOWN','3G','4G','WIFI','2G'],
'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
'si':['d4d7362e879511e5bdec00163e291137', 'b6le0s4qo8', 'd47737w664', 'd971z9825e', '72bcd2720e5011e79bc8fa163e05184e', 'j1430itab9wj3b', 'w3wx3nv9ow5i97', 'g9iv6p4sjy', '71bcd2720e5011e79bc8fa163e05184e', '7b0d7b55ab0c11e68b7900163e3e481d', 'm1040xexan', 'x2fpfbm8rt', '05', '66bcd2720e5011e79bc8fa163e05184e', 'g7m2zuits8', 'l2d4ec6csv', 'a8syykhszz', 'w9fmyd5r0i', 'a47eavw7ex', 'p7gsrebd4m', 'q4jtehrqn2', '03', 'l03493p0r3', 's4z85pd1h8', 'f1iprgyl13', '17dd6d8098bf11e5bdec00163e291137', 'e351de37263311e6af7500163e291137', '68bcd2720e5011e79bc8fa163e05184e', '5cd1c663263511e6af7500163e291137', 'k4werqx13k', 'x0ej5xhk60kjwq', '04', 'a290af82884e11e5bdec00163e291137', '15e9ddce941b11e5bdec00163e291137', 'z041bf6g4s', 'd9jucwkpr3', 'c4n08ku47t']
}
holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
tfrecords:
tfrecords_hdfs_path: 'factdata.tfrecord.07082020' # HDFS location for the tfrecords; overwrites the existing files
tf_statistics_path: 'tf_statistics.pkl'
distribution:
output_table_name: 'pipeline_distribution_tmp_07082020'
output_detail_table_name: 'pipeline_distribution_detail_tmp_07082020'

tfrecorder_reader:
tfrecords_local_path: '/home/faezeh/factdata.tfrecord.02212020' # local path for the tfrecords; overwrites the existing files
tfrecords_local_path: '/home/faezeh/factdata.tfrecord.07082020' # local path for the tfrecords; overwrites the existing files
data_dir: 'data/vars'
valid_threshold: 0.0 # default=0.0, type=float, help="Series minimal length threshold (pct of data length)"
add_days: 0 # default=64, type=int, help="Add N days in the future for prediction"
@@ -60,7 +87,7 @@ save_model:


elastic_search:
es_host: "10.193.217.111"
es_host: "10.213.37.41"
es_port: 9200
es_index: 'model_stats'
es_type: 'stat'
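
For orientation, here is a minimal sketch of how a pipeline script could pick up the Elasticsearch settings above (the file name config.yml, the top-level position of the elastic_search section, and the match_all query are assumptions made for illustration; only the keys shown in the diff are used):

import yaml
from elasticsearch import Elasticsearch

# Load the pipeline configuration (config.yml is a placeholder name).
with open('config.yml', 'r') as ymlfile:
    cfg = yaml.safe_load(ymlfile)

es_cfg = cfg['elastic_search']  # assumed to be a top-level section

# Connect to the cluster that stores the model stats documents.
es = Elasticsearch([{'host': es_cfg['es_host'], 'port': es_cfg['es_port']}])

# Illustrative read: list the documents currently in the model_stats index.
res = es.search(index=es_cfg['es_index'], body={'query': {'match_all': {}}})
print(len(res['hits']['hits']))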
@@ -20,3 +20,19 @@ Pipeline takes the following steps:
4. Writes the model into a local directory
5. Compares the new model with the old model (new model evaluation) (future)
6. Sets the predictor to use the new model - the predictor reads the name of the model it uses from Elasticsearch (future)

### uckey aggregation
uckey='123',a=1,g=male,ts=[10,15], count=25, useless=true, m=12.5, m_n = (m-M)/STD = -0.577
uckey='434',a=3,g=male,ts=[20,5], count=25, useless=true, m=12.5, m_n = -0.577
uckey='645',a=3,g=female,ts=[30,10], count=40, useless=false, m=20, m_n = 1.155

M = mean of all ms = 15
STD = std of all ms = 4.33
total_count = 90

threshold =

agg
uckey='<don't care>',a={1:1,3:2},g={male:2,female:1},ts=[60,30]

sha256('123-434-645') = 'ffjfnjrfjeredmdkewmdwke' = 64 hex chars (256 bits)
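
As a cross-check of the arithmetic above, here is a small stand-alone sketch in plain Python (no Spark); it mirrors what list_to_map and agg_ts in the new pre_cluster script do for the three example uckeys, and the std-based normalization follows the (m-M)/STD formula stated above:

import statistics

records = [
    {'uckey': '123', 'a': '1', 'g': 'male', 'ts': [10, 15]},
    {'uckey': '434', 'a': '3', 'g': 'male', 'ts': [20, 5]},
    {'uckey': '645', 'a': '3', 'g': 'female', 'ts': [30, 10]},
]

# Categorical attributes are merged into count maps (list_to_map stores
# relative frequencies; plain counts are shown here to match the text).
a_map, g_map = {}, {}
for r in records:
    a_map[r['a']] = a_map.get(r['a'], 0) + 1
    g_map[r['g']] = g_map.get(r['g'], 0) + 1
print(a_map, g_map)  # {'1': 1, '3': 2} {'male': 2, 'female': 1}

# Time series are summed element-wise, like agg_ts().
ts_agg = [sum(day) for day in zip(*(r['ts'] for r in records))]
print(ts_agg)  # [60, 30]

# count is the total impressions of a uckey, m its daily mean,
# and m_n the z-score of m over all uckeys.
total_count = sum(sum(r['ts']) for r in records)            # 90
ms = [sum(r['ts']) / float(len(r['ts'])) for r in records]  # [12.5, 12.5, 20.0]
M, STD = statistics.mean(ms), statistics.stdev(ms)          # 15, ~4.33
m_n = [(m - M) / STD for m in ms]                           # [-0.58, -0.58, 1.15]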
@@ -20,7 +20,6 @@
import json
import requests
from elasticsearch import Elasticsearch

import yaml
import argparse

@@ -0,0 +1,245 @@
# Copyright 2019, Futurewei Technologies
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import math
import pickle
import statistics
import yaml
import argparse
import logging
import sys

from pyspark import SparkContext, SparkConf, Row
from pyspark.sql.functions import concat_ws, count, lit, col, udf, expr, collect_list, explode, avg, stddev, rand
from pyspark.sql.types import IntegerType, StringType, MapType, ArrayType, FloatType, BooleanType
from pyspark.sql import HiveContext
from datetime import datetime, timedelta


import transform


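# Saves the DataFrame as a Hive table through a temporary view; when create_table is True, any existing table with the same name is dropped first.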
def _save_as_table(df, table_name, hive_context, create_table):

if create_table:
command = """
DROP TABLE IF EXISTS {}
""".format(table_name)

hive_context.sql(command)

df.createOrReplaceTempView("r900_temp_table")

command = """
CREATE TABLE IF NOT EXISTS {} as select * from r900_temp_table
""".format(table_name)

hive_context.sql(command)


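# Estimates how many virtual clusters are needed so that an aggregated cluster of sparse uckeys reaches roughly the popularity of a typical dense uckey.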
def estimate_number_of_non_dense_clusters(df, median_popularity_of_dense):
# find avg of non-dense popularity
median_non_dense_p = df.filter('sparse=True').agg(
expr('percentile_approx(p, 0.5)').alias('_nondensp')).take(1)[0]['_nondensp']

no_of_items_in_a_cluster = median_popularity_of_dense / median_non_dense_p

no_of_cluster = df.filter('sparse=True').count() * \
1.0 / no_of_items_in_a_cluster / 3.0

return int(no_of_cluster) + 1


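# Collapses a list of categorical values into a map of value -> relative frequency.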
def list_to_map(mlist):
count_map = {}
for item in mlist:
if item not in count_map:
count_map[item] = 0
count_map[item] += 1
sum_of_values = sum(count_map.values())
for k, v in count_map.items():
count_map[k] = v*1.0/sum_of_values
return count_map


def agg_ts(mlist):
# each ts in mlist covers prepare_past_days days; sum the series element-wise
l = len(mlist[0])
result = [0 for _ in range(l)]
for ts in mlist:
for i in range(len(ts)):
n = ts[i]
if not n:
n = 0
result[i] += n
return result


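# Groups rows by (uckey, price_cat); categorical columns become frequency maps and the time series are summed element-wise.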
def agg_on_uckey_price_cat(df):

column_names = ['ts', 'a', 'g', 't', 'si', 'r']
agg_exprs = [collect_list(col).alias(col) for col in column_names]
df = df.groupBy('uckey', 'price_cat').agg(*agg_exprs)

list_to_map_udf = udf(list_to_map, MapType(
StringType(), FloatType(), False))
for column_name in column_names:
if column_name == 'ts':
continue
column_name_agg = column_name + '_agg'
df = df.withColumn(column_name_agg, list_to_map_udf(column_name))
df = df.drop(column_name)
df = df.withColumnRenamed(column_name_agg, column_name)

ts_agg_udf = udf(agg_ts, ArrayType(IntegerType()))
df = df.withColumn('ts_agg', ts_agg_udf(df.ts))
df = df.drop('ts')
df = df.withColumnRenamed('ts_agg', 'ts')

return df


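# Returns a predicate that flags a uckey as sparse when it has too few non-zero datapoints or its normalized popularity is below popularity_norm.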
def is_spare(datapoints_threshold, popularity_norm):
def _helper(p_n, ts):
num_list = [_ for _ in ts if _ is not None and _ != 0]
if (len(num_list) * 1.0 > datapoints_threshold * len(ts) and p_n >= popularity_norm):
return False
return True
return _helper


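# Returns a predicate that keeps a uckey unless it is 'spiked': above-average popularity concentrated in very few non-zero days.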
def is_non_spiked_uckey(whole_popularity_avg, popularity_th, datapoints_min_th):
def _helper(p, ts):
num_list = [_ for _ in ts if _ is not None and _ != 0]
return not(p > whole_popularity_avg and len(num_list) * 1.0 < datapoints_min_th * len(ts))
return _helper


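# Drops weak uckeys: those with popularity below popularity_th, plus spiked uckeys detected against the average popularity.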
def remove_weak_uckeys(df, popularity_th, datapoints_min_th):
df = df.filter(udf(lambda p: p >= popularity_th, BooleanType())(df.p))
whole_popularity_avg = df.agg(avg('p').alias('_avg')).take(1)[0]['_avg']
df = df.filter(udf(is_non_spiked_uckey(whole_popularity_avg,
popularity_th, datapoints_min_th), BooleanType())(df.p, df.ts))
return df


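# Driver: reads the time-series table, removes weak uckeys, flags sparse ones, assigns them to random virtual clusters, aggregates each cluster and saves the result.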
def run(hive_context, cluster_size_cfg, input_table_name, pre_cluster_table_name, output_table_name, create_pre_cluster_table):

datapoints_th_uckeys = cluster_size_cfg['datapoints_th_uckeys']
datapoints_th_clusters = cluster_size_cfg['datapoints_th_clusters']
popularity_norm = cluster_size_cfg['popularity_norm']
median_popularity_of_dense = cluster_size_cfg['median_popularity_of_dense']
number_of_virtual_clusters = cluster_size_cfg['number_of_virtual_clusters']
popularity_th = cluster_size_cfg['popularity_th']
datapoints_min_th = cluster_size_cfg['datapoints_min_th']

# Read factdata table
command = """
select ts,price_cat,uckey,a,g,t,si,r from {}
""".format(input_table_name)

# DataFrame[uckey: string, price_cat: string, ts: array<int>, a: string, g: string, t: string, si: string, r: string]
df = hive_context.sql(command)

# add imp
df = df.withColumn('imp', udf(lambda ts: sum(
[_ for _ in ts if _]), IntegerType())(df.ts))

# add popularity = mean
df = df.withColumn('p', udf(lambda ts: sum(
[_ for _ in ts if _])/(1.0*len(ts)), FloatType())(df.ts))

# remove weak uckeys
df = remove_weak_uckeys(df, popularity_th, datapoints_min_th)

# add normalized popularity = mean_n
df, _ = transform.normalize_ohe_feature(df, ohe_feature='p')

df = df.withColumn('sparse', udf(
is_spare(datapoints_th_uckeys, popularity_norm), BooleanType())(df.p_n, df.ts))

if number_of_virtual_clusters <= 0:
number_of_virtual_clusters = estimate_number_of_non_dense_clusters(
df, median_popularity_of_dense)

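# Assign every row a random virtual cluster number cn; it becomes the uckey of sparse rows below.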
df = df.withColumn("cn", (rand()*1000000 %
number_of_virtual_clusters).cast('int'))

if create_pre_cluster_table:
_save_as_table(df, pre_cluster_table_name, hive_context, True)

# change the uckey of sparse to cn
df = df.withColumn('new_uckey', udf(lambda uckey, cn, sparse: str(
cn) if sparse else uckey, StringType())(df.uckey, df.cn, df.sparse))
df = df.drop('uckey')
df = df.withColumnRenamed('new_uckey', 'uckey')

df = agg_on_uckey_price_cat(df)

# replace nan with zero
df = transform.replace_nan_with_zero(df)

# add imp
df = df.withColumn('imp', udf(lambda ts: sum(
[_ for _ in ts if _]), IntegerType())(df.ts))

# add popularity = mean
df = df.withColumn('p', udf(lambda ts: sum(
[_ for _ in ts if _])/(1.0*len(ts)), FloatType())(df.ts))

# add normalized popularity = mean_n
df, _ = transform.normalize_ohe_feature(df, ohe_feature='p')

df = df.filter(udf(lambda p_n, ts: not is_spare(datapoints_th_clusters, -
sys.maxsize-1)(p_n, ts), BooleanType())(df.p_n, df.ts))

_save_as_table(df, output_table_name, hive_context, True)


if __name__ == "__main__":

parser = argparse.ArgumentParser(description='Prepare data')
parser.add_argument('config_file')
args = parser.parse_args()

# Load config file
with open(args.config_file, 'r') as ymlfile:
cfg = yaml.load(ymlfile)

cfg_log = cfg['log']
cfg = cfg['pipeline']

sc = SparkContext()
hive_context = HiveContext(sc)
sc.setLogLevel(cfg_log['level'])

output_table_name = cfg['uckey_clustring']['output_table_name']
pre_cluster_table_name = cfg['uckey_clustring']['pre_cluster_table_name']
create_pre_cluster_table = cfg['uckey_clustring']['create_pre_cluster_table']
input_table_name = cfg['time_series']['output_table_name']
cluster_size_cfg = cfg['uckey_clustring']['cluster_size']

run(hive_context=hive_context,
cluster_size_cfg=cluster_size_cfg,
input_table_name=input_table_name,
pre_cluster_table_name=pre_cluster_table_name,
output_table_name=output_table_name,
create_pre_cluster_table=create_pre_cluster_table)

sc.stop()
