# Control Keyboard with your Muscle

If you use this script for your own research, please consider citing the associated study referenced alongside this script.

## Libraries

In [1]:
import sys
import serial
import time
import threading
import numpy as np
import scipy  
from scipy.signal import butter, filtfilt, iirnotch
import pyautogui
import tkinter as tk
import matplotlib 
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from collections import deque
import gc

In [2]:
print("Library Versions:")
print(f"Python: {sys.version.split()[0]}")
print(f"pyserial: {serial.__version__}")
print(f"numpy: {np.__version__}")
print(f"scipy: {scipy.__version__}")
print(f"pyautogui: {pyautogui.__version__}")
print(f"matplotlib: {matplotlib.__version__}")

Library Versions:
Python: 3.12.7
pyserial: 3.5
numpy: 1.26.4
scipy: 1.15.3
pyautogui: 0.9.54
matplotlib: 3.10.3


## EMG to Keypress

### Configuration

- Customize your EMG threshold for keypress triggering
- Adapt the keyboard output: choose any key to map (e.g., game shortcuts)
- Verify that the serial port configuration specify the correct COM port

In [None]:
PORT = "COM3"           # Serial port to which the acquisition device (e.g., Arduino/EMG board) is connected
TIMEOUT = 0.01          # Serial read timeout in seconds
THRESHOLD = 1000        # RMS threshold above which an event (i.e., keypress) is generated
WINDOW_SIZE = 500       # Number of samples in each analysis window (for filtering and RMS computation)
FS = 2000               # Sampling frequency of the EMG data (Hz)
BAUD_RATE = 230400      # Baud rate for serial communication
REFRESH_RATE = 0.01     # UI refresh interval in seconds (controls responsiveness of plot and label updates)
PLOT_LENGTH = 100       # Number of points shown in the RMS envelope plot (rolling window length for visualization)
KEY_TO_PRESS = 'a'      # The keypress generated when the RMS treshold has been reached.
low_bandpass = 10       # Lower cutoff frequency of the band-pass filter
high_bandpass = 250     # Higher cutoff frequency of the band-pass filter
notch_value = 50        # Center frequency value of the notch filter

### Start the reading and triggering

In [None]:
# Serial decoding
def decode_frame(data):
    """
    Decode EMG samples from the custom serial protocol.
    - Each sample is encoded in 2 bytes, with the MSB used as a marker.
    - Reconstructs 14-bit samples from the byte stream.
    """
    samples = []
    i = 0
    while i < len(data) - 1:
        if data[i] & 0x80:  # Check if MSB is set → start of new sample
            sample = ((data[i] & 0x7F) << 7) | (data[i + 1] & 0x7F)
            samples.append(sample)
            i += 1
        i += 1
    return samples


def remove_offset(signal):
    """Remove DC offset by subtracting the mean."""
    return signal - np.mean(signal)

def bandpass_filter(signal, lowcut, highcut, fs, order=3):
    """
    Apply Butterworth band-pass filter.
    Keeps EMG-relevant frequencies (10–250 Hz).
    """
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

def notch_filter(signal, freq, fs, quality=30):
    """
    Apply IIR notch filter to remove power line interference (defined in the function call as 50 Hz).
    Quality factor determines bandwidth of the notch.
    """
    nyquist = 0.5 * fs
    w0 = freq / nyquist
    b, a = iirnotch(w0, quality)
    return filtfilt(b, a, signal)

def calculate_rms(signal, window_size):
    """
    Compute root mean square (RMS) envelope of the signal.
    Sliding window defined by `window_size`.
    """
    squared = np.square(signal)
    window = np.ones(window_size) / window_size
    rms = np.sqrt(np.convolve(squared, window, mode='same'))
    return rms

# State
keypress_flag = False  # Prevents repeated keypresses while threshold is held ; if u want, turn it True
rms_history = deque(maxlen=PLOT_LENGTH) # Stores recent RMS values for plotting
buffer = deque(maxlen=FS) # Keep 1 second of EMG data (sampling rate FS)

# UI setup
window = tk.Tk()
window.title("EMG-Keypress Trigger Monitor")
window.geometry("600x400")
label_status = tk.Label(window, font=("Helvetica", 24))
label_status.pack()

# Plot setup
fig, ax = plt.subplots(figsize=(5, 2))
canvas = FigureCanvasTkAgg(fig, master=window)
canvas_widget = canvas.get_tk_widget()
canvas_widget.pack()
line, = ax.plot([], [], lw=2)
ax.set_ylim(0, 2000)
ax.set_xlim(0, PLOT_LENGTH)
ax.set_title("RMS over time")
ax.set_xlabel("Frame")
ax.set_ylabel("RMS")

def update_ui(triggered):
    """Update label status depending on whether a trigger event occurred."""
    label_status.config(text="Keypress triggered" if triggered else "", fg="green" if triggered else "black")

def update_plot():
    """Update the real-time RMS plot."""
    if rms_history:
        ydata = list(rms_history)
        line.set_data(range(len(ydata)), ydata)
        ax.set_xlim(0, len(ydata))
        ax.set_ylim(0, max(2000, max(ydata) + 100))
        canvas.draw()
    window.after(int(REFRESH_RATE * 1000), update_plot)

def read_and_trigger():
    """
    Read EMG data from serial port, filter it, compute RMS,
    and trigger a keypress when the threshold is exceeded.
    """
    global keypress_flag
    ser = serial.Serial(PORT, BAUD_RATE, timeout=TIMEOUT)
    ser.flushInput()
    ser.write(b"start:;\n")
    print("Serial started")

    try:
        while True:
            if ser.in_waiting:
                raw = ser.read(ser.in_waiting)
                samples = decode_frame(list(raw))
                buffer.extend(samples)

                if len(buffer) >= WINDOW_SIZE:
                    recent_data = np.array(list(buffer)[-WINDOW_SIZE:], dtype=np.float64)
                    filtered = remove_offset(recent_data)
                    filtered = bandpass_filter(filtered, low_bandpass, high_bandpass, FS)
                    filtered = notch_filter(filtered, notch_value, FS)

                    rms = calculate_rms(filtered, WINDOW_SIZE)
                    current_rms = rms[-1] if len(rms) else 0
                    rms_history.append(current_rms)

                    if current_rms > THRESHOLD and not keypress_flag:
                        keypress_flag = True
                        pyautogui.press(KEY_TO_PRESS) #HERE
                        update_ui(True)
                    elif current_rms <= THRESHOLD and keypress_flag:
                        keypress_flag = False
                        update_ui(False)

                    del recent_data, filtered, rms
                    gc.collect()
            time.sleep(REFRESH_RATE)

    except KeyboardInterrupt:
        ser.write(b"h:;\n")
        ser.close()
        print("Serial stopped")

# Start acquisition and launch UI

thread = threading.Thread(target=read_and_trigger, daemon=True)
thread.start()
update_plot()
window.mainloop()