Refactoring and use of broadcasting instead of join
radibnia77 committed Oct 4, 2021
1 parent 48f78a5 commit 4be0d4d3442bf81b800ac03640d077ef1c85ffdb
Showing 2 changed files with 46 additions and 44 deletions.
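The commit title summarizes the change: the region/IPL remapping that used to be done with a left join against the mapping table is now done by collecting the small mapping into a Python dict and applying it inside a UDF, which Spark ships to the executors with the closure (an implicit broadcast). A minimal, self-contained sketch of the two approaches with toy data and column names, not the repository's exact code:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.master('local[1]').appName('broadcast_vs_join').getOrCreate()

# toy data: uckey is a comma-separated key whose last field is the IPL/region code
df = spark.createDataFrame([('native,s1,4G,g_m,2,pt,80,81',)], ['uckey'])
mapping_df = spark.createDataFrame([('81', '80')], ['original', 'virtual'])

# join-based remapping (the spirit of the old code): extract the field, left-join, rebuild
df_old = df.withColumn('ipl', split(df['uckey'], ',').getItem(7))
df_old = df_old.join(mapping_df, df_old.ipl == mapping_df.original, how='left')

# dict-based remapping (the spirit of the new code): collect the small mapping to the
# driver and apply it inside a UDF; the dict travels to the executors with the closure,
# so the join and its shuffle disappear
mapping = {row['original']: row['virtual'] for row in mapping_df.collect()}

def remap_ipl(uckey):
    parts = uckey.split(',')
    parts[-1] = mapping.get(parts[-1], parts[-1])
    return ','.join(parts)

df_new = df.withColumn('uckey', udf(remap_ipl, StringType())(df.uckey))
df_new.show(truncate=False)
```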
@@ -1,6 +1,6 @@
product_tag: 'dlpm'
pipeline_tag: '08092021_1200' # IMPORTANT: The pipeline tag has to be changed before each run to prevent record duplication.
-factdata_table_name: 'factdata_hq_09222020'
+factdata_table_name: 'factdata_09202021' #factdata_hq_09222020

log:
level: 'WARN' # log level for spark and app
@@ -9,7 +9,7 @@ pipeline:
config_table: '{product_tag}_{pipeline_tag}_config'
filter: # This is for data filtering - si and region
percentile: 10 # This is for filtering traffic less than 1/10 of average traffic
-region_mapping_table: 'region_mapping'
+region_mapping_table: 'ipl_region_mapping_09282021' #region_mapping_01012020
output_table_name: '{product_tag}_{pipeline_tag}_tmp_area_map'
init_start_bucket: 0
bucket_size: 1000
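The table names in this config embed `{product_tag}` and `{pipeline_tag}` placeholders. The resolution code is not part of this diff; presumably the pipeline fills them with str.format, roughly as in this hypothetical sketch:

```python
# Hypothetical illustration of how the templated names in the config are presumably
# resolved; the actual loader code is not shown in this diff.
cfg = {
    'product_tag': 'dlpm',
    'pipeline_tag': '08092021_1200',
    'config_table': '{product_tag}_{pipeline_tag}_config',
    'output_table_name': '{product_tag}_{pipeline_tag}_tmp_area_map',
}

config_table = cfg['config_table'].format(product_tag=cfg['product_tag'], pipeline_tag=cfg['pipeline_tag'])
output_table = cfg['output_table_name'].format(product_tag=cfg['product_tag'], pipeline_tag=cfg['pipeline_tag'])
print(config_table)   # dlpm_08092021_1200_config
print(output_table)   # dlpm_08092021_1200_tmp_area_map
```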
@@ -5,7 +5,7 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0.html

# Unless required by applicable law or agreed to in writing, software
@@ -55,39 +55,43 @@ def __save_as_table(df, table_name, hive_context, create_table):
).write.format('hive').option("header", "true").option("encoding", "UTF-8").mode('append').insertInto(table_name)


-def drop_region(df):
-    new_uckey = udf(lambda uckey: ','.join(
-        [v for i, v in enumerate(uckey.split(',')) if i != 6]))
-    df = df.withColumn('_uckey', new_uckey(df.uckey)).drop(
-        'uckey').withColumnRenamed('_uckey', 'uckey')
+def drop_residency(df):
+    new_uckey = udf(lambda uckey: ','.join([v for i, v in enumerate(uckey.split(',')) if i != 6]))
+    df = df.withColumn('_uckey', new_uckey(df.uckey)).drop('uckey').withColumnRenamed('_uckey', 'uckey')
    return df


-def modify_ipl(df, mapping_df):
-    df = df.withColumn('ipl', split(df['uckey'], ',').getItem(7).cast(IntegerType()))
-    df = df.join(mapping_df, df.ipl == mapping_df.ipl_new, how='left')
-    new_uckey = udf(lambda arr: ','.join(list(arr[0].split(','))[:-1] + ['' if x is None else x for x in arr[-1:]]))
-    df = df.withColumn('uckey', new_uckey(array(df.uckey, df.ipl_old)))
-    return df
-
-
-def _udf_map_r(arr):
-    uckey = list(arr[0].split(','))
-    uckey_to_r = uckey[:-2]
-    uckey_ipl = uckey[-1]
-    new_r = arr[1]
-    uckey_new = ','.join(uckey_to_r + ['' if new_r is None else new_r] + [uckey_ipl])
-    return uckey_new
+def modify_ipl(df, mapping):
+    def __udf_method(_uckey_str):
+        uckey = list(_uckey_str.split(','))
+        uckey_ipl = uckey[-1]
+        if uckey_ipl in mapping:
+            uckey_ipl = mapping[uckey_ipl]
+        uckey[-1] = uckey_ipl
+        uckey_new = ','.join(uckey)
+        return uckey_new
+
+    new_uckey = udf(__udf_method, StringType())
+    df = df.withColumn('uckey', new_uckey(df.uckey))
+    df = df.drop('virtual').drop('original')
+    return df


def modify_residency(df, mapping_df):
-    df = df.withColumn('r', split(df['uckey'], ',').getItem(6).cast(IntegerType()))
-    df = df.join(mapping_df, df.r == mapping_df.r_new, how='left')
-    new_uckey = udf(_udf_map_r, StringType())
-    df = df.withColumn('uckey', new_uckey(array(df.uckey, df.r_old)))
+    df = df.withColumn('original', split(df['uckey'], ',').getItem(6).cast(IntegerType()))
+    df = df.join(mapping_df, on=['original'], how='left')
+
+    def __udf_method(_uckey_str, _r, _virtual_region):
+        uckey = list(_uckey_str.split(','))
+        uckey_residency = uckey[:-2]
+        uckey_ipl = uckey[-1]
+        new_residency = _virtual_region
+        uckey_new = ','.join(uckey_residency + ['' if new_residency is None else new_residency] + [uckey_ipl])
+        return uckey_new
+
+    new_uckey = udf(__udf_method, StringType())
+    df = df.withColumn('uckey', new_uckey(df.uckey, df.original, df.virtual))
+    df = df.drop('virtual').drop('original')
    return df


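The uckey handled by these helpers is a comma-separated string whose field at index 6 is the residency region and whose last field is the IPL: drop_residency removes the former and modify_ipl rewrites the latter through the mapping. A plain-Python sketch of that string logic; the sample key and its field layout are assumptions for illustration only:

```python
# Plain-Python illustration of the uckey manipulation used by the functions above.
# The sample key and its field layout are assumptions for illustration only.
sample_uckey = 'native,s1,4G,g_m,2,pt,80,81'   # ..., residency (index 6), ipl (last field)
mapping = {'81': '80'}                          # original region -> virtual region

# drop_residency: remove the field at index 6
without_residency = ','.join(v for i, v in enumerate(sample_uckey.split(',')) if i != 6)

# modify_ipl: replace the last field via the mapping (leave it unchanged if unmapped)
parts = sample_uckey.split(',')
parts[-1] = mapping.get(parts[-1], parts[-1])
with_new_ipl = ','.join(parts)

print(without_residency)  # native,s1,4G,g_m,2,pt,81
print(with_new_ipl)       # native,s1,4G,g_m,2,pt,80,80
```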
@@ -104,14 +108,13 @@ def __hash_sha256(s):
def run(hive_context, conditions, factdata_table_name, output_table_name, region_mapping_table, init_start_bucket, bucket_size, bucket_step, new_bucket_size, new_si_set):

# ts will be counts from yesterday-(past_days) to yesterday
-mapping_df = hive_context.sql(
-    'SELECT * FROM {}'.format(region_mapping_table))
-mapping_df_region = mapping_df.withColumn(
-    'r_old', mapping_df["old"]).withColumn('r_new', mapping_df["new"])
-mapping_df_ipl = mapping_df.withColumnRenamed(
-    'old', 'ipl_old').withColumnRenamed('new', 'ipl_new')
-
-mapping_df.cache()
+mapping_df = hive_context.sql('SELECT old AS original, new AS virtual FROM {}'.format(region_mapping_table))
+mapping_list = mapping_df.collect()
+mapping = {}
+for row in mapping_list:
+    original = row['original']
+    virtual = row['virtual']
+    mapping[original] = virtual

start_bucket = init_start_bucket
first_round = True
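Note that the new code never calls broadcast() explicitly: the mapping dict is built on the driver and captured by the UDF closure, which Spark serializes and ships to every executor. For a larger lookup, an explicit broadcast variable would be the usual alternative. A self-contained sketch of that variant with toy data; the names here are illustrative, not from the repository:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.master('local[1]').appName('explicit_broadcast').getOrCreate()

mapping = {'81': '80'}  # toy mapping: original region -> virtual region
bc_mapping = spark.sparkContext.broadcast(mapping)

def __remap_ipl(uckey_str):
    # read the lookup through the broadcast handle on the executor side
    parts = uckey_str.split(',')
    parts[-1] = bc_mapping.value.get(parts[-1], parts[-1])
    return ','.join(parts)

remap_ipl_udf = udf(__remap_ipl, StringType())
df = spark.createDataFrame([('native,s1,4G,g_m,2,pt,80,81',)], ['uckey'])
df.withColumn('uckey', remap_ipl_udf(df.uckey)).show(truncate=False)
```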
@@ -128,7 +131,7 @@ def run(hive_context, conditions, factdata_table_name, output_table_name, region
""".format(factdata_table_name, str(start_bucket), str(end_bucket))

if len(conditions) > 0:
command = command + " and {}".format(' and '.join(conditions))
command = command + " AND {}".format(' AND '.join(conditions))

start_bucket = end_bucket + 1

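The change only uppercases the SQL keywords; the composition itself is plain string concatenation. A toy illustration, where the SELECT clause, table name, and conditions are placeholders rather than the script's literal query:

```python
# Toy illustration of how the per-bucket query is assembled; the SELECT clause,
# table name, and conditions are placeholders, not the script's literal SQL.
conditions = ["day >= '2021-09-01'", "day <= '2021-09-30'"]
command = "SELECT uckey, count_array, day, hour, bucket_id FROM factdata_09202021 WHERE bucket_id BETWEEN 0 AND 999"
if len(conditions) > 0:
    command = command + " AND {}".format(' AND '.join(conditions))
print(command)
```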
@@ -139,17 +142,16 @@ def run(hive_context, conditions, factdata_table_name, output_table_name, region
_udf = udf(lambda x: x.split(',')[1] in new_si_set, BooleanType())
df = df.filter(_udf(df.uckey))

-# remove region from uckey
-# df = drop_region(df)
-
-df = modify_ipl(df, mapping_df_ipl)
-df = modify_residency(df, mapping_df_region)
+df = modify_ipl(df, mapping)
+# df = modify_residency(df, mapping_df)

df = assign_new_bucket_id(df, new_bucket_size)

df = df.repartition(200)

# Writing into partitions might throw some exceptions but does not impair data.
__save_as_table(df, output_table_name, hive_context, first_round)
-print('Processed ' + str(start_bucket-1) + ' buckets.')
+print('Processed ' + str(start_bucket - 1) + ' buckets.')

first_round = False

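The first_round flag handed to __save_as_table, together with the append/insertInto write shown at the top of this hunk, suggests a create-once-then-append pattern across bucket batches. A sketch of that pattern under those assumptions; __save_as_table's full body is not part of this diff, so this is not the repository's actual implementation:

```python
# Sketch of the create-once-then-append pattern implied by the first_round flag.
# This is an assumption based on the write line shown in the diff, not the actual code.
def save_batch(df, table_name, create_table):
    if create_table:
        # first bucket batch: materialize the Hive table from the DataFrame's schema
        df.write.format('hive').option('header', 'true').option('encoding', 'UTF-8').mode('overwrite').saveAsTable(table_name)
    else:
        # later batches: append into the existing table
        df.write.format('hive').option('header', 'true').option('encoding', 'UTF-8').mode('append').insertInto(table_name)
```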