Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Write logs to microSD in HSM mode #196

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 176 additions & 153 deletions shared/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput
from exceptions import HSMDenied
from version import has_psram, has_fatram, MAX_TXN_LEN
from logging import AuditLogger

# Where in SPI flash/PSRAM the two PSBT files are (in and out)
TXN_INPUT_OFFSET = 0
Expand Down Expand Up @@ -379,192 +380,214 @@ async def interact(self):

# step 1: parse PSBT from sflash into in-memory objects.

try:
with SFFile(TXN_INPUT_OFFSET, length=self.psbt_len, message='Reading...') as fd:
# NOTE: psbtObject captures the file descriptor and uses it later
self.psbt = psbtObject.read_psbt(fd)
except BaseException as exc:
if isinstance(exc, MemoryError):
msg = "Transaction is too complex"
exc = None
else:
msg = "PSBT parse failed"
if hsm_active:
policy_hash = hsm_active.hash()
hsm_rules = hsm_active.save()['rules']
try:
hsm_rules[0]['never_log']
except KeyError:
never_log = False
else:
policy_hash = b'\x00'*32
never_log = False
with AuditLogger('logs', self.psbt_sha, never_log, policy_hash) as log:
log.new_psbt()
log.info("Loading the psbt file")
try:
with SFFile(TXN_INPUT_OFFSET, length=self.psbt_len, message='Reading...') as fd:
# NOTE: psbtObject captures the file descriptor and uses it later
self.psbt = psbtObject.read_psbt(fd)
except BaseException as exc:
if isinstance(exc, MemoryError):
msg = exc.__class__.__name__ + " Transaction is too complex"
exc = None
else:
msg = exc.__class__.__name__ + " PSBT parse failed"

return await self.failure(msg, exc)
log.error(msg)
return await self.failure(msg, exc)

dis.fullscreen("Validating...")
dis.fullscreen("Validating...")

# Do some analysis/ validation
try:
await self.psbt.validate() # might do UX: accept multisig import
self.psbt.consider_inputs()

dis.fullscreen("Validating...", percent=0.33)
self.psbt.consider_keys()

dis.progress_bar(0.66)
self.psbt.consider_outputs()

dis.progress_bar(0.85)
except FraudulentChangeOutput as exc:
print('FraudulentChangeOutput: ' + exc.args[0])
return await self.failure(exc.args[0], title='Change Fraud')
except FatalPSBTIssue as exc:
print('FatalPSBTIssue: ' + exc.args[0])
return await self.failure(exc.args[0])
except BaseException as exc:
del self.psbt
gc.collect()
log.info("PSBT parsing successful")
# Do some analysis/ validation
try:
await self.psbt.validate() # might do UX: accept multisig import
self.psbt.consider_inputs()

dis.fullscreen("Validating...", percent=0.33)
self.psbt.consider_keys()

dis.progress_bar(0.66)
self.psbt.consider_outputs()

dis.progress_bar(0.85)
except FraudulentChangeOutput as exc:
print('FraudulentChangeOutput: ' + exc.args[0])
log.error(exc.__class__.__name__ + exc.args[0])
return await self.failure(exc.args[0], title='Change Fraud')
except FatalPSBTIssue as exc:
print('FatalPSBTIssue: ' + exc.args[0])
log.error(exc.__class__.__name__ + exc.args[0])
return await self.failure(exc.args[0])
except BaseException as exc:
log.error(exc.__class__.__name__ + exc.args[0])
del self.psbt
gc.collect()

if isinstance(exc, MemoryError):
msg = "Transaction is too complex"
exc = None
else:
msg = "Invalid PSBT"

return await self.failure(msg, exc)

# step 2: figure out what we are approving, so we can get sign-off
# - outputs, amounts
# - fee
#
# notes:
# - try to handle lots of outputs
# - cannot calc fee as sat/byte, only as percent
# - somethings are 'warnings':
# - fee too big
# - inputs we can't sign (no key)
#
try:
msg = uio.StringIO()
if isinstance(exc, MemoryError):
msg = "Transaction is too complex"
log.error(exc.__class__.__name__ + exc.args[0])
exc = None
else:
msg = "Invalid PSBT"
log.error(exc.__class__.__name__ + exc.args[0])

return await self.failure(msg, exc)

log.info("Formal validation completed")
log.info("Approving content")
# step 2: figure out what we are approving, so we can get sign-off
# - outputs, amounts
# - fee
#
# notes:
# - try to handle lots of outputs
# - cannot calc fee as sat/byte, only as percent
# - somethings are 'warnings':
# - fee too big
# - inputs we can't sign (no key)
#
try:
msg = uio.StringIO()

# mention warning at top
wl= len(self.psbt.warnings)
if wl == 1:
msg.write('(1 warning below)\n\n')
elif wl >= 2:
msg.write('(%d warnings below)\n\n' % wl)
# mention warning at top
wl= len(self.psbt.warnings)
if wl == 1:
msg.write('(1 warning below)\n\n')
elif wl >= 2:
msg.write('(%d warnings below)\n\n' % wl)

self.output_summary_text(msg)
gc.collect()
# self.output_summary_text(msg)
# gc.collect()

fee = self.psbt.calculate_fee()
if fee is not None:
msg.write("\nNetwork fee:\n%s %s\n" % self.chain.render_value(fee))
fee = self.psbt.calculate_fee()
if fee is not None:
msg.write("\nNetwork fee:\n%s %s\n" % self.chain.render_value(fee))

# NEW: show where all the change outputs are going
self.output_change_text(msg)
gc.collect()
# NEW: show where all the change outputs are going
# self.output_change_text(msg)
gc.collect()

if self.psbt.warnings:
msg.write('\n---WARNING---\n\n')
if self.psbt.warnings:
msg.write('\n---WARNING---\n\n')

for label, m in self.psbt.warnings:
msg.write('- %s: %s\n\n' % (label, m))
for label, m in self.psbt.warnings:
msg.write('- %s: %s\n\n' % (label, m))

if self.do_visualize:
# stop here and just return the text of approval message itself
self.result = await self.save_visualization(msg, (self.stxn_flags & STXN_SIGNED))
del self.psbt
self.done()
if self.do_visualize:
# stop here and just return the text of approval message itself
self.result = await self.save_visualization(msg, (self.stxn_flags & STXN_SIGNED))
del self.psbt
self.done()

return
return

if not hsm_active:
msg.write("\nPress OK to approve and sign transaction. X to abort.")
ch = await ux_show_story(msg, title="OK TO SEND?")
else:
ch = await hsm_active.approve_transaction(self.psbt, self.psbt_sha, msg.getvalue())
dis.progress_bar(1) # finish the Validating...
if not hsm_active:
msg.write("\nPress OK to approve and sign transaction. X to abort.")
ch = await ux_show_story(msg, title="OK TO SEND?")
else:
ch = await hsm_active.approve_transaction(self.psbt, self.psbt_sha, msg.getvalue(), log)
dis.progress_bar(1) # finish the Validating...

except MemoryError:
# recovery? maybe.
try:
del self.psbt
del msg
except: pass # might be NameError since we don't know how far we got
gc.collect()
except MemoryError:
# recovery? maybe.
try:
del self.psbt
del msg
except: pass # might be NameError since we don't know how far we got
gc.collect()

msg = "Transaction is too complex"
return await self.failure(msg)
msg = "Transaction is too complex"
return await self.failure(msg)

if ch != 'y':
# they don't want to!
self.refused = True
if ch != 'y':
# they don't want to!
self.refused = True

await ux_dramatic_pause("Refused.", 1)
await ux_dramatic_pause("Refused.", 1)

del self.psbt
del self.psbt

self.done()
return
self.done()
return

# do the actual signing.
try:
dis.fullscreen('Wait...')
gc.collect() # visible delay caused by this but also sign_it() below
self.psbt.sign_it()
except FraudulentChangeOutput as exc:
return await self.failure(exc.args[0], title='Change Fraud')
except MemoryError:
msg = "Transaction is too complex"
return await self.failure(msg)
except BaseException as exc:
return await self.failure("Signing failed late", exc)
# do the actual signing.
try:
dis.fullscreen('Wait...')
gc.collect() # visible delay caused by this but also sign_it() below
self.psbt.sign_it()
except FraudulentChangeOutput as exc:
return await self.failure(exc.args[0], title='Change Fraud')
except MemoryError:
msg = "Transaction is too complex"
return await self.failure(msg)
except BaseException as exc:
return await self.failure("Signing failed late", exc)

if self.approved_cb:
# for micro sd case
await self.approved_cb(self.psbt)
self.done()
return
if self.approved_cb:
# for micro sd case
await self.approved_cb(self.psbt)
self.done()
return

txid = None
try:
# re-serialize the PSBT back out
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as fd:
await fd.erase()
txid = None
try:
# re-serialize the PSBT back out
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as fd:
await fd.erase()

if self.do_finalize:
txid = self.psbt.finalize(fd)
else:
self.psbt.serialize(fd)
if self.do_finalize:
txid = self.psbt.finalize(fd)
else:
self.psbt.serialize(fd)

fd.close()
self.result = (fd.tell(), fd.checksum.digest())
fd.close()
self.result = (fd.tell(), fd.checksum.digest())

self.done(redraw=(not txid))
self.done(redraw=(not txid))

except BaseException as exc:
return await self.failure("PSBT output failed", exc)
except BaseException as exc:
return await self.failure("PSBT output failed", exc)

from glob import NFC
from glob import NFC

if self.do_finalize and txid and not hsm_active:
while 1:
# Show txid when we can; advisory
# - maybe even as QR, hex-encoded in alnum mode
tmsg = txid + '\n\n'
if self.do_finalize and txid and not hsm_active:
while 1:
# Show txid when we can; advisory
# - maybe even as QR, hex-encoded in alnum mode
tmsg = txid + '\n\n'

if has_fatram:
tmsg += 'Press 1 for QR Code of TXID. '
if NFC:
tmsg += 'Press 3 to share signed txn over NFC.'
if has_fatram:
tmsg += 'Press 1 for QR Code of TXID. '
if NFC:
tmsg += 'Press 3 to share signed txn over NFC.'

ch = await ux_show_story(tmsg, "Final TXID", escape='13')
ch = await ux_show_story(tmsg, "Final TXID", escape='13')

if ch=='1' and has_fatram:
await show_qr_code(txid, True)
continue
if ch=='1' and has_fatram:
await show_qr_code(txid, True)
continue

if ch == '3' and NFC:
await NFC.share_signed_txn(txid, TXN_OUTPUT_OFFSET,
self.result[0], self.result[1])
continue
break
if ch == '3' and NFC:
await NFC.share_signed_txn(txid, TXN_OUTPUT_OFFSET,
self.result[0], self.result[1])
continue
break

# TODO ofter to share / or auto-share over NFC if that seems appropraite
#if NFC:
#NFC.share_signed_psbt(TXN_OUTPUT_OFFSET, self.result[0], self.result[1])
# TODO ofter to share / or auto-share over NFC if that seems appropraite
#if NFC:
#NFC.share_signed_psbt(TXN_OUTPUT_OFFSET, self.result[0], self.result[1])

def save_visualization(self, msg, sign_text=False):
# write text into spi flash, maybe signing it as we go
Expand Down
Loading