-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
646 additions
and
204 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 19 additions & 40 deletions
59
cloudshell/cli/command_template/command_template_executor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,60 +1,39 @@ | ||
from collections import OrderedDict | ||
|
||
from cloudshell.cli.command_template.command_template import CommandTemplate | ||
from cloudshell.cli.service.action_map import ActionMap | ||
from cloudshell.cli.service.error_map import ErrorMap | ||
|
||
|
||
class CommandTemplateExecutor(object): | ||
""" | ||
Execute command template using cli service | ||
""" | ||
"""Execute command template using cli service""" | ||
|
||
def __init__(self, cli_service, command_template, action_map=None, error_map=None, **optional_kwargs): | ||
""" | ||
:param cli_service: | ||
:type cli_service: CliService | ||
:param command_template: | ||
:type command_template: CommandTemplate | ||
:param error_map: expected error map with subclass of CommandExecutionException or str | ||
:type error_map: dict[str, cloudshell.cli.session.session_exceptions.CommandExecutionException|str] | ||
:param cloudshell.cli.service.cli_service.CliService cli_service: | ||
:param cloudshell.cli.command_template.command_template.CommandTemplate command_template: | ||
:param cloudshell.cli.service.action_map.ActionMap action_map: | ||
:param cloudshell.cli.service.error_map.ErrorMap error_map: | ||
:return: | ||
""" | ||
self._cli_service = cli_service | ||
self._command_template = command_template | ||
self._action_map = action_map or OrderedDict() | ||
self._error_map = error_map or OrderedDict() | ||
self._optional_kwargs = optional_kwargs | ||
|
||
@property | ||
def action_map(self): | ||
""" | ||
Return updated action | ||
""" | ||
return dict(**self._action_map, **self._command_template.action_map) | ||
self._action_map = action_map or ActionMap() | ||
self._action_map.extend(command_template.action_map) | ||
|
||
@property | ||
def error_map(self): | ||
return dict(**self._error_map, **self._command_template.error_map) | ||
self._error_map = error_map or ErrorMap() | ||
self._error_map.extend(command_template.error_map) | ||
|
||
@property | ||
def optional_kwargs(self): | ||
return self._optional_kwargs | ||
self._optional_kwargs = optional_kwargs | ||
|
||
def execute_command(self, **command_kwargs): | ||
""" | ||
Execute command | ||
:param command_kwargs: | ||
:param dict command_kwargs: | ||
:return: Command output | ||
:rtype: str | ||
""" | ||
command = self._command_template.prepare_command(**command_kwargs) | ||
return self._cli_service.send_command(command, action_map=self.action_map, error_map=self.error_map, | ||
**self.optional_kwargs) | ||
|
||
def update_action_map(self, action_map): | ||
self._action_map.update(action_map) | ||
|
||
def update_error_map(self, error_map): | ||
self._error_map.update(error_map) | ||
|
||
def update_optional_kwargs(self, **optional_kwargs): | ||
self.optional_kwargs.update(optional_kwargs) | ||
return self._cli_service.send_command(command, | ||
action_map=self._action_map, | ||
error_map=self._error_map, | ||
**self._optional_kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
from collections import OrderedDict | ||
import re | ||
|
||
from cloudshell.cli.session.session_exceptions import SessionLoopDetectorException | ||
|
||
|
||
class Action: | ||
def __init__(self, pattern, callback, execute_once=False): | ||
""" | ||
:param str pattern: | ||
:param function callback: | ||
:param bool execute_once: | ||
""" | ||
self.pattern = pattern | ||
self.compiled_pattern = re.compile(pattern=pattern, flags=re.DOTALL) | ||
self.callback = callback | ||
self.execute_once = execute_once | ||
|
||
def __call__(self, session, logger): | ||
""" | ||
:param cloudshell.cli.session.expect_session.ExpectSession session: | ||
:param logging.Logger logger: | ||
:return: | ||
""" | ||
return self.callback(session, logger) | ||
|
||
def __repr__(self): | ||
""" | ||
:rtype: str | ||
""" | ||
return f"{super().__repr__()} pattern: {self.pattern}, execute once: {self.execute_once}" | ||
|
||
def match(self, output): | ||
""" | ||
:param str output: | ||
:rtype: bool | ||
""" | ||
return bool(self.compiled_pattern.search(output)) | ||
|
||
|
||
class ActionMap: | ||
def __init__(self, actions=None): | ||
""" | ||
:param list[Action] actions: | ||
""" | ||
if actions is None: | ||
actions = [] | ||
|
||
self.matched_patterns = set() | ||
self._actions_dict = OrderedDict([(action.pattern, action) for action in actions]) | ||
|
||
@property | ||
def actions(self): | ||
""" | ||
:rtype: list[Action] | ||
""" | ||
return list(self._actions_dict.values()) | ||
|
||
@property | ||
def active_actions(self): | ||
""" | ||
:rtype: list[Action] | ||
""" | ||
return [action for action in self.actions if (not action.execute_once or | ||
action.pattern not in self.matched_patterns)] | ||
|
||
def add(self, action): | ||
""" | ||
:param Action action: | ||
:return: | ||
""" | ||
self._actions_dict[action.pattern] = action | ||
|
||
def extend(self, action_map, override=False, extend_matched_patterns=True): | ||
""" | ||
:param ActionMap action_map: | ||
:param bool override: | ||
:param bool extend_matched_patterns: | ||
:return: | ||
""" | ||
for action in action_map.actions: | ||
if not override and action.pattern in self._actions_dict: | ||
continue | ||
self.add(action) | ||
|
||
if extend_matched_patterns: | ||
self.matched_patterns |= action_map.matched_patterns | ||
|
||
def process(self, session, logger, output, check_action_loop_detector, action_loop_detector): | ||
""" | ||
:param cloudshell.cli.session.expect_session.ExpectSession session: | ||
:param logging.Logger logger: | ||
:param str output: | ||
:param bool check_action_loop_detector: | ||
:param ActionLoopDetector action_loop_detector: | ||
:rtype: bool | ||
""" | ||
for action in self.active_actions: | ||
if action.match(output): | ||
logger.debug(f"Matched Action with pattern: {action.pattern}") | ||
|
||
if check_action_loop_detector: | ||
logger.debug(f"Checking loops for Action with pattern : {action.pattern}") | ||
|
||
if action_loop_detector.loops_detected(action.pattern): | ||
logger.error(f"Loops detected for action patter: {action.pattern}") | ||
raise SessionLoopDetectorException("Expected actions loops detected") | ||
|
||
action(session, logger) | ||
self.matched_patterns.add(action.pattern) | ||
return True | ||
|
||
return False | ||
|
||
def __add__(self, other): | ||
""" | ||
:param other: | ||
:rtype: ActionMap | ||
""" | ||
action_map_class = type(self) | ||
if isinstance(other, action_map_class): | ||
action_map = action_map_class(actions=self.actions) | ||
action_map.extend(other, extend_matched_patterns=False) | ||
return action_map | ||
|
||
raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'") | ||
|
||
def __repr__(self): | ||
""" | ||
:rtype: str | ||
""" | ||
return f"{super().__repr__()} matched patterns: {self.matched_patterns}, actions: {self.actions}" | ||
|
||
|
||
class ActionLoopDetector: | ||
"""Help to detect loops for action combinations""" | ||
|
||
def __init__(self, max_loops, max_combination_length): | ||
""" | ||
:param max_loops: | ||
:param max_combination_length: | ||
:return: | ||
""" | ||
self._max_action_loops = max_loops | ||
self._max_combination_length = max_combination_length | ||
self._action_history = [] | ||
|
||
def loops_detected(self, action_pattern): | ||
"""Add action key to the history and detect loops | ||
:param str action_pattern: | ||
:return: | ||
""" | ||
self._action_history.append(action_pattern) | ||
for combination_length in range(1, self._max_combination_length + 1): | ||
if self._is_combination_compatible(combination_length): | ||
if self._is_loop_exists(combination_length): | ||
return True | ||
return False | ||
|
||
def _is_combination_compatible(self, combination_length): | ||
"""Check if combinations may exist | ||
:param combination_length: | ||
:return: | ||
""" | ||
return len(self._action_history) / combination_length >= self._max_action_loops | ||
|
||
def _is_loop_exists(self, combination_length): | ||
"""Detect loops for combination length | ||
:param combination_length: | ||
:return: | ||
""" | ||
reversed_history = self._action_history[::-1] | ||
combinations = [reversed_history[x:x + combination_length] for x in | ||
range(0, len(reversed_history), combination_length)][:self._max_action_loops] | ||
for x, y in [combinations[x:x + 2] for x in range(0, len(combinations) - 1)]: | ||
if x != y: | ||
return False | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.