Permalink
Browse files

Use XSD to lint tools and repositories.

 - Implement validation abstraction that will use either lxml (Python lib) or xmllint (command line app) dependending on what is available - with test cases.
 - Add a ``--xsd`` flag to the lint command that lints against the experimental XSD from https://github.com/JeanFred/Galaxy-XSD.
 - Implement ``repository_dependencies.xsd`` to describe Tool Shed ``repository_dependencies.xml`` files (fairly complete).
 - Implement ``tool_dependencies.xsd`` to describe Tool Shed ``tool_dependencies.xml`` files.
   - Validates attributes and elements down to the ``action`` elements and then largely gives up (sticking ``any`` and ``anyAttribute`` tags on that element).
   - Registers everything in tools-devteam as valid, and detects one invalid XML file in tools-iuc.
 - Implement new ``shed_lint`` command that:
   - Validates tool_dependencies.xml against schema
   - Validates repository_dependencies.xml against schema
   - Bare minimum to lint .shed.yml files.
   - Optionally also lints tools in repsitories with the ``--tools`` argument.
   - Can recursively lint many repositories at ont time ``-r``.
 - Refactoring of existing stuff to support this and make room for XSD validation of tool XML files and generalizing applying actions over many repositories and many tools.
  • Loading branch information...
jmchilton committed Feb 20, 2015
1 parent 0820f48 commit 912df026256150bbb3005c1b8462075d26de1062
@@ -1,55 +1,24 @@
import sys
import traceback

import click

from planemo.cli import pass_context
from planemo.io import info
from planemo.io import error
from planemo import options

from galaxy.tools.loader_directory import load_tool_elements_from_path
from galaxy.tools.lint import lint_xml

SKIP_XML_MESSAGE = "Skipping XML file - does not appear to be a tool %s."
LINTING_TOOL_MESSAGE = "Linting tool %s"
from planemo.tool_lint import build_lint_args
from planemo.tool_lint import lint_tools_on_path


@click.command('lint')
@options.optional_tools_arg()
@click.option(
'--report_level',
type=click.Choice(['all', 'warn', 'error']),
default="all"
)
@click.option(
'--fail_level',
type=click.Choice(['warn', 'error']),
default="warn"
)
@options.report_level_option()
@options.fail_level_option()
@options.lint_xsd_option()
@pass_context
def cli(ctx, path, report_level="all", fail_level="warn"):
def cli(ctx, path, **kwds):
"""Check specified tool(s) for common errors and adherence to best
practices.
"""
exit = 0
lint_args = dict(level=report_level, fail_level=fail_level)
tools = load_tool_elements_from_path(path, load_exception_handler)
valid_tools = 0
for (tool_path, tool_xml) in tools:
if tool_xml.getroot().tag != "tool":
if ctx.verbose:
info(SKIP_XML_MESSAGE % tool_path)
continue
info("Linting tool %s" % tool_path)
if not lint_xml(tool_xml, **lint_args):
exit = 1
else:
valid_tools += 1
if exit == 0 and valid_tools == 0:
exit = 2
lint_args = build_lint_args(**kwds)
exit = lint_tools_on_path(ctx, path, lint_args)
sys.exit(exit)


def load_exception_handler(path, exc_info):
error("Error loading tool with path %s" % path)
traceback.print_exception(*exc_info, limit=1, file=sys.stderr)
@@ -0,0 +1,34 @@
import click
import sys

from planemo.cli import pass_context
from planemo import options
from planemo import shed
from planemo import shed_lint


@click.command('shed_lint')
@options.optional_project_arg(exists=True)
@options.report_level_option()
@options.fail_level_option()
@options.click.option(
'--tools',
is_flag=True,
default=False,
help=("Lint tools discovered in the process of linting repositories.")
)
@options.lint_xsd_option()
@options.recursive_shed_option()
@pass_context
def cli(ctx, path, recursive=False, **kwds):
"""Check a Tool Shed repository for common problems.
"""
def lint(path):
return shed_lint.lint_repository(ctx, path, **kwds)

if recursive:
exit_code = shed.for_each_repository(lint, path)
else:
exit_code = lint(path)

sys.exit(exit_code)
@@ -10,8 +10,6 @@
from planemo.io import shell
import json

import fnmatch
import os

tar_path = click.Path(
exists=True,
@@ -54,15 +52,15 @@
is_flag=True,
default=False
)
@click.option(
'-r', '--recursive',
is_flag=True,
help="Recursively search for repositories to publish to a tool shed",
)
@options.recursive_shed_option()
@pass_context
def cli(ctx, path, **kwds):
"""Handle possible recursion through paths for uploading files to a toolshed
"""

def upload(path):
return __handle_upload(ctx, **kwds)

if kwds['recursive']:
if kwds['name'] is not None:
error("--name is incompatible with --recursive")
@@ -71,17 +69,9 @@ def cli(ctx, path, **kwds):
error("--tar is incompatible with --recursive")
return -1

ret_codes = []
for base_path, dirnames, filenames in os.walk(path):
for filename in fnmatch.filter(filenames, '.shed.yml'):
ret_codes.append(
__handle_upload(ctx, base_path, **kwds)
)
# "Good" returns are Nones, everything else is a -1 and should be
# passed upwards.
return None if all(x is None for x in ret_codes) else -1
return shed.for_each_repository(upload, path)
else:
return __handle_upload(ctx, path, **kwds)
return upload(path)


def __handle_upload(ctx, path, **kwds):
@@ -121,6 +111,7 @@ def __handle_upload(ctx, path, **kwds):
error(e2.read())
return -1
info("Repository %s updated successfully." % path)
return 0


def __find_repository(ctx, tsi, path, **kwds):
@@ -1,5 +1,6 @@
""" Utilities for reasoning about Galaxy test results.
"""
from __future__ import absolute_import
import json
import xml.etree.ElementTree as ET

@@ -0,0 +1,15 @@
import os

from planemo.xml import validation


def lint_xsd(lint_ctx, schema_path, path):
name = os.path.basename(path)
validator = validation.get_validator(require=True)
validation_result = validator.validate(schema_path, path)
if not validation_result.passed:
msg = "Invalid %s found. Errors [%s]"
msg = msg % (name, validation_result.output)
lint_ctx.error(msg)
else:
lint_ctx.info("%s found and appears to be valid XML" % name)
No changes.
@@ -0,0 +1,30 @@
""" Tool linting module that lints Galaxy tool against experimental XSD.
"""
import copy
import os
import tempfile

from planemo.xml import XSDS_PATH
import planemo.lint

TOOL_XSD = os.path.join(XSDS_PATH, "tool", "galaxy.xsd")


def lint_tool_xsd(root, lint_ctx):
""" Write a temp file out and lint it.
"""
with tempfile.NamedTemporaryFile() as tf:
_clean_root(root).write(tf.name)
planemo.lint.lint_xsd(lint_ctx, TOOL_XSD, tf.name)


def _clean_root(root):
""" XSD assumes macros have been expanded, so remove them.
"""
clean_root = copy.deepcopy(root)
to_remove = []
for macros_el in clean_root.findall("macros"):
to_remove.append(macros_el)
for macros_el in to_remove:
clean_root.getroot().remove(macros_el)
return clean_root
@@ -249,3 +249,38 @@ def shed_password_option():
help="Password for Tool Shed auth (required unless shed_key is "
"specified)."
)


def lint_xsd_option():
return click.option(
'--xsd',
is_flag=True,
default=False,
help=("Include experimental tool XSD validation in linting "
"process (requires xmllint on PATH or lxml installed).")
)


def report_level_option():
return click.option(
'--report_level',
type=click.Choice(['all', 'warn', 'error']),
default="all",
)


def fail_level_option():
return click.option(
'--fail_level',
type=click.Choice(['warn', 'error']),
default="warn"
)


def recursive_shed_option():
return click.option(
'-r',
'--recursive',
is_flag=True,
help="Recursively perform command for nested repository directories.",
)
@@ -1,8 +1,10 @@
import fnmatch
import glob
import os
from tempfile import mkstemp
import tarfile
from tempfile import mkstemp

import yaml
import glob

try:
from bioblend import toolshed
@@ -192,6 +194,24 @@ def build_tarball(tool_path):
return temp_path


def walk_repositories(path):
""" Recurse through directories and find effective repositories. """
for base_path, dirnames, filenames in os.walk(path):
for filename in fnmatch.filter(filenames, '.shed.yml'):
yield base_path


def for_each_repository(function, path):
ret_codes = []
for base_path in walk_repositories(path):
ret_codes.append(
function(base_path)
)
# "Good" returns are Nones, everything else is a -1 and should be
# passed upwards.
return 0 if all((not x) for x in ret_codes) else -1


def username(tsi):
user = _user(tsi)
return user["username"]
@@ -0,0 +1,87 @@
import os
import yaml
from galaxy.tools.lint import LintContext
from planemo.lint import lint_xsd
from planemo.tool_lint import (
build_lint_args,
yield_tool_xmls,
)
from planemo.xml import XSDS_PATH


from planemo.io import info
from planemo.io import error

from galaxy.tools.lint import lint_xml_with

TOOL_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "tool_dependencies.xsd")
REPO_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "repository_dependencies.xsd")


def lint_repository(ctx, path, **kwds):
info("Linting repository %s" % path)
lint_args = build_lint_args(**kwds)
lint_ctx = LintContext(lint_args["level"])
lint_ctx.lint(
"tool_dependencies",
lint_tool_dependencies,
path,
)
lint_ctx.lint(
"repository_dependencies",
lint_repository_dependencies,
path,
)
lint_ctx.lint(
"shed_yaml",
lint_shed_yaml,
path,
)
if kwds["tools"]:
for (tool_path, tool_xml) in yield_tool_xmls(ctx, path):
info("+Linting tool %s" % tool_path)
lint_xml_with(
lint_ctx,
tool_xml,
extra_modules=lint_args["extra_modules"]
)
failed = lint_ctx.failed(lint_args["fail_level"])
if failed:
error("Failed linting")
return 1 if failed else 0


def lint_tool_dependencies(path, lint_ctx):
tool_dependencies = os.path.join(path, "tool_dependencies.xml")
if not os.path.exists(tool_dependencies):
lint_ctx.info("No tool_dependencies.xml, skipping.")
return
lint_xsd(lint_ctx, TOOL_DEPENDENCIES_XSD, tool_dependencies)


def lint_repository_dependencies(path, lint_ctx):
repo_dependencies = os.path.join(path, "repository_dependencies.xml")
if not os.path.exists(repo_dependencies):
lint_ctx.info("No repository_dependencies.xml, skipping.")
return
lint_xsd(lint_ctx, REPO_DEPENDENCIES_XSD, repo_dependencies)


def lint_shed_yaml(path, lint_ctx):
shed_yaml = os.path.join(path, ".shed.yml")
if not os.path.exists(shed_yaml):
lint_ctx.info("No .shed.yml file found, skipping.")
return
try:
shed_contents = yaml.load(open(shed_yaml, "r"))
except Exception as e:
lint_ctx.warn("Failed to parse .shed.yml file [%s]" % str(e))

warned = False
for required_key in ["owner", "name"]:
if required_key not in shed_contents:
lint_ctx.warn(".shed.yml did not contain key [%s]" % required_key)
warned = True

if not warned:
lint_ctx.info(".shed.yml found and appears to be valid YAML.")
Oops, something went wrong.

0 comments on commit 912df02

Please sign in to comment.