# サーチエンジンとしての利用手順(データ蓄積)


## 準備

本章のコマンドを実行するための設定を行います。  

セットアップ済みの環境のうちコマンドを送る先であるCoordinating(Client) Nodeのホストアドレスを、次のセルに記入して実行し、保存してください。

In [1]:
%env ES_HOST=XXX.XXX.XXX.232:9200

env: ES_HOST=XXX.XXX.XXX.232:9200


また、データを蓄積する先のindexとtypeを、次のセルを実行して保存してください。  

※特にセルの内容の書き換えは必要ありません。  
　前の章と異なるindex名にしているのは、この章のコマンドで投入する内容が前の章までのものと混ざらないようにするためです。

In [2]:
%env INDEX=sample_index
%env TYPE=logs

env: INDEX=sample_index
env: TYPE=logs


## Logstashとは

Logstashは、Elastic Stackを構成するプロダクトの１つです。  
データの取り込み、加工、出力（格納）を処理するパイプラインを簡単に構築できます。

対応できる形式は次のように多岐に渡ります。

- 入力：各種ファイル、RDB、通信ストリーム、ログ、S3、BeatsというElasticのデータ収集ツール…
- 加工：各種形式の解釈、分割、集計、地理情報の付与、個人情報のマスク…
- 出力：Elasticsearch、S3、各種ファイル、通信…

対応できる形式などについては、Logstash([公式ページ](https://www.elastic.co/jp/products/logstash))を参照してください。  
  
本章では、入力に各種ファイル、格納先にElasticsearchを指定する、単純なケースでのデータ蓄積を実施します。  
また、変換処理をLogstashで実施するだけでなく、ElasticsearchのIngest Nodeで分散実施するケースについても説明します。


## Logstashの設定

Logstashを利用する際には、".conf"という拡張子のファイルに振る舞いを定義し、それを読み込んで実行させます。  
sample.confというconfファイルを用いる場合の実行コマンドは次のようになります。

confファイルはinput・filter・outputの３つの設定によって構成されます。  
- **input** : 入力元や形式の設定
- **filter** : 入力された内容の加工内容の設定
- **output** : 出力先や形式の設定

confファイルの例を次に示します。

```ruby:example.conf
input{
  stdin{
    codec => plain{
      charset => "UTF-8"
    }
  }
}
filter{
  grok { 
    match => [ "message", "%{COMBINEDAPACHELOG}" ] 
  }
  geoip{
    source => "ip"
  }
}
output{
  stdout{
    codec => dots
  }
  elasticsearch{
    hosts => localhost
    user => "elastic"
    password => "changeme"
    index => "apache_sample"
  }
}
```

設定ファイルの各要素の内容や、上の例で対応する部分の意味は次の通りです。

||意味|設定例の内容|設定内容の参考ページ|
|----|----|----|----|
|**input**|データの入力元や形式、およびその詳細を定義します。|stdin{}は標準入力からデータを受けとる設定です。<br>また、文字コードをUTF-8に指定しています。<br>stdin{}の代わりにtwitter{},file{}のように指定すれば入力元を標準入力以外にすることもできます。|[Input plugins](https://www.elastic.co/guide/en/logstash/current/input-plugins.html)|
|**filter**|inputから受け取った入力の加工内容を定義します。<br>複数のfilterを記述した場合、記述順に加工されます。|入力の特定部分をパースするgrok filterと<br>地理情報を付加するgeoip filterを指定しています。|[Filter plugins](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html)|
|**output**|データの出力先や形式、およびその詳細を定義します。|stdout{}とelasticsearch{}の2つを記述することで、標準出力とElasticsearchの両方に出力するよう指定しています。<br>Elasticsearchへの出力設定では、宛先ホストやアカウント、index名を指定しています。|[Output plugins](https://www.elastic.co/guide/en/logstash/current/output-plugins.html)|

## データフォーマット

ここでは、様々なデータフォーマットのデータをElasticsearchに登録する際の具体的な設定と手法について説明します。

Logstashのサーバーに関する設定を行います。  
次のセルを実行してユーザー名と、秘密鍵のパスを設定してください。

In [3]:
#Logstashのサーバーでコマンドを実行するユーザー
USER='ansible'

#公開鍵認証を行う場合の秘密鍵のパス
KEYPATH='~/.ssh/ansible_id_rsa'

### JSON

JSON形式のデータとして、次の内容のサンプルを登録します。

In [4]:
!head -n 1 sample_data/weather.json

{"timestamp":"1","atmospheric_pressure":"1000.4","sea_level_pressure":"1003.4","precipitation_day":"0","precipitation_max_hour":"0","precipitation_max_10min":"0","temperature_avg":"3.7","temperature_max":"8.2","temperature_min":"0.7","humidity_avg":"41","humidity_min":"30","wind_speed_avg":"4.4","wind_speed_max":"9.4","wind_direction":"西","wind_speed_max_moment":"18.7","wind_direction_max_moment":"北西","sunshine_duration":"1.3","snowfall":"","snowfall_max":"","information_daytime":"時々晴一時雪","information_night":"晴"}


confファイルには、codec => "json" と指定することでJSON形式を解釈できるように指定します。  
ファイル名は"json.conf"とします。

```ruby:json.conf
input{  
  stdin{ 
    codec => "json"  
  }  
}  
filter{  
}  
output{  
  elasticsearch{  
    hosts => <ES_HOST>  
    index => <INDEX>  
  }  
}  
```


weather.jsonとjson.confをLogstashのサーバーに配置してください。

実際のデータ登録は次のコマンドで実行します。

In [10]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/weather.json | sudo /usr/share/logstash/bin/logstash -f json.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [14]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/sample_json/_search?pretty"

{
  "took" : 22,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 30,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_json",
        "_id" : "AVjxYFq1eJQ1fjM9_cyd",
        "_score" : 1.0,
        "_source" : {
          "precipitation_max_10min" : "0",
          "wind_speed_avg" : "4.4",
          "snowfall" : "",
          "precipitation_max_hour" : "0",
          "temperature_max" : "8.2",
          "wind_direction" : "西",
          "information_daytime" : "時々晴一時雪",
          "precipitation_day" : "0",
          "@timestamp" : "2016-12-12T04:52:52.957Z",
          "humidity_avg" : "41",
          "humidity_min" : "30",
          "wind_speed_max" : "9.4",
          "sunshine_duration" : "1.3",
          "temperature_avg" : "3.7",
          "temperature_min" : "0.7",
          "@version" : "1",
          "host" : "cnXXXX1004.ecloud.nii.ac.jp",
      

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 11393  100 11393    0     0   375k      0 --:--:-- --:--:-- --:--:--  383k


### 複数行JSON

1つのJSONデータが改行を含んだ形式の場合を試します。  
次の内容のサンプルを登録します。

In [12]:
!head -n 3 sample_data/weather_multiline.json

{
"timestamp":"1",
"atmospheric_pressure":"1000.4",


confファイルには、codec => multilineと指定することで、複数行にまたがるデータ形式を解釈できるように指定します。
通常、Logstashでは1行ごとに1つのイベントとして処理しますが、multilineでは特定のパターンを見つけるまで行を読み込み、結合することで１つのイベントとして処理します。  

ファイル名は"json_multiline.conf"とします。

```rubby:weather_multiline.conf
input{
  stdin{
    codec => multiline{
      pattern => "}"
      negate => "true"
      what => "next"
    }
  }
}
filter{
  json{
    source => "message"
  }
}
output{
  elasticsearch{
    hosts => <ES_HOST>
    index => <INDEX>
  }
}
```

実際のデータ登録は次のコマンドで実行します。

In [13]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/weather_multiline.json | sudo /usr/share/logstash/bin/logstash -f json_multiline.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [15]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/sample_multiline_json/_search?pretty"

{
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 31,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_multiline_json",
        "_id" : "AVjxZOVqeJQ1fjM9_cy7",
        "_score" : 1.0,
        "_source" : {
          "wind_speed_avg" : "4.4",
          "snowfall" : "",
          "temperature_max" : "8.2",
          "precipitation_day" : "0",
          "wind_speed_max" : "9.4",
          "sunshine_duration" : "1.3",
          "temperature_avg" : "3.7",
          "temperature_min" : "0.7",
          "@version" : "1",
          "host" : "cnXXXX1004.ecloud.nii.ac.jp",
          "wind_direction_max_moment" : "北西",
          "information_night" : "晴",
          "timestamp" : "1",
          "precipitation_max_10min" : "0",
          "precipitation_max_hour" : "0",
          "wind_direction" : "西",
          "message" : "{\r\n\"timestamp\":\"1\"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 19387  100 19387    0     0   873k      0 --:--:-- --:--:-- --:--:--  901k


### CSV


CSV形式のデータとして、次の内容のサンプルを登録します。

In [16]:
!head -n 3 sample_data/weather.csv

1,1000.4,1003.4,0,0,0,3.7,8.2,0.7,41,30,4.4,9.4,西,18.7,北西,1.3,,,時々晴一時雪,晴
2,1007.4,1010.4,,,,2.7,7.9,-2.2,41,19,2.1,4.2,西,7.8,西,6,,,時々曇,一時曇
3,1012,1015.1,,,,3.8,8.9,-1.1,42,19,3,7.4,北西,10.9,北西,8.9,,,晴,後薄曇


confファイルには、"filter"の下に"csv"定義を設定し、列ごとの名前を指定することで、CSVデータの各列を対応する要素に紐づけてCSV形式を解釈できるようになります。  
ファイル名は"csv.conf"とします。

```ruby:csv.conf
input{
   stdin{}
}
filter{
  csv{
    columns => ["timestamp","atmospheric_pressure","sea_level_pressure","precipitation_day","precipitation_max_hour","precipitation_max_10min","temperature_avg","temperature_max","temperature_min","humidity_avg","humidity_min","wind_speed_avg","wind_speed_max","wind_direction","wind_speed_max_moment","wind_direction_max_moment","sunshine_duration","snowfall","snowfall_max","information_daytime","information_night"]
  }
}
output{
  elasticsearch{
    hosts => <ES_HOST>  
    index => <INDEX>
  }
}
```

実際のデータ登録は次のコマンドで実行します。

In [17]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/weather.csv | sudo /usr/share/logstash/bin/logstash -f csv.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [18]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/sample_csv/_search?pretty"

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 30,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_csv",
        "_id" : "AVjxZdZ8eJQ1fjM9_czo",
        "_score" : 1.0,
        "_source" : {
          "wind_speed_avg" : "3.2",
          "snowfall" : null,
          "temperature_max" : "8.2",
          "precipitation_day" : "35.5",
          "wind_speed_max" : "8.1",
          "sunshine_duration" : "0",
          "temperature_avg" : "5.4",
          "temperature_min" : "3.4",
          "@version" : "1",
          "host" : "cnXXXX1004.ecloud.nii.ac.jp",
          "wind_direction_max_moment" : "西",
          "information_night" : "後晴",
          "timestamp" : "15",
          "precipitation_max_10min" : "2.5",
          "precipitation_max_hour" : "11",
          "wind_direction" : "北西",
          "message" : "15,1009.7,1012.7,35.5,11,2.

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 12573  100 12573    0     0   633k      0 --:--:-- --:--:-- --:--:--  646k


### LTSV

LTSV形式のデータとして、次の内容のサンプルを登録します。

In [19]:
!head -n 3 sample_data/weather.ltsv

"timestamp":"21"	"atmospheric_pressure":"1024.3"	"sea_level_pressure":"1027.4"	"precipitation_day":"4"	"precipitation_max_hour":"1.5"	"precipitation_max_10min":"0.5"	"temperature_avg":"2.5"	"temperature_max":"3.8"	"temperature_min":"1.5"	"humidity_avg":"69"	"humidity_min":"38"	"wind_speed_avg":"3.1"	"wind_speed_max":"5"	"wind_direction":"北西"	"wind_speed_max_moment":"9.3"	"wind_direction_max_moment":"北西"	"sunshine_duration":"0"	"snowfall":""	"snowfall_max":""	"information_daytime":"時々雪後雨"	"information_night":"一時雨"


confファイルの設定では、mutate filterを使って入力をタブで分割したあとに、その内容をパースします。  
ファイル名は"ltsv.conf"とします。

```ruby:ltsv.conf
input{
  stdin{}
}
filter{
  mutate{
    split => {"message" => "    "}
  }
  grok{
    match => {
      "message" => "\"timestamp\":\"%{DATA:timestamp}\""
    }
  }
  grok{
    match => {
      "message" => "\"atmospheric_pressure\":\"%{DATA:atmospheric_pressure}\""
    }
  }
  grok{
    match => {
      "message" => "\"sea_level_pressure\":\"%{DATA:sea_level_pressure}\""
    }
  }
  grok{
    match => {
      "message" => "\"precipitation_day\":\"%{DATA:precipitation_day}\""
    }
  }
  grok{
    match => {
      "message" => "\"precipitation_max_hour\":\"%{DATA:precipitation_max_hour}\""
    }
  }
  grok{
    match => {
      "message" => "\"precipitation_max_10min\":\"%{DATA:precipitation_max_10min}\""
    }
  }
  grok{
    match => {
      "message" => "\"temperature_avg\":\"%{DATA:temperature_avg}\""
    }
  }
  grok{
    match => {
      "message" => "\"temperature_max\":\"%{DATA:temperature_max}\""
    }
  }
  grok{
    match => {
      "message" => "\"temperature_min\":\"%{DATA:temperature_min}\""
    }
  }
  grok{
    match => {
      "message" => "\"humidity_avg\":\"%{DATA:humidity_avg}\""
    }
  }
  grok{
    match => {
      "message" => "\"humidity_min\":\"%{DATA:humidity_min}\""
    }
  }
  grok{
    match => {
      "message" => "\"wind_speed_avg\":\"%{DATA:wind_speed_avg}\""
    }
  }
  grok{
    match => {
      "message" => "\"wind_speed_max\":\"%{DATA:wind_speed_max}\""
    }
  }
  grok{
    match => {
      "message" => "\"wind_direction\":\"%{DATA:wind_direction}\""
    }
  }
  grok{
    match => {
      "message" => "\"wind_direction_max_moment\":\"%{DATA:wind_direction_max_moment}\""
    }
  }
  grok{
    match => {
      "message" => "\"sunshine_duration\":\"%{DATA:sunshine_duration}\""
    }
  }
  grok{
    match => {
      "message" => "\"snowfall\":\"%{DATA:snowfall}\""
    }
  }
  grok{
    match => {
      "message" => "\"snowfall_max\":\"%{DATA:snowfall_max}\""
    }
  }
  grok{
    match => {
      "message" => "\"information_daytime\":\"%{DATA:information_daytime}\""
    }
  }
  grok{
    match => {
      "message" => "\"information_night\":\"%{DATA:information_night}\""
    }
  }
  date{
    match => ["timestamp","ISO8601"]
    timezone => "UTC"
    remove_field => ["timestamp","message"]
  }
}
output{
  elasticsearch{
    hosts => "XXX.XXX.XXX.232"
    index => "sample_index"
    document_type => "sample_ltsv"
  }
}
```

実際のデータ登録は次のコマンドで実行します。

In [20]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/weather.ltsv | sudo /usr/share/logstash/bin/logstash -f ltsv.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [21]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/_search"

{"took":10,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":92,"max_score":1.0,"hits":[{"_index":"sample_index","_type":"sample_json","_id":"AVjxYFq1eJQ1fjM9_cyd","_score":1.0,"_source":{"precipitation_max_10min":"0","wind_speed_avg":"4.4","snowfall":"","precipitation_max_hour":"0","temperature_max":"8.2","wind_direction":"西","information_daytime":"時々晴一時雪","precipitation_day":"0","@timestamp":"2016-12-12T04:52:52.957Z","humidity_avg":"41","humidity_min":"30","wind_speed_max":"9.4","sunshine_duration":"1.3","temperature_avg":"3.7","temperature_min":"0.7","@version":"1","host":"cnXXXX1004.ecloud.nii.ac.jp","atmospheric_pressure":"1000.4","wind_direction_max_moment":"北西","wind_speed_max_moment":"18.7","snowfall_max":"","sea_level_pressure":"1003.4","information_night":"晴","timestamp":"1"}},{"_index":"sample_index","_type":"sample_json","_id":"AVjxYFq1eJQ1fjM9_cyi","_score":1.0,"_source":{"precipitation_max_10min":"1.5","wind_speed_avg":"5.2","snowfall":"",

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  8920  100  8920    0     0   571k      0 --:--:-- --:--:-- --:--:--  580k


### 任意のテキスト(1行完結)

ここまでの説明のように、JSONやCSVなどの定型のデータであれば設定は簡単ですが、実際に読み込みたいデータはそのように形式がそろっていないことも多くあります。

次のサンプルデータのように、「名前=値」という形式がスペース区切りで1行に並んでいるようなデータを登録することを考えます。

In [22]:
!head sample_data/single_line.txt

timestamp=2015-01-01T00:00:00 location=tokyo atmospheric_pressure=1000.4 sea_level_pressure=1003.4 precipitation_day=0 precipitation_max_hour=0 precipitation_max_10min=0 temperature_avg=3.7 temperature_max=8.2 temperature_min=0.7 humidity_avg=41.0 humidity_min=30.0 wind_speed_avg=4.4 wind_speed_max=9.4 wind_direction=西 wind_speed_max_moment=18.7 wind_direction_max_moment=北西 sunshine_duration=1.3 information_daytime=時々晴一時雪 information_night=晴

この形式のデータを読み込むためのconfファイルは、次のような内容になります。  
ファイル名は"single_line.conf"とします。

```ruby:single_line.conf
input{
  stdin{}
}

filter{
  grok{
    match => {"message" => "timestamp=%{TIMESTAMP_ISO8601:timestamp}"}
  }
  grok{
    match => {"message" => "location=%{WORD:location}"}
  }
  grok{
    match => {"message" => "atmospheric_pressure=%{WORD:atmospheric_pressure}"}
  }
  grok{
    match => {"message" => "sea_level_pressure=%{WORD:sea_level_pressure}"}
  }
  grok{
    match => {"message" => "precipitation_day=%{WORD:precipitation_day}"}
  }
  grok{
    match => {"message" => "precipitation_max_hour=%{WORD:precipitation_max_hour}"}
  }
  grok{
    match => {"message" => "precipitation_max_10min=%{WORD:precipitation_max_10min}"}
  }
  grok{
    match => {"message" => "temperature_avg=%{WORD:temperature_avg}"}
  }
  grok{
    match => {"message" => "temperature_max=%{WORD:temperature_max}"}
  }
  grok{
    match => {"message" => "temperature_min=%{WORD:temperature_min}"}
  }
  grok{
    match => {"message" => "humidity_avg=%{WORD:humidity_avg}"}
  }
  grok{
    match => {"message" => "humidity_min=%{WORD:humidity_min}"}
  }
  grok{
    match => {"message" => "wind_speed_avg=%{WORD:wind_speed_avg}"}
  }
  grok{
    match => {"message" => "wind_speed_max=%{WORD:wind_speed_max}"}
  }
  grok{
    match => {"message" => "wind_direction=%{WORD:wind_direction}"}
  }
  grok{
    match => {"message" => "wind_speed_max_moment=%{WORD:wind_speed_max_moment}"}
  }
  grok{
    match => {"message" => "wind_direction_max_moment=%{WORD:wind_direction_max_moment}"}
  }
  grok{
    match => {"message" => "sunshine_duration=%{WORD:sunshine_duration}"}
  }
  grok{
    match => {"message" => "snowfall=%{WORD:snowfall}"}
  }
  grok{
    match => {"message" => "snowfall_max=%{WORD:snowfall_max}"}
  }
  grok{
    match => {"message" => "information_daytime=%{WORD:information_daytime}"}
  }
  grok{
    match => {"message" => "information_night=%{WORD:information_night}"}
  }
  date{
    match => ["timestamp","ISO8601"]
    timezone => "UTC"
    remove_field => "timestamp"
  }
}

output{
  elasticsearch{
    hosts => <ES_HOST>
    index => <INDEX>
  }
}
```

この設定では、2種類の加工処理を行っています。  
入力の特定部分をパースするための「grok filter」と  
文字列をタイムスタンプとして認識させるための「date filter」を用いています。  
加工処理の詳細に関しては、Logstashの[公式ページ](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html)を参照してください。

実際のデータ登録は次のコマンドで実行します。

In [23]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/single_line.txt | sudo /usr/share/logstash/bin/logstash -f single_line.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [25]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/sample_single_line/_search?pretty"

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : null,
    "hits" : [ ]
  }
}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   193  100   193    0     0  17948      0 --:--:-- --:--:-- --:--:-- 19300


### 任意のテキスト(複数行完結)


前の節の形式に加え、データが複数行にまたがっているような場合であっても、Logstashは対応が可能です。  
次のサンプルのようなデータを登録してみましょう。

In [26]:
!head -n 3 sample_data/multi_line.txt

timestamp=2015-01-01T00:00:00 location=tokyo atmospheric_pressure=1000.4
sea_level_pressure=1003.4 precipitation_day=0 precipitation_max_hour=0
precipitation_max_10min=0 temperature_avg=3.7 temperature_max=8.2


この形式のデータを読み込むためのconfファイルは、次のような内容になります。  
ファイル名は"multi_line.conf"とします。

```ruby:multi_line.conf
input{
  stdin{
    codec => multiline{
        pattern => "information_night"
        negate => "true"
        what => "next"
    }
  }
}

filter{
  grok{
    match => {"message" => "timestamp=%{TIMESTAMP_ISO8601:timestamp}"}
  }
  grok{
    match => {"message" => "location=%{WORD:location}"}
  }
  grok{
    match => {"message" => "atmospheric_pressure=%{WORD:atmospheric_pressure}"}
  }
  grok{
    match => {"message" => "sea_level_pressure=%{WORD:sea_level_pressure}"}
  }
  grok{
    match => {"message" => "precipitation_day=%{WORD:precipitation_day}"}
  }
  grok{
    match => {"message" => "precipitation_max_hour=%{WORD:precipitation_max_hour}"}
  }
  grok{
    match => {"message" => "precipitation_max_10min=%{WORD:precipitation_max_10min}"}
  }
  grok{
    match => {"message" => "temperature_avg=%{WORD:temperature_avg}"}
  }
  grok{
    match => {"message" => "temperature_max=%{WORD:temperature_max}"}
  }
  grok{
    match => {"message" => "temperature_min=%{WORD:temperature_min}"}
  }
  grok{
    match => {"message" => "humidity_avg=%{WORD:humidity_avg}"}
  }
  grok{
    match => {"message" => "humidity_min=%{WORD:humidity_min}"}
  }
  grok{
    match => {"message" => "wind_speed_avg=%{WORD:wind_speed_avg}"}
  }
  grok{
    match => {"message" => "wind_speed_max=%{WORD:wind_speed_max}"}
  }
  grok{
    match => {"message" => "wind_direction=%{WORD:wind_direction}"}
  }
  grok{
    match => {"message" => "wind_speed_max_moment=%{WORD:wind_speed_max_moment}"}
  }
  grok{
    match => {"message" => "wind_direction_max_moment=%{WORD:wind_direction_max_moment}"}
  }
  grok{
    match => {"message" => "sunshine_duration=%{WORD:sunshine_duration}"}
  }
  grok{
    match => {"message" => "snowfall=%{WORD:snowfall}"}
  }
  grok{
    match => {"message" => "snowfall_max=%{WORD:snowfall_max}"}
  }
  grok{
    match => {"message" => "information_daytime=%{WORD:information_daytime}"}
  }
  grok{
    match => {"message" => "information_night=%{WORD:information_night}"}
  }
  date{
    match => ["timestamp","ISO8601"]
    timezone => "UTC"
    remove_field => "timestamp"
  }
}

output{
  elasticsearch{
    hosts => "XXX.XXX.XXX.1:9200"
    index => "sample_index"
  }
}
```

1行で完結するデータを読み込むときとの差分はcodecでmultilineの設定をしているところです。  
Logstashでは通常、入力を１行単位で読み込んで処理します。  
multilineの指定をすることで、複数行を一つのイベントとして連結して処理することができます。

実際のデータ登録は次のコマンドで実行します。

In [27]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/multi_line.txt | sudo /usr/share/logstash/bin/logstash -f multi_line.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
正しく登録されていれば、json形式で登録されたデータが出力されます。

In [28]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/sample_multi_line/_search?pretty"

{
  "took" : 12,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_multi_line",
        "_id" : "AVjxaDXqeJQ1fjM9_cz5",
        "_score" : 1.0,
        "_source" : {
          "precipitation_max_10min" : "0",
          "wind_speed_avg" : "4",
          "precipitation_max_hour" : "0",
          "temperature_max" : "8",
          "message" : "timestamp=2015-01-01T00:00:00 location=tokyo atmospheric_pressure=1000.4\\r\\nsea_level_pressure=1003.4 precipitation_day=0 precipitation_max_hour=0\\r\\nprecipitation_max_10min=0 temperature_avg=3.7 temperature_max=8.2\\r\\ntemperature_min=0.7 humidity_avg=41.0 humidity_min=30.0\\r\\nwind_speed_avg=4.4 wind_speed_max=9.4 wind_direction=\\x90\\xBC\\r\\nwind_speed_max_moment=18.7 wind_direction_max_moment=\\x96k\\x90\\xBC\\r\\nsunshine_duration=1.3 information_daytim

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  2175  100  2175    0     0   114k      0 --:--:-- --:--:-- --:--:--  118k


## データ加工


ここまでの例では、読み込んだ値を特に加工せず、そのまま格納していました。  
この章では、confファイルの"filter"を定義することで値を加工する方法について説明します。

### geoipの付与

geoip filterを用いることで、ipアドレスをもとに地理情報を付与することができます。  
利用するサンプルデータは次のコマンドを実行することで確認できます。

In [29]:
!head -n 4 sample_data/geoip.json

{
  "recordtime":"2016-12-02",
  "clientip":"XXX.XXX.XXX.134"
}


具体的な設定を次に示します。

In [30]:
!cat logstash_conf/geoip.conf

input{
  stdin{
    codec => multiline{
      pattern => "}"
      negate => "true"
      what => "next"
    }
  }
}

filter{
  json{
    source => "message"
  }
  date{
    match => ["recordtime","yyyy-MM-dd"]
    remove_field => ["recordtime","message","host"]
  }
  geoip{
    source => "clientip"
  }
}

output{
  elasticsearch{
    hosts => "XXX.XXX.XXX.232"
    index => "sample_index"
    document_type => "sample_geoip"
  }
}

geoip{}の中のsourceという項目に、地理情報を取得したいipアドレスを指定します。  

実際のデータ登録は次のセルのコマンドを実行してください。  

In [31]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/geoip.json | sudo /usr/share/logstash/bin/logstash -f geoip.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
もとのデータに含まれている**recordtime**,**clientip**以外に国名や緯度・経度などが追加されています。

In [34]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/_search?pretty" -d @- <<EOF
{
    "size":"1",
        "query":{
            "match":{"_type":"sample_geoip"}
            }
}
EOF

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 13,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_geoip",
        "_id" : "AVjxaOLceJQ1fjM9_c0G",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2016-12-03T15:00:00.000Z",
          "geoip" : {
            "timezone" : "America/New_York",
            "ip" : "XXX.XXX.XXX.21",
            "latitude" : 40.9777,
            "continent_code" : "NA",
            "city_name" : "Rye",
            "country_code2" : "US",
            "country_name" : "United States",
            "dma_code" : 501,
            "country_code3" : "US",
            "region_name" : "New York",
            "location" : [
              -73.6935,
              40.9777
            ],
            "postal_code" : "10580",
            "longitude" : -73.6935,
            "region_code" : "NY"
         

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1247  100  1156  100    91   100k   8112 --:--:-- --:--:-- --:--:--  102k


### useragentの解析

useragentの情報を解析するためには、useragent filterを利用します。  
利用するサンプルデータは次のコマンドを実行することで確認できます。

In [35]:
!head -n 3 sample_data/useragent.txt

Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8) AppleWebKit/536.25 (KHTML, like Gecko) Version/6.0 Safari/536.25
Mozilla/4.0 (compatible; MSIE 6.0; Nitro) Opera 8.50 [ja]
Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:9.0.1) Gecko/20100101 Firefox/9.0.1


具体的な設定を次に示します。

In [36]:
!cat logstash_conf/useragent.conf

input{
  stdin{}
}
filter{
  useragent{
    source => "message"
    remove_field => ["message","host"]
  }
}
output{
  elasticsearch{
    hosts => "XXX.XXX.XXX.232"
    index => "sample_index"
    document_type => "sample_useragent"
  }
}

useragent filterを利用すると、自動的にuseragentの情報をパースすることができます。  

実際のデータ登録は次のセルのコマンドを実行してください。  

In [37]:
!ansible logstash-server -i /etc/ansible/hosts -m shell -a "cat sample_data/useragent.txt | sudo /usr/share/logstash/bin/logstash -f useragent.conf" -u $USER --private-key=$KEYPATH

[0;32mXXX.XXX.XXX.232 | SUCCESS | rc=0 >>
Sending Logstash logs to /var/log/logstash which is now configured via log4j2.properties.
[0m


コマンドの実行に成功したら、次の検索APIで正しく登録されているか確認してください。  
もとのデータに含まれているrecordtime,clientip以外に国名や緯度・経度などが追加されています。

In [40]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/_search?pretty" -d @- <<EOF
{
    "size":"1",
        "query":{
            "match":{"_type":"sample_useragent"}
            }
}

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample_index",
        "_type" : "sample_useragent",
        "_id" : "AVjxa3nceJQ1fjM9_c0H",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2016-12-12T05:05:01.656Z",
          "os" : "Mac OS X 10.8",
          "major" : "6",
          "minor" : "0",
          "os_minor" : "8",
          "os_major" : "10",
          "@version" : "1",
          "name" : "Safari",
          "os_name" : "Mac OS X",
          "device" : "Other"
        }
      }
    ]
  }
}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   790  100   695  100    95  57457   7853 --:--:-- --:--:-- --:--:-- 63181


## Ingest Nodeの利用方法

Ingest Nodeを利用するためには、まずpipelineを作成する必要があります。  

![images/03_pipeline.png](images/03_pipeline.png)


Node種類の説明で記述したように、pipelineはIngest Nodeが受け取ったドキュメントに対して、いくつかの加工処理(processor)を順次実行するものとなります。  

pipelineの作成には、Put Pipeline APIを用います。pipelineには次の二つの要素を定義します。  
* description : 処理内容の説明
* processors : 実際の処理

ここでは受け取ったドキュメントに対して新しいフィールドを追加するためのpipelineを作成します。  
ここで扱う例では、"country"というフィールドを追加し、値を"japan"に設定しています。

In [41]:
%%bash
curl -XPUT "http://$ES_HOST/_ingest/pipeline/my-pipeline" -d @- << EOF
{
  "description":"describe pipeline",
  "processors":[
    {
      "set":{
        "field":"country",
        "value": "japan"
      }
    }
  ]
}
EOF

{"acknowledged":true}

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   158  100    21  100   137    419   2738 --:--:-- --:--:-- --:--:--  2795


作成済みpipelineの確認にはGet Ppeline APIを用います。次のコマンドで作成済みpipelineの確認が可能です。

In [45]:
%%bash
curl -XGET "http://$ES_HOST/_ingest/pipeline?pretty"

{
  "my-pipeline" : {
    "description" : "describe pipeline",
    "processors" : [
      {
        "set" : {
          "field" : "country",
          "value" : "japan"
        }
      }
    ]
  }
}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   199  100   199    0     0  35183      0 --:--:-- --:--:-- --:--:-- 39800


pipelineの動作をシミュレートするためには、Simulate Pipeline APIを用います。  
pipelineの定義と、ドキュメントの内容を記述することで、実際にpipeline処理が行われた時の出力結果を確認することができます。

In [43]:
%%bash
curl -XPOST "http://$ES_HOST/_ingest/pipeline/_simulate?pretty" -d @- << EOF
{
    "pipeline":{
        "description":"_description",
        "processors":[
            {
                "set":{
                    "field":"country",
                    "value":"japan"
                }
            }
        ]
    },
    "docs":[
        {
            "_index":"$INDEX",
            "_type":"$TYPE",
            "_id":"id",
            "_source":{
                "timestamp":"1",
                "atmospheric_pressure":"1000.4",
                "sea_level_pressure":"1003.4",
                "precipitation_day":"0",
                "precipitation_max_hour":"0",
                "precipitation_max_10min":"0",
                "temperature_avg":"3.7",
                "temperature_max":"8.2",
                "temperature_min":"0.7",
                "humidity_avg":"41",
                "humidity_min":"30",
                "wind_speed_avg":"4.4",
                "wind_speed_max":"9.4",
                "wind_direction":"西",
                "wind_speed_max_moment":"18.7",
                "wind_direction_max_moment":"北西",
                "sunshine_duration":"1.3",
                "snowfall":"",
                "snowfall_max":"",
                "information_daytime":"時々晴一時雪",
                "information_night":"晴"
            }
        },
        {
            "_index":"$INDEX",
            "_type":"$TYPE",
            "_id":"id",
            "_source":{
                "timestamp":"2",
                "atmospheric_pressure":"1007.4",
                "sea_level_pressure":"1010.4",
                "precipitation_day":"",
                "precipitation_max_hour":"",
                "precipitation_max_10min":"",
                "temperature_avg":"2.7",
                "temperature_max":"7.9",
                "temperature_min":"-2.2",
                "humidity_avg":"41",
                "humidity_min":"19",
                "wind_speed_avg":"2.1",
                "wind_speed_max":"4.2",
                "wind_direction":"西",
                "wind_speed_max_moment":"7.8",
                "wind_direction_max_moment":"西",
                "sunshine_duration":"6",
                "snowfall":"",
                "snowfall_max":"",
                "information_daytime":"時々曇",
                "information_night":"一時曇"
            }
        }
    ]
}
EOF

{
  "docs" : [
    {
      "doc" : {
        "_index" : "sample_index",
        "_id" : "id",
        "_type" : "logs",
        "_source" : {
          "country" : "japan",
          "wind_speed_avg" : "4.4",
          "snowfall" : "",
          "temperature_max" : "8.2",
          "precipitation_day" : "0",
          "wind_speed_max" : "9.4",
          "sunshine_duration" : "1.3",
          "temperature_avg" : "3.7",
          "temperature_min" : "0.7",
          "wind_direction_max_moment" : "北西",
          "information_night" : "晴",
          "timestamp" : "1",
          "precipitation_max_10min" : "0",
          "precipitation_max_hour" : "0",
          "wind_direction" : "西",
          "information_daytime" : "時々晴一時雪",
          "humidity_avg" : "41",
          "humidity_min" : "30",
          "atmospheric_pressure" : "1000.4",
          "wind_speed_max_moment" : "18.7",
          "snowfall_max" : "",
          "sea_level_pressure" : "1003.4"
        },
        "_ingest" : {
     

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  4431  100  2169  100  2262  96207    97k --:--:-- --:--:-- --:--:--  100k


## トラブルシューティング

### 文字コード指定

Logstashでデータを扱う際には、文字コードを指定することができます。  
codecプラグインの中でcharsetという項目を設定することで文字コードが指定されます。  
デフォルト設定はutf-8になっています。

文字コードをshift-jisに指定する場合の設定例を示します。

### 自動で入るフィールドの削除

Elasticsearchにドキュメントを登録する際に、自動で付加されるフィールドがいくつかあります。  
例えば\_allフィールドや\_sourceフィールドなどが自動的に付加されます。  
これらのフィールドは高速に検索を行う時などに有用なフィールドですが、削除することでindexのサイズを少なくすることもできます。  
_allや_sourceの詳細に関してはElastic Referenceの[Meta-Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-fields.html)を参照してください。

上記の_allや_sourceは、Elasticsearchへの登録時に作成されるフィールドですが、  
hostや@timestampなどのようにLogstashで自動的に作成されるフィールドもあります。  
これらのフィールドはLogstashの設定ファイルに設定を記述することで削除することができます。  
詳しくはElastic Referenceの[mutate](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-remove_field)を参照してください。

### ドキュメントIDの重要性と付与方法

#### ドキュメントID の重要性

Elasticsearchに登録されるドキュメントは、ドキュメントごとに固有のドキュメントIDを持ちます。  
Logstashを用いてElasticsearchへのドキュメント登録を行う場合、ドキュメントIDを明示的に指定しなければランダムなドキュメントIDが付与されるようになっています。  
  
既にElasticsearchに登録されているものと重複するドキュメントIDを持ったドキュメントを新たに登録(index)しようとした場合には、ドキュメントは更新されません。  
新しいドキュメントで上書きする場合は、更新(update)する必要があります。  
この振る舞いは、index時の設定によって変更可能です。  
設定の変更方法については、Logstash ReferenceのOutputプラグインelasticsearchの[actionパラメータの説明](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action)を参照してください。
  
ドキュメントIDを明示的に指定することの利点として挙げられるのが、ドキュメントの再登録時の重複防止です。  
明示的に指定しなかった場合、内容が同一のドキュメントを複数回Elasticsearchに登録すると、その分だけ重複して登録されてしまいます。  
ドキュメントIDを指定しておくことで、ドキュメント登録の処理が中断してしまった場合でも重複を気にせず再登録することができます。

#### ドキュメントIDの付与方法

LogstashでドキュメントIDを明示的に付与するにはLogstashのoutputにある、elasticsearchプラグインの設定を利用します。  
ドキュメントIDという項目で直接指定することができます。  
他のフィールドの値を用いることもできるので、例えばmessageの内容をもとにドキュメントIDを付与すれば、同一内容のドキュメントに関して常に一意なidを付与することができます。  
具体的な設定は次の通りです。

In [47]:
!cat logstash_conf/document_id.conf

input{
  stdin{
  }
}

filter{
  csv{
    columns => ["timestamp","location","atmospheric_pressure","sea_level_pressure","precipitation_day","precipitation_max_hour","precipitation_max_10min","temperature_avg","temperature_max","temperature_min","humidity_avg","humidity_min","wind_speed_avg","wind_speed_max","wind_direction","wind_speed_max_moment","wind_direction_max_moment","sunshine_duration","snowfall","snowfall_max","information_daytime","information_night"]
  }
  date{
    match => ["timestamp","ISO8601"]
    remove_field => "timestamp"
  }
}

output{
  elasticsearch{
    index => "document_id_test"
    document_id => "%{location}-%{@timestamp}"
    hosts => "localhost"
  }
}

### 処理時間算出

データ投入作業時にかかる時間を、概算で算出する方法を示します。  
やり方として、定期的にElasticsearchへクエリを送り、単位時間あたりどのくらいデータ投入ができているかを計測し算出します。  
次の全件検索コマンドを実行します。

In [54]:
%%bash
curl -XGET "http://$ES_HOST/$INDEX/_search?pretty" -d @- << EOF
{
  "size": 0,
  "query" : { 
    "match_all" : {} 
  } 
}
EOF

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 110,
    "max_score" : 0.0,
    "hits" : [ ]
  }
}


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   247  100   194  100    53  19088   5214 --:--:-- --:--:-- --:--:-- 21555


"hits"配下の"total"が投入されたデータ数になり、この値を使用します。  
例えば、ある程度の間隔でコマンドを実行し、totalの変化量を知ることでおおまかにですが全体の処理時間を算出することができます。  