Use independent files as refresh data source (#77)
* use independent files as refresh data source

Signed-off-by: Allen Xu <allxu@nvidia.com>

* update readme

Signed-off-by: Allen Xu <allxu@nvidia.com>
wjxiz1992 committed Aug 4, 2022
1 parent 745e5a4 commit 1a977b6
Showing 2 changed files with 22 additions and 4 deletions.
10 changes: 8 additions & 2 deletions nds/README.md
@@ -302,9 +302,10 @@ or later. More details including work-around for version 3.2.0 and 3.2.1 could b

Arguments supported for data maintenance:
```
usage: nds_maintenance.py [-h] [--maintenance_queries MAINTENANCE_QUERIES] maintenance_queries_folder time_log
usage: nds_maintenance.py [-h] [--maintenance_queries MAINTENANCE_QUERIES] [--data_format DATA_FORMAT] refresh_data_path maintenance_queries_folder time_log
positional arguments:
  refresh_data_path     path to refresh data
  maintenance_queries_folder
                        folder contains all NDS Data Maintenance queries. If "--maintenance_queries"
                        is not set, all queries under the folder will be executed.
@@ -314,15 +315,20 @@ optional arguments:
  -h, --help            show this help message and exit
  --maintenance_queries MAINTENANCE_QUERIES
                        specify Data Maintenance query names by a comma separated string. e.g. "LF_CR,LF_CS"
  --data_format DATA_FORMAT
                        data format for refresh data, e.g. parquet, orc, avro
```

An example command to run only _LF_CS_ and _DF_CS_ functions:
```
./spark-submit-template convert_submit_cpu_iceberg.template \
nds_maintenance.py \
update_data_sf3k \
./data_maintenance \
time.csv \
--maintenance_queries LF_CS,DF_CS
--maintenance_queries LF_CS,DF_CS \
--data_format orc
```
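
The positional order in the usage text above matters, since `refresh_data_path` now comes first. As a rough check, the sketch below rebuilds an equivalent parser with `argparse` and parses the example arguments. The `build_parser` name and the `time_log` help string are assumptions made for illustration; the other options follow the usage text.

```python
import argparse


def build_parser():
    """Build a parser mirroring the usage text of nds_maintenance.py."""
    parser = argparse.ArgumentParser(prog='nds_maintenance.py')
    parser.add_argument('refresh_data_path',
                        help='path to refresh data')
    parser.add_argument('maintenance_queries_folder',
                        help='folder contains all NDS Data Maintenance queries')
    # Help text below is an assumption; the usage text only names the argument.
    parser.add_argument('time_log',
                        help='path to the execution time log')
    parser.add_argument('--maintenance_queries',
                        type=lambda s: s.split(','),
                        help='comma separated query names, e.g. "LF_CR,LF_CS"')
    parser.add_argument('--data_format',
                        default='parquet',
                        help='data format for refresh data, e.g. parquet, orc, avro')
    return parser


# Same arguments as the example command, minus the spark-submit wrapper.
example_args = build_parser().parse_args([
    'update_data_sf3k',
    './data_maintenance',
    'time.csv',
    '--maintenance_queries', 'LF_CS,DF_CS',
    '--data_format', 'orc',
])

# Omitting the optional flags falls back to parquet and to "all queries".
default_args = build_parser().parse_args(
    ['update_data_sf3k', './data_maintenance', 'time.csv'])
```

Here `example_args.maintenance_queries` is a list of query names, while `default_args.maintenance_queries` is `None`, which the script treats as "run every query in the folder".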

## Data Validation
16 changes: 14 additions & 2 deletions nds/nds_maintenance.py
@@ -37,6 +37,7 @@
from PysparkBenchReport import PysparkBenchReport

from check import get_abs_path
from nds_transcode import get_maintenance_schemas

INSERT_FUNCS = [
    'LF_CR',
@@ -135,7 +136,7 @@ def run_dm_query(spark, query_list):
    for q in query_list:
        spark.sql(q)

def run_query(query_dict, time_log_output_path):
def run_query(query_dict, time_log_output_path, refresh_data_path, refresh_data_format):
    # TODO: Duplicate code in nds_power.py. Refactor this part, make it general.
    execution_time_list = []
    total_time_start = time.time()
@@ -147,6 +148,7 @@ def run_query(query_dict, time_log_output_path):
    spark_session = SparkSession.builder.appName(
        app_name).getOrCreate()
    spark_app_id = spark_session.sparkContext.applicationId
    register_temp_views(spark_session, refresh_data_path, refresh_data_format)
    DM_start = time.time()
    for query_name, q_content in query_dict.items():
        # show query name in Spark web UI
@@ -176,9 +178,16 @@ def run_query(query_dict, time_log_output_path):
writer.writerow(header)
writer.writerows(execution_time_list)

def register_temp_views(spark_session, refresh_data_path, data_format):
    refresh_tables = get_maintenance_schemas(True).keys()
    for table in refresh_tables:
        spark_session.read.format(data_format).load(
            refresh_data_path + '/' + table).createOrReplaceTempView(table)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('refresh_data_path',
                        help='path to refresh data')
    parser.add_argument('maintenance_queries_folder',
                        help='folder contains all NDS Data Maintenance queries. If ' +
                        '"--maintenance_queries" is not set, all queries under the folder will be ' +
@@ -190,8 +199,11 @@ def run_query(query_dict, time_log_output_path):
                        type=lambda s: s.split(','),
                        help='specify Data Maintenance query names by a comma separated string.' +
                        ' e.g. "LF_CR,LF_CS"')
    parser.add_argument('--data_format',
                        help='data format for refresh data, e.g. parquet, orc, avro',
                        default="parquet")

    args = parser.parse_args()
    query_dict = get_maintenance_queries(args.maintenance_queries_folder,
                                         args.maintenance_queries)
    run_query(query_dict, args.time_log)
    run_query(query_dict, args.time_log, args.refresh_data_path, args.data_format)
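
The new `register_temp_views` step reads one directory per refresh table under `refresh_data_path` and exposes it as a temporary view named after the table. A minimal sketch of that layout follows. It is not the code in this commit: the table names are hypothetical placeholders (the commit takes them from `get_maintenance_schemas(True)`), the `refresh_table_paths` helper is an assumed name, and `posixpath.join` is swapped in for the plain `'/'` concatenation so that a trailing slash on the base path does not produce a doubled separator.

```python
import posixpath


def refresh_table_paths(refresh_data_path, tables):
    """Map each refresh table name to the directory expected to hold its files."""
    # posixpath.join avoids a doubled slash when the base path ends with '/'.
    return {table: posixpath.join(refresh_data_path, table) for table in tables}


def register_temp_views(spark_session, refresh_data_path, data_format, tables):
    """Register one temporary view per refresh table.

    `spark_session` is expected to behave like a pyspark.sql.SparkSession;
    pyspark is deliberately not imported so the sketch stays dependency-free.
    """
    for table, path in refresh_table_paths(refresh_data_path, tables).items():
        df = spark_session.read.format(data_format).load(path)
        df.createOrReplaceTempView(table)


# Hypothetical table names, for illustration only.
example_paths = refresh_table_paths(
    'update_data_sf3k/', ['s_purchase', 's_catalog_order'])
```

With the views registered, the maintenance queries can refer to each refresh table by name through `spark.sql`, regardless of whether the files are parquet, orc, or avro.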
