In [None]:
# hide

from pprint import pprint

%load_ext nb_black

<IPython.core.display.Javascript object>

In [None]:
# default_exp core

<IPython.core.display.Javascript object>

# module Core

> Core benchmark logic

In [None]:
# export

import os
import math
import json
import hashlib

import pandas as pd

from pathlib import Path
from typing import Optional, Callable, Any

from pydantic import BaseModel

<IPython.core.display.Javascript object>

# Benchmark Server

In [None]:
# export


class BenchmarkServer(BaseModel):
    """Base model for a server that takes part in a benchmark.

    The base class only carries a display name; both lifecycle hooks are
    no-ops that subclasses override.
    """

    name: str = "base_server"

    def start(self):
        """Start the server. The base implementation does nothing."""
        return None

    def stop(self):
        """Stop the server. The base implementation does nothing."""
        return None

<IPython.core.display.Javascript object>

## Usage

In [None]:
# Instantiate the base server with a custom display name.
name = "Nginx Docker"
server = BenchmarkServer(name=name)

<IPython.core.display.Javascript object>

## Tests

In [None]:
# The base server keeps the configured name; its lifecycle hooks return None.
assert server.name == name
assert server.start() is None
assert server.stop() is None

<IPython.core.display.Javascript object>

# Checksum Mixin

In [None]:
# export


class CheckSumMixin:
    """Mixin that adds an MD5 checksum helper to a class."""

    def calculate_checksum(self, content):
        """Return the hexadecimal MD5 digest of ``content``.

        ``content`` is handed to ``hashlib`` unchanged, so it has to be a
        bytes-like object.
        """
        digest = hashlib.md5()
        digest.update(content)
        return digest.hexdigest()

<IPython.core.display.Javascript object>

## Usage

In [None]:
class FooBar(CheckSumMixin):
    """Minimal example class that gains `calculate_checksum` from the mixin."""

    def some_method_returning_checksum(self, content):
        """Return the checksum of `content` via the inherited mixin method."""
        return self.calculate_checksum(content)


foobar = FooBar()

<IPython.core.display.Javascript object>

## Tests

In [None]:
# The mixin result must equal a direct hashlib MD5 hex digest of the same bytes.
test_content = b"foobar"
assert (
    foobar.some_method_returning_checksum(test_content)
    == hashlib.md5(test_content).hexdigest()
)

<IPython.core.display.Javascript object>

# Benchmark Client

In [None]:
# export


class BenchmarkClient(CheckSumMixin, BaseModel):
    """Base model for a client that downloads benchmark files.

    The base class carries a display name and a helper that checks
    downloaded content against the checksums recorded on the benchmark files.
    """

    name: str = "base_client"

    def verify_checksums(self, benchmark_files, responses):
        """Check that every benchmark file was downloaded with the right content.

        Args:
            benchmark_files: objects exposing ``url`` and ``checksum``.
            responses: objects exposing ``url`` and ``content``; the content is
                hashed with ``calculate_checksum``.

        Raises:
            AssertionError: if a file has no matching response or the checksum
                of the downloaded content differs from the recorded one.
        """
        checksum_lookup = {
            str(response.url): self.calculate_checksum(response.content)
            for response in responses
        }

        for bf in benchmark_files:
            looked_up_checksum = checksum_lookup.get(bf.url)
            if bf.checksum != looked_up_checksum:
                # Raised explicitly instead of via `assert`, which is stripped
                # when Python runs with -O and would let mismatches pass silently.
                raise AssertionError(
                    f"checksum mismatch for {bf.url}: "
                    f"expected {bf.checksum}, got {looked_up_checksum}"
                )

<IPython.core.display.Javascript object>

## Usage

In [None]:
# Instantiate the base client with a custom display name (rebinds `name`).
name = "Httpx"
client = BenchmarkClient(name=name)

<IPython.core.display.Javascript object>

## Tests

In [None]:
# The client keeps the configured name.
assert client.name == name

<IPython.core.display.Javascript object>

# Benchmark Files

In [None]:
# export


class FilesystemCreator(CheckSumMixin, BaseModel):
    """Callable that materialises a benchmark file on disk and checksums it."""

    def checksum_for_path(self, path):
        """Read the whole file at ``path`` and return the checksum of its bytes."""
        with path.open("rb") as handle:
            payload = handle.read()
        return self.calculate_checksum(payload)

    def __call__(self, path, size):
        """Ensure ``path`` exists and return the checksum of its content.

        Missing parent directories are created. A file that does not exist yet
        is filled with ``size`` random bytes; an existing file is left
        untouched and only checksummed.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        already_present = path.exists()
        if not already_present:
            with path.open("wb") as handle:
                handle.write(os.urandom(size))
        return self.checksum_for_path(path)


class BenchmarkFile(BaseModel):
    """One file of a benchmark row, addressable on disk and over HTTP.

    ``creator`` is called with ``(filesystem_path, size)`` and its return
    value is stored as the file's checksum.
    """

    number: int
    base_path: str
    size: int
    data_root: str = "data"
    hostname: str = "localhost"
    port: int = 8000
    checksum: Optional[str] = None
    creator: Callable = FilesystemCreator()

    @property
    def filesystem_path(self):
        """Location of the file on disk: ``data_root/base_path/number``."""
        return Path(self.data_root, self.base_path, str(self.number))

    def get_or_create(self):
        """Run the creator for this file and remember the checksum it returns."""
        checksum = self.creator(self.filesystem_path, self.size)
        self.checksum = checksum

    @property
    def path(self):
        """URL path component, always joined with forward slashes."""
        return "/".join((self.data_root, self.base_path, str(self.number)))

    @property
    def host(self):
        """Scheme, hostname and port of the serving host."""
        return f"http://{self.hostname}:{self.port}"

    @property
    def url(self):
        """Full URL under which the file is expected to be served."""
        return "/".join((self.host, self.path))

<IPython.core.display.Javascript object>

## Usage

In [None]:
# Describe a 3,000,000-byte file and display the URL it is served from.
base_path = "3000000_2_12500000"
benchmark_file = BenchmarkFile(number=0, size=10 ** 6 * 3, base_path=base_path)
benchmark_file.url

'http://localhost:8000/data/3000000_2_12500000/0'

<IPython.core.display.Javascript object>

## Tests

In [None]:
# The URL uses the default hostname and embeds base_path; base_path is also
# part of the JSON serialisation.
assert "localhost" in benchmark_file.url
assert base_path in benchmark_file.url
assert "base_path" in benchmark_file.json()

<IPython.core.display.Javascript object>

In [None]:
file_size = 10 ** 6 * 3  # 3,000,000 bytes


class TestCreator:
    """Creator stub that checks the requested size and returns a fixed checksum."""

    def __call__(self, path, size):
        assert size == file_size
        return "test_md5sum"


# Rebinds benchmark_file; get_or_create() calls the stub with the file's path
# and size and stores its return value as the checksum.
benchmark_file = BenchmarkFile(
    number=0, size=file_size, base_path=base_path, creator=TestCreator()
)
benchmark_file.get_or_create()

<IPython.core.display.Javascript object>

# Benchmark Rows

In [None]:
# export


class BenchmarkRow(BaseModel):
    """One benchmark configuration: a file size at a given duration and bandwidth.

    The row derives how many files of ``file_size`` bytes are needed to fill
    ``duration`` seconds at ``bandwidth`` bytes per second and can create them
    through ``file_creator``.
    """

    file_size: int  # size of a single file
    duration: int = 30  # in seconds
    bandwidth: int = int(10 ** 9 / 8)  # in bytes per second
    files: list[BenchmarkFile] = []
    file_creator: Callable = FilesystemCreator()
    elapsed: Optional[float] = None
    data_root: str = "data"

    def __str__(self):
        return (
            f"size: {self.file_size} "
            f"duration: {self.duration} "
            f"bandwidth: {self.bandwidth}"
        )

    @property
    def base_path(self):
        """Directory name encoding file size, duration and bandwidth."""
        parts = (self.file_size, self.duration, self.bandwidth)
        return "_".join(str(part) for part in parts)

    @property
    def complete_size(self):
        """Total number of bytes transferred: duration times bandwidth."""
        return self.duration * self.bandwidth

    @property
    def number_of_files(self):
        """Number of files needed to reach ``complete_size``, rounded up."""
        ratio = self.complete_size / self.file_size
        return math.ceil(ratio)

    @property
    def number_of_connections(self):
        """Bandwidth divided by file size, rounded up."""
        ratio = self.bandwidth / self.file_size
        return math.ceil(ratio)

    def get_bytes_per_second(self, elapsed):
        """Throughput in bytes per second for an externally measured ``elapsed``."""
        total = self.complete_size
        return total / elapsed

    @property
    def bytes_per_second(self):
        """Throughput in bytes per second based on the stored ``elapsed``."""
        total = self.complete_size
        return total / self.elapsed

    def create_files(self):
        """Create the row's benchmark files once; later calls are no-ops."""
        if self.files:
            # files were already generated for this row
            return
        for index in range(self.number_of_files):
            new_file = BenchmarkFile(
                number=index,
                base_path=self.base_path,
                size=self.file_size,
                creator=self.file_creator,
                data_root=self.data_root,
            )
            new_file.get_or_create()
            self.files.append(new_file)

<IPython.core.display.Javascript object>

In [None]:
# hide


class DummyCreator(BaseModel):
    """Stand-in file creator for tests and examples; it touches no files."""

    def __call__(self, path, size):
        """Ignore ``path`` and ``size`` and return the fixed checksum "dummy"."""
        fake_checksum = "dummy"
        return fake_checksum

<IPython.core.display.Javascript object>

## Usage

In [None]:
# dont_test

byte = 8  # bits per byte
hundred_mbit = 10 ** 8  # 100 Mbit/s expressed in bits per second
bandwidth = hundred_mbit / byte  # bytes per second
duration = 2  # seconds
file_size = 10 ** 6 * 3  # 3 MB (3,000,000 bytes)

# DummyCreator keeps the example from writing files to disk.
benchmark_row = BenchmarkRow(
    file_size=file_size,
    duration=duration,
    bandwidth=bandwidth,
    file_creator=DummyCreator(),
)
benchmark_row.create_files()
print(len(benchmark_row.files))

9


<IPython.core.display.Javascript object>

## Tests

In [None]:
byte = 8  # bits per byte
gigabit = 10 ** 9  # 1 Gbit/s expressed in bits per second

# Row parameters shared by the assertions in the following cells.
test_params = {
    "file_size": 10 ** 6 * 10,  # 10MB
    "duration": 30,
    "bandwidth": gigabit / byte,
    "file_creator": DummyCreator(),
}

test_benchmark_row = BenchmarkRow(**test_params)

<IPython.core.display.Javascript object>

In [None]:
# 30 s * 125,000,000 B/s / 10,000,000 B per file = 375 files;
# 125,000,000 / 10,000,000 = 12.5, rounded up to 13 connections.
assert test_benchmark_row.bandwidth == 125000000
assert test_benchmark_row.number_of_files == 375
assert test_benchmark_row.get_bytes_per_second(30.0) == test_benchmark_row.bandwidth
assert test_benchmark_row.number_of_connections == 13
assert "file_size" in test_benchmark_row.json()

<IPython.core.display.Javascript object>

In [None]:
# the first call creates one BenchmarkFile per required file
test_benchmark_row.create_files()
assert len(test_benchmark_row.files) == test_benchmark_row.number_of_files

# assert we don't generate files twice
test_benchmark_row.create_files()
assert len(test_benchmark_row.files) == test_benchmark_row.number_of_files

<IPython.core.display.Javascript object>

# Benchmark Result

In [None]:
# export


def convert_size(size_bytes):
    """Scale a byte count to the largest fitting 1024-based unit.

    Args:
        size_bytes: non-negative number of bytes (int or float).

    Returns:
        A ``(value, unit)`` tuple where ``value`` is rounded to two decimals
        and ``unit`` is one of "B", "KB", ... "YB". Zero yields ``(0, "B")``
        so that every input produces the same tuple shape.

    Raises:
        ValueError: if ``size_bytes`` is negative.
    """
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    if size_bytes == 0:
        return 0, "B"
    if size_bytes < 0:
        raise ValueError(f"size_bytes must be non-negative, got {size_bytes!r}")

    # Repeated division instead of a floating-point logarithm: the unit index
    # can neither go negative (values below one byte) nor run past the last
    # unit (values of 1024 YB and more stay expressed in YB).
    scaled = float(size_bytes)
    index = 0
    last_index = len(size_name) - 1
    while scaled >= 1024 and index < last_index:
        scaled /= 1024
        index += 1
    return round(scaled, 2), size_name[index]


class BenchmarkResult(BaseModel):
    """Measurement of one server/client pair for one file size.

    Identity (hashing and equality) deliberately ignores ``elapsed`` so that
    an unmeasured result and its measured counterpart are treated as the same
    key.
    """

    server: str
    client: str
    file_size: int
    elapsed: Optional[float] = None
    complete_size: int

    def __hash__(self):
        return hash(self.json(exclude={"elapsed"}))

    def __eq__(self, other):
        # Comparing with a foreign object must not raise: returning
        # NotImplemented lets Python fall back to its default handling.
        if not isinstance(other, BenchmarkResult):
            return NotImplemented
        return self.dict(exclude={"elapsed"}) == other.dict(exclude={"elapsed"})

    @classmethod
    def build_empty_result(cls, row, server, client):
        """Create a result without ``elapsed`` from a row, a server and a client.

        Only ``server.name``, ``client.name``, ``row.file_size`` and
        ``row.complete_size`` are read.
        """
        return cls(
            server=server.name,
            client=client.name,
            file_size=row.file_size,
            complete_size=row.complete_size,
        )

    def make_readable(self, size_in_bytes):
        """Format a byte count as value plus unit, e.g. ``"976.56KB"``."""
        size, unit = convert_size(size_in_bytes)
        return f"{size}{unit}"

    @property
    def readable_file_size(self):
        """Human-readable form of ``file_size``."""
        return self.make_readable(self.file_size)

    @property
    def bytes_per_second(self):
        """Throughput in bytes per second.

        Raises ``TypeError`` while ``elapsed`` is still ``None``.
        """
        return self.complete_size / self.elapsed

    @property
    def readable_bytes_per_second(self):
        """Human-readable form of ``bytes_per_second``."""
        return self.make_readable(self.bytes_per_second)

    def dict_with_properties(self):
        """Return the model fields plus the derived, human-readable values."""
        return {
            **super().dict(),
            "file_size_h": self.readable_file_size,
            "bytes_per_second": self.bytes_per_second,
            "bytes_per_second_h": self.readable_bytes_per_second,
        }


# This does not work yet
#     def dict(self):
#         _dict = super().dict()
#         return {
#             **super().dict(),
#             "file_size_h": self.readable_file_size,
#             "bytes_per_second": self.bytes_per_second,
#             "bytes_per_second_h": self.readable_bytes_per_second,
#         }

#     def json(self):
#         return json.dumps(self.dict())

<IPython.core.display.Javascript object>

## Usage

In [None]:
file_size = 10 ** 6  # 1,000,000 bytes
complete_size = 100 * file_size
result = BenchmarkResult(
    server="nginx",
    client="httpx",
    file_size=file_size,
    elapsed=3.0,
    complete_size=complete_size,
)
# plain model fields first, then the fields plus the derived values
print(result.dict())
print(result.dict_with_properties())

{'server': 'nginx', 'client': 'httpx', 'file_size': 1000000, 'elapsed': 3.0, 'complete_size': 100000000}
{'server': 'nginx', 'client': 'httpx', 'file_size': 1000000, 'elapsed': 3.0, 'complete_size': 100000000, 'file_size_h': '976.56KB', 'bytes_per_second': 33333333.333333332, 'bytes_per_second_h': '31.79MB'}


<IPython.core.display.Javascript object>

## Tests

In [None]:
# 100,000,000 bytes in 3.0 s is about 33.3 million bytes/s, shown as 31.79MB
# because the units are 1024-based.
assert result.readable_bytes_per_second == "31.79MB"
assert "file_size" in result.json()

<IPython.core.display.Javascript object>

In [None]:
class TestClient(BenchmarkClient):
    """Client stub for tests: records that it was used and reports a fixed duration."""

    measured: bool = False  # set to True by measure()

    def measure(self, benchmark_row):
        """Flag the client as used, print the row and return a constant 2.0."""
        self.measured = True
        print("measure_benchmark_row: ", benchmark_row)
        return 2.0


class TestServer(BenchmarkServer):
    """Server stub for tests: records start/stop calls instead of running anything."""

    started: bool = False  # set to True by start()
    stopped: bool = False  # set to True by stop()

    def start(self):
        """Record that the server was started."""
        self.started = True

    def stop(self):
        """Record that the server was stopped; `started` is left unchanged."""
        self.stopped = True


# Build a row without touching the filesystem and derive an empty result
# (no elapsed time yet) for a stub server/client pair.
row_params = {
    "file_size": 10 ** 6 * 10,
    "duration": 30,
    "bandwidth": 10 ** 9 / 8,
    "file_creator": DummyCreator(),
}
row = BenchmarkRow(**row_params)

test_result = BenchmarkResult.build_empty_result(
    row, TestServer(name="server"), TestClient(name="client")
)

<IPython.core.display.Javascript object>

In [None]:
# The result stores the names of the server and client, not the objects.
assert test_result.server == "server"
assert test_result.client == "client"

<IPython.core.display.Javascript object>

# Persistence / Repository

In [None]:
# export


class BaseRepository(BaseModel):
    """Persistence interface for benchmark results.

    The base implementation stores nothing: both methods return ``None``.
    """

    session: Any = None

    def get_result(self, benchmark, result):
        """Look up a stored result; the base implementation returns ``None``."""
        return None

    def add_result(self, benchmark, result):
        """Store a result; the base implementation does nothing."""
        return None

<IPython.core.display.Javascript object>

# Core Benchmark Model

In [None]:
# export
import cpuinfo
import platform
import subprocess

from functools import cache


def get_macos_machine_id():
    """Return the serial number reported by ``system_profiler``, or ``None``.

    The hardware overview is scanned for lines containing "Serial Number";
    the last whitespace-separated token of the last such line is returned.
    """
    profiler = subprocess.run(
        ["/usr/sbin/system_profiler", "SPHardwareDataType"],
        capture_output=True,
        text=True,
    )
    serials = [
        row.split()[-1]
        for row in profiler.stdout.split("\n")
        if "Serial Number" in row
    ]
    return serials[-1] if serials else None


@cache
def get_machine_id():
    """Return an identifier for the current machine (cached per process).

    On macOS the hardware serial number is used. On every other platform, or
    when no serial number can be determined, the network node name from
    ``platform.node()`` serves as a best-effort fallback, with "unknown" as the
    last resort. The function therefore always returns a string instead of
    raising ``KeyError`` on platforms without a dedicated lookup.
    """
    # First dash-separated token of the platform string, e.g. "macos" or "linux".
    system = platform.platform().lower().split("-")[0]
    machine_id = None
    if system in ("macos", "darwin"):
        machine_id = get_macos_machine_id()
    if machine_id is None:
        machine_id = platform.node() or "unknown"
    return machine_id


class Benchmark(BaseModel):
    """A complete benchmark: every server measured with every client.

    One ``BenchmarkRow`` is created per entry in ``file_sizes``. Results are
    collected in ``results`` and, when a repository is configured, looked up
    there first so that already measured combinations are not measured again.
    Benchmarks hash and compare by ``machine_id``.
    """

    duration: int = 30  # in seconds
    bandwidth: int = int(10 ** 9 / 8)  # in bytes per second
    file_sizes: list[int] = [10 ** 7, 10 ** 6, 10 ** 5]
    rows: list[BenchmarkRow] = []
    file_creator: Callable = FilesystemCreator()
    # the uname, cpuinfo and machine_id defaults are computed once, when the
    # class body is executed
    uname: Optional[Any] = platform.uname()
    cpuinfo: Optional[dict] = cpuinfo.get_cpu_info()
    servers: list[BenchmarkServer] = []
    clients: list[BenchmarkClient] = []
    results: list[BenchmarkResult] = []
    repository: Optional[BaseRepository] = None
    machine_id: str = get_machine_id()

    @property
    def uname_json(self):
        """``uname`` serialised with ``json.dumps``."""
        return json.dumps(self.uname)

    def __hash__(self):
        return hash(self.machine_id)

    def __eq__(self, other):
        # The comparison result has to be returned; without it every
        # comparison evaluated to None and equal benchmarks never matched.
        if not isinstance(other, Benchmark):
            return NotImplemented
        return self.machine_id == other.machine_id

    def create_row_from_file_size(self, file_size):
        """Build a row for ``file_size`` from this benchmark's settings and create its files."""
        do_not_copy = {
            "rows",
            "file_sizes",
            "servers",
            "clients",
            "results",
            "repository",
        }
        # NOTE(review): this also forwards uname, cpuinfo and machine_id, which
        # BenchmarkRow does not declare - presumably ignored by the model; confirm.
        kwargs = {k: v for k, v in dict(self).items() if k not in do_not_copy}
        br = BenchmarkRow(file_size=file_size, **kwargs)
        br.create_files()
        return br

    def create_rows(self):
        """Create one row per configured file size; later calls are no-ops."""
        if len(self.rows) > 0:
            # benchmark rows were already created
            return

        # create a row for each file_size
        for file_size in self.file_sizes:
            self.rows.append(self.create_row_from_file_size(file_size))

    def build_empty_result(self, row, server, client):
        """Return a result without ``elapsed`` for the given row, server and client.

        Delegates to ``BenchmarkResult.build_empty_result``; the previous body
        referenced an undefined name ``elapsed`` and raised ``NameError``.
        """
        return BenchmarkResult.build_empty_result(row, server, client)

    def test_server_with_client(self, server, client):
        """Measure every row for one server/client pair, reusing stored results."""
        for row in self.rows:
            result = BenchmarkResult.build_empty_result(row, server, client)
            already_measured = None
            if self.repository is not None:
                already_measured = self.repository.get_result(self, result)
            # A repository may return None for an unknown result, so guard
            # before reading `.elapsed`.
            if already_measured is not None and already_measured.elapsed is not None:
                print("already measured: ", already_measured)
                result = already_measured
            else:
                # NOTE(review): `started` is not declared on BenchmarkServer
                # itself; servers are assumed to provide it - confirm.
                if not server.started:
                    server.start()
                result.elapsed = client.measure(row)
                if self.repository is not None:
                    self.repository.add_result(self, result)
                print("measured: ", result)
            self.results.append(result)

    def run(self):
        """Run all clients against all servers, stopping each server that was started."""
        for server in self.servers:
            # start with servers, because they are more expensive to create
            print(f"server: {server}")
            for client in self.clients:
                self.test_server_with_client(server, client)
            if server.started:
                server.stop()

    def json(self):
        """Serialise only duration, bandwidth and cpuinfo."""
        # return super().json(exclude={"rows", "repository"})
        fields = {
            "duration",
            "bandwidth",
            "cpuinfo",
        }
        return super().json(include=fields)

    @property
    def results_frame(self):
        """All results, including derived values, as a pandas DataFrame."""
        return pd.DataFrame([r.dict_with_properties() for r in self.results])

<IPython.core.display.Javascript object>

In [None]:
# hide

from collections import defaultdict


class TestRepository(BaseRepository):
    """In-memory repository for tests.

    ``results`` maps each benchmark to a dict that uses the result itself as
    both key and value.
    """

    results: dict = defaultdict(dict)

    def get_result(self, benchmark, result):
        """Return the stored result, or ``result`` itself when nothing is stored."""
        stored_for_benchmark = self.results.get(benchmark, {})
        return stored_for_benchmark.get(result, result)

    def add_result(self, benchmark, result):
        """Store ``result`` under ``benchmark``."""
        stored_for_benchmark = self.results[benchmark]
        stored_for_benchmark[result] = result

<IPython.core.display.Javascript object>

## Usage

In [None]:
# dont_test

byte = 8  # bits per byte
hundred_mbit = 10 ** 8  # 100 Mbit/s expressed in bits per second
bandwidth = hundred_mbit / byte  # bytes per second
duration = 2  # seconds
file_size = 10 ** 6 * 3  # 3 MB; NOTE(review): unused here, the benchmark gets file_sizes
file_sizes = [10 ** 7, 10 ** 6, 10 ** 5]

# Stub creator, server, client and repository keep the example free of real I/O.
benchmark = Benchmark(
    duration=duration,
    bandwidth=bandwidth,
    file_creator=DummyCreator(),
    file_sizes=file_sizes,
    servers=[TestServer(name="Nginx")],
    clients=[TestClient(name="Httpx")],
    repository=TestRepository(),
)
benchmark.create_rows()
print(len(benchmark.rows))

benchmark.run()
# pprint(benchmark.results)
benchmark.results_frame

3
server: name='Nginx' started=False stopped=False
measure_benchmark_row:  size: 10000000 duration: 2 bandwidth: 12500000
measured:  server='Nginx' client='Httpx' file_size=10000000 elapsed=2.0 complete_size=25000000
measure_benchmark_row:  size: 1000000 duration: 2 bandwidth: 12500000
measured:  server='Nginx' client='Httpx' file_size=1000000 elapsed=2.0 complete_size=25000000
measure_benchmark_row:  size: 100000 duration: 2 bandwidth: 12500000
measured:  server='Nginx' client='Httpx' file_size=100000 elapsed=2.0 complete_size=25000000


Unnamed: 0,server,client,file_size,elapsed,complete_size,file_size_h,bytes_per_second,bytes_per_second_h
0,Nginx,Httpx,10000000,2.0,25000000,9.54MB,12500000.0,11.92MB
1,Nginx,Httpx,1000000,2.0,25000000,976.56KB,12500000.0,11.92MB
2,Nginx,Httpx,100000,2.0,25000000,97.66KB,12500000.0,11.92MB


<IPython.core.display.Javascript object>

## Tests

In [None]:
%%time
from collections import namedtuple

# Minimal stand-in for the benchmark's uname value: only `machine` is provided.
TestPlatform = namedtuple("TestPlatform", ["machine"])
    
byte = 8  # bits per byte
gigabit = 10 ** 9  # 1 Gbit/s expressed in bits per second
file_sizes = [10 ** 7, 10 ** 6, 10 ** 5]

# cpuinfo and uname are replaced by fixed test values
test_params = {
    "duration": 3,
    "bandwidth": gigabit / byte / 10,  # divided by ten for test duration
    "file_creator": DummyCreator(),
    "file_sizes": file_sizes,
    "cpuinfo": {"python_version": 4.0},
    "uname": TestPlatform("M3"),
    "repository": TestRepository(),
}

test_benchmark = Benchmark(**test_params)

# one row is expected per configured file size
test_benchmark.create_rows()
assert len(test_benchmark.rows) == len(file_sizes)

CPU times: user 17.8 ms, sys: 734 µs, total: 18.5 ms
Wall time: 18.9 ms


<IPython.core.display.Javascript object>

In [None]:
%%time

# NOTE(review): same definition as the TestClient stub in the Benchmark Result
# tests; executing this cell rebinds the name.
class TestClient(BenchmarkClient):
    """Client stub for tests: records that it was used and reports a fixed duration."""

    measured: bool = False  # set to True by measure()

    def measure(self, benchmark_row):
        """Flag the client as used, print the row and return a constant 2.0."""
        self.measured = True
        print("measure_benchmark_row: ", benchmark_row)
        return 2.0


# NOTE(review): same definition as the TestServer stub in the Benchmark Result
# tests; executing this cell rebinds the name.
class TestServer(BenchmarkServer):
    """Server stub for tests: records start/stop calls instead of running anything."""

    started: bool = False  # set to True by start()
    stopped: bool = False  # set to True by stop()

    def start(self):
        """Record that the server was started."""
        self.started = True

    def stop(self):
        """Record that the server was stopped; `started` is left unchanged."""
        self.stopped = True


# Extend the earlier parameters with one stub client and one stub server,
# then rebuild the benchmark and run it.
test_params = {
    **test_params,
    "clients": [TestClient(name="foo")],
    "servers": [TestServer(name="bar")],
}
test_benchmark = Benchmark(**test_params)
test_benchmark.create_rows()
test_benchmark.run()

server: name='bar' started=False stopped=False
measure_benchmark_row:  size: 10000000 duration: 3 bandwidth: 12500000
measured:  server='bar' client='foo' file_size=10000000 elapsed=2.0 complete_size=37500000
measure_benchmark_row:  size: 1000000 duration: 3 bandwidth: 12500000
measured:  server='bar' client='foo' file_size=1000000 elapsed=2.0 complete_size=37500000
measure_benchmark_row:  size: 100000 duration: 3 bandwidth: 12500000
measured:  server='bar' client='foo' file_size=100000 elapsed=2.0 complete_size=37500000
CPU times: user 16.4 ms, sys: 1.62 ms, total: 18 ms
Wall time: 16.8 ms


<IPython.core.display.Javascript object>

In [None]:
# one server and one client: exactly one result per row
assert len(test_benchmark.results) == len(test_benchmark.rows)

<IPython.core.display.Javascript object>

In [None]:
# run() must have started the server for measuring and stopped it afterwards
assert test_benchmark.servers[0].started
assert test_benchmark.servers[0].stopped

<IPython.core.display.Javascript object>

In [None]:
# the injected uname and cpuinfo test values are kept on the model
assert test_benchmark.uname.machine == "M3"
assert "python_version" in test_benchmark.cpuinfo

<IPython.core.display.Javascript object>

In [None]:
# duration is one of the fields included by Benchmark.json()
assert "duration" in test_benchmark.json()

<IPython.core.display.Javascript object>

In [None]:
# every measured result must have been stored in the repository
assert len(test_benchmark.repository.results[test_benchmark]) == len(
    test_benchmark.results
)

<IPython.core.display.Javascript object>

In [None]:
# assert already measured results are not measured again
# A fresh client is swapped in; since every result is already in the
# repository, its measure() must not be called during the second run.
test_benchmark.clients = [TestClient(name="foo")]
test_benchmark.run()
assert test_benchmark.clients[0].measured == False

server: name='bar' started=True stopped=True
already measured:  server='bar' client='foo' file_size=10000000 elapsed=2.0 complete_size=37500000
already measured:  server='bar' client='foo' file_size=1000000 elapsed=2.0 complete_size=37500000
already measured:  server='bar' client='foo' file_size=100000 elapsed=2.0 complete_size=37500000


<IPython.core.display.Javascript object>

# Export

In [None]:
# dont_test

from nbdev.export import notebook2script

# Run nbdev's notebook-to-script conversion; the output lists each converted notebook.
notebook2script()

Converted 00_core.ipynb.
Converted 01_django_views.ipynb.
Converted 01_fastapi_views.ipynb.
Converted 02_docker_servers.ipynb.
Converted 02_local_servers.ipynb.
Converted 03_benchmark_clients.ipynb.
Converted 04_persistence.ipynb.
Converted 05_run_benchmark.ipynb.
Converted index.ipynb.


<IPython.core.display.Javascript object>