Skip to content

Commit

Permalink
Removed autosync server option.
Browse files Browse the repository at this point in the history
  • Loading branch information
coady committed Feb 17, 2020
1 parent 2a7ae49 commit 1fdbf9c
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 151 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Optional server extras:
# Changes
dev
* Python 3 required
* Removed server `autosync`

2.4
* PyLucene >=8 required
Expand Down
5 changes: 2 additions & 3 deletions docs/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ server
:cwd: ..
.. automodule:: lupyne.server
.. note:: Lucene doc ids are ephemeral; only use doc ids across requests for the same index version.
.. warning:: Autosyncing is not recommended for production.

Lucene index files are incremental, so synchronizing files and refreshing searchers is a viable replication strategy.
The `autoupdate` and `autosync` features demonstrate this, but are not meant to recommend HTTP for file syncing.
Autoupdating is considered production-ready; autosyncing is not.
Both searchers and indexers support `autoupdate`, and indexers support snapshots,
which allow replicating index files safely and atomically.

CherryPy was chosen because not only is it well suited to exposing APIs, but it includes a production multithreaded server.
Lucene caches heavily, and PyLucene is not bound by the `GIL`_ when in the Java VM.
Expand Down
2 changes: 1 addition & 1 deletion lupyne/engine/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Field(FieldType): # type: ignore
)

def __init__(self, name: str, docValuesType='', indexOptions='', dimensions=0, **settings):
super(Field, self).__init__()
super().__init__()
self.name = name
for name in self.properties.intersection(settings):
setattr(self, name, settings.pop(name))
Expand Down
2 changes: 1 addition & 1 deletion lupyne/engine/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def count(self, *query, **options) -> int:
if len(query) > 1:
return self.docFreq(index.Term(*query))
query = self.parse(*query, **options) if query else Query.alldocs()
return super(IndexSearcher, self).count(query)
return super().count(query)

def collector(self, count=None, sort=None, reverse=False, scores=False, mincount=1000):
if count is None:
Expand Down
122 changes: 23 additions & 99 deletions lupyne/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,14 @@
"""

import argparse
import collections
import contextlib
import heapq
import http
import itertools
import os
import re
import time
import warnings
import lucene
import cherrypy
import clients
from requests.compat import json
from lupyne import engine

Expand Down Expand Up @@ -212,15 +208,6 @@ def run():

super().__init__(bus, run, frequency)

def subscribe(self):
super().subscribe()
if cherrypy.engine.state == cherrypy.engine.states.STARTED:
self.start()

def unsubscribe(self):
super().unsubscribe()
self.thread.cancel()


class WebSearcher:
"""Dispatch root with a delegated Searcher.
Expand All @@ -242,9 +229,6 @@ class WebSearcher:
}

def __init__(self, *directories, **kwargs):
self.urls = collections.deque(kwargs.pop('urls', ()))
if self.urls:
engine.IndexWriter(*directories).close()
if len(directories) > 1:
self.searcher = engine.MultiSearcher(directories, **kwargs)
else:
Expand All @@ -266,27 +250,9 @@ def close(self):
def etag(self):
return 'W/"{}"'.format(self.searcher.version)

def sync(self, url):
"""Sync with remote index."""
directory = self.searcher.path
resource = clients.Resource(url, headers={'if-none-match': self.etag})
response = resource.client.put('update/snapshot')
if response.status_code in (http.client.PRECONDITION_FAILED, http.client.METHOD_NOT_ALLOWED):
return []
response.raise_for_status()
names = sorted(set(response.json()).difference(os.listdir(directory)))
resource /= response.headers['location']
try:
for name in names:
with open(os.path.join(directory, name), 'wb') as file:
resource.download(file, name)
finally:
resource.delete()
return names

@cherrypy.expose
@cherrypy.tools.json_in(process_body=dict)
@cherrypy.tools.allow(methods=['GET', 'POST'])
@cherrypy.tools.allow(methods=['GET'])
def index(self, url=''):
"""Return index information and synchronize with remote index.
Expand All @@ -298,9 +264,6 @@ def index(self, url=''):
:return: {*string*: *int*,... }
"""
if cherrypy.request.method == 'POST':
self.sync(url)
cherrypy.response.status = int(http.client.ACCEPTED)
if isinstance(self.searcher, engine.MultiSearcher):
return {reader.directory().toString(): reader.numDocs() for reader in self.searcher.indexReaders}
return {self.searcher.directory.toString(): len(self.searcher)}
Expand All @@ -320,24 +283,8 @@ def update(self, **caches):
:return: *int*
"""
names = ()
while self.urls:
url = self.urls[0]
try:
names = self.sync(url)
break
except IOError:
with contextlib.suppress(ValueError):
self.urls.remove(url)
self.searcher = self.searcher.reopen(**caches)
self.updated = time.time()
if names:
engine.IndexWriter(self.searcher.directory).close()
if not self.urls and hasattr(self, 'fields'):
other = WebIndexer(self.searcher.directory, analyzer=self.searcher.analyzer)
other.indexer.shared, other.indexer.fields = self.searcher.shared, self.fields
(app,) = (app for app in cherrypy.tree.apps.values() if app.root is self)
mount(other, app=app, autoupdate=getattr(self, 'autoupdate', 0))
return len(self.searcher)

@cherrypy.expose
Expand Down Expand Up @@ -692,53 +639,43 @@ def index(self):

@cherrypy.expose
@cherrypy.tools.json_in(process_body=dict)
@cherrypy.tools.allow(paths=[('POST',), ('GET', 'PUT', 'DELETE'), ('GET',)])
def update(self, id='', name='', **options):
@cherrypy.tools.allow(paths=[('POST',), ('GET', 'DELETE')])
def update(self, id='', *, snapshot=False, **options):
"""Commit index changes and refresh index version.
**POST** /update
Commit write operations and return document count. See :meth:`WebSearcher.update` for caching options.
Commit write operations and optionally snapshot. See :meth:`WebSearcher.update` for caching options.
{"merge": true|\ *int*,... }
{"merge": true|\ *int*, "snapshot": true}
.. versionchanged:: 1.2 request body is an object instead of an array
.. versionchanged:: 2.5 snapshot moved to POST parameter
:return: *int*
:return: *int*|\ [*string*,... ]
**GET, PUT, DELETE** /update/[snapshot|\ *int*]
Verify, create, or release unique snapshot of current index commit and return array of referenced filenames.
**GET, DELETE** /update/*int*
Verify or release unique snapshot of current index commit and return array of referenced filenames.
.. versionchanged:: 1.4 lucene identifies snapshots by commit generation; use location header
.. versionchanged:: 1.4 lucene identifies snapshots by commit generation; use location header
:return: [*string*,... ]
**GET** /update/*int*/*chars*
Download index file corresponding to snapshot id and filename.
"""
response = cherrypy.serving.response
if not id:
self.indexer.commit(**options)
self.updated = time.time()
return len(self.indexer)
method = cherrypy.request.method
response = cherrypy.serving.response
if method == 'PUT':
if id != 'snapshot':
raise cherrypy.NotFound()
if not snapshot:
return len(self.indexer)
commit = self.indexer.policy.snapshot()
response.status = int(http.client.CREATED)
response.headers['location'] = cherrypy.url('/update/{0:d}'.format(commit.generation), relative='server')
else:
with HTTPError((ValueError, AssertionError), http.client.NOT_FOUND):
commit = self.indexer.policy.getIndexCommit(int(id))
assert commit is not None, 'commit not snapshotted'
if method == 'DELETE':
self.indexer.policy.release(commit)
if not name:
return list(commit.fileNames)
with HTTPError((TypeError, AssertionError), http.client.NOT_FOUND):
directory = self.searcher.path
assert name in commit.fileNames, 'file not referenced in commit'
return cherrypy.lib.static.serve_download(os.path.join(directory, name))
with HTTPError((ValueError, AssertionError), http.client.NOT_FOUND):
commit = self.indexer.policy.getIndexCommit(int(id))
assert commit is not None, 'commit not snapshotted'
if cherrypy.request.method == 'DELETE':
self.indexer.policy.release(commit)
return list(commit.fileNames)

@cherrypy.expose
@cherrypy.tools.allow(paths=[('GET', 'POST'), ('GET',), ('GET', 'PUT', 'DELETE', 'PATCH')])
Expand Down Expand Up @@ -839,20 +776,13 @@ def init(vmargs='-Xrs,-Djava.awt.headless=true', **kwargs):
app.root.__init__(*app.root.__dict__.pop('args'), **app.root.__dict__.pop('kwargs'))


def mount(root, path='', config=None, autoupdate=0, app=None):
def mount(root, path='', config=None, autoupdate=0):
"""Attach root and subscribe to plugins.
:param root,path,config: see cherrypy.tree.mount
:param autoupdate: see command-line options
:param app: optionally replace root on existing app
"""
if app is None:
app = cherrypy.tree.mount(root, path, config)
else:
cherrypy.engine.unsubscribe('stop', app.root.close)
if hasattr(app.root, 'monitor'):
app.root.monitor.unsubscribe()
app.root = root
app = cherrypy.tree.mount(root, path, config)
cherrypy.engine.subscribe('stop', root.close)
if autoupdate:
root.monitor = AttachedMonitor(cherrypy.engine, root.update, autoupdate)
Expand Down Expand Up @@ -894,23 +824,17 @@ def start(root=None, path='', config=None, pidfile='', daemonize=False, autorelo
parser.add_argument('-d', '--daemonize', action='store_true', help='run the server as a daemon')
parser.add_argument('--autoreload', type=float, metavar='SECONDS', help='automatically reload modules; replacement for engine.autoreload')
parser.add_argument('--autoupdate', type=float, metavar='SECONDS', help='automatically update index version and commit any changes')
parser.add_argument('--autosync', metavar='URL,...', help='automatically synchronize searcher with remote hosts and update')
parser.add_argument('--real-time', action='store_true', help='search in real-time without committing')

if __name__ == '__main__':
args = parser.parse_args()
read_only = args.read_only or args.autosync or len(args.directories) > 1
read_only = args.read_only or len(args.directories) > 1
kwargs = {'nrt': True} if args.real_time else {}
if read_only and (args.real_time or not args.directories):
parser.error('incompatible read/write options')
if args.autosync:
kwargs['urls'] = args.autosync.split(',')
if not (args.autoupdate and len(args.directories) == 1):
parser.error('autosync requires autoupdate and a single directory')
warnings.warn('autosync is not recommended for production usage')
if args.config and not os.path.exists(args.config):
args.config = {'global': json.loads(args.config)}
cls = WebSearcher if read_only else WebIndexer
root = cls.new(*map(os.path.abspath, args.directories), **kwargs)
del args.directories, args.read_only, args.autosync, args.real_time
del args.directories, args.read_only, args.real_time
start(root, callback=init, **args.__dict__)
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
project_urls={'Documentation': 'https://coady.github.io/lupyne/'},
license='Apache Software License',
packages=['lupyne', 'lupyne.engine'],
extras_require={'server': ['cherrypy>=11', 'clients>=0.2'], 'docs': ['nbsphinx', 'jupyter']},
extras_require={'server': ['cherrypy>=11'], 'docs': ['nbsphinx', 'jupyter']},
python_requires='>=3.5',
tests_require=['pytest-cov'],
tests_require=['pytest-cov', 'clients'],
classifiers=[
'Development Status :: 6 - Mature',
'Framework :: CherryPy',
Expand Down
60 changes: 15 additions & 45 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import math
import operator
import os
import shutil
import signal
import subprocess
import sys
Expand Down Expand Up @@ -308,50 +309,19 @@ def test_example(request, servers):


def test_replication(tempdir, servers):
primary = servers.start(servers.ports[0], tempdir, '--autoupdate=1')
directory = os.path.join(tempdir, 'backup')
primary = servers.start(servers.ports[0], tempdir)
sync, update = '--autosync=' + primary.url, '--autoupdate=1'
secondary = servers.start(servers.ports[1], '-r', directory, sync, update)
resource = servers.start(servers.ports[2], '-r', directory)
for args in [('-r', tempdir), (update, tempdir), (update, tempdir, tempdir)]:
assert subprocess.call((sys.executable, '-m', 'lupyne.server', sync) + args, stderr=subprocess.PIPE)
engine.IndexWriter(directory).close()
secondary = servers.start(servers.ports[1], '-r', directory, '--autoupdate=1')
primary.post('docs', [{}])
assert primary.post('update') == 1
assert primary.client.put('update/0').status_code == http.client.NOT_FOUND
response = resource.client.post(json={'url': primary.url})
assert response.status_code == http.client.ACCEPTED and sum(response.json().values()) == 0
assert resource.post('update') == 1
assert resource.post(json={'url': primary.url})
assert resource.post('update') == 1
primary.post('docs', [{}])
assert primary.post('update') == 2
time.sleep(1.5)
assert sum(secondary().values()) == 2
servers.stop(servers.ports[-1])
root = server.WebSearcher(directory, urls=(primary.url, secondary.url))
app = server.mount(root)
root.fields = {}
assert root.update() == 2
assert len(root.urls) == 2
servers.stop(servers.ports[0])
assert secondary.docs()
assert secondary.client.post('docs', []).status_code == http.client.METHOD_NOT_ALLOWED
assert secondary.terms() == []
assert root.update() == 2
assert len(root.urls) == 1
servers.stop(servers.ports[1])
assert root.update() == 2
assert len(root.urls) == 0 and isinstance(app.root, server.WebIndexer)
app.root.close()
root = server.WebSearcher(directory)
app = server.mount(root, autoupdate=0.1)
root.fields, root.autoupdate = {}, 0.1
cherrypy.config['log.screen'] = servers.config['log.screen']
cherrypy.engine.state = cherrypy.engine.states.STARTED
root.monitor.start() # simulate engine starting
time.sleep(0.2)
app.root.indexer.add()
time.sleep(0.2)
assert len(app.root.indexer) == len(root.searcher) + 1
app.root.monitor.unsubscribe()
del app.root
assert primary.client.get('update/0').status_code == http.client.NOT_FOUND
assert primary.client.get('update/x').status_code == http.client.NOT_FOUND
response = primary.client.post('update', {'snapshot': True})
assert response.status_code == http.client.CREATED
location, filenames = response.headers['location'], response.json()
assert primary.get(location) == filenames
for filename in filenames:
shutil.copy(os.path.join(tempdir, filename), directory)
assert primary.delete(location) == filenames
time.sleep(1.1)
assert secondary.get('docs') == [0]

0 comments on commit 1fdbf9c

Please sign in to comment.