
Commit

Merge fb5ed8c into 5211738
romanchyla committed Dec 9, 2019
2 parents 5211738 + fb5ed8c commit ebbe560
Showing 5 changed files with 174 additions and 6 deletions.
4 changes: 2 additions & 2 deletions adsmp/app.py
@@ -556,7 +556,7 @@ def checksum(self, data, ignore_keys=('mtime', 'ctime', 'update_timestamp')):


def update_remote_targets(self, solr=None, metrics=None, links=None,
- commit_solr=False):
+ commit_solr=False, solr_urls=None):
"""Updates remote databases/solr
@param batch: list solr documents (already formatted)
@param metrics: tuple with two lists, the first is a list
@@ -606,7 +606,7 @@ def update_crc(type, data, faileds):
crcs[b][type + '_checksum'] = self.checksum(x)

if len(batch):
- failed_bibcodes = self.reindex(batch, self.conf.get('SOLR_URLS'), commit=commit_solr)
+ failed_bibcodes = self.reindex(batch, solr_urls or self.conf.get('SOLR_URLS'), commit=commit_solr)
failed_bibcodes = set(failed_bibcodes)
update_crc('solr', batch, failed_bibcodes)

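For illustration, a minimal sketch of how a caller might use the new solr_urls override; the app instance, the placeholder document, and the target URL are assumptions for this example, not part of the commit:

# hypothetical call: push already-formatted solr docs to an alternate target
# instead of the configured SOLR_URLS; the metrics/links targets are omitted here
docs = [{'bibcode': 'placeholder-bibcode'}]  # pre-formatted solr document (made up)
app.update_remote_targets(solr=docs,
                          commit_solr=False,
                          solr_urls=['http://localhost:9983/solr/collection2/update'])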
5 changes: 3 additions & 2 deletions adsmp/tasks.py
@@ -87,7 +87,7 @@ def task_update_record(msg):

@app.task(queue='index-records')
def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=True, update_links=True, commit=False,
- ignore_checksums=False):
+ ignore_checksums=False, solr_targets=None):
"""
This task is (normally) called by the cronjob task
(that one, quite obviously, is in turn started by cron)
@@ -203,7 +203,8 @@ def task_index_records(bibcodes, force=False, update_solr=True, update_metrics=T
(bibcode, bib_data_updated, orcid_claims_updated, nonbib_data_updated, fulltext_updated, \
metrics_updated))
if batch or batch_insert or batch_update or links_data:
- app.update_remote_targets(solr=batch, metrics=(batch_insert, batch_update), links=links_data, commit_solr=commit)
+ app.update_remote_targets(solr=batch, metrics=(batch_insert, batch_update), links=links_data,
+ commit_solr=commit, solr_urls=solr_targets)



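For illustration, the new solr_targets argument lets a caller point a synchronous run of the task at a different collection, mirroring the rebuild_collection helper added to run.py below; the bibcodes and URL here are made up:

from adsmp import tasks

batch = ['placeholder-bibcode-1', 'placeholder-bibcode-2']  # hypothetical identifiers
tasks.task_index_records(batch, force=True, update_solr=True,
                         update_metrics=False, update_links=False,
                         ignore_checksums=True,
                         solr_targets=['http://localhost:9983/solr/collection2/update'])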
2 changes: 1 addition & 1 deletion requirements.txt
@@ -4,4 +4,4 @@ DateTime==4.1.1
librabbitmq==1.6.1
python-dateutil==2.6.0
requests==2.13.0
- psycopg2==2.7.1
+ psycopg2==2.8.4
55 changes: 54 additions & 1 deletion run.py
@@ -170,6 +170,51 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
raise e


def rebuild_collection(collection_name):
"""
Grab all records from the database and send them to solr
"""

now = get_date()
if collection_name.startswith('http'):
solr_urls = [collection_name]
else:
solr_urls = []
urls = app.conf['SOLR_URLS']
for u in urls:
parts = u.split('/')
parts[-2] = collection_name
solr_urls.append('/'.join(parts))
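# e.g. 'http://host:port/solr/collection1/update' -> 'http://host:port/solr/collection2/update'
# (only the second-to-last path segment, the core name, is replaced; the example host is made up)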

logger.info('Sending all records to: %s', ';'.join(solr_urls))
sent = 0

batch = []
with app.session_scope() as session:
# note: deleted docs are also present in the db; there is no obvious way to filter them out here
for rec in session.query(Records) \
.options(load_only(Records.bibcode, Records.updated, Records.processed)) \
.yield_per(1000):

sent += 1
if sent % 1000 == 0:
logger.debug('Sending %s records', sent)

batch.append(rec.bibcode)
if len(batch) > 1000:
tasks.task_index_records(batch, force=True, update_solr=True,
update_metrics=False, update_links=False,
ignore_checksums=True, solr_targets=solr_urls)
batch = []

if len(batch) > 0:
tasks.task_index_records(batch, force=True, update_solr=True,
update_metrics=False, update_links=False,
ignore_checksums=True, solr_targets=solr_urls)

logger.info('Done processing %s records', sent)


if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Process user input.')
@@ -263,6 +308,12 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
action='store_true',
default=False,
help='sends bibcodes to augment affiliation pipeline, works with --filename')

parser.add_argument('-x',
'--rebuild-collection',
action='store',
default=None,
help='Send all solr docs for indexing to another collection; this task is deliberately synchronous. You can pass either the collection name or the full url of the solr instance (including http)')

args = parser.parse_args()

@@ -324,7 +375,9 @@ def reindex(since=None, batch_size=None, force_indexing=False, update_solr=True,
# aff values come from bib pipeline

app.request_aff_augment(bibcode)


elif args.rebuild_collection:
rebuild_collection(args.rebuild_collection)
elif args.reindex:
update_solr = 's' in args.reindex.lower()
update_metrics = 'm' in args.reindex.lower()
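For reference, a hedged usage sketch of the new flag (the host, port and exact URL shape are assumptions; only the first form appears verbatim in scripts/reindex.py below):

python run.py --rebuild-collection collection2
python run.py --rebuild-collection http://localhost:9983/solr/collection2/update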
114 changes: 114 additions & 0 deletions scripts/reindex.py
@@ -0,0 +1,114 @@
# purpose of this script is to rebuild a new solr collection
# it will automatically activate it (by swapping cores)

import os
import sys
import pickle
import requests
import time
from subprocess import PIPE, Popen


homedir = os.path.dirname(os.path.dirname(__file__))
lockfile = os.path.abspath(homedir + '/reindex.lock')
solr_url = 'http://localhost:9983/solr'
admin_url = '%s/admin/cores' % solr_url
update_url = '%s/collection2/update' % solr_url

def run():

# it is important that we do not run multiple times
if os.path.exists(lockfile):
print 'Lockfile %s already exists; exiting! (if you want to proceed, delete the file)' % (lockfile)
data = read_lockfile(lockfile)
for k,v in data.items():
print '%s=%s' % (k,v)
exit(1)
else:
data = {}


try:
# verify both cores are there
cores = requests.get(admin_url + '?wt=json').json()
if set(cores['status'].keys()) != set(['collection1', 'collection2']):
raise Exception('we dont have both cores available')

assert cores['status']['collection2']['dataDir'] != cores['status']['collection1']['dataDir']

print 'We are starting the indexing into collection2; once finished, we will automatically activate the new core'


print 'First, we will delete all documents from collection2'
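# stream.body is the URL-encoded delete-all command: <delete><query>*:*</query></delete>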
r = requests.get(update_url + '?commit=true&stream.body=%3Cdelete%3E%3Cquery%3E*%3A*%3C/query%3E%3C/delete%3E&waitSearcher=true', timeout=30*60)
r.raise_for_status()
print 'Done deleting all docs from collection2'

cores = requests.get(admin_url + '?wt=json').json()
if set(cores['status'].keys()) != set(['collection1', 'collection2']):
raise Exception('we dont have both cores available')

now = time.time()
data['start'] = now
write_lockfile(lockfile, data)

command = 'python run.py --rebuild-collection collection2 >> %s/logs/reindex.log' % (homedir)

retcode, stdout, stderr = execute(command, cwd=homedir)

if retcode != 0:
data['error'] = '%s failed with retcode=%s\nstderr:\n%s' % (command, retcode, stderr)
print 'stderr=%s' % (stderr)
raise Exception(data['error'])

print 'Successfully finished indexing in %s secs' % (time.time() - now)

# issue commit and verify the docs are there
r = requests.get(update_url + '?commit=true&waitSearcher=true', timeout=30*60)
r.raise_for_status()

# all went well, verify the numDocs is similar to the previous collection
cores = requests.get(admin_url + '?wt=json').json()
verify_collection2_size(cores['status']['collection2'])

# all is well; swap the cores!
r = requests.get(admin_url + '?action=SWAP&core=collection2&other=collection1&wt=json')
r.raise_for_status()
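# after the SWAP the core names are exchanged: 'collection1' now serves the index we just
# built in collection2, which is what the dataDir assertion below double-checks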

time.sleep(5)

# verify the new core is loaded
new_cores = requests.get(admin_url + '?wt=json').json()

assert cores['status']['collection2']['dataDir'] == new_cores['status']['collection1']['dataDir']


print 'Deleting the lock; congratulations on your new solr collection!'
os.remove(lockfile)

except Exception, e:
print 'Failed; we will keep the process permanently locked: %s' % (e)
data['last-exception'] = str(e)
write_lockfile(lockfile, data)

def execute(command, **kwargs):
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, **kwargs)
out, err = p.communicate()
return (p.returncode, out, err)

def read_lockfile(lockfile):
with open(lockfile, 'rb') as f:
return pickle.load(f)

def write_lockfile(lockfile, data):
with open(lockfile, 'wb') as f:
pickle.dump(data, f)

def verify_collection2_size(data):
if data.get('numDocs', 0) <= 14150713:
raise Exception('Too few documents in the new index: %s' % data.get('numDocs', 0))
if data.get('sizeInBytes', 0) / (1024*1024*1024.0) < 160.0: # index size at least 160GB
raise Exception('The index is suspiciously small: %s' % (data.get('sizeInBytes', 0) / (1024*1024*1024.0)))

if __name__ == '__main__':
run()
