Skip to content
This repository
Fetching contributors…

Cannot retrieve contributors at this time

file 145 lines (125 sloc) 4.322 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
# -*- coding: utf-8 -*-
# <Lettuce - Behaviour Driven Development for python>
# Copyright (C) <2010-2012> Gabriel Falcão <gabriel@nacaolivre.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import threading
import traceback

from lettuce.exceptions import StepLoadingError

# Thread-local namespace object exported as ``world``; attributes stored on a
# ``threading.local`` instance are kept separately for each thread.
world = threading.local()
# Because of that, this assignment only creates ``_set`` for the thread that
# imports this module.
# NOTE(review): the meaning of the ``_set`` flag is not visible in this file;
# confirm against the code that reads ``world._set``.
world._set = False


def _function_matches(one, other):
    """Tell whether two callables were defined at the same source location.

    Two callables match when their code objects report the same absolute
    file name and the same first line number, even when they are distinct
    objects.

    The code object is read from ``__code__`` first and from the
    Python 2 only ``func_code`` alias as a fallback. A callable that
    exposes no code object at all only matches itself.

    :param one: first callable to compare.
    :param other: second callable to compare.
    :returns: ``True`` when both refer to the same definition.
    """
    codes = []
    for candidate in (one, other):
        code = getattr(candidate, '__code__', None)
        if code is None:
            code = getattr(candidate, 'func_code', None)
        codes.append(code)
    one_code, other_code = codes

    if one_code is None or other_code is None:
        # No source location to compare (e.g. an instance with __call__),
        # so identity is the only meaningful notion of "same callable".
        return one is other

    return (os.path.abspath(one_code.co_filename) ==
            os.path.abspath(other_code.co_filename) and
            one_code.co_firstlineno == other_code.co_firstlineno)


class CallbackDict(dict):
    """Two-level mapping whose innermost values are lists of callbacks."""

    def append_to(self, where, when, function):
        """Add ``function`` to ``self[where][when]`` unless already there.

        "Already there" is decided by ``_function_matches``, so the list
        is left untouched when any stored callback matches ``function``.
        """
        callbacks = self[where][when]
        for registered in callbacks:
            if _function_matches(registered, function):
                break
        else:
            callbacks.append(function)

    def clear(self):
        """Empty every callback list in place, keeping all the keys."""
        for action_dict in self.values():
            for callback_list in action_dict.values():
                del callback_list[:]

class StepDict(dict):
    """Mapping of step sentences (regex source strings) to step callables."""

    def load(self, step, func):
        """Register ``func`` under the regex ``step`` and return ``func``.

        :raises StepLoadingError: when ``step`` does not compile as a regex.
        """
        self._assert_is_step(step, func)
        self[step] = func
        return func

    def load_func(self, func):
        """Register ``func`` under the sentence extracted from it.

        The sentence comes from ``_extract_sentence``; returns ``func``.
        """
        regex = self._extract_sentence(func)
        return self.load(regex, func)

    def load_steps(self, obj):
        """Register every public function/method attribute of ``obj``.

        Attribute names listed in ``obj.exclude`` (when that attribute
        exists) are skipped, as are names starting with an underscore.
        Returns ``obj``.
        """
        exclude = getattr(obj, "exclude", [])
        for attr in dir(obj):
            if self._attr_is_step(attr, obj) and attr not in exclude:
                step_method = getattr(obj, attr)
                self.load_func(step_method)
        return obj

    def _extract_sentence(self, func):
        """Return the sentence for ``func``.

        The docstring is used when there is one; otherwise the function
        name with underscores turned into spaces and its first letter
        upper-cased.
        """
        # Methods keep their docstring and name on the wrapped function.
        func = getattr(func, '__func__', func)
        sentence = getattr(func, '__doc__', None)
        if sentence is None:
            # ``__name__`` is the portable spelling; ``func_name`` only
            # exists on Python 2.
            sentence = func.__name__.replace('_', ' ')
            sentence = sentence[0].upper() + sentence[1:]
        return sentence

    def _assert_is_step(self, step, func):
        """Raise StepLoadingError unless ``step`` compiles as a regex."""
        try:
            re.compile(step)
        except re.error as e:
            raise StepLoadingError("Error when trying to compile:\n"
                                   " regex: %r\n"
                                   " for function: %s\n"
                                   " error: %s" % (step, func, e))

    def _attr_is_step(self, attr, obj):
        """Tell whether ``obj.<attr>`` is a public function or method."""
        return (not attr.startswith('_') and
                self._is_func_or_method(getattr(obj, attr)))

    def _is_func_or_method(self, func):
        """Tell whether ``func`` is a callable that looks like a function
        or a method.
        """
        func_dir = dir(func)
        # ``func_name`` only exists on Python 2 functions; ``__code__`` is
        # present on functions in Python 2.6+ and Python 3. Methods expose
        # ``__func__``.
        return callable(func) and ("func_name" in func_dir or
                                   "__code__" in func_dir or
                                   "__func__" in func_dir)


# Module-level registry of step definitions (sentence regex -> callable).
STEP_REGISTRY = StepDict()
# Module-level registry of hook callbacks. The outer key is the hook "kind"
# and the inner key is the "situation"; each value is the list of callbacks
# that ``call_hook(situation, kind, ...)`` runs, read from
# ``CALLBACK_REGISTRY[kind][situation]``.
CALLBACK_REGISTRY = CallbackDict(
    {
        'all': {
            'before': [],
            'after': [],
        },
        'step': {
            'before_each': [],
            'after_each': [],
            'before_output': [],
            'after_output': [],
        },
        'scenario': {
            'before_each': [],
            'after_each': [],
            'outline': [],
        },
        'background': {
            'before_each': [],
            'after_each': [],
        },
        'feature': {
            'before_each': [],
            'after_each': [],
        },
        'app': {
            'before_each': [],
            'after_each': [],
        },
        'harvest': {
            'before': [],
            'after': [],
        },
        'handle_request': {
            'before': [],
            'after': [],
        },
        'runserver': {
            'before': [],
            'after': [],
        },
    },
)


def call_hook(situation, kind, *args, **kw):
    """Run every callback registered for ``kind`` / ``situation``.

    The callbacks are read from ``CALLBACK_REGISTRY[kind][situation]`` and
    called in list order with ``*args`` and ``**kw``.

    When a callback raises an ``Exception``, a separator line and the
    traceback are printed and the exception is re-raised, so the remaining
    callbacks are not run.

    :raises KeyError: when ``kind`` or ``situation`` is not in the registry.
    """
    for callback in CALLBACK_REGISTRY[kind][situation]:
        try:
            callback(*args, **kw)
        except Exception:
            # A parenthesised single argument prints the same text whether
            # ``print`` is a statement (Python 2) or a function (Python 3).
            print("=" * 1000)
            # print_exc() reports the exception currently being handled;
            # its first positional parameter is ``limit``, so the exception
            # object must not be passed to it.
            traceback.print_exc()
            print("")
            raise


def clear():
    """Reset the module-level registries.

    Calls ``clear()`` on ``STEP_REGISTRY`` and then on
    ``CALLBACK_REGISTRY``, in that order.
    """
    for registry in (STEP_REGISTRY, CALLBACK_REGISTRY):
        registry.clear()
Something went wrong with that request. Please try again.