# 00 elasticsearch 5 con Docker

## Docs

- [elasticsearch dockerhub](https://hub.docker.com/_/elasticsearch/)
- [kibana dockerhub](https://hub.docker.com/_/elasticsearch/)
- [elasticsearch-py docs](https://elasticsearch-py.readthedocs.io/en/master/)

### Elasticsearch en local

```sh
docker run --name elastic -v [ES_FOLDER]:/usr/share/elasticsearch/data -p 9200:9200 -d elasticsearch:5
docker run --name sense --link elastic:elasticsearch  -p 5601:5601 -d kibana:5
```

> **DEBIAN** Es posible tener que ampliar la memoria que se le da a los contenedores, para esto:
>
> ```sh
> sudo sysctl -w vm.max_map_count=262144
> ```

## Elasticsearch en un servidor remoto

```sh
docker run --name sense -e ELASTICSEARCH_URL=http://[ES_IP]:[ES_PORT] -p 5601:5601 -d kibana:5
```

## Python requirements para este notebook

```sh
elasticsearch==5.1.0
Faker==0.7.7
requests
```



# 01 elasticsearch-py

## 01-01 introducción

Vamos a **crear un cliente de elasticsearch** pasandole como parametro una lista con todas las *ips:port* de los nodos de elasticsearch. En este caso solo tenemos uno.

In [None]:
%%capture
import json
from elasticsearch import Elasticsearch


originAddr = 'localhost'
originPort = 9200
es = Elasticsearch(['{}:{}'.format(originAddr, str(originPort))])


def print_response(response):
    print(json.dumps(response, sort_keys=True, indent=2))

Vamos a **crear un indice** muy simpl sin mappings para los datos. Mas adelante hablaremos de los mappings, pero por ahora no son necesaios.

In [None]:
es.indices.create(index='test-index')

Se le puede pasar una lisa (o un valor simple) de los **errorres de HTTP a ignorar**.

Por ejemplo, si queremos **crear un indice si no existe**, ignoraremos el **error 400**.

In [None]:
es.indices.create(index='test-index', ignore=400)

Para **borrar un indice** se hace igual que para crearlos, incluido la manera de **ignorar errores**.

In [None]:
es.indices.delete(index='test-index', ignore=[400, 404])

Para **indexar un documento**, hay dos maneras de hacerlo:

- **create**: crea un nuevo documento y falla si ya existe un documento con el mismo indice.
- **index**: crea un nuevo documento o lo actualiza si ya existe el indice.

In [None]:
%%capture
es.indices.delete(index='test', ignore=[400, 404])
_doc = {
    "name": "david",
    "age": 28,
    "date": "17-11-1988"
}
es.indices.create(index='test', ignore=400)
es.create(index='test', doc_type='user', id=1, body=_doc)
try:
    es.create(index='test', doc_type='user', id=1, body=_doc)
except Exception as e:
    print(e)
es.delete(index='test', doc_type='user', id=1)
es.index(index='test', doc_type='user', id=1, body=_doc)
_doc["name"] = "David Sanchez Falero"
es.index(index='test', doc_type='user', id=1, body=_doc)
es.indices.delete(index='test', ignore=[400, 404])

## 01-02 Ejemplo 1: primeras queries y uso de los mappings

Ahora vamos a crear un indice nuevo que almacene perfiles de usuarios y vamos a crearnos 10 perfiles aleatorios. Para esto vamos a usar la libreria Faker de python.

In [None]:
import datetime
import time
from faker import Factory

fake = Factory.create()

es.indices.delete(index='users', ignore=404)
es.indices.create(index='users', ignore=400)

for i in range(0, 10):
    profile = fake.simple_profile()
    birthdate = datetime.datetime.strptime(profile['birthdate'], '%Y-%m-%d')
    profile['timestamp'] = int(time.mktime(birthdate.timetuple()) * 1e3)
    es.index(index='users', doc_type='user', id=i, body=profile)

In [None]:
response = es.count(index='users')
response.get('count')

In [None]:
print_response(es.search(index='users'))

In [None]:
print_response(es.search(index='users', doc_type='user', _source=False))

In [None]:
print_response(es.search(index='users', doc_type='user', _source_include=['name'], size=1))

In [None]:
response = es.search(index='users',
                         doc_type='user',
                         _source_include=['name'],
                         size=1,
                         filter_path=['hits.hits._source.name'])
print_response(response)
user_name = response['hits']['hits'][0]['_source']['name']

In [None]:
query = {
  "query": {
    "match": {
      "name": user_name.lower()
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "match": {
      "name.keyword": user_name
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "match": {
      "name.keyword": user_name.lower()
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "match": {
      "name": user_name[4:]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "match": {
      "name": user_name[2:4]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "bool": {
      "must": [
        {
          "wildcard": {
            "name": {
              "value": "*{}*".format(user_name[1:3])
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "birthdate": {
              "gte": "20/01/1988",
              "lte": "2016",
              "format": "dd/MM/yyyy||yyyy"
            }
          }
        }
      ]
    }
  }
}

print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "birthdate": {
              "gte": "20/01/1988",
              "lte": "2016",
              "format": "dd/MM/yyyy||yyyy"
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "query": {
    "bool": {
      "must": [
        {
          "wildcard": {
            "name": {
              "value": "* *"
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query))

In [None]:
query = {
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "20/01/1988",
              "lte": "2016",
              "format": "dd/MM/yyyy||yyyy"
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query, ignore=400))

In [None]:
from elasticsearch.client import IndicesClient
import json

indice_client = IndicesClient(origin)

mapping = indice_client.get_mapping(index='users', doc_type='user')

print(json.dumps(mapping, sort_keys=True, indent=2))

Las dos ultimas queries han fallado ya que, si no le decimos nada a elasticsearch, no sabe buscar por textos completos con espacios con wildcards y no sabe identificar que el numero entero que le introducimos como tiemstamp tiene que ser una fecha.

Para esto es para lo que se utilizan los mappings, en este caso vamos a ver como definir un tipo de dato con los modelos estandar de elasticsearch y como hacer uno especifico.

Para el caso del timestamp, tendremos que decirle que es de tipo date y que vienen en milliseconds

```json
"timestamp": {
  "type": "date",
  "index": true,
  "format": "strict_date_optional_time||epoch_millis"
}
```

Para el caso de los wildcards con espacios, tenemos que hacer nuestro analyzer, ya que no hay uno por defecto para hacer ese tipo de busqueadas. Para eso crearemos un analyzer que llamaremos keyword_lowercase, este se encargara de definir la manera de analizar los textos para que no lo separe por tokens.

```json
"name": {
  "type": "text",
  "index": true,
  "fielddata": true,
  "analyzer": "keyword_lowercase",
  "fields": {
    "keyword": {
      "type": "keyword",
      "index": true
    }
  }
}
```

Para crear este analyzer, tenemos que añadir un campo a nuestro mapping, los settings, donde lo definiremos.

```json
"settings": {
  "analysis": {
    "analyzer": {
      "keyword_lowercase": {
        "type": "custom",
        "tokenizer": "keyword",
        "filter": [
          "lowercase"
        ]
      }
    }
  }
}
```

El mapping final seria el siguiente.

```json
{
    "settings": {
      "analysis": {
        "analyzer": {
          "keyword_lowercase": {
            "type": "custom",
            "tokenizer": "keyword",
            "filter": [
              "lowercase"
            ]
          }
        }
      }
    },
    "mappings": {
      "user": {
        "properties": {
          "address": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "birthdate": {
            "type": "date"
          },
          "mail": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "name": {
            "type": "text",
            "index": true,
            "fielddata": true,
            "analyzer": "keyword_lowercase",
            "fields": {
              "keyword": {
                "type": "keyword",
                "index": true
              }
            }
          },
          "sex": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "timestamp": {
            "type": "date",
            "index": true,
            "format": "strict_date_optional_time||epoch_millis"
          },
          "username": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          }
        }
      }
    }
}
```

Ahora tenemos que actualizar el mapping, podriamos borrar los datos y crear el indice de nuevo, pero perderiamos todos los datos, asi que vamos a ver como se hace un cambio de mapping y como trabajar para hacer que cambiar el mapping no suponga un problema.

Lo primero que vamos es a crearnos un indice nuevo al que llamaremos users_v2 y que tendra el nuevo mapping.

In [None]:
mapping = {
    "settings": {
      "analysis": {
        "analyzer": {
          "keyword_lowercase": {
            "type": "custom",
            "tokenizer": "keyword",
            "filter": [
              "lowercase"
            ]
          }
        }
      }
    },
    "mappings": {
      "user": {
        "properties": {
          "address": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "birthdate": {
            "type": "date"
          },
          "mail": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "name": {
            "type": "text",
            "index": True,
            "fielddata": True,
            "analyzer": "keyword_lowercase",
            "fields": {
              "keyword": {
                "type": "keyword",
                "index": True
              }
            }
          },
          "sex": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "timestamp": {
            "type": "date",
            "index": True,
            "format": "strict_date_optional_time||epoch_millis"
          },
          "username": {
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            },
            "type": "text"
          }
        }
      }
    }
  }

es.indices.create(index='users_v1', ignore=400, body=mapping)

Ahora tenemos que copiar los datos de un sitio a otro, para esto vamos a utilizar el healper de elasticsearch para recorrer todo un indice **scan** y la operacion de para hacer operaciones con mucho documentos **bulk**.

La problematica realmente se da cuando tenemos mas de 10000 docs, ya que no podemos pedirlos todos de golpe, asi que usamos el **scroll**, que es una funcionalidad para recorrer todo el indice en multiples consulas, esto es lo que encapsula el **scan**.

In [None]:
from elasticsearch.helpers import scan
from elasticsearch.client.indices import IndicesClient

def copy_to(originIndexName, dstIndexName):
    create_bulk_data = []
    scroll = scan(es, index=originIndexName, scroll='10s')
    for res in scroll:
        create_bulk_data.append({"index": {"_index": dstIndexName,
                                           "_type": res['_type'],
                                           "_id": res['_id']}})
        create_bulk_data.append(res['_source'])
    if create_bulk_data:
        es.bulk(index=dstIndexName, body=create_bulk_data)

copy_to('users', 'users_v1')

In [None]:
print_response(es.search(index='users_v1', doc_type='user', _source=False))

In [None]:
indice_client.put_alias(index='users_v1', name='users', ignore=400)

In [None]:
es.indices.delete(index='users')
indice_client.put_alias(index='users_v1', name='users', ignore=400)

In [None]:
query = {
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "timestamp": {
              "gte": "20/01/1988",
              "lte": "2016",
              "format": "dd/MM/yyyy||yyyy"
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query, ignore=400))

In [None]:
words = user_name.split()
name_wildcards = ' '.join((words[0][-1], words[0][0:2]))
print(name_wildcards)
query = {
  "query": {
    "bool": {
      "must": [
        {
          "wildcard": {
            "name": {
              "value": "* *".format(name_wildcards.lower())
            }
          }
        }
      ]
    }
  }
}
print_response(es.search(index='users', doc_type='user', body=query, _source=False))