In [11]:

import soundfile as sf
import csv
import time
import threading
import sounddevice as sd
import numpy as np
from pynput import keyboard
from scipy.io.wavfile import write as wav_write

## CURRENT ISSUE: the ESC keystroke is logged at the very end of the session, but the audio is cut off at that point, so its spectrogram is incomplete. TODO: edit the code so that the ESC keystroke is skipped.

### The following code is used to record the audio and the keystrokes

* It is not used in the final project, but it is kept here for reference.
* The audio recording runs for a fixed duration (the `duration` parameter in the code below, currently 60 seconds). The keystroke recording runs simultaneously but has to be stopped manually by pressing ESC (escape).

Heads-up: don't forget to grant accessibility access to the editor with which you are running the code below, in particular Input Monitoring, which is needed for keystroke recording.


In [None]:
import numpy as np
import csv
import time
import sounddevice as sd
import threading
import scipy.io.wavfile as wav
from pynput import keyboard

# --- Audio capture settings ---
sample_rate = 44100  # sampling frequency, in Hz
duration = 60  # length of the fixed-size recording, in seconds
# Zero-filled placeholder (one row per sample, one column); record_audio() rebinds this name
audio_data = np.zeros((sample_rate * duration, 1), dtype='int16')

# --- Output files ---
audio_file, log_file = 'keyboard_sound.wav', 'key_log.csv'

# Start a fresh keystroke log that contains only the header row
with open(log_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows([['Key', 'Action', 'Timestamp']])

# --- State shared between the audio thread and the keyboard callbacks ---
start_time_audio = None  # wall-clock time at which the audio recording started (None until then)
stop_recording = False  # set to True by on_release() when ESC is released

# Recording function for audio
def record_audio():
    """Record ``duration`` seconds of audio and save them to ``audio_file``.

    Side effects:
      * sets the global ``start_time_audio`` to the wall-clock time at which
        the recording was requested, so the keyboard callbacks can log
        timestamps relative to it;
      * rebinds the global ``audio_data`` to the captured samples;
      * writes the samples to ``audio_file`` as a WAV file.

    Stereo capture is attempted first. If the capture itself fails, the
    recording is retried once in mono; a failure of the mono attempt, or of
    writing the WAV file, is propagated to the caller.
    """
    global audio_data, start_time_audio
    print("Recording audio...")

    n_frames = int(duration * sample_rate)

    # Reference instant for the keystroke timestamps
    start_time_audio = time.time()

    # Only the capture is guarded: a failure while *saving* must not be
    # mistaken for a missing stereo device and trigger a second recording.
    try:
        audio_data = sd.rec(n_frames, samplerate=sample_rate, channels=2, dtype='int16')
        sd.wait()
    except Exception:  # not a bare except: let KeyboardInterrupt/SystemExit through
        print("Your computer does not support stereo recording. Defaulting to mono.")
        # Re-stamp so the timestamps refer to the recording that is actually kept
        start_time_audio = time.time()
        audio_data = sd.rec(n_frames, samplerate=sample_rate, channels=1, dtype='int16')
        sd.wait()

    # Save the recorded audio to a WAV file
    wav.write(audio_file, sample_rate, audio_data)
    print(f"Audio saved to {audio_file}")

    print("Audio recording finished")

# Keystroke listener function
def on_press(key):
    """Keyboard-listener callback: log a 'Pressed' row for `key`.

    Nothing is logged while ``start_time_audio`` is still None. Otherwise a
    row ``[key, 'Pressed', seconds since start_time_audio]`` is appended to
    ``log_file`` and the event is echoed on stdout.
    """
    if start_time_audio is None:
        return  # the audio has not started yet, so there is nothing to time against

    elapsed = time.time() - start_time_audio
    # Keys exposing `.char` are logged by that value, the others by their str() form
    label = key.char if hasattr(key, 'char') else str(key)

    with open(log_file, 'a', newline='') as log:
        csv.writer(log).writerow([label, 'Pressed', round(elapsed, 6)])

    print(f"Key {label} Pressed at {elapsed:.6f} seconds")

def on_release(key):
    """Keyboard-listener callback: log a 'Released' row for `key`, or stop on ESC.

    Nothing happens while ``start_time_audio`` is still None.

    Releasing ESC is the stop command, not a keystroke to analyse: it sets the
    global ``stop_recording`` flag and returns False (to stop the listener)
    WITHOUT writing a 'Released' row. The ESC press therefore never gets a
    matching release in the log, so code that pairs presses with releases
    does not produce an ESC keystroke whose audio would be cut off.

    For every other key a row ``[key, 'Released', seconds since
    start_time_audio]`` is appended to ``log_file`` and echoed on stdout.

    :return: False when ESC was released, otherwise None.
    """
    global stop_recording

    if start_time_audio is None:
        return  # Don't log if the audio hasn't started yet

    # Handle the stop key before any logging so it never reaches the CSV
    if key == keyboard.Key.esc:
        stop_recording = True  # Set flag to stop both recordings
        return False

    timestamp = time.time() - start_time_audio  # Calculate relative timestamp
    try:
        key_str = key.char
    except AttributeError:
        key_str = str(key)

    # Log the key release with relative timestamp
    with open(log_file, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([key_str, 'Released', round(timestamp, 6)])  # Round for cleaner timestamp

    print(f"Key {key_str} Released at {timestamp:.6f} seconds")

# Start recording audio in a separate (background) thread
audio_thread = threading.Thread(target=record_audio)
audio_thread.start()

# Helper that runs the keyboard listener and blocks until the listener stops.
# NOTE: it is executed in its own thread (see below), not in the main thread.
def start_keyboard_listener():
    """Run a pynput keyboard listener with on_press/on_release and block until it ends."""
    with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
        listener.join()

# Run the keyboard listener in a separate thread
keyboard_thread = threading.Thread(target=start_keyboard_listener)
keyboard_thread.start()

# Poll the stop flag, which on_release() sets when ESC is released
while not stop_recording:
    time.sleep(0.1)  # Prevent high CPU usage by sleeping briefly

# After ESC, wait for both threads. record_audio() records for the full `duration`
# (sd.rec + sd.wait) and does not look at the stop flag, so this join can
# block until the fixed-length recording has completed.
audio_thread.join()
keyboard_thread.join()
print("Recording process finished.")

Version 2: continuous audio recording with a buffer

In [5]:
import numpy as np
import csv
import time
import sounddevice as sd
import threading
import scipy.io.wavfile as wav
from pynput import keyboard

# --- Audio capture settings ---
sample_rate = 44100  # sampling frequency, in Hz
channels = 2  # preferred channel count: stereo is tried first
audio_buffer = []  # chunks appended by audio_callback() while the stream is running

# --- Output files ---
audio_file, log_file = 'keyboard_sound_dell.wav', 'key_log_dell.csv'

# Start a fresh keystroke log that contains only the header row
with open(log_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows([['Key', 'Action', 'Timestamp']])

# --- State shared between the audio thread and the keyboard callbacks ---
start_time_audio = None  # wall-clock time used as the zero point of keystroke timestamps
stop_recording = False  # set to True by on_release() when ESC is released

def audio_callback(indata, frames, time, status):
    """Stream callback: report any status flag and buffer a copy of the incoming chunk.

    Only `indata` and `status` are used; `frames` and `time` are accepted
    because the callback is passed to sd.InputStream with this signature.
    """
    if status:
        print(f"Audio callback status: {status}")
    # Store a copy so the buffered chunk does not alias the array handed in by the stream
    chunk = indata.copy()
    audio_buffer.append(chunk)

def record_audio():
    """Capture audio until ``stop_recording`` becomes true, then save it.

    An input stream with ``channels`` channels is opened first; if opening it
    fails, a mono stream is opened instead. While the stream is running,
    ``audio_callback`` appends chunks to ``audio_buffer``. Once the stop flag
    is set, the buffered chunks are concatenated and written to
    ``audio_file``.

    The global ``start_time_audio`` is stamped only after the stream that is
    actually used has been opened and started. Keystroke timestamps, which are
    relative to it, then do not include the time spent opening the device or
    a failed stereo attempt.
    """
    global start_time_audio
    print("Recording audio...")

    # Only opening the stream is guarded: an error later on must not be
    # reported as "no stereo support" nor silently restart the capture.
    try:
        # Try stereo recording first
        stream = sd.InputStream(samplerate=sample_rate, channels=channels, callback=audio_callback)
    except Exception as exc:
        print("Your computer does not support stereo recording. Defaulting to mono.")
        print(f"Reason reported by the audio backend: {exc}")
        # Try mono recording
        stream = sd.InputStream(samplerate=sample_rate, channels=1, callback=audio_callback)

    with stream:
        # The stream is running from here on: use this instant as time zero
        start_time_audio = time.time()
        while not stop_recording:
            time.sleep(0.01)

    # When stopped, save the recorded audio
    if audio_buffer:
        audio_data = np.concatenate(audio_buffer, axis=0)
        wav.write(audio_file, sample_rate, audio_data)
        print(f"Audio saved to {audio_file}")

    print("Audio recording finished")

# Keystroke listener function
def on_press(key, debug=False):
    """
    Keyboard-listener callback: append a 'Pressed' row for `key` to the CSV log.

    Nothing is logged while ``start_time_audio`` is still None.

    :key : keyboard key object
    :debug : when true, also print the logged event
    """
    if start_time_audio is None:
        return  # the audio has not started yet, so there is nothing to time against

    elapsed = time.time() - start_time_audio
    # Keys exposing `.char` are logged by that value, the others by their str() form
    label = key.char if hasattr(key, 'char') else str(key)

    with open(log_file, 'a', newline='') as log:
        csv.writer(log).writerow([label, 'Pressed', round(elapsed, 6)])

    if debug:
        print(f"Key {label} Pressed at {elapsed:.6f} seconds")

def on_release(key, debug=False):
    """
    Keyboard-listener callback: log a 'Released' row for `key`, or stop on ESC.

    Nothing happens while ``start_time_audio`` is still None.

    Releasing ESC is the stop command, not a keystroke to analyse: it sets the
    global ``stop_recording`` flag and returns False (to stop the listener)
    WITHOUT writing a 'Released' row. The ESC press therefore never gets a
    matching release in the log, so code that pairs presses with releases
    does not produce an ESC keystroke whose audio is cut off when the
    recording stops.

    :key : keyboard key object
    :debug : when true, also print the logged event
    :return: False when ESC was released, otherwise None
    """
    global stop_recording

    if start_time_audio is None:
        return  # Don't log if the audio hasn't started yet

    # Handle the stop key before any logging so it never reaches the CSV
    if key == keyboard.Key.esc:
        stop_recording = True  # Set flag to stop both recordings
        return False

    timestamp = time.time() - start_time_audio  # Calculate relative timestamp
    try:
        key_str = key.char
    except AttributeError:
        key_str = str(key)

    # Log the key release with relative timestamp
    with open(log_file, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([key_str, 'Released', round(timestamp, 6)])  # Round for cleaner timestamp

    if debug:
        print(f"Key {key_str} Released at {timestamp:.6f} seconds")

# Start recording audio in a separate (background) thread
audio_thread = threading.Thread(target=record_audio)
audio_thread.start()

# Helper that runs the keyboard listener and blocks until the listener stops.
# NOTE: it is executed in its own thread (see below), not in the main thread.
def start_keyboard_listener():
    """Run a pynput keyboard listener with on_press/on_release and block until it ends."""
    with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
        listener.join()

# Run the keyboard listener in a separate thread
keyboard_thread = threading.Thread(target=start_keyboard_listener)
keyboard_thread.start()

# Poll the stop flag, which on_release() sets when ESC is released
while not stop_recording:
    time.sleep(0.01)  # Prevent high CPU usage by sleeping briefly

# Once the flag is set, record_audio() leaves its loop and saves the audio,
# and the listener ends because on_release() returned False; wait for both.
audio_thread.join()
keyboard_thread.join()
print("Recording process finished.")

This process is not trusted! Input event monitoring will not be possible until it is added to accessibility clients.


Recording audio...
Your computer does not support stereo recording. Defaulting to mono.
Audio saved to keyboard_sound_dell.wav
Audio recording finished
Recording process finished.


In [3]:
# List the audio devices visible to sounddevice, with their input/output channel counts
# (useful to check whether the microphone offers more than one input channel)
print(sd.query_devices())

> 0 MacBook Pro Microphone, Core Audio (1 in, 0 out)
< 1 MacBook Pro Speakers, Core Audio (0 in, 2 out)
  2 Livebox-3E55 Microphone, Core Audio (1 in, 0 out)
  3 Microsoft Teams Audio, Core Audio (1 in, 1 out)


In [8]:
# Read the recorded audio back from disk
audio_path = "keyboard_sound_dell.wav"
audio_data, sample_rate = sf.read(audio_path)

# Turn the press/release log into [key, press_time, release_time] entries
keystroke_times = []
with open("key_log_dell.csv", "r") as log_handle:
    reader = csv.reader(log_handle)
    stack = {}  # keys currently held down -> [key, press_time]

    for row in reader:
        if row[0] == "Key":
            continue  # header row
        key, action, timestamp = row[0], row[1], float(row[2])

        if action == "Released":
            if key not in stack:
                raise Exception(f"Key {key} was released without being pressed.")
            # Complete the pending entry with the release time and emit it
            entry = stack.pop(key)
            entry.append(timestamp)
            keystroke_times.append(entry)

        elif action == "Pressed":
            if key in stack:
                raise Exception(f"Key {key} was pressed again before being released.")
            stack[key] = [key, timestamp]

print(keystroke_times)


[['q', 2.634055, 2.777125], ['w', 3.096101, 3.2252], ['e', 3.496931, 3.62381], ['r', 3.914863, 4.039945], ['t', 4.327441, 4.487732], ['y', 5.563664, 5.673083], ['u', 5.945352, 6.056883], ['i', 6.328603, 6.440968], ['o', 6.779723, 6.888081], ['p', 7.224409, 7.354256], ['a', 8.00909, 8.152872], ['d', 8.392357, 8.503936], ['s', 8.777921, 8.887747], ['f', 9.097146, 9.208613], ['r', 9.450198, 9.545049], ['t', 9.752896, 9.886871], ['f', 10.103841, 10.199934], ['g', 10.440331, 10.569712], ['r', 10.860121, 10.952574], ['h', 11.256158, 11.385096], ['t', 11.655683, 11.783825], ['y', 12.073187, 12.16891], ['u', 12.40826, 12.537012], ['i', 12.824201, 12.936777], ['o', 13.209489, 13.35215], ['h', 13.639865, 13.768635], ['g', 13.992661, 14.088684], ['j', 14.3301, 14.473145], ['t', 14.695626, 14.808795], ['b', 15.048379, 15.192514], ['v', 15.479499, 15.591864], ['c', 15.838812, 15.949384], ['x', 16.201571, 16.316002], ['z', 16.58466, 16.682129], ['s', 17.063991, 17.17603], ['a', 17.496237, 17.592574]

## The code that generates individual spectrograms for each keystroke

In [9]:
# Report whether PyTorch can see a CUDA-capable GPU in this environment
import torch
print(f"CUDA available: {torch.cuda.is_available()}")

CUDA available: False


In [10]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.io.wavfile as wav
import scipy.signal as signal
import csv
import os

# Input files
AUDIO_FILE = "keyboard_sound_dell.wav"  # Replace with your actual audio file
KEYSTROKE_CSV = "key_log_dell.csv"

# Output locations: PNG spectrograms and the matching NumPy arrays
OUTPUT_DIR = "keystroke_spectrograms/keystroke_spectrograms_dell"
NUMPY_OUTPUT_DIR = "keystroke_spectrograms/numpy_arrays_dell"

## Hyperparameter to fine-tune
BUFFER_TIME = 0.2  # seconds of audio kept before the press and after the release

# Create the output directories if they do not exist yet
for directory in (OUTPUT_DIR, NUMPY_OUTPUT_DIR):
    os.makedirs(directory, exist_ok=True)

# Read the recording: sampling rate (Hz) and the sample array
sample_rate, audio_data = wav.read(AUDIO_FILE)

# Keys that only control the recording session and must not become samples.
# ESC stops the recording, so the audio ends while it is being pressed and
# its spectrogram would be incomplete. 'Key.esc' is how the recorder logs it.
SESSION_CONTROL_KEYS = {"Key.esc"}

# Read keystroke data from CSV as (key, press_time, release_time) tuples
keystroke_times = []
with open(KEYSTROKE_CSV, "r", newline="") as file:
    reader = csv.reader(file)
    next(reader, None)  # Skip header (the default avoids StopIteration on an empty file)
    stack = {}  # key -> press time of a press that has not been released yet

    for row in reader:
        if not row:
            continue  # csv.reader yields [] for blank lines
        key, action, timestamp = row[0], row[1], float(row[2])

        if key in SESSION_CONTROL_KEYS:
            continue  # never turn the stop key into a keystroke sample

        if action == "Pressed":
            stack[key] = timestamp  # Store press time
        elif action == "Released" and key in stack:
            keystroke_times.append((key, stack.pop(key), timestamp))  # Store key, press, and release

# Function to create and save the spectrogram and numpy arrays
def create_spectrogram_and_numpy(audio_segment, key, idx):
    """Compute the log-power spectrogram of one keystroke and save it twice.

    The spectrogram (in dB) is written as a PNG figure to ``OUTPUT_DIR`` and
    as a ``.npy`` array to ``NUMPY_OUTPUT_DIR``; both files are named
    ``keystroke_{idx + 1}_{key}``.

    :param audio_segment: samples of the keystroke, indexed by sample along
        axis 0. A multi-channel segment of shape (samples, channels) is
        averaged down to mono first, so the spectrogram is always computed
        along the time axis and the saved array is always 2-D
        (frequency x time).
    :param key: label of the key, used in the plot title and the file names.
    :param idx: zero-based index of the keystroke (file names use idx + 1).
    :return: None
    """
    samples = np.asarray(audio_segment)
    if samples.ndim > 1:
        # signal.spectrogram works along the last axis, which would be the
        # channel axis here; collapse the channels so the last axis is time.
        samples = samples.reshape(len(samples), -1).mean(axis=1)

    # Generate the spectrogram using scipy
    f, t, Sxx = signal.spectrogram(samples, sample_rate)

    # Normalize spectrogram for neural network (optional)
    Sxx_log = 10 * np.log10(Sxx + 1e-10)  # Adding small constant to avoid log(0)

    # Plot the spectrogram
    plt.figure(figsize=(10, 4))
    plt.pcolormesh(t, f, Sxx_log, shading='auto', cmap='inferno')
    plt.colorbar(label='Power (dB)')
    plt.title(f"Keystroke '{key}' Spectrogram")
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")

    # Save the spectrogram to file as PNG
    spectrogram_path = os.path.join(OUTPUT_DIR, f"keystroke_{idx + 1}_{key}.png")
    plt.savefig(spectrogram_path)
    plt.close()  # release the figure so a long run does not accumulate open figures
    print(f"Saved spectrogram for '{key}' at {spectrogram_path}")

    # Save the spectrogram as a NumPy array
    numpy_array_path = os.path.join(NUMPY_OUTPUT_DIR, f"keystroke_{idx + 1}_{key}.npy")
    np.save(numpy_array_path, Sxx_log)  # Save the log-scaled spectrogram as a numpy array
    print(f"Saved NumPy array for '{key}' at {numpy_array_path}")

# Cut one audio window per keystroke and hand it to the spectrogram writer
for idx, (key, press_time, release_time) in enumerate(keystroke_times):
    # Window = [press - buffer, release + buffer], clamped to the recording
    window_start = max(0, press_time - BUFFER_TIME)
    window_end = min(len(audio_data) / sample_rate, release_time + BUFFER_TIME)

    # Seconds -> sample indices
    first_sample, last_sample = int(window_start * sample_rate), int(window_end * sample_rate)

    keystroke_audio = audio_data[first_sample:last_sample]

    if len(keystroke_audio) > 0:
        # Create and save the spectrogram and numpy array for this key
        create_spectrogram_and_numpy(keystroke_audio, key, idx)
    else:
        print(f"Warning: Empty audio segment for keystroke {idx + 1}")

print("Processing complete. Spectrograms and NumPy arrays saved.")


Saved spectrogram for 'q' at keystroke_spectrograms/keystroke_spectrograms_dell/keystroke_1_q.png
Saved NumPy array for 'q' at keystroke_spectrograms/numpy_arrays_dell/keystroke_1_q.npy
Saved spectrogram for 'w' at keystroke_spectrograms/keystroke_spectrograms_dell/keystroke_2_w.png
Saved NumPy array for 'w' at keystroke_spectrograms/numpy_arrays_dell/keystroke_2_w.npy
Saved spectrogram for 'e' at keystroke_spectrograms/keystroke_spectrograms_dell/keystroke_3_e.png
Saved NumPy array for 'e' at keystroke_spectrograms/numpy_arrays_dell/keystroke_3_e.npy
Saved spectrogram for 'r' at keystroke_spectrograms/keystroke_spectrograms_dell/keystroke_4_r.png
Saved NumPy array for 'r' at keystroke_spectrograms/numpy_arrays_dell/keystroke_4_r.npy
Saved spectrogram for 't' at keystroke_spectrograms/keystroke_spectrograms_dell/keystroke_5_t.png
Saved NumPy array for 't' at keystroke_spectrograms/numpy_arrays_dell/keystroke_5_t.npy
Saved spectrogram for 'y' at keystroke_spectrograms/keystroke_spectrog