# Criação de Mapping e Inserção em Bulk

In [12]:
import json
import pandas as pd

In [8]:
import elasticsearch

ES_URL = 'http://localhost:9200'
ES_USER = 'elastic'
ES_PASS = 'elastic123'

client = elasticsearch.Elasticsearch(
    ES_URL,
    basic_auth=(ES_USER, ES_PASS),
    verify_certs=False
)

In [None]:
# Desligando warning para facilitar nossa vida
import urllib3
urllib3.disable_warnings()

# Exploração dos Dados

Trabalharemos sobre o [20 Newsgroup Dataset](http://qwone.com/~jason/20Newsgroups/), que possui as seguintes colunas:

- `newsgroup`: Nome do grupo de discussão
- `message`: Corpo da mensagem, incluindo o cabeçalho

In [13]:
df_data = pd.read_json('20_newsgroup.ndjson', orient='records', lines=True)

In [21]:
df_data

Unnamed: 0,newsgroup,message
0,rec.autos,From: lerxst@wam.umd.edu (where's my thing)\nS...
1,comp.sys.mac.hardware,From: guykuo@carson.u.washington.edu (Guy Kuo)...
2,comp.sys.mac.hardware,From: twillis@ec.ecn.purdue.edu (Thomas E Will...
3,comp.graphics,From: jgreen@amber (Joe Green)\nSubject: Re: W...
4,sci.space,From: jcm@head-cfa.harvard.edu (Jonathan McDow...
...,...,...
11309,sci.med,From: jim.zisfein@factory.com (Jim Zisfein) \n...
11310,comp.sys.mac.hardware,From: ebodin@pearl.tufts.edu\nSubject: Screen ...
11311,comp.sys.ibm.pc.hardware,From: westes@netcom.com (Will Estes)\nSubject:...
11312,comp.graphics,From: steve@hcrlgw (Steven Collins)\nSubject: ...


In [6]:
df_data['newsgroup'].value_counts()

rec.sport.hockey            600
soc.religion.christian      599
rec.motorcycles             598
rec.sport.baseball          597
sci.crypt                   595
rec.autos                   594
sci.med                     594
comp.windows.x              593
sci.space                   593
comp.os.ms-windows.misc     591
sci.electronics             591
comp.sys.ibm.pc.hardware    590
misc.forsale                585
comp.graphics               584
comp.sys.mac.hardware       578
talk.politics.mideast       564
talk.politics.guns          546
alt.atheism                 480
talk.politics.misc          465
talk.religion.misc          377
Name: newsgroup, dtype: int64

Exemplo de mensagem do grupo `sci.med`.

Repare que, pelo padrão da mensagem de newsgroup, que é muito similar a um email, o texto começa com cabeçalhos no formato:

```
<nome>: <valor>
```

Seguido por uma linha em branco, ou seja, uma sequência de dois caracteres de quebra de linha (`\n\n`), seguido pelo corpo da mensagem.

In [7]:
print(df_data.query('newsgroup == "sci.med"').iloc[0]['message'])

From: bmdelane@quads.uchicago.edu (brian manning delaney)
Subject: Brain Tumor Treatment (thanks)
Reply-To: bmdelane@midway.uchicago.edu
Organization: University of Chicago
Lines: 12

There were a few people who responded to my request for info on
treatment for astrocytomas through email, whom I couldn't thank
directly because of mail-bouncing probs (Sean, Debra, and Sharon).  So
I thought I'd publicly thank everyone.

Thanks! 

(I'm sure glad I accidentally hit "rn" instead of "rm" when I was
trying to delete a file last September. "Hmmm... 'News?' What's
this?"....)

-Brian



# Configurações

In [1]:
INDEX_NAME = '20newsgroup_teste_insercao'

In [2]:
import nltk

In [3]:
english_stopwords = nltk.corpus.stopwords.words('english')

In [4]:
custom_stopwords = [
    'from', 'subject', 'lines', 'organization', 'nntp',
    'posting', 'hosts', 'writes', 'host', "i'm", "i've",
    'would', 'like', 'one', 'distribution', 'know', 'get',
    'think', 'even', 'go', 'say', 'many', 'time', 'want',
    'much', 'us', 'could', 'also', 'reply', 're', 'i',
    "i'll", "you", "have", 'university', 'article'
]

In [5]:
stopwords = list(set(english_stopwords) | set(custom_stopwords))

In [6]:
INDEX_MAPPING = {
    "settings": {
        "number_of_shards": 3,
        "analysis":{
            "analyzer": {
                "email_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["asciifolding", "lowercase","replace_numbers", "replace_puctuation", "stop_en",
                               "stop_custom"]
                }
            },
            "filter": {
                "stop_en": {
                    "type": "stop",
                    "stopwords": "_english_",
                },
                "stop_custom": {
                    "type": "stop",

                    "stopwords": stopwords
                },
                "replace_numbers": {
                    "type": "pattern_replace",
                    "pattern": "([0-9]+)",
                    "replacement": ""
                },
                "replace_puctuation": {
                    "type": "pattern_replace",
                    "pattern": "([.,]+)",
                    "replacement": ""
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "newsgroup": {
                "type": "text",
                "fields": {
                    "raw": {
                        "type": "keyword"
                    }
                }
            },
            "message": {
                "type": "text",
                "analyzer": "email_analyzer",
                "fielddata": True,
                "fielddata_frequency_filter": {
                    "min": 0.01,
                    "min_segment_size": 10,
                }
            }
        }
    }
}

# Inserção com operações simples

In [9]:
# Apagar caso o indice exista
if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
client.indices.create(index=INDEX_NAME, **INDEX_MAPPING)

  if client.indices.exists(index=INDEX_NAME):
  client.indices.delete(index=INDEX_NAME)
  client.indices.create(index=INDEX_NAME, **INDEX_MAPPING)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': '20newsgroup_teste_insercao'})

In [14]:
%%time
with open('20_newsgroup.ndjson','r') as fin:
    for line in fin:
        document = json.loads(line)
        client.index(index=INDEX_NAME, document=document)



CPU times: user 1min 31s, sys: 1.09 s, total: 1min 32s
Wall time: 2min 34s


# Inserção com Bulk

In [None]:
from elasticsearch.helpers import bulk

Definindo uma **função geradora**, para retornar um documento por vez

In [None]:
def gen_documents(filename):
    with open('20_newsgroup.ndjson','r') as fin:
        for line in fin:
            yield json.loads(line)

Definindo uma função geradora para transformar um **iterável de documentos** em um **iterável de ações bulk**

In [None]:
def gen_index_actions(documents):
    for doc in documents:
        yield {
            '_op_type': 'index',
            '_index': INDEX_NAME,
            '_source': doc,
        }

In [None]:
# Apagar caso o indice exista
if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
client.indices.create(index=INDEX_NAME, **INDEX_MAPPING)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': '20newsgroup_teste_insercao'})

In [None]:
%%time
documents = gen_documents('20_newsgroup.ndjson')
actions = gen_index_actions(documents)
        
success, errors = bulk(client, actions)

client.indices.refresh(index=INDEX_NAME)
client.indices.flush(index=INDEX_NAME)

CPU times: user 924 ms, sys: 43.1 ms, total: 967 ms
Wall time: 2.71 s


ObjectApiResponse({'_shards': {'total': 6, 'successful': 3, 'failed': 0}})

# Inserção Bulk Controlando o Refresh

In [None]:
# Apagar caso o indice exista
if client.indices.exists(index=INDEX_NAME):
    client.indices.delete(index=INDEX_NAME)
client.indices.create(index=INDEX_NAME, **INDEX_MAPPING)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': '20newsgroup_teste_insercao'})

In [None]:
%%time

# Desligar Refresh no indice
client.indices.put_settings(index=INDEX_NAME, settings={"index": {"refresh_interval": "-1"}})

# Realizar a inserção em bulk (muitos documentos)
documents = gen_documents('20_newsgroup.ndjson')
actions = gen_index_actions(documents)

success, errors = bulk(client, actions)

# Restaurar a configuração padrão do Refresh
client.indices.put_settings(index=INDEX_NAME, settings={"index": {"refresh_interval": "2s"}})

# Refresh e Flush
client.indices.refresh(index=INDEX_NAME)
client.indices.flush(index=INDEX_NAME)

CPU times: user 881 ms, sys: 39.1 ms, total: 920 ms
Wall time: 3.6 s


ObjectApiResponse({'_shards': {'total': 6, 'successful': 3, 'failed': 0}})

In [15]:
query = {
    'term': {
        'message': 'nice'
    }
}

resp = client.search(index=INDEX_NAME, query=query)

  resp = client.search(index=INDEX_NAME, query=query)


In [16]:
resp

