Update tests and fix UTC time
1. Add more tests
2. Fix bug in calculating record interval_time_in_seconds
radibnia77 committed Mar 9, 2021
1 parent 8e1a53e commit 2057d32c3f19256577233de1807fc5be14a61e1a
Showing 14 changed files with 640 additions and 278 deletions.
@@ -363,14 +363,16 @@ The Lookalike Service builds on the work done on the DIN model. From the DIN mo
##### System Entity Diagram

```mermaid
graph LR
graph TD
user("DMP Console")--> A(["API Handler"])
style user fill:#f9f,stroke:#333,stroke-width:4px
A-->I[["Extended-Audience-Reader"]]
I---J[("HDFS Extended Audience")]
A-->B[["Audience-Extender"]]
A-->|Asynchronism|A1[["Audience-Extender"]]
A1-->A2[["Queue"]]
A2-->B[["Audience-Extender-Worker"]]
B-->|step 1|H["Seed-Data-Reader"]
B-->|step 2|C["Seed-Clustering"]
@@ -529,4 +531,140 @@ Configuration of the Lookalike Service will be stored in Zookeeper. The configu
- Hive table name for score table
- Number of clusters to group existing audience
- Number of highest similarities to include in average
- Percentage of user extension,
- Percentage of user extension



# Spark Environment Tuning

Low performance in Spark operations can be caused by these factors:

1. Level of Parallelism
2. Data Locality



### Level of Parallelism

Spark is about parallel computation. If parallelism is too low, the job runs longer than necessary; if it is too high, it demands far more resources than needed. The right degree of parallelism therefore depends on the number of cores available in the cluster. **The best way to choose the number of Spark partitions in an RDD is to make the number of partitions equal to the number of cores across the cluster.**

There are two properties that can be used to increase the level of parallelism:

```
spark.default.parallelism
spark.sql.shuffle.partitions
```

```spark.sql.shuffle.partitions``` is used when you are working with Spark SQL or the DataFrame API.
The right level of parallelism means that each partition fits into the memory of one node. To achieve the right level of parallelism, follow these steps (a minimal configuration sketch follows this list):
> a. Identify the right amount of memory for each executor.
> b. Partition the data so that each partition fits into the memory of a node.
> c. Use the right number of executors.
> d. Respect the partitioning in queries.
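
As a minimal sketch (illustrative values only, not taken from the pipeline code), both properties can be set when building a `SparkSession`; the value 100 assumes roughly 100 cores are available to the job, in line with the one-partition-per-core guideline above:

```python
from pyspark.sql import SparkSession

# Illustrative values: with ~100 cores available to the job,
# target roughly one partition per core.
spark = (SparkSession.builder
         .appName('lookalike-tuning-sketch')
         .config('spark.default.parallelism', 100)      # parallelism for RDD operations
         .config('spark.sql.shuffle.partitions', 100)   # partitions after DataFrame/SQL shuffles
         .enableHiveSupport()
         .getOrCreate())
```

The same settings can also be passed on the command line through `--conf`, as in the `spark-submit` command shown in the Experiment section below.
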
### Data Locality
Data locality can have a major impact on the performance of Spark jobs. If the data and the code that operates on it are together, computation tends to be fast. If code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data, because code size is much smaller than data size. Spark builds its scheduling around this general principle of data locality.
Calling `groupBy()`, `groupByKey()`, `reduceByKey()`, `join()` and similar functions on a DataFrame results in shuffling data between multiple executors, and even machines, and finally repartitions the data into 200 partitions by default. PySpark sets this default of 200 shuffle partitions through the `spark.sql.shuffle.partitions` configuration.
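
For instance, the following standalone sketch (a toy DataFrame, not the project's data; partition counts assume Spark 2.x as used in the experiment below, without adaptive query execution) shows that a `groupBy()` produces `spark.sql.shuffle.partitions` output partitions, and that lowering the setting changes the result accordingly:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('shuffle-partitions-demo').getOrCreate()

df = spark.range(1000000).withColumn('key', col('id') % 10)

# With the default configuration the shuffle produces 200 partitions.
grouped = df.groupBy('key').count()
print(grouped.rdd.getNumPartitions())   # 200

# Lowering the setting keeps a small shuffle from being spread too thin.
spark.conf.set('spark.sql.shuffle.partitions', 20)
grouped = df.groupBy('key').count()
print(grouped.rdd.getNumPartitions())   # 20
```
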
### Experiment
The project was run on a Spark 2.3 cluster with Java 8.
#### Spark Environment Settings
The Hadoop cluster has 600 GB of memory and 200 V-Cores.
The following command was used for each step of the pipeline.
```shell
spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict <python-file> config.yml
```

This command engages roughly half of the cluster (110 V-Cores) to carry out the operation.



#### Elapsed Time

The following is the elapsed time for each step of the pipeline.



|STEP|INPUT TABLE NAME|TABLE SIZE (RECORDS)|PARTITIONS|ELAPSED TIME|
|:-------------|:------------:|:------------:|:--------------|:--------------|
|main_clean.py|ads_cleanlog_0520|1,036,183|NONE|51 mins, 18 sec|
||ads_showlog_0520|44,946,000|||
||ads_persona_0520|380,000|||
|main_logs.py|lookalike_02242021_clicklog|251,271|DAY, DID|4 mins, 41 sec|
||lookalike_02242021_showlog|12,165,993|||
|main_trainready.py|lookalike_02242021_logs|12,417,264|DAY, DID|15 mins, 0 sec|



#### Debugging for Performance Bottlenecks

One way to find a bottleneck is to measure the elapsed time for an operation.

Use the following code after a specific operation to measure the elapsed time.

```python
import timeit

def get_elapsed_time(df):
    start = timeit.default_timer()
    df.take(1)
    end = timeit.default_timer()
    return end - start
```

For example, in the following PySpark code, `get_elapsed_time(df)` is called in two different places. Note that, because Spark evaluates transformations lazily, `df.take(1)` triggers the whole lineage, so each measurement covers everything from the beginning of the code up to the point where `get_elapsed_time(df)` is called.

```python
trainready_table_temp
batched_round = 1
for did_bucket in range(did_bucket_num):
    command = """SELECT *
                 FROM {}
                 WHERE did_bucket = '{}' """
    df = hive_context.sql(command.format(trainready_table_temp, did_bucket))
    df = collect_trainready(df)
    print(get_elapsed_time(df))
    df = build_feature_array(df)
    print(get_elapsed_time(df))
    for i, feature_name in enumerate(['interval_starting_time', 'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']):
        df = df.withColumn(feature_name, col('metrics_list').getItem(i))
    # Add did_index
    df = df.withColumn('did_index', monotonically_increasing_id())
    df = df.select('age', 'gender', 'did', 'did_index', 'interval_starting_time', 'interval_keywords',
                   'kwi', 'kwi_show_counts', 'kwi_click_counts', 'did_bucket')
    mode = 'overwrite' if batched_round == 1 else 'append'
    write_to_table_with_partition(df, trainready_table, partition=('did_bucket'), mode=mode)
    batched_round += 1
return
```
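
Because each measurement is cumulative, one way to isolate the cost of a single step is to materialize and cache the upstream result before timing the next operation. This is only a sketch, under the assumption that the intermediate DataFrame fits in the executors' memory; `collect_trainready` and `build_feature_array` are the pipeline functions already used above:

```python
# Materialize the upstream work once so the next measurement
# reflects mostly build_feature_array() rather than the whole lineage.
df = collect_trainready(df)
df = df.cache()
df.count()                      # forces evaluation up to this point

df = build_feature_array(df)
print(get_elapsed_time(df))     # now dominated by build_feature_array()
```
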










@@ -1,5 +1,5 @@
product_tag: 'lookalike'
pipeline_tag: '02182021'
pipeline_tag: '03042021'
persona_table_name: 'ads_persona_0520'
showlog_table_name: 'ads_showlog_0520'
clicklog_table_name: 'ads_clicklog_0520'
@@ -108,91 +108,53 @@ pipeline:
clicklog_output_table: '{product_tag}_{pipeline_tag}_clicklog'
conditions: {
'new_slot_id_list': [
'06',
'11',
'05',
'04',
'03',
'02',
'01',
'l03493p0r3',
'x0ej5xhk60kjwq',
'g7m2zuits8',
'w3wx3nv9ow5i97',
'a1nvkhk62q',
'g9iv6p4sjy',
'c4n08ku47t',
'b6le0s4qo8',
'd9jucwkpr3',
'p7gsrebd4m',
'a8syykhszz',
'l2d4ec6csv',
'j1430itab9wj3b',
's4z85pd1h8',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'a47eavw7ex',
'68bcd2720e5011e79bc8fa163e05184e',
'66bcd2720e5011e79bc8fa163e05184e',
'72bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'q4jtehrqn2',
'm1040xexan',
'd971z9825e',
'a290af82884e11e5bdec00163e291137',
'w9fmyd5r0i',
'x2fpfbm8rt',
'e351de37263311e6af7500163e291137',
'k4werqx13k',
'5cd1c663263511e6af7500163e291137',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137',
'15e9ddce941b11e5bdec00163e291137'
'06',
'11',
'05',
'04',
'03',
'02',
'01',
'l03493p0r3',
'x0ej5xhk60kjwq',
'g7m2zuits8',
'w3wx3nv9ow5i97',
'a1nvkhk62q',
'g9iv6p4sjy',
'd47737w664',
'c4n08ku47t',
'b6le0s4qo8',
'd9jucwkpr3',
'p7gsrebd4m',
'a8syykhszz',
'l2d4ec6csv',
'j1430itab9wj3b',
'h034y5sp0i',
's4z85pd1h8',
'z041bf6g4s',
'71bcd2720e5011e79bc8fa163e05184e',
'a47eavw7ex',
'68bcd2720e5011e79bc8fa163e05184e',
'66bcd2720e5011e79bc8fa163e05184e',
'72bcd2720e5011e79bc8fa163e05184e',
'f1iprgyl13',
'q4jtehrqn2',
'm1040xexan',
'd971z9825e',
'a83jryvehg',
'7b0d7b55ab0c11e68b7900163e3e481d',
'a290af82884e11e5bdec00163e291137',
'w9fmyd5r0i',
'x2fpfbm8rt',
'e351de37263311e6af7500163e291137',
'k4werqx13k',
'5cd1c663263511e6af7500163e291137',
'17dd6d8098bf11e5bdec00163e291137',
'd4d7362e879511e5bdec00163e291137',
'15e9ddce941b11e5bdec00163e291137'
],
'new_slot_id_app_name_list': [
'Huawei Magazine',
'Huawei Magazine',
'Huawei Magazine',
'Huawei Magazine',
'Huawei Magazine',
'Huawei Magazine',
'Huawei Magazine',
'Huawei Browser',
'Huawei Video',
'Huawei Video',
'Huawei Video',
'Huawei Music',
'Huawei Music',
'Huawei Music',
'Huawei Music',
'Huawei Reading',
'Huawei Reading',
'Huawei Reading',
'Huawei Reading',
'Video 1.0',
'Video 2.0',
'Tencent Video',
'AI assistant',
'AI assistant',
'AI assistant',
'AI assistant',
'Huawei Video',
'Huawei Video',
'Huawei Video',
'Video 1.0',
'Themes',
'Huawei Music',
'Huawei Reading',
'Huawei Reading',
'Huawei Reading',
'Huawei Reading',
'Honor Reading',
'Video 1.0',
'Video 2.0',
'HiSkytone'
],
'starting_date': '2019-12-19', #2019-12-19
'ending_date': '2020-04-15' #2020-04-15 #2019-12-23
'starting_date': '2019-12-18', #2019-12-18
'ending_date': '2020-04-16' #2020-04-15 #2019-12-23
}
main_logs:
interval_time_in_seconds: 86400 # default=1 day, group logs in interval time.
@@ -69,25 +69,21 @@ def clean_batched_log(df, df_persona, conditions, df_keywords, did_bucket_num):
df_keywords: keywords-spread-app-id dataframe
This method:
1. Filters right slot-ids and add media-category.
1. Filters right slot-ids.
2. Add gender and age from persona table to each record of log
3. Add keyword to each row by looking to spread-app-id
"""
def filter_new_si(df, new_slot_id_list, new_slot_id_app_name_list):
def filter_new_si(df, new_slot_id_list):
"""
This filters logs with pre-defined slot-ids.
"""
new_si_set = set(new_slot_id_list)
_udf = udf(lambda x: x in new_si_set, BooleanType())
df = df.filter(_udf(df.slot_id))
slot_map = dict(zip(new_slot_id_list, slot_app_map))
_udf_map = udf(lambda x: slot_map[x] if x in slot_map else '', StringType())
df = df.withColumn('media_category', _udf_map(df.slot_id))
return df

new_slot_id_list = conditions['new_slot_id_list']
slot_app_map = conditions['new_slot_id_app_name_list']
df = filter_new_si(df, new_slot_id_list, slot_app_map)
df = filter_new_si(df, new_slot_id_list)
df = df.join(df_persona, on=['did'], how='inner')
df = df.join(df_keywords, on=['spread_app_id'], how="inner")
df = add_day(df)
@@ -109,7 +105,7 @@ def clean_logs(cfg, df_persona, df_keywords, log_table_names):
starting_time = datetime.strptime(start_date, "%Y-%m-%d")
ending_time = datetime.strptime(end_date, "%Y-%m-%d")
columns = ['spread_app_id', 'did', 'adv_id', 'media', 'slot_id', 'device_name', 'net_type', 'price_model',
'action_time', 'media_category', 'gender', 'age', 'keyword', 'keyword_index', 'day', 'did_bucket']
'action_time', 'gender', 'age', 'keyword', 'keyword_index', 'day', 'did_bucket']

batched_round = 1
while starting_time < ending_time:
@@ -150,12 +146,11 @@ def clean_logs(cfg, df_persona, df_keywords, log_table_names):
# write_to_table(df_showlog_batched, "ads_showlog_0520_2days", mode='overwrite')
# write_to_table(df_clicklog_batched, "ads_clicklog_0520_2days", mode='overwrite')
# return


# Note: for mode='append' Spark might throw a socket closed exception; it is due to a bug in Spark and does not affect the data or the table.
mode = 'overwrite' if batched_round == 1 else 'append'

df_showlog_batched = clean_batched_log(df_showlog_batched, df_persona, conditions, df_keywords, did_bucket_num=did_bucket_num)

# Note: for mode='append' Spark might throw a socket closed exception; it is due to a bug in Spark and does not affect the data or the table.
df_showlog_batched = df_showlog_batched.select(columns)
write_to_table_with_partition(df_showlog_batched, showlog_output_table, partition=('day', 'did_bucket'), mode=mode)
