Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command to reindex existing data (fixes #20) #56

Merged
merged 5 commits into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions kinto_elasticsearch/command_reindex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import argparse
import elasticsearch
import logging
import sys

from pyramid.paster import bootstrap

from kinto.core.storage.exceptions import RecordNotFoundError
from kinto.core.storage import Sort, Filter
from kinto.core.utils import COMPARISON


# Path of the configuration file used when ``--ini`` is not given.
DEFAULT_CONFIG_FILE = 'config/kinto.ini'

# Module-level logger, named after the enclosing package.
logger = logging.getLogger(__package__)


def main(cli_args=None):
if cli_args is None:
cli_args = sys.argv[1:]

parser = argparse.ArgumentParser()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we could work on adding an entry-point so that plugins can add sub-commands to the kinto command?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created Kinto/kinto#1330 about that.

parser.add_argument('--ini',
help='Application configuration file',
dest='ini_file',
required=False,
default=DEFAULT_CONFIG_FILE)
parser.add_argument('-b', '--bucket',
help='Bucket name.',
type=str)
parser.add_argument('-c', '--collection',
help='Collection name.',
type=str)
args = parser.parse_args(args=cli_args)

print("Load config...")
env = bootstrap(args.ini_file)
registry = env['registry']

# Make sure that kinto-elasticsearch is configured.
try:
indexer = registry.indexer
except AttributeError:
logger.error("kinto-elasticsearch not available.")
return 62

bucket_id = args.bucket
collection_id = args.collection

# Get index schema from collection metadata.
try:
schema = get_index_schema(registry.storage, bucket_id, collection_id)
except RecordNotFoundError:
logger.error("No collection '%s' in bucket '%s'" % (collection_id, bucket_id))
return 63

# Give up if collection has no index mapping.
if schema is None:
logger.error("No `index:schema` attribute found in collection metadata.")
return 64

# XXX: Are you sure?
Copy link
Member

@Natim Natim Sep 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, we could add a `--dry-run` or a `--recreate` option.

recreate_index(indexer, bucket_id, collection_id, schema)
reindex_records(indexer, registry.storage, bucket_id, collection_id)

return 0


def get_index_schema(storage, bucket_id, collection_id):
    """Return the ``index:schema`` entry of a collection's metadata.

    The collection object is looked up in ``storage`` under its bucket.
    The value returned is whatever ``.get("index:schema")`` gives on the
    fetched metadata, i.e. ``None`` when a dict-like object has no such key.
    Exceptions raised by ``storage.get()`` are not handled here and
    propagate to the caller.
    """
    # XXX: https://github.com/Kinto/kinto/issues/710
    bucket_uri = "/buckets/%s" % bucket_id
    collection = storage.get(parent_id=bucket_uri,
                             collection_id="collection",
                             object_id=collection_id)
    return collection.get("index:schema")


def recreate_index(indexer, bucket_id, collection_id, schema):
    """Delete the index of a collection, then create it again with ``schema``.

    A confirmation line is printed on standard output after each of the two
    steps. Nothing is returned.
    """
    name = indexer.indexname(bucket_id, collection_id)

    # Step 1: get rid of the index as it currently exists.
    indexer.delete_index(bucket_id, collection_id)
    print("Old index '%s' deleted." % name)

    # Step 2: build a fresh one using the schema that was passed in.
    indexer.create_index(bucket_id, collection_id, schema=schema)
    print("New index '%s' created." % name)


def get_paginated_records(storage, bucket_id, collection_id, limit=5000):
    """Yield the records of a collection one page at a time.

    Records are requested from ``storage.get_all()`` sorted by descending
    ``last_modified``, with ``limit`` passed through on every call. Each
    yielded value is the list returned for one page; iteration ends as soon
    as a call returns no record.
    """
    # A single call could hit the storage_fetch_limit, hence the paging.
    records_parent = "/buckets/%s/collections/%s" % (bucket_id, collection_id)
    newest_first = [Sort('last_modified', -1)]
    cursor = []  # No pagination rule for the very first page.
    while True:
        page, _ = storage.get_all(parent_id=records_parent,
                                  collection_id="record",
                                  pagination_rules=cursor,
                                  sorting=newest_first,
                                  limit=limit)
        if not page:
            return  # Every page has been consumed.

        yield page

        # The next page holds records strictly older than the last one seen.
        oldest_seen = page[-1]["last_modified"]
        cursor = [[Filter("last_modified", oldest_seen, COMPARISON.LT)]]


def reindex_records(indexer, storage, bucket_id, collection_id):
    """Push every stored record of a collection to the indexer.

    Records are read page by page through ``get_paginated_records()`` and
    the records of each page are queued on one ``indexer.bulk()`` context.
    A dot is printed per page as a progress marker, and a summary line with
    the number of queued operations is printed at the end.

    An ``elasticsearch.ElasticsearchException`` raised while handling a page
    is logged and the loop carries on with the next page; that page is not
    added to the final count.
    """
    indexed = 0
    pages = get_paginated_records(storage, bucket_id, collection_id)
    for page in pages:
        try:
            with indexer.bulk() as batch:
                for item in page:
                    batch.index_record(bucket_id, collection_id, record=item)
                print(".", end="")
            # Only counted once the bulk context has exited without error.
            indexed += len(batch.operations)
        except elasticsearch.ElasticsearchException:
            logger.exception("Failed to index record")
    print("\n%s records reindexed." % indexed)
4 changes: 2 additions & 2 deletions kinto_elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, indexer):
self.indexer = indexer
self.operations = []

def index_record(self, bucket_id, collection_id, record, id_field):
def index_record(self, bucket_id, collection_id, record, id_field="id"):
indexname = self.indexer.indexname(bucket_id, collection_id)
record_id = record[id_field]
self.operations.append({
Expand All @@ -81,7 +81,7 @@ def index_record(self, bucket_id, collection_id, record, id_field):
'_source': record,
})

def unindex_record(self, bucket_id, collection_id, record, id_field):
def unindex_record(self, bucket_id, collection_id, record, id_field="id"):
indexname = self.indexer.indexname(bucket_id, collection_id)
record_id = record[id_field]
self.operations.append({
Expand Down
6 changes: 2 additions & 4 deletions kinto_elasticsearch/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ def on_record_changed(event):
if action == ACTIONS.DELETE.value:
bulk.unindex_record(bucket_id,
collection_id,
record=change["old"],
id_field="id")
record=change["old"])
else:
bulk.index_record(bucket_id,
collection_id,
record=change["new"],
id_field="id")
record=change["new"])
except elasticsearch.ElasticsearchException:
logger.exception("Failed to index record")

Expand Down
15 changes: 11 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@
with open('CHANGELOG.rst') as history_file:
history = history_file.read()

requirements = [
REQUIREMENTS = [
'elasticsearch',
'kinto>=6.0.0'
]

test_requirements = [
TEST_REQUIREMENTS = [
'mock',
'unittest2',
'webtest',
]

ENTRY_POINTS = {
'console_scripts': [
'kinto-elasticsearch-reindex = kinto_elasticsearch.command_reindex:main'
],
}

setup(
name='kinto-elasticsearch',
version='0.3.0.dev0',
Expand All @@ -33,7 +39,7 @@
],
package_dir={'kinto_elasticsearch': 'kinto_elasticsearch'},
include_package_data=True,
install_requires=requirements,
install_requires=REQUIREMENTS,
license="Apache License (2.0)",
zip_safe=False,
keywords='kinto elasticsearch index',
Expand All @@ -48,5 +54,6 @@
'Programming Language :: Python :: 3.5',
],
test_suite='tests',
tests_require=test_requirements
tests_require=TEST_REQUIREMENTS,
entry_points=ENTRY_POINTS
)
6 changes: 6 additions & 0 deletions tests/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
use = egg:kinto
kinto.userid_hmac_secret = some-secret-string

kinto.storage_backend = kinto.core.storage.postgresql
kinto.storage_url = postgres://postgres:postgres@localhost:5432/postgres

kinto.permission_backend = kinto.core.permission.postgresql
kinto.permission_url = postgres://postgres:postgres@localhost:5432/postgres

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Natim is this intentional?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for tests I guess.

kinto.includes = kinto_elasticsearch
kinto.plugins.flush

Expand Down
85 changes: 85 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import elasticsearch
import mock
import os
import unittest
from kinto_elasticsearch.command_reindex import main, reindex_records
from . import BaseWebTest

# Absolute path of the directory holding this test module; the tests join it
# with the names of the ``.ini`` files they pass to the command.
HERE = os.path.abspath(os.path.dirname(__file__))


class TestMain(BaseWebTest, unittest.TestCase):
    """Checks of the reindex command entry point and of its exit codes."""

    # Index mapping stored in the collection metadata by the happy-path test.
    schema = {
        "properties": {
            "id": {"type": "keyword"},
            "last_modified": {"type": "long"},
            "build": {
                "properties": {
                    "date": {"type": "date", "format": "strict_date"},
                    "id": {"type": "keyword"}
                }
            }
        }
    }

    @staticmethod
    def _cli_args(ini_name):
        """Build the argument list targeting ``bid``/``cid`` with an ini file."""
        ini_path = os.path.join(HERE, ini_name)
        return ['--ini', ini_path, '--bucket', 'bid', '--collection', 'cid']

    def test_cli_fail_if_elasticsearch_plugin_not_installed(self):
        """Exit code 62 when the loaded configuration has no indexer."""
        argv = self._cli_args('wrong_config.ini')
        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(argv)
        assert exit_code == 62
        logger.error.assert_called_with('kinto-elasticsearch not available.')

    def test_cli_fail_if_collection_or_bucket_do_not_exists(self):
        """Exit code 63 when the collection cannot be found in storage."""
        argv = self._cli_args('config.ini')
        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(argv)
        assert exit_code == 63
        logger.error.assert_called_with(
            "No collection 'cid' in bucket 'bid'")

    def test_cli_fail_if_collection_has_no_index_schema(self):
        """Exit code 64 when the collection has no ``index:schema``."""
        # Create the bucket and a collection without any index schema.
        self.app.put("/buckets/bid", headers=self.headers)
        self.app.put("/buckets/bid/collections/cid", headers=self.headers)

        argv = self._cli_args('config.ini')
        with mock.patch('kinto_elasticsearch.command_reindex.logger') as logger:
            exit_code = main(argv)
        assert exit_code == 64
        logger.error.assert_called_with(
            'No `index:schema` attribute found in collection metadata.')

    def test_cli_reindexes_if_collection_has_an_index_schema(self):
        """Exit code 0 when the collection defines an index schema."""
        # Create the bucket, the collection with its schema, and one record.
        self.app.put("/buckets/bid", headers=self.headers)
        collection_body = {"data": {"index:schema": self.schema}}
        self.app.put_json("/buckets/bid/collections/cid", collection_body,
                          headers=self.headers)
        record_body = {"data": {"build": {"id": "efg", "date": "2017-02-01"}}}
        self.app.post_json("/buckets/bid/collections/cid/records",
                           record_body,
                           headers=self.headers)

        exit_code = main(self._cli_args('config.ini'))
        assert exit_code == 0

    def test_cli_logs_elasticsearch_exceptions(self):
        """Elasticsearch errors raised while indexing are logged."""
        indexer = mock.MagicMock()
        bulk = indexer.bulk().__enter__()
        bulk.index_record.side_effect = elasticsearch.ElasticsearchException

        logger_patch = mock.patch('kinto_elasticsearch.command_reindex.logger')
        pages_patch = mock.patch(
            'kinto_elasticsearch.command_reindex.get_paginated_records',
            return_value=[[{}, {}]])
        with logger_patch as logger, pages_patch as get_paginated_records:
            reindex_records(indexer,
                            mock.sentinel.storage,
                            mock.sentinel.bucket_id,
                            mock.sentinel.collection_id)
            get_paginated_records.assert_called_with(mock.sentinel.storage,
                                                     mock.sentinel.bucket_id,
                                                     mock.sentinel.collection_id)
            logger.exception.assert_called_with('Failed to index record')

    def test_cli_default_to_sys_argv(self):
        """Without explicit arguments, ``main()`` reads ``sys.argv``."""
        fake_argv = ['cli', '--ini', os.path.join(HERE, 'wrong_config.ini')]
        with mock.patch('sys.argv', fake_argv):
            exit_code = main()
        assert exit_code == 62
9 changes: 0 additions & 9 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ def test_returns_false_if_connection_fails(self):

class PostActivation(BaseWebTest, unittest.TestCase):

@classmethod
def get_app_settings(cls, extras=None):
settings = super().get_app_settings(extras)
settings['storage_backend'] = 'kinto.core.storage.postgresql'
settings['storage_url'] = 'postgres://postgres:postgres@localhost:5432/postgres'
settings['permission_backend'] = 'kinto.core.permission.postgresql'
settings['permission_url'] = settings['storage_url']
return settings

def setUp(self):
app = self.make_app(settings={"kinto.includes": ""})
capabilities = app.get("/").json["capabilities"]
Expand Down
5 changes: 5 additions & 0 deletions tests/wrong_config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[app:main]
use = egg:kinto
kinto.userid_hmac_secret = some-secret-string

kinto.includes = kinto.plugins.flush