# PART 2: Dive Into RobotFramework

## Agenda

* RobotFramework的开发和维护
* RobotFramework的整体工作流
* 从pybot命令开始
* RobotFramework的主要模块
* 一些常见的问题
* 练习

## RobotFramework的开发和维护

我们首先来探寻一下RobotFramework的开发模式. 
* open source，所有的代码、文档、issue都在[github](https://github.com/robotframework/robotframework)上
* master分支总是可用的
* UT & AT
* 使用**pull request**来进行code review
* 使用milestone来进行项目进度管理,如https://github.com/robotframework/robotframework/milestones

## RobotFramework的测试

### Unit Testing

```sh
python utest/run_utests.py
```

### Acceptance Testing

```sh
python atest/run_atests.py python robot
```

## RobotFramework的整体工作流

![RobotFramework Workflow](img/robot_workflow.svg "RobotFramework Workflow")

## 从pybot命令行开始。。。

### /usr/local/bin/pybot

```python
#!/usr/bin/python

import sys
from robot import run_cli

run_cli(sys.argv[1:])
```

```python
# robot.run.run_cli
def run_cli(arguments):
    """Command line execution entry point for running tests.

    :param arguments: Command line arguments as a list of strings.

    For programmatic usage the :func:`run` function is typically better. It has
    a better API for that usage and does not call :func:`sys.exit` like this
    function.

    Example::

        from robot import run_cli

        run_cli(['--include', 'tag', 'path/to/tests.html'])
    """
    RobotFramework().execute_cli(arguments)
```

```python
# robot.run.RobotFramework
class RobotFramework(Application):

    def __init__(self):
        Application.__init__(self, USAGE, arg_limits=(1,),
                             env_options='ROBOT_OPTIONS', logger=LOGGER)

    def main(self, datasources, **options):
        settings = RobotSettings(options)
        LOGGER.register_console_logger(**settings.console_logger_config)
        LOGGER.info('Settings:\n%s' % unicode(settings))
        suite = TestSuiteBuilder(settings['SuiteNames'],
                                 settings['WarnOnSkipped'],
                                 settings['RunEmptySuite']).build(*datasources)
        suite.configure(**settings.suite_config)
        with pyloggingconf.robot_handler_enabled(settings.log_level):
            result = suite.run(settings)
            LOGGER.info("Tests execution ended. Statistics:\n%s"
                        % result.suite.stat_message)
            if settings.log or settings.report or settings.xunit:
                writer = ResultWriter(settings.output if settings.log
                                      else result)
                writer.write_results(settings.get_rebot_settings())
        return result.return_code

    def validate(self, options, arguments):
        return self._filter_options_without_value(options), arguments

    def _filter_options_without_value(self, options):
        return dict((name, value) for name, value in options.items() if value)
```

```python
# robot.utils.application.Application
class Application(object):

    def __init__(self, usage, name=None, version=None, arg_limits=None,
                 env_options=None, logger=None, **auto_options):
        self._ap = ArgumentParser(usage, name, version, arg_limits,
                                  self.validate, env_options, **auto_options)
        self._logger = logger or DefaultLogger()

    def main(self, arguments, **options):
        raise NotImplementedError

    def execute_cli(self, cli_arguments):
        with self._logging():
            options, arguments = self._parse_arguments(cli_arguments)
            rc = self._execute(arguments, options)
        self._exit(rc)

    def execute(self, *arguments, **options):
        with self._logging():
            return self._execute(list(arguments), options)

    def _execute(self, arguments, options):
        try:
            rc = self.main(arguments, **options)
        except DataError, err:
            return self._report_error(unicode(err), help=True)
        except (KeyboardInterrupt, SystemExit):
            return self._report_error('Execution stopped by user.',
                                      rc=STOPPED_BY_USER)
        except:
            error, details = get_error_details()
            return self._report_error('Unexpected error: %s' % error,
                                      details, rc=FRAMEWORK_ERROR)
        else:
            return rc or 0
```

## RobotFramework的主要模块

In [1]:
import robot
filter(lambda x: not x.startswith('_') and x not in ('sys', 'version', 'get_version', 'pythonpathsetter', 'run_cli', 'rebot', 'rebot_cli'), dir(robot))

['conf',
 'errors',
 'htmldata',
 'model',
 'output',
 'parsing',
 'reporting',
 'result',
 'run',
 'running',
 'utils',
 'variables',
 'writer']

我们考虑其中主要的模块:
* robot.run
* robot.model
* robot.parsing
* robot.running
* robot.output
* robot.result
* robot.reporting


### robot.run

In [15]:
from robot.run import RobotFramework # main class
from robot.run import run # 执行入口
from robot.run import run_cli # 命令行执行入口

RobotFramework的主要代码
```python
class RobotFramework(Application):

    def __init__(self):
        Application.__init__(self, USAGE, arg_limits=(1,),
                             env_options='ROBOT_OPTIONS', logger=LOGGER)

    def main(self, datasources, **options):
        settings = RobotSettings(options)
        LOGGER.register_console_logger(**settings.console_logger_config)
        LOGGER.info('Settings:\n%s' % unicode(settings))
        suite = TestSuiteBuilder(settings['SuiteNames'],
                                 settings['WarnOnSkipped'],
                                 settings['RunEmptySuite']).build(*datasources)
        suite.configure(**settings.suite_config)
        with pyloggingconf.robot_handler_enabled(settings.log_level):
            result = suite.run(settings)
            LOGGER.info("Tests execution ended. Statistics:\n%s"
                        % result.suite.stat_message)
            if settings.log or settings.report or settings.xunit:
                writer = ResultWriter(settings.output if settings.log
                                      else result)
                writer.write_results(settings.get_rebot_settings())
        return result.return_code
```

run和run_cli的代码
```python
def run_cli(arguments):
    RobotFramework().execute_cli(arguments)


def run(*datasources, **options):
    return RobotFramework().execute(*datasources, **options)
```

### robot.model

类似通常**MVC**框架中的model, 是RobotFramework中的数据抽象层.
robot.model中的代码大致包含以下几类:

* 基本类型
* RobotFramework  suite/case/keyword等的抽象
* 构建上述抽象类的工厂
* **visitor**

In [23]:
# 基本类型
from robot.model.modelobject import ModelObject
from robot.model.itemlist import ItemList

In [28]:
# RobotFramework suite/case/keyword等的抽象
from robot.model.testsuite import TestSuite, TestSuites
from robot.model.testcase import TestCase, TestCases
from robot.model.imports import Import, Imports
from robot.model.keyword import Keyword, Keywords
from robot.model.message import Message, Messages
from robot.model.metadata import Metadata
from robot.model.tags import Tags
from robot.model.suitestatistics import SuiteStatistics
from robot.model.tagstatistics import TagStatistics
from robot.model.totalstatistics import TotalStatistics
from robot.model.stats import SuiteStat, TagStat, TotalStat

In [29]:
# 构建抽象类的工厂
from robot.model.configurer import SuiteConfigurer
from robot.model.statistics import  StatisticsBuilder
from robot.model.suitestatistics import  SuiteStatisticsBuilder
from robot.model.tagstatistics import  TagStatisticsBuilder
from robot.model.totalstatistics import TotalStatisticsBuilder

In [None]:
# visitor
from robot.model.visitor import SuiteVisitor, SkipAllVisitor
from robot.model.tagsetter import  TagSetter
from robot.model.filter import  Filter, EmptySuiteRemover

重点关注**SuiteVisitor**

* 运行，logging以及其他很多功能都基于它来实现
* 实例：[rfexplain.py](http://becrtt01.china.nsn-net.net/ta/rfinsight/blob/master/scripts/rfexplain.py)

### robot.parsing

顾名思义, robot.parsing是RobotFramework的解析模块,负责将suite文件或目录解析成RobotFramework自己的数据结构. 

In [31]:
# case文件或目录的解析模块
from robot.parsing.htmlreader import  HtmlReader
from robot.parsing.restreader import RestReader
from robot.parsing.txtreader import TxtReader
from robot.parsing.tsvreader import TsvReader

最主要的接口 - TestData

In [5]:
# case文件或目录到model的转换 - robot.parsing.model.TestData
from robot.parsing.model import  TestData
from robot.api import TestData # 也可以直接用api调用, 这样更好
data = TestData(source='examples/kw-driven.robot')
print data.name

Kw-Driven


### robot.running

robot.running是RobotFramework运行时的主要模块, RobotFramework的整个运行阶段基本都与它相关

#### TestSuiteBuilder - 创建运行时的TestSuite

In [40]:
from robot.running.builder import TestSuiteBuilder
from robot.api import TestSuiteBuilder # 也可以直接通过api调用,这样更好
suite = TestSuiteBuilder().build('examples/kw-driven.robot')
print suite

Kw-Driven


#### EXECUTION_CONTEXTS - 运行时的所有信息

一个EXECUTION_CONTEXTS类似于一个stack，每个item为一个\_EXECUTION_CONTEXT。
在\_EXECUTION_CONTEXT中最为主要的属性为一个Namespace。

In [2]:
# coding: utf-8
# example - 在当前每个test运行前插入一个分割符的log
from robot.running.context import EXECUTION_CONTEXTS
from robot.running import Keyword

class TestSplitter:
    ROBOT_LISTENER_API_VERSION = 2

    def __init__(self):
        self._splitter = '-' * 80

    def start_test(self, name, attributes):
        ns = EXECUTION_CONTEXTS.current
        Keyword('log', (self._splitter, )).run(ns)


使用命令`pybot --listener listener.TestSplitter kw-driven.robot`执行，可以得到如下的log：

[log.html](examples/log.html)

#### 传递给某个具体的suite的运行数据

In [7]:
from robot.running.namespace import Namespace
import os

# Namespace的主要属性和实现
class Namespace:
    _default_libraries = ('BuiltIn', 'Reserved', 'Easter')
    _deprecated_libraries = {'BuiltIn': 'DeprecatedBuiltIn',
                             'OperatingSystem': 'DeprecatedOperatingSystem'}
    _library_import_by_path_endings = ('.py', '.java', '.class', '/', os.sep)

    def __init__(self, suite, variables, parent_variables, user_keywords,
                 imports):
        LOGGER.info("Initializing namespace for test suite '%s'" % suite.longname)
        self.suite = suite
        self.test = None
        self.uk_handlers = []
        self.variables = _VariableScopes(variables, parent_variables)
        self._imports = imports
        self._kw_store = KeywordStore(user_keywords)
        self._imported_variable_files = ImportCache()

    @property
    def libraries(self):
        return self._kw_store.libraries.values()


#### model - 运行时的模型抽象

In [70]:
from robot.running.model import TestCase, TestSuite
from robot.running.keywords import Keyword, Keywords
from robot.running.status import SuiteStatus, TestStatus
from robot.running.userkeyword import UserLibrary # 还有其它模型抽象
from robot.running.testlibraries import TestLibrary

#### Runner - 实际执行suite的class

* 继承自SuiteVisitor

In [None]:
from robot.running.runner import Runner
# dir(Runner) # 可以看到如start_suite等方法

#### signal监控 - 处理stop gracefully等

In [4]:
from robot.running.signalhandler import STOP_SIGNAL_MONITOR
with STOP_SIGNAL_MONITOR:
    import time
    print 'a'
    time.sleep(2)
    print 'b'
    time.sleep(2)
    print 'c'
    time.sleep(2)

a
b
c


Question:

1. 为什么上面的例子还是把a,b,c都输出来了?
1. 为什么执行Ctrl+C后之后的keyword都不执行了?

#### timeout处理

原理：根据平台和python版本不同，timeout有几种不同的实现方式。
```python
if sys.platform == 'cli':
    from .timeoutthread import Timeout
elif os.name == 'nt':
    from .timeoutwin import Timeout
else:
    try:
        # python 2.6 or newer in *nix or mac
        from .timeoutsignaling import Timeout
    except ImportError:
        # python < 2.6 and jython don't have complete signal module
        from .timeoutthread import Timeout
```
以Linux平台Python 2.7为例来展示一下它的工作方式。

In [6]:
from robot.running.timeouts import Timeout
t = Timeout(1, 'timeout')

def test():
    import time
    print 'start'
    time.sleep(10)
    print 'end'

t.execute(test)

start


TimeoutError: timeout

In [10]:
# 它的内部工作原理还是基于signal
from signal import setitimer, signal, SIGALRM, ITIMER_REAL

def _raise_error(signum, frame):
    raise RuntimeError('timeout')
    
def test():
    import time
    print 'start'
    time.sleep(10)
    print 'end'
    
signal(SIGALRM, _raise_error)
setitimer(ITIMER_REAL, 1)
test()

start


RuntimeError: timeout

### robot.result

**robot.result**是XML output的解析模块, 它的主要功能是将output.xml转换为Result对象

In [None]:
# 从output.xml文件创建Result对象
from robot.result import  ExecutionResult
result = ExecutionResult('output.xml')

In [None]:
#  Result vs. CombinedResult
from robot.api import ExecutionResult
from robot.result.resultbuilder import CombinedResult, Result, Merger
sources = ['log-1.html', 'log-2.html']
options = {}
CombinedResult(ExecutionResult(src, **options) for src in sources)

#### result阶段的model

In [None]:
from robot.result.testsuite import TestSuite
from robot.result.testcase import  TestCase
from robot.result.keyword import Keyword

### robot.reporting

**robot.reporting**主要用于生成log.html和report.html, 除此以外, 它的功能还有:

* 生成新的output.xml
* log.html中js部分的生成器 (用于动态加载优化性能和解决内存问题)

In [73]:
# log.html & report.html
from robot.reporting.resultwriter import ResultWriter, ReportWriter

In [74]:
# output.xml
from robot.reporting.outputwriter import OutputWriter

In [75]:
# js writer
from robot.reporting.jswriter import JsResultWriter, SplitLogWriter, JsonWriter

## 不同阶段的model

在不同的阶段都有对应的model, 另外robot还有一个robot.model模块.

大部分的model都继承自robot.model中的相关类.

但是robot.parsing.model除外. 怀疑是RobotFramework的遗留代码没有重构完导致的.

* robot.model
* robot.parsing.model
* robot.running.model
* robot.result."model"

## Q: 你知道执行pybot后RobotFramework都做了些什么吗?

* 实例化robot.run.RobotFramework
* 命令行参数解析
* 实例化RobotSettings
* 设置LOGGER
* 生成TestSuite
* 执行TestSuite
* 写入log和report
* 判断测试结果
* 结束进程

这里写入log和report为optional步骤

In [82]:
from robot.run import run_cli
run_cli(['examples/kw-driven.robot'])

SystemExit: 0

To exit: use 'exit', 'quit', or Ctrl-D.


## Q: 你知道RobotFramework的Stop Gracefully是怎么实现的吗?

In [4]:
# robot.running.model.TestSuite的run方法
# robot.running.signalhandler
# 通过signal.signal来实现Ctrl+C的监听
from robot.running.signalhandler import STOP_SIGNAL_MONITOR
from robot.output import pyloggingconf

# robot.running.model
def run(self, settings=None, **options):
    if not settings:
        settings = RobotSettings(options)
        LOGGER.register_console_logger(**settings.console_logger_config)
    with pyloggingconf.robot_handler_enabled(settings.log_level):
        with STOP_SIGNAL_MONITOR:
            IMPORTER.reset()
            init_global_variables(settings)
            output = Output(settings)
            runner = Runner(output, settings)
            self.visit(runner)
        output.close(runner.result)
    return runner.result


# robot.running.handlers
def _run_with_signal_monitoring(self, runner, context):
    try:
        STOP_SIGNAL_MONITOR.start_running_keyword(context.in_teardown)
        return runner()
    finally:
        STOP_SIGNAL_MONITOR.stop_running_keyword()


## 练习

1. 阅读相关代码，找出listener的实现原理是什么
1. 阅读相关代码，理清RobotFramework是怎么写入log的
1. 阅读相关代码，理清dry-run是怎么实现的,有哪些局限性
1. 理解RobotFramework的Timeout在Windows平台的实现原理
1. 列出你认为的RobotFramework的设计缺陷或局限性