Skip to content

Commit

Permalink
Add missing docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
florisla committed Apr 17, 2019
1 parent 5583c36 commit b15b54d
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 4 deletions.
1 change: 0 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ max-line-length=98
disable=
fixme, # TO DOs are not errors
bad-continuation, # be compatible to black
missing-docstring,

[REPORT]
score=no
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,4 @@ Not currently supported
Future work
-----------
* Use proper logging instead of print statements
* Add docstrings to all public methods
* Use Travis or Azure pipelines for CI
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ ignore =
# be compatible to black
C812, # Missing trailing comma
E203, # Whitespace before ':'
D102, # Missing docstring in public method
per-file-ignores =
# Missing docstring in public function
tests/*:D103,
Expand Down
54 changes: 54 additions & 0 deletions stm32loader/bootloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,37 +149,48 @@ def __init__(self, connection, verbosity=5):
self.verbosity = verbosity

def write(self, *data):
"""Write the given data to the MCU."""
for data_bytes in data:
if isinstance(data_bytes, int):
data_bytes = struct.pack("B", data_bytes)
self.connection.write(data_bytes)

def write_and_ack(self, message, *data):
"""Write data to the MCU and wait until it replies with ACK."""
# this is a separate method from write() because a keyword
# argument after *args is not possible in Python 2
self.write(*data)
return self._wait_for_ack(message)

def debug(self, level, message):
"""Print the given message if its level is low enough."""
if self.verbosity >= level:
print(message, file=sys.stderr)

def reset_from_system_memory(self):
"""Reset the MCU with boot0 enabled to enter the bootloader."""
self._enable_boot0(True)
self._reset()
return self.write_and_ack("Synchro", self.Command.SYNCHRONIZE)

def reset_from_flash(self):
"""Reset the MCU with boot0 disabled."""
self._enable_boot0(False)
self._reset()

def command(self, command, description):
"""
Send the given command to the MCU.
Raise CommandException if there's no ACK replied.
"""
self.debug(10, "*** Command: %s" % description)
ack_received = self.write_and_ack("Command", command, command ^ 0xFF)
if not ack_received:
raise CommandException("%s (%s) failed: no ack" % (description, command))

def get(self):
"""Return the bootloader version and remember supported commands."""
self.command(self.Command.GET, "Get")
length = bytearray(self.connection.read())[0]
version = bytearray(self.connection.read())[0]
Expand All @@ -192,6 +203,11 @@ def get(self):
return version

def get_version(self):
"""
Return the bootloader version.
Read protection status readout is not yet implemented.
"""
self.command(self.Command.GET_VERSION, "Get version")
version = bytearray(self.connection.read())[0]
self.connection.read(2)
Expand All @@ -200,6 +216,7 @@ def get_version(self):
return version

def get_id(self):
"""Send the 'Get ID' command and return the device (model) ID."""
self.command(self.Command.GET_ID, "Get ID")
length = bytearray(self.connection.read())[0]
id_data = bytearray(self.connection.read(length + 1))
Expand All @@ -208,23 +225,31 @@ def get_id(self):
return _device_id

def get_flash_size(self, device_family):
"""Return the MCU's flash size in bytes."""
flash_size_address = self.FLASH_SIZE_ADDRESS[device_family]
flash_size_bytes = self.read_memory(flash_size_address, 2)
flash_size = flash_size_bytes[0] + flash_size_bytes[1] * 256
return flash_size

def get_uid(self, device_id):
"""Send the 'Get UID' command and return the device UID."""
uid_address = self.UID_ADDRESS[device_id]
uid = self.read_memory(uid_address, 12)
return uid

@staticmethod
def format_uid(uid):
"""Return a readable string from the given UID."""
swapped_data = [[uid[b] for b in part] for part in Stm32Bootloader.UID_SWAP]
uid_string = "-".join("".join(format(b, "02X") for b in part) for part in swapped_data)
return uid_string

def read_memory(self, address, length):
"""
Return the memory contents of flash at the given address.
Supports maximum 256 bytes.
"""
assert length <= 256
self.command(self.Command.READ_MEMORY, "Read memory")
self.write_and_ack("0x11 address failed", self._encode_address(address))
Expand All @@ -234,11 +259,17 @@ def read_memory(self, address, length):
return bytearray(self.connection.read(length))

def go(self, address):
"""Send the 'Go' command to start execution of firmware."""
# pylint: disable=invalid-name
self.command(self.Command.GO, "Go")
self.write_and_ack("0x21 go failed", self._encode_address(address))

def write_memory(self, address, data):
"""
Write the given data to flash at the given address.
Supports maximum 256 bytes.
"""
nr_of_bytes = len(data)
if nr_of_bytes == 0:
return
Expand Down Expand Up @@ -279,6 +310,7 @@ def erase_memory(self, sectors=None):
self.debug(10, " Erase memory done")

def extended_erase_memory(self):
"""Send the extended erase command to erase the full flash content."""
self.command(self.Command.EXTENDED_ERASE, "Extended erase memory")
# Global mass erase and checksum byte
self.write(b'\xff\xff\x00')
Expand All @@ -290,23 +322,31 @@ def extended_erase_memory(self):
self.debug(10, " Extended Erase memory done")

def write_protect(self, pages):
"""Enable write protection on the given flash pages."""
self.command(self.Command.WRITE_PROTECT, "Write protect")
nr_of_pages = (len(pages) - 1) & 0xFF
checksum = reduce(operator.xor, pages, nr_of_pages)
self.write_and_ack("0x63 write protect failed", nr_of_pages, pages, checksum)
self.debug(10, " Write protect done")

def write_unprotect(self):
"""Disable write protection of the flash memory."""
self.command(self.Command.WRITE_UNPROTECT, "Write unprotect")
self._wait_for_ack("0x73 write unprotect failed")
self.debug(10, " Write Unprotect done")

def readout_protect(self):
"""Enable readout protection of the flash memory."""
self.command(self.Command.READOUT_PROTECT, "Readout protect")
self._wait_for_ack("0x82 readout protect failed")
self.debug(10, " Read protect done")

def readout_unprotect(self):
"""
Disable readout protection of the flash memory.
Beware, this will erase the flash content.
"""
self.command(self.Command.READOUT_UNPROTECT, "Readout unprotect")
self._wait_for_ack("0x92 readout unprotect failed")
self.debug(20, " Mass erase -- this may take a while")
Expand All @@ -316,6 +356,11 @@ def readout_unprotect(self):
self.reset_from_system_memory()

def read_memory_data(self, address, length):
"""
Return flash content from the given address and byte count.
Length may be more than 256 bytes.
"""
data = bytearray()
while length > 256:
self.debug(
Expand All @@ -332,6 +377,11 @@ def read_memory_data(self, address, length):
return data

def write_memory_data(self, address, data):
"""
Write the given data to flash.
Data length may be more than 256 bytes.
"""
length = len(data)
offset = 0
while length > 256:
Expand All @@ -349,6 +399,7 @@ def write_memory_data(self, address, data):
self.write_memory(address, data[offset : offset + length])

def _reset(self):
"""Enable or disable the reset IO line (if possible)."""
if not self._toggle_reset:
return
self.connection.enable_reset(True)
Expand All @@ -357,12 +408,14 @@ def _reset(self):
time.sleep(0.5)

def _enable_boot0(self, enable=True):
"""Enable or disable the boot0 IO line (if possible)."""
if not self._toggle_boot0:
return

self.connection.enable_boot0(enable)

def _wait_for_ack(self, info=""):
"""Read a byte and raise CommandException if it's not ACK."""
try:
ack = bytearray(self.connection.read())[0]
except TypeError:
Expand All @@ -377,6 +430,7 @@ def _wait_for_ack(self, info=""):

@staticmethod
def _encode_address(address):
"""Return the given address as big-endian bytes with a checksum."""
address_bytes = bytearray(struct.pack(">I", address))
checksum_byte = struct.pack("B", reduce(operator.xor, address_bytes))
return address_bytes + checksum_byte
5 changes: 5 additions & 0 deletions stm32loader/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, serial_port, baud_rate=115_200, parity="E"):
self.serial_connection = None

def connect(self):
"""Connect to the RS-232 serial port."""
self.serial_connection = serial.Serial(
port=self.serial_port,
baudrate=self.baud_rate,
Expand All @@ -65,12 +66,15 @@ def connect(self):
)

def write(self, *args, **kwargs):
"""Write the given data to the serial connection."""
return self.serial_connection.write(*args, **kwargs)

def read(self, *args, **kwargs):
"""Read the given amount of bytes from the serial connection."""
return self.serial_connection.read(*args, **kwargs)

def enable_reset(self, enable=True):
"""Enable or disable the reset IO line."""
# reset on the STM32 is active low (0 Volt puts the MCU in reset)
# but the RS-232 DTR signal is active low by itself, so it
# inverts this (writing a logical 1 outputs a low voltage, i.e.
Expand All @@ -85,6 +89,7 @@ def enable_reset(self, enable=True):
self.serial_connection.setDTR(level)

def enable_boot0(self, enable=True):
"""Enable or disable the boot0 IO line."""
level = int(enable)

# by default, this is active low
Expand Down
8 changes: 7 additions & 1 deletion stm32loader/stm32loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ def __init__(self):
self.verbosity = DEFAULT_VERBOSITY

def debug(self, level, message):
"""Log a message to stderror if its level is low enough."""
if self.verbosity >= level:
print(message, file=sys.stderr)

def parse_arguments(self, arguments):
"""Parse the list of command-line arguments."""
# pylint: disable=too-many-branches, eval-used

try:
# parse command-line arguments using getopt
options, arguments = getopt.getopt(arguments, "hqVeuwvrsRBP:p:b:a:l:g:f:")
Expand Down Expand Up @@ -126,6 +127,7 @@ def parse_arguments(self, arguments):
assert False, "unhandled option %s" % option

def connect(self):
"""Connect to the RS-232 serial port."""
serial_connection = SerialConnection(
self.configuration["port"], self.configuration["baud"], self.configuration["parity"]
)
Expand Down Expand Up @@ -162,6 +164,7 @@ def connect(self):
sys.exit(1)

def perform_commands(self):
"""Run all operations as defined by the configuration."""
# pylint: disable=too-many-branches
binary_data = None
if self.configuration["write"] or self.configuration["verify"]:
Expand Down Expand Up @@ -213,10 +216,12 @@ def perform_commands(self):
self.bootloader.go(self.configuration["go_address"])

def reset(self):
"""Reset the microcontroller."""
self.bootloader.reset_from_flash()

@staticmethod
def print_usage():
"""Print help text explaining the command-line arguments."""
help_text = """Usage: %s [-hqVeuwvrsRB] [-l length] [-p port] [-b baud] [-P parity]
[-a address] [-g address] [-f family] [file.bin]
-e Erase (note: this is required on previously written memory)
Expand Down Expand Up @@ -247,6 +252,7 @@ def print_usage():
print(help_text)

def read_device_details(self):
"""Show MCU details (bootloader version, chip ID, UID, flash size)."""
boot_version = self.bootloader.get()
self.debug(0, "Bootloader version %X" % boot_version)
device_id = self.bootloader.get_id()
Expand Down

0 comments on commit b15b54d

Please sign in to comment.