This repository has been archived by the owner. It is now read-only.
Merge pull request #2 from radibnia77/main
Add tests
xun-hu-at-futurewei-com committed Mar 9, 2021
2 parents 42b6986 + 1524937 commit 049f420fd948d7a0533df891dc22ecdfc3911351
Showing 8 changed files with 365 additions and 16 deletions.
Binary file not shown.
@@ -35,11 +35,11 @@
def __data_parser(serialized_example):

features = tf.parse_single_example(serialized_example,
- features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.int64, allow_missing=True),
- 'keywords': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ features={'interval_starting_time': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
+ 'kwi': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
'did_index': tf.FixedLenFeature([], tf.int64),
'click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
'show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
'kwi_click_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
'kwi_show_counts': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
# 'media_category_index': tf.FixedLenFeature([], tf.int64),
# 'net_type_index': tf.FixedLenFeature([], tf.int64),
'gender': tf.FixedLenFeature([], tf.int64),
@@ -49,11 +49,11 @@ def __data_parser(serialized_example):

})
did_str = tf.cast(features['did'], tf.string)
- time_interval = tf.cast(features['interval_starting_time'], tf.int64)
- keyword = tf.cast(features['keywords'], tf.string)
+ time_interval = tf.cast(features['interval_starting_time'], tf.string)
+ keyword = tf.cast(features['kwi'], tf.string)
ucdoc = tf.cast(features['did_index'], tf.int64)
- click_counts = tf.cast(features['click_counts'], tf.string)
- show_counts = tf.cast(features['show_counts'], tf.string)
+ click_counts = tf.cast(features['kwi_click_counts'], tf.string)
+ show_counts = tf.cast(features['kwi_show_counts'], tf.string)
# media_category = tf.cast(features['media_category_index'], tf.int64)
# net_type_index = tf.cast(features['net_type_index'], tf.int64)
gender = tf.cast(features['gender'], tf.int64)
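Not shown in this hunk is how __data_parser feeds the next_el tensor that run(cfg) evaluates below. A minimal sketch, assuming the TF 1.x tf.data API that the parser above uses; the tfrecord path is a placeholder and any batching is handled elsewhere in the script:

import tensorflow as tf

def build_iterator(tfrecord_path):
    dataset = tf.data.TFRecordDataset([tfrecord_path])
    dataset = dataset.map(__data_parser)          # one parsed record per element
    iterator = dataset.make_one_shot_iterator()   # TF 1.x style iterator
    return iterator.get_next()                    # next_el, evaluated with sess.run

# next_el = build_iterator('trainready.tfrecord')  # placeholder path
# with tf.Session() as sess:
#     record = sess.run(next_el)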
@@ -126,11 +126,14 @@ def run(cfg):
with open(stats, 'rb') as f:
ucdoc_num = pickle.load(f)['distinct_records_count']
counter = 0
+ mapping = []
for i in range(ucdoc_num):
x = sess.run(next_el)
- log = list(x[0:7])
- time_interval, ucdoc, click_counts, show_counts, gender, age, keyword = log[0], log[1], log[2], log[3], log[4], log[5], log[6]
+ log = list(x[0:8])
+ time_interval_s, ucdoc, click_counts, show_counts, gender, age, keyword, did = log[0], log[1], log[2], log[3], log[4], log[5], log[6], log[7]
+ mapping.append([did, ucdoc])

+ time_interval = [int(i) for i in time_interval_s]
keyword_int = [[int(i) for i in keyword[j].decode().split(",")] for j in range(len(keyword))]
show_counts_list = str_to_intlist(show_counts)
click_counts_list = str_to_intlist(click_counts)
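str_to_intlist is called here but not included in the diff. A hedged sketch of one plausible implementation, assuming each element is a comma-separated string of 'keyword_index:count' pairs, as in the kwi_show_counts values of the trainready test rows further down:

def str_to_intlist(counts):
    result = []
    for interval in counts:
        # sess.run on a tf.string feature yields bytes; decode if needed
        s = interval.decode() if isinstance(interval, bytes) else interval
        result.append([[int(v) for v in pair.split(':')] for pair in s.split(',')])
    return result

# e.g. str_to_intlist([b'1:2,2:1', b'1:2']) -> [[[1, 2], [2, 1]], [[1, 2]]]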
@@ -186,17 +189,20 @@ def run(cfg):
cate_list = np.array([x for x in range(30)])
user_count, item_count , cate_count = len(set(ucdoc_lst)) , 30, 30
print(counter)
- with open('label_gdin_30.pkl', 'wb') as f:
+ mapping = pd.DataFrame(mapping)
+ mapping.to_csv("mapping_pipeline.csv")
+ with open('label_lookalike.pkl', 'wb') as f:
pickle.dump(label, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(user_att,f, pickle.HIGHEST_PROTOCOL )

- with open('ad_dataset_gdin_30.pkl', 'wb') as f:
+ with open('ad_dataset_lookalike.pkl', 'wb') as f:
pickle.dump(train_set, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(test_set, f, pickle.HIGHEST_PROTOCOL)
pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL)
pickle.dump((user_count, item_count, cate_count), f, pickle.HIGHEST_PROTOCOL)



if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Prepare data')
@@ -0,0 +1,26 @@
product_tag: 'lookalike'
pipeline_tag: 'unittest'
log:
level: 'ERROR' # log level for spark and app
pipeline:
main_clean:
did_bucket_num: 2 # Number of partitions for did
load_logs_in_minutes: 1440 #1440/day, original=14400
conditions: {
'new_slot_id_list': [
'abcdef0', 'abcdef1', 'abcdef2', 'abcdef3', 'abcdef4',
'abcdef5', 'abcdef6', 'abcdef7', 'abcdef8', 'abcdef9'
],
'new_slot_id_app_name_list': [
'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading',
'Huawei Magazine', 'Huawei Browser', 'Huawei Video', 'Huawei Music', 'Huawei Reading'
],
'starting_date': '2020-01-01',
'ending_date': '2020-01-11'
}
main_logs:
interval_time_in_seconds: 86400 # default = 1 day; logs are grouped into intervals of this length
logs_output_table_name: 'lookalike_unittest_trainready_input'
main_trainready:
trainready_output_table: 'lookalike_unittest_trainready_output'
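For reference, a minimal sketch (an assumption, not code from this commit) of loading this unit-test config with PyYAML the way the tests index it, e.g. cfg['pipeline']['main_clean']['did_bucket_num']:

import yaml

# 'config_unittest.yml' is a placeholder name for the YAML file above.
with open('config_unittest.yml') as f:
    cfg = yaml.safe_load(f)

did_bucket_num = cfg['pipeline']['main_clean']['did_bucket_num']                   # 2
trainready_table = cfg['pipeline']['main_trainready']['trainready_output_table']   # 'lookalike_unittest_trainready_output'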

@@ -63,6 +63,10 @@ def create_keywords_table (spark, table_name):
df = create_keywords(spark)
write_to_table(df, table_name)

# Creates unified log data and writes it to Hive.
def create_unified_log_table (spark, table_name):
df = create_unified_log(spark)
write_to_table(df, table_name)
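write_to_table itself is not part of this diff; a minimal sketch of a typical Spark implementation that would satisfy these helpers (an assumption about the repo's helper, not its actual code):

def write_to_table(df, table_name, mode='overwrite'):
    # Persist the dataframe as a Hive table under the given name.
    df.write.mode(mode).saveAsTable(table_name)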

#==========================================
# Create dataframes for the unit tests
@@ -160,6 +164,20 @@ def create_raw_log (spark):
def create_cleaned_log (spark):
data = [
('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
# ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ),
('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', '2020-01-02', '1', ),
('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', '2020-01-03', '1', ),
('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', '2020-01-04', '1', ),
@@ -220,6 +238,85 @@ def create_keywords(spark):

return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

# Returns a dataframe with unified log data.
def create_unified_log (spark):


data = [
('0000001', 0, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'game-avg', '2', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 1, '2020-01-01 12:34:56.78', 'reading', '3', 'splash', 'WIFI', 0, 0, '1000', 1577836800, 1577910896, '2020-01-01', '1', ),
('0000001', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
('0000001', 1, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 0, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
('0000001', 0, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
('0000001', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 0, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
('0000002', 0, '2020-01-02 12:34:56.78', 'travel', '1', 'splash', 'WIFI', 1, 0, '1000', 1577923200, 1577997296, '2020-01-02', '1', ),
('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
('0000003', 1, '2020-01-03 12:34:56.78', 'travel', '1', 'native', '4G', 0, 1, '1001', 1578009600, 1578083696, '2020-01-03', '1', ),
]

schema = StructType([
StructField('did', StringType(), True),
StructField('is_click', IntegerType(), True),
StructField('action_time', StringType(), True),
StructField('keyword', StringType(), True),
StructField('keyword_index', StringType(), True),
StructField('media', StringType(), True),
StructField('net_type', StringType(), True),
StructField('gender', IntegerType(), True),
StructField('age', IntegerType(), True),
StructField('adv_id', StringType(), True),
StructField('interval_starting_time', IntegerType(), True),
StructField('action_time_seconds', IntegerType(), True),
StructField('day', StringType(), True),
StructField('did_bucket', StringType(), True),
])

return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
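The interval_starting_time values in these rows are the action_time_seconds floored to the 86400-second boundary set by interval_time_in_seconds in the unit-test config above (1577836800 is 2020-01-01 00:00:00 UTC). A small sketch of that relationship, stated as an assumption about how the pipeline derives the field rather than code from this diff:

interval_time_in_seconds = 86400
action_time_seconds = 1577910896
interval_starting_time = action_time_seconds - (action_time_seconds % interval_time_in_seconds)
assert interval_starting_time == 1577836800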

# Returns a dataframe with trainready data.
def create_trainready_data (spark):
data = [
(0, 0, '0000001', 1000000001, [u'1578009600', u'1577923200', u'1577836800'], [u'travel', u'travel', u'travel,game-avg'], [u'1', u'1', u'1,2'], [u'1:2', u'1:2', u'1:2,2:1'], [u'1:1', u'1:1', u'1:1,2:0'], '1', ),
(0, 1, '0000002', 1000000002, [u'1577923200'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
(1, 0, '0000003', 1000000003, [u'1578009600'], [u'travel'], [u'1'], [u'1:2'], [u'1:1'], '1', ),
(1, 1, '0000004', 1000000004, [u'1578096000'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
(2, 0, '0000005', 1000000005, [u'1578182400'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '1', ),
(2, 1, '0000006', 1, [u'1578268800'], [u'game-avg'], [u'2'], [u'2:2'], [u'2:1'], '0', ),
(3, 0, '0000007', 2, [u'1578355200'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
(3, 1, '0000008', 3, [u'1578441600'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
(4, 0, '0000009', 4, [u'1578528000'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '0', ),
(4, 1, '0000010', 1000000006, [u'1578614400'], [u'reading'], [u'3'], [u'3:2'], [u'3:1'], '1', ),
]

schema = StructType([
StructField('age', IntegerType(), True),
StructField('gender', IntegerType(), True),
StructField('did', StringType(), True),
StructField('did_index', LongType(), True),
StructField('interval_starting_time', ArrayType(StringType()), True),
StructField('interval_keywords', ArrayType(StringType()), True),
StructField('kwi', ArrayType(StringType()), True),
StructField('kwi_show_counts', ArrayType(StringType()), True),
StructField('kwi_click_counts', ArrayType(StringType()), True),
StructField('did_bucket', StringType(), True),
])

return spark.createDataFrame(spark.sparkContext.parallelize(data), schema)


# Prints to screen the code to generate the given data frame.
def print_df_generator_code (df):
columns = df.columns
@@ -144,6 +144,7 @@ def test_clean_logs (self):

# Validate the cleaned clicklog table.
df_clicklog = util.load_df(self.hive_context, clicklog_output_table)
+ print(df_clicklog.sort('action_time').show(100, False))
self.validate_cleaned_log(df_clicklog, conditions, df_persona, df_keywords, df_log, cfg['pipeline']['main_clean']['did_bucket_num'])

# Validate the cleaned showlog table.
@@ -83,14 +83,15 @@ def test_run (self):

# Validate the output.
df = util.load_df(self.hive_context, log_output_table)
+ print(df.sort('action_time_seconds').show(100, False))
print_df_generator_code(df.sort('did', 'is_click'))
self.validate_unified_logs(df, create_cleaned_log(self.spark))


def validate_unified_logs (self, df, df_log):
# Verify the column names.
- columns = ['is_click', 'did', 'adv_id', 'media',
- 'net_type', 'action_time', 'gender', 'age',
+ columns = ['is_click', 'did', 'adv_id', 'media', 'action_time_seconds',
+ 'net_type', 'action_time', 'gender', 'age', 'interval_starting_time',
'keyword', 'keyword_index', 'day', 'did_bucket']
for name in columns:
self.assertTrue(name in df.columns)
