Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

More pep8 fixes

  • Loading branch information...
commit 7e4bace65b46e54a75b97b07ffed985dc51af421 1 parent e165617
@bkabrda authored
View
34 devassistant/lang.py
@@ -88,14 +88,16 @@ def dependencies_section(section, kwargs, runner=None):
# we don't allow general commands, only "call"/"use" command here
if dep_type in ['call', 'use']:
deps.extend(Command(dep_type, dep_list, kwargs).run())
- elif dep_type in package_managers.managers.keys(): # handle known types of deps the same, just by appending to "deps" list
+ # handle known types of deps the same, just by appending to "deps" list
+ elif dep_type in package_managers.managers.keys():
fmtd = list(map(lambda dep: format_str(dep, kwargs), dep_list))
deps.append({dep_type: fmtd})
elif dep_type.startswith('if'):
possible_else = None
- if len(section) > i + 1: # do we have "else" clause?
+ if len(section) > i + 1: # do we have "else" clause?
possible_else = list(section[i + 1].items())[0]
- _, skip_else, to_run = get_section_from_condition((dep_type, dep_list), possible_else, kwargs)
+ _, skip_else, to_run = get_section_from_condition((dep_type, dep_list),
+ possible_else, kwargs)
if to_run:
deps.extend(dependencies_section(to_run, kwargs, runner=runner))
elif dep_type == 'else':
@@ -110,10 +112,13 @@ def dependencies_section(section, kwargs, runner=None):
return deps
+
def run_section(section, kwargs=None, runner=None):
- if kwargs == None: kwargs = {}
+ if kwargs is None:
+ kwargs = {}
return eval_exec_section(section, kwargs, runner)
+
def eval_exec_section(section, kwargs, runner=None):
skip_else = False
retval = [False, '']
@@ -127,9 +132,10 @@ def eval_exec_section(section, kwargs, runner=None):
for comm_type, comm in command_dict.items():
if comm_type.startswith('if'):
possible_else = None
- if len(section) > i + 1: # do we have "else" clause?
+ if len(section) > i + 1: # do we have "else" clause?
possible_else = list(section[i + 1].items())[0]
- _, skip_else, to_run = get_section_from_condition((comm_type, comm), possible_else, kwargs)
+ _, skip_else, to_run = get_section_from_condition((comm_type, comm),
+ possible_else, kwargs)
# run with original kwargs, so that they might be changed for code after this
if to_run:
retval = run_section(to_run, kwargs, runner=runner)
@@ -140,7 +146,8 @@ def eval_exec_section(section, kwargs, runner=None):
skip_else = False
elif comm_type.startswith('for'):
# syntax: "for $i in $x: <section> or "for $i in cl_command: <section>"
- control_vars, eval_expression = get_for_control_var_and_eval_expr(comm_type, kwargs)
+ control_vars, eval_expression = get_for_control_var_and_eval_expr(comm_type,
+ kwargs)
for i in eval_expression:
if len(control_vars) == 2:
kwargs[control_vars[0]] = i[0]
@@ -165,6 +172,7 @@ def eval_exec_section(section, kwargs, runner=None):
return retval
+
def eval_literal_section(section, kwargs, runner=None):
retval = [False, '']
@@ -192,10 +200,12 @@ def eval_literal_section(section, kwargs, runner=None):
return retval
+
def assign_last_result(kwargs, log_res, res):
kwargs[settings.LAST_LR_VAR] = log_res
kwargs[settings.LAST_R_VAR] = res
+
def parse_for(control_line):
"""Returns name of loop control variable(s) and expression to iterate on.
@@ -219,6 +229,7 @@ def parse_for(control_line):
return (control_vars, expr)
+
def get_for_control_var_and_eval_expr(comm_type, kwargs):
"""Returns tuple that consists of control variable name and iterable that is result
of evaluated expression of given for loop.
@@ -242,6 +253,7 @@ def get_for_control_var_and_eval_expr(comm_type, kwargs):
iterval = eval_expression.split()
return control_vars, iterval
+
def get_section_from_condition(if_section, else_section, kwargs):
"""Returns section that should be used from given if/else sections by evaluating given
condition.
@@ -266,6 +278,7 @@ def get_section_from_condition(if_section, else_section, kwargs):
else:
return (1, skip, else_section[1]) if skip else (1, skip, None)
+
def assign_variable(variable, log_res, res, kwargs):
"""Assigns given result (resp. logical result and result) to a variable
(resp. to two variables). log_res and res are already computed result
@@ -303,17 +316,20 @@ def assign_variable(variable, log_res, res, kwargs):
kwargs[var2] = res
return log_res, res
+
def is_var(string):
return string.startswith('$')
+
def get_var_name(dolar_variable):
name = dolar_variable.strip()
name = name.strip('"\'')
if not name.startswith('$'):
raise exceptions.YamlSyntaxError('Not a proper variable name: ' + dolar_variable)
- name = name[1:] # strip the dollar
+ name = name[1:] # strip the dollar
return name.strip('{}')
+
### Expression evaluation
class Interpreter(object):
"""
@@ -593,6 +609,7 @@ def nud(self):
# we want to do homedir expansion in quotes (which bash doesn't)
_homedir_matcher = re.compile('\\\\*~')
+
def _homedir_expand(cls, matchobj):
# therefore we must hack around this here
if len(matchobj.group(0)) % 2 == 0:
@@ -602,6 +619,7 @@ def _homedir_expand(cls, matchobj):
# odd length => even number of backslashes => expand an
return matchobj.group(0)[:-1] + os.path.expanduser('~')
+
def format_str(s, kwargs):
files_dir = kwargs.get('__files_dir__', [''])[-1]
files = kwargs.get('__files__', [{}])[-1]
View
4 devassistant/loaded_yaml.py
@@ -3,11 +3,13 @@
from devassistant import exceptions
from devassistant import settings
+
class LoadedYaml(object):
@property
def load_path(self):
for d in settings.DATA_DIRECTORIES:
- if d == os.path.commonprefix([self.path, d]): break
+ if d == os.path.commonprefix([self.path, d]):
+ break
return d
View
5 devassistant/logger.py
@@ -7,6 +7,7 @@
logger_gui = logging.getLogger('devassistant-gui')
logger_gui.setLevel(logging.DEBUG)
+
class DevassistantClHandler(logging.StreamHandler):
def emit(self, record):
event_type = getattr(record, 'event_type', '')
@@ -15,8 +16,10 @@ def emit(self, record):
else:
super(DevassistantClHandler, self).emit(record)
+
class DevassistantClFormatter(logging.Formatter):
def format(self, record):
event_type = getattr(record, 'event_type', '')
- fmt_str = settings.LOG_FORMATS_MAP.get(event_type, None) or settings.LOG_FORMATS_MAP['log_cmd']
+ fmt_str = settings.LOG_FORMATS_MAP.get(event_type, None) or \
+ settings.LOG_FORMATS_MAP['log_cmd']
return fmt_str.format(**vars(record))
View
44 devassistant/package_managers.py
@@ -40,11 +40,13 @@
# 'pip': [PIPPackageManager]}
managers = {}
+
def register_manager(manager):
managers.setdefault(manager.shortcut, [])
managers[manager.shortcut].append(manager)
return manager
+
class PackageManager(object):
"""Abstract class for API definition of package managers."""
@@ -129,7 +131,8 @@ def rpm_q(cls, rpm_name):
@classmethod
def is_rpm_installed(cls, rpm_name):
- logger.info('Checking for presence of {0}...'.format(rpm_name), extra={'event_type': 'dep_check'})
+ logger.info('Checking for presence of {0}...'.format(rpm_name),
+ extra={'event_type': 'dep_check'})
found_rpm = cls.rpm_q(rpm_name)
if found_rpm:
@@ -230,7 +233,8 @@ def install(cls, *args):
@classmethod
def is_pacmanpkg_installed(cls, pkg_name):
- logger.info('Checking for presence of {0}...'.format(pkg_name), extra={'event_type': 'dep_check'})
+ logger.info('Checking for presence of {0}...'.format(pkg_name),
+ extra={'event_type': 'dep_check'})
try:
found_pkg = ClHelper.run_command('{pacman} -Q "{pkg}"'.\
@@ -329,6 +333,7 @@ def get_distro_dependencies(self, smgr_sc):
def __str__(self):
return "pip package manager"
+
@register_manager
class NPMPackageManager(PackageManager):
""" Package manager for managing python dependencies from NPM """
@@ -384,6 +389,7 @@ def get_distro_dependencies(self, smgr_sc):
def __str__(self):
return "npm package manager"
+
@register_manager
class GemPackageManager(PackageManager):
""" Package manager for managing ruby dependencies from rubygems.org """
@@ -431,11 +437,12 @@ def resolve(cls, *dep):
@classmethod
def get_distro_dependencies(self, smgr_sc):
- return ['rubygems','ruby-devel']
+ return ['rubygems', 'ruby-devel']
def __str__(self):
return "gem package manager"
+
class GentooPackageManager:
"""Mix-in class for Gentoo package managers. The only thing it capable to do
is to detect current package manager used in a particular Gentoo based system.
@@ -489,7 +496,7 @@ def throw_package_list(cls, to_install):
_list = ', '.join(to_install)
raise exceptions.DependencyException(
'You must install the following packages before run this command: {0}'.format(_list)
- )
+ )
@register_manager
@@ -526,7 +533,8 @@ def is_pkg_installed(cls, pkg):
r = vartree.dbapi.match(pkg)
logger.debug('Checking is installed: {0} -> {1}'.format(pkg, repr(r)))
except portage.exception.InvalidAtom:
- raise exceptions.DependencyException('Invalid dependency specification: {0}'.format(pkg))
+ raise exceptions.DependencyException('Invalid dependency specification: {0}'.\
+ format(pkg))
# TODO Compare package version!
return bool(r)
@@ -561,13 +569,14 @@ def resolve(cls, *deps):
@register_manager
class PaludisPackageManager(PackageManager, GentooPackageManager):
- """ Another package manager class for Gentoo (yep, for [paludis](http://paludis.exherbo.org/) ;-)
+ """Another package manager class for Gentoo (yep, for
+ [paludis](http://paludis.exherbo.org/) ;-)
- NOTE Nowadays Paludis has Python2 only API, but Python3 is coming soon (I hope)
- (upstream bug is here http://paludis.exherbo.org/trac/ticket/1297).
+ NOTE Nowadays Paludis has Python2 only API, but Python3 is coming soon (I hope)
+ (upstream bug is here http://paludis.exherbo.org/trac/ticket/1297).
- NOTE Ebuild for paludis w/ Python3 support available here:
- https://github.com/zaufi/zaufi-overlay/tree/master/sys-apps/paludis
+ NOTE Ebuild for paludis w/ Python3 support available here:
+ https://github.com/zaufi/zaufi-overlay/tree/master/sys-apps/paludis
"""
shortcut = 'ebuild'
@@ -588,7 +597,8 @@ def is_pkg_installed(cls, dep):
env = paludis.EnvironmentFactory.instance.create('')
installed = env.fetch_repository('installed')
try:
- pkg = paludis.parse_user_package_dep_spec(dep, env, paludis.UserPackageDepSpecOptions())
+ pkg = paludis.parse_user_package_dep_spec(dep, env,
+ paludis.UserPackageDepSpecOptions())
# TODO Compare package version!
r = []
for i in installed.package_ids(str(pkg.package), []):
@@ -611,11 +621,12 @@ def resolve(cls, *deps):
logger.info('[paludis] Resolving dependencies ...')
env = paludis.EnvironmentFactory.instance.create('')
- fltr = paludis.Filter.And(paludis.Filter.SupportsInstallAction(), paludis.Filter.NotMasked())
+ fltr = paludis.Filter.And(paludis.Filter.SupportsInstallAction(),
+ paludis.Filter.NotMasked())
to_install = set()
for dep in deps:
ds = paludis.parse_user_package_dep_spec(dep, env, paludis.UserPackageDepSpecOptions())
- gen = paludis.Generator.Matches(ds,paludis.MatchPackageOptions())
+ gen = paludis.Generator.Matches(ds, paludis.MatchPackageOptions())
fg = paludis.FilteredGenerator(gen, fltr)
s = paludis.Selection.BestVersionOnly(fg)
_to_install = set()
@@ -682,7 +693,7 @@ def _process_dependency(self, dep_t, dep_l):
raise exceptions.NoPackageManagerException(err)
# try to get list of distros where the dependency type is system type
distros = settings.SYSTEM_DEPTYPES_SHORTCUTS.get(dep_t, None)
- if not distros: # non-distro dependency type
+ if not distros: # non-distro dependency type
sysdep_t = self.get_system_deptype_shortcut()
# for now, just take the first manager that can install dep_t and install this manager
self._process_dependency(sysdep_t,
@@ -694,7 +705,7 @@ def _process_dependency(self, dep_t, dep_l):
if distro in local_distro:
found = True
break
- if not found: # distro dependency type, but for another distro
+ if not found: # distro dependency type, but for another distro
return
self.dependencies.setdefault(dep_t, [])
self.dependencies[dep_t].extend(dep_l)
@@ -731,7 +742,7 @@ def _install_dependencies(self):
# TODO: we should do this more systematically (send signal to cl/gui?)
logger.info('Installing dependencies, sit back and relax ...',
extra={'event_type': 'dep_installation_start'})
- if current_run.UI == 'cli': # TODO: maybe let every manager to decide when to start
+ if current_run.UI == 'cli': # TODO: maybe let every manager to decide when to start
event = threading.Event()
t = EndlessProgressThread(event)
t.start()
@@ -777,6 +788,7 @@ def get_system_deptype_shortcut(self):
# just try rpm if unkown (not very nice?)
return 'rpm'
+
class EndlessProgressThread(threading.Thread):
def __init__(self, finish_event):
super(EndlessProgressThread, self).__init__()
View
10 devassistant/path_runner.py
@@ -5,6 +5,7 @@
from devassistant import exceptions
from devassistant import yaml_assistant
+
class PathRunner(object):
def __init__(self, path, override_sys_excepthook=True):
self.path = path
@@ -38,7 +39,7 @@ def run(self, **parsed_args):
"""
error = None
# run 'pre_run', 'logging', 'dependencies' and 'run'
- try: # serve as a central place for error logging
+ try: # serve as a central place for error logging
self._logging(parsed_args)
if not 'deps_only' in parsed_args:
self._run_path_run('pre', parsed_args)
@@ -49,12 +50,12 @@ def run(self, **parsed_args):
if not getattr(e, 'already_logged', False):
# this is here primarily because of log_ command, that logs the message itself
logger.error(six.text_type(e))
- if isinstance(e, exceptions.YamlError): # if there's a yaml error, just shut down
+ if isinstance(e, exceptions.YamlError): # if there's a yaml error, just shut down
raise e
error = e
# in any case, run post_run
- try: # serve as a central place for error logging
+ try: # serve as a central place for error logging
self._run_path_run('post', parsed_args)
except exceptions.ExecutionException as e:
if not getattr(e, 'already_logged', False):
@@ -62,7 +63,8 @@ def run(self, **parsed_args):
logger.error(six.text_type(e))
error = e
- if error: raise error
+ if error:
+ raise error
def stop(self):
for a in self.path:
View
20 devassistant/remote_auth.py
@@ -10,6 +10,7 @@
from devassistant.command_helpers import ClHelper, DialogHelper
from devassistant.logger import logger
+
class GitHubAuth(object):
"""Only use the github_authenticated decorator from the class.
The other methods should be consider private; they expect certain order of calling,
@@ -29,7 +30,7 @@ def _github_token(cls, login):
cls._token = ClHelper.run_command("git config github.token.{login}".format(
login=login))
except exceptions.ClException:
- pass # token is not available yet
+ pass # token is not available yet
return cls._token
@@ -48,15 +49,16 @@ def _get_github_user(cls, login):
cls._token = None
# login with username/password
password = DialogHelper.ask_for_password(
- prompt='Github Password for {username}:'.format(username=login))
+ prompt='Github Password for {username}:'.format(username=login))
gh = cls._gh_module.Github(login_or_token=login, password=password)
cls._user = gh.get_user()
try:
cls._user.login
- cls._github_create_auth() # create auth for future use
+ cls._github_create_auth() # create auth for future use
except cls._gh_module.GithubException as e:
msg = 'Wrong username or password\nGitHub exception: {0}'.format(e)
- # reset cls._user to None, so that we don't use it if calling this multiple times
+ # reset cls._user to None, so that we don't use it
+ # if calling this multiple times
cls._user = None
raise exceptions.CommandException(msg)
return cls._user
@@ -69,7 +71,8 @@ def _github_create_auth(cls):
"""
if not cls._token:
try:
- auth = cls._user.create_authorization(scopes=['repo', 'user'], note="DeveloperAssistant")
+ auth = cls._user.create_authorization(scopes=['repo', 'user'],
+ note="DeveloperAssistant")
ClHelper.run_command("git config --global github.token.{login} {token}".format(
login=cls._user.login,
token=auth.token))
@@ -83,7 +86,7 @@ def _github_create_ssh_key(cls):
try:
login = cls._user.login
pkey_path = '{home}/.ssh/{keyname}'.format(home=os.path.expanduser('~'),
- keyname=settings.GITHUB_SSH_KEYNAME.format(login=login))
+ keyname=settings.GITHUB_SSH_KEYNAME.format(login=login))
# TODO: handle situation where {pkey_path} exists, but it's not registered on GH
# generate ssh key
ClHelper.run_command('ssh-keygen -t rsa -f {pkey_path}\
@@ -100,7 +103,7 @@ def _github_create_ssh_key(cls):
def _create_ssh_config_entry(cls):
# TODO: some duplication with _ssh_key_needs_config_entry, maybe refactor a bit
ssh_config = os.path.expanduser('~/.ssh/config')
- fh = os.fdopen(os.open(ssh_config, os.O_WRONLY|os.O_CREAT|os.O_APPEND, 0o600), 'a')
+ fh = os.fdopen(os.open(ssh_config, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600), 'a')
fh.write(settings.GITHUB_SSH_CONFIG.format(
login=cls._user.login,
keyname=settings.GITHUB_SSH_KEYNAME.format(login=cls._user.login)))
@@ -138,7 +141,6 @@ def _ssh_key_needs_config_entry(cls):
return needs_to_add_config_entry
return False
-
@classmethod
def github_authenticated(cls, func):
"""Does user authentication, creates SSH keys if needed and injects "_user" attribute
@@ -147,7 +149,7 @@ def github_authenticated(cls, func):
"""
def inner(func_cls, *args, **kwargs):
if not cls._gh_module:
- logger.warning('PyGithub not installed, skipping github authentication procedures.')
+ logger.warning('PyGithub not installed, skipping Github auth procedures.')
elif not func_cls._user:
# authenticate user, possibly also creating authentication for future use
func_cls._user = cls._get_github_user(kwargs['login'])
View
2  devassistant/settings.py
@@ -23,7 +23,7 @@
os.path.expanduser('~/.devassistant')]
if 'DEVASSISTANT_PATH' in os.environ:
DATA_DIRECTORIES = os.environ['DEVASSISTANT_PATH'].split(':') + DATA_DIRECTORIES
-ASSISTANT_ROLES=['crt', 'mod', 'prep', 'task']
+ASSISTANT_ROLES = ['crt', 'mod', 'prep', 'task']
DEFAULT_ASSISTANT_ROLE = 'crt'
# system dependency types and package managers for various distros
View
1  devassistant/sigint_handler.py
@@ -4,6 +4,7 @@
from devassistant.logger import logger
from devassistant import package_managers
+
def override():
def signal_handler(signal, frame):
if package_managers.DependencyInstaller.install_lock:
View
1  devassistant/snippet.py
@@ -2,6 +2,7 @@
from devassistant import loaded_yaml
+
class Snippet(loaded_yaml.LoadedYaml):
def __init__(self, name, parsed_yaml, path):
self.name = name
View
7 devassistant/utils.py
@@ -7,19 +7,22 @@
except ImportError:
from yaml import Dumper
-try: # ugly hack for using imp instead of importlib on Python <= 2.6
+try: # ugly hack for using imp instead of importlib on Python <= 2.6
import importlib
except ImportError:
import imp as importlib
+
def import_module(name):
fp, pathname, description = importlib.find_module(name.replace('.', '/'))
return importlib.load_module(name, fp, pathname, description)
importlib.import_module = import_module
del import_module
+
def import_module(module):
return importlib.import_module(module)
+
def get_distro_name():
distro = platform.linux_distribution()[0].lower()
if not distro and os.path.exists('/etc/os-release'):
@@ -29,6 +32,7 @@ def get_distro_name():
distro = l.split('=')[-1].strip()
return distro
+
def get_assistant_attrs_from_dict(d, source):
# In pre-0.9.0, we required assistant to be a mapping of {name: assistant_attributes}
# now we allow that, but we also allow omitting the assistant name and putting
@@ -43,6 +47,7 @@ def get_assistant_attrs_from_dict(d, source):
else:
return None
+
def cl_string_for_da_eval(section, context=None):
if context is None:
context = {}
View
30 devassistant/yaml_assistant.py
@@ -15,6 +15,7 @@
from devassistant import yaml_loader
from devassistant import yaml_snippet_loader
+
def needs_fully_loaded(method):
"""Wraps all publicly callable methods of YamlAssistant. If the assistant was loaded
from cache, this decorator will fully load it first time a publicly callable method
@@ -30,6 +31,7 @@ def inner(self, *args, **kwargs):
return inner
+
class YamlAssistant(assistant_base.AssistantBase, loaded_yaml.LoadedYaml):
def __init__(self, name, parsed_yaml, path, superassistant, fully_loaded=True,
role=settings.DEFAULT_ASSISTANT_ROLE):
@@ -103,12 +105,14 @@ def _construct_args(self, struct):
# it with current arg_params, if any
try:
problem = None
- snippet = yaml_snippet_loader.YamlSnippetLoader.get_snippet_by_name(use_snippet)
+ snippet = yaml_snippet_loader.YamlSnippetLoader.get_snippet_by_name(
+ use_snippet)
arg_params = dict(snippet.args.pop(arg_name), **arg_params)
except exceptions.SnippetNotFoundException as e:
- problem = 'Couldn\'t expand argument {arg} in assistant {a}: ' + six.text_type(e)
- except KeyError as e: # snippet doesn't have the requested argument
- problem = 'Couldn\'t find argument {arg} in snippet {snip} wanted by assistant {a}.'
+ problem = 'Couldn\'t expand argument {arg} in assistant {a}: ' + \
+ six.text_type(e)
+ except KeyError as e: # snippet doesn't have the requested argument
+ problem = 'Couldn\'t find arg {arg} in snippet {snip} wanted by assistant {a}.'
if problem:
logger.warning(problem.format(snip=use_snippet,
@@ -153,7 +157,8 @@ def logging(self, kwargs):
os.makedirs(os.path.dirname(expanded_lfile))
# add handler and formatter
handler = logging.FileHandler(expanded_lfile, 'a+')
- formatter = logging.Formatter('%(asctime)-15s [%(event_type)] %(levelname)s - %(message)s')
+ formatter = logging.Formatter(
+ '%(asctime)-15s [%(event_type)] %(levelname)s - %(message)s')
handler.setFormatter(formatter)
handler.setLevel(getattr(logging, level.upper()))
# register handler with the global logger
@@ -172,7 +177,8 @@ def dependencies(self, kwargs=None):
"""
# we can't use {} as a default for kwargs, as that initializes the dict only once in Python
# and uses the same dict in all subsequent calls of this method
- if not kwargs: kwargs = {}
+ if not kwargs:
+ kwargs = {}
self.proper_kwargs('dependencies', kwargs)
sections = [getattr(self, '_dependencies', [])]
@@ -180,7 +186,8 @@ def dependencies(self, kwargs=None):
# if subassistant_path is "foo bar baz", then search for dependency sections
# _dependencies_foo, _dependencies_foo_bar, _dependencies_foo_bar_baz
for i in range(1, len(kwargs.get('subassistant_path', [])) + 1):
- possible_dep_section = '_dependencies_{0}'.format('_'.join(kwargs['subassistant_path'][:i]))
+ possible_dep_section = '_dependencies_{0}'.\
+ format('_'.join(kwargs['subassistant_path'][:i]))
if possible_dep_section in dir(self):
sections.append(getattr(self, possible_dep_section))
# install these dependencies in any case
@@ -199,15 +206,16 @@ def dependencies(self, kwargs=None):
def run(self, stage='', kwargs=None):
# we can't use {} as a default for kwargs, as that initializes the dict only once in Python
# and uses the same dict in all subsequent calls of this method
- if not kwargs: kwargs = {}
+ if not kwargs:
+ kwargs = {}
self.proper_kwargs('run', kwargs)
to_run = '_run'
- if stage: # if we have stage, always use that
+ if stage: # if we have stage, always use that
to_run = '_' + stage + '_run'
elif self.role == 'mod':
- # try to get a section to run from the most specialized one to the least specialized one
- # e.g. first run_python_django, then run_python and then just run
+ # try to get a section to run from the most specialized one to the least
+ # specialized one, e.g. first run_python_django, then run_python and then just run
sa_path = kwargs.get('subassistant_path', [])
for i in range(len(sa_path), -1, -1):
possible_run = '_'.join(['_run'] + sa_path[:i])
View
14 devassistant/yaml_assistant_loader.py
@@ -10,6 +10,7 @@
from devassistant import yaml_assistant
from devassistant import yaml_checker
+
class YamlAssistantLoader(object):
assistants_dirs = list(map(lambda x: os.path.join(x, 'assistants'), settings.DATA_DIRECTORIES))
# mapping of assistant roles to lists of top-level assistant instances
@@ -110,11 +111,11 @@ def get_assistants_from_file_hierarchy(cls, file_hierarchy, superassistant,
references to instances of their subassistants (and their subassistants, ...)
"""
result = []
- warn_msg ='Failed to load assistant {source}, skipping subassistants.'
+ warn_msg = 'Failed to load assistant {source}, skipping subassistants.'
for name, attrs in file_hierarchy.items():
loaded_yaml = yaml_loader.YamlLoader.load_yaml_by_path(attrs['source'])
- if not loaded_yaml: # there was an error parsing yaml
+ if not loaded_yaml: # there was an error parsing yaml
logger.warning(warn_msg.format(source=attrs['source']))
continue
try:
@@ -149,9 +150,11 @@ def get_assistants_file_hierarchy(cls, dirs):
Returns:
hierarchy structure that looks like this:
{'assistant1':
- {'source': '/path/to/assistant1.yaml', 'subhierarchy': {<hierarchy of subassistants>}},
+ {'source': '/path/to/assistant1.yaml',
+ 'subhierarchy': {<hierarchy of subassistants>}},
'assistant2':
- {'source': '/path/to/assistant2.yaml', 'subhierarchy': {<another hieararchy of subassistants}}
+ {'source': '/path/to/assistant2.yaml',
+ 'subhierarchy': {<another hierarchy of subassistants}}
}
"""
result = {}
@@ -161,7 +164,8 @@ def get_assistants_file_hierarchy(cls, dirs):
if assistant_name not in result:
subas_dirs = [os.path.join(dr, assistant_name) for dr in dirs]
result[assistant_name] = {'source': os.path.join(d, f),
- 'subhierarchy': cls.get_assistants_file_hierarchy(subas_dirs)}
+ 'subhierarchy':
+ cls.get_assistants_file_hierarchy(subas_dirs)}
return result
View
5 devassistant/yaml_checker.py
@@ -2,6 +2,7 @@
from devassistant import exceptions
+
class YamlChecker(object):
_yaml_typenames = {dict: 'mapping',
list: 'list',
@@ -33,7 +34,7 @@ def check(self):
if not isinstance(self.parsed_yaml, dict):
msg = 'In {0}:\n'.format(self.sourcefile)
msg += 'Assistants and snippets must be Yaml mappings, not "{0}"!'.\
- format(self.parsed_yaml)
+ format(self.parsed_yaml)
raise exceptions.YamlTypeError(msg)
self._check_fullname(self.sourcefile)
self._check_description(self.sourcefile)
@@ -44,7 +45,7 @@ def check(self):
def _check_fullname(self, source):
path = [source]
self._assert_str(self.parsed_yaml.get('fullname', ''), path)
-
+
def _check_description(self, source):
path = [source]
self._assert_str(self.parsed_yaml.get('description', ''), path)
View
14 devassistant/yaml_loader.py
@@ -9,6 +9,7 @@
from devassistant.logger import logger
+
class YamlLoader(object):
@classmethod
def load_all_yamls(cls, directories):
@@ -26,7 +27,8 @@ def load_all_yamls(cls, directories):
if d.startswith('/home') and not os.path.exists(d):
os.makedirs(d)
for dirname, subdirs, files in os.walk(d):
- yaml_files.extend(map(lambda x: os.path.join(dirname, x), filter(lambda x: x.endswith('.yaml'), files)))
+ yaml_files.extend(map(lambda x: os.path.join(dirname, x),
+ filter(lambda x: x.endswith('.yaml'), files)))
for f in yaml_files:
loaded_yamls[f] = cls.load_yaml_by_path(f)
@@ -49,7 +51,7 @@ def load_yaml_by_relpath(cls, directories, rel_path):
possible_path = os.path.join(d, rel_path)
if os.path.exists(possible_path):
loaded = cls.load_yaml_by_path(possible_path)
- if loaded != None:
+ if loaded is not None:
return (possible_path, cls.load_yaml_by_path(possible_path))
return None
@@ -61,8 +63,8 @@ def load_yaml_by_path(cls, path):
return yaml.load(open(path, 'r'), Loader=Loader)
except yaml.scanner.ScannerError as e:
logger.warning('Yaml error in {path} (line {ln}, column {col}): {err}'.\
- format(path=path,
- ln=e.problem_mark.line,
- col=e.problem_mark.column,
- err=e.problem))
+ format(path=path,
+ ln=e.problem_mark.line,
+ col=e.problem_mark.column,
+ err=e.problem))
return None
View
6 devassistant/yaml_snippet_loader.py
@@ -6,6 +6,7 @@
from devassistant import snippet
from devassistant import yaml_checker
+
class YamlSnippetLoader(object):
snippets_dirs = list(map(lambda x: os.path.join(x, 'snippets'), settings.DATA_DIRECTORIES))
_snippets = {}
@@ -13,14 +14,15 @@ class YamlSnippetLoader(object):
@classmethod
def _find_snippet(cls, name):
for snip in cls._snippets.values():
- if snip.name == name: return snip
+ if snip.name == name:
+ return snip
return None
@classmethod
def get_snippet_by_name(cls, name):
found = cls._find_snippet(name)
- if found != None:
+ if found is not None:
return found
loaded = yaml_loader.YamlLoader.load_yaml_by_relpath(cls.snippets_dirs, name + '.yaml')
if loaded:
View
2  test/test_yaml_assistant_loader.py
@@ -83,7 +83,7 @@ def test_assistant_from_yaml_doesnt_fail_on_missing_arg(self):
'assistants_with_snippet_problems')]
y = self.load_yaml_from_fixture('no_arg_in_snippet')
self.yl.assistant_from_yaml('no_arg_in_snippet.yaml', y, CreatorAssistant())
- assert ('WARNING', 'Couldn\'t find argument bar in snippet snippet1 wanted by assistant no_arg_in_snippet.') in self.tlh.msgs
+ assert ('WARNING', 'Couldn\'t find arg bar in snippet snippet1 wanted by assistant no_arg_in_snippet.') in self.tlh.msgs
def test_no_cache(self):
current_run.USE_CACHE = False
Please sign in to comment.
Something went wrong with that request. Please try again.