# PyTorch Micro Controller Benchmark

Benchmark speech recognition models (PyTorch and others) and generate a compatibility list for micro controllers.

In [1]:
# Install packages

# %pip install torch torchaudio omegaconf soundfile numpy prettytable transformers pocketsphinx

In [2]:
# Imports

import os
import gc
import psutil
import wave
import platform
import torch
import numpy as np
from glob import glob
from timeit import default_timer
from typing import TypedDict, Callable
from prettytable import PrettyTable
from torch.profiler import profile, record_function, ProfilerActivity
from transformers import AutoProcessor, HubertForCTC
from pocketsphinx import Decoder

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Types

class Model(TypedDict):
    """Description of a model that can be benchmarked.

    Keys:
        name: Display name used in the log output.
        num_inferred_samples: Number of audio samples processed by one call to ``infer``.
        infer: Zero-argument callable that runs one inference pass and returns its output.
        is_pytorch: Whether the model is profiled with the PyTorch profiler in addition
            to the generic memory and timing measurements.
    """
    name: str
    num_inferred_samples: int
    # `infer` is always invoked without arguments, so the parameter list is empty
    # (`Callable[[None], ...]` would describe a callable taking one `None` argument).
    infer: Callable[[], list[str]]
    is_pytorch: bool

class MicroController(TypedDict):
    """Specification of a target micro controller.

    Keys:
        name: Display name used in the log output.
        architecture: Free-text architecture description.
        cpu_speed_ghz: CPU clock speed in GHz.
        memory_mb: Available memory in megabytes.
    """
    name: str
    architecture: str
    cpu_speed_ghz: float
    memory_mb: float

class ModelResults:
    name: str
    cpu_time_total_ms: float | None
    mean_inference_time_ms: float
    mean_memory_usage_mb: float
    m_flops: float | None
    samples_per_cpu_second: float | None
    samples_per_inference_second: float

class ModelMicroControllerResults:
    model_name: str
    estimated_cpu_time_total_ms: float | None
    estimated_inference_time_ms: float
    estimated_samples_per_cpu_second: float | None
    estimated_samples_per_inference_second: float
    memory_usage_percentage: float
    compatible: bool

class BenchmarkOptions(TypedDict):
    """Global settings of a benchmark run.

    Declared as a ``TypedDict`` because options are passed as a dict literal and
    read via subscripting.

    Keys:
        cpu_speed_ghz: CPU clock speed of the machine running the benchmark, in GHz.
        target_sampling_rate_khz: Throughput a model must reach to count as compatible, in kHz.
        logfile_path: Path of the logfile; it is overwritten at the start of a run.
    """
    cpu_speed_ghz: float
    target_sampling_rate_khz: float
    logfile_path: str

# Logging callback: receives one message string and returns nothing.
Log = Callable[[str], None]

# Zero-argument factory that builds a Model; it is always called as `create_model()`,
# so the parameter list is empty rather than `[None]`.
CreateModel = Callable[[], Model]

## Benchmark Definition

In [4]:
def byte_to_mb(byte: float) -> float:
    """Convert a size in bytes to megabytes (1 MB = 1024 ** 2 bytes).

    Args:
        byte: Size in bytes; may be fractional (e.g. a mean over several samples).

    Returns:
        The size in megabytes as a float (true division, not rounded).
    """
    return byte / (1024 ** 2)

def create_logger(logifle_path: str) -> Log:
    """Build a logging callback bound to one logfile.

    Args:
        logifle_path: Path of the file that messages are appended to.

    Returns:
        A function that appends the message plus a newline to the file and
        then prints the message to stdout.
    """
    def _append_and_echo(msg: str):
        # The file is reopened in append mode on every call and closed before printing.
        with open(logifle_path, 'a') as logfile:
            logfile.write('{}\n'.format(msg))
        print(msg)

    return _append_and_echo

def log_hash_comment(content: str, log: Log):
    """Log `content` as a banner framed by lines of `#` characters.

    Args:
        content: Text shown inside the banner.
        log: Logging callback that receives the finished banner as one message.
    """
    title_line = f'# {content} #'
    border = '#' * len(title_line)
    # Three banner lines followed by one empty line, each terminated by a newline.
    log('\n'.join([border, title_line, border, '', '']))

In [5]:
def benchmark_pytorch_model(model: Model, log: Log, row_limit = 5, iterations = 5) -> ModelResults:
    """Benchmark a PyTorch model with the PyTorch profiler, psutil and a wall-clock timer.

    Runs three measurement stages and logs the outcome of each:
    one profiled inference (CPU time, memory, FLOPs), `iterations` process
    memory samples, and `iterations` timed inferences.

    Args:
        model: Model description; `name`, `infer` and `num_inferred_samples` are read.
        log: Logging callback that receives all report text.
        row_limit: Number of rows shown in each profiler table.
        iterations: Number of repetitions for the memory and timing stages.

    Returns:
        The aggregated measurements for the model.
    """

    #####################
    # Instantiate Model #
    #####################

    gc.collect()

    model_name = model['name']
    infer = model['infer']
    num_samples = model['num_inferred_samples']

    log_hash_comment(model_name, log)

    ########################
    # Run PyTorch Profiler #
    ########################

    with profile(activities=[ProfilerActivity.CPU], profile_memory=True, with_flops=True, record_shapes=True, with_stack=True) as prof:
        with record_function("model_inference"):
            infer()

    key_averages = prof.key_averages()
    total_average = key_averages.total_average()

    # Profiler times are multiplied by 0.001 to get ms, FLOPs by 0.000001 to get MFLOPs.
    cpu_time_ms = total_average.cpu_time_total * 0.001
    self_cpu_time_ms = total_average.self_cpu_time_total * 0.001
    cpu_memory_usage_mb = byte_to_mb(total_average.cpu_memory_usage)
    self_cpu_memory_usage_mb = byte_to_mb(total_average.self_cpu_memory_usage)
    m_flops = total_average.flops * 0.000001

    log(f'--- PyTorch Profile: {model_name} ---\n')

    log(
        f'CPU time top {row_limit}\n'
        f'{key_averages.table(sort_by="cpu_time_total", row_limit=row_limit)}'
    )

    log(
        f'CPU memory usage top {row_limit}\n'
        f'{key_averages.table(sort_by="cpu_memory_usage", row_limit=row_limit)}'
    )

    log(
        f'MFLOPs top {row_limit}\n'
        f'{key_averages.table(sort_by="flops", row_limit=row_limit)}'
    )

    log(
        f'Total averages\n'
        f'CPU time total [ms]: {cpu_time_ms}\n'
        f'Self CPU time total [ms]: {self_cpu_time_ms}\n'
        f'CPU memory usage [Mb]: {cpu_memory_usage_mb}\n'
        f'Self CPU memory usage [Mb]: {self_cpu_memory_usage_mb}\n'
        f'MFLOPs: {m_flops}\n'
    )

    ###############################
    # Run psutil memory_full_info #
    ###############################

    gc.collect()

    process = psutil.Process(os.getpid())
    rss_samples_b: list[int] = []
    uss_samples_b: list[int] = []
    for _ in range(iterations):
        # Memory is sampled before each inference call of this loop.
        memory_info = process.memory_full_info()
        rss_samples_b.append(memory_info.rss)
        uss_samples_b.append(memory_info.uss)
        infer()

    psutil_mem_rss_b = np.array(rss_samples_b)
    psutil_mem_uss_b = np.array(uss_samples_b)
    mean_psutil_mem_rss_b = np.mean(psutil_mem_rss_b)
    mean_psutil_mem_uss_b = np.mean(psutil_mem_uss_b)
    std_psutil_mem_rss = np.std(psutil_mem_rss_b)
    std_psutil_mem_uss = np.std(psutil_mem_uss_b)

    log(f'--- Psutil memory_full_info: {model_name} ---\n')

    log(
        f'Over {iterations} iterations\n'
        f'Mean RSS [Mb]: {byte_to_mb(mean_psutil_mem_rss_b)}\n'
        f'Std RSS: {byte_to_mb(std_psutil_mem_rss)}\n'
        f'Mean USS [Mb]: {byte_to_mb(mean_psutil_mem_uss_b)}\n'
        f'Std USS: {byte_to_mb(std_psutil_mem_uss)}\n'
    )

    ############################
    # Run timeit default_timer #
    ############################

    gc.collect()

    inference_time_samples_ms: list[float] = []
    for _ in range(iterations):
        start = default_timer()
        infer()
        end = default_timer()
        inference_time_samples_ms.append((end - start) * 1000)

    inference_times_ms = np.array(inference_time_samples_ms)
    mean_inference_time_ms = np.mean(inference_times_ms)
    std_inference_time = np.std(inference_times_ms)

    log(f'--- Timeit default_timer: {model_name} ---\n')

    log(
        f'Over {iterations} iterations\n'
        f'Mean inference time [ms]: {mean_inference_time_ms}\n'
        f'Std inference time: {std_inference_time}\n'
    )

    #############################
    # Calculate overall results #
    #############################

    # Memory usage is reported as the average of the mean RSS and the mean USS.
    mean_memory_usage_mb = byte_to_mb(np.mean(np.array([mean_psutil_mem_rss_b, mean_psutil_mem_uss_b])))
    samples_per_cpu_second = num_samples / (self_cpu_time_ms * 0.001)
    samples_per_inference_second = num_samples / (mean_inference_time_ms * 0.001)

    results: ModelResults = {
        'name': model_name,
        # The summed *self* CPU time is stored under this key, not `cpu_time_total`.
        'cpu_time_total_ms': self_cpu_time_ms,
        'mean_inference_time_ms': mean_inference_time_ms,
        'mean_memory_usage_mb': mean_memory_usage_mb,
        'm_flops': m_flops,
        'samples_per_cpu_second': samples_per_cpu_second,
        'samples_per_inference_second': samples_per_inference_second,
    }

    log(f'--- Overall results: {model_name} ---\n')

    log(
        f'Mean memory usage [Mb]: {mean_memory_usage_mb}\n'
        f'Samples per CPU second: {samples_per_cpu_second}\n'
        f'Samples per inference second: {samples_per_inference_second}\n'
        '\n'
    )

    return results


In [6]:
def benchmark_unknown_model(model: Model, log: Log, iterations = 5) -> ModelResults:
    """Benchmark a model that cannot be profiled with the PyTorch profiler.

    Only process memory (psutil) and wall-clock inference time are measured;
    the profiler-based result fields are set to None.

    Args:
        model: Model description; `name`, `infer` and `num_inferred_samples` are read.
        log: Logging callback that receives all report text.
        iterations: Number of repetitions for the memory and timing stages.

    Returns:
        The aggregated measurements for the model.
    """
    gc.collect()

    model_name = model['name']
    infer = model['infer']
    num_samples = model['num_inferred_samples']

    log_hash_comment(model_name, log)

    # --- Stage 1: process memory, sampled before each inference call ---

    gc.collect()

    current_process = psutil.Process(os.getpid())
    rss_samples = []
    uss_samples = []
    for _ in range(iterations):
        snapshot = current_process.memory_full_info()
        rss_samples.append(snapshot.rss)
        uss_samples.append(snapshot.uss)
        _discarded = infer()

    rss_b = np.array(rss_samples)
    uss_b = np.array(uss_samples)
    rss_mean_b, rss_std_b = rss_b.mean(), rss_b.std()
    uss_mean_b, uss_std_b = uss_b.mean(), uss_b.std()

    log(f'--- Psutil memory_full_info: {model_name} ---\n')

    log(
        f'Over {iterations} iterations\n'
        f'Mean RSS [Mb]: {byte_to_mb(rss_mean_b)}\n'
        f'Std RSS: {byte_to_mb(rss_std_b)}\n'
        f'Mean USS [Mb]: {byte_to_mb(uss_mean_b)}\n'
        f'Std USS: {byte_to_mb(uss_std_b)}\n'
    )

    # --- Stage 2: wall-clock time per inference ---

    gc.collect()

    durations_ms = []
    for _ in range(iterations):
        started = default_timer()
        _discarded = infer()
        finished = default_timer()
        durations_ms.append((finished - started) * 1000)

    durations_ms = np.array(durations_ms)
    mean_inference_time_ms = durations_ms.mean()
    std_inference_time = durations_ms.std()

    log(f'--- Timeit default_timer: {model_name} ---\n')

    log(
        f'Over {iterations} iterations\n'
        f'Mean inference time [ms]: {mean_inference_time_ms}\n'
        f'Std inference time: {std_inference_time}\n'
    )

    # --- Stage 3: overall results ---

    # Memory usage is reported as the average of the mean RSS and the mean USS.
    mean_memory_usage_mb = byte_to_mb(np.mean(np.array([rss_mean_b, uss_mean_b])))
    samples_per_inference_second = num_samples / (mean_inference_time_ms * 0.001)

    log(f'--- Overall results: {model_name} ---\n')

    log(
        f'Mean memory usage [Mb]: {mean_memory_usage_mb}\n'
        f'Samples per inference second: {samples_per_inference_second}\n'
        '\n'
    )

    # No profiler was run, so the CPU-time and FLOP fields stay empty.
    return {
        'name': model_name,
        'cpu_time_total_ms': None,
        'mean_inference_time_ms': mean_inference_time_ms,
        'mean_memory_usage_mb': mean_memory_usage_mb,
        'm_flops': None,
        'samples_per_cpu_second': None,
        'samples_per_inference_second': samples_per_inference_second,
    }


In [7]:
def benchmark_model(create_model: CreateModel, log: Log) -> ModelResults:
    """Create a model through its factory and benchmark it.

    Args:
        create_model: Zero-argument factory returning the model description.
        log: Logging callback forwarded to the benchmark routine.

    Returns:
        The measurements produced by the matching benchmark routine.
    """
    model = create_model()
    # PyTorch models get the profiler-based benchmark, everything else the generic one.
    run_benchmark = benchmark_pytorch_model if model['is_pytorch'] else benchmark_unknown_model
    results = run_benchmark(model, log)
    # Drop the local reference to the model before the next benchmark runs.
    del model
    return results

In [8]:
def benchmark_model_micro_controller(model_results: ModelResults, micro_controller: MicroController, options: BenchmarkOptions) -> ModelMicroControllerResults:
    """Estimate how a benchmarked model would perform on a micro controller.

    Host measurements are scaled linearly by the ratio of the host CPU clock
    speed to the controller CPU clock speed.

    Args:
        model_results: Measurements of the model on the benchmark host.
        micro_controller: Target controller; `cpu_speed_ghz` and `memory_mb` are read.
        options: Benchmark options; `cpu_speed_ghz` and `target_sampling_rate_khz` are read.

    Returns:
        The scaled estimates, the memory usage percentage and the compatibility verdict.
    """
    # How many times faster the host clock is than the controller clock.
    cpu_time_factor = options['cpu_speed_ghz'] / micro_controller['cpu_speed_ghz']

    # Profiler-based values may be None (no profiler run); None is passed through unchanged.
    host_cpu_time_ms = model_results['cpu_time_total_ms']
    estimated_cpu_time_total_ms = (
        None if host_cpu_time_ms is None else host_cpu_time_ms * cpu_time_factor
    )
    host_samples_per_cpu_second = model_results['samples_per_cpu_second']
    estimated_samples_per_cpu_second = (
        None if host_samples_per_cpu_second is None else host_samples_per_cpu_second / cpu_time_factor
    )

    estimated_inference_time_ms = model_results['mean_inference_time_ms'] * cpu_time_factor
    estimated_samples_per_inference_second = model_results['samples_per_inference_second'] / cpu_time_factor

    memory_usage_percentage = (model_results['mean_memory_usage_mb'] / micro_controller['memory_mb']) * 100

    target_sampling_rate_hz = options['target_sampling_rate_khz'] * 1000

    # Compatibility is judged on wall-clock throughput and memory only;
    # the CPU-time based throughput is not part of the verdict.
    fast_enough = estimated_samples_per_inference_second >= target_sampling_rate_hz
    fits_in_memory = memory_usage_percentage <= 100
    compatible = fast_enough and fits_in_memory

    # Key order matters: callers turn the values into a table row.
    return {
        'model_name': model_results['name'],
        'estimated_cpu_time_total_ms': estimated_cpu_time_total_ms,
        'estimated_inference_time_ms': estimated_inference_time_ms,
        'estimated_samples_per_cpu_second': estimated_samples_per_cpu_second,
        'estimated_samples_per_inference_second': estimated_samples_per_inference_second,
        'memory_usage_percentage': memory_usage_percentage,
        'compatible': compatible,
    }


In [9]:
def benchmark(create_models: list[CreateModel], micro_controllers: list[MicroController], options: BenchmarkOptions):
    """Run the full benchmark and write the report to the logfile and stdout.

    Benchmarks every model once on the host, then estimates the performance of
    each model on each micro controller and logs one table per controller.

    Args:
        create_models: Factories for the models to benchmark.
        micro_controllers: Target controllers to estimate compatibility for.
        options: Benchmark options; `logfile_path` and `cpu_speed_ghz` are read here.
    """
    # --- Create logfile: opening in 'w' mode empties any previous report ---

    logfile_path = options['logfile_path']
    with open(logfile_path, 'w'):
        pass

    log = create_logger(logfile_path)

    # --- Log system info ---

    total_ram_gb = psutil.virtual_memory().total / (1024.0 ** 3)

    log(
        f'--- Running benchmark on ---\n'
        '\n'
        f'Arch: {platform.machine()}\n'
        f'Platform: {platform.system()} {platform.version()}\n'
        f'CPU: {platform.processor()}, {options["cpu_speed_ghz"]} GHz\n'
        f'RAM: {total_ram_gb} GB\n'
    )

    # --- Benchmark individual models ---

    log(
        f'--- Benchmarking {len(create_models)} models ---\n'
        '\n'
    )
    models_results = []
    for create_model in create_models:
        models_results.append(benchmark_model(create_model, log))

    # --- Benchmark models for micro controllers ---

    log(
        f'\n--- Benchmarking {len(create_models)} models for {len(micro_controllers)} micro controllers ---\n'
        '\n'
    )

    for micro_controller in micro_controllers:
        log_hash_comment(micro_controller['name'], log)
        log(
            'Info\n'
            f'Architecture: {micro_controller["architecture"]}\n'
            f'Memory [Mb]: {micro_controller["memory_mb"]}\n'
            f'CPU speed [GHz]: {micro_controller["cpu_speed_ghz"]}\n'
        )
        table = PrettyTable(
            ['Model', 'Estimated CPU time [ms]', 'Estimated inference time [ms]', 'Estimated samples per CPU second', 'Estimated samples per inference second', 'Memory usage %', 'COMPATIBLE']
        )
        # One row per model; the row is the estimate dict's values in insertion order.
        rows = []
        for model_result in models_results:
            estimate = benchmark_model_micro_controller(model_result, micro_controller, options)
            rows.append(estimate.values())
        table.add_rows(rows)
        log(table.get_string())
        log('\n')

### Interpret PyTorch Profiler results

References:
- [Recipe](https://h-huang.github.io/tutorials/recipes/recipes/profiler_recipe.html)

#### CPU time

CPU time vs. self CPU time: operators can call other operators. Self CPU time excludes the time spent in child operator calls, while total CPU time includes it.

#### Memory usage

- Shows the amount of memory used by the model’s tensors that was allocated (or released) during the execution of the model’s operators.

Self memory: the memory allocated (or released) by the operator itself, excluding calls to child operators.

## Speech Recognition Models

### PyTorch Silero

In [10]:
####################################################
# Use Silero utils to download model and test file #
####################################################

# Always use CPU (simulate run on micro controller)
device = torch.device('cpu')

# Download model, decoder and utils
model, decoder, utils = torch.hub.load(repo_or_dir='snakers4/silero-models',
                                       model='silero_stt',
                                       language='en', # also available 'de', 'es'
                                       device=device
)
(read_batch, split_into_batches, _ , prepare_model_input) = utils  # see function signature for details

# Download a single test file, any format compatible with TorchAudio (soundfile backend)
torch.hub.download_url_to_file('https://opus-codec.org/static/examples/samples/speech_orig.wav',
                            dst ='speech_orig.wav', progress=True)
test_files = glob('speech_orig.wav')

######################################
# Get number of samples in test data #
######################################

# Read the WAV header inside a context manager so the file handle is closed again.
with wave.open(test_files[0], 'r') as audio_file:
    sampling_rate = audio_file.getframerate()
    num_samples = audio_file.getnframes()

print(
    f'\n--- Input statistics ---\n'
    f'Sampling rate: {sampling_rate} Hz\n'
    f'Number of samples: {num_samples}\n'
)

Using cache found in C:\Users\Moritz/.cache\torch\hub\snakers4_silero-models_master
100%|██████████| 0.99M/0.99M [00:00<00:00, 1.19MB/s]


--- Input statistics ---
Sampling rate: 48000 Hz
Number of samples: 518400






In [11]:
#######################
# Create SILERO model #
#######################

def create_silero_model():
    """Build the benchmark description of the Silero speech-to-text model.

    Uses the notebook-level Silero `model`, `decoder` and utility functions
    together with the downloaded test file.

    Returns:
        A model description whose `infer` transcribes the first batch of test files.
    """
    # Prepare input data
    audio_batches = split_into_batches(test_files, batch_size=10)

    def infer_silero():
        # Audio is read and converted to model input on every call, so this
        # work is part of each measured inference.
        waveforms = read_batch(audio_batches[0])
        model_input = prepare_model_input(waveforms, device=device)
        predictions = model(model_input)
        transcripts = []
        for example in predictions:
            transcripts.append(decoder(example.cpu()))
        return transcripts

    return {
        'name': 'Silero',
        'num_inferred_samples': num_samples,
        'infer': infer_silero,
        'is_pytorch': True,
    }

##############################################
# Smoke test: run one inference and print it #
##############################################

# Build the model description and call its `infer` once outside the benchmark.
silero_model = create_silero_model()
transcription = silero_model['infer']()
print(transcription)

# Remove the name from the notebook namespace; the benchmark builds its own
# instance through the factory function.
del silero_model

["the boch canoe slit on the smooth planks blew the sheet to the dark blue background it's easy to tell a depth of a well four hours of steady work faced us"]


In [12]:
%pip freeze

antlr4-python3-runtime==4.9.3
asttokens @ file:///home/conda/feedstock_root/build_artifacts/asttokens_1670263926556/work
backcall @ file:///home/conda/feedstock_root/build_artifacts/backcall_1592338393461/work
backports.functools-lru-cache @ file:///home/conda/feedstock_root/build_artifacts/backports.functools_lru_cache_1618230623929/work
certifi==2023.5.7
cffi==1.15.1
charset-normalizer==3.1.0
colorama @ file:///home/conda/feedstock_root/build_artifacts/colorama_1666700638685/work
contourpy==1.0.7
cycler==0.11.0
debugpy @ file:///C:/ci_310/debugpy_1642079916595/work
decorator @ file:///home/conda/feedstock_root/build_artifacts/decorator_1641555617451/work
executing @ file:///home/conda/feedstock_root/build_artifacts/executing_1667317341051/work
filelock==3.12.0
flops-profiler==0.1.2
fonttools==4.39.4
fsspec==2023.5.0
huggingface-hub==0.15.1
idna==3.4
importlib-metadata @ file:///home/conda/feedstock_root/build_artifacts/importlib-metadata_1682176699712/work
ipykernel @ file:///D:/bld/

### PocketSphinx

Reference: [PyPI](https://pypi.org/project/pocketsphinx/), [Example](https://github.com/cmusphinx/pocketsphinx/blob/master/examples/simple.py)

In [13]:
#############################
# Create PocketSphinx model #
#############################

def create_pocket_sphinx_model():
    """Build the benchmark description of the PocketSphinx decoder.

    Returns:
        A model description whose `infer` decodes 'speech_orig.wav' and returns
        the recognized text (an empty string if the decoder has no hypothesis).
    """

    # Configure decoder
    decoder = Decoder(samprate=sampling_rate)

    #################################
    # Create model for benchmarking #
    #################################

    def infer_pocket_sphinx():
        # Read exactly the audio frames through the documented wave API; reading the
        # underlying file object directly could also return non-audio trailing chunks.
        with wave.open('speech_orig.wav', 'rb') as audio:
            pcm_data = audio.readframes(audio.getnframes())

        decoder.start_utt()
        decoder.process_raw(pcm_data, full_utt=True)
        decoder.end_utt()

        # Guard against a missing hypothesis instead of failing on `.hypstr`.
        hypothesis = decoder.hyp()
        if hypothesis is None:
            return ''
        return hypothesis.hypstr

    pocket_sphinx_model: Model = {
        'name': 'PocketSphinx',
        'num_inferred_samples': num_samples,
        'infer': infer_pocket_sphinx,
        'is_pytorch': False,
    }

    return pocket_sphinx_model

##############################################
# Smoke test: run one inference and print it #
##############################################

# Build the model description and call its `infer` once outside the benchmark.
pocket_sphinx_model = create_pocket_sphinx_model()
transcription = pocket_sphinx_model['infer']()
print(transcription)

# Remove the name from the notebook namespace; the benchmark builds its own
# instance through the factory function.
del pocket_sphinx_model

the pitch kinnear slipped on the snooze planks linda say to the doc the loop act grounds it's easy to tell him that the well for allies and steady work face death


### HuBERT

Reference: [Hugging Face](https://huggingface.co/docs/transformers/model_doc/hubert)

In [14]:
#######################
# Create HuBERT model #
#######################

def create_hubert_model():
    """Build the benchmark description of the HuBERT CTC model.

    Loads the pretrained model and its processor, then wraps them in an
    `infer` closure that transcribes the first test file.

    Returns:
        A model description whose `infer` returns the decoded transcriptions.
    """
    checkpoint = "facebook/hubert-large-ls960-ft"

    # Download model
    hubert = HubertForCTC.from_pretrained(checkpoint)
    processor = AutoProcessor.from_pretrained(checkpoint)

    # Prepare inputs
    audio_batches = split_into_batches(test_files, batch_size=10)

    def infer_hubert():
        # Only the first waveform of the first batch is transcribed.
        waveform = read_batch(audio_batches[0])[0]
        inputs = processor(waveform, sampling_rate=16_000, return_tensors="pt")
        with torch.no_grad():
            predicted_ids = hubert(**inputs).logits.argmax(dim=-1)
        return processor.batch_decode(predicted_ids)

    return {
        'name': 'HuBERT',
        'num_inferred_samples': num_samples,
        'infer': infer_hubert,
        'is_pytorch': True,
    }

##############################################
# Smoke test: run one inference and print it #
##############################################

# Build the model description and call its `infer` once outside the benchmark.
hubert_model = create_hubert_model()
transcription = hubert_model['infer']()
print(transcription)

# Remove the name from the notebook namespace; the benchmark builds its own
# instance through the factory function.
del hubert_model

["THE BIRCH CANOE SLID ON THE SMOOTH PLANKS GLUE THE SHEET TO THE DARK BLUE BACKGROUND IT'S EASY TO TELL THE DEPTH OF A WELL FOUR HOURS OF STEADY WORK FACED US"]


## Micro Controllers

### ESP32

In [15]:
############################################
# Create micro controller for benchmarking #
############################################

# NOTE(review): the classic ESP32 running at 240 MHz is usually described as an
# Xtensa core; RISC-V applies to other ESP32-family variants — confirm which chip
# is meant and whether 0.23 MB is the intended usable memory.
esp32: MicroController = {
    'name': 'ESP32',
    'architecture': '32-bit RISC-V',  # only written to the log, not used in calculations
    'cpu_speed_ghz': 0.24,  # compared against the host clock speed to scale timings
    'memory_mb': 0.23,  # compared against the measured model memory usage
}


### Raspberry Pi Zero

In [16]:
############################################
# Create micro controller for benchmarking #
############################################

# Specification of the Raspberry Pi Zero target.
pi_zero: MicroController = {
    'name': 'Raspberry Pi Zero',
    'architecture': '32-bit ARM',  # only written to the log, not used in calculations
    'cpu_speed_ghz': 1,  # compared against the host clock speed to scale timings
    'memory_mb': 512,  # compared against the measured model memory usage
}


### BeagleBone Black

In [17]:
############################################
# Create micro controller for benchmarking #
############################################

# Specification of the BeagleBone Black target.
beagle_bone_black: MicroController = {
    'name': 'BeagleBone Black',
    'architecture': 'ARM Cortex-A8',  # only written to the log, not used in calculations
    'cpu_speed_ghz': 1,  # compared against the host clock speed to scale timings
    'memory_mb': 512,  # compared against the measured model memory usage
}

### Raspberry Pi 3B

In [18]:
############################################
# Create micro controller for benchmarking #
############################################

# Specification of the Raspberry Pi 3 B target.
# NOTE(review): measured memory is converted with 1024 ** 2 bytes per MB, so a
# 1 GB board would be 1024 in the same unit — confirm whether 1000 is intended.
pi_3_b: MicroController = {
    'name': 'Raspberry Pi 3 B',
    'architecture': '64-bit ARM',  # only written to the log, not used in calculations
    'cpu_speed_ghz': 1.2,  # compared against the host clock speed to scale timings
    'memory_mb': 1000,  # compared against the measured model memory usage
}


### Raspberry Pi 4 Model B 4Go

In [19]:
############################################
# Create micro controller for benchmarking #
############################################

# Specification of the Raspberry Pi 4 Model B (4 GB) target.
# NOTE(review): measured memory is converted with 1024 ** 2 bytes per MB, so a
# 4 GB board would be 4096 in the same unit — confirm whether 4000 is intended.
pi_4_b: MicroController = {
    'name': 'Raspberry Pi 4 Model B 4Go',
    'architecture': '64-bit ARM',  # only written to the log, not used in calculations
    'cpu_speed_ghz': 1.5,  # compared against the host clock speed to scale timings
    'memory_mb': 4000,  # compared against the measured model memory usage
}


## Run Benchmark

In [20]:
#########################################
# Register models and micro controllers #
#########################################

# These are model factories, not model instances: the benchmark calls each
# factory itself right before benchmarking that model.
models: list[CreateModel] = [create_pocket_sphinx_model, create_silero_model, create_hubert_model]

micro_controllers: list[MicroController] = [esp32, pi_zero, beagle_bone_black, pi_3_b, pi_4_b]

#########################
# Set benchmark options #
#########################

benchmark_options: BenchmarkOptions = {
    # Clock speed of the machine running this notebook; used to scale timings.
    'cpu_speed_ghz': 3.7,
    # Throughput a model must reach on a controller to count as compatible.
    'target_sampling_rate_khz': 16,
    # Report file; overwritten at the start of each benchmark run.
    'logfile_path': 'benchmark.txt'
}

#################
# Run benchmark #
#################

benchmark(models, micro_controllers, benchmark_options)

--- Running benchmark on ---

Arch: AMD64
Platform: Windows 10.0.22621
CPU: AMD64 Family 23 Model 8 Stepping 2, AuthenticAMD, 3.7 GHz
RAM: 15.951824188232422 GB

--- Benchmarking 3 models ---


################
# PocketSphinx #
################


--- Psutil memory_full_info: PocketSphinx ---

Over 5 iterations
Mean RSS [Mb]: 531.29765625
Std RSS: 5.94583573995148
Mean USS [Mb]: 478.52578125
Std USS: 5.01000561376931

--- Timeit default_timer: PocketSphinx ---

Over 5 iterations
Mean inference time [ms]: 2146.356899966486
Std inference time: 93.1672998273074

--- Overall results: PocketSphinx ---

Mean memory usage [Mb]: 504.91171875000003
Samples per inference second: 241525.5356684131


##########
# Silero #
##########


--- PyTorch Profile: Silero ---

CPU time top 5
------------------------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  ------------  
                          Name    Self CPU %      Self CPU   