In [None]:
import subprocess
from pathlib import Path
import numpy as np
import matplotlib.pylab as plt
from matplotlib.ticker import (MultipleLocator, FormatStrFormatter,
                               AutoMinorLocator)
from tqdm.notebook import trange


%matplotlib notebook


# Root of the ChipWhisperer checkout; the firmware is copied into and built under this tree.
# NOTE(review): absolute, machine-specific path — adjust per installation.
PATH_CW = Path('/opt/ChipWhisperer-git/')

import chipwhisperer as cw
# Reuse an existing scope handle when this cell is re-run; on a fresh kernel
# `scope` is undefined, the lookup raises NameError and a new scope is opened.
try:
    if not scope.connectStatus:
        scope.con()
except NameError:
    scope = cw.scope()

# Open the target through the scope; on IOError reconnect the scope once and retry.
try:
    target = cw.target(scope)
except IOError:
    print("INFO: Caught exception on reconnecting to target - attempting to reconnect to scope first.")
    print("INFO: This is a work-around when USB has died without Python knowing. Ignore errors above this line.")
    scope = cw.scope()
    target = cw.target(scope)

print("INFO: Found ChipWhisperer😍")

In [None]:
# NOTE(review): SCOPETYPE is not referenced in the visible cells.
SCOPETYPE = 'OPENADC'
# PLATFORM selects the firmware build (make PLATFORM=...) and the .hex file name.
PLATFORM = 'CWLITEXMEGA'

In [None]:
# Programmer class handed to cw.program_target() by upload().
prog = cw.programmers.XMEGAProgrammer

In [None]:
# Location of the compiled firmware image for PLATFORM inside the ChipWhisperer tree.
path_fw = PATH_CW / "hardware/victims/firmware/simpleserial-signature/simpleserial-signature-{}.hex".format(PLATFORM)

In [None]:
import time
# Short pause before configuring the scope.
# NOTE(review): the reason for the 50 ms delay is not visible here — confirm it is still needed.
time.sleep(0.05)
# Apply the scope's default configuration.
scope.default_setup()
def reset_target(scope):
    """Reset the XMEGA target by pulsing the PDIC line, then flush the serial buffer.

    PDIC is pulled low and afterwards released to high impedance (the XMEGA
    doesn't like PDIC driven high). Each state is held for 0.1 s; the second
    wait gives the XMEGA its extra startup time. Finally the module-level
    ``target`` is flushed.

    Args:
        scope: ChipWhisperer scope object whose ``io.pdic`` pin is toggled.
    """
    hold_seconds = 0.1
    for pdic_state in ('low', 'high_z'):
        scope.io.pdic = pdic_state
        time.sleep(hold_seconds)
    target.flush()
    
# Last expression of the cell: display the scope object's current settings.
scope

In [None]:
def firmware_compile(options=""):
    """Copy the local firmware sources into the ChipWhisperer tree and build them.

    Runs ``cp`` + ``make`` in one shell command, prints the last 500 characters
    of stdout and all of stderr, then raises if the command failed.

    Args:
        options: extra text inserted verbatim into the ``make`` command line.

    Raises:
        subprocess.CalledProcessError: when the shell command exits non-zero.
    """
    command = (
        "cp -rv simpleserial-signature {path_cw}/hardware/victims/firmware/ && cd {path_cw}/hardware/victims/firmware/simpleserial-signature && make PLATFORM={platform} {options} CRYPTO_TARGET=AVRCRYPTOLIB CRYPTO_OPTIONS=HMACSHA256"
    ).format(path_cw=PATH_CW, platform=PLATFORM, options=options)

    result = subprocess.run(command, shell=True, capture_output=True)

    build_log = result.stdout.decode('utf-8')
    print(build_log[-500:])
    print(result.stderr.decode('utf-8'))

    # Raise only after the logs were shown, so a failed build is still diagnosable.
    result.check_returncode()

In [None]:
def upload():
    """Flash the firmware image at ``path_fw`` onto the target.

    Uses the module-level ``scope`` and the programmer class ``prog``; the
    path is passed to ``cw.program_target`` as a plain string.
    """
    hex_path = str(path_fw)
    cw.program_target(scope, prog, hex_path)

In [None]:
def capture_trace(scope):
    """Arm the scope, reset the target and return the trace recorded afterwards.

    The value returned by ``scope.capture()`` is printed, not checked, so a
    trace is returned even when the capture reports a problem.

    Args:
        scope: ChipWhisperer scope object.

    Returns:
        Whatever ``scope.get_last_trace()`` yields after the capture.
    """
    scope.arm()
    reset_target(scope)
    capture_status = scope.capture()
    print(capture_status)
    return scope.get_last_trace()

In [None]:
def normalize_trace(trace):
    """Return the sign-flipped trace (ADC values have to be inverted).

    Args:
        trace: any object supporting unary minus, e.g. a numpy array.
    """
    inverted = -trace
    return inverted

In [None]:
def target_communicate(inp, timeout=0.1):
    """Send ``inp`` (newline-terminated) to the target and print its reply.

    Communication is asynchronous: the function polls ``target.in_waiting()``
    until the first bytes show up, then prints everything that is buffered
    and ends the line.

    Args:
        inp: command string; a trailing newline is appended before sending.
        timeout: maximum number of seconds to wait for the first reply bytes.
            A falsy value (``0`` or ``None``) waits indefinitely, which is
            what every ``timeout=0`` caller got before this parameter was
            honoured.

    Returns:
        None. If nothing arrives within ``timeout`` only the terminating
        newline is printed.
    """
    target.write(f'{inp}\n')

    # Previously `timeout` was ignored and this loop could spin forever when
    # the target never answered (e.g. a hung board).
    deadline = (time.monotonic() + timeout) if timeout else None
    while not target.in_waiting():
        if deadline is not None and time.monotonic() >= deadline:
            print()
            return

    # Drain what is currently buffered; stops as soon as the buffer is empty.
    while n := target.in_waiting():
        print(target.read(n), end='')
    print()

In [None]:
def firmware_build(count):
    """Build a deterministic payload made of the byte values 0, 1, ..., count-1.

    Args:
        count: number of bytes to generate (values must fit in a byte).

    Returns:
        Tuple ``(hex_string, raw)``: the lowercase hex encoding and the
        payload as a ``bytearray``. Note the order is the opposite of
        ``generate_random_firmware``.
    """
    payload = bytearray(range(count))
    return payload.hex(), payload

In [None]:
# Build the firmware, flash it onto the target, then reset the target.
firmware_compile()
upload()
reset_target(scope)

In [None]:
import random

# Sample hex payload; not referenced elsewhere in the visible notebook.
example = '000102030405060708090a0b0c0d0e0f'
# Deterministic 64-byte reference firmware: hex string for the wire, raw bytes for the host.
raw_hex_firmware, raw_firmware = firmware_build(64)
print(len(raw_hex_firmware), raw_firmware)

print("version: ", end="")
target_communicate('v', timeout=0)

print("HMAC-SHA256: ", end="")
target_communicate(f's{raw_hex_firmware}', timeout=0)

print("upload: ", end="")
target_communicate(f'u{raw_hex_firmware}', timeout=0)

print("upload wrong: ", end="")
# The random payload is built inline: generate_random_firmware is only defined
# in a later cell, so calling it here raised NameError on a fresh kernel.
wrong_hex_firmware = bytes(random.randint(0, 255) for _ in range(64)).hex()
target_communicate(f'u{wrong_hex_firmware}')

In [None]:
import hashlib
import hmac
# NOTE(review): no-op — this is not the last expression of the cell, so the value is never displayed.
hashlib.algorithms_available

def calculate_signature(inp):
    """Compute the HMAC-SHA256 of ``inp`` on the host with the fixed 64-byte key.

    Args:
        inp: bytes-like message to authenticate.

    Returns:
        The 32-byte digest as ``bytes``.
    """
    # NOTE(review): hard-coded key — presumably mirrors the key compiled into
    # the target firmware; verify against the firmware sources.
    key = bytearray.fromhex(
        "0011223344556677"
        "8899AABBCCDDEEFF"
        "0123456789ABCDEF"
        "0011223344556677"
        "8899AABBCCDDEEFF"
        "0123456789ABCDEF"
        "0011223344556677"
        "8899AABBCCDDEEFF"
    )
    return hmac.digest(key, inp, 'sha256')

In [None]:
def extract_signature(msg):
    """Decode a signature reply line of the form ``s<hex digits>``.

    Args:
        msg: reply line from the target, without the trailing newline.

    Returns:
        The bytes encoded by the hex digits following the leading ``s``.

    Raises:
        ValueError: if ``msg`` does not start with ``s`` or the remainder is
            not valid hex.
    """
    if msg.startswith("s"):
        return bytes.fromhex(msg[1:])
    raise ValueError("we are expecting 's' at the start")

In [None]:
def target_readsignature(inp):
    """Ask the target to sign ``inp`` and return the signature bytes.

    Sends ``s<inp>`` (newline-terminated), polls until reply bytes are
    available, drains the buffer and parses the reply. The reply is expected
    to contain the signature line followed by the status line ``z00``.

    Args:
        inp: hex-encoded payload to sign.

    Returns:
        The signature decoded by ``extract_signature``.

    Raises:
        ValueError: if the reply has fewer than two complete lines or the
            second line is not ``z00``; the raw reply is included in the
            message.
    """
    output = ""

    target.write(f's{inp}\n')

    # Communication is asynchronous: spin until the first bytes arrive.
    # NOTE(review): this wait has no timeout and blocks forever on a silent target.
    while not target.in_waiting():
        pass
    while n := target.in_waiting():
        output += target.read(n)

    # Everything after the last newline is discarded (empty for a complete reply).
    *lines, _ = output.split("\n")

    # A truncated reply used to surface as a bare IndexError on lines[1];
    # report it like any other malformed reply instead.
    if len(lines) < 2 or lines[1] != 'z00':
        raise ValueError(f"unexpected result -> {output}")

    return extract_signature(lines[0])

In [None]:
# The host-side HMAC must equal the signature the target returns for the same firmware.
assert calculate_signature(raw_firmware) == target_readsignature(raw_hex_firmware)

In [None]:
# NOTE(review): generate_random_firmware is defined in a later cell, so this raises NameError on a fresh kernel.
# NOTE(review): this compares the reference firmware's HMAC with the target's signature of a
# *random* payload, so the assertion is expected to fail — confirm whether `!=` was intended.
assert calculate_signature(raw_firmware) == target_readsignature(generate_random_firmware(64)[1])

In [None]:
# Reset the target (pulses PDIC and flushes the serial buffer, see reset_target).
reset_target(scope)

In [None]:
def generate_random_firmware(size):
    """Create a payload of ``size`` random bytes.

    Args:
        size: number of bytes to draw, each uniformly from 0..255.

    Returns:
        Tuple ``(raw, hex_string)``: the payload as ``bytes`` and its
        lowercase hex encoding. Note the order is the opposite of
        ``firmware_build``.
    """
    import random

    raw = bytes(random.randint(0, 255) for _ in range(size))
    return raw, raw.hex()

In [None]:
def target_upload(firmware):
    """Send the upload command ``u<firmware>`` to the target without reading a reply.

    The reply is deliberately not read: with a wrong checksum the board
    hangs, so waiting for an answer here would block.

    Args:
        firmware: hex-encoded firmware payload.
    """
    target.write(f'u{firmware}\n')


Check that the upload works:

In [None]:
# Upload a random payload; element [1] of the returned tuple is the hex string.
target_upload(generate_random_firmware(64)[1])

In [None]:
# Upload the sequential reference payload built earlier with firmware_build(64).
target_upload(raw_hex_firmware)

In [None]:
from tqdm.notebook import trange
import numpy as np
import time


# Collected power traces and the host-computed signature belonging to each of them.
traces = []
signatures = []
# Capture window settings.
# NOTE(review): presumably 10000 samples recorded starting 50000 samples after the trigger — confirm against the scope.adc documentation.
scope.adc.samples = 10000
scope.adc.offset = 50000
# Number of capture attempts (timed-out attempts are skipped, so fewer traces may be stored).
N = 100

for i in trange(N, desc='Capturing traces'):
    # Start every attempt from a freshly reset target, with the scope armed before the command is sent.
    reset_target(scope)
    scope.arm()

    # Random 64-byte payload; the expected signature is computed on the host, not read from the target.
    firmware_raw, firmware_hex = generate_random_firmware(64)
    signature = calculate_signature(firmware_raw)
    target_upload(firmware_hex)

    # A truthy return value from capture() is treated as a timeout; that attempt is dropped.
    ret = scope.capture()
    if ret:
        print("Target timed out!")
        continue

    # Keep trace and signature aligned by index.
    traces.append(scope.get_last_trace())
    signatures.append(signature)

# Stack the traces into one array (one row per stored trace).
traces = np.array(traces)

In [None]:
import matplotlib.pylab as plt

# Overlay two captured traces (index 0 and index 30) in one figure as a visual sanity check.
plt.figure()
for trace_index in (0, 30):
    plt.plot(traces[trace_index])
plt.show()

In [None]:
import numpy as np

def mean(X):
    """Average ``X`` along its first axis.

    Args:
        X: array-like; rows are observations.

    Returns:
        The per-column mean as computed by ``np.mean(X, axis=0)``.
    """
    column_means = np.mean(X, axis=0)
    return column_means

def std_dev(X, X_bar):
    """Square root of the summed squared deviations of ``X`` from ``X_bar``.

    This is the un-normalised spread (no division by the number of
    observations), summed along the first axis.

    Args:
        X: array of observations (rows).
        X_bar: value(s) subtracted from every row, broadcast against ``X``.
    """
    deviations = X - X_bar
    sum_of_squares = np.sum(np.square(deviations), axis=0)
    return np.sqrt(sum_of_squares)

def cov(X, X_bar, Y, Y_bar):
    """Sum over the first axis of the products of deviations of ``X`` and ``Y``.

    This is the un-normalised covariance (no division by the number of
    observations); the operands are combined with numpy broadcasting.

    Args:
        X, Y: arrays of observations (rows).
        X_bar, Y_bar: values subtracted from ``X`` and ``Y`` respectively.
    """
    x_deviation = X - X_bar
    y_deviation = Y - Y_bar
    return np.sum(x_deviation * y_deviation, axis=0)

In [None]:
def calc_hamming_weight(n):
    """Return the number of ``1`` digits in the binary representation of ``n``."""
    return sum(1 for digit in bin(n) if digit == "1")


# Lookup table: HW[v] is the Hamming weight of the byte value v (0..255).
HW = []
for idx in range(256):
    HW.append(calc_hamming_weight(idx))

In [None]:
# Inspect the first host-computed signature.
signatures[0]

In [None]:
# Length of one signature in bytes.
len(signatures[0])

In [None]:
# Spot-check the lookup table: Hamming weight of 0x0c.
HW[0x0c]

In [None]:
def correlation_trace_signature(traces, signatures, ith, trace_avg, trace_stddev):
    """Correlate every trace sample with the Hamming weight of one signature byte.

    For each sample position this is the Pearson correlation between the
    trace values and ``HW[signature[ith]]`` over all captures. The helpers
    ``cov`` and ``std_dev`` are both un-normalised, so the missing ``1/N``
    factors cancel in the quotient.

    Args:
        traces: array with one row per capture.
        signatures: sequence of signatures (indexable, yielding ints 0..255),
            aligned with the rows of ``traces``.
        ith: index of the signature byte to correlate against.
        trace_avg: per-sample mean of ``traces``.
        trace_stddev: per-sample un-normalised spread of ``traces``.

    Returns:
        Array with one correlation value per sample. Entries are NaN/inf
        where a spread is zero (e.g. every signature has the same Hamming
        weight at byte ``ith``).

    Raises:
        AssertionError: if ``signatures`` is empty.
    """
    # Use the `ith` parameter: the body used to read the module-level `bnum`,
    # so it only worked when the caller's loop variable happened to have that name.
    # Column vector (n_captures, 1) so it broadcasts against (n_captures, n_samples).
    hws = np.array([[HW[signature[ith]] for signature in signatures]]).transpose()

    # The former `len(np.isnan(...)) != 0` checks never looked at NaN values;
    # their only effect was to reject empty input, which is kept explicitly.
    assert len(hws) != 0, "no signatures to correlate"

    hws_avg = mean(hws)
    hws_stddev = std_dev(hws, hws_avg)

    return cov(traces, trace_avg, hws, hws_avg)/trace_stddev/hws_stddev

In [None]:
from tqdm.notebook import tnrange

# Per-sample mean of the traces and their un-normalised spread
# (square root of the summed squared deviations, no division by the trace count).
t_bar = np.sum(traces, axis=0)/len(traces)
o_t = np.sqrt(np.sum((traces - t_bar)**2, axis=0))

# Placeholder list with one slot per signature byte; every slot is overwritten in the loop below.
correlation_traces = list(range(32))

print(f'{t_bar=} {o_t=}')

# One correlation trace per signature byte.
# NOTE(review): before renaming the loop variable, check whether
# correlation_trace_signature reads the module-level `bnum` instead of its `ith` argument.
for bnum in tnrange(0, 32):
    cpaoutput = correlation_trace_signature(
        traces,
        signatures,
        bnum,
        t_bar,
        o_t,
    )
    correlation_traces[bnum] = cpaoutput


In [None]:
# NOTE(review): prints all 32 correlation arrays; consider showing a summary instead.
print(correlation_traces)

In [None]:
# Sample indices of byte 0's correlation trace in reversed argsort order of |correlation|.
# NOTE(review): argsort places NaN last, so after the reversal any NaN entries come first — confirm this is intended.
np.argsort(np.abs(correlation_traces[0]))[::-1]

In [None]:
# Number of samples in one correlation trace.
len(correlation_traces[0])

In [None]:
# NOTE(review): 2577 is an unexplained sample index — document why the tail from this point is of interest.
correlation_traces[0][2577:]

In [None]:
# Correlation values for byte 0 with the NaN entries dropped.
list(filter(lambda x: not np.isnan(x), correlation_traces[0]))

In [None]:
# Sanity check: the trace spread o_t contains no NaN.
assert len(np.argwhere(np.isnan(o_t))) == 0

In [None]:
# For every signature byte, report the correlation with the largest absolute
# value; NaN entries are dropped first.
for idx in range(32):
    correlation = [value for value in correlation_traces[idx] if not np.isnan(value)]
    # Last index of the ascending argsort == position of the largest |correlation|.
    idxs_max = np.argsort(np.abs(correlation))[-1]

    print(f'{idx=} {correlation[idxs_max]=}')

In [None]:
# Number of signature bytes to plot, one subplot row each.
# NOTE(review): this rebinds N, which the capture cell uses as the number of capture attempts.
N = 32

fig = plt.figure(figsize=(10, N))

for idx in range(N):
    # Drop NaN entries; the plot's x positions are therefore indices into the filtered list.
    correlation = list(filter(lambda x: not np.isnan(x), correlation_traces[idx]))
    # Indices ordered by decreasing absolute correlation.
    idxs_max = np.argsort(np.abs(correlation))[::-1]

    # Minimal axes: hide the frame and ticks, keep only a row label.
    ax = plt.subplot(N, 1, idx + 1)
    ax.spines['top'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.spines['right'].set_visible(False)
    # NOTE(review): the label says "input byte", but the correlation traces are computed
    # from signature bytes — confirm the wording.
    ax.set_ylabel('%dth input byte' % idx, rotation=0)
    ax.set_yticklabels([])
    ax.set_yticks([])
    ax.set_xticklabels([])

    # pyplot draws into the subplot created above (the current axes).
    plt.plot(correlation)

    # Mark the position of the single strongest correlation with a dotted red line.
    for idx_max in idxs_max[:1]:
        plt.axvline(x=idx_max, label="%d" % idx_max, color='r', ls=":")

    # plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)

fig.tight_layout()
plt.show()