Improve accuracy
- Consider the AdUnit-ID when clustering sparse uckeys
- Remove dead points from the time series
radibnia77 committed Jun 3, 2021
1 parent fae1b3b commit 2a8fb033f59fcb40f3790b4fdf00e146f853d4b7
Showing 17 changed files with 418 additions and 141 deletions.
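In outline, the two changes named in the commit message amount to (a) keeping the AdUnit-ID (si) in the grouping key when sparse uckeys are folded into virtual clusters, and (b) trimming days with no recorded traffic from each uckey's time series. The sketch below is illustrative only; the uckey layout, the function names, and the trimming rule are assumptions, not code from this commit.

```python
# Illustrative sketch only; field names and rules are assumptions.

def cluster_key(uckey, is_sparse, n_virtual_clusters=1000):
    # Assume uckey is a comma-separated attribute string whose first
    # field is the AdUnit-ID (si).
    si = uckey.split(',')[0]
    if is_sparse:
        # Keep the AdUnit-ID in the grouping key so sparse uckeys from
        # different ad units do not land in the same virtual cluster.
        return (si, hash(uckey) % n_virtual_clusters)
    return ('', hash(uckey) % n_virtual_clusters)

def remove_dead_points(ts):
    # Trim leading and trailing days with no traffic (None or 0) so
    # dead points do not distort per-uckey statistics.
    first = next((i for i, v in enumerate(ts) if v), len(ts))
    last = len(ts) - next((i for i, v in enumerate(reversed(ts)) if v), 0)
    return ts[first:last]
```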
@@ -8,5 +8,5 @@
d. Recalculate bucket-id

### 1.6
-1. Add residency and IPL features
-2. Add pipeline and product tags to config file. The whole set of tmp tables are named by product_tag and pipeline_tag. The user does not need to review the name of those tables anymore.
+1. Add region and IPL features
+2. Add TAG to config file. The whole set of tmp tables is named by product_tag and pipeline_tag, so the user no longer needs to review those table names.
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import DAG
import datetime as dt
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta

default_args = {
    'owner': 'dlpm',
    'depends_on_past': False,
    'start_date': dt.datetime(2020, 9, 21),
    'retries': 0  # ,
    # 'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dlpm_data_prep',
    default_args=default_args,
    schedule_interval=None,
)


def sparkOperator(
        file,
        task_id,
        **kwargs
):
    # Factory wrapping SparkSubmitOperator so every pipeline step shares
    # the same application path, config file, and Spark resources.
    return SparkSubmitOperator(
        application='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/{}'.format(
            file),
        application_args=[
            '/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/config.yml'],
        conn_id='spark_default',
        conf={'spark.driver.maxResultSize': '8g'},
        driver_memory='16G',
        executor_cores=5,
        num_executors=10,
        executor_memory='16G',
        task_id=task_id,
        dag=dag,
        **kwargs
    )

show_config = sparkOperator('show_config.py', 'show_config')

main_filter_si_region_bucket = sparkOperator('main_filter_si_region_bucket.py', 'main_filter_si_region_bucket')

main_ts = sparkOperator('main_ts.py',
                        'main_ts',
                        py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')

main_cluster = sparkOperator('main_cluster.py', 'main_cluster')

main_distribution = sparkOperator('main_distribution.py', 'main_distribution')

main_norm = sparkOperator('main_norm.py',
                          'main_norm',
                          py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')

main_tfrecords = sparkOperator('main_tfrecords.py',
                               'main_tfrecords',
                               jars='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/spark-tensorflow-connector_2.11-1.15.0.jar')


show_config >> main_filter_si_region_bucket >> main_ts >> main_cluster >> main_distribution >> main_norm >> main_tfrecords
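For readers less familiar with Airflow's bit-shift syntax, the chain above is equivalent to explicit `set_downstream` calls; this rewrite is for illustration only and is not part of the commit.

```python
# Equivalent wiring without the '>>' operator (illustration only):
show_config.set_downstream(main_filter_si_region_bucket)
main_filter_si_region_bucket.set_downstream(main_ts)
main_ts.set_downstream(main_cluster)
main_cluster.set_downstream(main_distribution)
main_distribution.set_downstream(main_norm)
main_norm.set_downstream(main_tfrecords)
```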

@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -20,18 +20,17 @@
from datetime import timedelta

default_args = {
-    'owner': 'dl-predictor',
+    'owner': 'dlpm',
    'depends_on_past': False,
    'start_date': dt.datetime(2020, 9, 21),
    'retries': 0  # ,
    # 'retry_delay': timedelta(minutes=5),
}

dag = DAG(
-    'dl_pre_processing',
+    'dlpm_data_prep_05142021_1500',
    default_args=default_args,
    schedule_interval=None,
)


@@ -41,30 +40,37 @@ def sparkOperator(
        **kwargs
):
    return SparkSubmitOperator(
-        application='/home/airflow/airflow/predictor_dl_model/pipeline/{}'.format(file),
-        application_args=['/home/airflow/airflow/predictor_dl_model/config.yml'],
+        application='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/{}'.format(file),
+        application_args=['/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/config-archive/config_05142021_1500.yml'],
        conn_id='spark_default',
        conf={'spark.driver.maxResultSize': '8g'},
-        driver_memory='32G',
-        executor_cores=5,
-        num_executors=32,
+        driver_memory='16G',
+        executor_cores=3,
+        num_executors=5,
        executor_memory='16G',
        task_id=task_id,
        dag=dag,
        **kwargs
    )


main_filter_si_region_bucket = sparkOperator('main_filter_si_region_bucket.py', 'main_filter_si_region_bucket')

main_ts = sparkOperator('main_ts.py',
                        'main_ts',
-                        py_files='/home/airflow/airflow/predictor_dl_model/pipeline/transform.py')
+                        py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')

main_cluster = sparkOperator('main_cluster.py', 'main_cluster')

main_distribution = sparkOperator('main_distribution.py', 'main_distribution')

main_norm = sparkOperator('main_norm.py',
                          'main_norm',
-                          py_files='/home/airflow/airflow/predictor_dl_model/pipeline/transform.py')
+                          py_files='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/pipeline/transform.py')

main_tfrecords = sparkOperator('main_tfrecords.py',
                               'main_tfrecords',
-                               jars='/home/airflow/airflow/din_model/spark-tensorflow-connector_2.11-1.15.0.jar')
+                               jars='/home/airflow/airflow-apps/predictor-dl-model/predictor_dl_model/spark-tensorflow-connector_2.11-1.15.0.jar')


-main_ts >> main_cluster >> main_distribution >> main_norm >> main_tfrecords
+main_filter_si_region_bucket >> main_ts >> main_cluster >> main_distribution >> main_norm >> main_tfrecords
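Relative to the previous revision, this DAG's Spark resources were cut (driver memory 32G down to 16G, 32 executors down to 5, 5 cores down to 3). For orientation, SparkSubmitOperator forwards these keyword arguments as spark-submit flags; the mapping below is a reference sketch, not part of the commit.

```python
# How the operator kwargs surface on the spark-submit command line:
#   driver_memory='16G'    ->  --driver-memory 16G
#   executor_cores=3       ->  --executor-cores 3
#   num_executors=5        ->  --num-executors 5
#   executor_memory='16G'  ->  --executor-memory 16G
#   conf={'spark.driver.maxResultSize': '8g'}
#                          ->  --conf spark.driver.maxResultSize=8g
```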
@@ -12,7 +12,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License..

import random

@@ -1,60 +1,49 @@
product_tag: 'dlpm'
-pipeline_tag: '11092020'
-factdata_table_name: 'factdata_hq_09222020_bucket_0'
+pipeline_tag: '05182021_1500'
+factdata_table_name: 'factdata_hq_09222020'

log:
  level: 'WARN' # log level for spark and app

pipeline:
  config_table: '{product_tag}_{pipeline_tag}_config'
  filter: # data filtering on si and region
    percentile: 10 # keep only traffic above 1/10 of the average traffic
    region_mapping_table: 'region_mapping'
    output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
    init_start_bucket: 0
    bucket_size: 1000
    bucket_step: 100
    new_bucket_size: 10
    condition: ''
-    new_si_list: [
-      '06',
-      '11',
-      '05',
-      '04',
-      '03',
-      '02',
-      '01',
-      'l03493p0r3',
-      'x0ej5xhk60kjwq',
-      'g7m2zuits8',
-      'w3wx3nv9ow5i97',
-      'a1nvkhk62q',
-      'g9iv6p4sjy',
-      'c4n08ku47t',
-      'b6le0s4qo8',
-      'd9jucwkpr3',
-      'p7gsrebd4m',
-      'a8syykhszz',
-      'l2d4ec6csv',
-      'j1430itab9wj3b',
-      's4z85pd1h8',
-      'z041bf6g4s',
-      '71bcd2720e5011e79bc8fa163e05184e',
-      'a47eavw7ex',
-      '68bcd2720e5011e79bc8fa163e05184e',
-      '66bcd2720e5011e79bc8fa163e05184e',
-      '72bcd2720e5011e79bc8fa163e05184e',
-      'f1iprgyl13',
-      'q4jtehrqn2',
-      'm1040xexan',
-      'd971z9825e',
-      'a290af82884e11e5bdec00163e291137',
-      'w9fmyd5r0i',
-      'x2fpfbm8rt',
-      'e351de37263311e6af7500163e291137',
-      'k4werqx13k',
-      '5cd1c663263511e6af7500163e291137',
-      '17dd6d8098bf11e5bdec00163e291137',
-      'd4d7362e879511e5bdec00163e291137',
-      '15e9ddce941b11e5bdec00163e291137' ]
+    new_si_list: ['15e9ddce941b11e5bdec00163e291137',
+      '66bcd2720e5011e79bc8fa163e05184e',
+      '7b0d7b55ab0c11e68b7900163e3e481d',
+      'a8syykhszz',
+      'w3wx3nv9ow5i97',
+      'x2fpfbm8rt',
+      '17dd6d8098bf11e5bdec00163e291137',
+      '5cd1c663263511e6af7500163e291137',
+      '68bcd2720e5011e79bc8fa163e05184e',
+      '71bcd2720e5011e79bc8fa163e05184e',
+      'a290af82884e11e5bdec00163e291137',
+      'a47eavw7ex',
+      'b6le0s4qo8',
+      'd4d7362e879511e5bdec00163e291137',
+      'd971z9825e',
+      'd9jucwkpr3',
+      'e351de37263311e6af7500163e291137',
+      'f1iprgyl13',
+      'j1430itab9wj3b',
+      'k4werqx13k',
+      'l03493p0r3',
+      'l2d4ec6csv',
+      'p7gsrebd4m',
+      's4z85pd1h8',
+      'w9fmyd5r0i',
+      'x0ej5xhk60kjwq',
+      'z041bf6g4s']

  time_series: # done on the whole bucketized data
    input_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
    conditions: []
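Judging by the task name and the whitelist above, the filter step presumably keeps only rows whose si (AdUnit-ID) appears in new_si_list. Below is a minimal PySpark sketch under that assumption; the DataFrame and column names are hypothetical, and the real logic lives in main_filter_si_region_bucket.py, which this diff does not show.

```python
# Hypothetical sketch of si whitelisting; not code from this commit.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('si_filter_sketch').getOrCreate()

# Abridged; the full whitelist is new_si_list in config.yml.
new_si_list = ['15e9ddce941b11e5bdec00163e291137', 'a8syykhszz']

factdata = spark.table('factdata_hq_09222020')  # factdata_table_name above
filtered = factdata.filter(F.col('si').isin(new_si_list))
```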
@@ -69,11 +58,12 @@ pipeline:
    output_table_name: '{product_tag}_{pipeline_tag}_tmp_cluster'
    cluster_size:
      number_of_virtual_clusters: 1000
-      datapoints_min_th: 0.15
-      datapoints_th_uckeys: 0.5
-      datapoints_th_clusters: 0.5
+      cluster_dense_num_ratio_cap: 0.01
+      datapoints_min_th: 0.12 # was 0.15
+      datapoints_th_uckeys: 0.12
+      datapoints_th_clusters: 0.2
    popularity_norm: 0.01
-    popularity_th: 5
+    popularity_th: 4
    median_popularity_of_dense: 1856.2833251953125 # median imp of sparse=False, calculated once
  normalization: # done on the whole data, not sliced by bucket
    output_table_name: '{product_tag}_{pipeline_tag}_trainready'
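The datapoints thresholds above appear to decide when a uckey (or cluster) has enough live days to count as dense; a rough sketch of how such a threshold might be applied follows. The predicate name and the exact rule are assumptions; the actual logic is in main_cluster.py, which this diff does not show.

```python
# Hypothetical use of datapoints_th_uckeys; not code from this commit.
def has_enough_datapoints(ts, duration, datapoints_th=0.12):
    # Days with real traffic count; dead points (None or 0) do not.
    valid = sum(1 for v in ts if v)
    return valid / duration > datapoints_th
```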
@@ -84,7 +74,34 @@ pipeline:
      't':['UNKNOWN','3G','4G','WIFI','2G'],
      'r':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
      'ipl':['', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82'],
-      'si':['d4d7362e879511e5bdec00163e291137', 'b6le0s4qo8', 'd47737w664', 'd971z9825e', '72bcd2720e5011e79bc8fa163e05184e', 'j1430itab9wj3b', 'w3wx3nv9ow5i97', 'g9iv6p4sjy', '71bcd2720e5011e79bc8fa163e05184e', '7b0d7b55ab0c11e68b7900163e3e481d', 'm1040xexan', 'x2fpfbm8rt', '05', '66bcd2720e5011e79bc8fa163e05184e', 'g7m2zuits8', 'l2d4ec6csv', 'a8syykhszz', 'w9fmyd5r0i', 'a47eavw7ex', 'p7gsrebd4m', 'q4jtehrqn2', '03', 'l03493p0r3', 's4z85pd1h8', 'f1iprgyl13', '17dd6d8098bf11e5bdec00163e291137', 'e351de37263311e6af7500163e291137', '68bcd2720e5011e79bc8fa163e05184e', '5cd1c663263511e6af7500163e291137', 'k4werqx13k', 'x0ej5xhk60kjwq', '04', 'a290af82884e11e5bdec00163e291137', '15e9ddce941b11e5bdec00163e291137', 'z041bf6g4s', 'd9jucwkpr3', 'c4n08ku47t']
+      'si':[
+        '15e9ddce941b11e5bdec00163e291137',
+        '66bcd2720e5011e79bc8fa163e05184e',
+        '7b0d7b55ab0c11e68b7900163e3e481d',
+        'a8syykhszz',
+        'w3wx3nv9ow5i97',
+        'x2fpfbm8rt',
+        '17dd6d8098bf11e5bdec00163e291137',
+        '5cd1c663263511e6af7500163e291137',
+        '68bcd2720e5011e79bc8fa163e05184e',
+        '71bcd2720e5011e79bc8fa163e05184e',
+        'a290af82884e11e5bdec00163e291137',
+        'a47eavw7ex',
+        'b6le0s4qo8',
+        'd4d7362e879511e5bdec00163e291137',
+        'd971z9825e',
+        'd9jucwkpr3',
+        'e351de37263311e6af7500163e291137',
+        'f1iprgyl13',
+        'j1430itab9wj3b',
+        'k4werqx13k',
+        'l03493p0r3',
+        'l2d4ec6csv',
+        'p7gsrebd4m',
+        's4z85pd1h8',
+        'w9fmyd5r0i',
+        'x0ej5xhk60kjwq',
+        'z041bf6g4s']
      }
    holidays: ['2019-11-09', '2019-11-10', '2019-11-11', '2019-11-25', '2019-11-26', '2019-11-27','2019-11-28', '2019-12-24','2019-12-25', '2019-12-26','2019-12-31', '2020-01-01', '2020-01-02', '2020-01-19','2020-01-20', '2020-01-21', '2020-01-22', '2020-01-23', '2020-01-24', '2020-01-25', '2020-02-08']
  tfrecords:
