-
Notifications
You must be signed in to change notification settings - Fork 16
/
copy_data_from_live_to_new.py
46 lines (40 loc) · 1.97 KB
/
copy_data_from_live_to_new.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# this file exports data using a scan scroll from one index and bulk loads it to another
import json, requests

save_to_file = True     # when True, each flushed batch is also written to a local JSON file
index_name = 'doaj'
types = ['account','article','suggestion','upload','cache','lcc','editor_group','news','lock','provenance','background_job']
old_index = 'http://10.131.168.182:9200'  # source Elasticsearch host
new_index = 'http://localhost:9200'       # destination Elasticsearch host
scroll_minutes = '5m'   # how long ES keeps the scroll context alive between page fetches
size = 5000             # documents requested per scroll page (per shard, for scan)
bulk_size = 50000       # this is rough, as the scan may not get exact sizes
processed = {}          # per-type running count of documents pulled from the source


def _flush(tp, records):
    """Bulk-index *records* of type *tp* into the destination index.

    Optionally dumps the batch to a local JSON file first, then builds an
    Elasticsearch bulk body (action line + source line per record) and POSTs it.
    """
    if save_to_file == True:
        # BUG FIX: the original concatenated the `processed` dict itself into the
        # filename, raising TypeError; use the per-type count instead. Also use a
        # context manager so the file handle is closed even on a write error.
        with open('records_' + tp + '_to_' + str(processed[tp]) + '.json', 'w') as out:
            out.write(json.dumps(records, indent=2))
    bn = ''
    for record in records:
        # one action line and one source line per document, per the bulk API.
        # NOTE(review): assumes every _source carries its own 'id' field — true
        # for DOAJ documents; verify before reusing elsewhere.
        bn += json.dumps({'index':{'_index':index_name, '_type': tp, '_id': record['id']}}) + '\n'
        bn += json.dumps(record) + '\n'
    s = requests.post(new_index + '/_bulk', data=bn)
    print(tp)
    print(processed[tp])
    print(s.status_code)


for tp in types:
    processed[tp] = 0
    # open a scan scroll over every document of this type in the source index
    start = requests.post(old_index + '/' + index_name + '/' + tp + '/_search?search_type=scan&scroll=' + scroll_minutes, json={"query": { "match_all": {} }, "size": size })
    res = start.json()
    records = []
    first = True  # a scan scroll will start empty, unlike a normal scroll - so need to know when looking at the first result
    # BUG FIX: Elasticsearch returns the scroll id under '_scroll_id'; the original
    # tested 'scroll_id', which never matched, so the loop body never executed.
    while res.get('_scroll_id') is not None and (first or len(res.get('hits',{}).get('hits',[])) > 0):
        first = False
        processed[tp] += len(res.get('hits',{}).get('hits',[]))
        for r in res.get('hits',{}).get('hits',[]):
            records.append(r['_source'])
        if len(records) > bulk_size:
            _flush(tp, records)
            records = []
        nxt = requests.get(old_index + '/_search/scroll?scroll=' + scroll_minutes + '&scroll_id=' + res['_scroll_id'])
        res = nxt.json()
    # BUG FIX: flush the final partial batch. The original only flushed when the
    # buffer exceeded bulk_size, so the tail batch was never indexed (and, with
    # the scroll-id key fixed, the `or len(records) != 0` clause looped forever).
    if records:
        _flush(tp, records)
        records = []

print(json.dumps(processed, indent=2))