diff --git a/resources/TPCDSgen.py b/resources/TPCDSgen.py index 645da0d..45d723d 100644 --- a/resources/TPCDSgen.py +++ b/resources/TPCDSgen.py @@ -13,45 +13,53 @@ import os import time import random +import logging +import glob +import re HDFS_CMD = "hdfs dfs" +MAX_BACKOFF_UNIT = 60 +MIN_BACKOFF_UNIT = 1 + +_logger = logging.getLogger(__name__) + def usage(): print(__file__) def generate_data_to_hdfs(hdfs_output, partition, scale_factor, num_parts): """Generate data using dsdgen and upload it to HDFS.""" + _logger.info("starting : ./dsdgen -dir . -force Y -scale {} -child {} -parallel {}".format(scale_factor, partition, num_parts)) execute("./dsdgen -dir . -force Y -scale %d -child %d -parallel %d" % (scale_factor, partition, num_parts)) - - dim_tables=["call_center","catalog_page","date_dim","household_demographics","income_band","item","promotion","reason","ship_mode","store","time_dim","warehouse","web_page","web_site"] - if partition == 1: - for d in dim_tables: - copy_table_to_hdfs(hdfs_output,d,partition,num_parts) - - tables=["catalog_returns","catalog_sales","inventory","store_returns","store_sales","web_sales","web_returns","customer", "customer_demographics","customer_address"] - for t in tables: - copy_table_to_hdfs(hdfs_output,t,partition,num_parts) - -def copy_table_to_hdfs(hdfs_output, table_name, partition,num_parts): - local_file_name = "%s_%s_%s.dat" % (table_name,partition,num_parts) - hdfs_file_name = "%s/%s/%s" % (hdfs_output, table_name, local_file_name) + _logger.info("completed : ./dsdgen -dir . -force Y -scale {} -child {} -parallel {}".format(scale_factor, partition, num_parts)) + for t in glob.glob("*.dat"): + copy_table_to_hdfs(hdfs_output, re.sub(r"_[0-9]+_[0-9]+.dat","",t), t) + _logger.info("delting the local file {}".format(t)) + os.remove(t) + +def copy_table_to_hdfs(hdfs_output, table_name, data_file): + _logger.info("Beginning copy_table_to_hdfs for table_name: {} file {} ".format(table_name,data_file)) execute("%s -mkdir -p %s/%s" % (HDFS_CMD, hdfs_output, table_name)) - execute("%s -copyFromLocal -f %s %s" % (HDFS_CMD, local_file_name, hdfs_file_name)) - os.remove(local_file_name) + execute("%s -copyFromLocal -f %s %s/%s/" % (HDFS_CMD, data_file, hdfs_output, table_name)) + _logger.info("copy_table_to_hdfs complete for table_name: {}".format(table_name)) -def execute(cmd,retry=10): - if(retry<0): +def execute(cmd,retries_remaining=10): + if(retries_remaining<0): + _logger.info("All retries for {} exhauseted. Failing the attempt".format(cmd)) sys.exit(1) try: subprocess.check_call(cmd,stdin=subprocess.PIPE,stderr=subprocess.STDOUT,shell=True) except: - time.sleep(retry*random.randint(retry*60,600)) - execute(cmd,retry-1) + backoff_time = (11-retries_remaining)*random.randint((11-retries_remaining)*MIN_BACKOFF_UNIT,MAX_BACKOFF_UNIT) + _logger.info("command {} failed. Retries remaining {}. Sleeping for {} before trying again".format(cmd, retries_remaining, backoff_time)) + time.sleep(backoff_time) + execute(cmd,retries_remaining-1) def main(): + logging.basicConfig(level=logging.INFO,stream=sys.stderr) parser = argparse.ArgumentParser(description='Generate TPCDS data in parallel') parser.add_argument('-s','--scale', metavar='SCALE_FACTOR',type=int, required=True, help='scale factor for TPCDS datagen')