## IndexとType
Elasticsearchに実際にデータを入れていく前にIndexとTypeの違いをまとめます。

RDBになぞらえてシンプルに

+ Index=DB
+ Type=Table

と説明されることがあるらしいですが、正確には違うらしいです。

とりあえずはそんな感じの認識で進めます。

進めてええんかい（ツッコミ）

細かい意味は以下のページを参考にしてください

- [Index vs. Type | Elastic](https://www.elastic.co/jp/blog/index-vs-type)
- [データ構造について – AWSで始めるElasticSearch(4) ｜ Developers.IO](https://dev.classmethod.jp/cloud/aws/use-elasticsearch-4-data-structure/)

## PythonからElasticsearchを利用してみる

まあ、インストール

In [1]:
! pip install elasticsearch



### Index():ドキュメントの追加、更新
適当にスニペットを書いていきます
```Python
es = Elasticsearch()
```
で特にオプションを記述しなければ、localhost:9200を見に行きます

index()メソッドではドキュメントの追加、更新ができます。

この時点で指定したインデックス、タイプが無ければ自動で作成されます。

pythonで読む前にサーバは起動しておく必要があるみたい

In [2]:
from elasticsearch import Elasticsearch

es = Elasticsearch()

es.index(index="my-index", doc_type="test-type", body={"key": "value"})



{'_index': 'my-index',
 '_type': 'test-type',
 '_id': 'Y0DWi3gBNk5iJDwqsJOI',
 '_version': 1,
 'result': 'created',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 12,
 '_primary_term': 2}

これでmy-indexというインデックスにtest-typeというタイプで{"key": "value"}というドキュメントが作成されました

idを指定してドキュメントを作成することも可能

In [3]:
es.index(index="my-index", doc_type="test-type", id=1,  body={"key": "value1"})

{'_index': 'my-index',
 '_type': 'test-type',
 '_id': '1',
 '_version': 6,
 'result': 'updated',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 13,
 '_primary_term': 2}

### get():ドキュメントを取得する
このようにインデックス、タイプ、id、を指定することで登録済みのドキュメントを取得できる

In [4]:
es.get(index="my-index", doc_type="test-type", id=1)



{'_index': 'my-index',
 '_type': 'test-type',
 '_id': '1',
 '_version': 6,
 '_seq_no': 13,
 '_primary_term': 2,
 'found': True,
 '_source': {'key': 'value1'}}

### search():ドキュメントの検索
適当にドキュメントを三つぐらい追加しておきます

In [5]:
es.index(index="my-index", doc_type="test-type", id=1, body={"key":"value1"})
es.index(index="my-index", doc_type="test-type", id=2, body={"key":"value2"})
es.index(index="my-index", doc_type="test-type", id=3, body={"key":"value3"})

{'_index': 'my-index',
 '_type': 'test-type',
 '_id': '3',
 '_version': 3,
 'result': 'updated',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 16,
 '_primary_term': 2}

検索するためにはインデックスとクエリを指定します

クエリにmatch_allとすれば全権取得できます

In [6]:
es.search(index="my-index", body={"query": {"match_all": {}}})

{'took': 16,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 6, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'my-index',
    '_type': 'test-type',
    '_id': 'jVZggHgBtsqmUzTO1MsR',
    '_score': 1.0,
    '_source': {'key': 'value'}},
   {'_index': 'my-index',
    '_type': 'test-type',
    '_id': 'X0Aei3gBNk5iJDwqeZOM',
    '_score': 1.0,
    '_source': {'key': 'value'}},
   {'_index': 'my-index',
    '_type': 'test-type',
    '_id': 'YEAfi3gBNk5iJDwqOpPg',
    '_score': 1.0,
    '_source': {'key': 'value'}},
   {'_index': 'my-index',
    '_type': 'test-type',
    '_id': '1',
    '_score': 1.0,
    '_source': {'key': 'value1'}},
   {'_index': 'my-index',
    '_type': 'test-type',
    '_id': '2',
    '_score': 1.0,
    '_source': {'key': 'value2'}},
   {'_index': 'my-index',
    '_type': 'test-type',
    '_id': '3',
    '_score': 1.0,
    '_source': {'key': 'value3'}}]}}

このように条件を付ければフィルターをかけることができます

In [7]:
es.search(index="my-index", body={"query": {"match": {"key":"value1"}}})

{'took': 9,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1, 'relation': 'eq'},
  'max_score': 1.540445,
  'hits': [{'_index': 'my-index',
    '_type': 'test-type',
    '_id': '1',
    '_score': 1.540445,
    '_source': {'key': 'value1'}}]}}

# Python Elasticsearch 基本的な使い方まとめ
ここからはQiita様に教えてもらう

https://qiita.com/satto_sann/items/8a63761bbfd6542bb9a2

## 接続
インストールしたelasticsearchパッケージを利用してElasticsearchが起動しているホスト（localhost）へ接続する方法は以下の通り

### 基本的な接続方法
特にElasticsearch側で認証を設定していなければ、この方法で接続できる

In [14]:
# Elasticsearchクライアント作成
es = Elasticsearch("http://localhost:9200")

ポートやhttpまたはhttpsを指定することもできる

In [9]:
# Elasticsearchインスタンスを作成
es = Elasticsearch(
    ["localhost", "otherhost"],
    scheme="http",
    port=9200
)

### HTTP認証を利用した接続
ElasticsearchにIDやパスワードを設定している場合は、この方法で接続できます

In [10]:
# es = Elasticsearch(
#     "http://localhost:9200",
#     http_auth=("user_id", "password")
# )
# user_idやpasswordは設定されたID、パスワードを表しています。

### 接続の解除
上記で確立した内部接続をclose()で閉じることができます

In [11]:
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")

# 内部接続を閉じる
es.close()

close()しないと、インスタンスがガーベージコレクションされた際に例外が発生するらしい。明示的に書いた方が吉

## インデックスの基本操作
インデックスを扱うための基本的な操作について紹介します。

インデックスの操作にはindicesという属性を使います

### インデックスの作成

sutudentsというインデックスを作成します。

タイプやドキュメントが入っていない空のインデックスです

In [15]:
es.indices.create(index="students")

RequestError: RequestError(400, 'resource_already_exists_exception', 'index [students/cq64RNunShCS2MZZmuU1Vw] already exists')

### インデックスの削除
存在するインデックスを暗黙的に上書きすることはできない。

この後、数回作り変えるので先に削除の方法を示す

In [16]:
es.indices.delete(index="students")

{'acknowledged': True}

### マッピングを使った方法
データタイプやインデックスの構造を指定して作成できます

In [17]:
mapping = {
    "mappings":{
        "properties":{
            "name":{"type":"text"},
            "age":{"type":"long"},
            "email":{"type":"text"}
        }
    }
}

es.indices.create(index="students", body=mapping)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'students'}

## インデックス情報の取得
### インデックス一覧の取得
接続しているElasticsearchでは、どのようなインデックスがあるのかを確認したい場合はcat属性のindices(index="*",h="index")を利用する

indicesはインデックス情報を返すメソッド

今回は全インデックスを一覧で取得したいので、引数のindexにはワイルドカードを指定します

また、引数hには、列名を指定することで列情報が改行区切りで帰ってくる

In [18]:
# インデックス一覧の取得
indices = es.cat.indices(index="*", h="index").splitlines()
# インデックスの表示
for index in indices:
    print(index)

my-index
students


頭で作ったmy-indexも表示されている。よき。

### インデックスのマッピングの確認
特定のインデックスのマッピングを確認する場合は、get_mapping(index="インデックス名")を利用します。

In [19]:
print(es.indices.get_mapping(index="students"))

{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text'}, 'name': {'type': 'text'}}}}}


また、引数にindexを指定しない場合、全インデックスのマッピングを取得することができる

In [20]:
print(es.indices.get_mapping())

{'my-index': {'mappings': {'properties': {'key': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}, 'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text'}, 'name': {'type': 'text'}}}}}


### インデクスの更新
マッピングを変更してインデックスの構造を更新する場合は、put_mapping(index="インデックス名",body="変更分のマッピング")を利用する。

例えば、studentsに新たに学籍番号を追加する場合には以下のようになる

In [21]:
mapping = {
    "properties":{
        "student_number":{"type":"long"}
    }
}

es.indices.put_mapping(index="students", body=mapping)

{'acknowledged': True}

指定するマッピングは全て渡す必要はなく、差分だけでよい

また、インデックス作成の際にはmapping配下にネストしたが、更新の場合は、そうではないことに注意

現在のマッピングは、以下のようになっている

In [22]:
es.indices.get_mapping(index="students")

{'students': {'mappings': {'properties': {'age': {'type': 'long'},
    'email': {'type': 'text'},
    'name': {'type': 'text'},
    'student_number': {'type': 'long'}}}}}

### インデックスの存在確認
エラーハンドリングを追加するにあたって、インデックスが存在するかを確認したい。

そのような場合は、exists=(index="インデックス名")を利用する

In [23]:
print(es.indices.exists(index="students"))

True


## ドキュメントの基本操作
### ドキュメントの作成
ドキュメントを新しく登録するには、create(index="インデックス名",id="ドキュメントID",body="新規ドキュメント")を利用する

※インデクスがない場合は、登録するドキュメントから自動的に方を判断して作成される。

In [24]:
# 登録したいドキュメント
student = {
    "name":"Taro",
    "age":36,
    "email":"taro@example.com"
}

# ドキュメントの登録
es.create(index="students",id=1,body=student)

{'_index': 'students',
 '_type': '_doc',
 '_id': '1',
 '_version': 1,
 'result': 'created',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 0,
 '_primary_term': 1}

### バルクインサート
createを複数回呼び出すことで、複数ドキュメントを登録できますが、bulk（インスタンス,データ）を使うことで、一度に登録できる。

引数で渡すデータの構造は、以下の通りで、少々複雑

```
{
    "_op_type": "createやdelete、updateといったアクションを指定"
    "_index": "インデックス名"
    "_id": "ドキュメントのIDを指定（createの場合はなくてもいい）"
    "_source": "登録したいドキュメント"
}
```

上記のデータ配列に格納して、blukに渡すことで複数ドキュメントを操作することができる。

今回は配列ではなく```yield```を使った方法をサンプルとして紹介

In [25]:
from elasticsearch import helpers

def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "jiro",
            "age": 25,
            "email": "jiro@example.com"
        },
        {
            "name": "Saburo",
            "age": 20,
            "email": "saburo@example.com"
        }
    ]
    
    # bulkで扱えるデータ構造に変換する
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }
        
# 複数ドキュメント登録
helpers.bulk(es, gendata())

(2, [])

※100MBを超える大量のドキュメントをバルクインサートを実行する場合、エラーが発生する。このようなときは、記事下部に記載した[非同期で大量のドキュメントをバルクインサート](https://qiita.com/satto_sann/private/8a63761bbfd6542bb9a2#%E9%9D%9E%E5%90%8C%E6%9C%9F%E3%81%A7%E5%A4%A7%E9%87%8F%E3%81%AE%E3%83%89%E3%82%AD%E3%83%A5%E3%83%A1%E3%83%B3%E3%83%88%E3%82%92%E3%83%90%E3%83%AB%E3%82%AF%E3%82%A4%E3%83%B3%E3%82%B5%E3%83%BC%E3%83%88)を参照

## ドキュメントの検索
### クエリを使って検索
登録したドキュメントを検索するには、search(index="インデクス名", body="検索クエリ", size=検索数)を利用

bodyやsizeは指定しない場合は、全件表示される

In [26]:
# ageの値が20より大きいドキュメントを検索するためのクエリ
query = {
    "query":{
        "range":{
            "age":{
                "gt": 20
            }
        }
    }
}

# ドキュメントを検索
result = es.search(index="students", body=query, size=3)
# 検索結果からドキュメントの内容のみ表示
for document in result["hits"]["hits"]:
    print(document["_source"])

{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}


Saburoの年齢は20歳なので、検索から外れ表示されない

### IDを使って検索
ドキュメントIDを指定して直接検索する場合は、get_source(index="インデックス名",id="ドキュメントID”)を利用する。

サンプルでは、idが1のドキュメントを検索している

In [27]:
print(es.get_source(index="students", id=1))

{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}


### ドキュメント数の取得
インデクスの中にドキュメント数がいくつあるのか確認したい場合は、count(index="インデックス名)を利用する

In [28]:
print(es.count(index="students"))

{'count': 3, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}


### ドキュメントの更新
ドキュメントの更新にはupdate(index="インデックス名", id="ドキュメントID", body="変更内容")を利用

In [29]:
# doc配下に変更したいパラメータを記述
student = {
    "doc": {
        "age": 40
    }
}

# ドキュメントIDを指定して更新
es.update(index="students", id=1, body=student)

# 更新されているか確認
print(es.get_source(index="students", id=1))

{'name': 'Taro', 'age': 40, 'email': 'taro@example.com'}


### ドキュメントの削除
特定のドキュメントを削除する場合は、delete(index="インデックス名", id="ドキュメントID")を利用

サンプルでは、idが1のドキュメントを削除する

In [30]:
es.delete(index="students", id=1)

{'_index': 'students',
 '_type': '_doc',
 '_id': '1',
 '_version': 3,
 'result': 'deleted',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 4,
 '_primary_term': 1}

### ドキュメントの存在確認
ドキュメントの存在を確認するためには、exists(index="インデックス名", id="ドキュメントID")を利用

In [31]:
print(es.exists(index="stsudents", id=1))

False


ほんまに消せてたね

※インデクスには存在しているものの```_source```があるのか調べるにはexists_sourceを利用する

## アドバンス
個々では、通常あまり使わないようなメソッドや小ネタの紹介をする

### 非同期で検索
ドキュメント数が何万件とある場合、検索に時間がかかってしまう。このような場合AsyncElasticsearchを使うことで非同期でリソースを効率的に検索することができる

### 準備
機能を使うためにはasyncioをインストールする必要がある

In [32]:
!python -m pip install elasticsearch[async]>=7.8.0

### 非同期の検索サンプル
非同期で検索するためのサンプルを用意しました。asyncやawaitを付け加えただけで、基本的にはsearchとやることは変わらない

In [42]:
import asyncio
from elasticsearch import AsyncElasticsearch

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")

async def main():
    # 非同期検索
    result = await es.search(
        index="students",
        body={"query": {"match_all": {}}},
        size=20
    )
    # 検索結果の表示
    for student in result['hits']['hits']:
        print(student['_source'])

    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

RuntimeError: This event loop is already running

動きとしては、

- ```asyncio.get_event_loop()```でイベントループを取得
- 並列で動かしたい関数```main()```に```async```を付けて定義
- 時間がかかる処理```search```は```await```を付けて定義
- イベントループの```run_until_complete```で並列的に実行しつつ終わるまで待つ

このような感じらしい。走らんのやけど

In [49]:
es = AsyncElasticsearch()

async def main():
    resp = await es.search(
        index="students",
        body={"query": {"match_all": {}}},
        size=20,
    )
    print(resp)

loop = asyncio.get_event_loop()

In [50]:
loop.run_until_complete(main())

RuntimeError: This event loop is already running

### 非同期でバルクインサート
コードはbulkとほぼ変わらず、asyncやawait及び非同期に対応したasync_bulkを利用するだけ

In [43]:
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")


async def gendata():
    # 登録したいドキュメント
    students = [
        {
            "name": "Siro",
            "age": 19,
            "email": "siro@example.com"
        },
        {
            "name": "Goro",
            "age": 13,
            "email": "goro@example.com"
        }
    ]

    # bulkで扱えるデータ構造に変換します
    for student in students:
        yield {
            "_op_type": "create",
            "_index": "students",
            "_source": student
        }


async def main():
    # 非同期でバルクインサートを実行
    await async_bulk(es, gendata())
    # セッションをクローズ
    await es.close()

# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())

RuntimeError: This event loop is already running

どうやらnotebookで走らせると死ぬらしい。コマンド入力で走らせると普通に行けた。

とりあえずは同期なしでやってみたらいいと思う