AMBARI-23945. Infra Solr migration - Add filter-cores support during restore
oleewere committed Jun 17, 2018
1 parent cc6a4f7 commit 45cf1094225f5bb2d29b298e522e749a494aea9a
Showing 1 changed file with 53 additions and 8 deletions.
@@ -870,7 +870,38 @@ def upgrade_ranger_solrconfig_xml(options, config, service_filter):
   copy_znode(options, config, "{0}/configs/{1}/solrconfig.xml".format(solr_znode, ranger_config_set_name),
              "{0}/configs/{1}/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
 
-def update_state_json(collection, config, options):
+def generate_core_pairs(original_collection, collection, config, options):
+  core_pairs_data={}
+
+  original_cores={}
+  original_collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("backup_collections.json"))
+  if original_collection in original_collections_data and 'leaderCoreHostMap' in original_collections_data[original_collection]:
+    original_cores = original_collections_data[original_collection]['leaderCoreHostMap']
+
+  sorted_original_cores=[]
+  for key in sorted(original_cores):
+    sorted_original_cores.append((key, original_cores[key]))
+
+  new_cores={}
+  collections_data = get_collections_data(COLLECTIONS_DATA_JSON_LOCATION.format("restore_collections.json"))
+  if collection in collections_data and 'leaderCoreHostMap' in collections_data[collection]:
+    new_cores = collections_data[collection]['leaderCoreHostMap']
+
+  sorted_new_cores=[]
+  for key in sorted(new_cores):
+    sorted_new_cores.append((key, new_cores[key]))
+
+  if len(new_cores) < len(original_cores):
+    raise Exception("New collection core size is: " + str(len(new_cores)) +
+                    ". You will need at least: " + str(len(original_cores)))
+  else:
+    for index, original_core_data in enumerate(sorted_original_cores):
+      core_pairs_data[sorted_new_cores[index][0]]=original_core_data[0]
+  with open(COLLECTIONS_DATA_JSON_LOCATION.format(collection + "/restore_core_pairs.json"), 'w') as outfile:
+    json.dump(core_pairs_data, outfile)
+  return core_pairs_data
+
+def update_state_json(original_collection, collection, config, options):
   solr_znode='/infra-solr'
   if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
     solr_znode=config.get('infra_solr', 'znode')
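
For orientation: generate_core_pairs sorts the leader cores of the backed-up collection and of the freshly restored one by core name, then pairs them positionally, so each restored core knows which original core's data it takes over. A minimal sketch of that pairing, with hypothetical core and host names:

  original_cores = {'vertex_index_shard1_replica1': 'c6401.ambari.apache.org',
                    'vertex_index_shard2_replica1': 'c6402.ambari.apache.org'}
  new_cores = {'old_vertex_index_shard1_replica1': 'c6403.ambari.apache.org',
               'old_vertex_index_shard2_replica1': 'c6404.ambari.apache.org'}
  # Sort both core name sets and pair them positionally (restored -> original),
  # mirroring what generate_core_pairs writes into restore_core_pairs.json.
  core_pairs = dict(zip(sorted(new_cores), sorted(original_cores)))
  # core_pairs == {'old_vertex_index_shard1_replica1': 'vertex_index_shard1_replica1',
  #                'old_vertex_index_shard2_replica1': 'vertex_index_shard2_replica1'}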
@@ -884,17 +915,25 @@ def update_state_json(collection, config, options):
   json_file_list=glob.glob("{0}/*.json".format(coll_data_dir))
   logger.debug("Downloaded json files list: {0}".format(str(json_file_list)))
 
-  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k]
-  state_json_list = [k for k in json_file_list if 'state.json' in k]
+  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k and 'new_state.json' not in k and 'restore_core_pairs.json' not in k]
+  state_json_list = [k for k in json_file_list if '/state.json' in k]
 
   if not cores_data_json_list:
     raise Exception('Cannot find any downloaded restore core metadata for {0}'.format(collection))
   if not state_json_list:
     raise Exception('Cannot find any downloaded restore collection state metadata for {0}'.format(collection))
 
+  core_pairs = generate_core_pairs(original_collection, collection, config, options)
+  cores_to_skip = []
+  logger.debug("Generated core pairs: {0}".format(str(core_pairs)))
+  if options.skip_cores:
+    cores_to_skip = options.skip_cores.split(',')
+  logger.debug("Cores to skip: {0}".format(str(cores_to_skip)))
+
   state_json_file=state_json_list[0]
   state_data = read_json(state_json_file)
   core_json_data=[]
 
   for core_data_json_file in cores_data_json_list:
     core_json_data.append(read_json(core_data_json_file))

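The stricter filters above are needed because restore_core_pairs.json (and a new_state.json) share the per-collection download directory with the core metadata files. A small sketch, with hypothetical paths, of how the two comprehensions classify the directory contents:

  json_file_list = ['/tmp/coll/core_node1.json', '/tmp/coll/state.json',
                    '/tmp/coll/new_state.json', '/tmp/coll/restore_core_pairs.json']
  cores_data_json_list = [k for k in json_file_list if 'state.json' not in k
                          and 'new_state.json' not in k and 'restore_core_pairs.json' not in k]
  # '/state.json' (with the slash) matches state.json but not new_state.json.
  state_json_list = [k for k in json_file_list if '/state.json' in k]
  # cores_data_json_list == ['/tmp/coll/core_node1.json']
  # state_json_list == ['/tmp/coll/state.json']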
@@ -918,7 +957,9 @@ def update_state_json(collection, config, options):
       data_dir = core_data['dataDir'] if 'dataDir' in core_data else None
       ulog_dir = core_data['ulogDir'] if 'ulogDir' in core_data else None
 
-      if replica in core_details:
+      if cores_to_skip and (core in cores_to_skip or (core in core_pairs and core_pairs[core] in cores_to_skip)):
+        print "Skipping core '{0}': it is on the skip-cores list (or its original pair '{1}' is)".format(core, core_pairs[core] if core in core_pairs else core)
+      elif replica in core_details:
         old_core_node=core_details[replica]['core_node']
         new_core_node=core_details[replica]['new_core_node']
@@ -1269,25 +1310,29 @@ def update_state_jsons(options, accessor, parser, config, service_filter):
   collections=list_collections(options, config, COLLECTIONS_DATA_JSON_LOCATION.format("collections.json"))
   collections=filter_collections(options, collections)
   if is_ranger_available(config, service_filter):
+    original_ranger_collection = config.get('ranger_collection', 'ranger_collection_name')
     backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
     if backup_ranger_collection in collections:
-      update_state_json(backup_ranger_collection, config, options)
+      update_state_json(original_ranger_collection, backup_ranger_collection, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_ranger_collection)
   if is_atlas_available(config, service_filter):
+    original_fulltext_index_name = config.get('atlas_collections', 'fulltext_index_name')
     backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
     if backup_fulltext_index_name in collections:
-      update_state_json(backup_fulltext_index_name, config, options)
+      update_state_json(original_fulltext_index_name, backup_fulltext_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_fulltext_index_name)
+    original_edge_index_name = config.get('atlas_collections', 'edge_index_name')
     backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
     if backup_edge_index_name in collections:
-      update_state_json(backup_edge_index_name, config, options)
+      update_state_json(original_edge_index_name, backup_edge_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_edge_index_name)
+    original_vertex_index_name = config.get('atlas_collections', 'vertex_index_name')
     backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
     if backup_vertex_index_name in collections:
-      update_state_json(backup_vertex_index_name, config, options)
+      update_state_json(original_vertex_index_name, backup_vertex_index_name, config, options)
     else:
       print "Collection ('{0}') does not exist or filtered out. Skipping update collection state operation.".format(backup_vertex_index_name)

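For reference, restore_core_pairs.json written by generate_core_pairs is a flat JSON map from each restored core name to its original pair; a hypothetical file for a two-shard collection would look like:

  {"old_vertex_index_shard1_replica1": "vertex_index_shard1_replica1",
   "old_vertex_index_shard2_replica1": "vertex_index_shard2_replica1"}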