# Working with Elasticsearch

### For Elastic Stack guidance, see [Elastic Stack and Product Documentation](https://www.elastic.co/guide/index.html)

### Python Elasticsearch Client docs: http://elasticsearch-py.readthedocs.io

### Import necessary modules

In [1]:
import pathlib
import time

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from IPython.core.interactiveshell import InteractiveShell
from IPython.display import display

# twittertools is my local twittertools.py module
import twittertools

In [2]:
# Echo the value of every expression statement in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = 'all'

In [3]:
def cat_to_DataFrame(string_):
    """
    Converts a multi-line, whitespace-delimited input string to a pandas
    DataFrame, using the first non-blank line as the assumed table header.

    Parameters
    ----------
    string_ : str
        Text table with one row per line and fields separated by runs of
        whitespace.

    Returns
    -------
    pandas.DataFrame
        One row per non-blank line after the header; every value is kept
        as the string produced by splitting the line. An empty DataFrame
        is returned when the input holds no non-blank lines.
    """
    # Split first, then filter on the resulting fields, so that lines made
    # up only of spaces/tabs are dropped as well as truly empty lines.
    rows = [fields for fields in (line.split() for line in string_.split('\n'))
            if fields]
    if not rows:
        # Nothing to use as a header; avoid an IndexError on rows[0].
        return pd.DataFrame()
    header, *body = rows
    return pd.DataFrame(data=body, columns=header)

### Instantiate Elasticsearch object

In [4]:
# Default connection to localhost:9200.
# The later cells in this notebook reuse this `es` client for their
# cluster calls (cat reports, index deletion, indexing and searching).
es = Elasticsearch()

### Print system information for reference

In [5]:
# Collect the cat reports as (label, text) pairs. Each report is passed to
# cat_to_DataFrame, which treats its first line as the table header.
reports = [('Health:', es.cat.health(v=True)), ('Nodes:', es.cat.nodes(v=True)),
           ('Shards:', es.cat.shards(v=True)), ('Indices:', es.cat.indices(v=True))]
for name, report in reports:
    print(name)
    # Render explicitly: a bare expression inside a loop body is only shown
    # when the shell's ast_node_interactivity option is set to 'all', so
    # display() keeps this cell working regardless of that setting.
    display(cat_to_DataFrame(report))

Health:


Unnamed: 0,epoch,timestamp,cluster,status,node.total,node.data,shards,pri,relo,init,unassign,pending_tasks,max_task_wait_time,active_shards_percent
0,1512595386,14:23:06,elasticsearch,yellow,1,1,6,6,0,0,6,0,-,50.0%


Nodes:


Unnamed: 0,ip,heap.percent,ram.percent,cpu,load_1m,load_5m,load_15m,node.role,master,name
0,127.0.0.1,15,25,45,1.17,0.64,0.5,mdi,*,1Y86nkN


Shards:


Unnamed: 0,index,shard,prirep,state,docs,store,ip,node
0,.kibana,0,p,STARTED,2.0,10.9kb,127.0.0.1,1Y86nkN
1,.kibana,0,r,UNASSIGNED,,,,
2,twitter,4,p,STARTED,1936.0,1.4mb,127.0.0.1,1Y86nkN
3,twitter,4,r,UNASSIGNED,,,,
4,twitter,3,p,STARTED,1904.0,1.3mb,127.0.0.1,1Y86nkN
5,twitter,3,r,UNASSIGNED,,,,
6,twitter,1,p,STARTED,1922.0,1.3mb,127.0.0.1,1Y86nkN
7,twitter,1,r,UNASSIGNED,,,,
8,twitter,2,p,STARTED,1921.0,1.3mb,127.0.0.1,1Y86nkN
9,twitter,2,r,UNASSIGNED,,,,


Indices:


Unnamed: 0,health,status,index,uuid,pri,rep,docs.count,docs.deleted,store.size,pri.store.size
0,yellow,open,twitter,zcJjp6EYRNmgjKQzuuY1ug,5,1,9668,0,6.9mb,6.9mb
1,yellow,open,.kibana,Yuy-vovlQyeUPVim2PcdFw,1,1,2,0,10.9kb,10.9kb


### Get tweets from a few Twitter user timelines; index them in Elasticsearch

In [6]:
# For this demonstration only, delete any existing /twitter index.
# ignore=[400, 404] tells the client not to raise for those HTTP status
# codes -- presumably so that a missing index (404) is not treated as an
# error on a first run.
result = es.indices.delete(index='twitter', ignore=[400, 404])

In [7]:
# Build an authenticated TwitterTools object from the credentials file
# stored under the current user's home directory
filepath = pathlib.Path.home() / '.twitter' / 'credentials.json'
twt = twittertools.TwitterTools(filepath)

In [8]:
# Accounts whose timelines are fetched and indexed
screen_names = ['pourmecoffee', 'washingtonpost', 'brainpicker', 'wilw']
# Running totals across all of the accounts above
total_tweets = 0
all_indexed = 0
for screen_name in screen_names:
    tweets = twt.get_user_timeline(screen_name)
    n_retrieved = len(tweets)
    total_tweets += n_retrieved
    print(f"{n_retrieved} tweets retrieved from @{screen_name}'s timeline;", end=' ')
    # Per-account tally; each tweet is sent as its own index request
    total_indexed = 0
    for tweet in tweets:
        doc = twittertools.unpack_tweet(tweet)
        result = es.index(index='twitter', doc_type='tweet', body=doc)
        # Count the tweet only when the response reports a non-zero
        # number of successful shards
        total_indexed += 1 if result['_shards']['successful'] else 0
    all_indexed += total_indexed
    print(f'{total_indexed} indexed', flush=True)
print('Total tweets indexed:', all_indexed)

3203 tweets retrieved from @pourmecoffee's timeline; 3203 indexed
3244 tweets retrieved from @washingtonpost's timeline; 3244 indexed
3222 tweets retrieved from @brainpicker's timeline; 3222 indexed
3195 tweets retrieved from @wilw's timeline; 3195 indexed
Total tweets indexed: 12864


### Confirm number of tweets indexed

In [9]:
# Newly indexed documents only show up in search results once the index
# has been refreshed, so ask for a refresh explicitly instead of relying
# solely on waiting for it to happen.
es.indices.refresh(index='twitter')
# Keep a short polling loop as a safety net in case the count still lags.
max_wait = 5.0  # seconds
sleep_wait = 0.50  # seconds between polls
sleep_count = 0
search = Search(using=es, index='twitter', doc_type='tweet')
while True:
    doc_count = search.count()
    if doc_count == all_indexed:
        break
    sleep_count += 1
    if sleep_count*sleep_wait > max_wait:
        # Gave up waiting: say so rather than silently reporting a
        # count that does not match what was indexed.
        print(f'\nExpected {all_indexed} indexed tweets after waiting {max_wait} seconds')
        break
    time.sleep(sleep_wait)
print(f'\nFound {doc_count} indexed tweets')


Found 12864 indexed tweets


### Perform simple match query on tweet texts

In [10]:
# Full-text match on the tweet text field
nasa_query = search.query("match", text="NASA")
results = nasa_query.execute()
hits = results['hits']
print(f'Got {hits["total"]} matches')
# NOTE(review): no explicit size is requested here, so "ten" presumably
# relies on the default number of hits returned per search -- confirm.
print('Top ten by relevance score:')
for hit in hits['hits']:
    tweet = hit['_source']
    print('-', tweet['screen_name'], tweet['created'], tweet['text'])

Got 36 matches
Top ten by relevance score:
- pourmecoffee 2017-06-29T17:40:25Z @NASA @CassiniSaturn pierogi mmm
- pourmecoffee 2017-06-05T22:31:52Z @NASA Like my tweets.
- pourmecoffee 2017-08-18T11:51:27Z @NASA @NASA_TDRS You're firing on Earth do you take me for a fool?
- pourmecoffee 2017-07-12T18:24:04Z @NASA Finally some non-Manhatten-sized icebergs.
- washingtonpost 2017-11-28T18:58:00Z A NASA astronaut films his spacewalk — and a breathtaking view of Earth https://t.co/3WrPgggQVF
- washingtonpost 2017-11-16T05:47:56Z These are the melting glaciers that might someday drown your city, according to NASA https://t.co/f2fd7fUexU
- washingtonpost 2017-11-29T08:05:43Z A NASA astronaut films his spacewalk — and a breathtaking view of Earth https://t.co/ZaCEVRbTME
- washingtonpost 2017-11-19T15:04:44Z Perspective: Please stop annoying this NASA scientist with your ridiculous Planet X doomsday theories https://t.co/soTGVzQqqm
- washingtonpost 2017-11-19T00:52:20Z Perspective: Please stop 