Skip to content

Commit

Permalink
Fixed BZ Workflow
Browse files Browse the repository at this point in the history
close #4263
  • Loading branch information
renzon authored and renzon committed May 10, 2017
1 parent 9820ff9 commit a61e144
Show file tree
Hide file tree
Showing 2 changed files with 479 additions and 315 deletions.
199 changes: 110 additions & 89 deletions robottelo/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
# -*- encoding: utf-8 -*-
"""Implements various decorators"""
import logging
from functools import wraps, partial
from itertools import chain

import bugzilla
import pytest
import re
import requests
import unittest2
from functools import wraps
from robottelo.helpers import get_func_name
from robozilla.bz import BZReader

from robottelo.config import settings
from robottelo.constants import (
BUGZILLA_URL,
BZ_OPEN_STATUSES,
NOT_IMPLEMENTED,
REDMINE_URL
)
from robottelo.constants import BZ_OPEN_STATUSES, NOT_IMPLEMENTED, REDMINE_URL
from robottelo.helpers import get_func_name
from robottelo.host_info import get_host_sat_version
from six.moves.xmlrpc_client import Fault
from xml.parsers.expat import ExpatError, ErrorString


LOGGER = logging.getLogger(__name__)
OBJECT_CACHE = {}
Expand Down Expand Up @@ -118,8 +112,7 @@ def test_something(self):
if not options_set.issubset(settings.all_features):
invalid = options_set.difference(settings.all_features)
raise ValueError(
'Feature(s): "{0}" not found. Available ones are: "{1}".'
.format(
'Feature(s): "{0}" not found. Available ones are: "{1}".'.format(
', '.join(invalid),
', '.join(settings.all_features)
)
Expand Down Expand Up @@ -302,34 +295,22 @@ def _get_bugzilla_bug(bug_id):
bz_credentials = {}
if setting_is_set('bugzilla'):
bz_credentials = settings.bugzilla.get_credentials()
try:
bz_conn = bugzilla.RHBugzilla(url=BUGZILLA_URL, **bz_credentials)
except (TypeError, ValueError): # pragma: no cover
raise BugFetchError(
'Could not connect to {0}'.format(BUGZILLA_URL)
)
# Fetch the bug and place it in the cache.
try:
_bugzilla[bug_id] = bz_conn.getbug(
bug_id,
include_fields=['id', 'status', 'whiteboard', 'flags']
)
if not bz_credentials:
raise BZUnauthenticatedCall(
_bugzilla[bug_id],
'Unauthenticated call made to BZ API, no flags data will '
'be available'
)
except Fault as err:
raise BugFetchError(
'Could not fetch bug. Error: {0}'.format(err.faultString)
)
except ExpatError as err:
raise BugFetchError(
'Could not interpret bug. Error: {0}'
.format(ErrorString(err.code))
)

reader = BZReader(
bz_credentials,
include_fields=[
'id', 'status', 'whiteboard', 'flags', 'resolution',
'target_milestone'
],
follow_clones=True
)
_bugzilla[bug_id] = reader.get_bug_data(bug_id)
if not bz_credentials:
raise BZUnauthenticatedCall(
_bugzilla[bug_id],
'Unauthenticated call made to BZ API, no flags data will '
'be available'
)
return _bugzilla[bug_id]


Expand Down Expand Up @@ -387,45 +368,73 @@ def _get_redmine_bug_status_id(bug_id):
_redmine['issues'][bug_id] = result['issue']['status']['id']
except KeyError as err:
raise BugFetchError(
'Could not get status ID of Redmine bug {0}. Error: {1}'.
format(bug_id, err)
'Could not get status ID of Redmine bug {0}. Error: {1}'
.format(bug_id, err)
)

return _redmine['issues'][bug_id]


def _skip_flags_condition(flags):
def _to_float_version(regular_exp, possible_version):
"""extract version using regular_exp againt possible_version string.
Returns None if extraction is not possible"""
result = regular_exp.search(possible_version)
if result:
return float(result.group('version'))


_to_zstream_version = partial(
_to_float_version, re.compile(r'sat-(?P<version>\d\.\d)\.z'))

_to_downstream_version = partial(
_to_float_version, re.compile(r'sat-(?P<version>\d\.\d)\.\d'))

_to_target_milestone_version = partial(
_to_float_version, re.compile(r'(?P<version>\d\.\d)\.\d'))


def _skip_downstream_condition(bug):
"""Analyse bugzila flags returning False if host version is greater or
equal to min positive flag version, True otherwise.
:param flags: list
:return: bool
"""
version_re = re.compile(r'sat-(?P<version>\d(\.\d){1})')
flags = bug.get('flags', {})
positive_flags = [k for k, v in flags.items() if v == '+']
zstream_versions = list(filter(
lambda version: version is not None,
map(_to_zstream_version, positive_flags)
))
downstream_versions = list(filter(
lambda version: version is not None,
map(_to_downstream_version, positive_flags)
))

if len(downstream_versions) == 0 and len(zstream_versions) == 0:
return True
target_milestone_version = bug['target_milestone']

def to_float_version(flag_name):
result = version_re.search(flag_name)
if result:
return float(result.group('version'))
has_down_and_zstream = downstream_versions and zstream_versions
if has_down_and_zstream and target_milestone_version is 'Unspecified':
LOGGER.warning('Bugzilla with both downstream and zstream flags and '
'unspecified target_milestone: {}'.format(bug))
return True

flag_version = _to_target_milestone_version(target_milestone_version)

if flag_version is None or not has_down_and_zstream:
if downstream_versions:
flag_version = min(downstream_versions)
else:
flag_version = min(zstream_versions)

positive_flag_versions = (
to_float_version(flag['name'])
for flag in flags if flag['status'] == '+'
)
try:
min_positive_flag_version = min(
filter(lambda version: version is not None, positive_flag_versions)
)
except ValueError: # pragma: no cover
# If flag regarding sat is not available
return True
sat_version = float(get_host_sat_version())
except ValueError:
return False
else:
try:
sat_version = float(get_host_sat_version())
except ValueError:
return False
else:
return sat_version < min_positive_flag_version
return sat_version < flag_version


def bz_bug_is_open(bug_id):
Expand All @@ -441,47 +450,59 @@ def bz_bug_is_open(bug_id):
"""
try:
bug = _get_bugzilla_bug(bug_id)
except BugFetchError as err:
LOGGER.warning(err)
return False
except BZUnauthenticatedCall as err:
LOGGER.warning(err)
return _check_skip_conditions(err.bug, False)
return _check_skip_conditions_for_bug_and_clones(err.bug, False)
else:
return _check_skip_conditions(bug)
return _check_skip_conditions_for_bug_and_clones(bug)


def _check_skip_conditions_for_bug_and_clones(bug, consider_flags=True):
"""Check bug skip conditions for the bug and it's clones. If any of them
returns False, test will not be skipped.
Check flowchar: https://www.lucidchart.com/invitations/accept/
3a595f1d-103f-49aa-99bf-ba1acb632b99
def _check_skip_conditions(bug, consider_flags=True):
:param bug: bug to analyse conditions
:return: boolean indicating if it must be skipped or not
"""
if bug is None:
return False
all_bugs = chain([bug], bug.get('other_clones', {}).values())
skip_results = (
_check_skip_condition_for_one_bug(bug_or_clone, consider_flags)
for bug_or_clone in all_bugs
)
return all(skip_results)


def _check_skip_condition_for_one_bug(bug, consider_flags):
"""Check bug skip conditions. It will not take into account bug's flags
if consider_flags parameter is False
:param bug: bug to analyse conditions
:return: boolean indicating if it must be skipped or not
"""
# NEW, ASSIGNED, MODIFIED, POST
if bug.status in BZ_OPEN_STATUSES:
if bug['status'] in BZ_OPEN_STATUSES:
return True
elif settings.upstream:
return False

# do not test bugs with whiteboard 'verified in upstream' in downstream
# until they are in 'CLOSED' state
# verify all conditions are True, stopping evaluation when
# first condition is False
zstream_re = re.compile(r'sat-\d\.\d\.z')

def is_positive_zstream(flag):
return flag['status'] == '+' and zstream_re.search(flag['name'])

def skip_upstream_conditions(flags):
for flag in flags:
yield not is_positive_zstream(flag)
yield bug.status != 'CLOSED'
yield bug.whiteboard
yield 'verified in upstream' in bug.whiteboard.lower()

return (all(skip_upstream_conditions(bug.flags)) or
(consider_flags and _skip_flags_condition(bug.flags)))
"""do not test bugs with whiteboard 'verified in upstream' in
downstream until they are in 'CLOSED' state
Verify all conditions are True, stopping evaluation when
first condition is False
"""
yield bug['status'] != 'CLOSED'
whiteboard = bug['whiteboard']
yield whiteboard
yield 'verified in upstream' in whiteboard.lower()

return (all(skip_upstream_conditions(bug.get('flags', {}))) or
(consider_flags and _skip_downstream_condition(bug)))


def rm_bug_is_open(bug_id):
Expand Down
Loading

0 comments on commit a61e144

Please sign in to comment.