/
host.py
157 lines (131 loc) · 5.47 KB
/
host.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import glob
import os
import signal
import shutil
import subprocess
import logging
from distutils.dir_util import copy_tree
from getpass import getpass
from pipes import quote
from devlib.exception import TargetTransientError, TargetStableError
from devlib.utils.misc import check_output
from devlib.connection import ConnectionBase, PopenBackgroundCommand
PACKAGE_BIN_DIRECTORY = os.path.join(os.path.dirname(__file__), 'bin')
# pylint: disable=redefined-outer-name
def kill_children(pid, signal=signal.SIGKILL):
with open('/proc/{0}/task/{0}/children'.format(pid), 'r') as fd:
for cpid in map(int, fd.read().strip().split()):
kill_children(cpid, signal)
os.kill(cpid, signal)
class LocalConnection(ConnectionBase):
name = 'local'
host = 'localhost'
@property
def connected_as_root(self):
if self._connected_as_root is None:
result = self.execute('id', as_root=False)
self._connected_as_root = 'uid=0(' in result
return self._connected_as_root
@connected_as_root.setter
def connected_as_root(self, state):
self._connected_as_root = state
# pylint: disable=unused-argument
def __init__(self, platform=None, keep_password=True, unrooted=False,
password=None, timeout=None):
super().__init__()
self._connected_as_root = None
self.logger = logging.getLogger('local_connection')
self.keep_password = keep_password
self.unrooted = unrooted
self.password = password
def _copy_path(self, source, dest):
self.logger.debug('copying {} to {}'.format(source, dest))
if os.path.isdir(source):
# Behave similarly as cp, scp, adb push, etc. by creating a new
# folder instead of merging hierarchies
if os.path.exists(dest):
dest = os.path.join(dest, os.path.basename(os.path.normpath(src)))
# Use distutils copy_tree since it behaves the same as
# shutils.copytree except that it won't fail if some folders
# already exist.
#
# Mirror the behavior of all other targets which only copy the
# content without metadata
copy_tree(source, dest, preserve_mode=False, preserve_times=False)
else:
shutil.copy(source, dest)
def _copy_paths(self, sources, dest):
for source in sources:
self._copy_path(source, dest)
def push(self, sources, dest, timeout=None, as_root=False): # pylint: disable=unused-argument
self._copy_paths(sources, dest)
def pull(self, sources, dest, timeout=None, as_root=False): # pylint: disable=unused-argument
self._copy_paths(sources, dest)
# pylint: disable=unused-argument
def execute(self, command, timeout=None, check_exit_code=True,
as_root=False, strip_colors=True, will_succeed=False):
self.logger.debug(command)
if as_root and not self.connected_as_root:
if self.unrooted:
raise TargetStableError('unrooted')
password = self._get_password()
command = 'echo {} | sudo -S -- sh -c '.format(quote(password)) + quote(command)
ignore = None if check_exit_code else 'all'
try:
return check_output(command, shell=True, timeout=timeout, ignore=ignore)[0]
except subprocess.CalledProcessError as e:
message = 'Got exit code {}\nfrom: {}\nOUTPUT: {}'.format(
e.returncode, command, e.output)
if will_succeed:
raise TargetTransientError(message)
else:
raise TargetStableError(message)
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
if as_root and not self.connected_as_root:
if self.unrooted:
raise TargetStableError('unrooted')
password = self._get_password()
command = 'echo {} | sudo -S '.format(quote(password)) + command
# Make sure to get a new PGID so PopenBackgroundCommand() can kill
# all sub processes that could be started without troubles.
def preexec_fn():
os.setpgrp()
popen = subprocess.Popen(
command,
stdout=stdout,
stderr=stderr,
shell=True,
preexec_fn=preexec_fn,
)
bg_cmd = PopenBackgroundCommand(popen)
self._current_bg_cmds.add(bg_cmd)
return bg_cmd
def _close(self):
pass
def cancel_running_command(self):
pass
def wait_for_device(self, timeout=30):
return
def reboot_bootloader(self, timeout=30):
raise NotImplementedError()
def _get_password(self):
if self.password:
return self.password
password = getpass('sudo password:')
if self.keep_password:
self.password = password
return password