/
loader.py
239 lines (204 loc) · 8.26 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# -*- coding: utf-8 -*-
"""
kaylee.loader
~~~~~~~~~~~~~
Implements projects, controllers and Kaylee instance loader.
:copyright: (c) 2012 by Zaur Nasibov.
:license: MIT, see LICENSE for more details.
"""
import os
import importlib
import inspect
import types
from copy import deepcopy
from collections import defaultdict
from importlib.machinery import SourceFileLoader
import kaylee.contrib
from .core import Kaylee
from .errors import KayleeError, SettingsError
from .util import (LazyObject, is_strong_subclass, MIN_SECRET_KEY_LENGTH,)
from . import storage, controller, project, node, session
import logging
log = logging.getLogger(__name__)
class LazyKaylee(LazyObject):
def _setup(self, obj = None):
if isinstance(obj, Kaylee):
self._wrapped = obj
elif obj is None:
self._wrapped = None # release the object
else:
raise TypeError('obj must be an instance of {} or '
'a Kaylee settings object, not {}'
.format(Kaylee.__name__, type(obj).__name__))
def load(settings):
"""Loads Kaylee.
:param settings: Kaylee settings object.
:type settings: dict, class, module or absolute Python module path.
:returns: Kaylee object.
"""
# Whatever the initial settings type is, load it (optional) and
# convert it to a dictionary.
# Note, that only settings with uppercase names' are loaded.
if isinstance(settings, str):
if os.path.exists(settings):
settings = SourceFileLoader('kl_settings', settings).load_module()
if isinstance(settings, (type, types.ModuleType)):
d = {}
for attr in dir(settings):
if attr == attr.upper():
d[attr] = getattr(settings, attr)
settings = d
if isinstance(settings, dict):
settings = {k : v for k, v in settings.items() if k == k.upper()}
# at this point, if settings is not a dict, then object type is wrong.
if not isinstance(settings, dict):
raise TypeError('settings must be an instance of '
'{}, {}, {} or {} not {}'.format(
dict.__name__,
type.__name__,
types.ModuleType.__name__,
str.__name__,
type(settings).__name__))
try:
SettingsValidator.validate(settings)
loader = Loader(settings)
registry = loader.registry
sdm = loader.session_data_manager
apps = loader.applications
except (KeyError, AttributeError) as e:
raise KayleeError('Settings error or object was not found: "{}"'
.format(e.args[0]))
return Kaylee(registry=registry,
session_data_manager=sdm,
applications=apps,
**settings)
class SettingsValidator:
@staticmethod
def validate(settings):
SettingsValidator.validate_AUTO_GET_ACTION(settings)
SettingsValidator.validate_SECRET_KEY(settings)
@staticmethod
def validate_AUTO_GET_ACTION(settings):
val = settings['AUTO_GET_ACTION']
if not isinstance(val, bool):
raise SettingsError('AUTO_GET_ACTION is not a boolean')
@staticmethod
def validate_SECRET_KEY(settings):
if 'SECRET_KEY' not in settings:
return
val = settings['SECRET_KEY']
if not isinstance(val, str):
raise SettingsError('SECRET_KEY is not a string object')
if len(val) < MIN_SECRET_KEY_LENGTH:
raise SettingsError('SECRET_KEY is too short (at least {} '
'characters are required)'
.format(MIN_SECRET_KEY_LENGTH))
class Loader:
_loadable_base_classes = [
project.Project,
controller.Controller,
node.NodesRegistry,
storage.PermanentStorage,
storage.TemporalStorage,
session.SessionDataManager,
]
def __init__(self, settings):
self._classes = defaultdict(dict)
self._settings = settings
# load classes from contrib (non-refreshable)
self._update_classes(kaylee.contrib)
# load session data managers
self._update_classes(kaylee.session)
# load classes from project modules (refreshable for new modules only)
if 'PROJECTS_DIR' in settings:
projects_dir = settings['PROJECTS_DIR']
for mod in find_modules(projects_dir):
self._update_classes(mod)
@property
def registry(self):
settings = self._settings
clsname = settings['REGISTRY']['name']
regcls = self._classes[node.NodesRegistry][clsname]
return regcls(**settings['REGISTRY']['config'])
@property
def session_data_manager(self):
settings = self._settings
if 'SESSION_DATA_MANAGER' in settings:
clsname = settings['SESSION_DATA_MANAGER']['name']
sdmcls = self._classes[session.SessionDataManager][clsname]
sdm_config = settings['SESSION_DATA_MANAGER'].get('config', {})
else:
# Load default (should be Phony) session data manager in case it
# is not defined in settings.
default_manager = session.KL_LOADER_DEFAULT_SESSION_DATA_MANAGER
sdmcls = default_manager
sdm_config = {}
return sdmcls(**sdm_config)
@property
def applications(self):
settings = self._settings
apps = []
if 'APPLICATIONS' in settings:
for conf in settings['APPLICATIONS']:
ct = self._load_controller(conf)
apps.append(ct)
return apps
def _update_classes(self, module):
"""Updates the _classes field by the classes found in
the module."""
classes_from_module = get_classes_from_module(module)
for base_class in self._loadable_base_classes:
# update _classes[base_class] with
# { class_name : class } pairs, where `class` is a subclass of
# `base_class`
name_class_pairs = {c.__name__ : c for c in classes_from_module
if is_strong_subclass(c, base_class)}
self._classes[base_class].update(name_class_pairs)
def _load_permanent_storage(self, conf):
psconf = conf['controller']['permanent_storage']
clsname = psconf['name']
pscls = self._classes[storage.PermanentStorage][clsname]
return pscls(**psconf.get('config', {}))
def _load_temporal_storage(self, conf):
if not 'temporal_storage' in conf['controller']:
return None
tsconf = conf['controller']['temporal_storage']
clsname = tsconf['name']
tscls = self._classes[storage.TemporalStorage][clsname]
return tscls(**tsconf.get('config', {}))
def _load_project(self, conf):
clsname = conf['project']['name']
pcls = self._classes[project.Project][clsname]
pj_config = conf['project'].get('config', {})
return pcls(**pj_config)
def _load_controller(self, conf):
# initialize objects
clsname = conf['controller']['name']
ccls = self._classes[controller.Controller][clsname]
app_name = conf['name']
pjobj = self._load_project(conf)
psobj = self._load_permanent_storage(conf)
tsobj = self._load_temporal_storage(conf)
cobj = ccls(app_name, pjobj, psobj, tsobj,
**conf['controller'].get('config', {}))
return cobj
def get_classes_from_module(module):
return [attr for attr in module.__dict__.values()
if inspect.isclass(attr)]
def find_packages(path):
for sub_dir in os.listdir(path):
pdir_path = os.path.join(path, sub_dir)
if not os.path.isdir(pdir_path):
continue
if '__init__.py' not in os.listdir(pdir_path):
continue
yield sub_dir
def find_modules(path):
"""A generator which yields python modules found in the given path."""
for package_name in find_packages(path):
try:
pymod = importlib.import_module(package_name)
yield pymod
except ImportError as e:
log.error('Unable to import project package "{}": {}'
.format(package_name, e))