Skip to content

Commit

Permalink
IndexStrategy.SpecificIndex
Browse files Browse the repository at this point in the history
and other improvements
  • Loading branch information
aaxelb committed Apr 7, 2023
1 parent d483d28 commit ccf2904
Show file tree
Hide file tree
Showing 32 changed files with 1,584 additions and 1,309 deletions.
3 changes: 2 additions & 1 deletion _quest.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ view info about indexes (including current but not-yet-created)

specific-index actions:
- pls_create
- pls_keep_live
- pls_start_keeping_live
- pls_start_backfill
- pls_mark_backfill_complete
- pls_make_default_for_searching
- pls_stop_keeping_live
- pls_delete
6 changes: 3 additions & 3 deletions api/search/urls.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from django.urls import path
from django.urls import re_path
from django.views.decorators.csrf import csrf_exempt

from api.search import views


urlpatterns = [
path(
re_path(
# sharev2 back-compat
r'creativeworks/_search',
r'^creativeworks/_search/?$',
csrf_exempt(views.Sharev2ElasticSearchView.as_view()),
name='search'
),
Expand Down
22 changes: 11 additions & 11 deletions api/search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ def post(self, request):
return self._handle_request(request)

def _handle_request(self, request):
queryparams = request.query_params.copy()
requested_index_strategy = queryparams.pop('indexStrategy', None)
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
index_strategy = IndexStrategy.by_request(
request=request,
default_strategy=settings.DEFAULT_SHAREV2_INDEX_STRATEGY,
specific_index = IndexStrategy.get_for_searching(
requested_index_strategy,
default_name=settings.DEFAULT_SHAREV2_INDEX_STRATEGY,
)
except exceptions.IndexStrategyError as error:
raise http.Http404(str(error))
queryparams = request.query_params.copy()
queryparams.pop('indexStrategy', None)
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
response = index_strategy.pls_handle_query__api_backcompat(
response_json = specific_index.pls_handle_query__sharev2_backcompat(
request_body=request.data,
request_queryparams=queryparams,
)
return Response(data=response, headers={'Content-Type': 'application/json'})
except (exceptions.IndexStrategyError, NotImplementedError):
return Response(status=418) # TODO
return Response(data=response_json, headers={'Content-Type': 'application/json'})
except (exceptions.IndexStrategyError, NotImplementedError) as error:
return Response(status=418, data=str(error)) # TODO
20 changes: 11 additions & 9 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
from django.http import HttpResponseGone
from django.utils.feedgenerator import Atom1Feed
from django.conf import settings
from raven.contrib.django.raven_compat.models import client as sentry_client

from share.search import IndexStrategy
from share.search.exceptions import IndexStrategyError
from share.util.xml import strip_illegal_xml_chars


Expand Down Expand Up @@ -39,9 +41,9 @@ def title(self, obj):
def get_object(self, request):
self._order = request.GET.get('order')
elastic_query = request.GET.get('elasticQuery')
self._index_strategy = IndexStrategy.by_request(
request=request,
default=settings.DEFAULT_SHAREV2_INDEX_STRATEGY,
self._index_strategy = IndexStrategy.get_for_searching(
request.GET.get('indexStrategy'),
default_name=settings.DEFAULT_SHAREV2_INDEX_STRATEGY,
)

if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
Expand All @@ -62,12 +64,12 @@ def get_object(self, request):
return elastic_data

def items(self, obj):
elastic_response = self._index_strategy.pls_handle_query__api_backcompat(
request_body=obj,
)
json_response = elastic_response.json()

if elastic_response.status_code != 200 or 'error' in json_response:
try:
json_response = self._index_strategy.pls_handle_query__sharev2_backcompat(
request_body=obj,
)
except IndexStrategyError:
sentry_client.captureException()
return

def get_item(hit):
Expand Down
13 changes: 4 additions & 9 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,14 @@ def split(string, delim):

ELASTICSEARCH = {
'SNIFF': bool(os.environ.get('ELASTICSEARCH_SNIFF')),
'PRIMARY_INDEX': os.environ.get('ELASTICSEARCH_PRIMARY_INDEX', 'share'),
'TIMEOUT': int(os.environ.get('ELASTICSEARCH_TIMEOUT', '45')),
'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 2000)),
'KOMBU_QUEUE_SETTINGS': {
'serializer': 'json',
'compression': 'zlib',
'no_ack': False, # WHY KOMBU THAT'S NOT HOW ENGLISH WORKS
},
'CHUNK_SIZE': int(os.environ.get('ELASTICSEARCH_CHUNK_SIZE', 20)),
'INDEX_STRATEGIES': {}, # populated below based on environment
}
ELASTICSEARCH5_URL = os.environ.get('ELASTICSEARCH_URL')
if ELASTICSEARCH5_URL:
ELASTICSEARCH['INDEX_STRATEGIES']['sharev2_elastic5'] = {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic5:Sharev2Elastic5IndexStrategy',
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic5.Sharev2Elastic5IndexStrategy',
'CLUSTER_SETTINGS': {
'URL': ELASTICSEARCH5_URL,
},
Expand All @@ -323,7 +317,7 @@ def split(string, delim):
ELASTICSEARCH8_USERNAME = os.environ.get('ELASTICSEARCH8_USERNAME', 'elastic')
ELASTICSEARCH8_SECRET = os.environ.get('ELASTICSEARCH8_SECRET')
ELASTICSEARCH['INDEX_STRATEGIES']['sharev2_elastic8'] = {
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic8:Sharev2Elastic8IndexStrategy',
'INDEX_STRATEGY_CLASS': 'share.search.index_strategy.sharev2_elastic8.Sharev2Elastic8IndexStrategy',
'CLUSTER_SETTINGS': {
'URL': ELASTICSEARCH8_URL,
'AUTH': (ELASTICSEARCH8_USERNAME, ELASTICSEARCH8_SECRET),
Expand Down Expand Up @@ -352,6 +346,7 @@ def split(string, delim):
RABBITMQ_PASSWORD = os.environ.get('RABBITMQ_PASSWORD', 'guest')
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
RABBITMQ_PORT = os.environ.get('RABBITMQ_PORT', '5672')
RABBITMQ_MGMT_PORT = os.environ.get('RABBITMQ_MGMT_PORT', '15672')
RABBITMQ_VHOST = os.environ.get('RABBITMQ_VHOST', '/')

CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://{}:{}@{}:{}/{}'.format(RABBITMQ_USERNAME, RABBITMQ_PASSWORD, RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_VHOST))
Expand Down
119 changes: 69 additions & 50 deletions share/admin/search.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import logging

from django.http.response import HttpResponseRedirect
from django.template.response import TemplateResponse

from share.admin.util import admin_url
from share.models.index_backfill import IndexBackfill
from share.search.index_messenger import IndexMessenger
from share.search.index_strategy import IndexStrategy
from share.search.index_strategy.sharev2_elastic5 import Sharev2Elastic5IndexStrategy


logger = logging.getLogger(__name__)


def search_indexes_view(request):
Expand All @@ -12,7 +19,7 @@ def search_indexes_view(request):
request,
'admin/search-indexes.html',
context={
'indexes_by_strategy': _indexes_by_strategy(),
'index_status_by_strategy': _index_status_by_strategy(),
},
)
if request.method == 'POST':
Expand All @@ -22,86 +29,98 @@ def search_indexes_view(request):
return HttpResponseRedirect('')


def _indexes_by_strategy():
def _index_status_by_strategy():
backfill_by_indexname = {
backfill.specific_indexname: backfill
for backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=IndexStrategy.all_strategies_by_name().keys())
)
}
status_by_strategy = {}
backfill_by_index = _backfill_by_index()
for index_strategy in IndexStrategy.all_strategies().values():
status_by_strategy[index_strategy.name] = [
(index_status, backfill_by_index.get(index_status.specific_indexname))
for index_status in index_strategy.specific_index_statuses()
]
for index_strategy in IndexStrategy.all_strategies():
current_index = index_strategy.for_current_index()
status_by_strategy[index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'backfill': _serialize_backfill(
current_index,
backfill_by_indexname.get(current_index.indexname),
),
},
'prior': sorted((
specific_index.pls_get_status()
for specific_index in index_strategy.each_specific_index()
if not specific_index.is_current
), reverse=True),
}
return status_by_strategy


def _backfill_by_index():
all_strategies = IndexStrategy.all_strategies()
backfill_by_index = {}
for backfill in IndexBackfill.objects.filter(index_strategy_name__in=all_strategies.keys()):
current_indexname = all_strategies[backfill.index_strategy_name].indexname
backfill_by_index[backfill.specific_indexname] = {
def _serialize_backfill(specific_index: IndexStrategy.SpecificIndex, backfill: IndexBackfill):
if not specific_index.is_current:
return {}
if not backfill:
return {
'can_start_backfill': (
backfill.specific_indexname == current_indexname
and backfill.backfill_status == IndexBackfill.INITIAL
not isinstance(specific_index.index_strategy, Sharev2Elastic5IndexStrategy)
and specific_index.pls_check_exists()
),
'can_mark_backfill_complete': (backfill.backfill_status == IndexBackfill.INDEXING),
'is_complete': (backfill.backfill_status == IndexBackfill.COMPLETE),
'backfill_status': backfill.backfill_status,
'backfill_modified': backfill.modified.isoformat(timespec='minutes'),
'backfill_admin_url': admin_url(backfill),
}
for index_strategy in all_strategies.values():
if (
index_strategy.SUPPORTS_BACKFILL
and index_strategy.indexname not in backfill_by_index
and index_strategy.pls_check_exists()
):
backfill_by_index[index_strategy.indexname] = {
'can_start_backfill': True,
}
return backfill_by_index
return {
'backfill_status': backfill.backfill_status,
'backfill_modified': backfill.modified.isoformat(timespec='minutes'),
'backfill_admin_url': admin_url(backfill),
'backfill_queue_depth': IndexMessenger().get_queue_depth(
specific_index.index_strategy.nonurgent_messagequeue_name,
),
'can_start_backfill': (backfill.backfill_status == IndexBackfill.INITIAL),
'can_mark_backfill_complete': (backfill.backfill_status == IndexBackfill.INDEXING),
'is_complete': (backfill.backfill_status == IndexBackfill.COMPLETE),
}


def _pls_create(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.assert_setup_is_current()
index_strategy.pls_setup_as_needed()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
assert specific_index.is_current
specific_index.pls_create()


def _pls_keep_live(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.pls_keep_live()
def _pls_start_keeping_live(specific_indexname):
specific_index = IndexStrategy.get_specific_index(specific_indexname)
specific_index.pls_start_keeping_live()


def _pls_stop_keeping_live(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.pls_stop_keeping_live()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
specific_index.pls_stop_keeping_live()


def _pls_start_backfill(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.assert_setup_is_current()
index_strategy.pls_start_backfill()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
assert specific_index.is_current
specific_index.index_strategy.get_or_create_backfill().pls_start(specific_index)


def _pls_mark_backfill_complete(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.pls_mark_backfill_complete()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
specific_index.index_strategy.get_or_create_backfill().pls_mark_complete()


def _pls_make_default_for_searching(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
index_strategy.pls_make_default_for_searching()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
specific_index.index_strategy.pls_make_default_for_searching(specific_index)


def _pls_delete(specific_indexname):
index_strategy = IndexStrategy.by_request(specific_indexname)
assert not index_strategy.is_current
index_strategy.pls_delete()
specific_index = IndexStrategy.get_specific_index(specific_indexname)
assert not specific_index.is_current
specific_index.pls_delete()


PLS_DOERS = {
'create': _pls_create,
'keep_live': _pls_keep_live,
'start_keeping_live': _pls_start_keeping_live,
'start_backfill': _pls_start_backfill,
'mark_backfill_complete': _pls_mark_backfill_complete,
'make_default_for_searching': _pls_make_default_for_searching,
Expand Down
41 changes: 23 additions & 18 deletions share/bin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from share.bin.util import command
from share.search import IndexStrategy
from share.search.exceptions import IndexStrategyError
from share.search.daemon import IndexerDaemon


Expand All @@ -30,34 +31,38 @@ def purge(args, argv):
Usage: {0} search purge <index_names>...
"""
for index_name in args['<index_names>']:
index_strategy = IndexStrategy.by_specific_index_name(index_name)
index_strategy.pls_delete()
specific_index = IndexStrategy.get_specific_index(index_name)
specific_index.pls_delete()


@search.subcommand('Create indicies and apply mappings')
def setup(args, argv):
"""
Usage: {0} search setup <index_name>
Usage: {0} search setup <index_or_strategy_name>
{0} search setup --initial
"""
is_initial = args.get('--initial')
if is_initial:
index_strategys = IndexStrategy.all_strategies().values()
specific_indexes = [
index_strategy.for_current_index()
for index_strategy in IndexStrategy.all_strategies()
]
else:
index_strategys = [IndexStrategy.by_request(args['<index_name>'])]
for index_strategy in index_strategys:
index_strategy.pls_setup_as_needed()


@search.subcommand('Queue daemon messages to reindex all suids')
def reindex_all_suids(args, argv):
"""
Usage: {0} search reindex_all_suids <index_name>
Most likely useful for a freshly `setup` index (perhaps after a purge).
"""
index_strategy = IndexStrategy.by_request(args['<index_name>'])
index_strategy.pls_setup_as_needed(start_backfill=True)
index_or_strategy_name = args['<index_or_strategy_name>']
try:
specific_indexes = [
IndexStrategy.get_by_name(index_or_strategy_name).for_current_index(),
]
except IndexStrategyError:
try:
specific_indexes = [
IndexStrategy.get_specific_index(index_or_strategy_name),
]
except IndexStrategyError:
raise IndexStrategyError(f'unrecognized index or strategy name "{index_or_strategy_name}"')
for specific_index in specific_indexes:
specific_index.pls_create()
specific_index.pls_start_keeping_live()


@search.subcommand('Start the search indexing daemon')
Expand Down
Loading

0 comments on commit ccf2904

Please sign in to comment.