# Setup

In [6]:
# Build the benchmark binaries.
# Delete any previously built binaries first (presumably so that `make` rebuilds both variants — confirm against ipc/Makefile).
!rm -f ipc/ipc-static
!rm -f ipc/ipc-dynamic
# Run make inside the ipc/ directory.
!make -C ipc
# Set the kern.ipc.maxsockbuf sysctl to 33554432 bytes (32 MiB).
!sysctl kern.ipc.maxsockbuf=33554432

cc -DWITH_PMC -Wall -o ipc-static -DPROGNAME=\"ipc-static\" ipc.c -static  -lpmc -lpthread
cc -DWITH_PMC -Wall -o ipc-dynamic -DPROGNAME=\"ipc-dynamic\" ipc.c -dynamic  -lpmc -lpthread
kern.ipc.maxsockbuf: 33554432 -> 33554432


In [5]:
# D Language scripts
# NOTE(review): D_ is an empty string, so no DTrace script text is defined in this cell.
D_ = """"""

In [3]:
import itertools

def strs_to_tup(strs):
    """Turn the pieces of a split "key:value" line into a (key, value) tuple.

    The key is the first piece and the value is the second piece, both with
    surrounding whitespace stripped. When there is exactly one piece the value
    is None. Pieces after the second are ignored.
    """
    key = strs[0].strip()
    value = None if len(strs) == 1 else strs[1].strip()
    return (key, value)

def flatten(lst):
    """Concatenate the items of every iterable in `lst` into one flat list (one level deep)."""
    return [item for inner in lst for item in inner]

def parse(cmd_out):
    """Parse "key: value" output lines into a dict.

    Each line is split at its FIRST colon only, so a value that itself
    contains colons (e.g. "12:30:01") is kept intact instead of being
    truncated at the second colon.

    Parameters:
        cmd_out: iterable of strings, one per output line.

    Returns:
        dict mapping the stripped key to the stripped value; a line without
        any colon maps its stripped text to None. Later lines overwrite
        earlier ones that share the same key.
    """
    parsed = {}
    for line in cmd_out:
        key, colon, value = line.partition(":")
        # `colon` is empty when the line had no separator at all.
        parsed[key.strip()] = value.strip() if colon else None
    return parsed

import time
from __future__ import print_function
from decimal import Decimal
import pandas as pd
import numpy as np
%matplotlib inline
# import matplotlib.pyplot as plt
# plt.style.use('seaborn-whitegrid')

def label_points(x, y, ax):
    """Write each point's x value, in scientific notation, next to the point on `ax`.

    `x` and `y` are combined column-wise into one frame; for every row,
    `ax.text` is called at (x, y) with the x value formatted as "{:.2E}".
    """
    coords = pd.concat({'x': x, 'y': y}, axis=1)
    for _, row in coords.iterrows():
        px = row['x']
        py = row['y']
        caption = "{:.2E}".format(Decimal(px))
        ax.text(px, py, caption)

def graph(xs, ys, num_trials = 0, save = False, save_name = "plot", title = "", xlabel = ""):
    """Plot the per-x median of repeated measurements with inter-quartile error bars.

    Parameters:
        xs: x values, one per measured configuration (used as the row index).
        ys: flat sequence of measurements, `num_trials` consecutive values
            for each entry of `xs`.
        num_trials: number of measurements per x value. The default 0 means
            "infer it" as len(ys) // len(xs).
        save: when true, write the figure to "<save_name>.png".
        save_name: file name stem used when `save` is true.
        title: used as the label of the plotted series.
        xlabel: label of the (logarithmic) x axis.

    Raises:
        ValueError: if `xs` is empty or `ys` does not contain exactly
            len(xs) * num_trials values.
    """
    # Imported locally so the function does not depend on a notebook-global
    # `plt`, which other cells may leave undefined or rebind.
    import matplotlib.pyplot as plt

    if len(xs) == 0:
        raise ValueError("graph() needs at least one x value")
    if not num_trials:
        num_trials = len(ys) // len(xs)
    if len(xs) * num_trials != len(ys):
        raise ValueError("expected {} x {} measurements, got {}".format(len(xs), num_trials, len(ys)))

    # One row per x value, one column per trial.
    trials = pd.DataFrame(np.reshape(ys, (len(xs), num_trials)), index=xs)
    median = trials.median(axis=1)
    # Error bars are distances from the median to the 25th / 75th percentile.
    below = median - trials.quantile(0.25, axis=1)
    above = trials.quantile(0.75, axis=1) - median
    # Shape (1, 2, N): one series, (lower, upper) distances, N points.
    error_bars_values = [np.vstack([below.values, above.values])]

    fig, ax = plt.subplots(figsize=(9, 9))
    median.plot(ax=ax, yerr=error_bars_values, label=title)
    ax.set_ylabel('I/O bandwidth (KiBytes/sec)')
    ax.set_xlabel(xlabel)
    ax.set_xscale('log')
    if save:
        fig.savefig("{}.png".format(save_name))

def benchmark(flags, repeat, exe, filename, buf_sizes, io_sizes, create = False, reset_file = False, dtrace_info = None, bench_name = "" ):
    """Run `exe` for every (buffer size, I/O size) combination and collect timing results.

    For each combination the shell command
    "<exe> <flags> -b <buffer_size> -t <io_size> <filename>" is executed
    `repeat` times through the IPython shell escape, and every run's output
    is converted to a dict with parse().

    Parameters:
        flags: string of command-line flags passed to `exe`. If it contains
            the substring "-d", the command is run one extra time up front
            and that output is discarded.
        repeat: number of measured runs per combination.
        exe: path of the benchmark executable.
        filename: file argument handed to the executable.
        buf_sizes: iterable of values for the -b option (outer loop).
        io_sizes: iterable of values for the -t option (inner loop).
        create: when true, `io/io-static -t <io_size> -c lab1/iofile` is run
            before the measured runs (both paths are hard-coded here).
        reset_file: when true, make_io_file() is called after all
            combinations have run.
        dtrace_info: optional dict with keys 'script' and 'agg_fn'. When
            given, a DTraceConsumerThread is started before and stopped after
            the measured runs of each combination, and no timing summary is
            recorded for that combination.
        bench_name: not used inside this function.

    Returns:
        A list with one dict per combination (non-DTrace mode only) holding
        'buffersize', 'iosize', the averaged 'time' and 'speed', and the raw
        per-run lists 'times' and 'speeds'. The list is empty when
        `dtrace_info` is given.

    Note: in non-DTrace mode each parsed output must provide the keys
    'time', 'speed', 'buffersize' and 'totalsize'.
    """
    runs = range(1, repeat + 1)
    outputs = []
    print("\t ==> Running {}".format(exe))
    print("\t\t ==> Reading from {}".format(filename))
    is_dtrace = (dtrace_info != None)
    dtrace_thread = None

    for buffer_size in buf_sizes:
        for io_size in io_sizes:
            cmd = "{} {} -b {} -t {} {}".format(exe, flags, buffer_size, io_size, filename)
            #if "-v" not in flags:
            #    cmd = "time -p " + cmd
            if "-d" in flags:
                # Extra run whose output is thrown away; it is not part of `results`.
                discard = !{cmd}

            if create:
                # (Re)create the benchmark file with the current I/O size; output is ignored.
                created = !io/io-static -t {str(io_size)} -c lab1/iofile

            if is_dtrace:
                # Create a separate thread to run the DTrace instrumentation
                dtrace_thread = DTraceConsumerThread(dtrace_info['script'],
                                                        chew_func=None,
                                                        chewrec_func=None,
                                                        walk_func=dtrace_info['agg_fn'],
                                                        sleep=1)
                # Start the DTrace instrumentation
                dtrace_thread.start()

            # Measured runs: one parsed output dict per repetition.
            results = []
            for i in runs:
                out = !{cmd}
                parsed = parse(out)
                results.append(parsed)

            if is_dtrace and dtrace_thread != None:
                # The benchmark has completed - stop the DTrace instrumentation
                dtrace_thread.stop()
                dtrace_thread.join()
                dtrace_thread.consumer.__del__() # Memory leak workaround
                print("\t\t\tdtrace run: {0:2} bytes {1:2} bytes".format(buffer_size, io_size))
            else:
                times = [float(item['time']) for item in results]
                speeds = [float(item['speed'].split(' ')[0]) for item in results] # Need to split by ' ' because output speed has units attached to it (see io.c)
                # Averages are taken over the number of repetitions.
                avg_time = sum(times) / len(runs)
                avg_speed = sum(speeds) / len(runs)
                # Sizes as reported by the first run's output.
                buf_sz = int(results[0]['buffersize'])
                tot_sz = int(results[0]['totalsize'])
                print("\t\t\t{0:2} bytes {1:2} bytes ({2:.2f} KBytes/sec): {3:.6f}s".format(buf_sz, tot_sz, avg_speed, avg_time))
                # 'buffersize' is the requested value; 'iosize' is the total size reported by the benchmark.
                outputs.append({'buffersize':buffer_size, 'time':avg_time, 'speed':avg_speed, 'iosize':tot_sz, 'speeds': speeds, 'times': times})
    if reset_file:
        print("recreating default benchmark file")
        make_io_file()
    return outputs

# Reads

In [None]:
# Sweep parameters: 16 power-of-two buffer sizes from 512 bytes up to 16 MiB.
BUF_START = 512
BUFFER_SIZES = [BUF_START << exp for exp in range(16)]
# Total I/O size stays constant at the largest buffer size (16 MiB).
IO_SIZES = BUFFER_SIZES[-1:]
RUNS = 12

# Read benchmark against /dev/zero: constant total I/O size, one measurement set per buffer size.
results_iofile = benchmark("-v -r", RUNS, "io/io-static", "/dev/zero", BUFFER_SIZES, IO_SIZES, create = False)
# Take the run count in the file name from RUNS so the two cannot drift apart.
saveas = "static_const_io_flags_vr_runs_{}_oldfile_zero".format(RUNS)
# The x values are the buffer sizes, so label the (log-scaled) axis accordingly.
graph(BUFFER_SIZES, flatten([item['speeds'] for item in results_iofile]), RUNS, save = True, save_name = saveas,
      xlabel = "Buffer size (bytes, log scale)")

In [None]:
# Same sweep as the read benchmark: 512 bytes doubling 15 times up to 16 MiB.
BUF_START = 512
BUFFER_SIZES = [BUF_START << exp for exp in range(16)]
# One constant total I/O size: the largest buffer size (16 MiB).
IO_SIZES = BUFFER_SIZES[-1:]
RUNS = 12
# Every aggregation value reported through the callbacks below is collected here.
values = []


def _log_and_store(a, b, c, d):
    """Print the four callback arguments and append the last one to `values`."""
    print("From DTrace: {} | {} | {} | val: {}".format(a,b,c,d))
    values.append(d)


# Callbacks invoked to process the aggregation
def simple_agg(a, b, c, d):
    """Aggregation callback: log all four arguments and record `d` in `values`.

    NOTE(review): the meaning of `a`, `b` and `c` is set by the DTrace
    consumer that invokes this callback — confirm against its documentation.
    """
    _log_and_store(a, b, c, d)


def quantized_out(a, b, c, d):
    """Aggregation callback with the same behavior as simple_agg: log and record `d`."""
    _log_and_store(a, b, c, d)

# One DTrace-instrumented benchmark invocation per (I/O size, buffer size) pair,
# iterating I/O sizes in the outer position and buffer sizes in the inner one.
# NOTE(review): D_WHOLE_time is not defined in the cells visible here — confirm
# it is defined before this cell runs.
for io_sz, buf_sz in itertools.product(IO_SIZES, BUFFER_SIZES):
    benchmark("-r -B -q", 1, "io/io-static", "lab1/iofile", [buf_sz], [io_sz],
              dtrace_info = {'agg_fn':simple_agg, 'script':D_WHOLE_time})

# Static vs. Dynamic Linking

In [5]:
# Run linker_benchmark.sh once for the statically linked and once for the
# dynamically linked binary, capturing each run's output lines in a list.
static = !./linker_benchmark.sh io/io-static "-q -r -B" iofile static_link
dynamic = !./linker_benchmark.sh io/io-dynamic "-q -r -B" iofile dyn_link

In [12]:
import numbers
def _parse_linker_output(lines, header_lines = 6):
    """Parse captured linker_benchmark.sh output into (pairs, sizes, times).

    The first `header_lines` lines are skipped (presumably header output —
    confirm against linker_benchmark.sh). Every remaining non-blank line is
    split on arbitrary whitespace; its first field is read as an int and its
    second field as a float.

    Returns:
        (pairs, sizes, times): the split fields per line, the int first
        fields, and the float second fields.
    """
    # split() without an argument tolerates repeated, leading and trailing
    # whitespace; blank lines are dropped instead of failing in int().
    pairs = [line.split() for line in lines[header_lines:] if line.strip()]
    sizes = [int(fields[0]) for fields in pairs]
    secs = [float(fields[1]) for fields in pairs]
    return pairs, sizes, secs

# Statically linked results.
pairs, buffs, times = _parse_linker_output(static)

# Dynamically linked results.
d_pairs, d_buffs, d_times = _parse_linker_output(dynamic)

In [None]:
# Plot the static and dynamic timings on one set of axes and save the figure.
df = pd.DataFrame(np.column_stack((buffs, times)), columns=['x', 'y'])
d_df = pd.DataFrame(np.column_stack((d_buffs, d_times)), columns=['x', 'y'])
ax = df.plot(x='x', y='y', logx=True, grid=True, figsize=(20,10))
# Draw the second series onto the same axes. The result is deliberately not
# bound to `plt`, so the conventional pyplot alias is never shadowed by an Axes.
d_df.plot(x='x', y='y', logx=True, grid=True, ax=ax)
ax.set_xlabel('Filesizes in bytes (log scale)')
ax.set_ylabel('Average time (seconds)')
ax.legend(["Static", "Dynamic"])
fig = ax.get_figure()
fig.savefig("dyn_vs_stat.png")

# Notes

This notebook investigates IPC syscalls such as pipe(2) and socket(2). read(2) and write(2) can be performed on sockets and pipes and allow partial reads and writes, i.e. only a subset of the requested buffer size may actually be read or written. Possible causes are size limits of in-kernel IPC buffers or the use of non-blocking I/O. Both input and output bytes therefore need to be considered when benchmarking the performance of IPC syscalls.

Links:
* https://www.freebsd.org/cgi/man.cgi?query=socket&sektion=2
* https://www.freebsd.org/cgi/man.cgi?pipe(2)
* https://wiki.freebsd.org/NetworkPerformanceTuning
* https://wiki.freebsd.org/BenchmarkAdvice

Benchmark:
* Set up IPC endpoints (using pipe(2) or socket(2))
* Transfer bytes from a user-space buffer using read(2)/write(2)
  * Note: there is no guarantee that all requested bytes are transferred by a single call
* Modes of rx/tx operations are:
  * within single thread
  * between pair of threads in same process
  * between threads in two different processes
* Time is measured via clock_gettime(2)
* Both statically and dynamically linked versions are available

* -i: IPC object type (pipe, local, tcp)
* -t: total size of I/O file | default: 16777216
* -b: user-space IPC buffer size (bytes) | default: 131072
* 1thread, 2thread, 2proc

Testsuite:
* 1st thread/proc is receiver
* 2nd thread/proc is sender
* Sender writes timestamp to shared memory before sending bytes
  * sender_argument.sa_starttime
* Sender:
  * call write(2) with buffersize or remaining bytes to write (whichever is lower)
  * repeat writes until total I/O size is reached
* Receiver:
  * call read(2) until total I/O size has been reached
* Multiple Processes/Threads:
  * Create page (mmap(2)) for sender_argument struct
  * Set inheritance property (minherit(2)) such that the address space of the page can be shared between parent and child
  * Receive or send depending on pid
  * Return time difference
* Single thread:
  * Set non-blocking I/O flags on input file descriptors (O_NONBLOCK from pipe(2))
  * Perform read and write in same iteration until total I/O size is reached
    * In the first iteration of the benchmark only a read can be performed unless we block because the first read failed
  * At least one of read or write is performed in each iteration (blocking via select(2) until rx or tx is ready, if necessary)
  * Return time of I/O loop
* Main:
  * Allocate IPC object: tcp socket, pipe, local socket
  * Prepare IPC buffer sizes if necessary
  * fsync(2) and sleep if necessary
  * Start benchmark modes
  * Print output if necessary