Skip to content

Commit

Permalink
Fixed AsyncIOExecutor, was using default loop. Implemented docs/guide…
Browse files Browse the repository at this point in the history
…/test_database.py in async style as docs/guide/test_asyncio.py module. Running tests in docs using Pi with Python 3.5 to use async/await syntax.
  • Loading branch information
vmagamedov committed Dec 2, 2016
1 parent e15e467 commit c44bad1
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 7 deletions.
227 changes: 227 additions & 0 deletions docs/guide/test_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import uuid
import pytest
import asyncio

# setup storage

import aiopg.sa

from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, ForeignKey, select


metadata = MetaData()

character_table = Table(
'character',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('species', String),
)

actor_table = Table(
'actor',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('character_id', ForeignKey('character.id'), nullable=False),
)

# setup test environment

from sqlalchemy.sql.ddl import CreateTable


async def init_db(pg_dsn, *, loop):
db_name = 'test_{}'.format(uuid.uuid4().hex)
async with aiopg.sa.create_engine(pg_dsn, loop=loop) as db_engine:
async with db_engine.acquire() as conn:
await conn.execute('CREATE DATABASE {0}'.format(db_name))
return db_name


async def setup_db(db_dsn, *, loop):
async with aiopg.sa.create_engine(db_dsn, loop=loop) as db_engine:
async with db_engine.acquire() as conn:
await conn.execute(CreateTable(character_table))
await conn.execute(CreateTable(actor_table))

await conn.execute(character_table.insert().values([
dict(id=1, name='James T. Kirk', species='Human'),
dict(id=2, name='Spock', species='Vulcan/Human'),
dict(id=3, name='Leonard McCoy', species='Human'),
]))
await conn.execute(actor_table.insert().values([
dict(id=1, character_id=1, name='William Shatner'),
dict(id=2, character_id=2, name='Leonard Nimoy'),
dict(id=3, character_id=3, name='DeForest Kelley'),
dict(id=4, character_id=1, name='Chris Pine'),
dict(id=5, character_id=2, name='Zachary Quinto'),
dict(id=6, character_id=3, name='Karl Urban'),
]))


async def drop_db(pg_dsn, db_name, *, loop):
async with aiopg.sa.create_engine(pg_dsn, loop=loop) as db_engine:
async with db_engine.acquire() as conn:
await conn.execute('DROP DATABASE {0}'.format(db_name))


@pytest.fixture(scope='session', name='db_dsn')
def db_dsn_fixture(request):
loop = asyncio.get_event_loop()

pg_dsn = 'postgresql://postgres:postgres@postgres:5432/postgres'
db_name = loop.run_until_complete(init_db(pg_dsn, loop=loop))

db_dsn = 'postgresql://postgres:postgres@postgres:5432/{}'.format(db_name)
loop.run_until_complete(setup_db(db_dsn, loop=loop))

def fin():
loop.run_until_complete(drop_db(pg_dsn, db_name, loop=loop))

request.addfinalizer(fin)
return db_dsn

# define graph

from hiku.graph import Graph, Root, Node, Link
from hiku.types import TypeRef, Sequence
from hiku.engine import pass_context
from hiku.sources import aiopg as sa

SA_ENGINE_KEY = 'sa-engine'

character_query = sa.FieldsQuery(SA_ENGINE_KEY, character_table)

actor_query = sa.FieldsQuery(SA_ENGINE_KEY, actor_table)

character_to_actors_query = sa.LinkQuery(Sequence[TypeRef['actor']], SA_ENGINE_KEY,
from_column=actor_table.c.character_id,
to_column=actor_table.c.id)


async def direct_link(ids):
return ids


@pass_context
async def to_characters_query(ctx):
query = select([character_table.c.id])
async with ctx[SA_ENGINE_KEY].acquire() as conn:
rows = await conn.execute(query)
return [row.id for row in rows]


@pass_context
async def to_actors_query(ctx):
query = select([actor_table.c.id])
async with ctx[SA_ENGINE_KEY].acquire() as conn:
rows = await conn.execute(query)
return [row.id for row in rows]


GRAPH = Graph([
Node('character', [
sa.Field('id', character_query),
sa.Field('name', character_query),
sa.Field('species', character_query),
sa.Link('actors', character_to_actors_query, requires='id'),
]),
Node('actor', [
sa.Field('id', actor_query),
sa.Field('name', actor_query),
sa.Field('character_id', actor_query),
Link('character', TypeRef['character'],
direct_link, requires='character_id'),
]),
Root([
Link('characters', Sequence[TypeRef['character']],
to_characters_query, requires=None),
Link('actors', Sequence[TypeRef['actor']],
to_actors_query, requires=None),
]),
])

# test graph

from hiku.engine import Engine
from hiku.result import denormalize
from hiku.readers.simple import read
from hiku.executors.asyncio import AsyncIOExecutor


async def execute(hiku_engine, sa_engine, graph, query_string):
query = read(query_string)
result = await hiku_engine.execute(graph, query, {SA_ENGINE_KEY: sa_engine})
return denormalize(graph, result, query)


@pytest.mark.asyncio(forbid_global_loop=True)
async def test_character_to_actors(db_dsn, event_loop):
hiku_engine = Engine(AsyncIOExecutor(event_loop))
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
result = await execute(hiku_engine, sa_engine, GRAPH,
'[{:characters [:name {:actors [:name]}]}]')
assert result == {
'characters': [
{
'name': 'James T. Kirk',
'actors': [
{'name': 'William Shatner'},
{'name': 'Chris Pine'},
],
},
{
'name': 'Spock',
'actors': [
{'name': 'Leonard Nimoy'},
{'name': 'Zachary Quinto'},
],
},
{
'name': 'Leonard McCoy',
'actors': [
{'name': 'DeForest Kelley'},
{'name': 'Karl Urban'},
],
},
],
}


@pytest.mark.asyncio(forbid_global_loop=True)
async def test_actor_to_character(db_dsn, event_loop):
hiku_engine = Engine(AsyncIOExecutor(event_loop))
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
result = await execute(hiku_engine, sa_engine, GRAPH,
'[{:actors [:name {:character [:name]}]}]')
assert result == {
'actors': [
{
'name': 'William Shatner',
'character': {'name': 'James T. Kirk'},
},
{
'name': 'Leonard Nimoy',
'character': {'name': 'Spock'},
},
{
'name': 'DeForest Kelley',
'character': {'name': 'Leonard McCoy'},
},
{
'name': 'Chris Pine',
'character': {'name': 'James T. Kirk'},
},
{
'name': 'Zachary Quinto',
'character': {'name': 'Spock'},
},
{
'name': 'Karl Urban',
'character': {'name': 'Leonard McCoy'},
},
],
}
1 change: 1 addition & 0 deletions hiku/executors/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def submit(self, fn, *args, **kwargs):
def process(self, queue, workflow):
while queue.__futures__:
done, _ = yield from wait(queue.__futures__,
loop=self._loop,
return_when=FIRST_COMPLETED)
queue.progress(done)
return workflow.result()
10 changes: 6 additions & 4 deletions pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
- raw: apk add --no-cache --virtual .build-deps python3-dev postgresql-dev musl-dev gcc
- pip: name={{item}} executable=pip3 extra_args='--no-cache-dir'
with_items:
- pytest==3.0.2
- sqlalchemy==1.0.15
- pytest==3.0.4
- pytest-asyncio==0.5.0
- graphql-core==1.0.1
- sqlalchemy==1.1.4
- psycopg2==2.6.2
- aiopg==0.10.0
- aiopg==0.12.0
- raw: apk del .build-deps

- !Image
Expand Down Expand Up @@ -56,7 +58,7 @@
requires:
- postgres
params:
- !Argument {name: tests, default: tests3}
- !Argument {name: tests, default: "tests3 docs"}
image: env
eval: py.test -q --tb=native {{tests}}

Expand Down
12 changes: 10 additions & 2 deletions tests3/test_source_aiopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import sqlalchemy
import psycopg2.extensions

from pytest_asyncio.plugin import ForbiddenEventLoopPolicy

from hiku.utils import cached_property
from hiku.engine import Engine
from hiku.sources import aiopg as sa
Expand Down Expand Up @@ -93,5 +95,11 @@ def _check(self, src, value, event_loop):
yield from sa_engine.wait_closed()

def check(self, src, value):
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(self._check(src, value, event_loop))
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
asyncio.set_event_loop_policy(ForbiddenEventLoopPolicy())
try:
loop.run_until_complete(self._check(src, value, loop))
finally:
loop.close()
asyncio.set_event_loop_policy(policy)
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ deps =
astor

[testenv:py34]
commands = py.test -q --tb=native tests docs
commands = py.test -q --tb=native tests

[testenv:flake8]
commands = flake8 hiku tests tests3 setup.py
Expand Down

0 comments on commit c44bad1

Please sign in to comment.