diff --git a/ledgerblue/Dongle.py b/ledgerblue/Dongle.py new file mode 100644 index 0000000..e2ec9d3 --- /dev/null +++ b/ledgerblue/Dongle.py @@ -0,0 +1,55 @@ +""" +******************************************************************************* +* Ledger Blue +* (c) 2016 Ledger +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +******************************************************************************** +""" +from abc import ABCMeta, abstractmethod +from binascii import hexlify +import sys + +TIMEOUT=20000 + +def hexstr(bstr): + if (sys.version_info.major == 3): + return hexlify(bstr).decode() + if (sys.version_info.major == 2): + return hexlify(bstr) + return "I', targetid)) + targetid = bytearray(struct.pack('>I', targetId)) # identify apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid @@ -70,6 +70,7 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey): # walk the device certificates to retrieve the public key to use for authentication index = 0 last_pub_key = PublicKey(binascii.unhexlify(issuerKey), raw=True) + devicePublicKey = None while True: if index == 0: certificate = bytearray(dongle.exchange(bytearray.fromhex('E052000000'))) @@ -86,6 +87,7 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey): certificateSignature = last_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray)) # first cert contains a header field which holds the certificate's public key role if index == 0: + devicePublicKey = certificatePublicKey certificateSignedData = bytearray([0x02]) + certificateHeader + certificatePublicKey # Could check if the device certificate is signed by the issuer public key # ephemeral key certificate @@ -99,7 +101,13 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey): # Commit device ECDH channel dongle.exchange(bytearray.fromhex('E053000000')) secret = last_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize())) - return secret[0:16] + if targetId&0xF == 0x2: + return secret[0:16] + elif targetId&0xF == 0x3: + ret = {} + ret['ecdh_secret'] = secret + ret['devicePublicKey'] = devicePublicKey + return ret if __name__ == '__main__': from .ecWrapper import PrivateKey, PublicKey @@ -123,20 +131,38 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid, issuerKey): args.rootPrivateKey = privateKey.serialize() genuine = False + ui = False + customCA = False dongle = getDongle(args.apdu) + version = None secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId, args.issuerKey) if secret != None: - loader = HexLoader(dongle, 0xe0, True, secret) - data = b'\xFF' - data = loader.encryptAES(data) try: - loader.exchange(loader.cla, 0x00, 0x00, 0x00, data) - except CommException as e: - genuine = (e.sw == 0x6D00) - + loader = HexLoader(dongle, 0xe0, True, secret) + version = loader.getVersion() + genuine = True + apps = loader.listApp() + while len(apps) != 0: + for app in apps: + if (app['flags'] & 0x08): + ui = True + if (app['flags'] & 0x400): + customCA = True + apps = loader.listApp(False) + except: + genuine = False if genuine: + if ui: + print ("WARNING : Product is genuine but has a UI application loaded") + if customCA: + print ("WARNING : Product is genuine but has a Custom CA loaded") + if not ui and not customCA: print ("Product is genuine") + print ("SE Version " + version['osVersion']) + print ("MCU Version " + version['mcuVersion']) + if 'mcuHash' in version: + print ("MCU Hash " + binascii.hexlify(version['mcuHash']).decode('ascii')) else: - print ("Product is NOT genuine") + print ("Product is NOT genuine") diff --git a/ledgerblue/comm.py b/ledgerblue/comm.py index 70cb189..46167f0 100644 --- a/ledgerblue/comm.py +++ b/ledgerblue/comm.py @@ -20,49 +20,38 @@ from abc import ABCMeta, abstractmethod from .commException import CommException from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU +from .Dongle import * from binascii import hexlify -import hid import time +import os import sys +from .commU2F import getDongle as getDongleU2F +from .commHTTP import getDongle as getDongleHTTP +import hid -TIMEOUT=20000 - -try: - from smartcard.Exceptions import NoCardException - from smartcard.System import readers - from smartcard.util import toHexString, toBytes - SCARD = True -except ImportError: - SCARD = False - +APDUGEN=None +if "APDUGEN" in os.environ and len(os.environ["APDUGEN"]) != 0: + APDUGEN=os.environ["APDUGEN"] +# Force use of U2F if required +U2FKEY=None +if "U2FKEY" in os.environ and len(os.environ["U2FKEY"]) != 0: + U2FKEY=os.environ["U2FKEY"] +# Force use of MCUPROXY if required +MCUPROXY=None +if "MCUPROXY" in os.environ and len(os.environ["MCUPROXY"]) != 0: + MCUPROXY=os.environ["MCUPROXY"] -def hexstr(bstr): - if (sys.version_info.major == 3): - return hexlify(bstr).decode() - if (sys.version_info.major == 2): - return hexlify(bstr) - return " %s" % hexstr(apdu)) + print("HID => %s" % hexstr(apdu)) if self.ledger: apdu = wrapCommandAPDU(0x0101, apdu, 64) padSize = len(apdu) % 64 @@ -86,7 +79,8 @@ def exchange(self, apdu, timeout=TIMEOUT): while(offset != len(tmp)): data = tmp[offset:offset + 64] data = bytearray([0]) + data - self.device.write(data) + if self.device.write(data) < 0: + raise BaseException("Error while writing") offset += 64 dataLength = 0 dataStart = 2 @@ -125,7 +119,7 @@ def exchange(self, apdu, timeout=TIMEOUT): sw = (result[swOffset] << 8) + result[swOffset + 1] response = result[dataStart : dataLength + dataStart] if self.debug: - print("<= %s%.2x" % (hexstr(response), sw)) + print("HID <= %s%.2x" % (hexstr(response), sw)) if sw != 0x9000: raise CommException("Invalid status %04x" % sw, sw, response) return response @@ -141,6 +135,9 @@ def waitFirstResponse(self, timeout): time.sleep(0.0001) return bytearray(data) + def apduMaxDataSize(self): + return 255 + def close(self): if self.opened: try: @@ -159,11 +156,11 @@ def __init__(self, device, debug=False): def exchange(self, apdu, timeout=TIMEOUT): if self.debug: - print("=> %s" % hexstr(apdu)) + print("SC => %s" % hexstr(apdu)) response, sw1, sw2 = self.device.transmit(toBytes(hexlify(apdu))) sw = (sw1 << 8) | sw2 if self.debug: - print("<= %s%.2x" % (hexstr(response).replace(" ", ""), sw)) + print("SC <= %s%.2x" % (hexstr(response).replace(" ", ""), sw)) if sw != 0x9000: raise CommException("Invalid status %04x" % sw, sw, bytearray(response)) return bytearray(response) @@ -177,18 +174,25 @@ def close(self): self.opened = False def getDongle(debug=False, selectCommand=None): + if APDUGEN: + return HIDDongleHIDAPI(None, True, debug) + + if not U2FKEY is None: + return getDongleU2F(scrambleKey=U2FKEY, debug=debug) + if MCUPROXY is not None: + return getDongleHTTP(remote_host=MCUPROXY, debug=debug) dev = None hidDevicePath = None ledger = True for hidDevice in hid.enumerate(0, 0): - if hidDevice['vendor_id'] == 0x2c97: + if hidDevice['vendor_id'] == 0x2c97 and ('interface_number' not in hidDevice or hidDevice['interface_number'] == 0): hidDevicePath = hidDevice['path'] if hidDevicePath is not None: dev = hid.device() dev.open_path(hidDevicePath) dev.set_nonblocking(True) return HIDDongleHIDAPI(dev, ledger, debug) - if SCARD: + if PCSC: connection = None for reader in readers(): try: diff --git a/ledgerblue/commHTTP.py b/ledgerblue/commHTTP.py new file mode 100644 index 0000000..de55dff --- /dev/null +++ b/ledgerblue/commHTTP.py @@ -0,0 +1,112 @@ +""" +******************************************************************************* +* Ledger Blue +* (c) 2016 Ledger +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +******************************************************************************** +""" + +from abc import ABCMeta, abstractmethod +from .commException import CommException +from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU +from binascii import hexlify +import time +import os +import sys +import requests +import json + +def hexstr(bstr): + if (sys.version_info.major == 3): + return hexlify(bstr).decode() + if (sys.version_info.major == 2): + return hexlify(bstr) + return " %s" % hexstr(apdu)) + + try: + ret = requests.post(self.remote_host + "/send_apdu", params={"data": hexstr(apdu)}) + + while True: + ret = requests.post(self.remote_host + "/fetch_apdu") + if ret.text != "no response apdu yet": + print("<= %s" % ret.text) + break + else: + time.sleep(0.1) + + + return bytearray(str(ret.text).decode("hex")) + except Exception as e: + print(e) + + + + def exchange_seph_event(self, event): + if self.debug >= 3: + print("=> %s" % hexstr(event)) + + try: + ret = requests.post(self.remote_host + "/send_seph_event", params={"data": event.encode("hex")}) + return ret.text + except Exception as e: + print(e) + + + def poll_status(self): + if self.debug >= 5: + print("=> Waiting for a status") + + try: + while True: + ret = requests.post(self.remote_host + "/fetch_status") + if ret.text != "no status yet": + break + else: + time.sleep(0.05) + + return bytearray(str(ret.text).decode("hex")) + except Exception as e: + print(e) + + + def reset(self): + if self.debug: + print("=> Reset") + + try: + ret = requests.post(self.remote_host + "/reset") + except Exception as e: + print(e) + + + + + + + +def getDongle(remote_host="localhost", debug=False): + + return HTTPProxy(remote_host, debug) \ No newline at end of file diff --git a/ledgerblue/commU2F.py b/ledgerblue/commU2F.py new file mode 100644 index 0000000..746fdf5 --- /dev/null +++ b/ledgerblue/commU2F.py @@ -0,0 +1,333 @@ +# Copyright (c) 2013 Yubico AB +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +""" +******************************************************************************* +* Ledger Blue +* (c) 2016 Ledger +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +******************************************************************************** +""" + +import os +import traceback +from abc import ABCMeta, abstractmethod +from .ledgerWrapper import wrapCommandAPDU, unwrapResponseAPDU +from binascii import hexlify +from .Dongle import * + +import binascii +import time +import sys +import hid +from u2flib_host.device import U2FDevice +from u2flib_host.yubicommon.compat import byte2int, int2byte +from u2flib_host.constants import INS_ENROLL, INS_SIGN +from u2flib_host import u2f, exc +from u2flib_host.utils import websafe_decode, websafe_encode +from hashlib import sha256 + +from .commException import CommException + +TIMEOUT=30000 + +DEVICES = [ + (0x1050, 0x0200), # Gnubby + (0x1050, 0x0113), # YubiKey NEO U2F + (0x1050, 0x0114), # YubiKey NEO OTP+U2F + (0x1050, 0x0115), # YubiKey NEO U2F+CCID + (0x1050, 0x0116), # YubiKey NEO OTP+U2F+CCID + (0x1050, 0x0120), # Security Key by Yubico + (0x1050, 0x0410), # YubiKey Plus + (0x1050, 0x0402), # YubiKey 4 U2F + (0x1050, 0x0403), # YubiKey 4 OTP+U2F + (0x1050, 0x0406), # YubiKey 4 U2F+CCID + (0x1050, 0x0407), # YubiKey 4 OTP+U2F+CCID + (0x2581, 0xf1d0), # Plug-Up U2F Security Key + (0x2581, 0xf1d1), # Ledger Production U2F Dongle + (0x2c97, 0x0000), # Ledger Blue + (0x2c97, 0x0001), # Ledger Nano S + (0x2c97, 0x0002), # Ledger Aramis + (0x2c97, 0x0003), # Ledger HW2 + (0x2c97, 0x0004), # Ledger Blend + (0x2c97, 0xf1d0), # Plug-Up U2F Security Key +] +HID_RPT_SIZE = 64 + +TYPE_INIT = 0x80 +U2F_VENDOR_FIRST = 0x40 + +CMD_INIT = 0x06 +CMD_WINK = 0x08 +CMD_APDU = 0x03 +U2FHID_YUBIKEY_DEVICE_CONFIG = U2F_VENDOR_FIRST + +STAT_ERR = 0xbf + +def _read_timeout(dev, size, timeout=TIMEOUT): + if (timeout > 0): + timeout += time.time() + while timeout == 0 or time.time() < timeout: + resp = dev.read(size) + if resp: + return resp + time.sleep(0.01) + return [] + +class U2FHIDError(Exception): + def __init__(self, code): + super(Exception, self).__init__("U2FHIDError: 0x%02x" % code) + self.code = code + + +class HIDDevice(U2FDevice): + + """ + U2FDevice implementation using the HID transport. + """ + + def __init__(self, path): + self.path = path + self.cid = b"\xff\xff\xff\xff" + + def open(self): + self.handle = hid.device() + self.handle.open_path(self.path) + self.handle.set_nonblocking(True) + self.init() + + def close(self): + if hasattr(self, 'handle'): + self.handle.close() + del self.handle + + def init(self): + nonce = os.urandom(8) + resp = self.call(CMD_INIT, nonce) + while resp[:8] != nonce: + print("Wrong nonce, read again...") + resp = self._read_resp(self.cid, CMD_INIT) + self.cid = resp[8:12] + + def set_mode(self, mode): + data = mode + b"\x0f\x00\x00" + self.call(U2FHID_YUBIKEY_DEVICE_CONFIG, data) + + def _do_send_apdu(self, apdu_data): + return self.call(CMD_APDU, apdu_data) + + def wink(self): + self.call(CMD_WINK) + + def _send_req(self, cid, cmd, data): + size = len(data) + bc_l = int2byte(size & 0xff) + bc_h = int2byte(size >> 8 & 0xff) + payload = cid + int2byte(TYPE_INIT | cmd) + bc_h + bc_l + \ + data[:HID_RPT_SIZE - 7] + payload += b'\0' * (HID_RPT_SIZE - len(payload)) + if self.handle.write([0] + [byte2int(c) for c in payload]) < 0: + raise exc.DeviceError("Cannot write to device!") + data = data[HID_RPT_SIZE - 7:] + seq = 0 + while len(data) > 0: + payload = cid + int2byte(0x7f & seq) + data[:HID_RPT_SIZE - 5] + payload += b'\0' * (HID_RPT_SIZE - len(payload)) + if self.handle.write([0] + [byte2int(c) for c in payload]) < 0: + raise exc.DeviceError("Cannot write to device!") + data = data[HID_RPT_SIZE - 5:] + seq += 1 + + def _read_resp(self, cid, cmd): + resp = b'.' + header = cid + int2byte(TYPE_INIT | cmd) + while resp and resp[:5] != header: + # allow for timeout + resp_vals = _read_timeout(self.handle, HID_RPT_SIZE) + resp = b''.join(int2byte(v) for v in resp_vals) + if resp[:5] == cid + int2byte(STAT_ERR): + raise U2FHIDError(byte2int(resp[7])) + + if not resp: + raise exc.DeviceError("Invalid response from device!") + + data_len = (byte2int(resp[5]) << 8) + byte2int(resp[6]) + data = resp[7:min(7 + data_len, HID_RPT_SIZE)] + data_len -= len(data) + + seq = 0 + while data_len > 0: + resp_vals = _read_timeout(self.handle, HID_RPT_SIZE) + resp = b''.join(int2byte(v) for v in resp_vals) + if resp[:4] != cid: + raise exc.DeviceError("Wrong CID from device!") + if byte2int(resp[4:5]) != seq & 0x7f: + raise exc.DeviceError("Wrong SEQ from device!") + seq += 1 + new_data = resp[5:min(5 + data_len, HID_RPT_SIZE)] + data_len -= len(new_data) + data += new_data + return data + + def call(self, cmd, data=b''): + if isinstance(data, int): + data = int2byte(data) + + self._send_req(self.cid, cmd, data) + return self._read_resp(self.cid, cmd) + +class U2FTunnelDongle(Dongle, DongleWait): + + def __init__(self, device, scrambleKey="", ledger=False, debug=False): + self.device = device + self.scrambleKey = scrambleKey + self.ledger = ledger + self.debug = debug + self.waitImpl = self + self.opened = True + self.device.open() + + def exchange(self, apdu, timeout=TIMEOUT): + if self.debug: + print("U2F => %s" % hexstr(apdu)) + + if (len(apdu)>=256): + raise CommException("Too long APDU to transport") + + # wrap apdu + i=0 + keyHandle = "" + while i < len(apdu): + val = apdu[i:i+1] + if len(self.scrambleKey) > 0: + val = chr(ord(val) ^ ord(self.scrambleKey[i % len(self.scrambleKey)])) + keyHandle += val + i+=1 + + client_param = sha256("u2f_tunnel".encode('utf8')).digest() + app_param = sha256("u2f_tunnel".encode('utf8')).digest() + + request = client_param + app_param + int2byte(len(keyHandle)) + keyHandle + + #p1 = 0x07 if check_only else 0x03 + p1 = 0x03 + p2 = 0 + response = self.device.send_apdu(INS_SIGN, p1, p2, request) + + if self.debug: + print("U2F <= %s%.2x" % (hexstr(response), 0x9000)) + + # check replied status words of the command (within the APDU tunnel) + if hexstr(response[-2:]) != "9000": + raise CommException("Invalid status words received: " + hexstr(response[-2:])); + + # api expect a byte array, remove the appended status words + return bytearray(response[:-2]) + + def apduMaxDataSize(self): + return 256-5 + + def close(self): + self.device.close() + + def waitFirstResponse(self, timeout): + raise CommException("Invalid use") + +def getDongles(dev_class=None, scrambleKey="", debug=False): + dev_class = dev_class or HIDDevice + devices = [] + for d in hid.enumerate(0, 0): + usage_page = d['usage_page'] + if usage_page == 0xf1d0 and d['usage'] == 1: + devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)) + # Usage page doesn't work on Linux + # well known devices + elif (d['vendor_id'], d['product_id']) in DEVICES: + device = HIDDevice(d['path']) + try: + device.open() + device.close() + devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)) + except (exc.DeviceError, IOError, OSError): + pass + # unknown devices + else: + device = HIDDevice(d['path']) + try: + device.open() + # try a ping command to ensure a FIDO device, else timeout (BEST here, modulate the timeout, 2 seconds is way too big) + device.ping() + device.close() + devices.append(U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug)) + except (exc.DeviceError, IOError, OSError): + pass + return devices + +def getDongle(path=None, dev_class=None, scrambleKey="", debug=False): + # if path is none, then use the first device + dev_class = dev_class or HIDDevice + devices = [] + for d in hid.enumerate(0, 0): + if path is None or d['path'] == path: + usage_page = d['usage_page'] + if usage_page == 0xf1d0 and d['usage'] == 1: + return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug) + # Usage page doesn't work on Linux + # well known devices + elif (d['vendor_id'], d['product_id']) in DEVICES and ('interface_number' not in d or d['interface_number'] == 1): + #print d + device = HIDDevice(d['path']) + try: + device.open() + device.close() + return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug) + except (exc.DeviceError, IOError, OSError): + traceback.print_exc() + pass + # unknown devices + # else: + # device = HIDDevice(d['path']) + # try: + # device.open() + # # try a ping command to ensure a FIDO device, else timeout (BEST here, modulate the timeout, 2 seconds is way too big) + # device.ping() + # device.close() + # return U2FTunnelDongle(dev_class(d['path']),scrambleKey, debug=debug) + # except (exc.DeviceError, IOError, OSError): + # traceback.print_exc() + # pass + raise CommException("No dongle found") diff --git a/ledgerblue/deleteApp.py b/ledgerblue/deleteApp.py index 5ce9ffd..aea49ae 100644 --- a/ledgerblue/deleteApp.py +++ b/ledgerblue/deleteApp.py @@ -23,6 +23,7 @@ def get_argparser(): parser = argparse.ArgumentParser(description="Delete the app with the specified name.") parser.add_argument("--targetId", help="The device's target ID (default is Ledger Blue)", type=auto_int) parser.add_argument("--appName", help="The name of the application to delete") + parser.add_argument("--appHash", help="Set the application hash") parser.add_argument("--rootPrivateKey", help="A private key used to establish a Secure Channel (hex encoded)") parser.add_argument("--apdu", help="Display APDU log", action='store_true') parser.add_argument("--deployLegacy", help="Use legacy deployment API", action='store_true') @@ -41,13 +42,24 @@ def auto_int(x): args = get_argparser().parse_args() - if args.appName == None: - raise Exception("Missing appName") + if args.appName == None and args.appHash == None: + raise Exception("Missing appName or appHash") + if args.appName != None and args.appHash != None: + raise Exception("Set either appName or appHash") + + if args.appName != None: + if (sys.version_info.major == 3): + args.appName = bytes(args.appName,'ascii') + if (sys.version_info.major == 2): + args.appName = bytes(args.appName) + + if args.appHash != None: + if (sys.version_info.major == 3): + args.appHash = bytes(args.appHash,'ascii') + if (sys.version_info.major == 2): + args.appHash = bytes(args.appHash) + args.appHash = bytearray.fromhex(args.appHash) - if (sys.version_info.major == 3): - args.appName = bytes(args.appName,'ascii') - if (sys.version_info.major == 2): - args.appName = bytes(args.appName) if args.targetId == None: args.targetId = 0x31000002 @@ -64,4 +76,8 @@ def auto_int(x): else: secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) loader = HexLoader(dongle, 0xe0, True, secret) - loader.deleteApp(args.appName) + + if args.appName != None: + loader.deleteApp(args.appName) + if args.appHash != None: + loader.deleteAppByHash(args.appHash) \ No newline at end of file diff --git a/ledgerblue/deployed.py b/ledgerblue/deployed.py index 584e660..a85c7cc 100644 --- a/ledgerblue/deployed.py +++ b/ledgerblue/deployed.py @@ -30,6 +30,9 @@ def getDeployedSecretV1(dongle, masterPrivate, targetid): testMasterPublic = bytearray(testMaster.pubkey.serialize(compressed=False)) targetid = bytearray(struct.pack('>I', targetid)) + if targetId&0xF != 0x1: + raise BaseException("Target ID does not support SCP V1") + # identify apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid dongle.exchange(apdu) @@ -75,10 +78,13 @@ def getDeployedSecretV1(dongle, masterPrivate, targetid): secret = last_pub_key.ecdh(bytes(ephemeralPrivate.serialize().decode('hex'))) return secret[0:16] -def getDeployedSecretV2(dongle, masterPrivate, targetid): +def getDeployedSecretV2(dongle, masterPrivate, targetId, signerCertChain=None, ecdh_secret_format=None): testMaster = PrivateKey(bytes(masterPrivate)) testMasterPublic = bytearray(testMaster.pubkey.serialize(compressed=False)) - targetid = bytearray(struct.pack('>I', targetid)) + targetid = bytearray(struct.pack('>I', targetId)) + + if targetId&0xF == 0x1: + raise BaseException("Target ID does not support SCP V2") # identify apdu = bytearray([0xe0, 0x04, 0x00, 0x00]) + bytearray([len(targetid)]) + targetid @@ -95,13 +101,18 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid): #if cardKey != testMasterPublic: # raise Exception("Invalid batch public key") - print("Using test master key %s " % binascii.hexlify(testMasterPublic)) - dataToSign = bytes(bytearray([0x01]) + testMasterPublic) - signature = testMaster.ecdsa_sign(bytes(dataToSign)) - signature = testMaster.ecdsa_serialize(signature) - certificate = bytearray([len(testMasterPublic)]) + testMasterPublic + bytearray([len(signature)]) + signature - apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(certificate)]) + certificate - dongle.exchange(apdu) + if (signerCertChain): + for cert in signerCertChain: + apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(cert)]) + cert + dongle.exchange(apdu) + else: + print("Using test master key %s " % binascii.hexlify(testMasterPublic)) + dataToSign = bytes(bytearray([0x01]) + testMasterPublic) + signature = testMaster.ecdsa_sign(bytes(dataToSign)) + signature = testMaster.ecdsa_serialize(signature) + certificate = bytearray([len(testMasterPublic)]) + testMasterPublic + bytearray([len(signature)]) + signature + apdu = bytearray([0xE0, 0x51, 0x00, 0x00]) + bytearray([len(certificate)]) + certificate + dongle.exchange(apdu) # provide the ephemeral certificate ephemeralPrivate = PrivateKey() @@ -116,7 +127,8 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid): # walk the device certificates to retrieve the public key to use for authentication index = 0 - last_pub_key = PublicKey(bytes(testMasterPublic), raw=True) + last_dev_pub_key = PublicKey(bytes(testMasterPublic), raw=True) + devicePublicKey = None while True: if index == 0: certificate = bytearray(dongle.exchange(bytearray.fromhex('E052000000'))) @@ -132,24 +144,33 @@ def getDeployedSecretV2(dongle, masterPrivate, targetid): certificatePublicKey = certificate[offset : offset + certificate[offset-1]] offset += certificate[offset-1] + 1 certificateSignatureArray = certificate[offset : offset + certificate[offset-1]] - certificateSignature = last_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray)) + certificateSignature = last_dev_pub_key.ecdsa_deserialize(bytes(certificateSignatureArray)) # first cert contains a header field which holds the certificate's public key role if index == 0: + devicePublicKey = certificatePublicKey certificateSignedData = bytearray([0x02]) + certificateHeader + certificatePublicKey # Could check if the device certificate is signed by the issuer public key # ephemeral key certificate else: certificateSignedData = bytearray([0x12]) + deviceNonce + nonce + certificatePublicKey - if not last_pub_key.ecdsa_verify(bytes(certificateSignedData), certificateSignature): + if not last_dev_pub_key.ecdsa_verify(bytes(certificateSignedData), certificateSignature): if index == 0: # Not an error if loading from user key print("Broken certificate chain - loading from user key") else: raise Exception("Broken certificate chain") - last_pub_key = PublicKey(bytes(certificatePublicKey), raw=True) + last_dev_pub_key = PublicKey(bytes(certificatePublicKey), raw=True) index = index + 1 # Commit device ECDH channel dongle.exchange(bytearray.fromhex('E053000000')) - secret = last_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize())) - return secret[0:16] + secret = last_dev_pub_key.ecdh(binascii.unhexlify(ephemeralPrivate.serialize())) + + #forced to specific version + if ecdh_secret_format==1 or targetId&0xF == 0x2: + return secret[0:16] + elif targetId&0xF == 0x3: + ret = {} + ret['ecdh_secret'] = secret + ret['devicePublicKey'] = devicePublicKey + return ret diff --git a/ledgerblue/endorsementSetupLedger.py b/ledgerblue/endorsementSetupLedger.py index a567415..861eb97 100644 --- a/ledgerblue/endorsementSetupLedger.py +++ b/ledgerblue/endorsementSetupLedger.py @@ -51,7 +51,11 @@ def serverQuery(request, url): import sys import os import struct - import urllib2, urlparse + if sys.version_info.major == 3: + import urllib.request as urllib2 + import urllib.parse as urlparse + else: + import urllib2, urlparse from .BlueHSMServer_pb2 import Request, Response, Parameter from .comm import getDongle @@ -108,13 +112,17 @@ def serverQuery(request, url): parameter.local = False parameter.alias = "persoKey" parameter.name = args.perso - request.parameters = str(deviceNonce) + request.parameters = bytes(deviceNonce) response = serverQuery(request, args.url) offset = 0 - remotePublicKeySignatureLength = ord(response.response[offset + 1]) + 2 + if sys.version_info.major == 2: + responseLength = ord(response.response[offset + 1]) + else: + responseLength = response.response[offset + 1] + remotePublicKeySignatureLength = responseLength + 2 remotePublicKeySignature = response.response[offset : offset + remotePublicKeySignatureLength] certificate = bytearray([len(remotePublicKey)]) + remotePublicKey + bytearray([len(remotePublicKeySignature)]) + remotePublicKeySignature @@ -136,7 +144,7 @@ def serverQuery(request, url): request = Request() request.reference = "signEndorsement" request.id = response.id - request.parameters = str(certificate) + request.parameters = bytes(certificate) serverQuery(request, args.url) index += 1 @@ -158,7 +166,7 @@ def serverQuery(request, url): parameter.local = False parameter.alias = "endorsementKey" parameter.name = args.endorsement - request.parameters = str(endorsementData) + request.parameters = bytes(endorsementData) request.id = response.id response = serverQuery(request, args.url) certificate = bytearray(response.response) diff --git a/ledgerblue/hexLoader.py b/ledgerblue/hexLoader.py index 7fc504b..f023598 100644 --- a/ledgerblue/hexLoader.py +++ b/ledgerblue/hexLoader.py @@ -18,22 +18,127 @@ """ from Crypto.Cipher import AES +import sys import struct import hashlib import binascii +from .ecWrapper import PrivateKey, PublicKey +from builtins import int +from ecpy.curves import Curve +import os +#from builtins import str LOAD_SEGMENT_CHUNK_HEADER_LENGTH = 3 MIN_PADDING_LENGTH = 1 +SCP_MAC_LENGTH = 0xE + +BOLOS_TAG_APPNAME = 0x01 +BOLOS_TAG_APPVERSION = 0x02 +BOLOS_TAG_ICON = 0x03 +BOLOS_TAG_DERIVEPATH = 0x04 +BOLOS_TAG_DATASIZE = 0x05 +BOLOS_TAG_DEPENDENCY = 0x06 + +def encodelv(v): + l = len(v) + s = "" + if l < 128: + s += struct.pack(">B", l) + elif l < 256: + s += struct.pack(">B", 0x81) + s += struct.pack(">B", l) + elif l < 65536: + s += struct.pack(">B", 0x82) + s += struct.pack(">H", l) + else: + raise Exception("Unimplemented LV encoding") + s += v + return s + +def encodetlv(t, v): + l = len(v) + s = struct.pack(">B", t) + if l < 128: + s += struct.pack(">B", l) + elif l < 256: + s += struct.pack(">B", 0x81) + s += struct.pack(">B", l) + elif l < 65536: + s += struct.pack(">B", 0x82) + s += struct.pack(">H", l) + else: + raise Exception("Unimplemented TLV encoding") + s += v + return s + +def str2bool(v): + if v is not None: + return v.lower() in ("yes", "true", "t", "1") + return False +SCP_DEBUG = str2bool(os.getenv("SCP_DEBUG")) class HexLoader: - def __init__(self, card, cla=0xF0, secure=False, key=None, relative=True): + + def scp_derive_key(self, ecdh_secret, keyindex): + retry = 0 + # di = sha256(i || retrycounter || ecdh secret) + while True: + sha256 = hashlib.new('sha256') + sha256.update(struct.pack(">IB", keyindex, retry)) + sha256.update(ecdh_secret) + + # compare di with order + CURVE_SECP256K1 = Curve.get_curve('secp256k1') + if int.from_bytes(sha256.digest(), 'big') < CURVE_SECP256K1.order: + break + #regenerate a new di satisfying order upper bound + retry+=1 + + # Pi = di*G + privkey = PrivateKey(bytes(sha256.digest())) + pubkey = bytearray(privkey.pubkey.serialize(compressed=False)) + # ki = sha256(Pi) + sha256 = hashlib.new('sha256') + sha256.update(pubkey) + #print ("Key " + str (keyindex) + ": " + sha256.hexdigest()) + return sha256.digest() + + def __init__(self, card, cla=0xF0, secure=False, mutauth_result=None, relative=True, cleardata_block_len=None): self.card = card self.cla = cla self.secure = secure - self.key = key - self.iv = b"\x00" * 16 + self.createappParams = None + + #legacy unsecure SCP (pre nanos-1.4, pre blue-2.1) + self.max_mtu = 0xFE + if not self.card is None: + self.max_mtu = min(self.max_mtu, self.card.apduMaxDataSize()) + self.scpVersion = 2 + self.key = mutauth_result + self.iv = b'\x00' * 16 self.relative = relative + #store the aligned block len to be transported if requested + self.cleardata_block_len=cleardata_block_len + if not (self.cleardata_block_len is None): + if not self.card is None: + self.cleardata_block_len = min(self.cleardata_block_len, self.card.apduMaxDataSize()) + + # try: + if type(mutauth_result) is dict and 'ecdh_secret' in mutauth_result: + self.scp_enc_key = self.scp_derive_key(mutauth_result['ecdh_secret'], 0)[0:16] + self.scp_enc_iv = b"\x00" * 16 + self.scp_mac_key = self.scp_derive_key(mutauth_result['ecdh_secret'], 1)[0:16] + self.scp_mac_iv = b"\x00" * 16 + self.scpVersion = 3 + self.max_mtu = 0xFE + if not self.card is None: + self.max_mtu = min(self.max_mtu, self.card.apduMaxDataSize()&0xF0) + + # except: + # pass + + def crc16(self, data): @@ -79,53 +184,122 @@ def crc16(self, data): return crc def exchange(self, cla, ins, p1, p2, data): + #wrap + data = self.scpWrap(data) apdu = bytearray([cla, ins, p1, p2, len(data)]) + bytearray(data) if self.card == None: print("%s" % binascii.hexlify(apdu)) else: - return self.card.exchange(apdu) + # unwrap after exchanged + return self.scpUnwrap(bytes(self.card.exchange(apdu))) - def encryptAES(self, data): - if not self.secure: + def scpWrap(self, data): + if not self.secure or data is None or len(data) == 0: return data - paddedData = data + b'\x80' - while (len(paddedData) % 16) != 0: - paddedData += b'\x00' - cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - encryptedData = cipher.encrypt(paddedData) - self.iv = encryptedData[len(encryptedData) - 16:] + + if self.scpVersion == 3: + if SCP_DEBUG: + print(binascii.hexlify(data)) + # ENC + paddedData = data + b'\x80' + while (len(paddedData) % 16) != 0: + paddedData += b'\x00' + if SCP_DEBUG: + print(binascii.hexlify(paddedData)) + cipher = AES.new(self.scp_enc_key, AES.MODE_CBC, self.scp_enc_iv) + encryptedData = cipher.encrypt(paddedData) + self.scp_enc_iv = encryptedData[-16:] + if SCP_DEBUG: + print(binascii.hexlify(encryptedData)) + # MAC + cipher = AES.new(self.scp_mac_key, AES.MODE_CBC, self.scp_mac_iv) + macData = cipher.encrypt(encryptedData) + self.scp_mac_iv = macData[-16:] + + # only append part of the mac + encryptedData += self.scp_mac_iv[-SCP_MAC_LENGTH:] + if SCP_DEBUG: + print(binascii.hexlify(encryptedData)) + else: + paddedData = data + b'\x80' + while (len(paddedData) % 16) != 0: + paddedData += b'\x00' + cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + if SCP_DEBUG: + print("wrap_old: "+binascii.hexlify(paddedData)) + encryptedData = cipher.encrypt(paddedData) + self.iv = encryptedData[-16:] + + #print (">>") return encryptedData - def decryptAES(self, data): - if not self.secure or len(data) == 0: + def scpUnwrap(self, data): + if not self.secure or data is None or len(data) == 0 or len(data) == 2: return data - cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - decryptedData = cipher.decrypt(data) - l = len(decryptedData) - 1 - while (decryptedData[l] != chr(0x80)): - l-=1 - decryptedData = decryptedData[0:l] - self.iv = data[len(data) - 16:] + + if sys.version_info.major == 3: + padding_char = 0x80 + else: + padding_char = chr(0x80) + + if self.scpVersion == 3: + if SCP_DEBUG: + print(binascii.hexlify(data)) + # MAC + cipher = AES.new(self.scp_mac_key, AES.MODE_CBC, self.scp_mac_iv) + macData = cipher.encrypt(data[0:-SCP_MAC_LENGTH]) + self.scp_mac_iv = macData[-16:] + if self.scp_mac_iv[-SCP_MAC_LENGTH:] != data[-SCP_MAC_LENGTH:] : + raise BaseException("Invalid SCP MAC") + # consume mac + data = data[0:-SCP_MAC_LENGTH] + + if SCP_DEBUG: + print(binascii.hexlify(data)) + # ENC + cipher = AES.new(self.scp_enc_key, AES.MODE_CBC, self.scp_enc_iv) + self.scp_enc_iv = data[-16:] + data = cipher.decrypt(data) + l = len(data) - 1 + while (data[l] != padding_char): + l-=1 + if l == -1: + raise BaseException("Invalid SCP ENC padding") + data = data[0:l] + decryptedData = data + + if SCP_DEBUG: + print(binascii.hexlify(data)) + else: + cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + decryptedData = cipher.decrypt(data) + if SCP_DEBUG: + print("unwrap_old: "+binascii.hexlify(decryptedData)) + l = len(decryptedData) - 1 + while (decryptedData[l] != padding_char): + l-=1 + if l == -1: + raise BaseException("Invalid SCP ENC padding") + decryptedData = decryptedData[0:l] + self.iv = data[-16:] + + #print ("<<") return decryptedData def selectSegment(self, baseAddress): data = b'\x05' + struct.pack('>I', baseAddress) - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) def loadSegmentChunk(self, offset, chunk): data = b'\x06' + struct.pack('>H', offset) + chunk - data = self.encryptAES(data) - self.exchange(self.cla, 0x00, 0x00, 0x00, data) + self.exchange(self.cla, 0x00, 0x00, 0x00, data) def flushSegment(self): data = b'\x07' - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) def crcSegment(self, offsetSegment, lengthSegment, crcExpected): data = b'\x08' + struct.pack('>H', offsetSegment) + struct.pack('>I', lengthSegment) + struct.pack('>H', crcExpected) - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) def validateTargetId(self, targetId): @@ -138,10 +312,15 @@ def boot(self, bootadr, signature=None): data = b'\x09' + struct.pack('>I', bootadr) if (signature != None): data += chr(len(signature)) + signature - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) - def createApp(self, appflags, applength, appname, icon=None, path=None, iconOffset=None, iconSize=None, appversion=None): + def commit(self, signature=None): + data = b'\x09' + if (signature != None): + data += chr(len(signature)) + signature + self.exchange(self.cla, 0x00, 0x00, 0x00, data) + + def createAppNoInstallParams(self, appflags, applength, appname, icon=None, path=None, iconOffset=None, iconSize=None, appversion=None): data = b'\x0B' + struct.pack('>I', applength) + struct.pack('>I', appflags) + struct.pack('>B', len(appname)) + appname if iconOffset is None: if not (icon is None): @@ -160,42 +339,102 @@ def createApp(self, appflags, applength, appname, icon=None, path=None, iconOffs if not appversion is None: data += struct.pack('>B', len(appversion)) + appversion - data = self.encryptAES(data) + # in previous version, appparams are not part of the application hash yet + self.createappParams = None #data[1:] self.exchange(self.cla, 0x00, 0x00, 0x00, data) + def createApp(self, code_length, data_length=0, install_params_length=0, flags=0, bootOffset=1): + #keep the create app parameters to be included in the load app hash + self.createappParams = struct.pack('>IIIII', code_length, data_length, install_params_length, flags, bootOffset) + data = b'\x0B' + self.createappParams + self.exchange(self.cla, 0x00, 0x00, 0x00, data) + def deleteApp(self, appname): data = b'\x0C' + struct.pack('>B',len(appname)) + appname - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) + def deleteAppByHash(self, appfullhash): + if len(appfullhash) != 32: + raise BaseException("Invalid hash format, sha256 expected") + data = b'\x15' + appfullhash + self.exchange(self.cla, 0x00, 0x00, 0x00, data) + + def getVersion(self): + data = b'\x10' + response = self.exchange(self.cla, 0x00, 0x00, 0x00, data) + if sys.version_info.major == 2: + response = bytearray(response) + result = {} + offset = 0 + result['targetId'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3] + offset += 4 + result['osVersion'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8') + offset += 1 + response[offset] + offset += 1 + result['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3] + offset += 4 + result['mcuVersion'] = response[offset + 1 : offset + 1 + response[offset] - 1].decode('utf-8') + offset += 1 + response[offset] + if (offset < len(response)): + result['mcuHash'] = response[offset : offset + 32] + return result + def listApp(self, restart=True): if restart: data = b'\x0E' else: data = b'\x0F' - data = self.encryptAES(data) - response = str(self.exchange(self.cla, 0x00, 0x00, 0x00, data)) - response = bytearray(self.decryptAES(response)) + response = self.exchange(self.cla, 0x00, 0x00, 0x00, data) + if sys.version_info.major == 2: + response = bytearray(response) + #print binascii.hexlify(response[0]) result = [] offset = 0 - while offset != len(response): - item = {} - offset += 1 - item['name'] = response[offset + 1 : offset + 1 + response[offset]] - offset += 1 + response[offset] - item['flags'] = response[offset] << 24 | response[offset + 1] << 16 | response[offset + 2] << 8 | response[offset + 3] - offset += 4 - item['hash'] = response[offset : offset + 32] - offset += 32 - result.append(item) + if len(response) > 0: + if response[0] != 0x01: + # support old format + while offset != len(response): + item = {} + offset += 1 + item['name'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8') + offset += 1 + response[offset] + item['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3] + offset += 4 + item['hash'] = response[offset : offset + 32] + offset += 32 + result.append(item) + else: + offset += 1 + while offset != len(response): + item = {} + #skip the current entry's size + offset += 1 + item['flags'] = (response[offset] << 24) | (response[offset + 1] << 16) | (response[offset + 2] << 8) | response[offset + 3] + offset += 4 + item['hash_code_data'] = response[offset : offset + 32] + offset += 32 + item['hash'] = response[offset : offset + 32] + offset += 32 + item['name'] = response[offset + 1 : offset + 1 + response[offset]].decode('utf-8') + offset += 1 + response[offset] + result.append(item) return result + - def load(self, erase_u8, max_length_per_apdu, hexFile): + def load(self, erase_u8, max_length_per_apdu, hexFile, reverse=False, doCRC=True): + if (max_length_per_apdu > self.max_mtu): + max_length_per_apdu = self.max_mtu initialAddress = 0 if self.relative: initialAddress = hexFile.minAddr() sha256 = hashlib.new('sha256') - for area in hexFile.getAreas(): + # stat by hashing the create app params to ensure complete app signature + if self.createappParams: + sha256.update(self.createappParams) + areas = hexFile.getAreas() + if reverse: + areas = reversed(hexFile.getAreas()) + for area in areas: startAddress = area.getStart() - initialAddress data = area.getData() self.selectSegment(startAddress) @@ -206,37 +445,50 @@ def load(self, erase_u8, max_length_per_apdu, hexFile): crc = self.crc16(bytearray(data)) offset = 0 length = len(data) + if reverse: + offset = length while (length > 0): - if length > max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH: - chunkLen = max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH + if length > max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH - SCP_MAC_LENGTH: + chunkLen = max_length_per_apdu - LOAD_SEGMENT_CHUNK_HEADER_LENGTH - MIN_PADDING_LENGTH - SCP_MAC_LENGTH + if (chunkLen%16) != 0: + chunkLen -= (chunkLen%16) else: chunkLen = length - chunk = data[offset : offset + chunkLen] - sha256.update(chunk) - self.loadSegmentChunk(offset, bytes(chunk)) - offset += chunkLen + + if self.cleardata_block_len and chunkLen%self.cleardata_block_len: + if (chunkLen < self.cleardata_block_len): + raise Exception("Cannot transport not block aligned data with fixed block len") + chunkLen -= chunkLen%self.cleardata_block_len; + # padd with 00's when not complete block and performing NENC + if reverse: + chunk = data[offset-chunkLen : offset] + self.loadSegmentChunk(offset-chunkLen, bytes(chunk)) + else: + chunk = data[offset : offset + chunkLen] + sha256.update(chunk) + self.loadSegmentChunk(offset, bytes(chunk)) + if reverse: + offset -= chunkLen + else: + offset += chunkLen length -= chunkLen self.flushSegment() - self.crcSegment(0, len(data), crc) + if doCRC: + self.crcSegment(0, len(data), crc) return sha256.hexdigest() - def run(self, hexFile, bootaddr, signature=None): - initialAddress = 0 - if self.relative: - initialAddress = hexFile.minAddr() - self.boot(bootaddr - initialAddress, signature) + def run(self, bootoffset=1, signature=None): + self.boot(bootoffset, signature) def resetCustomCA(self): data = b'\x13' - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) def setupCustomCA(self, name, public): data = b'\x12' + struct.pack('>B',len(name)) + name + struct.pack('>B',len(public)) + public - data = self.encryptAES(data) self.exchange(self.cla, 0x00, 0x00, 0x00, data) def runApp(self, name): - data = b'\x14' + struct.pack('>B',len(name)) + name - data = self.encryptAES(data) - self.exchange(self.cla, 0x00, 0x00, 0x00, data) + data = name + self.exchange(self.cla, 0xD8, 0x00, 0x00, data) + diff --git a/ledgerblue/hexParser.py b/ledgerblue/hexParser.py index 3ac7dfe..ea8b8eb 100644 --- a/ledgerblue/hexParser.py +++ b/ledgerblue/hexParser.py @@ -66,7 +66,7 @@ def __init__(self, fileName): recordType = data[3] if recordType == 0x00: if startZone == None: - raise Exception("Data record but no zone defined at line " + lineNumber) + raise Exception("Data record but no zone defined at line " + str(lineNumber)) if startFirst == None: startFirst = address current = startFirst @@ -123,10 +123,13 @@ def minAddr(self): import binascii class IntelHexPrinter: - def addArea(self, startaddress, data): - #order by start address + def addArea(self, startaddress, data, insertFirst=False): #self.areas.append(IntelHexArea(startaddress, data)) - self.areas = insertAreaSorted(self.areas, IntelHexArea(startaddress, data)) + if (insertFirst): + self.areas = [IntelHexArea(startaddress, data)] + self.areas + else: + #order by start address + self.areas = insertAreaSorted(self.areas, IntelHexArea(startaddress, data)) def __init__(self, parser=None, eol="\r\n"): self.areas = [] diff --git a/ledgerblue/loadApp.py b/ledgerblue/loadApp.py index 28b3f19..0b8a874 100644 --- a/ledgerblue/loadApp.py +++ b/ledgerblue/loadApp.py @@ -18,6 +18,7 @@ """ DEFAULT_ALIGNMENT = 1024 +PAGE_ALIGNMENT = 64 import argparse @@ -32,6 +33,7 @@ def get_argparser(): repeated""", action='append') parser.add_argument("--appName", help="The name to give the application after loading it") parser.add_argument("--signature", help="A signature of the application (hex encoded)") + parser.add_argument("--signApp", help="Sign application with provided rootPrivateKey", action='store_true') parser.add_argument("--appFlags", help="The application flags", type=auto_int) parser.add_argument("--bootAddr", help="The application's boot address", type=auto_int) parser.add_argument("--rootPrivateKey", help="""The Signer private key used to establish a Secure Channel (otherwise @@ -41,7 +43,13 @@ def get_argparser(): parser.add_argument("--apilevel", help="Use given API level when interacting with the device", type=auto_int) parser.add_argument("--delete", help="Delete the app with the same name before loading the provided one", action='store_true') parser.add_argument("--params", help="Store icon and install parameters in a parameter section before the code", action='store_true') + parser.add_argument("--tlv", help="Use install parameters for all variable length parameters", action='store_true') + parser.add_argument("--dataSize", help="The code section's size in the provided hex file (to separate data from code, if not provided the whole allocated NVRAM section for the application will remain readonly.", type=auto_int) parser.add_argument("--appVersion", help="The application version (as a string)") + parser.add_argument("--offline", help="Request to only output application load APDUs", action="store_true") + parser.add_argument("--installparamsSize", help="The loaded install parameters section size (when parameters are already included within the .hex file.", type=auto_int) + parser.add_argument("--tlvraw", help="Add a custom install param with the hextag:hexvalue encoding", action='append') + parser.add_argument("--dep", help="Add a dependency over an appname[:appversion]", action='append') return parser def auto_int(x): @@ -129,50 +137,121 @@ def parse_bip32_path(path, apilevel): else: path = parse_bip32_path(args.path[0], args.apilevel) - icon = None if not args.icon is None: - icon = bytearray.fromhex(args.icon) - + args.icon = bytearray.fromhex(args.icon) + signature = None if not args.signature is None: signature = bytearray.fromhex(args.signature) - + #prepend app's data with the icon content (could also add other various install parameters) printer = IntelHexPrinter(parser) - #todo build a TLV zone to keep install params - #todo dney nvm_write in that section ? - paramsSectionContent = [] - if icon: - paramsSectionContent = icon - - # prepend the param section (arbitrary) - if (args.params): - #take care of aligning the parameters sections to avoid possible invalid dereference of aligned words in the program nvram. - #also use the default MPU alignment - param_start = printer.minAddr()-len(paramsSectionContent)-(DEFAULT_ALIGNMENT-(len(paramsSectionContent)%DEFAULT_ALIGNMENT)) - printer.addArea(param_start, paramsSectionContent) - # account for added regions (install parameters, icon ...) - appLength = printer.maxAddr() - printer.minAddr() + # Use of Nested Encryption Key within the SCP protocol is mandartory for upgrades + cleardata_block_len=None + if args.appFlags & 2: + # Not true for scp < 3 + # if signature is None: + # raise BaseException('Upgrades must be signed') + # ensure data can be decoded with code decryption key without troubles. + cleardata_block_len = 16 - dongle = getDongle(args.apdu) + if not args.offline: + dongle = getDongle(args.apdu) - if args.deployLegacy: - secret = getDeployedSecretV1(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) - else: - secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) - loader = HexLoader(dongle, 0xe0, True, secret) + if args.deployLegacy: + secret = getDeployedSecretV1(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) + else: + secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) + + loader = HexLoader(dongle, 0xe0, not(args.offline), secret, cleardata_block_len=cleardata_block_len) + #tlv mode does not support explicit by name removal, would require a list app before to identify the hash to be removed if (not (args.appFlags & 2)) and args.delete: loader.deleteApp(args.appName) - #heuristic to guess how to pass the icon - if (args.params): - loader.createApp(args.appFlags, appLength, args.appName, None, path, 0, len(paramsSectionContent), args.appVersion) + if (args.tlv): + #if code length is not provided, then consider the whole provided hex file is the code and no data section is split + code_length = printer.maxAddr() - printer.minAddr() + if not args.dataSize is None: + code_length -= args.dataSize + else: + args.dataSize = 0 + + installparams = "" + + # express dependency + if (args.dep): + for dep in args.dep: + appname = dep + appversion = None + # split if version is specified + if (dep.find(":") != -1): + (appname,appversion) = dep.split(":") + depvalue = encodelv(appname) + if(appversion): + depvalue += encodelv(appversion) + installparams += encodetlv(BOLOS_TAG_DEPENDENCY, depvalue) + + #add raw install parameters as requested + if (args.tlvraw): + for tlvraw in args.tlvraw: + (hextag,hexvalue) = tlvraw.split(":") + installparams += encodetlv(int(hextag, 16), binascii.unhexlify(hexvalue)) + + if (not (args.appFlags & 2)) and ( args.installparamsSize is None or args.installparamsSize == 0 ): + #build install parameters + #mandatory app name + installparams += encodetlv(BOLOS_TAG_APPNAME, args.appName) + if not args.appVersion is None: + installparams += encodetlv(BOLOS_TAG_APPVERSION, args.appVersion) + if not args.icon is None: + installparams += encodetlv(BOLOS_TAG_ICON, args.icon) + if len(path) > 0: + installparams += encodetlv(BOLOS_TAG_DERIVEPATH, path) + + # append install parameters to the loaded file + param_start = printer.maxAddr()+(PAGE_ALIGNMENT-(args.dataSize%PAGE_ALIGNMENT))%PAGE_ALIGNMENT + # only append install param section when not an upgrade as it has already been computed in the encrypted and signed chunk + printer.addArea(param_start, installparams) + paramsSize = len(installparams) + else: + paramsSize = args.installparamsSize + # split code and install params in the code + code_length -= args.installparamsSize + # create app + #ensure the boot address is an offset + if args.bootAddr > printer.minAddr(): + args.bootAddr -= printer.minAddr() + loader.createApp(code_length, args.dataSize, paramsSize, args.appFlags, args.bootAddr|1) + elif (args.params): + paramsSectionContent = [] + if not args.icon is None: + paramsSectionContent = args.icon + #take care of aligning the parameters sections to avoid possible invalid dereference of aligned words in the program nvram. + #also use the default MPU alignment + param_start = printer.minAddr()-len(paramsSectionContent)-(DEFAULT_ALIGNMENT-(len(paramsSectionContent)%DEFAULT_ALIGNMENT)) + printer.addArea(param_start, paramsSectionContent) + # account for added regions (install parameters, icon ...) + appLength = printer.maxAddr() - printer.minAddr() + loader.createAppNoInstallParams(args.appFlags, appLength, args.appName, None, path, 0, len(paramsSectionContent), args.appVersion) else: - loader.createApp(args.appFlags, appLength, args.appName, icon, path, None, None, args.appVersion) + # account for added regions (install parameters, icon ...) + appLength = printer.maxAddr() - printer.minAddr() + loader.createAppNoInstallParams(args.appFlags, appLength, args.appName, args.icon, path, None, None, args.appVersion) + hash = loader.load(0x0, 0xF0, printer) - print("Application hash : " + hash) - loader.run(printer, args.bootAddr, signature) + + print("Application full hash : " + hash) + + if (signature == None and args.signApp): + masterPrivate = PrivateKey(bytes(bytearray.fromhex(args.rootPrivateKey))) + signature = masterPrivate.ecdsa_serialize(masterPrivate.ecdsa_sign(bytes(binascii.unhexlify(hash)), raw=True)) + print("Application signature: " + binascii.hexlify(signature)) + + if (args.tlv): + loader.commit(signature) + else: + loader.run(args.bootAddr-printer.minAddr(), signature) diff --git a/ledgerblue/loadMCU.py b/ledgerblue/loadMCU.py index 94ff8e2..3782d1e 100644 --- a/ledgerblue/loadMCU.py +++ b/ledgerblue/loadMCU.py @@ -29,6 +29,8 @@ def get_argparser(): parser.add_argument("--fileName", help="The name of the firmware file to load") parser.add_argument("--bootAddr", help="The firmware's boot address", type=auto_int) parser.add_argument("--apdu", help="Display APDU log", action='store_true') + parser.add_argument("--reverse", help="Load HEX file in reverse from the highest address to the lowest", action='store_true') + parser.add_argument("--nocrc", help="Load HEX file without checking CRC of loaded sections", action='store_true') return parser if __name__ == '__main__': @@ -53,5 +55,6 @@ def get_argparser(): loader = HexLoader(dongle, 0xe0, False, None, False) loader.validateTargetId(args.targetId) - hash = loader.load(0xFF, 0xF0, parser) - loader.run(parser.getAreas(), args.bootAddr) + hash = loader.load(0xFF, 0xF0, parser, reverse=args.reverse, doCRC=(not args.nocrc)) + loader.run(args.bootAddr) + \ No newline at end of file diff --git a/ledgerblue/runApp.py b/ledgerblue/runApp.py index 452cacc..5657fc2 100644 --- a/ledgerblue/runApp.py +++ b/ledgerblue/runApp.py @@ -55,7 +55,6 @@ def auto_int(x): dongle = getDongle(args.apdu) - secret = getDeployedSecretV2(dongle, bytearray.fromhex(args.rootPrivateKey), args.targetId) - loader = HexLoader(dongle, 0xe0, True, secret) + loader = HexLoader(dongle, 0xe0) loader.runApp(args.appName) diff --git a/ledgerblue/runScript.py b/ledgerblue/runScript.py index 9bc757b..8f4f7a0 100644 --- a/ledgerblue/runScript.py +++ b/ledgerblue/runScript.py @@ -63,39 +63,23 @@ def auto_int(x): class SCP: def __init__(self, dongle, targetId, rootPrivateKey): - self.key = getDeployedSecretV2(dongle, rootPrivateKey, targetId) - self.iv = b'\x00' * 16 + secret = getDeployedSecretV2(dongle, rootPrivateKey, targetId) + self.loader = HexLoader(dongle, 0xe0, True, secret) def encryptAES(self, data): - paddedData = data + b'\x80' - while (len(paddedData) % 16) != 0: - paddedData += b'\x00' - cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - encryptedData = cipher.encrypt(paddedData) - self.iv = encryptedData[len(encryptedData) - 16:] - return encryptedData + return self.loader.scpWrap(data); def decryptAES(self, data): - if len(data) == 0: - return data - cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - decryptedData = cipher.decrypt(data) - l = len(decryptedData) - 1 - while (decryptedData[l] != chr(0x80)): - l -= 1 - decryptedData = decryptedData[0:l] - self.iv = data[len(data) - 16:] - return decryptedData + return self.loader.scpUnwrap(data); dongle = getDongle(args.apdu) if args.scp: if args.rootPrivateKey is None: privateKey = PrivateKey() - publicKey = binascii.hexlify( - privateKey.pubkey.serialize(compressed=False)) + publicKey = binascii.hexlify(privateKey.pubkey.serialize(compressed=False)) print("Generated random root public key : %s" % publicKey) args.rootPrivateKey = privateKey.serialize() - scp = SCP(dongle, args.targetId, bytearray.fromhex(args.rootPrivateKey)) + scp = SCP(dongle, args.targetId, bytearray.fromhex(args.rootPrivateKey)) for data in file: data = data.rstrip('\r\n').decode('hex') @@ -105,13 +89,13 @@ def decryptAES(self, data): data = bytearray(data) if data[4] > 0 and len(data) > 5: apduData = data[5: 5 + data[4]] - apduData = scp.encryptAES(str(apduData)) + apduData = scp.encryptAES(bytes(apduData)) result = dongle.exchange( - data[0:4] + bytearray([len(apduData)]) + bytearray(apduData)) + data[0:4] + bytearray([len(apduData)]) + bytearray(apduData)) else: result = dongle.exchange(data[0:5]) result = scp.decryptAES(str(result)) if args.apdu: - print("<= Clear " + hexstr(result)) + print("<= Clear " + result.encode('hex')) else: dongle.exchange(bytearray(data)) diff --git a/ledgerblue/updateFirmware.py b/ledgerblue/updateFirmware.py index 27d91f2..1fae8e3 100644 --- a/ledgerblue/updateFirmware.py +++ b/ledgerblue/updateFirmware.py @@ -46,9 +46,14 @@ def serverQuery(request, url): return response if __name__ == '__main__': + import sys import os import struct - import urllib2, urlparse + if sys.version_info.major == 3: + import urllib.request as urllib2 + import urllib.parse as urlparse + else: + import urllib2, urlparse from .BlueHSMServer_pb2 import Request, Response, Parameter from .comm import getDongle import sys @@ -81,6 +86,11 @@ def serverQuery(request, url): parameter.local = False parameter.alias = "persoKey" parameter.name = args.perso + if args.targetId&0xF == 0x3: + parameter = request.remote_parameters.add() + parameter.local = False + parameter.alias = "scpv2" + parameter.name = "dummy" request.largeStack = True response = serverQuery(request, args.url) @@ -106,14 +116,23 @@ def serverQuery(request, url): parameter.local = False parameter.alias = "persoKey" parameter.name = args.perso - request.parameters = str(deviceNonce) + if args.targetId&0xF == 0x3: + parameter = request.remote_parameters.add() + parameter.local = False + parameter.alias = "scpv2" + parameter.name = "dummy" + request.parameters = bytes(deviceNonce) request.largeStack = True response = serverQuery(request, args.url) offset = 0 - remotePublicKeySignatureLength = ord(response.response[offset + 1]) + 2 + if sys.version_info.major == 2: + responseLength = ord(response.response[offset + 1]) + else: + responseLength = response.response[offset + 1] + remotePublicKeySignatureLength = responseLength + 2 remotePublicKeySignature = response.response[offset : offset + remotePublicKeySignatureLength] certificate = bytearray([len(remotePublicKey)]) + remotePublicKey + bytearray([len(remotePublicKeySignature)]) + remotePublicKeySignature @@ -135,7 +154,7 @@ def serverQuery(request, url): request = Request() request.reference = "distributeFirmware11" request.id = response.id - request.parameters = str(certificate) + request.parameters = bytes(certificate) request.largeStack = True serverQuery(request, args.url) index += 1 @@ -152,6 +171,11 @@ def serverQuery(request, url): parameter.local = False parameter.alias = "firmwareKey" parameter.name = args.firmwareKey + if args.targetId&0xF == 0x3: + parameter = request.remote_parameters.add() + parameter.local = False + parameter.alias = "scpv2" + parameter.name = "dummy" request.id = response.id request.largeStack = True diff --git a/setup.py b/setup.py index 5d71254..2fb3c9d 100644 --- a/setup.py +++ b/setup.py @@ -8,14 +8,14 @@ here = dirname(__file__) setup( name='ledgerblue', - version='0.1.16', + version='0.1.17', author='Ledger', author_email='hello@ledger.fr', description='Python library to communicate with Ledger Blue/Nano S', long_description=open(join(here, 'README.md')).read(), url='https://github.com/LedgerHQ/blue-loader-python', packages=find_packages(), - install_requires=['hidapi>=0.7.99', 'protobuf>=2.6.1', 'pycrypto>=2.6.1', 'future', 'ecpy>=0.8.1', 'pillow>=3.4.0'], + install_requires=['hidapi>=0.7.99', 'protobuf>=2.6.1', 'pycrypto>=2.6.1', 'future', 'ecpy>=0.8.1', 'pillow>=3.4.0', 'python-u2flib-host>=3.0.2'], extras_require = { 'smartcard': [ 'python-pyscard>=1.6.12-4build1' ] },