Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Improves App & Firmware Loading #8

Merged
merged 13 commits into from

2 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
0  __init__.py
No changes.
View
85 p.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+
+import argparse
+import pebble as libpebble
+import sys
+import time
+
+MAX_ATTEMPTS = 5
+
+def cmd_ping(pebble, args):
+ pebble.ping(cookie=0xDEADBEEF)
+
+def cmd_load(pebble, args):
+ pebble.install_app(args.app_bundle)
+
+def cmd_load_fw(pebble, args):
+ pebble.install_firmware(args.fw_bundle)
+ time.sleep(5)
+ print 'resetting to apply firmware update...'
+ pebble.reset()
+
+def cmd_logcat(pebble, args):
+ print 'listening for logs...'
+ try:
+ while True:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ return
+
+def cmd_list_apps(pebble, args):
+ for app in pebble.get_appbank_status()['apps']:
+ print '[{}] {}'.format(app['index'], app['name'])
+
+def cmd_rm_app(pebble, args):
+ for app in pebble.get_appbank_status()['apps']:
+ if app['index'] == args.app_index:
+ pebble.remove_app(app["id"], app["index"])
+
+ print 'removed app'
+ return
+
+def main():
+ parser = argparse.ArgumentParser(description='a utility belt for pebble development')
+ parser.add_argument('--pebble_id', type=str, help='the last 4 digits of the target Pebble\'s MAC address')
+
+ subparsers = parser.add_subparsers(help='commands', dest='which')
+
+ ping_parser = subparsers.add_parser('ping', help='send a ping message')
+ ping_parser.set_defaults(func=cmd_ping)
+
+ load_parser = subparsers.add_parser('load', help='load an app onto a connected watch')
+ load_parser.add_argument('app_bundle', metavar='FILE', type=str, help='a compiled app bundle')
+ load_parser.set_defaults(func=cmd_load)
+
+ load_fw_parser = subparsers.add_parser('load_fw', help='load new firmware onto a connected watch')
+ load_fw_parser.add_argument('fw_bundle', metavar='FILE', type=str, help='a compiled app bundle')
+ load_fw_parser.set_defaults(func=cmd_load_fw)
+
+ logcat_parser = subparsers.add_parser('logcat', help='view logs sent from the connected watch')
+ logcat_parser.set_defaults(func=cmd_logcat)
+
+ list_apps_parser = subparsers.add_parser('list', help='list installed apps')
+ list_apps_parser.set_defaults(func=cmd_list_apps)
+
+ rm_app_parser = subparsers.add_parser('rm', help='remove installed apps')
+ rm_app_parser.add_argument('app_index', metavar='IDX', type=int, help='the app index to delete')
+ rm_app_parser.set_defaults(func=cmd_rm_app)
+
+ args = parser.parse_args()
+
+ attempts = 0
+ while True:
+ if attempts > MAX_ATTEMPTS:
+ raise 'Could not connect to Pebble'
+ try:
+ pebble = libpebble.Pebble(args.pebble_id)
+ break
+ except libpebble.PebbleError:
+ time.sleep(5)
+ attempts += 1
+
+ args.func(pebble, args)
+
+if __name__ == '__main__':
+ main()
View
5 pebble/__init__.py
@@ -0,0 +1,5 @@
+__title__ = 'libpebble'
+__version__ = '0.0.1'
+__build__ = 0x01
+
+from .pebble import Pebble, PebbleError, PutBytesClient
View
164 pebble.py → pebble/pebble.py
@@ -12,14 +12,69 @@
import os
import glob
import logging
+import json
log = logging.getLogger()
logging.basicConfig(format='[%(levelname)-8s] %(message)s')
log.setLevel(logging.DEBUG)
-#DEFAULT_PEBBLE_ID = "402F"
DEFAULT_PEBBLE_ID = None #Triggers autodetection on unix-like systems
+DEBUG_PROTOCOL = False
+
+class PebbleBundle(object):
+ MANIFEST_FILENAME = 'manifest.json'
+
+ def __init__(self, bundle_path):
+ bundle_abs_path = os.path.abspath(bundle_path)
+ if not os.path.exists(bundle_abs_path):
+ raise "Bundle does not exist: " + bundle_path
+
+ self.zip = zipfile.ZipFile(bundle_abs_path)
+ self.path = bundle_abs_path
+ self.manifest = None
+
+ def get_manifest(self):
+ if (self.manifest):
+ return self.manifest
+
+ if self.MANIFEST_FILENAME not in self.zip.namelist():
+ raise "Could not find {}; are you sure this is a PebbleBundle?".format(self.MANIFEST_FILENAME)
+
+ self.manifest = json.loads(self.zip.read(self.MANIFEST_FILENAME))
+ return self.manifest
+
+ def close(self):
+ self.zip.close()
+
+ def is_firmware_bundle(self):
+ return 'firmware' in self.get_manifest()
+
+ def is_app_bundle(self):
+ return 'application' in self.get_manifest()
+
+ def has_resources(self):
+ return 'resources' in self.get_manifest()
+
+ def get_firmware_info(self):
+ if not self.is_firmware_bundle():
+ return None
+
+ return self.get_manifest()['firmware']
+
+ def get_application_info(self):
+ if not self.is_app_bundle():
+ return None
+
+ return self.get_manifest()['application']
+
+ def get_resources_info(self):
+ if not self.has_resources():
+ return None
+
+ return self.get_manifest()['resources']
+
+
class EndpointSync():
timeout = 10
@@ -75,16 +130,16 @@ class Pebble(object):
def AutodetectDevice():
if os.name != "posix": #i.e. Windows
raise NotImplementedError("Autodetection is only implemented on UNIX-like systems.")
-
+
pebbles = glob.glob("/dev/tty.Pebble????-SerialPortSe")
-
+
if len(pebbles) == 0:
raise PebbleError(None, "Autodetection could not find any Pebble devices")
elif len(pebbles) > 1:
log.warn("Autodetect found %d Pebbles; using most recent" % len(pebbles))
#NOTE: Not entirely sure if this is the correct approach
pebbles.sort(key=lambda x: os.stat(x).st_mtime, reverse=True)
-
+
id = pebbles[0][15:19]
log.info("Autodetect found a Pebble with ID %s" % id)
return id
@@ -107,10 +162,16 @@ def __init__(self, id):
try:
devicefile = "/dev/tty.Pebble"+id+"-SerialPortSe"
log.debug("Attempting to open %s as Pebble device %s" % (devicefile, id))
- self._ser = serial.Serial(devicefile, 115200, timeout=2)
+ self._ser = serial.Serial(devicefile, 115200, timeout=1)
+
log.debug("Connected, discarding null response")
# we get a null response when we connect, discard it
self._ser.read(5)
+
+ # Eat any cruft that might be sitting in the serial buffer...
+ while self._ser.read():
+ pass
+
log.debug("Initializing reader thread")
self._read_thread = threading.Thread(target=self._reader)
self._read_thread.setDaemon(True)
@@ -132,7 +193,9 @@ def _reader(self):
if resp == None:
continue
- log.debug("Got message for endpoint %s of length %d" % (endpoint, len(resp)))
+ if DEBUG_PROTOCOL:
+ log.debug("Got message for endpoint %s of length %d" % (endpoint, len(resp)))
+ log.debug('<<< ' + resp.encode('hex'))
if endpoint in self._internal_endpoint_handlers:
resp = self._internal_endpoint_handlers[endpoint](endpoint, resp)
@@ -141,7 +204,7 @@ def _reader(self):
self._endpoint_handlers[endpoint](endpoint, resp)
except:
traceback.print_exc()
- raise PebbleError(self._id, "Lost connection to Pebble")
+ raise PebbleError(self.id, "Lost connection to Pebble")
self._alive = False
def _build_message(self, endpoint, data):
@@ -149,9 +212,12 @@ def _build_message(self, endpoint, data):
def _send_message(self, endpoint, data, callback = None):
if endpoint not in self.endpoints:
- raise PebbleError(self._id, "Invalid endpoint specified")
+ raise PebbleError(self.id, "Invalid endpoint specified")
msg = self._build_message(self.endpoints[endpoint], data)
+
+ if DEBUG_PROTOCOL:
+ log.debug('>>> ' + msg.encode('hex'))
self._ser.write(msg)
def _recv_message(self):
@@ -159,14 +225,14 @@ def _recv_message(self):
if len(data) == 0:
return (None, None)
elif len(data) < 4:
- raise PebbleError(self._id, "Malformed response with length "+str(len(data)))
+ raise PebbleError(self.id, "Malformed response with length "+str(len(data)))
size, endpoint = unpack("!HH", data)
resp = self._ser.read(size)
return (endpoint, resp)
def register_endpoint(self, endpoint_name, func):
if endpoint_name not in self.endpoints:
- raise PebbleError(self._id, "Invalid endpoint specified")
+ raise PebbleError(self.id, "Invalid endpoint specified")
endpoint = self.endpoints[endpoint_name]
self._endpoint_handlers[endpoint] = func
@@ -273,17 +339,29 @@ def install_app(self, pbz_path):
This will pick the first free app-bank available.
"""
- with zipfile.ZipFile(pbz_path) as pbz:
- binary = pbz.read("pebble-app.bin")
- resources = pbz.read("app_resources.pbpack")
+ bundle = PebbleBundle(pbz_path)
+ if not bundle.is_app_bundle():
+ raise PebbleError(self.id, "This is not an app bundle")
+
+ binary = bundle.zip.read(
+ bundle.get_application_info()['name'])
+ if bundle.has_resources():
+ resources = bundle.zip.read(
+ bundle.get_resources_info()['name'])
+ else:
+ resources = None
apps = self.get_appbank_status()
+
+ if not apps:
+ raise PebbleError(self.id, "could not obtain app list; try again")
+
first_free = 1
for app in apps["apps"]:
if app["index"] == first_free:
first_free += 1
if first_free == apps["banks"]:
- raise PebbleError(self._id, "All %d app banks are full" % apps["banks"])
+ raise PebbleError(self.id, "All %d app banks are full" % apps["banks"])
log.debug("Attempting to add app to bank %d of %d" % (first_free, apps["banks"]))
client = PutBytesClient(self, first_free, "BINARY", binary)
@@ -292,17 +370,20 @@ def install_app(self, pbz_path):
while not client._done and not client._error:
pass
if client._error:
- raise PebbleError(self._id, "Failed to send application binary %s/pebble-app.bin" % pbz_path)
+ raise PebbleError(self.id, "Failed to send application binary %s/pebble-app.bin" % pbz_path)
- client = PutBytesClient(self, first_free, "RESOURCES", resources)
- self.register_endpoint("PUTBYTES", client.handle_message)
- client.init()
- while not client._done and not client._error:
- pass
- if client._error:
- raise PebbleError(self._id, "Failed to send application resources %s/app_resources.pbpack" % pbz_path)
+ if resources:
+ client = PutBytesClient(self, first_free, "RESOURCES", resources)
+ self.register_endpoint("PUTBYTES", client.handle_message)
+ client.init()
+ while not client._done and not client._error:
+ pass
+ if client._error:
+ raise PebbleError(self.id, "Failed to send application resources %s/app_resources.pbpack" % pbz_path)
+ time.sleep(2)
self._add_app(first_free)
+ time.sleep(2)
def install_firmware(self, pbz_path, recovery=False):
@@ -316,6 +397,8 @@ def install_firmware(self, pbz_path, recovery=False):
resources = pbz.read("system_resources.pbpack")
self.system_message("FIRMWARE_START")
+ time.sleep(2)
+
if resources:
client = PutBytesClient(self, 0, "SYS_RESOURCES", resources)
self.register_endpoint("PUTBYTES", client.handle_message)
@@ -323,7 +406,7 @@ def install_firmware(self, pbz_path, recovery=False):
while not client._done and not client._error:
pass
if client._error:
- raise PebbleError(self._id, "Failed to send firmware resources %s/system_resources.pbpack" % pbz_path)
+ raise PebbleError(self.id, "Failed to send firmware resources %s/system_resources.pbpack" % pbz_path)
client = PutBytesClient(self, 0, "RECOVERY" if recovery else "FIRMWARE", binary)
@@ -332,7 +415,7 @@ def install_firmware(self, pbz_path, recovery=False):
while not client._done and not client._error:
pass
if client._error:
- raise PebbleError(self._id, "Failed to send firmware binary %s/tintin_fw.bin" % pbz_path)
+ raise PebbleError(self.id, "Failed to send firmware binary %s/tintin_fw.bin" % pbz_path)
self.system_message("FIRMWARE_COMPLETE")
@@ -356,7 +439,7 @@ def system_message(self, command):
"BLUETOOTH_END_DISCOVERABLE": 7
}
if command not in commands:
- raise PebbleError(self._id, "Invalid command \"%s\"" % command)
+ raise PebbleError(self.id, "Invalid command \"%s\"" % command)
data = pack("!bb", 0, commands[command])
log.debug("Sending command %s (code %d)" % (command, commands[command]))
self._send_message("SYSTEM_MESSAGE", data)
@@ -397,8 +480,10 @@ def _get_time_response(self, endpoint, data):
return timestamp
def _system_message_response(self, endpoint, data):
- log.info("Got system message %s" % repr(unpack('!bb', data)))
-
+ if len(data) == 2:
+ log.info("Got system message %s" % repr(unpack('!bb', data)))
+ else:
+ log.info("Got 'unknown' system message...")
def _log_response(self, endpoint, data):
if (len(data) < 8):
log.warn("Unable to decode log message (length %d is less than 8)" % len(data))
@@ -573,28 +658,3 @@ def handle_message(self, endpoint, resp):
self.handle_commit(resp)
elif self._state == self.states["COMPLETE"]:
self.handle_complete(resp)
-
-if __name__ == '__main__':
- if DEFAULT_PEBBLE_ID is not None:
- log.debug("Default Pebble ID is %s" % DEFAULT_PEBBLE_ID)
- else:
- log.debug("No default Pebble ID, using autodetection if available")
-
- if len(sys.argv) > 1:
- pebble_id = sys.argv[1]
- else:
- pebble_id = DEFAULT_PEBBLE_ID
- pebble = Pebble(pebble_id)
-
- versions = pebble.get_versions()
- curtime = pebble.get_time()
- apps = pebble.get_appbank_status()
-
- print "Pebble "+pebble.id
- print "Firmware "+versions["normal_fw"]["version"]
- print "Recovery "+versions["recovery_fw"]["version"]
- print "Timestamp: "+str(curtime)
-
- print "Installed apps:"
- for app in apps["apps"]:
- print " - "+app["name"]
View
0  stm32_crc.py → pebble/stm32_crc.py
File renamed without changes
Something went wrong with that request. Please try again.