Skip to content
This repository has been archived by the owner on Sep 23, 2022. It is now read-only.

Commit

Permalink
Added scan and scroll methods + unittests for these two. Updated to v…
Browse files Browse the repository at this point in the history
…ersion 0.5.4
  • Loading branch information
eriky committed Jan 29, 2013
1 parent 4276fd0 commit e8236dd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 19 deletions.
10 changes: 10 additions & 0 deletions README.rst
Expand Up @@ -69,10 +69,20 @@ Dependencies

Changelog
=========
0.5.4
-----
* Added two methods: scan and scroll. With these methods you can now perform
scan queries and scroll through the results.

0.5.3
-----
* Bugfixing

0.5.2
-----
* fixed an issue where the dependency on the requests library was
not automatically procesed by easy_install / pip

0.5.1
-----
* Refactored the bulk API to use more standard methods from ESClient
Expand Down
70 changes: 51 additions & 19 deletions esclient.py
Expand Up @@ -11,7 +11,7 @@

__author__ = 'Erik-Jan van Baaren'
__all__ = ['ESClient']
__version__ = (0, 5, 3)
__version__ = (0, 5, 4)


def get_version():
Expand Down Expand Up @@ -124,7 +124,7 @@ def send_request(self, method, path, body=None, query_string_args={},encode_json
def _search_operation(self, request_type, query_body=None,
operation_type="_search", query_string_args=None,
indexes=["_all"], doctypes=[]):
"""Perform a search operation. This method can be use for search,
"""Perform a search operation. This method can be used for search,
delete by search and count.
Searching in ElasticSearch can be done in two ways:
Expand All @@ -134,27 +134,18 @@ def _search_operation(self, request_type, query_body=None,
You can choose one, but not both at the same time.
"""
if query_body and query_string_args:
raise ESClientException("Found both a query body and query" +
"arguments")
#if query_body and query_string_args:
# raise ESClientException("Found both a query body and query" +
# "arguments")


indexes = ','.join(indexes)
doctypes = ','.join(doctypes)
path = self._make_path([indexes, doctypes, operation_type])

if query_body:
self.send_request(request_type, path, body=query_body)
elif query_string_args:
self.send_request(request_type, path,
query_string_args=query_string_args)
elif operation_type == "_count":
# If both options were not used, there one more option left: no
# query at all. A query is optional when counting, so we fire a
# request to the URL without a query only in this specific case.
self.send_request('GET', path)
else:
raise ESClientException("Mandatory query was not supplied")

self.send_request(request_type, path, body=query_body,
query_string_args=query_string_args)

try:
return self._parse_json_response(self.last_response.text)
except:
Expand Down Expand Up @@ -208,7 +199,48 @@ def search(self, query_body=None, query_string_args=None,
query_string_args=query_string_args, indexes=indexes,
doctypes=doctypes)


def scan(self, query_body=None, query_string_args=None,
indexes=["_all"], doctypes=[], scroll="10m", size=50):
"""Perform a scan search.
The scan search type allows to efficiently scroll a large result
set. This method returns a scroll_id, which can be used to get
more results with the scroll(id=scroll_id) method.
"""
if not query_string_args:
query_string_args = {}

query_string_args["search_type"] = "scan"
query_string_args["scroll"] = scroll
query_string_args["size"] = size

result = self._search_operation('GET', query_body=query_body,
query_string_args=query_string_args, indexes=indexes,
doctypes=doctypes)

return result["_scroll_id"]

def scroll(self, scroll_id, scroll_time="10m"):
"""Get the next batch of results from a scan search.
ElasticSearch will return a new scroll_id to you after every
call to scoll.
A scroll has ended when you get no more resulst from ElasticSeach.
Options:
scroll_id -- the scroll id as returned by the scan method
"""
query_string_args = {}
query_string_args["scroll"] = scroll_time
body = scroll_id

self.send_request('GET', '/_search/scroll', body=body,
query_string_args=query_string_args, encode_json=False)

return json.loads(self.last_response.text)

def delete_by_query(self, query_body=None, query_string_args=None,
indexes=["_all"], doctypes=[]):
"""Delete based on a search operation.
Expand Down
21 changes: 21 additions & 0 deletions test_esclient.py
Expand Up @@ -98,6 +98,27 @@ def test_search_body_api(self):
indexes=['contacts_esclient_test'])
self.assertEqual(result['hits']['total'], 2)

def test_scan_scroll_api(self):
query_body= {
"query": {
"match_all": {}
}
}
scroll_id = self.es.scan(query_body=query_body, indexes=['contacts_esclient_test'], size=1)

total_docs = 0
while True:
count = 0
res = self.es.scroll(scroll_id)
for hit in res['hits']['hits']:
total_docs += 1
count += 1
if count == 0:
break

self.assertEqual(total_docs, 2)


def test_deletebyquery_querystring_api(self):
"""Delete documents with a query using querystring option"""
query_string_args = {
Expand Down

0 comments on commit e8236dd

Please sign in to comment.