# Managing ElasticSearch DB with Python

#### Import/install some libraries like elasticsearch, elasticsearch-dsl, certifi, ujson
- elasticsearch is the official low-level Python client for Elasticsearch; elasticsearch-dsl is the official high-level library built on top of it
- certifi is a curated collection of root certificates for validating the trustworthiness of SSL certificates while verifying the identity of TLS hosts
- UltraJSON is an ultra fast JSON encoder and decoder written in pure C with bindings for Python 2.5+ and 3.
- Requests is one of the best HTTP libraries for Python

In [20]:
#In case these libraries are not installed
#!pip install elasticsearch
#!pip install elasticsearch-dsl
#!pip install ujson 
#!pip install requests 
#!pip install -U certifi
#!pip install docker==2.0.1
#!pip install pandasticsearch[pandas]

import requests
import ujson as json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import Search, DocType, Date, Integer, Keyword, Text
from datetime import datetime
from elasticsearch_dsl.connections import connections
import pandas as pd

# Base URL of the Elasticsearch node used by the cells in this notebook.
ES_HOST = "http://ec2-52-91-189-234.compute-1.amazonaws.com:9200"
# Default low-level client; the remaining cells refer to it as `es`.
es = Elasticsearch(hosts=ES_HOST)


In [4]:
# Settings for creating a sample database in Elasticsearch to play with.
FILE_URL = (
    "http://apps.sloanahrens.com/qbox-blog-resources/"
    "kaggle-titanic-data/test.csv"
)  # location of the CSV file with the sample data
INDEX_NAME = "titanic"  # index name
TYPE_NAME = "passenger"  # document type name
ID_FIELD = "passengerid"  # presumably the field used as document id - confirm

### List all Indices

In [5]:
# List every index on the cluster in alphabetical order.
# The keys of the get_alias() response are used as the index names; they are
# sorted once here and reused (the original called sorted() twice and threw
# the first result away).
indices = sorted(es.indices.get_alias().keys())
# \033[1m ... \033[0m are ANSI escapes that render the count in bold.
print("There are \033[1m-{x}-\033[0m indices in ElasticSearch".format(x=len(indices)))
for index in indices:
    print(index)


There are [1m-13-[0m indices in ElasticSearch
.kibana
books
index_test
logstash-2017.05.22
logstash-2017.05.23
megacorp
schools
something
test
test-index
test-index1
test_index
titanic


In [92]:
# Keep the match-all query in a variable so it can be sent as the request body.
myquery = {"query": {"match_all": {}}}

# Run the query and print a short preview of the matching documents.
# Rough SQL analogy: SELECT * FROM titanic LIMIT 3
res = es.search(index="titanic", body=myquery)
hits = res['hits']['hits']
preview_size = 3

# Walk the hits that were actually returned instead of range(res['hits']['total']):
# 'total' counts every matching document, while 'hits' only holds the returned
# page (10 documents unless `size` is set), so indexing by total is only safe
# as long as the loop breaks early.
for position, hit in enumerate(hits[:preview_size], start=1):
    print("\n" + str(position))
    for key, value in hit['_source'].items():
        print(str(key) + ": " + str(value))

# Same condition as before: the notice appears once a third document was shown.
if len(hits) >= preview_size:
    print("\n--------Too many documents. We are breaking here---------")
        
    



1
Cabin: D34
Ticket: 113778
Sex: male
Fare: 26.55
SibSp: 0
Age: None
PassengerId: 933
Parch: 0
Pclass: 1
Embarked: S
Name: Franklin, Mr. Thomas Parham

2
Cabin: None
Ticket: 2657
Sex: female
Fare: 7.2292
SibSp: 0
Age: 18.0
PassengerId: 900
Parch: 0
Pclass: 3
Embarked: C
Name: Abrahim, Mrs. Joseph (Sophie Halaut Easu)

3
Cabin: None
Ticket: 237249
Sex: female
Fare: 13.0
SibSp: 0
Age: 30.0
PassengerId: 935
Parch: 0
Pclass: 2
Embarked: S
Name: Corbett, Mrs. Walter H (Irene Colvin)

--------Too many documents. We are breaking here---------


In [49]:
 myquery={
     "query": {
         "match_all": {
              "Name" : "Abrahim"
             }
         }, "_source" : "false"
        }
res = es.search(index="titanic", body=myquery)
res['hits']['hits'][0]

GET http://ec2-52-91-189-234.compute-1.amazonaws.com:9200/titanic/_search [status:400 request:0.004s]


RequestError: TransportError(400, 'parsing_exception', '[match_all] unknown field [Name], parser not found')

In [14]:

# Look up the document with id '78' in the "test" index; as the last
# expression of the cell, the response is displayed by the notebook.
# Kept for reference (disabled):
#   es.create(index="test", doc_type="articles", body={"content": "One more fox"})
es.get(index="test", id='78')



{'_id': '78',
 '_index': 'test',
 '_source': {'content': 'One more fox'},
 '_type': 'articles',
 '_version': 1,
 'found': True}

In [21]:
# Sample tweet document to store under id 1 in "test-index".
doc = dict(
    author='kimchy',
    text='Nothing lasts',
    timestamp=datetime.now(),
)
res = es.index(
    index="test-index",
    doc_type='tweet',
    id=1,
    body=doc,
)
# Show the 'created' flag taken from the index response.
print(res['created'])

False


In [63]:
# Build the request with the DSL: a term filter on category, a match query
# on title, and an exclusion of documents whose description matches "beta".
s = (
    Search(using=es, index="test-index")
    .filter("term", category="search")
    .query("match", title="Elastic")
    .exclude("match", description="beta")
)

# Terms aggregation on 'tags' with a nested max metric over 'lines'.
(
    s.aggs
    .bucket('per_tag', 'terms', field='tags')
    .metric('max_lines', 'max', field='lines')
)

response = s.execute()
# A bare `print` is a no-op expression in Python 3; call it to emit the
# blank separator line.
print()
for hit in response:
    print(hit.meta.score, hit.title)

for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)

In [33]:
# Read back the tweet stored under id 1 and print the complete response
# (metadata plus _source).
res = es.get(
    index="test-index",
    doc_type='tweet',
    id=1,
)
print(res)

{'_type': 'tweet', '_index': 'test-index', '_source': {'text': 'Nothing lasts', 'timestamp': '2017-05-23T15:46:58.908242', 'author': 'kimchy'}, 'found': True, '_id': '1', '_version': 8}


In [24]:
# Request body that matches every document in the index.
doc = {"query": {"match_all": {}}}
# Last expression of the cell, so the notebook displays the raw response.
es.search(index="test-index", body=doc)




{'_shards': {'failed': 0, 'successful': 5, 'total': 5},
 'hits': {'hits': [{'_id': '1',
    '_index': 'test-index',
    '_score': 1.0,
    '_source': {'author': 'kimchy',
     'text': 'Nothing lasts',
     'timestamp': '2017-05-23T15:46:58.908242'},
    '_type': 'tweet'}],
  'max_score': 1.0,
  'total': 1},
 'timed_out': False,
 'took': 0}

In [85]:
# Run the search again and print every raw hit.
# NOTE: `doc` is expected to hold the query body assigned in an earlier cell.
res = es.search(index="test-index", body=doc)

for hit in res['hits']['hits']:
    print(hit)

# Flatten the nested hit dictionaries into a DataFrame (nested keys become
# dotted column names such as _source.author).
# json_normalize is not imported anywhere in the visible notebook, so it is
# called through the already imported pandas namespace.
df = pd.json_normalize(res['hits']['hits'])
df.head()

{'_type': 'tweet', '_index': 'test-index', '_id': '1', '_score': 1.0, '_source': {'text': 'Nothing lasts', 'timestamp': '2017-05-23T15:46:58.908242', 'author': 'kimchy'}}


Unnamed: 0,_id,_index,_score,_source.author,_source.text,_source.timestamp,_type
0,1,test-index,1.0,kimchy,Nothing lasts,2017-05-23T15:46:58.908242,tweet


In [32]:
# Fetch only the stored _source of the tweet with id 1 and print it.
res = es.get_source(index="test-index", doc_type='tweet', id=1)
print(res)
# TODO: also inspect the mapping (REST path: /test/mytype/_mapping).
# The unfinished `res = es.` statement was dropped because it is a SyntaxError.

{'text': 'Nothing lasts', 'timestamp': '2017-05-23T15:46:58.908242', 'author': 'kimchy'}


In [11]:
# Create 'test-index1'. With ignore=[400, 404] those HTTP statuses do not
# raise; the error body is returned instead (see the "already exists"
# response recorded below).
es.indices.create(
    index='test-index1',
    ignore=[400, 404],
)

{'error': {'index': 'test-index1',
  'index_uuid': 'fY3utQb5T-a35zTIEjLnPg',
  'reason': 'index [test-index1/fY3utQb5T-a35zTIEjLnPg] already exists',
  'root_cause': [{'index': 'test-index1',
    'index_uuid': 'fY3utQb5T-a35zTIEjLnPg',
    'reason': 'index [test-index1/fY3utQb5T-a35zTIEjLnPg] already exists',
    'type': 'index_already_exists_exception'}],
  'type': 'index_already_exists_exception'},
 'status': 400}

In [12]:
# Trim the response to the hits.hits section. The second path
# (hits.hits._source) appears redundant here, since the first one already
# keeps each hit in full.
es.search(
    index='test-index',
    filter_path=['hits.hits', 'hits.hits._source'],
)

{'hits': {'hits': [{'_id': '1',
    '_index': 'test-index',
    '_score': 1.0,
    '_source': {'author': 'kimchy',
     'text': 'Nothing lasts',
     'timestamp': '2017-05-23T13:11:43.944443'},
    '_type': 'tweet'}]}}

In [13]:
# Trim the response down to the _source of each hit.
es.search(
    index='test-index',
    filter_path=['hits.hits._source'],
)

{'hits': {'hits': [{'_source': {'author': 'kimchy',
     'text': 'Nothing lasts',
     'timestamp': '2017-05-23T13:11:43.944443'}}]}}

In [14]:
# Search without a body or filter_path: the complete, unfiltered response
# is displayed.
es.search(
    index='test-index',
)

{'_shards': {'failed': 0, 'successful': 5, 'total': 5},
 'hits': {'hits': [{'_id': '1',
    '_index': 'test-index',
    '_score': 1.0,
    '_source': {'author': 'kimchy',
     'text': 'Nothing lasts',
     'timestamp': '2017-05-23T13:11:43.944443'},
    '_type': 'tweet'}],
  'max_score': 1.0,
  'total': 1},
 'timed_out': False,
 'took': 0}