forked from google/python-adb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
common_cli.py
151 lines (134 loc) · 4.68 KB
/
common_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import print_function
import argparse
import io
import inspect
import logging
import re
import sys
import types
from adb import usb_exceptions
class _PortPathAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
setattr(
namespace, self.dest, [int(i) for i in values.replace("/", ",").split(",")]
)
class PositionalArg(argparse.Action):
    """argparse action that appends each parsed value to ``namespace.positional``.

    The namespace must already carry a ``positional`` list; this action does
    not create it.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        collected = namespace.positional
        collected.append(values)
def GetDeviceArguments():
    """Build the argument parser holding the device-selection options.

    The parser is created with ``add_help=False`` and defines three options:
    ``--timeout_ms`` (int, default 10000), ``--port_path`` (parsed by
    ``_PortPathAction``) and ``-s``/``--serial``.

    Returns
    -------
    argparse.ArgumentParser
        Parser named ``Device`` carrying the options above.
    """
    device_parser = argparse.ArgumentParser(prog="Device", add_help=False)
    add_option = device_parser.add_argument

    add_option(
        "--timeout_ms",
        type=int,
        default=10000,
        metavar="10000",
        help="Timeout in milliseconds.",
    )
    add_option(
        "--port_path",
        action=_PortPathAction,
        help="USB port path integers (eg 1,2 or 2,1,1)",
    )
    add_option(
        "-s",
        "--serial",
        help="Device serial to look for (host:port or USB serial)",
    )
    return device_parser
def GetCommonArguments():
    """Build the argument parser holding options shared by every command.

    Returns
    -------
    argparse.ArgumentParser
        Parser named ``Common``, created with ``add_help=False``, that defines
        the boolean ``--verbose`` flag.
    """
    common_parser = argparse.ArgumentParser(prog="Common", add_help=False)
    common_parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable logging",
    )
    return common_parser
def _DocToArgs(doc):
offset = None
param = None
out = {}
for l in doc.splitlines():
if l.strip() == "Parameters":
offset = len(l.rstrip()) - len(l.strip())
elif offset:
if l.strip() == "-" * len("Parameters"):
continue
if l.strip() in ["Returns", "Yields", "Raises"]:
break
if len(l.rstrip()) - len(l.strip()) == offset:
param = l.strip().split()[0]
out[param] = ""
elif l.strip():
if out[param]:
out[param] += " " + l.strip()
else:
out[param] = l.strip()
return out
def MakeSubparser(subparsers, parents, method, arguments=None):
    """Register a sub-command on ``subparsers`` that mirrors ``method``.

    The command name is built from the capitalised words of
    ``method.__name__``, lower-cased and joined with ``-``. The first line of
    the method's docstring is used as description and help text.

    Parameters
    ----------
    subparsers
        Object providing ``add_parser`` (as returned by
        ``ArgumentParser.add_subparsers``).
    parents : list
        Parent parsers forwarded to ``add_parser``.
    method : callable
        Callable to expose. Its first argument is skipped; the following
        arguments are exposed as positionals up to (not including) the first
        one whose default is ``None``. Arguments whose default is neither
        ``None`` nor a ``str`` are not exposed.
    arguments : dict, optional
        Help-text overrides keyed by argument name; falls back to the text
        parsed from the method's docstring.

    Returns
    -------
    The sub-parser created by ``subparsers.add_parser``.
    """
    # The capturing group makes re.split return the matched words at the odd
    # indexes; the slice keeps exactly those.
    words = re.split(r"([A-Z][a-z]+)", method.__name__)[1:-1:2]
    command = "-".join(words).lower()
    summary = method.__doc__.splitlines()[0]

    subparser = subparsers.add_parser(
        name=command,
        description=summary,
        help=summary.rstrip("."),
        parents=parents,
    )
    subparser.set_defaults(method=method, positional=[])

    spec = inspect.getfullargspec(method)
    spec_defaults = list(spec.defaults or [])
    # Number of arguments without a default, not counting the first argument.
    required = len(spec.args) - len(spec_defaults) - 1

    exposed = []
    for index, arg_name in enumerate(spec.args[1:], start=1):
        has_default = index > required
        if has_default and spec_defaults[index - required - 1] is None:
            # Stop at the first argument defaulting to None.
            break
        exposed.append(arg_name)

    # Pad with None so defaults line up with the arguments after the first.
    aligned_defaults = [None] * required + spec_defaults
    doc_help = _DocToArgs(method.__doc__)
    overrides = arguments or {}

    for arg_name, default in zip(exposed, aligned_defaults):
        if default is not None and not isinstance(default, str):
            continue
        if default is None:
            count = None
        else:
            count = "?"
        subparser.add_argument(
            arg_name,
            help=overrides.get(arg_name, doc_help.get(arg_name)),
            default=default,
            nargs=count,
            action=PositionalArg,
        )

    star_args = spec.varargs
    if star_args:
        subparser.add_argument(
            star_args,
            nargs=argparse.REMAINDER,
            help=overrides.get(star_args, doc_help.get(star_args)),
        )
    return subparser
def _RunMethod(dev, args, extra):
logging.info("%s(%s)", args.method.__name__, ", ".join(args.positional))
result = args.method(dev, *args.positional, **extra)
if result is not None:
if isinstance(result, io.StringIO):
sys.stdout.write(result.getvalue())
elif isinstance(result, (list, types.GeneratorType)):
r = ""
for r in result:
r = str(r)
sys.stdout.write(r)
if not r.endswith("\n"):
sys.stdout.write("\n")
else:
result = str(result)
sys.stdout.write(result)
if not result.endswith("\n"):
sys.stdout.write("\n")
return 0
def StartCli(args, adb_commands, extra=None, **device_kwargs):
    """Connect to a device, run the selected command and return an exit code.

    Parameters
    ----------
    args
        Parsed namespace providing ``port_path``, ``serial`` and
        ``timeout_ms`` for the connection, plus whatever ``_RunMethod`` reads.
    adb_commands : callable
        Called with no arguments to create the device object; the object must
        provide ``ConnectDevice`` and ``Close``.
    extra : dict, optional
        Extra keyword arguments forwarded to the command.
    **device_kwargs
        Extra keyword arguments forwarded to ``ConnectDevice``.

    Returns
    -------
    int
        ``0`` on success, ``1`` if connecting or running the command failed.
    """
    try:
        dev = adb_commands()
        dev.ConnectDevice(
            port_path=args.port_path,
            serial=args.serial,
            default_timeout_ms=args.timeout_ms,
            **device_kwargs
        )
    except usb_exceptions.DeviceNotFoundError as e:
        print("No device found: {}".format(e), file=sys.stderr)
        return 1
    except usb_exceptions.CommonUsbError as e:
        print("Could not connect to device: {}".format(e), file=sys.stderr)
        return 1
    try:
        return _RunMethod(dev, args, extra or {})
    except Exception as e:
        # Top-level CLI boundary: report the failure on stderr, newline
        # terminated, like the connection errors above, so it does not get
        # mixed into the command's stdout output.
        print(e, file=sys.stderr)
        return 1
    finally:
        # Always release the device, whether the command succeeded or not.
        dev.Close()