Skip to content

Commit

Permalink
created a new class RedisObject to use on engines cores to exports. D…
Browse files Browse the repository at this point in the history
…id some renames and create common codes
  • Loading branch information
dutradda committed Dec 7, 2016
1 parent 5949e4b commit 6a96bb9
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 101 deletions.
41 changes: 41 additions & 0 deletions myreco/engines/cores/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@

from falconswagger.models.base import build_validator, get_module_path
from falconswagger.mixins import LoggerMixin
from falconswagger.json_builder import JsonBuilder
from myreco.engines.cores.items_indices_map import ItemsIndicesMap
from myreco.engines.cores.utils import build_engine_data_path, build_engine_key_prefix
from jsonschema import Draft4Validator
from abc import ABCMeta, abstractmethod
from bottleneck import argpartition
from glob import glob
import msgpack
import os.path
import csv
import gzip


class EngineError(Exception):
Expand Down Expand Up @@ -103,6 +109,24 @@ def _get_best_indices(self, rec_vector, max_recos):
def export_objects(self, session):
pass

def _build_csv_readers(self, pattern, delimiter='#'):
path = build_engine_data_path(self.engine)
readers = []
for filename in glob(os.path.join(path, '{}*.gz'.format(pattern))):
csv_file = gzip.open(filename, 'rt')
readers.append(csv.DictReader(csv_file, delimiter=delimiter))
return readers

def _get_items_indices_map_dict(self, items_indices_map, session):
items_indices_map = items_indices_map.get_all(session)

if not items_indices_map:
raise EngineError(
"The Indices Map for '{}' is empty. Please update these items"
.format(self._engine_core.engine['item_type']['name']))

return items_indices_map


class AbstractDataImporter(metaclass=ABCMeta):

Expand All @@ -112,3 +136,20 @@ def __init__(self, engine):
@abstractmethod
def get_data(cls, items_indices_map, session):
pass


class RedisObjectBase(LoggerMixin):

def __init__(self, engine_core):
self._build_logger()
self._engine_core = engine_core
self._redis_key = build_engine_key_prefix(self._engine_core.engine)


def _set_item_values(self, item):
for k in item:
schema = self._engine_core.engine['item_type']['schema']['properties'].get(k)
if schema is None:
raise EngineError('Invalid Item {}'.format(item))

item[k] = JsonBuilder(item[k], schema)
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
# SOFTWARE.


from myreco.engines.cores.base import EngineCore, EngineError
from myreco.engines.cores.utils import build_csv_readers, build_engine_data_path
from myreco.engines.cores.base import EngineCore, EngineError, RedisObjectBase
from falconswagger.models.base import get_model_schema
from falconswagger.json_builder import JsonBuilder
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -31,72 +30,67 @@
import zlib


class TopSellerEngine(EngineCore):
class TopSellerEngineCore(EngineCore):
__configuration_schema__ = get_model_schema(__file__)

def export_objects(self, session, items_indices_map):
data_path = build_engine_data_path(self.engine)
readers = build_csv_readers(data_path, 'top_seller')
readers = self._build_csv_readers('top_seller')
items_indices_map_dict = self._get_items_indices_map_dict(items_indices_map, session)

top_seller_vector = self._build_top_seller_vector(readers, items_indices_map, session)
redis_key = self._build_redis_key()
session.redis_bind.set(redis_key, zlib.compress(top_seller_vector.tobytes()))
top_seller = TopSellerRedisObject(self)
top_seller.update(readers, session, items_indices_map_dict)

result = sorted(enumerate(top_seller_vector), key=(lambda x: (x[1], x[0])), reverse=True)
result = sorted(
enumerate(top_seller.numpy_array), key=(lambda x: (x[1], x[0])), reverse=True)
indices_items_map = items_indices_map.get_indices_items_map(session)
return [{self._format_output(indices_items_map, r): int(r[1])} for r in result]

def _format_output(self, indices_items_map, r):
return ' | '.join([str(i) for i in eval(indices_items_map[r[0]])])

def _build_top_seller_vector(self, readers, items_indices_map, session):
error_message = "No data found for engine '{}'".format(self.engine['name'])
def _build_rec_vector(self, session, **variables):
return TopSellerRedisObject(self).get_numpy_array(session)
rec_vector = session.redis_bind.get(redis_key)
if rec_vector:
return np.fromstring(zlib.decompress(rec_vector), dtype=np.int32)


class TopSellerRedisObject(RedisObjectBase):

def update(self, readers, session, items_indices_map_dict):
self._build_top_seller_vector(readers, session, items_indices_map_dict)
session.redis_bind.set(
self._redis_key,
zlib.compress(self.numpy_array.tobytes())
)

def _build_top_seller_vector(self, readers, session, items_indices_map_dict):
error_message = "No data found for engine '{}'".format(self._engine_core.engine['name'])
if not len(readers):
raise EngineError(error_message)

executor = ThreadPoolExecutor(len(readers))
jobs = []
indices_values_map = dict()
for reader in readers:
job = executor.submit(self._set_indices_values_map, indices_values_map,
reader, items_indices_map, session)
jobs.append(job)

[job.result() for job in jobs]
for reader in readers:
self._set_indices_values_map(indices_values_map, items_indices_map_dict, reader)

if not indices_values_map:
raise EngineError(error_message)

vector = np.zeros(max(indices_values_map.keys())+1, dtype=np.int32)
indices = np.array(list(indices_values_map.keys()), dtype=np.int32)
vector[indices] = np.array(list(indices_values_map.values()), dtype=np.int32)
self.numpy_array = vector

return vector

def _set_indices_values_map(self, indices_values_map, reader, items_indices_map, session):
items_indices_map = items_indices_map.get_all(session)

if not items_indices_map:
raise EngineError(
"The Indices Map for '{}' is empty. Please update these items"
.format(self.engine['item_type']['name']))

def _set_indices_values_map(self, indices_values_map, items_indices_map_dict, reader):
for line in reader:
value = line.pop('value')
for k in line:
schema = self.engine['item_type']['schema']['properties'].get(k)
if schema is None:
raise EngineError('Invalid Line {}'.format(line))

line[k] = JsonBuilder(line[k], schema)
index = items_indices_map.get(line)
self._set_item_values(line)
index = items_indices_map_dict.get(line)
if index is not None:
indices_values_map[int(index)] = int(value)

def _build_redis_key(self):
return '{}_{}'.format(self.engine['core']['name'], self.engine['id'])

def _build_rec_vector(self, session, **variables):
rec_vector = session.redis_bind.get(self._build_redis_key())
def get_numpy_array(self, session):
rec_vector = session.redis_bind.get(self._redis_key)
if rec_vector:
return np.fromstring(zlib.decompress(rec_vector), dtype=np.int32)
16 changes: 4 additions & 12 deletions myreco/engines/cores/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from glob import glob
from collections import namedtuple
from itertools import starmap

import os.path
import csv
import gzip


def build_csv_readers(path, pattern='', delimiter='#'):
readers = []
for filename in glob(os.path.join(path, '{}*.gz'.format(pattern))):
csv_file = gzip.open(filename, 'rt')
readers.append(csv.DictReader(csv_file, delimiter=delimiter))
return readers
def build_engine_key_prefix(engine):
return 'engine_{}_{}'.format(engine['id'], engine['core']['name'])


def build_engine_data_path(engine):
engine_path = 'engine_{}_{}'.format(engine['id'], engine['core']['name'])
engine_path = build_engine_key_prefix(engine)
return os.path.join(engine['store']['configuration']['data_path'], engine_path)
14 changes: 9 additions & 5 deletions myreco/engines/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from falconswagger.exceptions import ModelBaseError
from myreco.engines.cores.items_indices_map import ItemsIndicesMap
from myreco.items_types.models import build_item_key
from myreco.utils import ModuleClassLoader
from myreco.utils import ModuleClassLoader, get_items_model_from_api
from types import MethodType, FunctionType
from jsonschema import ValidationError
from sqlalchemy.ext.declarative import AbstractConcreteBase, declared_attr
Expand Down Expand Up @@ -82,8 +82,13 @@ def core_instance(self):

def _set_core_instance(self):
core_class_ = ModuleClassLoader.load(self.core.configuration['core_module'])
self_dict = self._build_self_dict()
items_model = get_items_model_from_api(type(self).__api__, self_dict)
self._core_instance = core_class_(self_dict, items_model)

def _build_self_dict(self):
todict_schema = {'variables': False}
self._core_instance = core_class_(self.todict(todict_schema))
return self.todict(todict_schema)

def _setattr(self, attr_name, value, session, input_):
if attr_name == 'configuration':
Expand Down Expand Up @@ -135,9 +140,8 @@ def _get_engine(cls, req, job_session):

@classmethod
def _build_items_indices_map(cls, engine):
item_collection_model = cls.__api__.models[build_item_key(engine.item_type.name)]
item_model = item_collection_model.__models__[engine.store_id]
return ItemsIndicesMap(item_model)
items_model = get_items_model_from_api(cls.__api__, engine._build_self_dict())
return ItemsIndicesMap(items_model)

@classmethod
def _build_data_importer(cls, engine):
Expand Down
10 changes: 2 additions & 8 deletions myreco/placements/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
from falconswagger.json_builder import JsonBuilder
from falconswagger.exceptions import ModelBaseError
from myreco.engines.cores.filters.factory import FiltersFactory
from myreco.items_types.models import build_item_key
from myreco.utils import ModuleClassLoader
from myreco.utils import ModuleClassLoader, get_items_model_from_api
from falcon.errors import HTTPNotFound
from sqlalchemy.ext.declarative import AbstractConcreteBase, declared_attr
import random as random_
Expand Down Expand Up @@ -133,7 +132,7 @@ def _get_placement(cls, req, resp):
@classmethod
def _get_recos_by_slot(cls, slot, input_variables, session, show_details, max_recos=None):
engine = slot['engine']
items_model = cls._get_items_model(engine)
items_model = get_items_model_from_api(cls.__api__, engine)
engine_vars, filters = \
cls._get_variables_and_filters(slot, items_model, input_variables)
core_config = engine['core']['configuration']['core_module']
Expand All @@ -143,11 +142,6 @@ def _get_recos_by_slot(cls, slot, input_variables, session, show_details, max_re
return core_instance.get_recommendations(
session, filters, max_recos, show_details, **engine_vars)

@classmethod
def _get_items_model(cls, engine):
items_types_model_key = build_item_key(engine['item_type']['name'])
return cls.__api__.models[items_types_model_key].__models__[engine['store_id']]

@classmethod
def _get_variables_and_filters(cls, slot, items_model, input_variables):
engine_vars = dict()
Expand Down
9 changes: 9 additions & 0 deletions myreco/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@


from falconswagger.exceptions import ModelBaseError
from myreco.items_types.models import build_item_key
from importlib import import_module


Expand All @@ -44,3 +45,11 @@ def load(cls, config):
config['path'], config['class_name'])

return class_


def get_items_model_from_api(api, engine):
if api:
items_types_model_key = build_item_key(engine['item_type']['name'])
items_collection = api.models.get(items_types_model_key)
if items_collection:
return items_collection.__models__.get(engine['store_id'])
2 changes: 1 addition & 1 deletion myreco/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
# SOFTWARE.


VERSION = '0.14.0'
VERSION = '0.14.1'
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import pytest
import logging.config

logging.config.dictConfig({'version': 1, 'root': {'level': 'CRITICAL'}})
logging.config.dictConfig({'version': 1, 'root': {'level': 'INFO'}})

ROOT_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(ROOT_PATH, '..'))
Expand Down
Loading

0 comments on commit 6a96bb9

Please sign in to comment.