rudimentary websocket sync to db mirror
tgbugs committed Oct 19, 2018
1 parent 6ab9040 commit fbc5fda
Showing 3 changed files with 105 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docs/setup.md
@@ -2,7 +2,7 @@
```bash
export DBNAME=scibot_ASDF # WARNING this WILL overwrite existing databases
scibot-dbsetup 5432 ${DBNAME} # initial db, user, extension, and schema creation
scibot db-init scibot ${DBNAME} # create the schema from the hypothesis orm code
scibot db-init ${DBNAME} # create the schema from the hypothesis orm code
scibot api-sync ${DBNAME} # retrieve and load existing annotations
```

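This commit also wires a `ws-sync` subcommand into `scibot/cli.py` (below). Assuming it takes the database name the same way `api-sync` does, the live websocket mirror could presumably be started with something like:

```bash
scibot ws-sync ${DBNAME}  # hypothetical invocation: stream new annotations into the db mirror over the websocket
```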
7 changes: 5 additions & 2 deletions scibot/cli.py
@@ -24,7 +24,7 @@ def main():
    os.environ.update({'SCIBOT_DATABASE': database})

    from scibot import config
    from scibot.db import getSession, init_scibot, AnnoSyncFactory
    from scibot.db import getSession, init_scibot, AnnoSyncFactory, WebsocketSyncFactory

    if args['db-init']:
        # insurance, it is passed into init directly as well
@@ -43,7 +43,10 @@ def main():
        pub_sync.sync_annos()

    elif args['ws-sync']:
        'TODO'
        session = getSession(echo=args['--debug'])
        WebsocketSync = WebsocketSyncFactory(session)
        wss = WebsocketSync(config.api_token, config.username, config.group)
        wss.run()

    elif args['debug']:
        from time import time
126 changes: 99 additions & 27 deletions scibot/db.py
@@ -1,3 +1,5 @@
import atexit
import asyncio
from pathlib import Path
from datetime import datetime
from itertools import chain
@@ -14,6 +16,8 @@
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.dialects.postgresql import ARRAY
from hyputils.hypothesis import Memoizer
from hyputils.handlers import dbSyncHandler
from hyputils.subscribe import setup_websocket, preFilter
from scibot import config
from scibot.anno import quickload, quickuri, add_doc_all, validate
from scibot.utils import makeSimpleLogger, uri_normalization
@@ -367,33 +371,6 @@ def q_create_docs(self, rows):
        anno_id_to_doc_id = {i:d.id for i, d in ids_docs}
        return anno_id_to_doc_id

    def h_prepare_document(self, row):
        datum = validate(row)
        document_dict = datum.pop('document')
        document_uri_dicts = document_dict['document_uri_dicts']
        document_meta_dicts = document_dict['document_meta_dicts']
        dd = row['id'], datum['target_uri'], row['created'], row['updated']
        return (*dd, document_uri_dicts, document_meta_dicts)

    def h_create_documents(self, rows):
        seen = {}
        for row in sorted(rows, key=lambda r:r['created']):
            id = row['id']
            p = self.h_prepare_document(row)
            id, target_uri, created, updated, document_uri_dicts, document_meta_dicts = p
            if target_uri in seen:
                yield id, seen[target_uri]
            else:
                document = update_document_metadata(  # TODO update normalization rules
                    self.session,
                    target_uri,
                    document_meta_dicts,
                    document_uri_dicts,
                    created=created,
                    updated=updated)
                seen[target_uri] = document
                yield id, document

    def sync_anno_stream(self, search_after=None, stop_at=None):
        """ streaming one anno at a time version of sync """
        for row in self.yield_from_api(search_after=search_after, stop_at=stop_at):
@@ -416,6 +393,101 @@ def sync_anno_stream(self, search_after=None, stop_at=None):
        self.log.debug('adding all annotations')


class WebsocketSyncFactory(AnnoSyncFactory):

    def __init__(self,
                 api_token=config.api_token,
                 username=config.username,
                 group=config.group,
                 helpers=tuple(),
                 threaded=False):
        super().__init__(api_token, username, group)
        self.prefilter = preFilter(groups=[group]).export()
        handler = type(f'dbSyncHandler{group}',  # TODO this is where we customize
                       (dbSyncHandler,),
                       dict(session=self.session))
        self.filter_handlers = [handler(self.handler)]
        self.ws_loop, self.exit_loop = setup_websocket(self.api_token, self.prefilter, self.filter_handlers)
        self.threaded = threaded
        self.loop = asyncio.get_event_loop()
        if self.threaded:  # yes do this at init and not at call time, you should know by then
            self.stream_thread = Thread(target=self.loop_target,
                                        args=(self.loop, self.ws_loop))

    def handler(self, message):
        act = message['options']['action']
        print('act', act)
        print(message)
        if act != 'delete':
            row = message['payload'][0]
            self.create_anno(row)
        # TODO per-action handling; for now any non-delete message
        # is simply inserted as a new annotation by create_anno
        if act == 'create':
            pass
        elif act == 'update':
            pass
        elif act == 'delete':
            pass
        elif act == 'flag':
            'lol'
        else:
            raise UnknownAction(act)  # email the maintainer basically

    def create_anno(self, row):
        datum = validate(row)

        document_dict = datum.pop('document')
        document_uri_dicts = document_dict['document_uri_dicts']
        document_meta_dicts = document_dict['document_meta_dicts']

        id = row['id']
        target_uri = datum['target_uri']
        created = row['created']
        updated = row['updated']

        annotation = models.Annotation(**datum)

        document = update_document_metadata(  # TODO update normalization rules
            self.session,
            target_uri,
            document_meta_dicts,
            document_uri_dicts,
            created=created,  # FIXME doesn't quite seem right, would clobber
            updated=updated)

        print(id)
        annotation.document = document
        annotation.id = id
        annotation.target_uri = target_uri
        annotation.created = created
        annotation.updated = updated
        self.session.add(annotation)
        self.session.flush()
        self.session.commit()  # FIXME hypothesis doesn't call this

    @staticmethod
    def loop_target(loop, ws_loop):
        asyncio.set_event_loop(loop)
        loop.run_until_complete(ws_loop(loop))

    def close_stuff(self):
        self.exit_loop()
        if self.threaded:  # FIXME
            self.stream_thread.join()

    def run(self):
        atexit.register(self.close_stuff)
        if self.threaded:
            self.stream_thread.start()
        else:
            try:
                self.loop.run_until_complete(self.ws_loop(self.loop))
            except KeyboardInterrupt:
                return  # atexit will deal with it


def uuid_to_urlsafe(uuid):
return _get_urlsafe_from_hex(uuid.hex)

