# Introduction 

Elasticsearch est un moteur de recherche temps réel et Open Source. 
Il possède de nombreux avantages : 
- il met en place une API RESTful.
- il est distribué, ce qui lui permet d'être tolérant aux pannes.
- il est basé sur le moteur d'indexation d'Apache Lucène.
- utilise le format JSON pour le stockage
- permet de faire de la recherche en texte libre. 

Il est utilisé dans de nombreuses entreprises pour faire de la recherche textuelle dans des documents ou alors traiter des tera de logs. 

Dans ce cours nous allons aborder deux technologies, ElasticSearch et Kibana. 

## Concepts

- `Near Realtime (NRT)` : La plupart des anciens moteurs de recherche devaient processer et indexer les documents pour qu'ils puissent être recherchés. ElasticSearch permet de rechercher les nouveaux documents presque instantanément.  


- `Document` : Le document est l'unité de base  qui peut être indexé dans ElasticSearch. Un document est représenté au format JSON. On peut stocker autant de document que l'on souhaite dans un index. Un document est un ensemble de données clés-valeurs.   


- `Index` : Un index est une collection de documents qui ont des caractéristiques similaires. Un index est identifié par un nom. On peut créer autant d'index que l'on veut.   


- `Node` : Un Node (ou noeud en francais) est un serveur qui fait parti d'un cluster de plusieurs noeuds. Un noeud stocke les données et est optimisé pour retrouver les données.   


- `Cluster` : Un cluster est un ensemble de noeud qui communiquent entre eux.  Un cluster peut contenir autant de noeuds que l'on veut.   


- `Shards` :  Un index peut être tellement gros que la donnée des documents est plus importante que la capacité de stockage d'un node et donc d'un serveur. Pour pallier ce problème ElasticSearch met en place une méthode pour découper la données des index en des nombreuses petites parties qui sont appelés des shards. Chaque shard est stocké sur un noeud différents. 

    Cette technique est intéressante car elle permet de scaler horizontalement notre cluster (ie: augmenter le nombre de machines et donc de noeuds)  
    

- `Replicas` : Comme chaque noeud ne contient pas l'intégralité des données, on pourrait penser que si un noeud tombe en panne, une partie des données serait perdue. ElasticSearch met en place une technique de réplication qui permet de stocker ces différents shards sur plusieurs noeuds. C'est la **réplication**.  
    
    Cette technique est particulièrement intéressante puisqu'elle permet de palier la panne de shards ou de noeuds.

# Let's Play 

In [1]:
from elasticsearch import Elasticsearch

LOCAL = True

es_client = Elasticsearch(hosts=["localhost" if LOCAL else "elasticsearch"])

In [2]:
es_client.ping()

True

Pour indexer des documents, il suffit d'appeler la méthode `index` du client.

In [3]:
document = {
    "name":"Decision Trees", 
    "description":"A decision tree is a decision support tool that uses a tree-like graph or model of decisions and their possible consequences, including chance-event outcomes, resource costs, and utility. Take a look at the image to get a sense of how it looks like.",
    "algo_type":"Supervised Learning"
}

In [4]:
res = es_client.index(index="algorithms", doc_type='algo', id=1, body=document)
print(res['result'])

created


Le document à été créé avec succès, on peut maintenant le récupérer via son index. 

In [5]:
import pprint

In [6]:
res = es_client.get(index="algorithms", doc_type='algo', id=1)
pprint.pprint(res["_source"])

{'algo_type': 'Supervised Learning',
 'description': 'A decision tree is a decision support tool that uses a '
                'tree-like graph or model of decisions and their possible '
                'consequences, including chance-event outcomes, resource '
                'costs, and utility. Take a look at the image to get a sense '
                'of how it looks like.',
 'name': 'Decision Trees'}


In [7]:
documents = [
    {
    "name":"Naive Bayes Classification", 
    "description":"Naive Bayes classifiers are a family of simple probabilistic classifiers based on applying Bayes’ theorem with strong (naive) independence assumptions between the features. The featured image is the equation — with P(A|B) is posterior probability, P(B|A) is likelihood, P(A) is class prior probability, and P(B) is predictor prior probability.",
    "algo_type":"Supervised Learning"
    },    {
    "name":"Logistic Regression", 
    "description":"Logistic regression is a powerful statistical way of modeling a binomial outcome with one or more explanatory variables. It measures the relationship between the categorical dependent variable and one or more independent variables by estimating probabilities using a logistic function, which is the cumulative logistic distribution.",
    "algo_type":"Supervised Learning"
},    {
    "name":"Principal Component Analysis", 
    "description":"PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components.",
    "algo_type":"Unsupervised Learning"
},    {
    "name":"Singular Value Decomposition", 
    "description":"In linear algebra, SVD is a factorization of a real complex matrix. For a given m * n matrix M, there exists a decomposition such that M = UΣV, where U and V are unitary matrices and Σ is a diagonal matrix.",
    "algo_type":"Unsupervised Learning"
}
]

Si on ne précise pas l'ID lors de l'indexation, ElasticSearch se charge d'en trouver un automatiquement

In [None]:
for doc in documents:
    res = es_client.index(index="algorithms", doc_type='algo', id=None, body=doc)
    print(res["result"])

On récupère maintenant tous les éléments de l'index

In [None]:
result = es_client.search(index="algorithms", body={"query": {"match_all": {}}})
result

La réponse nous donne des informations sur le temps d'exécution de la requête le nombre de documents correspondant à la requête et l'ensemble des documents. 

In [None]:
f"{result['hits']['total']} documents correspondent à la requêtes qui a pris {result['took']}ms"

Affichons maintenant tous les documents : 

In [None]:
ids = []
for hit in result['hits']['hits']:
    print("Name : {name}\n description : {description} \n Type : {algo_type}\n".format(**hit['_source']))
    print("******************")
    ids.append(hit["_id"])

Pour supprimer un document il suffit d'appeler la méthode `delete`.

In [None]:
es_client.delete(index="algorithms", doc_type="algo", id=ids[0])

In [None]:
result = es_client.search(index="algorithms", body={"query": {"match_all": {}}})
f"{result['hits']['total']} documents correspondent à la requêtes qui a pris {result['took']}ms"

Faisons des choses un peu plus intéressantes maintenant.   

Dans le répertoire `data/` il y a les données de 5000 films de la base de TMDb. 

## Nettoyage

Avant de commencer il faut nettoyer un peu les données du fichier csv.

In [20]:
import pandas as pd
import json
df_movies = pd.read_csv("./data/tmdb_5000_movies.csv")

In [21]:
def clean_words(l):
    return [elt["name"] for elt in json.loads(l)]

On nettoie alors la donnée des colonnes.

In [22]:
for col in ["genres", "keywords", "production_countries", "spoken_languages"]:
    df_movies.loc[:,col] = df_movies.loc[:,col].apply(clean_words)

Pour indexer les documents il nous faut une liste de dictionnaires. Pour cela on peut utiliser la méthode de pandas `to_dict` avec le paramètre `orient=records`.

In [23]:
documents = df_movies.fillna("").to_dict(orient="records")

In [24]:
documents[0:2]

[{'budget': 237000000,
  'genres': ['Action', 'Adventure', 'Fantasy', 'Science Fiction'],
  'homepage': 'http://www.avatarmovie.com/',
  'id': 19995,
  'keywords': ['culture clash',
   'future',
   'space war',
   'space colony',
   'society',
   'space travel',
   'futuristic',
   'romance',
   'space',
   'alien',
   'tribe',
   'alien planet',
   'cgi',
   'marine',
   'soldier',
   'battle',
   'love affair',
   'anti war',
   'power relations',
   'mind and soul',
   '3d'],
  'original_language': 'en',
  'original_title': 'Avatar',
  'overview': 'In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but becomes torn between following orders and protecting an alien civilization.',
  'popularity': 150.437577,
  'production_companies': '[{"name": "Ingenious Film Partners", "id": 289}, {"name": "Twentieth Century Fox Film Corporation", "id": 306}, {"name": "Dune Entertainment", "id": 444}, {"name": "Lightstorm Entertainment", "id": 574}]',
  '

Pour éviter de faire plein de petits appels à ElasticSearch pour indexer les 5000 films on peut utiliser un helper `bulk` pour indexer tous les documents d'un seul coup et éviter les appels réseaux qui sont très couteux en temps. 

In [25]:
from elasticsearch.helpers import bulk

In [26]:
import numpy as np

In [29]:
def generate_data(documents):
    for docu in documents:
        yield {
            "_index": "movies",
            "_type": "movie",
            "_source": {k:v if v else None for k,v in docu.items()},
        }

bulk(es_client, generate_data(documents))

(4803, [])

On peut utiliser l'interface http pour vérifier que les index sont bien à jours. L'index `movies` est présent on peut aussi voir le nombre de documents et la taille en mémoire.  

In [None]:
!curl http://localhost:9200/_cat/indices?v #!curl http://elasticsearch:9200/_cat/indices?v

On peut voir aussi quelques documents avec le endpoint HTTP suivant

In [None]:
!curl http://localhost:9200/movies/_search #!curl http://elasticsearch:9200/movies/_search

# Kibana 

In [None]:
from IPython.display import Image

C'est le moment de commencer à utiliser Kibana. Kibana est un soft de la suite ELK (ElasticSearch, Kibana, Logstash) qui permet de gérer de façon graphique les données dans les index ElasticSearch.

## Index Pattern

Dans l'onglet `Management`, il faut spécifier à Kibana d'aller chercher un index spécifique sur ElasticSearch. Aller dans `Create index pattern` et taper les premières lettres du nom de l'index que vous voulez parcourir. Ici `movies`, vous verrez alors la page suivante, référençant l'intégralité des champs de l'index. 

In [None]:
Image("./img/index_pattern.png")

Vous pouvez ensuite aller dans l'onglet `Discover` pour voir un apperçu des documents que vous venez d'indexer. Vous pouvez aussi, sur la droite, appliquer des filtres pour faire des requêtes simples sur vos données. Ici on appliquera un filtre sur le genre `Comedy`.

In [None]:
Image("./img/discover.png")

Vous pouvez aller ensuite dans l'onglet `Visualize` pour commencer à jouer avec les graphiques. Par exemple on peut créer un histograme des budgets des différents films. 

In [None]:
Image("./img/budget.png")

Ou encore des genres des différents films

In [None]:
Image("./img/gender.png")

Retournons maintenant un peu dans notre code pour commencer à utiliser ses données. Il va maintenant falloir se plonger dans les requêtes ElasticSearch. Ce n'est pas la choise la plus aisée, elles deviennent vite illisibles et complexes. 

In [None]:
QUERY = {
    "query": {
        "match_all": {}
    }
}

Cette requête simple permet de récupérer l'intégralité des documents d'un index.

In [None]:
QUERY = {
  "query": {
    "term" : { 
        "title" : "superman"} 
  }
}

Cette seconde requête permet de récupérer tous les documents dont le titre contient `SuperMan`.

In [None]:
result = es_client.search(index="movies", body=QUERY)
[elt['_source']['title'] for elt in result["hits"]["hits"]]

On veut maintenant chercher tous les films contenant `SuperMan` mais qui ne contiennent pas `Batman`. Pour cela on est forcé d'utilisé une requête composée appelée `bool query`  elle permet de transformer chaque requête en un filtre booléen. 

In [None]:
QUERY = {
  "query": {
    "bool" : {
      "must" : {
        "term" : { "title" : "superman" }
      },
      "must_not" : {
                  "term" : { "title" : "batman" }
      }
  }
}
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
[elt['_source']['title'] for elt in result["hits"]["hits"]]

On peut encore réduire les resultats en filtrant sur le budget du film. On veut en plus des films de SuperMan sans Batman récupérer les films qui on eu un budget de plus de ou égal à 20M  et moins de 55M.

In [None]:
QUERY = {
  "query": {
    "bool" : {
      "must" : [
          {
        "term" : { "title" : "superman" }},
        {"range" : {
          "budget" : { "gte" : 20000000, "lt" : 55000000 }
        }}
      ],
      "must_not" : {
                  "term" : { "title" : "batman" }
      }
  }
}
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
{elt['_source']['title']:elt['_source']['budget']  for elt in result["hits"]["hits"]}

Si on veut faire une recherche dans plusieurs champs de chaque document de l'index on peut faire une requête de `multi_match`

In [None]:
QUERY = {
  "query": {
    "multi_match" : {
      "query":    "Smith",
      "fields": [ "title", "overview" ] 
    }
  }
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
[elt['_source']['title']  for elt in result["hits"]["hits"]]

On peut vouloir trier les résultats selon la popularité par exemple.

In [None]:
QUERY = {
  "query": {
    "multi_match" : {
      "query":    "Smith",
      "fields": [ "title", "overview" ] 
    }
  },
    "sort" : [
        { "popularity" : {"order" : "desc"}}
    ]
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
{elt['_source']['title']:elt['_source']['popularity']  for elt in result["hits"]["hits"]}

ElasticSearch gère aussi bien les dates. On peut vouloir trier par date de sortie.

In [None]:
QUERY = {
  "query": {
    "multi_match" : {
      "query":    "Smith",
      "fields": [ "title", "overview" ] 
    }
  },
    "sort" : [
        { "release_date" : {"order" : "desc"}}
    ]
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
{elt['_source']['title']:elt['_source']['release_date']  for elt in result["hits"]["hits"]}

## Aggregations

Les aggregations sont des requêtes complexes mais qui permettent de faire des opérations très rapides sur les données. 
La syntaxe devient très vite complexe. Par exemple, pour récupérer le nombre de film par genre. 

In [None]:
QUERY = {
    "aggs": {
    "count_gender": {
      "terms": {
        "field": "genres.keyword",
        "size": 5,
        "order": {
          "_count": "desc"
        }
      }
        }
  }
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
result["aggregations"]["count_gender"]["buckets"]

Le paramètre size permet de récupérer les N premières aggrégations.

Ensuite si on veut aller plus loin on peut vouloir récupérer la moyenne des budgets par genre. Il faut intégrer un nouveau niveau d'aggregation. Pour cela on utilise la syntaxe suivante. 

In [None]:
QUERY = {
    "aggs": {
        "count_gender": {
            "terms": {
                "field": "genres.keyword",
                "size": 3,
                "order": {
                    "average_budget": "desc"
                }
            },
            "aggs": {
                "average_budget":{
                    "avg" : {
                        "field" : "budget"
                    }
                }
            }
        }
    }
}

In [None]:
result = es_client.search(index="movies", body=QUERY)
result["aggregations"]["count_gender"]["buckets"]

# Exercice 

In [None]:
import pandas as pd 

In [None]:
df_credits = pd.read_csv("data/tmdb_5000_credits.csv")

In [None]:
df_credits.head()

1) Nettoyer les données de la DataFrame d'acteurs comme précédement.   
2) Merger les deux DataFrame en utilisant l'identifiant du film.   
3) Créer un nouvel Index `augmented_movies` similaire à l'index précédant mais en ajoutant les données des acteurs.     
4) Créer une fonction permettant de trouver tous les films d'un acteur.   
5) Trouver quel acteur à jouer dans les films avec les plus gros budgets. 
6) Réaliser, sur Kibana, 3 graphiques de votre choix

In [13]:
from pymongo import MongoClient

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json

In [55]:
class MongoDB():
    def __init__(self):
        self.client = MongoClient()
        self.db = self.client['Vidal']
    
    def find(self, collection):
        return self.db[collection].find()
    
    def collection(self):
        return self.db.list_collection_names()
    
class ElasticsearchDB():
    def __init__(self):
        self.mongo = MongoDB()
        self.client = Elasticsearch()
    
    def index_bulk(self):
        for collection in self.mongo.collection():
            bulk(client=self.client, actions=self.index_mongo(collection), request_timeout=30)
    
    def index_mongo(self, collection):
        cursor = self.mongo.find(collection)
        for item in cursor:
            _id = str(item['_id'])
            del item['_id']
            action = {
                '_op_type': 'index',
                "_index" : collection,
                "_type"  : collection+"_document",
                "_id"    : _id,
                "_source": item,
                "_size": {"enabled": 'true'}
            }
            yield action
    
    #DEUXIEME POSSIBILITE


    def search_insubstance(self, substance):
        query = json.dumps({
          "query": {
            "bool" : {
              "must" : [
                  {
                "term" : { "nom_substance" : substance }},
              ]
          }
        }
        })
        #response = requests.get(uri, data=query)
        #results = json.loads(response.text)
    
        result = client.search(index="substance_items", body=query)
        return result


    def search_inmedicament(self, substance, excipient=False):
        if excipient==False:
            query = json.dumps({
              "query": {
                "bool" : {
                  "must_not" : {
                          "term" : { "excipient" : excipient }
                  }
              }
            }
            })
        else:
            query = json.dumps({
              "query": {
                "bool" : {
                  "must_not" : {
                          "term" : { "excipient" : excipient }
                  },
                  "should": [
                    "bool": {
                       "must": {[
                        { "term": { "substance": substance }}
                       ]}
                    }
                  ]
                }
              }
            })
            
    
        result = client.search(index="medicament_items", body=query)
        return result


    def format_results(self, result, content):
        """Print results nicely:
        doc_id) content
        """
        data = [doc for doc in result['hits']['hits']]
        for doc in data:
            print("%s" % (doc['_source'][content]))

In [56]:
elastic = ElasticsearchDB()
#elastic.index_bulk()
ss = elastic.search_insubstance("clomifène")
#elastic.format_results(r, "fiche")
mm =elastic.search_inmedicament("clomifène", "amidon soluble")

TypeError: unhashable type: 'list'

In [46]:
data = [doc for doc in mm['hits']['hits']]
for doc in data:
    print("%s" % (doc['_source']))

{'nom_medicament': 'PERGOTIME 50 mg cp séc', 'descriptif': {'liste': 'liste 1', 'cip': '8340093280098', 'modalités_de_conservation': 'avant ouverture : durant 4 ans.', 'remboursement': 'nr', 'agréé_aux_collectivités': 'non', 'commercialisé': 'non', 'modèle_hospitalier': 'non'}, 'excipient': ['cellulose', 'amidon de maïs', 'silice', 'magnésium stéarate', 'carboxyméthylamidon', 'lactose'], 'lien_medicament': 'https://www.vidal.fr/Medicament/pergotime_50_mg_cp_sec-12986.htm', 'substance': ['Clomifène citrate']}
{'nom_medicament': 'CLOMID 50 mg cp', 'descriptif': {'agréé_aux_collectivités': 'oui', 'liste': 'liste 1', 'cip': '9340093262338', 'remboursement': '65%', 'commercialisé': 'oui', 'modalités_de_conservation': "avant ouverture : durant 36 mois (conserver à l'abri de la chaleur, conserver à l'abri de la lumière, conserver à l'abri de l'humidité).", 'modèle_hospitalier': 'non'}, 'excipient': ['amidon de maïs', 'amidon soluble', 'magnésium stéarate', 'fer jaune oxyde', 'saccharose', 'la

In [38]:
for s, m in zip(ss["hits"]["hits"], mm["hits"]["hits"]]

SyntaxError: invalid syntax (<ipython-input-38-0bfdd7bb4a5e>, line 1)

In [11]:
!curl http://localhost:9200/_cat/indices?v #!curl http://elasticsearch:9200/_cat/indices?v

health status index      uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   algorithms CsF-T8sJSTqzwL5dV8Ejzg   5   1          1            0      7.1kb          7.1kb


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   212  100   212    0     0   1503      0 --:--:-- --:--:-- --:--:--  1503
curl: (3) <url> malformed

  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0curl: (6) Could not resolve host: elasticsearch


In [None]:
#from elasticsearch import Elasticsearch
#es = Elasticsearch()
#es.indices.delete(index='substance_items', ignore=[400, 404])

In [8]:
client = Elasticsearch()

In [109]:
QUERY = {
    "query": {
        "match_all": {}
    }
}

In [11]:
#PREMIERE POSSIBILITE

QUERY = {
  "query": {
    "bool" : {
      "must" : [
          {
        "term" : { "substance_items.nom_substance" : "lysozyme" }},
      ],
      "must_not" : {
                  "term" : { "medicament_items.excipient" : "amidon" }
      }
  }
}
}

result = client.search(index="substance_items,medicament_items", body=QUERY)
[elt['_source']['liste_medicament'] for elt in result['hits']['hits']]
print(result)

{'took': 6, 'timed_out': False, '_shards': {'total': 10, 'successful': 10, 'skipped': 0, 'failed': 0}, 'hits': {'total': 0, 'max_score': None, 'hits': []}}


In [1]:
#DEUXIEME POSSIBILITE


def search_insubstance(substance):
    query = json.dumps({
      "query": {
        "bool" : {
          "must" : [
              {
            "term" : { "nom_substance" : substance }},
          ]
      }
    }
    })
    #response = requests.get(uri, data=query)
    #results = json.loads(response.text)
    
    result = client.search(index="substance_items", body=query)
    return results


def search_inmedicament(excipient):
    query = json.dumps({
      "query": {
        "bool" : {
          "must_not" : {
                  "term" : { "excipient" : excipient }
          }
      }
    }
    })
    
    result = client.search(index="medicament_items", body=query)
    return results


def format_results(results, content):
    """Print results nicely:
    doc_id) content
    """
    data = [doc for doc in results['hits']['hits']]
    for doc in data:
        print("%s" % (doc['_source'][content]))

## Explications

1. On trouve une méthode qui nous permet de faire une query sur deux collections en même temps. C'est ce que j'ai tenté dans la première possibilité.

2. On créée deux fonctions qui récupère le résultat de nos deux query (à savoir une pour chercher la liste des médicaments qui contiennent la substance et une deuxième pour récupérer les médicaments ne contenant pas l'excipient prohibé), et on merge nos deux résultats dans une liste (par exemple).