Commit 52edf32: es doc

Martin Kubajda committed Apr 4, 2019
1 parent 9ad4914

Showing 7 changed files with 125 additions and 38 deletions.
7 changes: 7 additions & 0 deletions bspump/elasticsearch/__init__.py
@@ -3,3 +3,10 @@
from .source import ElasticSearchSource, ElasticSearchAggsSource
from .lookup import ElasticSearchLookup

__all__ = [
"ElasticSearchConnection",
"ElasticSearchSink",
"ElasticSearchSource",
"ElasticSearchAggsSource",
"ElasticSearchLookup"
]
26 changes: 25 additions & 1 deletion bspump/elasticsearch/connection.py
@@ -17,6 +17,31 @@


class ElasticSearchConnection(Connection):
"""
ElasticSearchConnection allows your ES source, sink or lookup to connect to ElasticSearch instance
usage:
1. adding connection to PumpService
.. code:: python
svc = app.get_service("bspump.PumpService")
svc.add_connection(
bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
2. pass connection name ("ESConnection" in our example) to relevant BSPump's object:
.. code:: python
self.build(
bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
"""

ConfigDefaults = {
'url': 'http://localhost:9200/',  # Could be multiline; each line is a URL to a node in the ElasticSearch cluster
@@ -29,7 +54,6 @@ class ElasticSearchConnection(Connection):
'allowed_bulk_response_codes': '201',
}
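As the comment on 'url' notes, the connection can target a whole cluster: list one URL per node, one per line. A minimal sketch of passing such a config (the node addresses are illustrative):

.. code:: python

	svc.add_connection(
		bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection", config={
			'url': 'http://es01:9200/\nhttp://es02:9200/\nhttp://es03:9200/',
		})
	)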


def __init__(self, app, connection_id, config=None):
super().__init__(app, connection_id, config=config)

41 changes: 27 additions & 14 deletions bspump/elasticsearch/lookup.py
@@ -1,28 +1,43 @@
import abc
import requests
import logging
import json

from ..abc.lookup import MappingLookup

L = logging.getLogger(__name__)


class ElasticSearchLookup(MappingLookup):

"""
The lookup that is linked with ES.
It provides a mapping (dictionary-like) interface to pipelines.
It feeds lookup data from ES using a query.
It also has a simple cache to reduce the number of database hits.

**configs**

*index* - Elastic's index

*key* - field name to match

*scroll_timeout* - Timeout of a single scroll request (default is '1m'). Allowed time units:
https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

Example:

.. code:: python

	class ProjectLookup(bspump.elasticsearch.ElasticSearchLookup):

		async def count(self, database):
			return await database['projects'].count_documents({})

		def find_one(self, database, key):
			return database['projects'].find_one({'_id': key})
"""

ConfigDefaults = {
'index': '', # Specify an index
@@ -44,8 +59,6 @@ def __init__(self, app, lookup_id, es_connection, config=None):
metrics_service = app.get_service('asab.MetricsService')
self.CacheCounter = metrics_service.create_counter("es.lookup", tags={}, init_values={'hit': 0, 'miss': 0})



def _find_one(self, key):
prefix = '_search'
request = {
5 changes: 4 additions & 1 deletion bspump/elasticsearch/sink.py
@@ -15,6 +15,10 @@
#

class ElasticSearchSink(Sink):
"""
ElasticSearchSink allows you to insert events into ElasticSearch through POST requests
"""


ConfigDefaults = {
@@ -24,7 +28,6 @@ class ElasticSearchSink(Sink):
"rollover_mechanism": 'time',
"max_index_size": 30*1024*1024*1024, #This is 30GB
"timeout": 30,

}
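A sketch of overriding these defaults when constructing the sink (the values are illustrative; the keys come from the ConfigDefaults above):

.. code:: python

	sink = bspump.elasticsearch.ElasticSearchSink(
		app, self, "ESConnection", config={
			'rollover_mechanism': 'time',
			'max_index_size': 10*1024*1024*1024,  # 10 GB instead of the default 30 GB
			'timeout': 60,
		}
	)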


79 changes: 57 additions & 22 deletions bspump/elasticsearch/source.py
@@ -10,10 +10,37 @@

class ElasticSearchSource(TriggerSource):
"""
request_body - https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
scroll_timeout - Timeout of single scroll request. Allowed time units:
ElasticSearchSource is using standard Elastic's search API to fetch data.
**configs**
*index* - Elastic's index (default is 'index-``*``').
*scroll_timeout* - Timeout of single scroll request (default is '1m'). Allowed time units:
https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
**specific pamameters**
*paging* - boolean (default is True)
*request_body* - dictionary described by Elastic's doc:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
.. code:: python
default_request_body = {
'query': {
'bool': {
'must': {
'match_all': {}
}
}
},
}
"""

ConfigDefaults = {
@@ -30,8 +57,6 @@ def __init__(self, app, pipeline, connection, request_body=None, paging=True, id
self.ScrollTimeout = self.Config['scroll_timeout']
self.Paging = paging


#print("index", self.Index)
if request_body is not None:
self.RequestBody = request_body
else:
Expand All @@ -44,7 +69,6 @@ def __init__(self, app, pipeline, connection, request_body=None, paging=True, id
}
}}


async def cycle(self):

scroll_id = None
@@ -91,7 +115,32 @@ async def cycle(self):

class ElasticSearchAggsSource(TriggerSource):
"""
request_body - https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
ElasticSearchAggsSource is used for Elastic's search aggregations.
**configs**
*index*: - Elastic's index (default is 'index-``*``').
**specific pamameters**
*request_body*
dictionary described by Elastic's doc:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
.. code:: python
default_request_body = {
'query': {
'bool': {
'must': {
'match_all': {}
}
}
},
}
"""

ConfigDefaults = {
@@ -117,9 +166,6 @@ def __init__(self, app, pipeline, connection, request_body=None, id=None, config
}
}




async def cycle(self):
request_body = self.RequestBody
path = '{}/_search?'.format(self.Index)
@@ -140,7 +186,6 @@ async def cycle(self):

aggs = msg['aggregations']


if len(aggs) == 0:
return

@@ -150,9 +195,6 @@ async def cycle(self):
path = {}
await self.process_aggs(path, start_name, start)




async def process_aggs(self, path, aggs_name, aggs):

if 'buckets' in aggs:
@@ -166,15 +208,14 @@ async def process_aggs(self, path, aggs_name, aggs):
await self.process(event)
path.pop(aggs_name)




async def process_buckets(self, path, parent, buckets):
'''
Recursive function for processing buckets.
It iterates through the keys of the dictionary, looking for 'buckets' or 'value'.
If there are 'buckets', it calls itself; if there is a 'value', it calls process_aggs,
which sends an event to be processed.
'''

for bucket in buckets:
@@ -183,9 +224,3 @@ async def process_buckets(self, path, parent, buckets):
path[parent] = bucket[k]
elif isinstance(bucket[k], dict):
await self.process_aggs(path, k, bucket[k])
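For orientation, the nesting these two methods walk follows Elastic's standard aggregation response format, sketched below (the names and values are illustrative, not output of this code):

.. code:: python

	aggregations = {
		'by_host': {
			'buckets': [
				{
					'key': 'host-a',
					'doc_count': 12,
					'max_duration': {'value': 42.0}  # a leaf carrying 'value'
				}
			]
		}
	}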
1 change: 1 addition & 0 deletions doc/source/conf.py
@@ -25,6 +25,7 @@
import bspump
import bspump.kafka
import bspump.influxdb
import bspump.elasticsearch

# -- Project information -----------------------------------------------------

4 changes: 4 additions & 0 deletions doc/source/index.rst
@@ -22,6 +22,10 @@ Welcome to BSPump reference documentation!
:members:
:undoc-members:

.. automodule:: bspump.elasticsearch
:members:
:undoc-members:

Indices and tables
==================

