Perform operations in bulk (fixes #5)
leplatrem committed May 22, 2017
1 parent eb8d456 commit 176db56
Showing 4 changed files with 53 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -8,6 +8,7 @@ Changelog
 **New features**
 
 - Flush indices when server is flushed (fixes #4)
+- Perform insertions and deletion in bulk for better efficiency (fixes #5)
 
 
 0.0.1 (2017-05-22)
57 changes: 35 additions & 22 deletions kinto_elasticsearch/indexer.py
@@ -1,7 +1,7 @@
 import logging
+from contextlib import contextmanager
 
 import elasticsearch
 
 from pyramid.settings import aslist
 
@@ -13,38 +13,51 @@ def __init__(self, hosts, prefix="kinto"):
         self.client = elasticsearch.Elasticsearch(hosts)
         self.prefix = prefix
 
+    def indexname(self, bucket_id, collection_id):
+        return "{}-{}-{}".format(self.prefix, bucket_id, collection_id)
+
     def search(self, bucket_id, collection_id, query, **kwargs):
-        indexname = "{}-{}-{}".format(self.prefix, bucket_id, collection_id)
+        indexname = self.indexname(bucket_id, collection_id)
         return self.client.search(index=indexname,
                                   doc_type=indexname,
                                   body=query,
                                   **kwargs)
 
-    def index_record(self, bucket_id, collection_id, record, id_field):
-        indexname = "{}-{}-{}".format(self.prefix, bucket_id, collection_id)
-        record_id = record[id_field]
+    def flush(self):
+        self.client.indices.delete(index="{}-*".format(self.prefix))
 
-        if not self.client.indices.exists(index=indexname):
-            self.client.indices.create(index=indexname)
+    @contextmanager
+    def bulk(self):
+        bulk = BulkClient(self)
+        yield bulk
+        elasticsearch.helpers.bulk(self.client, bulk.operations)
 
-        index = self.client.index(index=indexname,
-                                  doc_type=indexname,
-                                  id=record_id,
-                                  body=record,
-                                  refresh=True)
-        return index
 
-    def unindex_record(self, bucket_id, collection_id, record, id_field):
-        indexname = "{}-{}-{}".format(self.prefix, bucket_id, collection_id)
+class BulkClient:
+    def __init__(self, indexer):
+        self.indexer = indexer
+        self.operations = []
+
+    def index_record(self, bucket_id, collection_id, record, id_field):
+        indexname = self.indexer.indexname(bucket_id, collection_id)
         record_id = record[id_field]
-        result = self.client.delete(index=indexname,
-                                    doc_type=indexname,
-                                    id=record_id,
-                                    refresh=True)
-        return result
+        self.operations.append({
+            '_op_type': 'index',
+            '_index': indexname,
+            '_type': indexname,
+            '_id': record_id,
+            '_source': record,
+        })
 
-    def flush(self):
-        self.client.indices.delete(index="{}-*".format(self.prefix))
+    def unindex_record(self, bucket_id, collection_id, record, id_field):
+        indexname = self.indexer.indexname(bucket_id, collection_id)
+        record_id = record[id_field]
+        self.operations.append({
+            '_op_type': 'delete',
+            '_index': indexname,
+            '_type': indexname,
+            '_id': record_id,
+        })
 
 
 def load_from_config(config):
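
For context, and not part of the commit itself: a minimal sketch of how the new Indexer.bulk() context manager above is meant to be used. The host list, bucket/collection ids, and sample records are invented for illustration; nothing is sent to Elasticsearch until the with block exits, at which point all queued actions go out in a single elasticsearch.helpers.bulk() call.

from kinto_elasticsearch.indexer import Indexer

# Hypothetical indexer pointing at a local Elasticsearch node.
indexer = Indexer(hosts=["localhost:9200"], prefix="kinto")

records = [{"id": "rec-1", "title": "hola"},
           {"id": "rec-2", "title": "mundo"}]

with indexer.bulk() as bulk:
    for record in records:
        # Each call only appends an action dict to bulk.operations.
        bulk.index_record("bid", "cid", record=record, id_field="id")
    # Deletions can be queued in the same batch.
    bulk.unindex_record("bid", "cid", record={"id": "rec-0"}, id_field="id")
# Exiting the block submits all queued actions in one bulk request.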
30 changes: 16 additions & 14 deletions kinto_elasticsearch/listener.py
@@ -1,6 +1,7 @@
 import logging
 
 import elasticsearch
+from elasticsearch import helpers
 from kinto.core.events import ACTIONS
 
 
@@ -14,20 +15,21 @@ def on_record_changed(event):
     collection_id = event.payload["collection_id"]
     action = event.payload["action"]
 
-    for change in event.impacted_records:
-        try:
-            if action == ACTIONS.DELETE.value:
-                indexer.unindex_record(bucket_id,
-                                       collection_id,
-                                       record=change["old"],
-                                       id_field="id")
-            else:
-                indexer.index_record(bucket_id,
-                                     collection_id,
-                                     record=change["new"],
-                                     id_field="id")
-        except elasticsearch.ElasticsearchException:
-            logger.exception("Failed to index record")
+    try:
+        with indexer.bulk() as bulk:
+            for change in event.impacted_records:
+                if action == ACTIONS.DELETE.value:
+                    bulk.unindex_record(bucket_id,
+                                        collection_id,
+                                        record=change["old"],
+                                        id_field="id")
+                else:
+                    bulk.index_record(bucket_id,
+                                      collection_id,
+                                      record=change["new"],
+                                      id_field="id")
+    except elasticsearch.ElasticsearchException:
+        logger.exception("Failed to index record")
 
 
 def on_server_flushed(event):
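
For context, not part of the diff: previously each impacted record triggered its own client.index() or client.delete() call (with refresh=True); after this change a single Kinto event produces one helpers.bulk() request, and a failure is logged once for the whole batch. A sketch of the action list BulkClient would accumulate for a DELETE event touching two records in bucket "bid" / collection "cid" (index names follow the kinto-{bucket}-{collection} convention from indexer.py; the record ids are made up):

# Hypothetical contents of BulkClient.operations for a DELETE event
# impacting two records; delete actions carry no '_source'.
operations = [
    {"_op_type": "delete", "_index": "kinto-bid-cid",
     "_type": "kinto-bid-cid", "_id": "rec-1"},
    {"_op_type": "delete", "_index": "kinto-bid-cid",
     "_type": "kinto-bid-cid", "_id": "rec-2"},
]
# These dicts are handed as-is to elasticsearch.helpers.bulk(client, operations),
# which serializes them into a single _bulk API request.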
2 changes: 1 addition & 1 deletion tests/test_elasticsearch.py
@@ -54,7 +54,7 @@ def test_new_records_are_indexed(self):
         assert len(result["hits"]["hits"]) == 0
 
     def test_response_is_served_if_indexer_fails(self):
-        with mock.patch("kinto_elasticsearch.indexer.elasticsearch.Elasticsearch.index",
+        with mock.patch("kinto_elasticsearch.indexer.elasticsearch.helpers.bulk",
                         side_effect=elasticsearch.ElasticsearchException):
             r = self.app.post_json("/buckets/bid/collections/cid/records",
                                    {"data": {"hola": "mundo"}},