Kit improvements and logging
dharmeshkakadia committed Sep 15, 2017
1 parent 16cf081 commit de60ab4
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions resources/TPCDSgen.py
@@ -13,45 +13,53 @@
 import os
 import time
 import random
+import logging
+import glob
+import re
 
 HDFS_CMD = "hdfs dfs"
 
+MAX_BACKOFF_UNIT = 60
+MIN_BACKOFF_UNIT = 1
+
+_logger = logging.getLogger(__name__)
+
 def usage():
     print(__file__)
 
 def generate_data_to_hdfs(hdfs_output, partition, scale_factor, num_parts):
     """Generate data using dsdgen and upload it to HDFS."""
 
+    _logger.info("starting : ./dsdgen -dir . -force Y -scale {} -child {} -parallel {}".format(scale_factor, partition, num_parts))
     execute("./dsdgen -dir . -force Y -scale %d -child %d -parallel %d" % (scale_factor, partition, num_parts))
-
-    dim_tables=["call_center","catalog_page","date_dim","household_demographics","income_band","item","promotion","reason","ship_mode","store","time_dim","warehouse","web_page","web_site"]
-    if partition == 1:
-        for d in dim_tables:
-            copy_table_to_hdfs(hdfs_output,d,partition,num_parts)
-
-    tables=["catalog_returns","catalog_sales","inventory","store_returns","store_sales","web_sales","web_returns","customer", "customer_demographics","customer_address"]
-    for t in tables:
-        copy_table_to_hdfs(hdfs_output,t,partition,num_parts)
-
-def copy_table_to_hdfs(hdfs_output, table_name, partition,num_parts):
-    local_file_name = "%s_%s_%s.dat" % (table_name,partition,num_parts)
-    hdfs_file_name = "%s/%s/%s" % (hdfs_output, table_name, local_file_name)
+    _logger.info("completed : ./dsdgen -dir . -force Y -scale {} -child {} -parallel {}".format(scale_factor, partition, num_parts))
+    for t in glob.glob("*.dat"):
+        copy_table_to_hdfs(hdfs_output, re.sub(r"_[0-9]+_[0-9]+.dat","",t), t)
+        _logger.info("deleting the local file {}".format(t))
+        os.remove(t)
+
+def copy_table_to_hdfs(hdfs_output, table_name, data_file):
+    _logger.info("Beginning copy_table_to_hdfs for table_name: {} file {}".format(table_name,data_file))
     execute("%s -mkdir -p %s/%s" % (HDFS_CMD, hdfs_output, table_name))
-    execute("%s -copyFromLocal -f %s %s" % (HDFS_CMD, local_file_name, hdfs_file_name))
-    os.remove(local_file_name)
+    execute("%s -copyFromLocal -f %s %s/%s/" % (HDFS_CMD, data_file, hdfs_output, table_name))
+    _logger.info("copy_table_to_hdfs complete for table_name: {}".format(table_name))
 
-def execute(cmd,retry=10):
-    if(retry<0):
+def execute(cmd,retries_remaining=10):
+    if(retries_remaining<0):
+        _logger.info("All retries for {} exhausted. Failing the attempt".format(cmd))
         sys.exit(1)
 
     try:
         subprocess.check_call(cmd,stdin=subprocess.PIPE,stderr=subprocess.STDOUT,shell=True)
     except:
-        time.sleep(retry*random.randint(retry*60,600))
-        execute(cmd,retry-1)
+        backoff_time = (11-retries_remaining)*random.randint((11-retries_remaining)*MIN_BACKOFF_UNIT,MAX_BACKOFF_UNIT)
+        _logger.info("command {} failed. Retries remaining {}. Sleeping for {} before trying again".format(cmd, retries_remaining, backoff_time))
+        time.sleep(backoff_time)
+        execute(cmd,retries_remaining-1)
 
 
 def main():
+    logging.basicConfig(level=logging.INFO,stream=sys.stderr)
     parser = argparse.ArgumentParser(description='Generate TPCDS data in parallel')
     parser.add_argument('-s','--scale', metavar='SCALE_FACTOR',type=int, required=True,
                         help='scale factor for TPCDS datagen')
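
Note: the new upload path no longer iterates over hard-coded table lists; it globs the generated *.dat files and strips the _<child>_<parallel>.dat suffix with re.sub to recover the table name. A minimal sketch of that mapping, using hypothetical dsdgen output file names:

    import re

    # Hypothetical dsdgen output files, named <table>_<child>_<parallel>.dat
    for name in ["store_sales_3_8.dat", "date_dim_1_8.dat"]:
        table = re.sub(r"_[0-9]+_[0-9]+.dat", "", name)   # same pattern as the commit
        print(name, "->", table)
    # store_sales_3_8.dat -> store_sales
    # date_dim_1_8.dat -> date_dim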
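Note: the reworked execute() replaces the old retry*random.randint(retry*60,600) sleep with a randomized backoff whose window widens as retries are spent: on the k-th failure (k = 11 - retries_remaining) it sleeps k * randint(k*MIN_BACKOFF_UNIT, MAX_BACKOFF_UNIT) seconds. A small sketch of the resulting windows, using the constants from this commit:

    MIN_BACKOFF_UNIT, MAX_BACKOFF_UNIT = 1, 60   # values from this commit

    for retries_remaining in range(10, -1, -1):
        k = 11 - retries_remaining
        low, high = k * k * MIN_BACKOFF_UNIT, k * MAX_BACKOFF_UNIT
        print("retries_remaining=%2d -> sleep in [%3d, %3d] s" % (retries_remaining, low, high))
    # The window grows from 1-60 s after the first failure to 121-660 s before the final attempt.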
