Skip to content

Commit

Permalink
get rid of asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
Ledoux committed Mar 25, 2021
1 parent e15195a commit 6ec6692
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 69 deletions.
10 changes: 3 additions & 7 deletions sqlalchemy_api_handler/serialization/as_dict.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from functools import partial, singledispatch
from typing import Callable, Iterable, Set, List
from sqlalchemy.orm.collections import InstrumentedList

from sqlalchemy_api_handler.api_handler import ApiHandler
from sqlalchemy_api_handler.serialization.serialize import serialize
from sqlalchemy_api_handler.utils.asynchronous import async_map
from sqlalchemy_api_handler.utils.asynchronous import async_map as default_async_map


def exclusive_includes_from(entity, includes):
Expand Down Expand Up @@ -34,16 +33,13 @@ def as_dict_for_intrumented_list(entities,
includes: Iterable = None,
mode: str = 'columns-and-includes',
use_async: bool=False):
if async_map is None:
async_map = default_async_map
not_deleted_entities = filter(lambda x: not x.is_soft_deleted(), entities)
dictify = partial(as_dict,
async_map=async_map,
includes=includes,
mode=mode)

if use_async and async_map is None:
with ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(dictify, not_deleted_entities))

map_method = async_map if use_async else map
return list(map_method(dictify, not_deleted_entities))

Expand Down
84 changes: 34 additions & 50 deletions sqlalchemy_api_handler/utils/asynchronous.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# https://medium.com/hackernoon/how-to-run-asynchronous-web-requests-in-parallel-with-python-3-5-without-aiohttp-264dc0f8546
from time import sleep
from itertools import chain
from functools import partial
import asyncio
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -11,41 +12,13 @@ def chunks_from(elements, chunk_by=None):
yield elements[index:index + length]


def create_asynchronous(func,
args_list=None,
executor_class=None,
kwargs_list=None,
max_workers=10,
use_multiprocessing=False):
if args_list and kwargs_list:
if len(args_list) != len(kwargs_list):
raise 'args_list and kwargs_list must have the same length.'
elif args_list and not kwargs_list:
kwargs_list = [{}]*len(args_list)
elif kwargs_list:
args_list = [()]*len(kwargs_list)

if executor_class is None:
executor_class = ThreadPoolExecutor

async def asynchronous_func():
loop = asyncio.get_event_loop()
with executor_class(max_workers=max_workers) as executor:
tasks = [
loop.run_in_executor(executor, partial(func, **kwargs), *args)
for (args, kwargs) in zip(args_list, kwargs_list)
]
return await asyncio.gather(*tasks)
return asynchronous_func


def async_map(func,
args_list=None,
kwargs_list=None,
chunk_by=None,
executor_class=None,
max_workers=None,
sleep_between=None):
def zipped_async_map(func,
args_list=None,
kwargs_list=None,
chunk_by=None,
executor_class=None,
max_workers=10,
sleep_between=None):
if args_list and kwargs_list:
listed_args_list = list(args_list)
listed_kwargs_list = list(kwargs_list)
Expand All @@ -70,21 +43,32 @@ def async_map(func,
else:
return []

if executor_class is None:
executor_class = ThreadPoolExecutor

def asynchronous_func(args_and_kwargs):
return func(*args_and_kwargs[0], **args_and_kwargs[1])


results = []
for index in range(0, len(args_chunks)):
asynchronous_func = create_asynchronous(func,
args_list=args_chunks[index] if args_chunks else None,
kwargs_list=kwargs_chunks[index] if kwargs_chunks else None,
executor_class=executor_class,
max_workers=max_workers)
future = asyncio.ensure_future(asynchronous_func())
loop = asyncio.get_event_loop()
loop.run_until_complete(future)
results += future.result()
if sleep_between:
sleep(sleep_between)
return results
with executor_class(max_workers=max_workers) as executor:
for index in range(0, len(args_chunks)):
args_list = args_chunks[index] if args_chunks else None
kwargs_list = kwargs_chunks[index] if kwargs_chunks else None
if args_list and kwargs_list:
if len(args_list) != len(kwargs_list):
raise 'args_list and kwargs_list must have the same length.'
elif args_list and not kwargs_list:
kwargs_list = [{}]*len(args_list)
elif kwargs_list:
args_list = [()]*len(kwargs_list)
results = chain(results,
executor.map(asynchronous_func,
zip(args_list, kwargs_list)))
if sleep_between:
sleep(sleep_between)
return results


def async_map_with_one_arg(func, args):
return async_map(func, [(arg,) for arg in args])
def async_map(func, *lists):
return zipped_async_map(func, zip(*lists))
20 changes: 10 additions & 10 deletions tests/serialization/as_dict_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy_api_handler.serialization import as_dict
from sqlalchemy_api_handler.utils import async_map_with_one_arg
from api.models.offer import Offer
from api.models.offer_tag import OfferTag
from api.models.stock import Stock
Expand Down Expand Up @@ -150,12 +149,13 @@ def test_dictify_with_custom_async_map(self, app):

# when
includes = [{ 'key': '|offerTags', 'includes': ['tag'] }]
offer_dict = as_dict(offer,
async_map=async_map_with_one_arg,
includes=includes)

# then
assert len(offer_dict['offerTags']) == offer_tags_count
for index in range(0, offer_tags_count):
assert offer_dict['offerTags'][index]['tag']['label'] == str(index)
assert offer_dict['offerTags'][index]['tag']['sleptFoo'] == 0
with ThreadPoolExecutor(max_workers=5) as executor:
offer_dict = as_dict(offer,
async_map=executor.map,
includes=includes)

# then
assert len(offer_dict['offerTags']) == offer_tags_count
for index in range(0, offer_tags_count):
assert offer_dict['offerTags'][index]['tag']['label'] == str(index)
assert offer_dict['offerTags'][index]['tag']['sleptFoo'] == 0
77 changes: 77 additions & 0 deletions tests/utils/asynchronous_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pytest

from sqlalchemy_api_handler.utils.asynchronous import async_map, zipped_async_map


class AsynchronousTest:
def test_zipped_async_map_with_args_list(self, app):
# Given
first_call_a = 1
first_call_b = 2
second_call_a = 3
second_call_b = 4
def add(a, b):
return a + b

# When
results = list(zipped_async_map(add, [(first_call_a, first_call_b), (second_call_a, second_call_b)]))

# Then
assert results[0] == first_call_a + first_call_b
assert results[1] == second_call_a + second_call_b

def test_zipped_async_map_with_args_list_and_kwargs_list(self, app):
# Given
first_call_a = 1
first_call_b = 2
second_call_a = 3
second_call_b = 4
def op(a, b, action='add'):
if action == 'add':
return a + b
elif action == 'substract':
return a - b
return None

# When
results = list(zipped_async_map(op,
[(first_call_a, first_call_b), (second_call_a, second_call_b)],
[{ 'action': 'add' }, { 'action': 'substract'}]))

# Then
assert results[0] == first_call_a + first_call_b
assert results[1] == second_call_a - second_call_b


def test_map(self, app):
# Given
first_call_a = 1
first_call_b = 2
second_call_a = 3
second_call_b = 4
def add(a, b):
return a + b

# When
results = list(map(add, (first_call_a, second_call_a), (first_call_b, second_call_b)))

# Then
assert results[0] == first_call_a + first_call_b
assert results[1] == second_call_a + second_call_b


def test_async_map(self, app):
# Given
first_call_a = 1
first_call_b = 2
second_call_a = 3
second_call_b = 4
def add(a, b):
return a + b

# When
results = list(async_map(add, (first_call_a, second_call_a), (first_call_b, second_call_b)))

# Then
assert results[0] == first_call_a + first_call_b
assert results[1] == second_call_a + second_call_b
4 changes: 2 additions & 2 deletions tests/utils/datum_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pytest

from sqlalchemy_api_handler.utils import nesting_datum_from
from sqlalchemy_api_handler.utils.datum import nesting_datum_from


class UtilsTest:
class DatumTest:
def test_nesting_datum(self, app):
# Given
datum = {
Expand Down

0 comments on commit 6ec6692

Please sign in to comment.