---
### Carrega pacotes necessários

In [None]:
#import pip
#pip.main(['install', 'elasticsearch'])
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd
import numpy as np
from time import time
import os
import sys
from dataiku import pandasutils as pdu
from elasticsearch import Elasticsearch, helpers
import json

---
### Conexão ao ES

In [None]:
es = Elasticsearch('http://user:pwd@server_ip:port/')

In [None]:
es.info()

---
### Criação de índice

In [None]:
#Exclui índice, se ele existir
indice= "sinasc_dss"
doc_type="sinasc-type"
try :
    es.indices.delete(index=indice)
except :
    pass

In [None]:
#Definição de tipo para documentos
sinasc_type = {
                    "mappings":{
                        'sinasc-type': {
                            'properties': {
                                'NUMERODN':{'type': 'keyword'},
                                'DTNASC':{'type': 'keyword'},
                                'data_nasc':{'type': 'date'},
                                'ano_nasc':{'type': 'integer'},
                                'dia_semana_nasc':{'type': 'keyword'},
                                'SEXO':{'type': 'keyword'},
                                'def_sexo':{'type': 'keyword'},
                                'RACACOR':{'type': 'keyword'},
                                'def_raca_cor':{'type': 'keyword'},

                                'ESTCIVMAE':{'type': 'keyword'},
                                'def_est_civil':{'type': 'keyword'},

                                'ESCMAE':{'type': 'keyword'},
                                'def_escol_mae':{'type': 'keyword'},

                                'CODOCUPMAE':{'type': 'keyword'},
                                'CODBAIRES':{'type': 'keyword'},
                                'CODMUNRES':{'type': 'keyword'},
                                'LOCNASC':{'type': 'keyword'},
                                'def_loc_nasc':{'type': 'keyword'},

                                'CODMUNNASC':{'type': 'keyword'},
                                'IDADEMAE':{'type': 'keyword'},
                                
                                'APGAR1':{'type': 'keyword'},
                                'APGAR5':{'type': 'keyword'},

                                'QTDFILVIVO':{'type': 'keyword'},
                                'QTDFILMORT':{'type': 'keyword'},
                                'GRAVIDEZ':{'type': 'keyword'},
                                'def_gravidez':{'type': 'keyword'},

                                'GESTACAO':{'type': 'keyword'},
                                'def_gestacao':{'type': 'keyword'},

                                'PARTO':{'type': 'keyword'},
                                'def_parto':{'type': 'keyword'},
                                
                                'CONSULTAS':{'type': 'keyword'},
                                'def_consultas':{'type': 'keyword'},

                                'PESO':{'type': 'float'},
                                
                                'IDANOMAL':{'type': 'keyword'},
                                'CODANOMAL':{'type': 'keyword'},
                                
                                'CODESTAB':{'type': 'keyword'},
                                'UFINFORM':{'type': 'keyword'},
                                'HORANASC':{'type': 'keyword'},
                                'CODBAINASC':{'type': 'keyword'},
                                
                                'DTRECEBIM':{'type': 'keyword'},

                                'CODINST':{'type': 'keyword'},
                                
                                'res_MUNNOME':{'type': 'keyword'},
                                'res_MUNNOMEX':{'type': 'keyword'},
                                'res_CAPITAL':{'type': 'keyword'},
                                'res_FRONTEIRA':{'type': 'keyword'},
                                'res_AMAZONIA':{'type': 'keyword'},
                                'res_LATITUDE':{'type': 'float'},
                                'res_LONGITUDE':{'type': 'float'},
                                'res_ALTITUDE':{'type': 'float'},
                                'res_AREA':{'type': 'float'},
                                'res_codigo_adotado':{'type': 'keyword'},
                                'res_SIGLA_UF':{'type': 'keyword'},
                                'res_CODIGO_UF':{'type': 'keyword'},
                                'res_NOME_UF':{'type': 'keyword'},

                                'res_MSAUDCOD':{'type': 'keyword'},
                                'res_RSAUDCOD':{'type': 'keyword'},
                                'res_CSAUDCOD':{'type': 'keyword'},

                                'nasc_MUNNOME':{'type': 'keyword'},
                                'nasc_MUNNOMEX':{'type': 'keyword'},
                                'nasc_CAPITAL':{'type': 'keyword'},
                                'nasc_FRONTEIRA':{'type': 'keyword'},
                                'nasc_AMAZONIA':{'type': 'keyword'},
                                'nasc_LATITUDE':{'type': 'float'},
                                'nasc_LONGITUDE':{'type': 'float'},
                                'nasc_ALTITUDE':{'type': 'float'},
                                'nasc_AREA':{'type': 'float'},
                                'nasc_codigo_adotado':{'type': 'keyword'},
                                'nasc_SIGLA_UF':{'type': 'keyword'},
                                'nasc_CODIGO_UF':{'type': 'keyword'},
                                'nasc_NOME_UF':{'type': 'keyword'},

                                'nasc_MSAUDCOD':{'type': 'keyword'},
                                'nasc_RSAUDCOD':{'type': 'keyword'},
                                'nasc_CSAUDCOD':{'type': 'keyword'},

                                'nasc_coordenadas' : {"type" : "geo_point"},
                                'res_coordenadas' : {"type" : "geo_point"},

                                'codanomal_capitulo':{'type': 'keyword'},
                                'codanomal_grupo':{'type': 'keyword'},
                                'codanomal_categoria':{'type': 'keyword'},
                                'codanomal_subcategoria':{'type': 'keyword'},
                            }
                        }
                    }
                }

In [None]:
#cria índice no Elasticsearch
es.indices.create(index=indice,body=sinasc_type)

---
### Carrega dados preparados/transformados do Dataiku

In [None]:
data_prepared = dataiku.Dataset("DORES_preparados")

---
### Recupera métricas (record_count) do dataset no Dataiku

In [None]:
def get_metric(project_name,dataset_name,metric_ids):
    client = dataiku.api_client()
    current_project = client.get_project(project_name)
    dataset = current_project.get_dataset(dataset_name)
    metrics = dataset.compute_metrics(partition='ALL', metric_ids=metric_ids)
    metrics = [{'metric':m["metricId"],'value':int(m["value"])} for m in metrics["result"]["computed"] if m["metricId"] in metric_ids][0]

    return metrics

In [None]:
def record_count(project_name,dataset_name):
    return get_metric(project_name,dataset_name,['records:COUNT_RECORDS'])['value']

In [None]:
nrows = record_count('ETLSINASC','DNRES_preparados')

---
### Gera json para indexação

In [None]:
def geraJson(df):
    return json.loads(df.T.to_json())

### Indexação em lote

In [None]:
#tamanho do chunk
chunksize = 10000

#número total de chunks a serem indexados
nchunks = nrows/chunksize

#imprime o número total de documentos a serem indexados
print("Documentos: %i\n"%nrows)

res_bulk=[]

for chunk,df in enumerate(data_prepared.iter_dataframes(chunksize=chunksize)):

    #gera o json do chunk de dados atual (formato pronto para indexação)
    data_json = geraJson(df)

    #imprime o número do chunk atual e o total de chunks a serem indexados
    print("Chunk: %i/%i"%(chunk,nchunks))

    #cria lista de ações para indexação de cada documento do chunk atual
    lista=[]
    for i, item in enumerate(data_json.values()):
        data_dict = {
            '_op_type': 'index',
            '_index': indice,
            '_type': doc_type,
            '_source': item
        }
        lista.append(data_dict)

    #indexa todos os documentos do chunk atual (bulk indexa em chunks)
    res = helpers.bulk(client=es, actions=lista, chunk_size=1000, raise_on_error=False, raise_on_exception = False)
    res_bulk.append(res)

    print(res)

In [None]:
res_df = pd.DataFrame(res_bulk)
res_df.columns = ['indexed_chunksize', 'errors']

# Write recipe outputs
res_Elasticsearch = dataiku.Dataset("bulk_elasticsearch")
res_Elasticsearch.write_with_schema(res_df)