-
Notifications
You must be signed in to change notification settings - Fork 44
/
service_implementation.py
279 lines (227 loc) · 10.8 KB
/
service_implementation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import re
import ipaddress
from unicon.plugins.generic.service_implementation import (
Execute as GenericExecute, ContextMgrBaseService)
from unicon.bases.linux.services import BaseService
from unicon.core.errors import SubCommandFailure, StateMachineError
from unicon.eal.dialogs import Dialog, Statement
from unicon.plugins.linux.patterns import LinuxPatterns
from unicon.plugins.linux.utils import LinuxUtils
from unicon.utils import AttributeDict
from unicon.logs import UniconFileHandler
from unicon import log
from .statements import linux_execution_statements
# Module-level LinuxUtils instance shared by the services defined below
# (stored on Execute instances and used by Ping to trim the trailing prompt).
utils = LinuxUtils()
class Execute(GenericExecute):
    """ Execute Service implementation

    Service to execute exec commands on the device and return the
    console output. A reply option can be passed for interactive exec
    commands.

    Arguments:
        command: exec command
        reply: Additional Dialog patterns for interactive exec commands.
        timeout : Timeout value in sec, Default Value is 60 sec.
        check_retcode: (boolean) Enable/disable return code checking.
        valid_retcodes: (list) List of valid return codes.

    Returns:
        Console output of the command, as produced by the generic Execute
        service this class extends (that implementation is not part of
        this file - TODO confirm).

    Raises:
        SubCommandFailure: on failure, including a return code that is not
            in ``valid_retcodes`` when return code checking is enabled.

    Example:
        .. code-block:: python

            output = dev.execute("show command")
    """

    def __init__(self, connection, context, **kwargs):
        """Initialise the generic service and attach the Linux dialog.

        Arguments:
            connection: connection object, passed to the parent service.
            context: context object, passed to the parent service.
            **kwargs: forwarded unchanged to the parent service.
        """
        # Connection object will have all the received details
        super().__init__(connection, context, **kwargs)
        self.utils = utils
        self.dialog = Dialog(linux_execution_statements)

    def post_service(self, *args, **kwargs):
        """Optionally verify the shell return code of the executed command.

        When return code checking is enabled (``check_retcode`` keyword, or
        the connection setting ``CHECK_RETURN_CODE``), ``echo $?`` is sent
        and the number echoed back is compared against ``valid_retcodes``
        (keyword, or the connection setting ``VALID_RETURN_CODES``).
        Afterwards the service dialog is processed on the spawn.

        Raises:
            AssertionError: if ``valid_retcodes`` is not a list.
            SubCommandFailure: if the return code is not valid, or if
                anything fails while reading it or processing the dialog.
        """
        check_retcode = kwargs.get(
            'check_retcode', self.connection.settings.CHECK_RETURN_CODE)
        if not check_retcode:
            return

        valid_retcodes = kwargs.get(
            'valid_retcodes', self.connection.settings.VALID_RETURN_CODES)
        # Raised explicitly instead of using an ``assert`` statement so the
        # validation is not stripped when Python runs with -O; the exception
        # type is kept as AssertionError for backward compatibility.
        if not isinstance(valid_retcodes, list):
            raise AssertionError('valid_retcodes should be a list')

        con = self.connection
        timeout = kwargs.get('timeout') or self.timeout
        dialog = self.service_dialog()
        con.sendline('echo $?')
        try:
            match = con.expect(r'\r\n\d+', timeout=timeout)
            if match:
                # Drop the echoed command so that only the number remains.
                echoed = match.match_output.replace('echo $?', '').lstrip()
                code_match = re.match(r'^(\d+)', echoed)
                if code_match:
                    ret_code = code_match.group(1)
                    if int(ret_code) not in valid_retcodes:
                        # Caught by the handler below and re-raised wrapped
                        # in the generic "Command execution failed" error.
                        raise SubCommandFailure(
                            'Invalid return code: %s' % ret_code)
            dialog.process(
                con.spawn,
                timeout=timeout,
                prompt_recovery=self.prompt_recovery,
                context=con.context
            )
        except Exception as err:
            raise SubCommandFailure("Command execution failed", err) from err
class Ping(BaseService):
    """ Service to issue ping response request to another network from device.

    Returns:
        ping command response, raises SubCommandFailure if 0% packet loss is not seen

    Example:
        .. code-block:: python

            ping("10.1.1.1")
            ping("10.2.1.1", count=10)

    Syntax:
        .. code-block:: python

            ping(destination, options="LRUbfnqrvA", arg=value)

    """

    # Long keyword names accepted by call_service, mapped to the ping flag.
    _ping_option_long_to_short = {
        'count': 'c',
        'interval': 'i',
        'deadline': 'w',
        'timeout': 'w',
        'pattern': 'p',
        'size': 's',
        'ttl': 't',
        'interface': 'I',
        'sndbuf': 'S',
        'timestamp': 'T',
        'tos': 'Q'
    }

    # Ping Options
    # Flags that take no argument; passed via the ``options`` keyword.
    ping_boolean_options = {
        # 'a': 'Audible ping'
        'A': 'Adaptive ping. Interpacket interval adapts to round-trip time',
        'b': 'Allow pinging a broadcast address.',
        # 'd': 'Set the SO_DEBUG option on the socket being used.'
        #      ' Essentially, this socket option is not used by Linux kernel.',
        'f': 'Flood ping.',
        'L': 'Suppress loopback of multicast packets.'
             ' This flag only applies if the ping destination is a multicast address.',
        'n': 'Numeric output only. No attempt will be made to lookup'
             ' symbolic names for host addresses.',
        'q': 'Quiet output. Nothing is displayed except the summary lines at startup time and when finished.',
        'r': 'Bypass the normal routing tables and send directly to a host on an attached interface.',
        'R': 'Record route.',
        # NOTE: -S takes a value; it is also listed in ping_arg_options so
        # that ``sndbuf=<value>`` works. This entry is kept so existing
        # callers that pass 'S' inside ``options`` behave as before.
        'S': 'Set socket sndbuf. If not specified, it is selected to buffer not more than one packet.',
        'U': 'Print full user-to-user latency',
        'v': 'Verbose output.',
        # 'V': 'show version',
    }

    # Flags that take an argument; passed as keyword arguments.
    ping_arg_options = {
        'c': 'Number of packets to send',
        'i': 'Wait interval seconds between sending each packet.',
        'I': 'Set source address to specified interface address. '
             'Argument may be numeric IP address or name of device.',
        'p': "You may specify up to 16 'pad' bytes to "
             'fill out the packet you send.',
        's': 'Specifies the number of data bytes to be sent.',
        'S': 'Set socket sndbuf. If not specified, it is selected to buffer not more than one packet.',
        'Q': 'Set Quality of Service -related bits in ICMP datagrams. '
             'tos can be either decimal or hex number. ',
        't': 'Set the IP Time to Live.',
        'T': 'Set special IP timestamp options. timestamp option may be either\n'
             '        tsonly (only timestamps), tsandaddr (timestamps and addresses) or\n'
             '        tsprespec host1 [host2 [host3 [host4]]] (timestamp prespecified hops).',
        'w': 'Specify a timeout, in seconds',
    }

    def __init__(self, connection, context, **kwargs):
        """Set service defaults and extend the docstring with the options.

        Arguments:
            connection: connection object, passed to the parent service.
            context: context object, passed to the parent service.
            **kwargs: forwarded to the parent service and then copied onto
                the instance, overriding the defaults set here.
        """
        super().__init__(connection, context, **kwargs)
        self.start_state = 'shell'
        self.end_state = 'shell'
        self.timeout = 60
        # Ping error Patterns
        self.default_error_pattern = ['[123456789]+0*% packet loss']
        self.__dict__.update(kwargs)

        ping_option_short_to_long = {
            v: k for (k, v) in self._ping_option_long_to_short.items()}
        boolean_doc = "\n".join(
            "{:>10}: {}".format(k, self.ping_boolean_options[k])
            for k in sorted(self.ping_boolean_options, key=lambda k: k.lower()))
        # Fall back to the short flag when an option has no long name.
        arg_doc = "\n".join(
            "{:>10}: {}".format(
                ping_option_short_to_long.get(k, k), self.ping_arg_options[k])
            for k in sorted(self.ping_arg_options, key=lambda k: k.lower()))
        # __doc__ is None under ``python -OO`` and for subclasses that do
        # not define their own docstring, so guard the concatenation.
        self.__doc__ = (self.__doc__ or '') + \
            "Boolean options\n{}\n\n Argument options\n{}".format(
                boolean_doc, arg_doc)

    def call_service(self, addr, command="ping", **kwargs):
        """Send a ping command to the device and check its output.

        Arguments:
            addr: destination. An IPv4 address is pinged with ``ping`` and
                an IPv6 address with ``ping6``; anything that is not an IP
                address (e.g. a hostname) is pinged with ``command``.
            command: command used when ``addr`` is not an IP address.
            error_pattern: (list) regex patterns that mark the output as a
                failure. ``None`` disables error checking. Defaults to
                ``self.default_error_pattern``.
            options: string of boolean ping flags, e.g. ``"nq"``. When not
                given, adaptive ping (``-A``) is used.
            **kwargs: argument options, using either the long names from
                ``_ping_option_long_to_short`` or the short flags from
                ``ping_arg_options``. Unknown names are ignored with a
                warning.

        The processed output is stored in ``self.result``.

        Raises:
            SubCommandFailure: if ``addr`` is empty, the dialog fails, or
                an error pattern matches the output.
            ValueError: if ``error_pattern`` is not a list.
        """
        if not addr:
            raise SubCommandFailure("Address is not specified")

        if 'error_pattern' in kwargs:
            self.error_pattern = kwargs.pop('error_pattern')
            if self.error_pattern is None:
                self.error_pattern = []
            if not isinstance(self.error_pattern, list):
                raise ValueError('error pattern must be a list')
        else:
            self.error_pattern = self.default_error_pattern

        con = self.connection
        # Default value setting
        # NOTE: a ``timeout`` keyword is a ping option (-w); the dialog
        # timeout always comes from the service attribute.
        timeout = self.timeout
        ping_options = AttributeDict({})
        ping_options['c'] = '5'  # default to 5 packets

        # Read input values passed
        # Convert to string in case users passes non-string types
        for key in kwargs.copy():
            if key in self._ping_option_long_to_short:
                # Re-key long option names to their short flag.
                new_key = self._ping_option_long_to_short[key]
                kwargs[new_key] = kwargs[key]
                kwargs.pop(key)
                key = new_key
            if key == 'options':
                for o in str(kwargs['options']):
                    if o in self.ping_boolean_options:
                        ping_options[o] = ""  # Boolean options
                    else:
                        log.warning('Uknown ping option - %s, ignoring' % o)
            elif key in self.ping_arg_options:
                ping_options[key] = str(kwargs[key])
            else:
                log.warning('Uknown ping option - %s, ignoring' % key)

        if 'options' not in kwargs:
            ping_options['A'] = ""  # Default to adaptive ping

        try:
            ipaddr = ipaddress.ip_address(addr)
        except ValueError:
            # Not an IP address (e.g. a hostname): use the given command.
            # Stringify the command in case it is an object.
            ping_str = str(command)
        else:
            if isinstance(ipaddr, ipaddress.IPv6Address):
                ping_str = 'ping6'
            else:
                ping_str = 'ping'

        ping_str += ''.join(
            ' -%s%s' % (ping_option, ping_options[ping_option])
            for ping_option in sorted(ping_options))
        ping_str += ' %s' % addr

        p = LinuxPatterns()
        dialog = Dialog()
        dialog.append(Statement(pattern=p.prompt))

        spawn = con.spawn
        spawn.sendline(ping_str)
        try:
            self.result = dialog.process(spawn, timeout=timeout)
        except Exception as err:
            raise SubCommandFailure("Ping failed", err) from err

        # Remove command and hostname from output.
        if self.result:
            output = utils.truncate_trailing_prompt(
                con.state_machine.get_state(con.state_machine.current_state),
                self.result.match_output,
                self.connection.hostname)
            output = re.sub(re.escape(ping_str), "", output, count=1)
            self.result = output.strip()

        # Error checking is not part of the linux base infra, adding it here for now
        # TODO: update linux infra
        self.match_flag = False
        self.match_list = []
        # Only scan when there is text to scan; the dialog may have
        # produced no result, in which case there is nothing to match.
        if isinstance(self.result, str):
            for pat in self.error_pattern:
                m = re.search(pat, self.result)
                if m:
                    self.match_list.append(m.group())
                    self.match_flag = True

        if self.match_flag:
            raise SubCommandFailure(self.result, self.match_list)
class Sudo(Execute):
    """Execute service variant that prefixes the command with ``sudo``."""

    def __init__(self, connection, context, **kwargs):
        """Initialise exactly like the Execute service."""
        super().__init__(connection, context, **kwargs)

    def call_service(self, command='bash', **kwargs):
        """Run ``sudo <command>`` through the Execute service.

        Arguments:
            command: command to run under sudo, 'bash' when omitted.
            **kwargs: forwarded unchanged to ``Execute.call_service``.
        """
        sudo_command = 'sudo {}'.format(command)
        super().call_service(sudo_command, **kwargs)
class TrexConsole(ContextMgrBaseService):
    """Service for the 'trex_console' state, entered from and left to 'shell'."""

    def __init__(self, connection, context, **kwargs):
        """Set the state names used by this service.

        Arguments:
            connection: connection object, passed to the parent service.
            context: context object, passed to the parent service.
            **kwargs: forwarded to the parent service and then copied onto
                the instance, overriding the defaults set here.
        """
        super().__init__(connection, context, **kwargs)
        # The service starts and ends in the shell state.
        self.start_state = self.end_state = "shell"
        # Service name and context state share the same identifier.
        self.context_state = self.service_name = 'trex_console'
        # Caller-supplied keyword arguments take precedence over the defaults.
        self.__dict__.update(kwargs)