Drafting of a solution for querying fragmented entity which data spread through multiple databases

In [559]:
import rethinkdb as r
from rx import Observable
from rx.testing import marbles

In [560]:
from tornado import ioloop, gen
from tornado.concurrent import Future, chain_future
import functools

In [561]:
r.set_loop_type('tornado')
connection = r.connect(host='localhost', port=28015)

In [562]:
def rx_query(rql):
    """
    Simple non-blocking query that return an Rx Observable
    An appropriate solution would require handle of backpressure and auto batching
    """
    def emitter(observer):
        @gen.coroutine
        def execute(connection_future):
            connection = yield connection_future
            try:
                result = yield rql.run(connection)
                if isinstance(result, r.net.Cursor):
                    while (yield result.fetch_next()):
                        item = yield result.next()
                        observer.on_next(item)
                else:
                    observer.on_next(result)
            except Exception as err:
                print("Observable emitted error:", err)
                observer.on_error(err)
            observer.on_completed()
        execute(connection)
    return Observable.create(emitter)

---

### Setup Database (RethinkDB)

In [558]:
map(lambda rql: rx_query(rql).subscribe(), [
        r.db_create('a').run(),
        r.db_create('b').run(),
        r.db('a').table_create('person_a').run(),
        r.db('b').table_create('person_b').run()
    ])

<map at 0x112de4cf8>

In [566]:
import random, string, uuid, itertools

def random_persons():
    while True:
        yield {
            'id': str(uuid.uuid4()),
            'name': random.choice(string.ascii_letters),
            'age': random.randint(1,100)
        }

def split(person):
    return ({
        'id': person['id'],
        'name': person['name']            
    }, {
        'id': person['id'],
        'age': person['age']
    })

In [569]:
def insert_batch(persons):
    return Observable.zip(
        rx_query(source_a.insert([split(p)[0] for p in persons])),
        rx_query(source_b.insert([split(p)[1] for p in persons])),
        lambda a, b: (a, b))

(Observable
    .from_iterable(random_persons())
    .take(10000) 
    .buffer_with_count(100)
    .flat_map(lambda person_batch: insert_batch(person_batch))
).subscribe()


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x111cf42e8>

Simple DSL for crafting search's criteria

In [571]:
class Predicate:
    def __init__(self, raw):
        assert len(raw.keys()) == 1
        key = list(raw.keys())[0]
        val = raw[key]
        if key == 'or' or key == 'and':
            self._type = 'compound'
            self._subs = [*map(Predicate, [{k: val[k]} for k in val.keys()])]
        else:
            self._type = 'atom'
            self._key = key
            self._val = raw[key]
    @property
    def type(self):
        return self._type
    @property
    def key(self):
        if self._type == 'compound': raise Exception("Can't take key from compound expression")
        return self._key
    @property
    def val(self):
        if self._type == 'compound': raise Exception("Can't take val from compound expression")
        return self._val
    def __repr__(self):
        return pprint.pformat(self.__dict__)

    
class Expr:
    def __init__(self, expr):
        self._filter = Predicate(expr['filter'])
        self._order = expr['order']
    def is_empty(self):
        return not self._filter and not self._order
    @property
    def filter(self):
        if not self._filter: return None
        return self._filter
    @property
    def order(self):
        if not self._order: return None
        return self._order
    @property
    def subs(self):
        return self._subs
    def __repr__(self):
        return pprint.pformat(self.__dict__)
    
Expr({
    'filter': {
        'or': {
            'name': 'Huy',
            'age': 1
        }
    },
    'order': ['name', 'age']
})

{'_filter': {'_subs': [{'_key': 'name', '_type': 'atom', '_val': 'Huy'},
           {'_key': 'age', '_type': 'atom', '_val': 1}],
 '_type': 'compound'},
 '_order': ['name', 'age']}

In [549]:
def field_name_to_source(name):
    return {
        'name': source_a,
        'age': source_b
    }[name]

In [572]:
expr = Expr({
    'filter': {
        'name': 'Huy'
    },
    'order': ['name', 'age']
})

# TODO:
def step(observable, ordered_ids):
    """Iteratively reduce the query Expr to a single Observable"""
    if observable: raise Exception("Unimplemented")
    if (expr.is_empty()):
        return streamed['observable']
    else:
        if not expr.filter and expr.order: raise Exception('handle later')
        if expr.filter:
            if expr.filter.type == 'compound':
                fst_key = liste(expr.filter.subs.keys())[0]
                sub_pred = expr.filter.subs[0]
                query = field_name_to_source(fst_key).filter({sub_pred.key: sub_pred.val})
                del expr.filter.subs[0]
            else:
                query = field_name_to_source(expr.filter.key).filter({expr.filter.key: expr.filter.val})
        if expr.order:
            query = query.order_by(expr.order[0])
    print('Crafted a query')
    return step(
        rx_query(query),
        # TODO: cache
        rx_query(query).map(lambda entry: entry['id']))

step(None)

TypeError: step() missing 1 required positional argument: 'ordered_ids'