# Ingest data into Elasticsearch from an XML file

Welcome to this notebook. Its purpose is to show how to ingest large XML data into Elasticsearch, so that it can be explored in Kibana.


In the following lines, we will:
* start by downloading an XML file from an online API (in this example: Isidore)
* stream through the file (necessary for big files)
* use the Python client to index the documents into Elasticsearch

Note that, for the sake of the example, the XML file is very small. However, this method is designed to handle large XML files.

### Prerequisite

* Have both Elasticsearch and Kibana running.
* Install the dependencies listed in `requirements.txt`:
`pip install -r requirements.txt`
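
Before going further, it can be useful to check that both services answer. The following is a minimal sketch using only the standard library. `is_reachable` is a hypothetical helper, and the URLs in the usage comment assume default local ports (9200 for Elasticsearch, 5601 for Kibana); adapt them to your setup.

```python
import urllib.error
import urllib.request


def is_reachable(url, timeout=2.0):
    """Return True if an HTTP server answers at `url`, False otherwise."""
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        # The server answered, even if with an error status (e.g. 401 when security is enabled).
        return True
    except OSError:
        # Connection refused, timeout, DNS failure, ...
        return False


# Possible usage (assumed default local URLs):
# for name, service_url in [("Elasticsearch", "http://localhost:9200"), ("Kibana", "http://localhost:5601")]:
#     print(name, "is reachable" if is_reachable(service_url) else "is NOT reachable")
```

A `False` result only means that nothing answered at that address within the timeout; it does not tell you why.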


## Downloading the file from online API

In this example, we are going to work with an XML file.
Either follow the process of retrieving it from the API, or go straight to the definition of the file (FILE).


In [None]:
import requests
from xml.etree import ElementTree
import lxml.etree as ET #this library manages xpath better
import unicodedata

### Retrieving data from an API

In [None]:
url = "https://api.isidore.science/resource/search"  # address of the Isidore API
subject = "http://data.bnf.fr/ark:/12148/cb16620091k"  # in this example, we are looking for the "digital humanities" subject
nb_result = 50  # in this example, we only ask for 50 results

response = requests.get(url, params={"subject": subject, "replies": nb_result})
data = ElementTree.fromstring(response.content)
tree = ElementTree.ElementTree(data)  # this step creates a manageable XML tree, rather than plain text

### Printing data in a file 

If you are dealing with small datasets, this step is not compulsory: you can keep the data in memory and work directly from the `response` variable.
In this example, the purpose is to show how to load a big XML file.

In [None]:
FILE = "export.xml"  # here we write the data to a new file, "export.xml"


tree.write(FILE, encoding="utf-8", xml_declaration=True)  # passing the file name lets ElementTree open and close the file itself
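
The cell above parses the full response in memory before writing it to disk. For a response that is too large for that, one option is to stream the HTTP body straight to a file. The following is a minimal sketch, not the method used in this notebook: `save_response_stream` is a hypothetical helper, and it assumes a `requests` response obtained with `stream=True`.

```python
def save_response_stream(response, path, chunk_size=64 * 1024):
    """Write an HTTP response body to `path` chunk by chunk; return the number of bytes written."""
    written = 0
    with open(path, "wb") as handle:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:  # skip empty keep-alive chunks
                handle.write(chunk)
                written += len(chunk)
    return written


# Possible usage with the variables defined above (requires the requests package):
# import requests
# with requests.get(url, params={"subject": subject, "replies": nb_result}, stream=True) as response:
#     response.raise_for_status()
#     save_response_stream(response, FILE)
```

With this approach the document is never held in memory as a whole, at the cost of not being able to inspect the tree before it is written.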

## Reading document by document the XML file

In this example, the XML file is not very large. It contains 50 documents to index, which is not a lot!
However, let's pretend you need to process a very large XML file. In order not to run out of memory, it is better to read it entry by entry. This is the next step!


### Setting up the variables

In order to parse the XML document, we need to be aware of its structure. Have a look at your XML file, and determine which tag wraps each entry you want to index in Elasticsearch.
Change the following NODE variable according to your document, or keep it as is to run this example.
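
If the structure of the file is not known in advance, one way to spot a candidate tag is to count tag names in a single streaming pass. The following is a minimal sketch using the standard library parser. `count_tags` is a hypothetical helper and the sample document is made up for illustration.

```python
import io
from collections import Counter
from xml.etree import ElementTree


def count_tags(source):
    """Count element tags in one streaming pass over a file name or file object."""
    counts = Counter()
    for _event, elem in ElementTree.iterparse(source, events=("end",)):
        counts[elem.tag] += 1
        elem.clear()  # drop the content of the element once it is counted
    return counts


# Hypothetical miniature document, only for illustration.
sample = io.BytesIO(b"<replies><reply><isidore/></reply><reply><isidore/></reply></replies>")
tag_counts = count_tags(sample)
print(tag_counts.most_common())
```

A tag that appears once per record you expect to index is usually a good candidate for NODE.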

In [None]:
NODE = "reply"  # tag of each entry to index; in our case, each "reply" has only one direct child, which is the "isidore" tag

In [None]:
def get_relevant_nodes(xml_data):
    """Extract the fields to index from one NODE element."""
    # XPath expressions start with "." so that they only search inside the current element
    my_dict = {}
    my_dict['_id'] = unicodedata.normalize('NFKD', xml_data.xpath("./@uri")[0])
    my_dict['authors'] = [unicodedata.normalize('NFKD', d) for d in xml_data.xpath(".//creator/@normalizedAuthor")]
    my_dict['date'] = unicodedata.normalize('NFKD', next(iter(xml_data.xpath(".//date[1]/@origin")), ""))  # first match, or "" if there is none
    my_dict['subjects'] = [unicodedata.normalize('NFKD', d) for d in xml_data.xpath(".//subject/text()")]
    my_dict['abstract'] = unicodedata.normalize('NFKD', next(iter(xml_data.xpath(".//abstract/text()")), ""))
    my_dict['concepts'] = [unicodedata.normalize('NFKD', d) for d in xml_data.xpath(".//concept/preflabel[@xml:lang='fr']/text()")]
    if any(my_dict.values()):
        return my_dict  # we only return my_dict if at least one of the values is filled
    return None
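
Two idioms in this function are worth isolating: `next(iter(...), "")`, which takes the first match or falls back to a default, and NFKD normalization, which decomposes characters into simpler equivalents. The following is a small illustrative sketch; `first_or_default` is a hypothetical name and the sample strings are made up.

```python
import unicodedata


def first_or_default(values, default=""):
    """Return the first item of `values`, or `default` when there is none."""
    return next(iter(values), default)


print(first_or_default(["2019", "2020"]))  # first item of a non-empty list
print(repr(first_or_default([])))          # falls back to the default

# NFKD replaces compatibility characters with their decomposed form:
ligature = "\ufb01n"  # "fin" written with the single-character ligature U+FB01
normalized = unicodedata.normalize("NFKD", ligature)
print(len(ligature), len(normalized))  # the ligature becomes two separate letters
```

Note that NFKD also splits accented letters into a base letter followed by a combining accent, so normalized strings can be longer than the originals.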

### Stream XML

This function reads the XML document piece by piece. Each time it recognizes a "NODE" tag, it processes its content and then erases it from memory. This way, it streams through the XML document and prevents out-of-memory errors.



In [None]:
def streamXML():
    """
    Read the XML file piece by piece and yield one dictionary per NODE element.
    """
    for event, elem in ET.iterparse(FILE, events=('end',)):  # 'end' fires once an element has been fully parsed
        if elem.tag == NODE:  # each time the XML parser meets the end tag of our NODE (here "reply")
            my_dict = get_relevant_nodes(elem)
            elem.clear()  # don't forget to free memory space!

            if my_dict:
                yield my_dict  # each dictionary will be processed by the index function
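
To try the same streaming pattern without the API, the export file or lxml, here is a self-contained sketch based on the standard library parser. `stream_records`, the `title` field and the sample document are assumptions made for illustration; they are not part of the Isidore export.

```python
import io
from xml.etree import ElementTree


def stream_records(source, tag):
    """Yield one dict per `tag` element, clearing each element once it is processed."""
    for _event, elem in ElementTree.iterparse(source, events=("end",)):
        if elem.tag == tag:
            record = {
                "_id": elem.get("uri"),
                "titles": [t.text for t in elem.iter("title")],
            }
            elem.clear()  # free the children of the element just processed
            yield record


# Hypothetical two-entry document, only for illustration.
sample = io.BytesIO(
    b'<replies>'
    b'<reply uri="doc-1"><isidore><title>First</title></isidore></reply>'
    b'<reply uri="doc-2"><isidore><title>Second</title></isidore></reply>'
    b'</replies>'
)
records = list(stream_records(sample, "reply"))
print(records)
```

Because the function is a generator, a consumer that pulls records one at a time only ever holds a single entry in memory; `list(...)` is used here only to display the result.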

### Index

Now, we need to set up the index function!  


In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk


#### Set up Elastic client

In this example, the index is going to be created automatically.  
We still need to set up our Elasticsearch client.  
If you need help or examples to set up your Elasticsearch configuration, [see the documentation](https://elasticsearch-py.readthedocs.io/en/master/#ssl-and-authentication).

#### Bulk API

When indexing with Elasticsearch, there are two mistakes you don't want to make:
* indexing each document one by one (slow and costly)
* indexing *all* documents at once (especially if you have a large number of documents, say more than 500).

The solution is to group a reasonable number of documents (a chunk) and bulk ingest it, before moving on to the next chunk.

*In this example, we are dealing with a very small number of documents, but it is important to understand this concept for bigger use cases*.
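
The chunking idea itself can be sketched in a few lines of plain Python. This is only an illustration of the concept: the `streaming_bulk` helper imported above does its own chunking through its `chunk_size` parameter, and `chunked` is a hypothetical name.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


documents = ({"_id": i} for i in range(1200))  # stand-in for a generator of documents
sizes = [len(chunk) for chunk in chunked(documents, 500)]
print(sizes)  # [500, 500, 200]
```

Only one chunk is materialized at a time, so memory use depends on the chunk size rather than on the total number of documents.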

In [None]:
ES_CONFIG = {"hosts": "http://localhost:9200"}  # basic configuration (no login, ES on localhost:9200); recent versions of the client require an explicit host
INDEX = "isidore"  # change this name according to your dataset
CHUNK = 500  # this is the default value of the library; in this example, we will only have 1 chunk, because we have only 50 entries


In [None]:
def index():
    client = Elasticsearch(**ES_CONFIG)
    print("Indexing documents...")
    successes = 0
    # streaming_bulk groups the documents yielded by streamXML() into chunks of CHUNK documents and sends them chunk by chunk
    for ok, _result in streaming_bulk(client=client, chunk_size=CHUNK, index=INDEX, actions=streamXML()):
        successes += ok
    print(f'Indexed {successes} documents')

In [None]:
index()