In [None]:
%matplotlib ipympl
SCOPETYPE = "OPENADC"
PLATFORM = "CWHUSKY"
# SS_VER = "SS_VER_2_1"

%run "../Setup_Generic.ipynb"


import chipwhisperer as cw
from chipwhisperer.capture.scopes.OpenADC import OpenADC
from chipwhisperer.capture.targets.SimpleSerial2 import SimpleSerial2
from chipwhisperer.capture.api.programmers import XMEGAProgrammer
import matplotlib.pyplot as plt
import time
from pprint import pprint
import numpy as np
from typing import Optional, List, Set, Any, Tuple
print(type(scope))
print(type(target))
print(type(prog))
scope: OpenADC = scope
target: SimpleSerial2 = target
prog: XMEGAProgrammer = prog

print("Scope:\n", scope)

In [None]:
scope.vglitch_setup("both")
scope.glitch.output = "enable_only"
# scope.glitch.enabled = True

In [None]:
refresh_rate = 100e6

scope.adc.timeout = 5
scope.adc.samples = 118_000
# scope.clock.clkgen_src = "system" 
scope.clock.adc_mul = 1
scope.clock.clkgen_freq = refresh_rate
print("'Real freq': scope.clock.adc_freq:", scope.clock.adc_freq)
if scope.clock.adc_freq is not None:
    print(scope.clock.adc_freq / 1e6, "MHz")

print(scope.glitch.phase_shift_steps)

In [None]:
print(scope.clock.adc_mul)

In [None]:
from chipwhisperer.common.results.glitch import GlitchController

gc = GlitchController(groups=["success", "reset", "normal"], parameters=["width", "offset", "ext_offset", "repeat"])
gc.clear()
gc.display_stats()


In [None]:
import datetime
import sys

LOG_FILE = "glitch_log.txt"

def log_print(*args, **kwargs):
    timestamp = datetime.datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
    message = " ".join(str(a) for a in args)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} {message}\n")
    print(f"{timestamp} {message}", **kwargs)

In [None]:
import paramiko
import threading
import time
import socket
import math

traces = []

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

TARGET_IP = "172.20.10.2"
USERNAME = "test"
PASSWORD = "test"

NORMAL_INCREMENT = 1
RESET_DECREMENT = 20

START_REPEAT = 8192
NUM_GLITCHES = 2

log_print("*** Starting a new trace ***")


def update_repeat(original_repeat: int, change: int, num_glitches: int) -> Tuple[int, List[int], List[int]]:
    # Returns the new repeat value, new scope.glitch.repeat list, and the new scope.glitch.ext_offset list
    if change > 0:
        new_repeat: int = min(original_repeat + NORMAL_INCREMENT, 8192)
    elif change < 0:
        new_repeat: int = max(original_repeat + NORMAL_INCREMENT, 1)
    else:
        assert(original_repeat >= 0 and original_repeat <= 8192)
        new_repeat: int = original_repeat

    return (new_repeat, [new_repeat] * num_glitches, [0] + [new_repeat] * (num_glitches - 1))


def connect_ssh():
    global client
    reset_duration = 0
    while True:
        try:
            client.connect(
                hostname=TARGET_IP,
                port=22,
                username=USERNAME,
                password=PASSWORD,
                timeout=5
            )
            log_print("[+] Connected to target")
            return
        except (
            paramiko.AuthenticationException,
            paramiko.SSHException,
            socket.error
        ) as e:
            log_print(f"[!] Target not reachable, waiting for reboot... (total time: {reset_duration}s)")
            print("Error type:", type(e))
            print("Error text:", e)
            time.sleep(5)
            reset_duration += 5
            continue

def reconnect_target():
    global client
    try:
        client.close()
    except Exception:
        pass
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    connect_ssh()

def is_ssh_alive():
    try:
        stdin, stdout, stderr = client.exec_command("echo alive", timeout=5)
        result = stdout.read().decode().strip()
        return result == "alive"
    except Exception:
        return False


def run_remote_command():
    time.sleep(0.2)
    try:
        stdin, stdout, stderr = client.exec_command(
            'touch /tmp/heartbeat; sudo systemctl start watchdog; ./trigger-test/rbp4/main',
            timeout = 5.0
        )

        output = stdout.read().decode('utf-8')
        err = stderr.read().decode('utf-8')

        print("Got output:", output)

        if output.startswith("10000 10000 100000000 0"):
            gc.add("normal")
        else:
            gc.add("success")
            log_print("HIT GLITCH:", output)

    except (paramiko.SSHException, socket.error):
        log_print("[!] Lost SSH connection during execution.")
        gc.add("reset")


connect_ssh()

gc.set_range("offset", 0, scope.glitch.phase_shift_steps)

gc.set_global_step([1])

scope.glitch.num_glitches = NUM_GLITCHES

rep, scope.glitch.repeat, scope.glitch.ext_offset = update_repeat(START_REPEAT, 0, scope.glitch.num_glitches)

for glitch_settings in gc.glitch_values():
    print(glitch_settings)

    # Increment the repeat value
    rep, scope.glitch.repeat, scope.glitch.ext_offset = update_repeat(rep, NORMAL_INCREMENT, scope.glitch.num_glitches)

    scope.glitch.offset = int(glitch_settings[1])
    log_print(f"Offset: {scope.glitch.offset}, Width: {scope.glitch.width}, Ext_offset: {scope.glitch.ext_offset}, Repeat: {scope.glitch.repeat}, Total repeat: {scope.glitch.repeat if isinstance(scope.glitch.repeat, int) else sum(scope.glitch.repeat)}")
    print(f"scope.adc.trig_count: {scope.adc.trig_count}")

    # If the target is offline, pause glitching until it's back
    while True:
        try:
            if is_ssh_alive():
                break
            else:
                log_print("[!] SSH inactive, waiting...")
                rep, scope.glitch.repeat, scope.glitch.ext_offset = update_repeat(rep, RESET_DECREMENT, scope.glitch.num_glitches)
                reconnect_target()
        except Exception as e:
            log_print("Found unhandled error case:", type(e), e)
            reconnect_target()

    # Normal glitch process
    scope.arm()
    thread = threading.Thread(target=run_remote_command)
    thread.start()

    ret = scope.capture()
    thread.join()

    traces.append(scope.get_last_trace())

    if ret:
        log_print('Timeout - no trigger')
        gc.add("reset")


In [None]:
plt.figure()
plt.plot(traces[0])
plt.show()