In [1]:
import sys
print('Python {}'.format(sys.version))

Python 3.7.0 (default, Sep 13 2018, 20:15:46) 
[GCC 7.3.0]


In [2]:
# import crossbar-zlmdb and open event history example database
import zlmdb
from cfxdb import Schema

# when running notebook inside docker
DBFILE = '../testdb'

# when running notebook directly on host
DBFILE = '../../crossbar/.testdb'

db = zlmdb.Database(DBFILE, maxsize=2**30, readonly=True)

schema = Schema.attach(db)

print(schema)

<cfxdb.schema.Schema object at 0x7f1438491eb8>


In [3]:
# get a single event from event history by publication ID

with db.begin() as txn:
    # get the first event ID
    for pub in schema.publications.select(txn, limit=1, return_keys=False):
        print(pub.publication)
        print(pub.topic)
        print(pub.publisher)
        print(pub.args)
        print(pub.kwargs)
        print(pub.payload)
        #print(pub.marshal())
        publisher = schema.sessions[txn, pub.publisher]
        print(publisher.authid)
        print(publisher.joined_at)
        print(publisher.left_at)
        

66339941290
com.example.geoservice.alert.5.24
2475181101487056
[{'category': 'alert', 'x': 5, 'y': 24, 'value1': b'\x15H\xb6&3a\x80\xe6m!n\x85@c\x80\x01', 'value2': 0.6736680899470212, 'value3': 'SJVW-YP7W-3K6A', 'i': 29952, 'j': 2}, 29952]
{'value3': 'SJVW-YP7W-3K6A'}
None
7M7C-JSKK-9WRN-99HN-W6MF-YVCH
1540491042079502723
0


In [5]:
# count all events in event history

with db.begin() as txn:
    cnt = schema.sessions.count(txn)
    print('sessions: {}'.format(cnt))

    cnt = schema.publications.count(txn)
    print('publications: {}'.format(cnt))

    cnt = schema.events.count(txn)
    print('events: {}'.format(cnt))

sessions: 40
publications: 87140
events: 427547


In [24]:
# count all events, grouped by authid

from pprint import pprint
authids = {}

with db.begin() as txn:
    for evt in schema.events.select(txn, limit=10000, return_keys=False):
        rec = schema.sessions[txn, evt.receiver]
        if rec.authid not in authids:
            authids[rec.authid] = 0
        authids[rec.authid] += 1

pprint(authids)

{'3QXK-QNUP-PMC4-M5YA-TR5G-CG5E': 309,
 '4HFC-L4HH-HQMR-6PJY-VPHL-YL5M': 168,
 '6FFF-9F9S-W673-GMQ5-3C99-44A6': 200,
 '7M7C-JSKK-9WRN-99HN-W6MF-YVCH': 1494,
 '7MEX-KJ5X-KKAK-Q5KU-9QLU-GYG9': 17,
 '7YSE-VEN7-7X3R-CFHM-3T55-993P': 290,
 'A3TR-MT64-4NT6-EJJ6-XTRW-J7AT': 168,
 'ECFF-T7NL-5NS9-ERF6-7TMC-CF4E': 328,
 'ECYQ-J5F3-XJHW-4T3H-QHX6-U64N': 1511,
 'FA7X-975X-TTGN-CAKT-APE9-F6FA': 200,
 'HUNU-7FEC-M7WS-4XM9-UNTA-7QWV': 1512,
 'L6WQ-53UX-CEKX-WXU4-HFJU-5HRM': 325,
 'QCKR-LQY6-9QG7-RECU-SCVG-YLY7': 1494,
 'TKYG-H6FP-SWUT-JVLQ-W4WN-7M3G': 200,
 'U4XJ-7VYK-4NMM-QS4M-7PTE-THS3': 290,
 'WMUT-W7YR-H476-649Q-54VE-KAXT': 1494}


In [7]:
import time

def doit(verbose=False):
    res = {}
    total = 0
    sum = 0

    with db.begin() as txn:
        i = 0
        for evt in schema.publications.select(txn, limit=1000000-1, return_keys=False):
            if evt.topic.startswith('com.example.geoservice.'):
                e = evt.args[0]
                x, y, category = e['x'], e['y'], e['category']
                value1, value2, value3 = e['value1'], e['value2'], e['value3']
                d = (x, y)
                if d not in res:
                    res[d] = (0, 0)
                res[d] = (res[d][0] + 1, res[d][1] + value2)
                total += 1
                
                if False:
                    if i < 10 or res[d][0] > 14:
                        print('({}, {}): {}. category="{}"  topic="{}"'.format(x, y, res[d], category, evt.topic))
                    elif i == 10:
                        print('...')
                i += 1
    if verbose:
        pprint(res)
    return total

started = time.perf_counter()
total = doit()
ended = time.perf_counter()
recs_per_sec = int(float(total) / (ended - started))
print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))

54329 records in 0.38 secs (143500 records per sec)


In [8]:
from pprint import pprint

def doit(publishers, receivers):
    with db.begin() as txn:
        for pub in schema.publications.select(txn, limit=1000000-1, return_keys=False):
            if pub.publisher not in publishers:
                publishers[pub.publisher] = 0
            publishers[pub.publisher] += 1
            
            from_key = (pub.publication, 0, 0)
            to_key = (pub.publication + 1, 0, 0)
            for evt in schema.events.select(txn, from_key=from_key, to_key=to_key, return_keys=False):
                if evt.receiver not in receivers:
                    receivers[evt.receiver] = 0
                receivers[evt.receiver] += 1
            
    total = 0
    for x in publishers.values():
        total += x
    return total

publishers = {}
receivers = {}

started = time.perf_counter()
total = doit(publishers, receivers)
ended = time.perf_counter()
recs_per_sec = int(float(total) / (ended - started))

print('publishers:')
pprint(publishers)

print('receivers:')
pprint(receivers)

print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))


publishers:
{2361200170397101: 4993, 2475181101487056: 41567, 8760936425110809: 7769}
receivers:
{825130965066635: 4982,
 840686762535787: 7252,
 1099415647834027: 4923,
 1353882293476443: 470,
 2277162720874418: 7678,
 2475181101487056: 41027,
 2608742549618717: 41026,
 4580631653116131: 4216,
 4880811124571864: 4216,
 5932342410277093: 41026,
 6192714356302635: 6939,
 7488114861245392: 41497,
 7646455278784097: 41566,
 7983575357203379: 6939,
 8760936425110809: 7759,
 8942188816060976: 4913}
54329 records in 0.4 secs (137330 records per sec)


In [9]:
total = 0

with db.begin() as txn:
    for evt in schema.publications.select(txn, limit=100000, return_keys=False):
        if evt.topic.startswith('com.example.geoservice.'):
            total += 1

print(total)

54329


In [27]:
import re
import timeit

def test2(verbose=False):
    pat = re.compile(r'^com\.example\.geoservice\.([a-z]+)\.([0-9]+).([0-9]+)$')

    def trunc(val):
        return int(val / 25)

    started = time.perf_counter()
    total = 0

    res1 = {}
    res2 = {}
    with db.begin() as txn:
        for pub in schema.publications.select(txn, limit=1000000-1, return_keys=False):
            total += 1
            m = pat.match(pub.topic)
            if m:
                category, x, y = m.groups()

                key = (trunc(int(x)), trunc(int(y)))
                if key not in res1:
                    res1[key] = 0
                res1[key] += 1

                if category not in res2:
                    res2[category] = 0
                res2[category] += 1

    ended = time.perf_counter()

    if verbose:
        pprint(res1)
        pprint(res2)

    recs_per_sec = int(float(total) / (ended - started))
    print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))

    return recs_per_sec

test2(verbose=True)
#timeit.timeit(test2, number=10)

{(0, 0): 3237,
 (0, 1): 3338,
 (0, 2): 3384,
 (0, 3): 3312,
 (0, 4): 129,
 (1, 0): 3305,
 (1, 1): 3415,
 (1, 2): 3342,
 (1, 3): 3346,
 (1, 4): 140,
 (2, 0): 3281,
 (2, 1): 3394,
 (2, 2): 3225,
 (2, 3): 3261,
 (2, 4): 141,
 (3, 0): 3422,
 (3, 1): 3345,
 (3, 2): 3301,
 (3, 3): 3318,
 (3, 4): 136,
 (4, 0): 161,
 (4, 1): 123,
 (4, 2): 129,
 (4, 3): 140,
 (4, 4): 4}
54329 records in 0.09 secs (605889 records per sec)


605889

In [5]:
import timeit
import time
from pprint import pprint

def test3(verbose=False):
    started = time.perf_counter()
    total = 0

    res = {}
    final = {}
    with db.begin() as txn:
        for evt in schema.events.select(txn, limit=1000000-1, return_keys=False):
            total += 1
            if evt.receiver not in res:
                res[evt.receiver] = 0
            res[evt.receiver] += 1

        for receiver in res:
            session = schema.sessions[txn, receiver]
            final[session.authid] = res[receiver]

    ended = time.perf_counter()
    recs_per_sec = int(float(total) / (ended - started))

    if verbose:
        pprint(final)

    print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))

    return recs_per_sec

test3(verbose=True)
timeit.timeit(test3, number=10)

{'3QXK-QNUP-PMC4-M5YA-TR5G-CG5E': 7252,
 '4HFC-L4HH-HQMR-6PJY-VPHL-YL5M': 4216,
 '6FFF-9F9S-W673-GMQ5-3C99-44A6': 4982,
 '7M7C-JSKK-9WRN-99HN-W6MF-YVCH': 41027,
 '7MEX-KJ5X-KKAK-Q5KU-9QLU-GYG9': 470,
 '7YSE-VEN7-7X3R-CFHM-3T55-993P': 6939,
 'A3TR-MT64-4NT6-EJJ6-XTRW-J7AT': 4216,
 'ECFF-T7NL-5NS9-ERF6-7TMC-CF4E': 7759,
 'ECYQ-J5F3-XJHW-4T3H-QHX6-U64N': 41497,
 'FA7X-975X-TTGN-CAKT-APE9-F6FA': 4913,
 'HUNU-7FEC-M7WS-4XM9-UNTA-7QWV': 41566,
 'L6WQ-53UX-CEKX-WXU4-HFJU-5HRM': 7678,
 'QCKR-LQY6-9QG7-RECU-SCVG-YLY7': 41026,
 'TKYG-H6FP-SWUT-JVLQ-W4WN-7M3G': 4923,
 'U4XJ-7VYK-4NMM-QS4M-7PTE-THS3': 6939,
 'WMUT-W7YR-H476-649Q-54VE-KAXT': 41026}
266429 records in 2.7 secs (96999 records per sec)
266429 records in 3.0 secs (88332 records per sec)
266429 records in 2.7 secs (97686 records per sec)
266429 records in 2.8 secs (95656 records per sec)
266429 records in 2.8 secs (95715 records per sec)
266429 records in 2.8 secs (94560 records per sec)
266429 records in 2.8 secs (94794 records per sec)

28.558505331980996

In [None]:
%autoawait asyncio

In [None]:
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async with aiohttp.ClientSession() as session:
    html = await fetch(session, 'https://crossbar.io')
    print(html.find('Crossbar'))

In [None]:
from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions

import asyncio


component = Component(
    transports=[
        {
            "type": "websocket",
            "url": "ws://localhost:8080/ws",
            #"url": "ws://crossbar:8080/ws",
            "endpoint": {
                "type": "tcp",
                "host": "localhost",
                #"host": "crossbar",
                "port": 8080
            }
        },
    ],
    realm="realm1",
)


async def add2(x, y, details):
    print("add2(x={}, y={}, details={})".format(x, y, details))
    return x + y


@component.on_join
async def join(session, details):
    print("joined {}".format(details))
    await session.register(add2, u"foobar.add3", options=RegisterOptions(details_arg='details'))
    print("component ready!")

    # await run([component])

await component.start()

In [None]:
1

In [None]:
import lmdb

#db = lmdb.open('testdb2', readonly=True, subdir=True)
db = lmdb.open('../.testdb', readonly=True, subdir=True, lock=False)
#db = lmdb.open('.', readonly=False, subdir=True)
print(db)

In [None]:
import os
os.listdir('../../crossbar/.testdb')
#os.listdir('../.testdb')