From 6a96bb96b4281dcf6f3ea8c59a3b5b5b25916c73 Mon Sep 17 00:00:00 2001 From: Diogo Dutra Date: Wed, 7 Dec 2016 18:33:25 -0200 Subject: [PATCH] created a new class RedisObject to use on engines cores to exports. Did some renames and create common codes --- myreco/engines/cores/base.py | 41 ++++++++++ .../cores/top_seller/{engine.py => core.py} | 74 +++++++++---------- myreco/engines/cores/utils.py | 16 +--- myreco/engines/models.py | 14 ++-- myreco/placements/models.py | 10 +-- myreco/utils.py | 9 +++ myreco/version.py | 2 +- tests/conftest.py | 2 +- .../engines/test_engines_integration.py | 56 +++++++------- .../test_items_types_integration.py | 4 +- .../placements/test_placements_integration.py | 4 +- .../slots/test_slots_integration.py | 4 +- 12 files changed, 135 insertions(+), 101 deletions(-) rename myreco/engines/cores/top_seller/{engine.py => core.py} (57%) diff --git a/myreco/engines/cores/base.py b/myreco/engines/cores/base.py index a927df9..f41e6d1 100644 --- a/myreco/engines/cores/base.py +++ b/myreco/engines/cores/base.py @@ -23,11 +23,17 @@ from falconswagger.models.base import build_validator, get_module_path from falconswagger.mixins import LoggerMixin +from falconswagger.json_builder import JsonBuilder from myreco.engines.cores.items_indices_map import ItemsIndicesMap +from myreco.engines.cores.utils import build_engine_data_path, build_engine_key_prefix from jsonschema import Draft4Validator from abc import ABCMeta, abstractmethod from bottleneck import argpartition +from glob import glob import msgpack +import os.path +import csv +import gzip class EngineError(Exception): @@ -103,6 +109,24 @@ def _get_best_indices(self, rec_vector, max_recos): def export_objects(self, session): pass + def _build_csv_readers(self, pattern, delimiter='#'): + path = build_engine_data_path(self.engine) + readers = [] + for filename in glob(os.path.join(path, '{}*.gz'.format(pattern))): + csv_file = gzip.open(filename, 'rt') + readers.append(csv.DictReader(csv_file, delimiter=delimiter)) + return readers + + def _get_items_indices_map_dict(self, items_indices_map, session): + items_indices_map = items_indices_map.get_all(session) + + if not items_indices_map: + raise EngineError( + "The Indices Map for '{}' is empty. Please update these items" + .format(self._engine_core.engine['item_type']['name'])) + + return items_indices_map + class AbstractDataImporter(metaclass=ABCMeta): @@ -112,3 +136,20 @@ def __init__(self, engine): @abstractmethod def get_data(cls, items_indices_map, session): pass + + +class RedisObjectBase(LoggerMixin): + + def __init__(self, engine_core): + self._build_logger() + self._engine_core = engine_core + self._redis_key = build_engine_key_prefix(self._engine_core.engine) + + + def _set_item_values(self, item): + for k in item: + schema = self._engine_core.engine['item_type']['schema']['properties'].get(k) + if schema is None: + raise EngineError('Invalid Item {}'.format(item)) + + item[k] = JsonBuilder(item[k], schema) diff --git a/myreco/engines/cores/top_seller/engine.py b/myreco/engines/cores/top_seller/core.py similarity index 57% rename from myreco/engines/cores/top_seller/engine.py rename to myreco/engines/cores/top_seller/core.py index 2fa8ade..56be82c 100644 --- a/myreco/engines/cores/top_seller/engine.py +++ b/myreco/engines/cores/top_seller/core.py @@ -21,8 +21,7 @@ # SOFTWARE. -from myreco.engines.cores.base import EngineCore, EngineError -from myreco.engines.cores.utils import build_csv_readers, build_engine_data_path +from myreco.engines.cores.base import EngineCore, EngineError, RedisObjectBase from falconswagger.models.base import get_model_schema from falconswagger.json_builder import JsonBuilder from concurrent.futures import ThreadPoolExecutor @@ -31,38 +30,49 @@ import zlib -class TopSellerEngine(EngineCore): +class TopSellerEngineCore(EngineCore): __configuration_schema__ = get_model_schema(__file__) def export_objects(self, session, items_indices_map): - data_path = build_engine_data_path(self.engine) - readers = build_csv_readers(data_path, 'top_seller') + readers = self._build_csv_readers('top_seller') + items_indices_map_dict = self._get_items_indices_map_dict(items_indices_map, session) - top_seller_vector = self._build_top_seller_vector(readers, items_indices_map, session) - redis_key = self._build_redis_key() - session.redis_bind.set(redis_key, zlib.compress(top_seller_vector.tobytes())) + top_seller = TopSellerRedisObject(self) + top_seller.update(readers, session, items_indices_map_dict) - result = sorted(enumerate(top_seller_vector), key=(lambda x: (x[1], x[0])), reverse=True) + result = sorted( + enumerate(top_seller.numpy_array), key=(lambda x: (x[1], x[0])), reverse=True) indices_items_map = items_indices_map.get_indices_items_map(session) return [{self._format_output(indices_items_map, r): int(r[1])} for r in result] def _format_output(self, indices_items_map, r): return ' | '.join([str(i) for i in eval(indices_items_map[r[0]])]) - def _build_top_seller_vector(self, readers, items_indices_map, session): - error_message = "No data found for engine '{}'".format(self.engine['name']) + def _build_rec_vector(self, session, **variables): + return TopSellerRedisObject(self).get_numpy_array(session) + rec_vector = session.redis_bind.get(redis_key) + if rec_vector: + return np.fromstring(zlib.decompress(rec_vector), dtype=np.int32) + + +class TopSellerRedisObject(RedisObjectBase): + + def update(self, readers, session, items_indices_map_dict): + self._build_top_seller_vector(readers, session, items_indices_map_dict) + session.redis_bind.set( + self._redis_key, + zlib.compress(self.numpy_array.tobytes()) + ) + + def _build_top_seller_vector(self, readers, session, items_indices_map_dict): + error_message = "No data found for engine '{}'".format(self._engine_core.engine['name']) if not len(readers): raise EngineError(error_message) - executor = ThreadPoolExecutor(len(readers)) - jobs = [] indices_values_map = dict() - for reader in readers: - job = executor.submit(self._set_indices_values_map, indices_values_map, - reader, items_indices_map, session) - jobs.append(job) - [job.result() for job in jobs] + for reader in readers: + self._set_indices_values_map(indices_values_map, items_indices_map_dict, reader) if not indices_values_map: raise EngineError(error_message) @@ -70,33 +80,17 @@ def _build_top_seller_vector(self, readers, items_indices_map, session): vector = np.zeros(max(indices_values_map.keys())+1, dtype=np.int32) indices = np.array(list(indices_values_map.keys()), dtype=np.int32) vector[indices] = np.array(list(indices_values_map.values()), dtype=np.int32) + self.numpy_array = vector - return vector - - def _set_indices_values_map(self, indices_values_map, reader, items_indices_map, session): - items_indices_map = items_indices_map.get_all(session) - - if not items_indices_map: - raise EngineError( - "The Indices Map for '{}' is empty. Please update these items" - .format(self.engine['item_type']['name'])) - + def _set_indices_values_map(self, indices_values_map, items_indices_map_dict, reader): for line in reader: value = line.pop('value') - for k in line: - schema = self.engine['item_type']['schema']['properties'].get(k) - if schema is None: - raise EngineError('Invalid Line {}'.format(line)) - - line[k] = JsonBuilder(line[k], schema) - index = items_indices_map.get(line) + self._set_item_values(line) + index = items_indices_map_dict.get(line) if index is not None: indices_values_map[int(index)] = int(value) - def _build_redis_key(self): - return '{}_{}'.format(self.engine['core']['name'], self.engine['id']) - - def _build_rec_vector(self, session, **variables): - rec_vector = session.redis_bind.get(self._build_redis_key()) + def get_numpy_array(self, session): + rec_vector = session.redis_bind.get(self._redis_key) if rec_vector: return np.fromstring(zlib.decompress(rec_vector), dtype=np.int32) diff --git a/myreco/engines/cores/utils.py b/myreco/engines/cores/utils.py index c2c42cc..316964f 100644 --- a/myreco/engines/cores/utils.py +++ b/myreco/engines/cores/utils.py @@ -20,22 +20,14 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from glob import glob -from collections import namedtuple -from itertools import starmap + import os.path -import csv -import gzip -def build_csv_readers(path, pattern='', delimiter='#'): - readers = [] - for filename in glob(os.path.join(path, '{}*.gz'.format(pattern))): - csv_file = gzip.open(filename, 'rt') - readers.append(csv.DictReader(csv_file, delimiter=delimiter)) - return readers +def build_engine_key_prefix(engine): + return 'engine_{}_{}'.format(engine['id'], engine['core']['name']) def build_engine_data_path(engine): - engine_path = 'engine_{}_{}'.format(engine['id'], engine['core']['name']) + engine_path = build_engine_key_prefix(engine) return os.path.join(engine['store']['configuration']['data_path'], engine_path) diff --git a/myreco/engines/models.py b/myreco/engines/models.py index eb69e2d..caa6fe6 100644 --- a/myreco/engines/models.py +++ b/myreco/engines/models.py @@ -25,7 +25,7 @@ from falconswagger.exceptions import ModelBaseError from myreco.engines.cores.items_indices_map import ItemsIndicesMap from myreco.items_types.models import build_item_key -from myreco.utils import ModuleClassLoader +from myreco.utils import ModuleClassLoader, get_items_model_from_api from types import MethodType, FunctionType from jsonschema import ValidationError from sqlalchemy.ext.declarative import AbstractConcreteBase, declared_attr @@ -82,8 +82,13 @@ def core_instance(self): def _set_core_instance(self): core_class_ = ModuleClassLoader.load(self.core.configuration['core_module']) + self_dict = self._build_self_dict() + items_model = get_items_model_from_api(type(self).__api__, self_dict) + self._core_instance = core_class_(self_dict, items_model) + + def _build_self_dict(self): todict_schema = {'variables': False} - self._core_instance = core_class_(self.todict(todict_schema)) + return self.todict(todict_schema) def _setattr(self, attr_name, value, session, input_): if attr_name == 'configuration': @@ -135,9 +140,8 @@ def _get_engine(cls, req, job_session): @classmethod def _build_items_indices_map(cls, engine): - item_collection_model = cls.__api__.models[build_item_key(engine.item_type.name)] - item_model = item_collection_model.__models__[engine.store_id] - return ItemsIndicesMap(item_model) + items_model = get_items_model_from_api(cls.__api__, engine._build_self_dict()) + return ItemsIndicesMap(items_model) @classmethod def _build_data_importer(cls, engine): diff --git a/myreco/placements/models.py b/myreco/placements/models.py index 9eecd7d..95c0804 100644 --- a/myreco/placements/models.py +++ b/myreco/placements/models.py @@ -25,8 +25,7 @@ from falconswagger.json_builder import JsonBuilder from falconswagger.exceptions import ModelBaseError from myreco.engines.cores.filters.factory import FiltersFactory -from myreco.items_types.models import build_item_key -from myreco.utils import ModuleClassLoader +from myreco.utils import ModuleClassLoader, get_items_model_from_api from falcon.errors import HTTPNotFound from sqlalchemy.ext.declarative import AbstractConcreteBase, declared_attr import random as random_ @@ -133,7 +132,7 @@ def _get_placement(cls, req, resp): @classmethod def _get_recos_by_slot(cls, slot, input_variables, session, show_details, max_recos=None): engine = slot['engine'] - items_model = cls._get_items_model(engine) + items_model = get_items_model_from_api(cls.__api__, engine) engine_vars, filters = \ cls._get_variables_and_filters(slot, items_model, input_variables) core_config = engine['core']['configuration']['core_module'] @@ -143,11 +142,6 @@ def _get_recos_by_slot(cls, slot, input_variables, session, show_details, max_re return core_instance.get_recommendations( session, filters, max_recos, show_details, **engine_vars) - @classmethod - def _get_items_model(cls, engine): - items_types_model_key = build_item_key(engine['item_type']['name']) - return cls.__api__.models[items_types_model_key].__models__[engine['store_id']] - @classmethod def _get_variables_and_filters(cls, slot, items_model, input_variables): engine_vars = dict() diff --git a/myreco/utils.py b/myreco/utils.py index 4dee097..81bd0d7 100644 --- a/myreco/utils.py +++ b/myreco/utils.py @@ -22,6 +22,7 @@ from falconswagger.exceptions import ModelBaseError +from myreco.items_types.models import build_item_key from importlib import import_module @@ -44,3 +45,11 @@ def load(cls, config): config['path'], config['class_name']) return class_ + + +def get_items_model_from_api(api, engine): + if api: + items_types_model_key = build_item_key(engine['item_type']['name']) + items_collection = api.models.get(items_types_model_key) + if items_collection: + return items_collection.__models__.get(engine['store_id']) diff --git a/myreco/version.py b/myreco/version.py index 110a42b..08af395 100644 --- a/myreco/version.py +++ b/myreco/version.py @@ -21,4 +21,4 @@ # SOFTWARE. -VERSION = '0.14.0' +VERSION = '0.14.1' diff --git a/tests/conftest.py b/tests/conftest.py index 522e80f..14131a4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,7 +26,7 @@ import pytest import logging.config -logging.config.dictConfig({'version': 1, 'root': {'level': 'CRITICAL'}}) +logging.config.dictConfig({'version': 1, 'root': {'level': 'INFO'}}) ROOT_PATH = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(ROOT_PATH, '..')) diff --git a/tests/integration/engines/test_engines_integration.py b/tests/integration/engines/test_engines_integration.py index 15c2d01..0d4a6bd 100644 --- a/tests/integration/engines/test_engines_integration.py +++ b/tests/integration/engines/test_engines_integration.py @@ -62,8 +62,8 @@ def app(session): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } } @@ -150,8 +150,8 @@ def test_post(self, client, headers): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } } @@ -205,8 +205,8 @@ def test_get(self, client, headers): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } } @@ -375,8 +375,8 @@ def test_get(self, client, headers): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } } @@ -426,8 +426,8 @@ def data_importer_app(session): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -574,8 +574,8 @@ def objects_exporter_app(session): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -616,7 +616,7 @@ def objects_exporter_client(objects_exporter_app): return Client(objects_exporter_app) -@mock.patch('myreco.engines.cores.top_seller.engine.build_csv_readers') +@mock.patch('myreco.engines.cores.base.EngineCore._build_csv_readers') @mock.patch('falconswagger.models.base.random.getrandbits', new=mock.MagicMock(return_value=131940827655846590526331314439483569710)) class TestEnginesModelsObjectsExporter(object): @@ -692,7 +692,7 @@ def test_exporter_get_with_error( } -@mock.patch('myreco.engines.cores.top_seller.engine.build_csv_readers') +@mock.patch('myreco.engines.cores.base.EngineCore._build_csv_readers') @mock.patch('falconswagger.models.base.random.getrandbits', new=mock.MagicMock(return_value=131940827655846590526331314439483569710)) class TestEnginesModelsObjectsExporterWithImport(object): @@ -837,8 +837,8 @@ def test_post_with_invalid_grant(self, client): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } }] @@ -851,8 +851,8 @@ def test_post(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -884,8 +884,8 @@ def test_get(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -932,8 +932,8 @@ def test_patch_with_invalid_configuration(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -975,8 +975,8 @@ def test_patch(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -1008,8 +1008,8 @@ def test_delete(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', @@ -1044,8 +1044,8 @@ def test_get(self, client, headers): 'name': 'top_seller2', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', diff --git a/tests/integration/items_types/test_items_types_integration.py b/tests/integration/items_types/test_items_types_integration.py index 2ccd195..b737f31 100644 --- a/tests/integration/items_types/test_items_types_integration.py +++ b/tests/integration/items_types/test_items_types_integration.py @@ -780,8 +780,8 @@ def filters_updater_app(redis, session): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' } } } diff --git a/tests/integration/placements/test_placements_integration.py b/tests/integration/placements/test_placements_integration.py index 11ef361..707cbe9 100644 --- a/tests/integration/placements/test_placements_integration.py +++ b/tests/integration/placements/test_placements_integration.py @@ -88,8 +88,8 @@ def app(redis, session, temp_dir): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models', diff --git a/tests/integration/slots/test_slots_integration.py b/tests/integration/slots/test_slots_integration.py index bb5df93..f156b7a 100644 --- a/tests/integration/slots/test_slots_integration.py +++ b/tests/integration/slots/test_slots_integration.py @@ -68,8 +68,8 @@ def app(session): 'name': 'top_seller', 'configuration': { 'core_module': { - 'path': 'myreco.engines.cores.top_seller.engine', - 'class_name': 'TopSellerEngine' + 'path': 'myreco.engines.cores.top_seller.core', + 'class_name': 'TopSellerEngineCore' }, 'data_importer_module': { 'path': 'tests.integration.fixtures_models',