# Python Notebook: Incremental Tests of Elasticsearch

## <span style="color:red"> Check that Elasticsearch is running properly first </span>

### This notebook tests that Elasticsearch is working on the cluster.

1. Get bearings and take inventory of the working directory.
2. Test that Elasticsearch is callable.
3. Test that an example database can be created and queried.
4. Generate a database from my own data pipeline (from streaming data).
5. Query that database.
6. Generate a database from my own data pipeline (from batch data).
7. Query that database.

In [1]:
from elasticsearch import Elasticsearch
import json
import yaml

**First, make sure that we are in the right place.**

In [2]:
import os
cwd = os.getcwd()
print(cwd)

/home/ubuntu/insight-taxi-pulse/src/test


In [3]:
entries = os.listdir(cwd)
for entry in entries:
    print(entry)

.ipynb_checkpoints
stream_to_elastic.py
testing_elastic_with_notebook_from_test_folder.ipynb
testing_elastic_with_notebook.ipynb
metastore_db
elasticsearch_query.py
config
derby.log


**Now load the configuration and connect to Elasticsearch.**

In [4]:
def yaml_loader(yaml_file):
    # safe_load parses plain YAML without constructing arbitrary Python objects
    with open(yaml_file) as yml:
        config = yaml.safe_load(yml)
    return config

In [5]:
config_path = 'config/stream_consumer_config.yml'
config = yaml_loader(config_path)
for c in config:
    print(c)
    print(config[c])

topics
['stream_users', 'stream_cars']
namenodeip
34.198.103.9
port
ec2-34-193-153-112.compute-1.amazonaws.com:9092


In [6]:
es = Elasticsearch(hosts=[{'host': '34.198.103.9', 'port': 9200}], http_auth=('elastic', 'changeme'))
print(es)


<Elasticsearch([{'host': '34.198.103.9', 'port': 9200}])>


In [7]:
es = Elasticsearch(http_auth=('elastic','changeme'))
if not es.ping():
    raise ValueError("Connection failed")
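
A single `ping()` can fail while services are still starting up on the cluster. As a rough sketch, the check could be retried a few times before giving up. `wait_for_ping` is a hypothetical helper, and the attempt count and delay are arbitrary choices; the stand-in lambda replaces `es.ping` so the block runs without a server.

```python
import time


def wait_for_ping(ping, attempts=5, delay=1.0, sleep=time.sleep):
    """Call ping() up to `attempts` times; return True on the first success."""
    for attempt in range(attempts):
        if ping():
            return True
        # Pause between attempts, but not after the last one.
        if attempt < attempts - 1:
            sleep(delay)
    return False


# Stand-in for es.ping that fails twice and then succeeds.
responses = iter([False, False, True])
ok = wait_for_ping(lambda: next(responses), attempts=5, delay=0.0)
print(ok)
```

With a live client, the call would look like `wait_for_ping(es.ping)`, followed by the same `raise ValueError("Connection failed")` when it returns `False`.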

**Try running the example from the elasticsearch-py documentation.**  
(https://elasticsearch-py.readthedocs.io/en/master/)

In [8]:
from datetime import datetime
es = Elasticsearch(http_auth=('elastic','changeme'))

doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
}
res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
print(res['created'])

res = es.get(index="test-index", doc_type='tweet', id=1)
print(res['_source'])

es.indices.refresh(index="test-index")

res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

True
{u'text': u'Elasticsearch: cool. bonsai cool.', u'author': u'kimchy', u'timestamp': u'2017-01-31T16:28:48.390982'}
Got 1 Hits:
2017-01-31T16:28:48.390982 kimchy: Elasticsearch: cool. bonsai cool.
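
The `res['hits']['total']` lookup above returns a plain integer on the Elasticsearch version used here, as the `Got 1 Hits:` output shows. From Elasticsearch 7 onward the same field is an object with `value` and `relation` keys, so the `%d` formatting would fail there. A small sketch that reads either shape follows; `total_hits` is a hypothetical helper and both response dicts are hand-written samples, not real server output.

```python
def total_hits(res):
    """Return the hit count from a search response as an int."""
    total = res['hits']['total']
    # Elasticsearch 7+ wraps the count: {"value": 1, "relation": "eq"}.
    if isinstance(total, dict):
        return total['value']
    return total


# Hand-written sample responses in the two shapes.
old_style = {'hits': {'total': 1, 'hits': []}}
new_style = {'hits': {'total': {'value': 1, 'relation': 'eq'}, 'hits': []}}
print("Got %d Hits:" % total_hits(old_style))
print("Got %d Hits:" % total_hits(new_style))
```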


**Try running Elasticsearch on my own data.**  
The database scripts need to be run first.  

*./src/database/elasticsearch_batch.py:		es.index(index=incoming_topic, doc_type=incoming_topic[:-1], id=str(count), body =new_entry)*

*./src/database/elasticsearch_stream.py:		es.index(index=incoming_topic, doc_type=incoming_topic[:-1], id=str(count), body =new_entry)*
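
Both scripts pass the same four arguments to `es.index`. To make the naming convention explicit, here is a sketch of how those arguments are derived; `index_kwargs` is a hypothetical helper, and the sample topic `'cars'` and document fields are assumptions for illustration (the scripts themselves are not shown in this notebook).

```python
def index_kwargs(incoming_topic, count, new_entry):
    """Build the keyword arguments used in the es.index calls quoted above."""
    return {
        'index': incoming_topic,
        # Dropping the last character turns a plural topic into a singular type.
        'doc_type': incoming_topic[:-1],
        # Document ids are the running count, stored as a string.
        'id': str(count),
        'body': new_entry,
    }


# Sample values are assumptions for illustration.
kwargs = index_kwargs('cars', 7, {'status': 'example'})
print(kwargs['index'])
print(kwargs['doc_type'])
print(kwargs['id'])
```

With a live client this would be passed as `es.index(**index_kwargs(...))`. Note that the `[:-1]` slice only gives a sensible type name when the topic ends in a plural "s".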

In [9]:
## This doesn't work yet because the "cars" index has not been created
## es.indices.refresh(index="cars")

### Run Streaming Pipeline.

1. Spin up the cluster and start up services.
2. Run the producer.
3. Run the spark-streaming consumer.
4. Generate a database from my own data pipeline (from streaming data).
5. Query that database (code shown above).


*... running producer testing notebook from producer directory (./src/producer/<'notebook'>)*

In [10]:
for index in es.indices.get('*'):
    print(index)

test-index
.monitoring-data-2
users
cars
.monitoring-es-2-2017.01.30
.monitoring-es-2-2017.01.31
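
The listing mixes the pipeline's indices with internal ones whose names start with a dot. A small sketch for separating them is below, assuming the leading dot is a reliable marker for internal indices in this cluster; `user_indices` is a hypothetical helper and the list is copied from the output above.

```python
def user_indices(names):
    """Keep index names that do not start with a dot, sorted alphabetically."""
    return sorted(name for name in names if not name.startswith('.'))


# Index names copied from the output above.
listed = [
    'test-index',
    '.monitoring-data-2',
    'users',
    'cars',
    '.monitoring-es-2-2017.01.30',
    '.monitoring-es-2-2017.01.31',
]
print(user_indices(listed))
```

In the notebook, the input could be `es.indices.get('*')`, since iterating over its result yields the index names.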


In [12]:
res = es.search(index="users", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(drop_location)s" % hit["_source"])

Got 18 Hits:
2017-01-31 16:26:33.623 {u'lat': 40.750997, u'lon': -73.97842}
2017-01-31 16:26:19.541 {u'lat': 40.775472, u'lon': -73.97647499999998}
2017-01-31 16:26:26.581 {u'lat': 40.748762, u'lon': -73.993063}
2017-01-31 16:26:27.587 {u'lat': 40.758027, u'lon': -73.983168}
2017-01-31 16:26:29.599 {u'lat': 40.757612, u'lon': -73.967647}
2017-01-31 16:26:31.611 {u'lat': 40.778967, u'lon': -73.958028}
2017-01-31 16:26:16.523 {u'lat': 40.756097, u'lon': -73.972882}
2017-01-31 16:26:18.535 {u'lat': 40.754932, u'lon': -73.998672}
2017-01-31 16:26:23.564 {u'lat': 40.784452, u'lon': -73.954187}
2017-01-31 16:26:34.629 {u'lat': 40.766127, u'lon': -73.980652}
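
The search reports 18 hits but only 10 are printed, which matches Elasticsearch's default page size of 10. One way to see the rest is to page through the results with `from` and `size` in the request body. The sketch below assumes a search callable with the same keyword arguments as `es.search`; `iter_hits` and `fake_search` are hypothetical names, and `fake_search` is an in-memory stand-in so the block runs without a server.

```python
def iter_hits(search, index, page_size=10):
    """Yield every hit for a match_all query, one page at a time."""
    offset = 0
    while True:
        body = {"query": {"match_all": {}}, "from": offset, "size": page_size}
        page = search(index=index, body=body)['hits']['hits']
        if not page:
            return
        for hit in page:
            yield hit
        offset += len(page)


def fake_search(index, body):
    """Stand-in for es.search backed by 18 in-memory documents."""
    docs = [{'_source': {'n': i}} for i in range(18)]
    start = body['from']
    return {'hits': {'total': len(docs), 'hits': docs[start:start + body['size']]}}


hits = list(iter_hits(fake_search, 'users'))
print(len(hits))
```

With a live client the call would be `iter_hits(es.search, 'users')`. For a result set this small, passing a larger `size` in a single request is simpler; for very large ones, `elasticsearch.helpers.scan` is likely the better fit, since `from` plus `size` paging is capped by the index's result window.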
