Initial bulker implementation

commit fd46bc1a6c827d41826652fd24155f8c2bdef7a2 (1 parent: 872cfe9)
Alberto Paro authored
39 performance/performance.py
@@ -1,8 +1,13 @@
-from pyes import ES
+import sys
+
+#sys.path.insert(0, "../")
+
+#from pyes import ES
+from brainaetic.echidnasearch.es import ES
from datetime import datetime
import shelve
#conn = ES('127.0.0.1:9200')
-conn = ES('192.168.1.51:9200')
+conn = ES('192.168.2.50:9200')
try:
conn.delete_index("test-index")
except:
@@ -10,23 +15,23 @@
dataset = shelve.open("samples.shelve")
-mapping = { u'description': {'boost': 1.0,
- 'index': 'analyzed',
- 'store': 'yes',
- 'type': u'string',
- "term_vector" : "with_positions_offsets"
- },
- u'name': {'boost': 1.0,
- 'index': 'analyzed',
- 'store': 'yes',
- 'type': u'string',
- "term_vector" : "with_positions_offsets"
- },
- u'age': {'store': 'yes',
+mapping = {u'description': {'boost': 1.0,
+ 'index': 'analyzed',
+ 'store': 'yes',
+ 'type': u'string',
+ "term_vector": "with_positions_offsets"
+},
+ u'name': {'boost': 1.0,
+ 'index': 'analyzed',
+ 'store': 'yes',
+ 'type': u'string',
+ "term_vector": "with_positions_offsets"
+ },
+ u'age': {'store': 'yes',
'type': u'integer'},
- }
+ }
conn.create_index("test-index")
-conn.put_mapping("test-type", {'properties':mapping}, ["test-index"])
+conn.put_mapping("test-type", {'properties': mapping}, ["test-index"])
start = datetime.now()
for k, userdata in dataset.items():
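For context, the benchmark above boils down to the flow sketched below. This is a minimal, self-contained sketch assuming the pyes-style API used in the script (create_index, put_mapping, index with bulk=True, force_bulk); the body of the indexing loop is not visible in the diff, so the index() call and the sample data are illustrative assumptions.

from datetime import datetime
from pyes import ES

conn = ES('127.0.0.1:9200')          # assumed local node instead of 192.168.2.50
try:
    conn.delete_index("test-index")  # start from a clean index
except Exception:
    pass

mapping = {
    u'name': {'type': u'string', 'store': 'yes', 'index': 'analyzed',
              'term_vector': 'with_positions_offsets'},
    u'age': {'type': u'integer', 'store': 'yes'},
}
conn.create_index("test-index")
conn.put_mapping("test-type", {'properties': mapping}, ["test-index"])

docs = [{'name': u'user %d' % i, 'age': 20 + i % 50} for i in range(1000)]
start = datetime.now()
for i, doc in enumerate(docs):
    # queue each document through the bulk machinery instead of one request per doc
    conn.index(doc, "test-index", "test-type", id=str(i), bulk=True)
conn.force_bulk()                    # flush whatever is still sitting in the queue
print "indexed %d docs in %s" % (len(docs), datetime.now() - start)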
4 pyes/contrib/__init__.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+__author__ = 'alberto'
36 pyes/contrib/mappings.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from pyes.es import ES
+from pyes import mappings
+
+def mappings_to_code(obj, doc_count=0):
+ result = []
+ odict = obj.as_dict()
+ if isinstance(obj, (mappings.DocumentObjectField, mappings.ObjectField, mappings.NestedObject)):
+ properties = odict.pop("properties", [])
+ doc_count += 1
+ kwargs = ["name=%r" % obj.name,
+ "type=%r" % odict.pop("type")] +\
+ ["%s=%r" % (k, odict[k]) for k in sorted(odict.keys())]
+ result.append(
+ "doc%d=" % doc_count + str(type(obj)).split(".")[-1].strip("'>") + "(" + ', '.join(kwargs) + ")")
+ for k in sorted(obj.properties.keys()):
+ result.extend(mappings_to_code(obj.properties[k], doc_count))
+ else:
+ kwargs = ["name=%r" % obj.name,
+ "type=%r" % odict.pop("type"),
+ "store=%r" % obj.store,
+ "index=%r" % odict.pop("index")] +\
+ ["%s=%r" % (k, odict[k]) for k in sorted(odict.keys())]
+ result.append("doc%d.add_property(" % doc_count +\
+ str(type(obj)).split(".")[-1].strip("'>") + "(" +\
+ ', '.join(kwargs) + "))")
+
+ return result
+
+if __name__ == '__main__':
+ es = ES("192.168.1.1:9200")
+ res = mappings_to_code(es.mappings.get_doctype("twitter", "twitter"))
+ print "\n".join(res)
+
+
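The helper above walks a mapping tree and emits Python source that would rebuild it. A hedged example of calling it follows; the index/type names, the field names, and the exact output are illustrative, not taken from a real cluster.

from pyes.es import ES
from pyes.contrib.mappings import mappings_to_code

es = ES("127.0.0.1:9200")
lines = mappings_to_code(es.mappings.get_doctype("twitter", "twitter"))
print "\n".join(lines)
# The output has roughly this shape (class names and values are illustrative):
#   doc1=DocumentObjectField(name=u'twitter', type=u'object')
#   doc1.add_property(StringField(name=u'message', type=u'string',
#                     store='yes', index='analyzed'))
#   doc1.add_property(IntegerField(name=u'retweets', type=u'integer',
#                     store='yes', index='not_analyzed'))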
111 pyes/es.py
@@ -105,7 +105,7 @@ def save(self, bulk=False, id=None, parent=None, force=False):
if force:
version = None
res = conn.index(self,
- meta.index, meta.type, id, parent=parent, bulk=bulk, version=version, force_insert=force)
+ meta.index, meta.type, id, parent=parent, bulk=bulk, version=version, force_insert=force)
if not bulk:
self._meta.id = res._id
self._meta.version = res._version
@@ -229,6 +229,38 @@ def dict_to_object(self, d):
return DotDict(d)
+class Bulker(object):
+ def __init__(self, conn, bulk_size=400, raise_on_bulk_item_failure=False):
+ self.conn = conn
+ self.bulk_size = bulk_size
+ # protects bulk_data
+ self.bulk_lock = threading.RLock()
+ with self.bulk_lock:
+ self.bulk_data = []
+ self.raise_on_bulk_item_failure = raise_on_bulk_item_failure
+
+ def add_to_bulk_queue(self, content):
+ with self.bulk_lock:
+ self.bulk_data.append(content)
+
+ def flush_bulker(self, forced=False):
+ with self.bulk_lock:
+ if forced or len(self.bulk_data) >= self.bulk_size:
+ batch = self.bulk_data
+ self.bulk_data = []
+ else:
+ return None
+
+ if len(batch) > 0:
+ bulk_result = self.conn._send_request("POST",
+ "/_bulk",
+ "\n".join(batch) + "\n")
+
+ if self.raise_on_bulk_item_failure:
+ _raise_exception_if_bulk_item_failed(bulk_result)
+
+ return bulk_result
+
class ES(object):
"""
ES connection object.
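The new Bulker owns the queue, its lock, and the flush logic that previously lived directly on ES. Below is a minimal sketch of driving it by hand, assuming Bulker is importable from pyes.es as added above; the connection address and the documents are illustrative.

import json
from pyes.es import ES, Bulker

conn = ES("127.0.0.1:9200")
bulker = Bulker(conn, bulk_size=2, raise_on_bulk_item_failure=True)

# queue two index operations in the newline-delimited _bulk format
for _id, name in (("1", "alice"), ("2", "bob")):
    header = json.dumps({"index": {"_index": "test-index",
                                   "_type": "test-type", "_id": _id}})
    bulker.add_to_bulk_queue("%s\n%s" % (header, json.dumps({"name": name})))

# bulk_size is reached, so this sends one POST to /_bulk;
# flush_bulker(forced=True) would also send a partially filled queue
result = bulker.flush_bulker()

Note that the class exposes flush_bulker() and no force_bulk(), while the ES methods further down delegate to self.bulker.flush_bulk(forced) and self.bulker.force_bulk(); those names look like they still need to be aligned.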
@@ -237,7 +269,7 @@ class ES(object):
encoder = ESJsonEncoder
decoder = ESJsonDecoder
- def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
+ def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
encoder=None, decoder=None,
max_retries=3,
default_indices=['_all'],
@@ -246,7 +278,8 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
model=ElasticSearchModel,
basic_auth=None,
raise_on_bulk_item_failure=False,
- document_object_field=None):
+ document_object_field=None,
+ bulker_class=Bulker):
"""
Init a es object.
Servers can be defined in different forms:
@@ -286,6 +319,7 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
self.connection = None
self._mappings = None
self.document_object_field = document_object_field
+ self.bulker_class = bulker_class
if model is None:
model = lambda connection, model: model
@@ -303,11 +337,7 @@ def __init__(self, server="localhost:9200", timeout=5.0, bulk_size=400,
#used in bulk
self.bulk_size = bulk_size #size of the bulk
- # protects bulk_data
- self.bulk_lock = threading.RLock()
- with self.bulk_lock:
- self.bulk_data = []
- self.raise_on_bulk_item_failure = raise_on_bulk_item_failure
+ self.bulker = bulker_class(self, bulk_size=bulk_size, raise_on_bulk_item_failure=raise_on_bulk_item_failure)
if encoder:
self.encoder = encoder
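Because ES now builds its bulker through the bulker_class argument, the queueing behaviour can be customized without touching ES itself. A small sketch assuming a hypothetical LoggingBulker subclass; the logging is an illustration, not part of this commit.

import logging
from pyes.es import ES, Bulker

log = logging.getLogger("bulk")

class LoggingBulker(Bulker):
    def flush_bulker(self, forced=False):
        with self.bulk_lock:
            pending = len(self.bulk_data)
        if pending:
            log.info("flushing %d pending bulk operations (forced=%s)", pending, forced)
        return Bulker.flush_bulker(self, forced)

# every connection built this way gets a LoggingBulker as conn.bulker
conn = ES("127.0.0.1:9200", bulk_size=100, bulker_class=LoggingBulker)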
@@ -333,16 +363,16 @@ def __del__(self):
Destructor
"""
# Don't bother getting the lock
- if len(self.bulk_data) > 0:
+ if self.bulker:

# It's not safe to rely on the destructor to flush the queue:
# the Python documentation explicitly states "It is not guaranteed
# that __del__() methods are called for objects that still exist "
# when the interpreter exits."
logger.error("pyes object %s is being destroyed, but bulk "
"operations have not been flushed. Call force_bulk()!",
- self)
+ self)
# Do our best to save the client anyway...
- self.force_bulk()
+ self.bulker.force_bulk()
def _check_servers(self):
"""Check the servers variable and convert in a valid tuple form"""
@@ -405,12 +435,17 @@ def _init_connection(self):
if _type in ["http", "https"]:
self.connection = http_connect(
[(_type, host, port) for _type, host, port in self.servers if _type in ["http", "https"]],
- timeout=self.timeout, basic_auth=self.basic_auth,
- max_retries=self.max_retries)
+ timeout=self.timeout,
+ basic_auth=self.basic_auth,
+ max_retries=self.max_retries)
return
elif _type == "thrift":
self.connection = thrift_connect([(host, port) for _type, host, port in self.servers if _type == "thrift"],
- timeout=self.timeout, max_retries=self.max_retries)
+ timeout=self.timeout,
+ max_retries=self.max_retries)
def _discovery(self):
"""
@@ -444,7 +479,7 @@ def _send_request(self, method, path, body=None, params=None, headers=None, raw=
else:
body = ""
request = RestRequest(method=Method._NAMES_TO_VALUES[method.upper()],
- uri=path, parameters=params, headers=headers, body=body)
+ uri=path, parameters=params, headers=headers, body=body)
if self.dump_curl is not None:
self._dump_curl_request(request)
@@ -536,8 +571,8 @@ def _set_default_indices(self, default_indices):
@property
def mappings(self):
if self._mappings is None:
- self._mappings = Mapper(self.get_mapping(["_all"]), connection=self,
- document_object_field=self.document_object_field)
+ self._mappings = Mapper(self.get_mapping(indices=["_all"]), connection=self,
+ document_object_field=self.document_object_field)
return self._mappings
#---- Admin commands
@@ -799,7 +834,7 @@ def optimize(self, indices=None,
only_expunge_deletes=only_expunge_deletes,
refresh=refresh,
flush=flush,
- )
+ )
if max_num_segments is not None:
params['max_num_segments'] = max_num_segments
result = self._send_request('POST', path, params=params)
@@ -974,9 +1009,6 @@ def cluster_stats(self, nodes=None):
path = self._make_path(parts)
return self._send_request('GET', path)
- def _add_to_bulk_queue(self, content):
- with self.bulk_lock:
- self.bulk_data.append(content)
def index_raw_bulk(self, header, document):
"""
@@ -984,7 +1016,7 @@ def index_raw_bulk(self, header, document):
header and document must be string "\n" ended
"""
- self._add_to_bulk_queue(u"%s%s" % (header, document))
+ self.bulker.add_to_bulk_queue(u"%s%s" % (header, document))
return self.flush_bulk()
def index(self, doc, index, doc_type, id=None, parent=None,
@@ -1018,7 +1050,7 @@ def index(self, doc, index, doc_type, id=None, parent=None,
if isinstance(doc, dict):
doc = json.dumps(doc, cls=self.encoder)
command = "%s\n%s" % (json.dumps(cmd, cls=self.encoder), doc)
- self._add_to_bulk_queue(command)
+ self.bulker.add_to_bulk_queue(command)
return self.flush_bulk()
if force_insert:
@@ -1063,22 +1095,7 @@ def flush_bulk(self, forced=False):
"""
Send pending operations if forced or if the bulk threshold is exceeded.
"""
- with self.bulk_lock:
- if forced or len(self.bulk_data) >= self.bulk_size:
- batch = self.bulk_data
- self.bulk_data = []
- else:
- return None
-
- if len(batch) > 0:
- bulk_result = self._send_request("POST",
- "/_bulk",
- "\n".join(batch) + "\n")
-
- if self.raise_on_bulk_item_failure:
- _raise_exception_if_bulk_item_failed(bulk_result)
-
- return bulk_result
+ self.bulker.flush_bulk(forced)
def force_bulk(self):
"""
@@ -1139,7 +1156,7 @@ def update(self, extra_doc, index, doc_type, id, querystring_args=None,
new_doc = current_doc
try:
return self.index(new_doc, index, doc_type, id,
- version=current_doc._meta.version, querystring_args=querystring_args)
+ version=current_doc._meta.version, querystring_args=querystring_args)
except VersionConflictEngineException:
if attempt <= 0:
raise
@@ -1154,7 +1171,7 @@ def delete(self, index, doc_type, id, bulk=False, querystring_args=None):
if bulk:
cmd = {"delete": {"_index": index, "_type": doc_type,
"_id": id}}
- self._add_to_bulk_queue(json.dumps(cmd, cls=self.encoder))
+ self.bulker.add_to_bulk_queue(json.dumps(cmd, cls=self.encoder))
return self.flush_bulk()
path = self._make_path([index, doc_type, id])
@@ -1258,8 +1275,8 @@ def mget(self, ids, index=None, doc_type=None, routing=None, **get_params):
if routing:
get_params["routing"] = routing
results = self._send_request('GET', "/_mget",
- body={'docs': body},
- params=get_params)
+ body={'docs': body},
+ params=get_params)
if 'docs' in results:
model = self.model
return [model(self, item) for item in results['docs']]
@@ -1369,7 +1386,7 @@ def count(self, query=None, indices=None, doc_types=None, **query_params):
if doc_types is None:
doc_types = []
if query is None:
- from ..query import MatchAllQuery
+ from .query import MatchAllQuery
query = MatchAllQuery()
if hasattr(query, 'to_query_json'):
@@ -1542,7 +1559,7 @@ def _do_search(self, auto_increment=False):
self.query.size = self.chuck_size
self._results = self.connection.search_raw(self.query, indices=self.indices,
- doc_types=self.doc_types, **self.query_params)
+ doc_types=self.doc_types, **self.query_params)
if 'search_type' in self.query_params and self.query_params['search_type'] == "scan":
self.scroller_parameters['search_type'] = self.query_params['search_type']
del self.query_params['search_type']
@@ -1561,7 +1578,7 @@ def _do_search(self, auto_increment=False):
else:
try:
self._results = self.connection.search_scroll(self.scroller_id,
- self.scroller_parameters.get("scroll", "10m"))
+ self.scroller_parameters.get("scroll", "10m"))
self.scroller_id = self._results['_scroll_id']
except ReduceSearchPhaseException:
#bad hack, should be not hits on the last iteration
@@ -1670,7 +1687,7 @@ def get_start_end(val):
query['size'] = end - start
results = self.connection.search_raw(query, indices=self.indices,
- doc_types=self.doc_types, **self.query_params)
+ doc_types=self.doc_types, **self.query_params)
hits = results['hits']['hits']
if not isinstance(val, slice):
3  pyes/mappings.py
@@ -484,7 +484,8 @@ def _process(self, data):
for indexname, indexdata in data.items():
self.indices[indexname] = {}
for docname, docdata in indexdata.items():
- o = get_field(docname, docdata)
+ o = get_field(docname, docdata, "document",
+ document_object_field=self.document_object_field)
o.connection = self.connection
o.index_name = indexname
self.indices[indexname][docname] = o
Vilhelm K. Vardøy

Hi,

Looking at what the error message suggests (calling force_bulk()), wouldn't this condition always be True anyway?

Alberto Paro (Owner)

Yeah, we need to propagate a check for whether the bulker content is empty.
