### 库

In [1]:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from tqdm.notebook import tqdm, trange, tnrange
import json, re

In [2]:
import numpy as np
import pandas as pd
import os, sys
from time import time, sleep

In [3]:
from vVariableInspector import _getshapeof
from vUtil.vFile import fprint, readlines, linesReader
from vUtil.vEmail import sendEmail
from vUtil.vTime import convertSeconds, getNow

In [4]:
from vMysql import MysqlProxy

In [5]:
from util import frmt, rmUnseen, groupConcat, deleteIndexRows
from cfg import host, master, port, contType
from cfg import dbHost, dbUser, dbPwd
from cfg import ncols

### mysql、es

In [6]:
# Name of the Elasticsearch index that the paper documents are written to.
index = 'landinn_paper'

In [7]:
# Elasticsearch client used for all index operations; host and port come from cfg.
es=Elasticsearch(hosts=host,port=port)

In [8]:
# MySQL access object used for all queries; connection settings come from cfg.
db = MysqlProxy(ip=dbHost, user=dbUser, password=dbPwd)

### 函数

In [9]:
# SQL template: copy one page of papers into the temporary table `tmp_paper`.
# - Rows whose `is_deleted` is NULL or 0 are kept.
# - `title` / `keywords` fall back to the *_en column when the primary one is NULL.
# - `paper_year` is cast to text.
# - The two %d placeholders are the LIMIT offset and row count; the caller fills
#   them with Python %-formatting.
# NOTE(review): there is no ORDER BY, so paging relies on MySQL returning rows in
# the same order for every batch — confirm this holds for the `papers` table.
sTmpPaper = '''
create temporary table tmp_paper
(
    select  golaxy_paper_id as gid, 
            ifnull(paper_title, paper_title_en) as title, 
            ifnull(keyword, keyword_en) as keywords,
            cast(paper_year as char) as paper_year,
            paper_date
    from papers 
    where !ifnull(is_deleted, 0)
    limit %d, %d
);
'''

# SQL: drop the temporary table created by sTmpPaper.
sRmTmpPaper = '''
drop temporary table tmp_paper;
'''

In [10]:
# SQL: for every paper in `tmp_paper`, fetch its author ids (cast to text) from
# `paper_author_affiliations`, one output row per (paper, author) pair.
# Because of the LEFT JOIN, a paper with no affiliation row is returned once
# with scholar_id NULL.
# NOTE(review): the is_deleted filter is in WHERE rather than in the ON clause,
# so a paper whose affiliation rows are all flagged deleted yields no row at all
# — confirm that dropping such papers is intended.
sSelectPublish = '''
select gid, cast(author_id as char) as scholar_id
from
    tmp_paper as a
left join paper_author_affiliations as b
on a.gid = b.paper_id
where !ifnull(b.is_deleted, 0);
'''

In [11]:
# SQL: the paper fields from `tmp_paper` plus the abstract from
# `papers_abstracts` (falling back to abstract_en when abstract is NULL).
# Because of the LEFT JOIN, a paper without an abstract row is still returned,
# with abstract NULL.
# NOTE(review): the is_deleted filter is in WHERE rather than in the ON clause,
# so a paper whose abstract row is flagged deleted is removed from the result
# entirely instead of being returned without an abstract — confirm this is intended.
sSelectData = '''
select gid, title, keywords, paper_year, paper_date, ifnull(abstract, abstract_en) as abstract
from
    tmp_paper as a
left join papers_abstracts as b on a.gid = b.paper_id
where !ifnull(b.is_deleted, 0);
'''

In [12]:
def getActions(data, publish):
    """Build Elasticsearch bulk ``index`` actions for one batch of papers.

    Parameters
    ----------
    data : pandas.DataFrame
        One row per paper. The columns ``gid``, ``title``, ``abstract``,
        ``keywords``, ``paper_year`` and ``paper_date`` are read.
    publish : pandas.DataFrame
        Columns ``gid`` and ``scholar_id``. Maps a paper id to the value that
        is stored in the document's ``scholars`` field. If a ``gid`` occurs
        more than once, the last row wins.

    Returns
    -------
    list of dict
        One action per row of ``data`` whose ``gid`` also occurs in
        ``publish``, in the row order of ``data``. Rows without a match are
        skipped. Each action targets the module-level ``index`` and uses the
        paper's ``gid`` as the document ``_id``.

    Notes
    -----
    Rows are walked by position, so the result does not depend on the index
    labels of either frame (a filtered or re-indexed frame works as well as
    one with the default ``RangeIndex``).
    """
    # With either side empty nothing can match; returning early also avoids
    # column lookups on a result set that may carry no columns at all.
    if len(data) == 0 or len(publish) == 0:
        return []

    scholarsByPaper = dict(zip(publish['gid'], publish['scholar_id']))

    fields = ('gid', 'title', 'abstract', 'keywords', 'paper_year', 'paper_date')
    actions = []
    for gid, title, abstract, keywords, paperYear, paperDate in zip(*(data[f] for f in fields)):
        if gid not in scholarsByPaper:
            continue

        # Prefer the year of the full publication date; fall back to the
        # stored paper_year when the date is missing. pd.isna covers None,
        # NaN and NaT alike.
        year = paperYear if pd.isna(paperDate) else str(paperDate.year)

        actions.append({
            '_op_type': 'index',  # bulk operation: index / update / create / delete
            '_index': index,      # target index
            '_id': gid,
            '_source': {
                "id": gid,
                "title": rmUnseen(title),
                "abstract": rmUnseen(abstract),
                "keywords": rmUnseen(keywords),
                "scholars": scholarsByPaper[gid],
                "year": year,
            },
        })
    return actions

### 索引

In [13]:
# MySQL table that the cells below count and sync against the index.
table = 'papers'

In [14]:
# NOTE(review): presumably removes from the index the documents whose
# `golaxy_paper_id` row in `table` is flagged deleted — confirm in util.deleteIndexRows.
deleteIndexRows(db, es, table, 'golaxy_paper_id', index=index)

HBox(children=(FloatProgress(value=0.0, layout=Layout(flex='2'), max=15.0), HTML(value='')), layout=Layout(dis…




{'landinn_paper': 0}

In [32]:
# Number of papers per MySQL page, used as the step of the indexing loop.
sizeBulk = 100
# Row offset the indexing loop starts from; 0 means start at the first paper.
now = 0
# Number of rows in `table` whose is_deleted is NULL or 0; upper bound of the indexing loop.
nPaper = db.count(table, where='!ifnull(is_deleted,0)').values.item()

In [33]:
# Manual resume point: overrides the start offset so the indexing loop begins at
# row 5443800 instead of 0.
# NOTE(review): presumably left over from an interrupted run — set back to 0 (or
# skip this cell) when the whole table has to be indexed.
now = 5443800

In [34]:
# sizeBulk = 100

In [35]:
# Show the start offset, batch size and total row count before starting the long run.
now, sizeBulk, nPaper

(5443800, 100, 5444863)

In [36]:
# Page through the papers table in steps of `sizeBulk`, starting at offset `now`,
# and bulk-index each page into Elasticsearch. An e-mail with the elapsed time
# is sent once every page has been processed.
startOfAll = time()
tr = trange(now, nPaper, sizeBulk, ncols=ncols)
for i in tr:
    tr.set_description(f'({getNow()}){i}')
    # Keep the current offset in the notebook-level `now`: after a failure,
    # re-running this cell resumes from the batch that failed.
    now = i
    db.sql(sTmpPaper % (now, sizeBulk))
    try:
        data = db.sql(sSelectData)
        publish = db.sql(sSelectPublish)
    finally:
        # Release the temporary table and the connection even when a query
        # fails; otherwise `tmp_paper` stays in the open session and the next
        # `create temporary table` raises "table already exists".
        try:
            db.sql(sRmTmpPaper)
        finally:
            # Assuming close() ends the MySQL session, this also discards the
            # temporary table if the explicit drop above could not run.
            db.close()

    # NOTE(review): groupConcat presumably merges the rows of one paper into a
    # single ';'-separated scholar_id value — confirm in util.groupConcat.
    actions = getActions(data, groupConcat(publish, 'gid', ';'))
    if actions:
        helpers.bulk(client=es, actions=actions)
sendEmail(f'insert cost time {convertSeconds(time() - startOfAll)}', 'insert landinn es papers complete')

HBox(children=(FloatProgress(value=0.0, layout=Layout(flex='2'), max=11.0), HTML(value='')), layout=Layout(dis…




### 关闭

In [37]:
# Close the Elasticsearch client now that indexing is finished.
es.close()

In [38]:
# Close the MySQL connection now that indexing is finished.
db.close()