Skip to content

Commit

Permalink
fix(pyd): support pushing binary files to pydevice in upydevice backend.
Browse files Browse the repository at this point in the history
Signed-off-by: Braden Mars <bradenmars@bradenmars.me>
  • Loading branch information
BradenM committed Apr 17, 2023
1 parent 5a21ecd commit e58c1f2
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions micropy/pyd/backend_upydevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from upydevice.phantom import UOS as UPY_UOS

from .abc import DevicePath, HostPath, MessageConsumer, MetaPyDeviceBackend, PyDeviceConsumer
from .consumers import NoOpConsumer

AnyUPyDevice: TypeAlias = Union[upydevice.SerialDevice, upydevice.WebSocketDevice]

Expand Down Expand Up @@ -143,9 +144,11 @@ def copy_dir(self, source_path: DevicePath, target_path: HostPath, **kwargs):
file_dest.parent.mkdir(parents=True, exist_ok=True)
self.pull_file(file_path, HostPath(str(file_dest)), **kwargs)

def push_file(self, source_path: HostPath, target_path: DevicePath, **kwargs) -> None:
def push_file(
self, source_path: HostPath, target_path: DevicePath, binary: bool = False, **kwargs
) -> None:
src_path = Path(str(source_path))
src_contents = src_path.read_text()
src_contents = src_path.read_bytes() if binary else src_path.read_text()
self.write_file(src_contents, target_path, **kwargs)

def pull_file(self, source_path: DevicePath, target_path: HostPath, **kwargs) -> None:
Expand All @@ -165,22 +168,38 @@ def _iter_hex_chunks(self, content: str):

@retry
def write_file(
self, contents: str, target_path: DevicePath, *, consumer: PyDeviceConsumer | None = None
self,
contents: str | bytes,
target_path: DevicePath,
*,
consumer: PyDeviceConsumer = NoOpConsumer,
) -> None:
is_bytes = isinstance(contents, bytes)
target_path = self.resolve_path(target_path)
self._pydevice.cmd("import gc")
self._pydevice.cmd("import ubinascii")
self._pydevice.cmd(f"f = open('{str(target_path)}', 'wb')")
content_size = len(binascii.hexlify(contents.encode())) // 2
if consumer:
consumer.on_start(name=f"Writing {str(target_path)}", size=content_size)
for chunk in self._iter_hex_chunks(contents):
cmd = f"contents = ubinascii.unhexlify('{chunk.decode()}'); f.write(contents)"

content_iter = (
iterutils.chunked_iter(contents, self.BUFFER_SIZE)
if is_bytes
else self._iter_hex_chunks(contents)
)

content_size = len(contents)
consumer.on_start(name=f"Writing {str(target_path)}", size=content_size)

for chunk in content_iter:
cmd = (
f"contents = {chunk}; f.write(contents)"
if is_bytes
else f"contents = ubinascii.unhexlify('{chunk.decode()}'); f.write(contents)"
)
self._pydevice.cmd(cmd, silent=True)
if consumer:
consumer.on_update(size=len(chunk) // 2)
if consumer:
consumer.on_end()
consumer.on_update(size=len(chunk))
consumer.on_end()
self._pydevice.cmd("f.close()")
self._pydevice.cmd("import gc; gc.collect()")

@retry
def read_file(
Expand Down Expand Up @@ -212,7 +231,7 @@ def read_file(
value = buffer.getvalue().decode()
return value

def eval(self, command: str, *, consumer: MessageConsumer = None):
def eval(self, command: str, *, consumer: MessageConsumer | None = None) -> str | None:
if consumer:
return self._pydevice.cmd(
command, follow=True, pipe=lambda m, *args, **kws: consumer.on_message(m)
Expand All @@ -224,13 +243,11 @@ def eval_script(
contents: AnyStr,
target_path: DevicePath | None = None,
*,
consumer: PyDeviceConsumer = None,
consumer: PyDeviceConsumer | None = None,
):
_target_path = (
self.resolve_path(target_path) if target_path else f"{self._rand_device_path()}.py"
)
if isinstance(contents, bytes):
contents = contents.decode() # type: ignore
self.write_file(str(contents), DevicePath(_target_path), consumer=consumer)
self.write_file(contents, DevicePath(_target_path), consumer=consumer)
self.eval(f"import {Path(_target_path).stem}", consumer=consumer)
self.uos.remove(str(_target_path))

0 comments on commit e58c1f2

Please sign in to comment.