In [None]:
mapping = {
    "mappings": {
        "properties": {
            "name": {"type": "text"},
            "age": {"type": "long"},
            "email": {"type": "text"}
        }
    }
}

es.indices.create(index="students", body=mapping)

In [None]:
print(es.indices.get_mapping(index="students"))

In [None]:
# インデックス一覧の取得
indices = es.cat.indices(index='*', h='index').splitlines()
# インデックスの表示
for index in indices:
    print(index)

In [None]:
es.indices.delete(index="students")

In [None]:
from elasticsearch import Elasticsearch, helpers # bulkを使うために追加


# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")

def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Jiro",
            "age": 25,
            "email": "jiro@example.com"
        },
        {
            "name": "Saburo",
            "age": 20,
            "email": "saburo@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }

# 複数ドキュメント登録
helpers.bulk(es, gendata())

In [None]:
# ageの値が20より大きいドキュメントを検索するためのクエリ
query = {
    "query": {
        "range": {
            "age": {
                "gt": 20
            }
        }
    }
}

# ドキュメントを検索
result = es.search(index="students", body=query, size=3)
# 検索結果からドキュメントの内容のみ表示
for document in result["hits"]["hits"]:
    print(document["_source"])

In [None]:
# For large dataset
python -m pip install elasticsearch [ async ]  >  = 7.8.0

In [None]:
import asyncio
from elasticsearch import AsyncElasticsearch


# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def main():
    # 非同期検索
    result = await es.search(
        index="students",
        body={"query": {"match_all": {}}},
        size=20
    )
    # 検索結果の表示
    for student in result['hits']['hits']:
        print(student['_source'])

    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

In [None]:
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Siro",
            "age": 19,
            "email": "siro@example.com"
        },
        {
            "name": "Goro",
            "age": 13,
            "email": "goro@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }


async def main():
    # 非同期でバルクインサートを実行
    await async_bulk(es, gendata())
    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

In [None]:
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk, BulkIndexError

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Siro",
            "age": 19,
            "email": "siro@example.com"
        },
        {
            "name": "Goro",
            "age": 13,
            "email": "goro@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }


async def main():
    try:
        # ドキュメントを複数（チャンク）に分けてバルクインサート
        async for ok, result in async_streaming_bulk(client=es,
                                                     actions=gendata(),
                                                     chunk_size=50,  # 一度に扱うドキュメント数
                                                     max_chunk_bytes=52428800  # 一度に扱うバイト数
                                                     ):
            # 各チャンクごとの実行結果を取得
            action, result = result.popitem()
            # バルクインサートに失敗した場合
            if not ok:
                print(f"failed to {result} document {action}")
    # 例外処理
    except BulkIndexError as bulk_error:
        # エラーはリスト形式
        print(bulk_error.errors)

    # セッションのクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())