In [None]:
import pathlib
import re
import asyncio
import logging
from collections import deque

import ujson as json
# Import common.ipynb from the same folder (import_ipynb must be imported first so notebooks can be imported as modules)
import import_ipynb
from common import *

# Module logger that writes DEBUG-and-above records to stderr with a timestamped format.
logger = logging.getLogger(__name__)
# Only attach the handler once: re-running this cell in the notebook would
# otherwise add another StreamHandler each time and duplicate every message.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        fmt='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    )
    handler.setFormatter(formatter)
    handler.setLevel(logging.DEBUG)
    logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

In [None]:
def get_create_index_dsl(shrads=1, replicas=1):
    """Build the request body used to create a shop index.

    Parameters
    ----------
    shrads : int, default 1
        Number of primary shards. (The misspelled name is kept so existing
        keyword callers keep working.)
    replicas : int, default 1
        Number of replicas per primary shard. The default matches the value
        that used to be hard-coded.

    Returns
    -------
    dict
        A freshly built settings + mappings body, as passed to
        ``es.indices.create(..., body=...)``.

    NOTE(review): the analyzers reference tokenizers ``name_index`` and
    ``address_index``, which are not defined in this body. They must already be
    available to the cluster (custom or plugin tokenizers); confirm this.
    NOTE(review): ``_all`` and the ``shop`` mapping type are pre-7.x
    Elasticsearch mapping features; confirm the target cluster version.
    """
    request_body = {
        "settings": {
            "index.mapping.ignore_malformed": True,
            "number_of_shards": shrads,
            "number_of_replicas": replicas,
            "analysis": {
                "analyzer": {
                    "name_analyzer": {
                        "tokenizer": "name_index"
                    },
                    "address_analyzer": {
                        "tokenizer": "address_index"
                    }
                }
            }
        },
        "mappings": {
            "shop": {
                "_all": {
                    "enabled": False
                },
                # String "false": unmapped fields are kept in _source but not indexed.
                "dynamic": "false",
                "properties": {
                    "sid": {
                        "type": "keyword"
                    },
                    "name": {
                        "type": "text",
                        "analyzer": "name_analyzer",
                        "search_analyzer": "name_analyzer"
                    },
                    "address": {
                        "type": "text",
                        "analyzer": "address_analyzer",
                        "search_analyzer": "address_analyzer"
                    },
                    "tels": {
                        "type": "keyword"
                    }
                }
            }
        }
    }
    return request_body

In [None]:
def delete_index(es, index_name):
    """Delete ``index_name`` via ``es.indices.delete`` and print the raw API response."""
    print("delete res: ", es.indices.delete(index=index_name))

In [None]:
def create_index(es, index_name, shards):
    """Create ``index_name`` with the shop settings/mapping unless it already exists.

    Parameters
    ----------
    es : Elasticsearch
        Client used for the existence check and the create call.
    index_name : str
        Name of the index to create.
    shards : int
        Primary shard count, forwarded to ``get_create_index_dsl``.

    Returns
    -------
    bool
        Always ``True``, whether the index already existed or was just created.
        Errors from the create call are not caught and propagate to the caller.
    """
    # Pass `index` by keyword: newer elasticsearch-py clients reject positional
    # API arguments, and older clients accept the keyword form as well.
    if es.indices.exists(index=index_name):
        print(f"{index_name} exists")
        return True
    es.indices.create(index=index_name, body=get_create_index_dsl(shards))
    print('done')
    return True

In [None]:
def test_create_index(hosts=None, port=9200, shards=2,
                      index_names=('lifestyle', 'yellowpage', 'bkwd')):
    """Create the notebook's indices on an Elasticsearch cluster.

    This is a setup helper, not a unit test: it connects to a live cluster and
    calls ``create_index`` for every name in ``index_names``.
    NOTE(review): the ``test_`` prefix means pytest would collect this function
    if the code is ever run under pytest; consider renaming it.

    Parameters
    ----------
    hosts : list[str], optional
        Hosts handed to ``Elasticsearch``. Defaults to ``['localhost']``.
    port : int, default 9200
        Port handed to ``Elasticsearch``.
    shards : int, default 2
        Primary shard count for each created index.
    index_names : iterable of str
        Indices to create. Indices that already exist are left untouched.
    """
    if hosts is None:
        hosts = ['localhost']
    es = Elasticsearch(hosts, port=port)
    for index_name in index_names:
        create_index(es, index_name, shards)

In [None]:
def _load_json_docs(path):
    """Read every document in the JSON file at ``path`` into a list.

    Accepts either a single top-level JSON array of documents or
    newline-delimited JSON (one document per non-blank line). The file is
    parsed eagerly so a missing file or bad JSON fails before any bulk request
    is sent.
    NOTE(review): the real layout of the input file is not visible in this
    notebook; confirm it is one of these two forms.
    """
    text = pathlib.Path(path).read_text(encoding="utf-8")
    stripped = text.lstrip()
    if stripped.startswith("["):
        return list(json.loads(stripped))
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def ingrest_bulk_json(json_file, index_name, client=None, chunk_size=1000,
                      thread_count=4, queue_size=16):
    """Bulk-index the documents of a JSON file into ``index_name``.

    Parameters
    ----------
    json_file : str | pathlib.Path | iterable
        Path to the JSON file to ingest. Any other iterable is passed to
        ``parallel_bulk`` unchanged as the stream of actions.
    index_name : str
        Default target index for every action.
    client : Elasticsearch, optional
        Client to use. When omitted, the notebook-level global ``es`` is used,
        which is what this function previously relied on implicitly.
    chunk_size, thread_count, queue_size : int
        Forwarded to ``parallel_bulk``. The defaults are the previously
        hard-coded values.

    Returns
    -------
    list | None
        The ``(ok, item)`` tuples yielded by ``parallel_bulk``, or ``None`` if
        any step failed. Errors are printed, not raised, so the notebook keeps
        running (same best-effort behaviour as before).
    """
    results = None
    try:
        if client is None:
            client = globals().get("es")
            if client is None:
                raise NameError("no Elasticsearch client: pass client=... or define a global `es`")
        # A filename must be parsed into documents; iterating the raw string
        # would hand parallel_bulk one "action" per character.
        if isinstance(json_file, (str, pathlib.Path)):
            actions = _load_json_docs(json_file)
        else:
            actions = json_file
        results = list(parallel_bulk(
            client=client,
            actions=actions,
            index=index_name,  # the parameter, not the literal "index_name"
            chunk_size=chunk_size,
            thread_count=thread_count,
            queue_size=queue_size,
        ))
        failed = sum(1 for ok, _item in results if not ok)
        # Print a summary rather than every per-document response.
        print("\nRESPONSE:", f"{len(results) - failed} succeeded, {failed} failed")
    except Exception as e:
        print("\nERROR:", e)
        results = None
    print('done!')
    return results

In [None]:
# Bulk-ingest the documents of a local JSON file into one index.
# (Per the original note, the JSON file is generated from `filename` elsewhere.)
json_file = 'sample.json'
# TODO(review): the previous call omitted the required `index_name` argument
# and raised TypeError on every run. 'lifestyle' is a placeholder; confirm
# which index sample.json belongs to.
index_name = 'lifestyle'

# Bind the return value so the cell does not echo it as output.
ingest_results = ingrest_bulk_json(json_file, index_name)