AMBARI-23421. Add solr-to-solr archive operation for solrDataManager.py. (#849)
oleewere committed Apr 2, 2018
1 parent 8ed15a1 commit 86ecedff2d326b9cb739d06523ba0a9cbe021c2d
Showing 1 changed file with 78 additions and 37 deletions.
@@ -18,22 +18,21 @@
limitations under the License.
'''

import gzip
import hashlib
import json
import logging
import optparse
import os
import time
import shutil
import signal
import sys

import tarfile
import time
from datetime import datetime, timedelta
from subprocess import call, Popen, PIPE
from urllib import quote, unquote
from zipfile import ZipFile, ZIP_DEFLATED
import tarfile
import gzip
import shutil

VERSION = "1.0"

@@ -90,6 +89,9 @@ def parse_arguments():

parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)

parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)

(options, args) = parser.parse_args()

for r in ["mode", "solr_url", "collection", "filter_field"]:
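
As a quick reference, here is a minimal, self-contained sketch of the two options this change introduces, parsed against hypothetical values (the sample collection name and field list are placeholders, not taken from the commit):

import optparse

parser = optparse.OptionParser()
parser.add_option("--solr-output-collection", dest="solr_output_collection",
                  help="target output solr collection for archive", type="string", default=None)
parser.add_option("--exclude-fields", dest="exclude_fields",
                  help="Comma separated list of excluded fields from json response", type="string", default=None)

# Hypothetical argument list, only for illustration:
(options, args) = parser.parse_args([
    "--solr-output-collection", "hadoop_logs_archive",
    "--exclude-fields", "_version_,_router_field_"
])
print options.solr_output_collection   # hadoop_logs_archive
print options.exclude_fields           # _version_,_router_field_
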
@@ -130,6 +132,8 @@ def parse_arguments():
parser.print_help()
sys.exit()

is_any_solr_output_property = options.__dict__["solr_output_collection"] is not None

is_any_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None or options.__dict__["hdfs_principal"] is not None
is_all_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_principal"] is not None
if is_any_hdfs_kerberos_property and not is_all_hdfs_kerberos_property:
@@ -154,10 +158,10 @@ def parse_arguments():
sys.exit()

if options.mode in ["archive", "save"]:
count = (1 if is_any_hdfs_property else 0) + (1 if is_any_s3_property else 0) + \
(1 if options.__dict__["local_path"] is not None else 0)
count = (1 if is_any_solr_output_property else 0) + (1 if is_any_hdfs_property else 0) + \
(1 if is_any_s3_property else 0) + (1 if options.__dict__["local_path"] is not None else 0)
if count != 1:
print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the 'local_path' argument must be specified"
print "exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the solr arguments ('solr_output_collection') or the 'local_path' argument must be specified"
parser.print_help()
sys.exit()

@@ -173,6 +177,8 @@ def parse_arguments():
print(" filter-field: " + options.filter_field)
if options.mode in ["archive", "save"]:
print(" id-field: " + options.id_field)
if options.__dict__["exclude_fields"] is not None:
print(" exclude fields: " + options.exclude_fields)
if options.__dict__["end"] is not None:
print(" end: " + options.end)
else:
@@ -192,6 +198,8 @@ def parse_arguments():
if options.mode in ["archive", "save"]:
print(" output: " + ("json" if options.json_file else "line-delimited-json"))
print(" compression: " + options.compression)
if options.__dict__["solr_output_collection"] is not None:
print(" solr output collection: " + options.solr_output_collection)
if (options.__dict__["hdfs_keytab"] is not None):
print(" hdfs-keytab: " + options.hdfs_keytab)
print(" hdfs-principal: " + options.hdfs_principal)
@@ -244,16 +252,16 @@ def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal)
else:
curl_prefix = "curl -k"

delete_range = "{0}:[*+TO+\"{1}\"]".format(filter_field, end)
delete_query = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, end), safe="/+\"*")
delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
.format(solr_url, collection, delete_query)
delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
delete_data = "<delete><query>{0}</query></delete>".format(delete_query)

query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
query_solr(solr_kinit_command, delete_command, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_command), "Deleting", delete_data)

def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
solr_output_collection, exclude_fields):
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
@@ -274,7 +282,7 @@ def save(mode, solr_url, collection, filter_field, id_field, range_end, read_blo

save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path)
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields)

def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
if hdfs_kinit_command:
@@ -324,7 +332,9 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre
logger.info("Previous run has left unfinished uploading")
logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")

if command["upload"]["type"] == "hdfs":
if command["upload"]["type"] == "solr":
upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["solr_output_collection"])
elif command["upload"]["type"] == "hdfs":
upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
elif command["upload"]["type"] == "s3":
@@ -345,7 +355,7 @@ def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_pre

def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path):
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, exclude_fields):
logger.info("Starting to save data")

tmp_file_path = "{0}/tmp.json".format(working_dir)
@@ -361,11 +371,14 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)

exclude_field_list = exclude_fields.split(',') if exclude_fields else None

done = False
total_records = 0
while not done:
results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file)
id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
exclude_field_list)
done = results[0]
records = results[1]
prev_lot_end_value = results[2]
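
For orientation, a simplified, self-contained sketch of the select URL assembled above; the host, collection, filter query, sort fields and block size are placeholders, and the real script URL-encodes q and sort and extends q with paging clauses:

solr_url = "http://localhost:8886/solr"              # placeholder host
collection = "hadoop_logs"                           # placeholder collection
q = "logtime:[*+TO+\"2018-04-02T00:00:00.000Z\"]"    # simplified, pre-encoded filter query
sort = "logtime+asc,id+asc"                          # stable ordering used for block paging
read_block_size = 1000

solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(
    solr_url, collection, q, sort, read_block_size)
print solr_query_url_prefix
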
@@ -374,12 +387,12 @@ def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_ur
if records > 0:
upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
hdfs_path, key_file_path, bucket, key_prefix, local_path, compression)
hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection)
total_records += records
logger.info("A total of %d records are saved", total_records)

def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
write_block_size, prev_lot_end_value, prev_lot_end_id, json_file):
write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list):
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
tmp_file = open(tmp_file_path, 'w')
@@ -408,7 +421,7 @@ def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_

for doc in rsp['response']['docs']:
last_doc = doc
add_line(tmp_file, doc, json_file, records)
add_line(tmp_file, doc, json_file, records, exclude_field_list)
records += 1
if records == write_block_size:
break
@@ -430,12 +443,16 @@ def init_file(tmp_file, json_file):
if json_file:
tmp_file.write("{\n")

def add_line(tmp_file, doc, json_file, records):
def add_line(tmp_file, doc, json_file, records, exclude_fields):
if records > 0:
if json_file:
tmp_file.write(",\n")
else:
tmp_file.write("\n")
if exclude_fields:
for exclude_field in exclude_fields:
if doc and exclude_field in doc:
del doc[exclude_field]

tmp_file.write(json.dumps(doc))
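
A minimal sketch of the new exclude-field handling in add_line(): every field named in --exclude-fields is dropped from the document before it is written to the block file. The sample document and field names below are hypothetical:

import json

doc = {"id": "doc_1", "logtime": "2018-04-02T10:00:00.000Z", "_version_": 1597000000000000000}
exclude_fields = "_version_,_router_field_".split(',')   # value of --exclude-fields, split on commas

for exclude_field in exclude_fields:
    if doc and exclude_field in doc:
        del doc[exclude_field]

print json.dumps(doc)   # _version_ is removed; _router_field_ was not present anyway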

@@ -445,7 +462,7 @@ def finish_file(tmp_file, json_file):

def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path, compression):
key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection):
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
else:
@@ -455,8 +472,10 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr

upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path)
if hdfs_user:
key_file_path, bucket, key_prefix, local_path, solr_output_collection)
if solr_output_collection:
upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
elif hdfs_user:
upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
elif key_file_path:
upload_file_s3(upload_command, upload_file_path, bucket, key_prefix)
@@ -467,7 +486,7 @@ def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr
sys.exit()

delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None)
id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None)
if mode == "archive":
delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
@@ -516,7 +535,7 @@ def compress_file(working_dir, tmp_file_path, file_name, compression):

def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
local_path):
local_path, solr_output_collection):
commands = {}

if upload:
@@ -525,7 +544,16 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c
logger.debug("Creating command file with delete instructions in case of an interruption")

if upload:
if hdfs_path:
if solr_output_collection:
upload_command = "{0}/{1}/update/json/docs --data-binary @{2}"\
.format(solr_url, solr_output_collection, upload_file_path)
upload_command_data = {}
upload_command_data["type"] = "solr"
upload_command_data["command"] = upload_command
upload_command_data["upload_file_path"] = upload_file_path
upload_command_data["solr_output_collection"] = solr_output_collection
commands["upload"] = upload_command_data
elif hdfs_path:
upload_command = "sudo -u {0} hadoop fs -put {1} {2}".format(hdfs_user, upload_file_path, hdfs_path)
upload_command_data = {}
upload_command_data["type"] = "hdfs"
@@ -562,8 +590,9 @@ def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, c

delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
delete_query = quote("{0}+OR+{1}".format(delete_prev, delete_last), safe="/+\"*")
delete_command = "{0}/{1}/update?stream.body=<delete><query>{2}</query></delete>&commit=true&wt=json" \
delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)

delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
.format(solr_url, collection, delete_query)
if mode == "save":
return delete_command
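
A placeholder walk-through of the reworked delete command: the query now stays in the command string after a " --data-binary" separator instead of being URL-quoted into stream.body, and delete_data() later splits it off, turns the '+' separators back into spaces and POSTs the XML body. All values below are hypothetical:

filter_field, id_field = "logtime", "id"
prev_lot_end_value, prev_lot_end_id = "2018-04-02T10:00:00.000Z", "doc_42"
solr_url, collection = "http://localhost:8886/solr", "hadoop_logs"

delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
    .format(solr_url, collection, delete_query)

delete_cmd = delete_command.split(" --data-binary")[0]                           # the update URL curl POSTs to
delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")  # the XML request body
print delete_cmd
print delete_query_data
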
@@ -657,29 +686,40 @@ def upload_file_local(upload_command, upload_file_path, local_path):
logger.warn(str(e))
sys.exit()

def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, collection):
if os.path.isfile(upload_file_path):
query_solr(solr_kinit_command, upload_command, "{0} -H Content-type:application/json {1}".format(curl_prefix, upload_command), "Saving")
logger.info("Save data to collection: %s", collection)

def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id):
query_solr(solr_kinit_command, delete_command, "{0} {1}".format(curl_prefix, delete_command), "Deleting")
delete_cmd = delete_command.split(" --data-binary")[0]
delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)

def query_solr(solr_kinit_command, url, curl_command, action):
def query_solr(solr_kinit_command, url, curl_command, action, data=None):
if solr_kinit_command:
run_kinit(solr_kinit_command, "Solr")

try:
logger.debug("%s data from solr:\n%s", action, curl_command)
process = Popen(curl_command.split(), stdin=PIPE, stdout=PIPE, stderr=PIPE)
cmd = curl_command.split()
if data:
cmd.append("--data-binary")
cmd.append(data)
logger.debug("%s data from solr:\n%s", action, ' '.join(cmd))
process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except Exception as e:
print
logger.warn("Could not execute curl command:\n%s", curl_command)
logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(e))
sys.exit()

out, err = process.communicate()
if process.returncode != 0:
print
logger.warn("Could not execute curl command:\n%s", curl_command)
logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(err))
sys.exit()
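
Putting the upload pieces together, the solr-to-solr transfer ends up as a plain curl POST of the saved block file to the target collection's update/json/docs handler. A rough, self-contained sketch (host, collection and file path are placeholders; with a Solr keytab the script builds a Kerberos-aware curl prefix instead):

from subprocess import Popen, PIPE

curl_prefix = "curl -k"
upload_command = "http://localhost:8886/solr/hadoop_logs_archive/update/json/docs " \
                 "--data-binary @/tmp/solr_data_manager/hadoop_logs_block.json"
curl_command = "{0} -H Content-type:application/json {1}".format(curl_prefix, upload_command)

cmd = curl_command.split()          # query_solr() executes the argument vector directly
process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
out, err = process.communicate()
print process.returncode
print out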

@@ -725,7 +765,8 @@ def run_kinit(kinit_command, program):
options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
options.key_file_path, options.bucket, options.key_prefix, options.local_path)
options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
options.exclude_fields)
else:
logger.warn("Unknown mode: %s", options.mode)
