Skip to content

Commit

Permalink
添加__version__.py文件,用来管理版本号,当前版本1.1.2;
Browse files Browse the repository at this point in the history
修改获得版本号函数,可以在其他程序中调用;
将enter_raw_repl函数拆分出一个enter_raw_repl_not_reset函数,可以在一些功能中加快程序速度;
修改setup.py文件,用于指定版本号,并为git库自动加标签。
  • Loading branch information
BigCircleLaw committed Jan 15, 2020
1 parent 6ff069c commit d02e2f7
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 47 deletions.
4 changes: 4 additions & 0 deletions ampy/__version__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

VERSION = (1, 1, 2)

__version__ = '.'.join(map(str, VERSION))
7 changes: 4 additions & 3 deletions ampy/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,12 @@ def version(self):
import wonderbits
print(wonderbits.__version__)
"""
self._pyboard.enter_raw_repl()
self._pyboard.enter_raw_repl_not_rest()
out = None
try:
out = self._pyboard.exec_(textwrap.dedent(command)).replace("\n", "").replace("\r", "")
out = self._pyboard.exec_(textwrap.dedent(command)).decode("utf-8")
out = out.replace("\n", "").replace("\r", "")
except PyboardError as ex:
raise ex
self._pyboard.exit_raw_repl()
return out.decode("utf-8")
return out
139 changes: 99 additions & 40 deletions ampy/pyboard.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python

"""
pyboard interface
Expand Down Expand Up @@ -48,34 +47,42 @@
# Python2 doesn't have buffer attr
stdout = sys.stdout


def stdout_write_bytes(b):
b = b.replace(b"\x04", b"")
stdout.write(b)
stdout.flush()


class PyboardError(BaseException):
pass


class TelnetToSerial:
def __init__(self, ip, user, password, read_timeout=None):
import telnetlib
self.tn = telnetlib.Telnet(ip, timeout=15)
self.read_timeout = read_timeout
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
if b'Login as:' in self.tn.read_until(
b'Login as:', timeout=read_timeout):
self.tn.write(bytes(user, 'ascii') + b"\r\n")

if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
if b'Password:' in self.tn.read_until(
b'Password:', timeout=read_timeout):
# needed because of internal implementation details of the telnet server
time.sleep(0.2)
self.tn.write(bytes(password, 'ascii') + b"\r\n")

if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
if b'for more information.' in self.tn.read_until(
b'Type "help()" for more information.',
timeout=read_timeout):
# login succesful
from collections import deque
self.fifo = deque()
return

raise PyboardError('Failed to establish a telnet connection with the board')
raise PyboardError(
'Failed to establish a telnet connection with the board')

def __del__(self):
self.close()
Expand Down Expand Up @@ -118,26 +125,38 @@ def inWaiting(self):
else:
return n_waiting


class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
def __init__(self,
device,
baudrate=115200,
user='micro',
password='python',
wait=0,
rawdelay=0):
global _rawdelay
_rawdelay = rawdelay
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
if device and device[0].isdigit() and device[-1].isdigit(
) and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
self.serial = TelnetToSerial(
device, user, password, read_timeout=10)
else:
import serial
delayed = False
for attempt in range(wait + 1):
try:
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
self.serial = serial.Serial(
device, baudrate=baudrate, interCharTimeout=1)
break
except (OSError, IOError) as e: # Py2 and Py3 have different errors
except (OSError,
IOError) as e: # Py2 and Py3 have different errors
# print(e)
if wait == 0:
continue
if attempt == 0:
sys.stdout.write('Waiting {} seconds for pyboard '.format(wait))
sys.stdout.write(
'Waiting {} seconds for pyboard '.format(wait))
delayed = True
time.sleep(1)
sys.stdout.write('.')
Expand All @@ -152,7 +171,8 @@ def __init__(self, device, baudrate=115200, user='micro', password='python', wai
def close(self):
self.serial.close()

def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
def read_until(self, min_num_bytes, ending, timeout=10,
data_consumer=None):
data = self.serial.read(min_num_bytes)
if data_consumer:
data_consumer(data)
Expand All @@ -173,8 +193,7 @@ def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
time.sleep(0.01)
return data

def enter_raw_repl(self):
# Brief delay before sending RAW MODE char if requests
def enter_raw_repl_not_rest(self):
if _rawdelay > 0:
time.sleep(_rawdelay)

Expand All @@ -190,13 +209,16 @@ def enter_raw_repl(self):
self.serial.read(n)
n = self.serial.inWaiting()

self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')

self.serial.write(b'\x04') # ctrl-D: soft reset
def enter_raw_repl(self):
# Brief delay before sending RAW MODE char if requests
self.enter_raw_repl_not_rest()
self.serial.write(b'\x04') # ctrl-D: soft reset
data = self.read_until(1, b'soft reboot\r\n')
if not data.endswith(b'soft reboot\r\n'):
print(data)
Expand All @@ -206,22 +228,23 @@ def enter_raw_repl(self):
# Modification from original pyboard.py below:
# Add a small delay and send Ctrl-C twice after soft reboot to ensure
# any main program loop in main.py is interrupted.
time.sleep(0.5)
self.serial.write(b'\x03')
time.sleep(0.1) # (slight delay before second interrupt
self.serial.write(b'\x03')
# time.sleep(0.5)
# self.serial.write(b'\x03')
# time.sleep(0.1) # (slight delay before second interrupt
# self.serial.write(b'\x03')
# End modification above.
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')

def exit_raw_repl(self):
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL

def follow(self, timeout, data_consumer=None):
# wait for normal output
data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
data = self.read_until(
1, b'\x04', timeout=timeout, data_consumer=data_consumer)
if not data.endswith(b'\x04'):
raise PyboardError('timeout waiting for first EOF reception')
data = data[:-1]
Expand All @@ -248,7 +271,8 @@ def exec_raw_no_follow(self, command):

# write command
for i in range(0, len(command_bytes), 256):
self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
self.serial.write(
command_bytes[i:min(i + 256, len(command_bytes))])
time.sleep(0.01)
self.serial.write(b'\x04')

Expand All @@ -258,7 +282,7 @@ def exec_raw_no_follow(self, command):
raise PyboardError('could not exec command')

def exec_raw(self, command, timeout=10, data_consumer=None):
self.exec_raw_no_follow(command);
self.exec_raw_no_follow(command)
return self.follow(timeout, data_consumer)

def eval(self, expression):
Expand All @@ -278,39 +302,71 @@ def execfile(self, filename):
return self.exec_(pyfile)

def get_time(self):
t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ')
t = str(
self.eval('pyb.RTC().datetime()'),
encoding='utf8')[1:-1].split(', ')
return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])


# in Python2 exec is a keyword so one must use "exec_"
# but for Python3 we want to provide the nicer version "exec"
setattr(Pyboard, "exec", Pyboard.exec_)

def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):

def execfile(filename,
device='/dev/ttyACM0',
baudrate=115200,
user='micro',
password='python'):
pyb = Pyboard(device, baudrate, user, password)
pyb.enter_raw_repl()
output = pyb.execfile(filename)
stdout_write_bytes(output)
pyb.exit_raw_repl()
pyb.close()


def main():
import argparse
cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
cmd_parser.add_argument('-c', '--command', help='program passed in as string')
cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
cmd_parser = argparse.ArgumentParser(
description='Run scripts on the pyboard.')
cmd_parser.add_argument(
'--device',
default='/dev/ttyACM0',
help='the serial device or the IP address of the pyboard')
cmd_parser.add_argument(
'-b',
'--baudrate',
default=115200,
help='the baud rate of the serial device')
cmd_parser.add_argument(
'-u', '--user', default='micro', help='the telnet login username')
cmd_parser.add_argument(
'-p', '--password', default='python', help='the telnet login password')
cmd_parser.add_argument(
'-c', '--command', help='program passed in as string')
cmd_parser.add_argument(
'-w',
'--wait',
default=0,
type=int,
help='seconds to wait for USB connected board to become available')
cmd_parser.add_argument(
'--follow',
action='store_true',
help=
'follow the output after running the scripts [default if no scripts given]'
)
cmd_parser.add_argument('files', nargs='*', help='input files')
args = cmd_parser.parse_args()

def execbuffer(buf):
try:
pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
pyb = Pyboard(args.device, args.baudrate, args.user, args.password,
args.wait)
pyb.enter_raw_repl()
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
ret, ret_err = pyb.exec_raw(
buf, timeout=None, data_consumer=stdout_write_bytes)
pyb.exit_raw_repl()
pyb.close()
except PyboardError as er:
Expand All @@ -332,8 +388,10 @@ def execbuffer(buf):

if args.follow or (args.command is None and len(args.files) == 0):
try:
pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
pyb = Pyboard(args.device, args.baudrate, args.user, args.password,
args.wait)
ret, ret_err = pyb.follow(
timeout=None, data_consumer=stdout_write_bytes)
pyb.close()
except PyboardError as er:
print(er)
Expand All @@ -344,5 +402,6 @@ def execbuffer(buf):
stdout_write_bytes(ret_err)
sys.exit(1)


if __name__ == "__main__":
main()
13 changes: 9 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()

about = {}

with open(os.path.join(here, 'ampy', '__version__.py')) as f:
exec(f.read(), about)

class UploadCommand(Command):
"""Support setup.py upload."""

Expand Down Expand Up @@ -66,15 +71,15 @@ def run(self):
os.system('twine upload dist/*')

# self.status('Pushing git tags…')
# os.system('git tag v{0}'.format(about['__version__']))
# os.system('git push --tags')
os.system('git tag v{0}'.format(about['__version__']))
os.system('git push --tags')

sys.exit()

setup(
name='wonderbits-ampy',
# version = '1.0.0',
use_scm_version=True,
version = about['__version__'],
# use_scm_version=True,
setup_requires=['setuptools_scm'],

description='ampy (Adafruit MicroPython tool) is a command line tool to interact with a CircuitPython or MicroPython board over a serial connection.',
Expand Down

0 comments on commit d02e2f7

Please sign in to comment.