Merge branch 'branch-3.0-perf' into trunk
aonishuk committed Feb 16, 2018
2 parents 469da1a + c5894af commit 60add52
Showing 1 changed file with 33 additions and 43 deletions.
ambari-infra-solr-client/src/main/python/solrDataManager.py (33 additions, 43 deletions)
@@ -47,7 +47,7 @@
def parse_arguments():
parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))

-parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
+parser.add_option("-m", "--mode", dest="mode", type="string", help="delete | save")
parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port")
parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
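
For orientation, a usage sketch of how this option set is driven from the command line. Only -m, -s, -c and -f are confirmed by the hunk above; the long names --id-field and --local-path are assumptions inferred from the dest names appearing later in the diff, and the URL, collection and paths are made up:

  # hypothetical save-mode invocation (the script is Python 2)
  python solrDataManager.py -m save -s http://localhost:8886/solr -c hadoop_logs \
      -f logtime --id-field id --local-path /tmp/solr_archive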
@@ -98,14 +98,14 @@ def parse_arguments():
parser.print_help()
sys.exit()

-mode_values = ["archive", "delete", "save"]
+mode_values = ["delete", "save"]
if options.mode not in mode_values:
print "mode must be one of {0}".format(" | ".join(mode_values))
parser.print_help()
sys.exit()

if options.mode == "delete":
-for r in ["name", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
+for r in ["name", "compression", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
if options.__dict__[r] is not None:
print "argument '{0}' may not be specified in delete mode".format(r)
parser.print_help()
@@ -153,7 +153,7 @@ def parse_arguments():
parser.print_help()
sys.exit()

-if options.mode in ["archive", "save"]:
+if options.mode == "save":
count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
(1 if options.__dict__["local_path"] is not None else 0)
if count != 1:
@@ -171,7 +171,7 @@
print(" solr-url: " + options.solr_url)
print(" collection: " + options.collection)
print(" filter-field: " + options.filter_field)
-if options.mode in ["archive", "save"]:
+if options.mode == "save":
print(" id-field: " + options.id_field)
if options.__dict__["end"] is not None:
print(" end: " + options.end)
@@ -182,14 +182,14 @@
print(" additional-filter: " + str(options.additional_filter))
if options.__dict__["name"] is not None:
print(" name: " + str(options.name))
-if options.mode in ["archive", "save"]:
+if options.mode == "save":
print(" read-block-size: " + str(options.read_block_size))
print(" write-block-size: " + str(options.write_block_size))
print(" ignore-unfinished-uploading: " + str(options.ignore_unfinished_uploading))
if (options.__dict__["solr_keytab"] is not None):
print(" solr-keytab: " + options.solr_keytab)
print(" solr-principal: " + options.solr_principal)
-if options.mode in ["archive", "save"]:
+if options.mode == "save":
print(" output: " + ("json" if options.json_file else "line-delimited-json"))
print(" compression: " + options.compression)
if (options.__dict__["hdfs_keytab"] is not None):
@@ -251,7 +251,7 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)

query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")

-def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
+def save(solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
solr_kinit_command = None
@@ -269,11 +269,9 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo
ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)

working_dir = get_working_dir(solr_url, collection)
-if mode == "archive":
-handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
-
-save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
-range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
+handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading)
+save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field, range_end,
+read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)

def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
@@ -343,7 +341,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre

os.remove(command_json_path)

-def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+def save_data(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
logger.info("Starting to save data")
@@ -372,9 +370,9 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
prev_lot_end_id = results[3]

if records > 0:
-upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
-id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
-hdfs_path, key_file_path, bucket, key_prefix, local_path, compression)
+upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+key_file_path, bucket, key_prefix, local_path, compression)
total_records += records
logger.info("A total of %d records are saved", total_records)

@@ -443,8 +441,8 @@ def finish_file(tmp_file, json_file):
if json_file:
tmp_file.write("\n}")

-def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
-id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
+def upload_block(solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
+working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path, compression):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
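
The naming scheme above is easiest to see with concrete values. A minimal sketch mirroring the expression, with made-up inputs (collection hadoop_logs, name daily, a block ending at filter value 2018-02-16T10:00:00.000Z and id row_42):

  # colons are replaced so the result is safe as a file name
  "{0}_-_{1}_-_{2}_-_{3}".format("hadoop_logs", "daily", "2018-02-16T10:00:00.000Z", "row_42").replace(':', '_')
  # -> 'hadoop_logs_-_daily_-_2018-02-16T10_00_00.000Z_-_row_42'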
@@ -453,9 +451,9 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr

upload_file_path = compress_file(working_dir, tmp_file_path, file_name, compression)

-upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
-id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
-key_file_path, bucket, key_prefix, local_path)
+upload_command = create_command_file(True, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
+prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket,
+key_prefix, local_path)
if hdfs_user:
upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
elif key_file_path:
@@ -466,12 +464,11 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
logger.warn("Unknown upload destination")
sys.exit()

-delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
-id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
-if mode == "archive":
-delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
-prev_lot_end_id)
-os.remove("{0}/command.json".format(working_dir))
+delete_command = create_command_file(False, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
+prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
+delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value, prev_lot_end_id)
+
+os.remove("{0}/command.json".format(working_dir))

def compress_file(working_dir, tmp_file_path, file_name, compression):
data_file_name = "{0}.json".format(file_name)
@@ -514,9 +511,8 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):

return upload_file_path

-def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
-prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
-local_path):
+def create_command_file(upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field, prev_lot_end_value,
+prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
commands = {}

if upload:
@@ -555,19 +551,13 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
else:
logger.warn("Unknown upload destination")
sys.exit()

-if mode == "save":
-return upload_command
-
-
delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
.format(solr_url, collection, delete_query)
-if mode == "save":
-return delete_command

delete_command_data = {}
delete_command_data["command"] = delete_command
delete_command_data["collection"] = collection
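
To make the query construction above concrete: with hypothetical inputs filter_field logtime, prev_lot_end_value 2018-02-16T10:00:00.000Z, id_field id and prev_lot_end_id row_42, the string handed to quote() is

  logtime:[*+TO+"2018-02-16T10:00:00.000Z"]+OR+(logtime:"2018-02-16T10:00:00.000Z"+AND+id:[*+TO+"row_42"])

one clause ranging over the filter field up to the last saved value, the other picking out documents at exactly that value whose id sorts up to the last saved id. The percent-encoded result is embedded in a delete-by-query request of the shape <solr_url>/<collection>/update?stream.body=<delete><query>...</query></delete>&commit=true&wt=json, so only records that were already uploaded are removed from Solr.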
@@ -720,12 +710,12 @@ def run_kinit(kinit_command, program):

if options.mode == "delete":
delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal)
-elif options.mode in ["archive", "save"]:
-save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end,
-options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
-options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
-options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
-options.key_file_path, options.bucket, options.key_prefix, options.local_path)
+elif options.mode == "save":
+save(options.solr_url, options.collection, options.filter_field, options.id_field, end, options.read_block_size,
+options.write_block_size, options.ignore_unfinished_uploading, options.additional_filter, options.name,
+options.solr_keytab, options.solr_principal, options.json_file, options.compression,
+options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path, options.key_file_path,
+options.bucket, options.key_prefix, options.local_path)
else:
logger.warn("Unknown mode: %s", options.mode)
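
A matching sketch for the dispatch above, this time for delete mode. Again -m/-s/-c/-f are confirmed by the visible hunks, while --end is an assumption read off the end option's dest name, and all values are made up:

  # hypothetical delete-mode invocation: removes documents whose filter
  # field value sorts up to the given end value
  python solrDataManager.py -m delete -s http://localhost:8886/solr -c hadoop_logs \
      -f logtime --end 2018-02-16T00:00:00.000Z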
