In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports every module
# before a cell is executed, so edits to imported .py files are picked up.
%load_ext autoreload
%autoreload 2

In [2]:
import json
import os
import ssl
from getpass import getpass

import pandas as pd
import pytz
from dateutil.parser import parse as parse_date
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from tqdm import tqdm


class ElasticSearchUtils(object):
    """Elasticsearch helper for dumping documents through the scroll API.

    Attributes are plain instance fields:
      host      -- host passed to the Elasticsearch client.
      index     -- index used by the most recent export() call (None at first).
      http_auth -- [user, password] list built from 'user:password', or None.
      timezone  -- pytz timezone object for 'Asia/Seoul'.
      elastic   -- the Elasticsearch client created by open(), or None.
    """

    def __init__(self, host, http_auth=None):
        """Store the connection settings and connect immediately.

        :param host: host (URL) handed to the Elasticsearch client.
        :param http_auth: optional 'user:password' string.
        """
        self.host = host
        self.index = None

        self.http_auth = None
        if http_auth is not None:
            # Split on the first colon only, so that a password which itself
            # contains ':' is kept in one piece.
            self.http_auth = http_auth.split(':', 1)

        self.timezone = pytz.timezone('Asia/Seoul')

        self.elastic = None
        self.open()

    def open(self, host=None, skip_ssl_verify=False):
        """Create the Elasticsearch client.

        :param host: when given, replaces the stored host before connecting.
        :param skip_ssl_verify: when True, connect with the SSL context from
            get_ssl_verify_mode(), i.e. WITHOUT certificate or host name
            verification. Defaults to False (no custom SSL context).
        :return: None. If the client cannot be created the error is printed
            and ``self.elastic`` is left as it was (best effort).
        """
        if host is not None:
            self.host = host

        ssl_context = None
        if skip_ssl_verify:
            ssl_context = self.get_ssl_verify_mode()

        # connect to the host
        try:
            self.elastic = Elasticsearch(
                hosts=self.host,
                timeout=30,
                http_auth=self.http_auth,
                ssl_context=ssl_context,
            )
        except Exception as e:
            # Deliberately best effort: report the problem and carry on.
            print(e)

        return

    @staticmethod
    def get_ssl_verify_mode():
        """Return an SSL context with host name and certificate checks disabled.

        WARNING: such a context accepts any server certificate; only use it
        against hosts you trust.
        """
        # https://github.com/elastic/elasticsearch-py/issues/712
        ssl_context = create_ssl_context()

        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        return ssl_context

    def scroll(self, index, scroll_id, query, size=1000):
        """Fetch one page of results through the scroll API.

        :param index: index to search; falls back to ``self.index`` when None.
        :param scroll_id: id returned by the previous page, or an empty value
            ('' or None) to start a new scrolling search.
        :param query: request body for the initial search.
        :param size: page size for the initial search.
        :return: tuple ``(hits, scroll_id, count, total)`` where ``hits`` is
            the raw hit list, ``count`` its length and ``total`` the total
            number of matches reported by the server.
        """
        if index is None:
            index = self.index

        params = {
            'request_timeout': 2 * 60
        }

        if not scroll_id:
            # No scroll id yet: open the scroll with an ordinary search.
            search_result = self.elastic.search(
                index=index,
                body=query,
                scroll='2m',
                size=size,
                params=params,
            )
        else:
            # Continue an already opened scroll.
            search_result = self.elastic.scroll(
                scroll_id=scroll_id,
                scroll='2m',
                params=params,
            )

        # extract the search result
        scroll_id = search_result['_scroll_id']

        hits = search_result['hits']

        # 'total' is either a plain number or a dict carrying it under 'value'.
        total = hits['total']
        if isinstance(total, dict) and 'value' in total:
            total = total['value']

        documents = hits['hits']

        return documents, scroll_id, len(documents), total

    def export(self, index, query=None, size=1000, result=None, limit=0):
        """Dump the documents of an index from the server.

        :param index: index to dump (also stored in ``self.index``).
        :param query: request body as a dict or a JSON string; {} when None.
        :param size: number of documents requested per page.
        :param result: list that receives each document's ``_source``. When
            None, every document is printed to stdout as one JSON line.
        :param limit: stop once at least this many documents were fetched by
            this call (0 = no limit). The check runs after each page, so up to
            one page more than ``limit`` may be delivered.
        :return: None.
        """
        if query is None:
            query = {}

        if isinstance(query, str):
            query = json.loads(query)

        self.index = index

        count = 1
        sum_count = 0
        scroll_id = ''

        p_bar = None

        try:
            while count > 0:
                hits, scroll_id, count, total = self.scroll(
                    index=index,
                    size=size,
                    query=query,
                    scroll_id=scroll_id,
                )

                if p_bar is None:
                    p_bar = tqdm(total=total, desc='dump: ' + index, dynamic_ncols=True)

                p_bar.update(count)

                sum_count += count

                for item in hits:
                    if result is None:
                        document = json.dumps(item['_source'], ensure_ascii=False, sort_keys=True)
                        print(document, flush=True)
                    else:
                        result.append(item['_source'])

                # termination: a short page means the scroll is exhausted
                if count < size:
                    break

                # Count fetched documents instead of len(result): result may
                # be None (stdout mode), which used to raise a TypeError here.
                if 0 < limit <= sum_count:
                    break
        finally:
            # Always release the progress bar, even if a request failed.
            if p_bar is not None:
                p_bar.close()

            # Free the server-side scroll context instead of letting it linger
            # until its keep-alive ('2m') runs out. Best effort: a failure
            # here must not mask the outcome of the export itself.
            if scroll_id:
                try:
                    self.elastic.clear_scroll(scroll_id=scroll_id)
                except Exception:
                    pass

        return

    @staticmethod
    def simplify_nlu_wrapper(doc_list):
        """Flatten the ``nlu_wrapper`` field of each document into rows.

        For every key of ``doc['nlu_wrapper']`` one row is produced holding
        ``document_id``, ``date``, ``column`` (that key) and, for every field
        found in the key's items, the concatenation of the field's values
        (list values are joined with a newline first). Documents without
        ``nlu_wrapper`` are skipped.

        :param doc_list: iterable of document dicts.
        :return: list of row dicts.
        """
        result = []
        for doc in tqdm(doc_list):
            if 'nlu_wrapper' not in doc:
                continue

            for column, items in doc['nlu_wrapper'].items():
                buf = {}
                for item in items:
                    for field, value in item.items():
                        if isinstance(value, list):
                            value = '\n'.join(value)

                        buf[field] = buf.get(field, '') + value

                row = {
                    'document_id': doc['document_id'],
                    'date': doc['date'],
                    'column': column,
                }
                row.update(buf)

                result.append(row)

        return result

    def dump_docs(self, index, date_range):
        """Dump the documents of a date range into a DataFrame.

        :param index: index to dump.
        :param date_range: 'START~END'; both ends are parsed with dateutil and
            sent to the server as inclusive 'yyyy-MM-dd' bounds on ``date``.
        :return: pandas DataFrame built from simplify_nlu_wrapper(), with
            missing values replaced by ''.
        :raises ValueError: if ``date_range`` does not contain '~'.
        """
        token = date_range.split('~', maxsplit=1)
        if len(token) != 2:
            raise ValueError(
                "date_range must have the form 'START~END', got: " + repr(date_range)
            )

        dt_start = parse_date(token[0])
        dt_end = parse_date(token[1])

        query = {
            '_source': [
                'document_id',
                'date',
                'nlu_wrapper.*.ne_str',
                'nlu_wrapper.*.morp_str',
            ],
            'query': {
                'bool': {
                    'filter': {
                        'range': {
                            'date': {
                                'gte': dt_start.strftime('%Y-%m-%d'),
                                'lte': dt_end.strftime('%Y-%m-%d'),
                                'format': 'yyyy-MM-dd'
                            }
                        }
                    }
                }
            }

        }

        doc_list = []
        self.export(index=index, query=query, result=doc_list)

        nlu_wrapper = self.simplify_nlu_wrapper(doc_list)

        df = pd.DataFrame(nlu_wrapper)

        return df.fillna('')

In [3]:
# Indices to export, one per year. Only 2019 is enabled here; the earlier
# yearly indices (2010-2018) use the same 'corpus_process-naver-economy-<year>'
# naming and can be appended to this list when they are needed.
index_list = ['corpus_process-naver-economy-2019']

In [4]:
# Never keep credentials in the notebook: take them from the environment
# (ES_HOST / ES_HTTP_AUTH) and fall back to an interactive, non-echoing prompt.
host_info = {
    'host': os.environ.get('ES_HOST', 'https://corpus.ncsoft.com:9200'),
    'http_auth': os.environ.get('ES_HTTP_AUTH') or getpass('Elasticsearch http_auth (user:password): '),
}

utils = ElasticSearchUtils(**host_info)

In [5]:
# Dump every configured index and keep each result; assigning to `df` inside
# the loop would silently discard all but the last index.
frames = []
for index in tqdm(list(reversed(index_list))):
    frames.append(utils.dump_docs(index=index, date_range='2019-12-01~2019-12-02'))

# One combined frame with a fresh 0..n-1 row index (empty if nothing was dumped).
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

df

  0%|          | 0/1 [00:00<?, ?it/s]
dump: corpus_process-naver-economy-2019: 100%|██████████| 813/813 [00:00<00:00, 4163576.50it/s]

100%|██████████| 813/813 [00:00<00:00, 22629.18it/s]
100%|██████████| 1/1 [00:00<00:00,  1.82it/s]


Unnamed: 0,document_id,date,column,morp_str,ne_str
0,016-0001608648,2019-12-01T09:02:00+09:00,title,"“/SW+만도/NNP+,/SP 중국/NNP 공장/NNG 2020/SN+년/NNB 실...","“<만도:ORG_BUSINESS>, 중국 공장 2020년 실적 회복 견인”"
1,016-0001608648,2019-12-01T09:02:00+09:00,content,"삼성증권/NNP+,/SP 목표/NNG+주가/NNG 4/SN+만/NR+1000/SN+...","<삼성증권:ORG_BUSINESS>, 목표주가 4만1000원 제시[헤럴드경제=김성미..."
2,018-0004528466,2019-12-01T09:15:00+09:00,title,이번/NNG+주/NNG+(/SS+12/SN+월/NNB 2/SN+~/SP+6/SN+일...,이번주(12월 2~6일) 재테크 캘린더
3,018-0004528466,2019-12-01T09:15:00+09:00,content,[/SS+이데일리/NNP 최정희/NNP 기자/NNG+]/SS 다음/NNG+은/JX ...,[이데일리 최정희 기자] 다음은 이번주(12월 2~6일) 재테크 캘린더다.◇12월 ...
4,018-0004528503,2019-12-01T10:28:00+09:00,title,[/SS+마켓/NNG+포인트/NNG+]/SS+코스피/NNG 주간/NNG 외국인/NN...,[마켓포인트]코스피 주간 외국인 순매수 1위 `<호텔신라:ORG_BUSINESS>`
...,...,...,...,...,...
1794,277-0004582018,2019-12-02T09:03:00+09:00,content,[/SS+아시아경제/NNP 유현석/NNP 기자/NNG+]/SS 유안타증권/NNP+은...,[<아시아경제:ORG_BUSINESS> 유현석 기자] <유안타증권:ORG_BUSIN...
1795,277-0004582012,2019-12-02T09:02:00+09:00,title,"12/SN+월/NNB 2/SN+일/NNB 코스피/NNG+,/SP 9/SN+./SP+...","12월 2일 코스피, 9.59p 오른 2097.55 출발(0.46%↑)"
1796,277-0004582012,2019-12-02T09:02:00+09:00,content,2019/SN+년/NNB 12/SN+월/NNB 02/SN+일/NNB 코스피/NNG ...,2019년 12월 02일 코스피 지수는 전 거래일 대비 9.59p (0.46%) 상...
1797,277-0004582014,2019-12-02T09:02:00+09:00,title,"12/SN+월/NNB 2/SN+일/NNB 코스닥/NNG+,/SP 2/SN+./SP+...","12월 2일 코스닥, 2.59p 오른 635.58 출발(0.41%↑)"
