# experiments.ipynb

This notebook contains experiments using various methods and libraries to see what gets us the best running time and memory usage.

In [1]:
%load_ext autoreload
%autoreload 2

In [15]:
import sys
sys.path.append("../..")

import pipit as pp
import pandas as pd
import dask
import numpy as np
import sqlite3
import polars
from operator import itemgetter
import duckdb
import json
import otf2
from otf2.events import *

Let's read in the sw4lite-32 OTF2 dataset:

In [3]:
trace = pp.Trace.from_otf2("/home/rakrish/trace-data/sw4lite-32/")

# DataFrame (stored column-wise)
df = trace.events
df

|  | Timestamp (ns) | Event Type | Name | Thread | Process | Attributes |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | 4.772924e-01 | Instant | ProgramBegin | 0 | 14 | {'program_name': '/g/g92/bhowmik1/sw4lite/sw4l... |
| 1 | 1.484379e+02 | Instant | ProgramBegin | 0 | 15 | {'program_name': '/g/g92/bhowmik1/sw4lite/sw4l... |
| 2 | 1.740208e+03 | Instant | ProgramBegin | 0 | 12 | {'program_name': '/g/g92/bhowmik1/sw4lite/sw4l... |
| 3 | 2.002576e+04 | Instant | ProgramBegin | 0 | 13 | {'program_name': '/g/g92/bhowmik1/sw4lite/sw4l... |
| 4 | 2.014269e+04 | Enter | MPI_Init | 0 | 14 | {'region': 'Region 36'} |
| ... | ... | ... | ... | ... | ... | ... |
| 9533 | 4.699147e+09 | Leave | MPI_Sendrecv | 0 | 25 |  |
| 9534 | 4.704243e+09 | Leave | TRACER_Loop | 0 | 25 |  |
| 9535 | 4.704245e+09 | Instant | MeasurementOnOff | 0 | 25 | {'measurement_mode': 'MeasurementMode.OFF'} |
| 9536 | 4.704403e+09 | Leave | TRACER_Loop | 0 | 1 |  |


In [4]:
# List of dictionaries (stored row-wise)
events = df.to_dict(orient="records")

## Experiment 1: Summing the "Timestamp (ns)" column

In [5]:
%%timeit
df["Timestamp (ns)"].sum()

47.3 µs ± 1.28 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [6]:
%%timeit
sum(df["Timestamp (ns)"].tolist())

155 µs ± 1.69 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


In [85]:
%%timeit
sum([event["Timestamp (ns)"] for _, event in df.iterrows()])

427 ms ± 7.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [74]:
%%timeit
total = 0
for idx, event in df.iterrows():
    total += event["Timestamp (ns)"]

439 ms ± 14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [78]:
%%timeit
sum(evt["Timestamp (ns)"] for evt in events)

473 µs ± 11.3 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [79]:
%%timeit
sum(map(itemgetter("Timestamp (ns)"), events))

434 µs ± 22.3 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


What results do we see here?

If we sum a column using Pandas (column-oriented), we get ~47 µs.

If we sum a column using a Python list of dicts (row-oriented), we get ~434 µs (~9.2x slower).

However, if we do this operation without vectorization (i.e., iterating row by row), Pandas takes ~427 ms.

The list of dicts still takes ~434 µs, which is ~984x faster.

In this case, it is **worth moving to a row-oriented format.**
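To see the layout difference without pandas, here is a minimal pure-Python sketch on synthetic data (10,000 made-up timestamps, not the sw4lite trace). `rows` mimics the list-of-dicts layout produced by `df.to_dict(orient="records")`, and `columns` is a hypothetical column-oriented layout stored as a dict of lists. Both sums visit the same values in the same order, so they agree exactly; only the access pattern differs. The printed timings depend on the machine and will not match the numbers above.

```python
import random
import timeit
from operator import itemgetter

# Synthetic stand-in for the trace (assumption): 10,000 random timestamps
random.seed(0)
N = 10_000
timestamps = sorted(random.uniform(0, 5e9) for _ in range(N))

# Row-oriented: one dict per event, like df.to_dict(orient="records")
rows = [
    {"Timestamp (ns)": t, "Event Type": "Enter", "Process": i % 32}
    for i, t in enumerate(timestamps)
]

# Column-oriented (hypothetical layout): one list per column
columns = {
    "Timestamp (ns)": [r["Timestamp (ns)"] for r in rows],
    "Event Type": [r["Event Type"] for r in rows],
    "Process": [r["Process"] for r in rows],
}

def row_sum():
    # One dict lookup per row
    return sum(map(itemgetter("Timestamp (ns)"), rows))

def column_sum():
    # One lookup for the whole column, then a plain list sum
    return sum(columns["Timestamp (ns)"])

for fn in (row_sum, column_sum):
    secs = timeit.timeit(fn, number=100) / 100
    print(f"{fn.__name__}: {secs * 1e6:.1f} µs per call")
```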

## What if we used NumPy arrays?

In [8]:
ts = df["Timestamp (ns)"].to_numpy()

In [9]:
%%timeit
ts.sum()

3.54 µs ± 137 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [13]:
%%timeit
total = 0
for t in ts:
    total += t

666 µs ± 20.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [14]:
%%timeit
total = 0
for i in range(len(ts)):
    total += ts[i]

1.05 ms ± 22.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


## What if we used DuckDB?

DuckDB is also column-oriented, so adding one row at a time is inefficient. DuckDB still allows it, though, since it accepts ordinary SQL `INSERT` statements.

From the DuckDB documentation:

> INSERT statements are the standard way of loading data into a relational database. When using INSERT statements, the values are supplied row-by-row. While simple, there is significant overhead involved in parsing and processing individual INSERT statements. This makes lots of individual row-by-row insertions very inefficient for bulk insertion.

> As a rule-of-thumb, avoid using lots of individual row-by-row INSERT statements when inserting more than a few rows (i.e., avoid using INSERT statements as part of a loop). When bulk inserting data, try to maximize the amount of data that is inserted per statement.

So a more efficient approach is to insert events in batches rather than one at a time. We'll need to keep a buffer in Python and periodically flush it by inserting its contents into a DuckDB table.
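The buffering idea might look like the following minimal sketch. It uses Python's built-in `sqlite3` module as a stand-in so that it runs without DuckDB installed; DuckDB's Python connections also provide a DB-API style `executemany`, so the same shape should carry over, although for DuckDB flushing each batch as a pandas or Polars DataFrame may well be faster and is worth measuring. `EventBuffer`, `BATCH_SIZE`, and the five-column schema are hypothetical names chosen for illustration, not part of pipit or DuckDB.

```python
import sqlite3

BATCH_SIZE = 1_000  # hypothetical default; trades memory for per-flush overhead

class EventBuffer:
    """Hypothetical helper: collects event rows in Python, inserts them in batches."""

    def __init__(self, conn, batch_size=BATCH_SIZE):
        self.conn = conn
        self.batch_size = batch_size
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.rows:
            return
        # One call per batch; the INSERT is prepared once and reused for each row
        self.conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?, ?)", self.rows)
        self.conn.commit()
        self.rows.clear()  # free the Python-side copy of the batch

# sqlite3 stands in for DuckDB here (assumption), using an in-memory database
sqlite_conn = sqlite3.connect(":memory:")
sqlite_conn.execute(
    'CREATE TABLE events ("Timestamp (ns)" REAL, "Event Type" TEXT, '
    '"Name" TEXT, "Thread" INTEGER, "Process" INTEGER)'
)

buf = EventBuffer(sqlite_conn, batch_size=3)
for i in range(10):
    buf.add((float(i), "Enter", "MPI_Init", 0, i % 4))
buf.flush()  # push the final, partial batch

sqlite_total = sqlite_conn.execute(
    'SELECT SUM("Timestamp (ns)") FROM events'
).fetchone()[0]
print(sqlite_total)  # 45.0
```

With `batch_size=3`, the ten rows reach the table in four inserts (3 + 3 + 3 + 1); the explicit `flush()` after the loop is what catches the last partial batch.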



In [61]:
conn = duckdb.connect(database=':memory:')

# "Process" holds integer ranks, so it is declared INTEGER (not VARCHAR)
conn.execute('''
    CREATE TABLE IF NOT EXISTS my_table (
        "Timestamp (ns)" DOUBLE,
        "Event Type" VARCHAR,
        "Name" VARCHAR,
        "Thread" INTEGER,
        "Process" INTEGER,
        "Attributes" VARCHAR
    )
''')

# Deliberately row-by-row (the slow path described above): one INSERT per event.
# Parameter placeholders (?) avoid the quoting bugs of building SQL with
# f-strings, e.g. when a name or attribute contains a quote character.
for event in events:
    conn.execute(
        "INSERT INTO my_table VALUES (?, ?, ?, ?, ?, ?)",
        [
            float(event["Timestamp (ns)"]),
            event["Event Type"],
            event["Name"],
            int(event["Thread"]),
            int(event["Process"]),
            json.dumps(event["Attributes"]),  # store the dict as a JSON string
        ],
    )

In [57]:
%%timeit
conn.execute('SELECT SUM("Timestamp (ns)") FROM my_table').fetchall()

288 µs ± 8.63 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


## What if we used Polars?

Since Polars is column-oriented, adding one row at a time is not recommended. Instead, we can build a Python list of dicts and then convert it to a Polars DataFrame.

In practice, this duplicates the data in memory, so we should build one small Polars DataFrame at a time and clear the Python list after each conversion.

So effectively, this is the same as the DuckDB approach: build a buffer in Python, push it onto the DuckDB/Polars table, and flush the buffer.
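A minimal pure-Python sketch of this chunking idea, using hypothetical helpers `to_columns`, `chunked_columns`, and `concat_columns`: rows are buffered as dicts, and every `chunk_size` rows the buffer is transposed into a column-oriented chunk (a dict of lists) and dropped, so at most one chunk's worth of rows is held in row form at a time. With Polars, each chunk would instead be `polars.DataFrame(buffer)` and the chunks would be combined once at the end with `polars.concat`; the stdlib version is used here only so the sketch runs without Polars.

```python
def to_columns(rows):
    """Transpose a list of dicts into a dict of lists (assumes every row has the same keys)."""
    keys = rows[0].keys()
    return {k: [row[k] for row in rows] for k in keys}

def chunked_columns(events, chunk_size=1_000):
    """Hypothetical helper: yield column-oriented chunks from a stream of row dicts."""
    buffer = []
    for event in events:
        buffer.append(event)
        if len(buffer) >= chunk_size:
            yield to_columns(buffer)
            buffer = []  # drop the row-oriented copy before reading more events
    if buffer:
        yield to_columns(buffer)  # final partial chunk

def concat_columns(chunks):
    """Concatenate column chunks into one dict of lists (stand-in for polars.concat)."""
    result = {}
    for chunk in chunks:
        for k, values in chunk.items():
            result.setdefault(k, []).extend(values)
    return result

# Synthetic event stream (assumption): 2,500 events read one at a time
stream = ({"Timestamp (ns)": float(t), "Name": "MPI_Send"} for t in range(2_500))
chunks = list(chunked_columns(stream, chunk_size=1_000))
table = concat_columns(chunks)
print([len(c["Name"]) for c in chunks])  # [1000, 1000, 500]
print(sum(table["Timestamp (ns)"]))      # 3123750.0
```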

In [58]:
podf = polars.DataFrame(events)
podf

| Timestamp (ns) | Event Type | Name | Thread | Process | Attributes |
| --- | --- | --- | --- | --- | --- |
| f64 | str | str | i64 | i64 | struct[4] |
| 0.477292 | "Instant" | "ProgramBegin" | 0 | 14 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |
| 148.437925 | "Instant" | "ProgramBegin" | 0 | 15 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |
| 1740.207954 | "Instant" | "ProgramBegin" | 0 | 12 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |
| 20025.75566 | "Instant" | "ProgramBegin" | 0 | 13 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |
| 20142.692289 | "Enter" | "MPI_Init" | 0 | 14 | {null,null,"Region 36",null} |
| 20377.520131 | "Enter" | "MPI_Init" | 0 | 12 | {null,null,"Region 36",null} |
| 20534.072026 | "Enter" | "MPI_Init" | 0 | 15 | {null,null,"Region 36",null} |
| 35336.817362 | "Enter" | "MPI_Init" | 0 | 13 | {null,null,"Region 36",null} |
| 80158.388554 | "Instant" | "ProgramBegin" | 0 | 10 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |
| 88742.014404 | "Instant" | "ProgramBegin" | 0 | 9 | {"/g/g92/bhowmik1/sw4lite/sw4lite/optimize_c_quartz/sw4lite",["tests/trace-collection-input/gaussian-32.in"],null,null} |


In [91]:
%%timeit
podf["Timestamp (ns)"].sum()

3.38 µs ± 53.7 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [65]:
%%timeit
total = 0
for row in podf.iter_rows():
    total += row[0]

7.75 ms ± 176 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [71]:
%%timeit
total = 0
for row in podf.select(['Timestamp (ns)']).iter_rows():
    total += row[0]

1.09 ms ± 8.49 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


Notice that this 1.09 ms is close to the 1.05 ms it took to index through the `ts` NumPy array element by element.

In [86]:
%%timeit
sum(map(itemgetter("Timestamp (ns)"), events))

448 µs ± 50.8 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


What results do we see here?

If we sum a column using Polars (column-oriented), we get ~3.4 µs.

If we sum a column using a Python list of dicts (row-oriented), we get ~448 µs (~133x slower).

However, if we do this operation without vectorization (i.e., iterating row by row), Polars takes either ~7.75 ms or ~1.09 ms, depending on whether we iterate over all columns or first `select` only the column we need.

The list of dicts still takes ~448 µs, which is 17.3x or 2.43x faster, respectively.

In this case, it is **not worth moving to a row-oriented format**.

## Results table

Comparing row orientation vs. column orientation for summing the "Timestamp (ns)" column, both vectorized and row by row:

| Method | Orientation | Time |
| --- | --- | --- |
| List of dicts | Row | 0.448 ms |
| Pandas vectorized | Column | 0.047 ms |
| Pandas row-by-row, all | Column | 427 ms |
| Polars vectorized | Column | 0.0034 ms |
| Polars row-by-row, all | Column | 7.75 ms |
| Polars row-by-row, select | Column | 1.09 ms |

(Note: we don't include a "Pandas row-by-row, select" variant because it uses more memory; we would like to avoid making copies of columns.)

If we *really* simplify this, we can keep just the list of dicts (row orientation) and Polars, the fastest column-oriented option:

| Method | Orientation | Time | Relative to list of dicts |
| --- | --- | --- | --- |
| List of dicts | Row | 0.448 ms | 1x (baseline) |
| Polars vectorized | Column | 0.0034 ms | 133x faster |
| Polars row-by-row, all | Column | 7.75 ms | 17.3x slower |
| Polars row-by-row, select | Column | 1.09 ms | 2.43x slower |

In other words, by using Polars, we get a ~133x speedup for column-wise operations and only a 2.43x to 17.3x slowdown for row-wise operations. That is definitely worth it, especially since most analysis operations are column-wise (this is why most OLAP databases are column-oriented).
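The relative factors can be checked against the `%%timeit` means reported earlier in this notebook. This is only arithmetic on the copied numbers (in µs), not a new measurement:

```python
# Mean times in microseconds, copied from the %%timeit outputs in this notebook
times_us = {
    "List of dicts": 448,
    "Polars vectorized": 3.38,
    "Polars row-by-row, all": 7750,
    "Polars row-by-row, select": 1090,
}

baseline = times_us["List of dicts"]
for method, t in times_us.items():
    if method == "List of dicts":
        continue  # the baseline itself
    if t < baseline:
        print(f"{method}: {baseline / t:.3g}x faster than list of dicts")
    else:
        print(f"{method}: {t / baseline:.3g}x slower than list of dicts")
```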

But I still think that doing stateful calculations while reading is the best approach. Why? **Because events are naturally stored in row-wise order on disk.**

| Trace format | Orientation |
| --- | --- |
| CSV | Row-wise |
| OTF2 | Row-wise |
| HPCToolkit | Row-wise |
| Nsight Systems | Row-wise |
| Projections | Row-wise |
| CTF | Row-wise |

Based on this, the most natural approach seems to be the following:

1. Read the trace from disk, event by event (i.e., row-wise). As we read each event, maintain the current state (e.g., call stack, message queue), and write this state into attributes in the dataframe.

2. Allow users to do interactive analysis using dataframe functions.
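Step 1 could be sketched as a single pass that keeps per-location state while streaming events. This minimal sketch assumes a simplified, hypothetical event tuple `(location, kind, name, time)` instead of the real `otf2` event classes, and tracks only a call stack; `annotate_call_stack` and the `depth`/`parent` fields it writes are illustrative names, not pipit's API or columns. Each yielded row is ready to be buffered into a DataFrame chunk as it is produced.

```python
from collections import defaultdict

def annotate_call_stack(events):
    """Hypothetical helper: stream events in time order, attaching call-stack state.

    `events` yields (location, kind, name, time) tuples, where kind is
    "Enter" or "Leave" (a simplified stand-in for real trace events).
    """
    stacks = defaultdict(list)  # location -> names of currently open regions
    for location, kind, name, time in events:
        stack = stacks[location]
        if kind == "Enter":
            parent = stack[-1] if stack else None
            stack.append(name)
            depth = len(stack) - 1
        elif kind == "Leave":
            # Assumes well-nested traces: a Leave closes the innermost open region
            if not stack or stack[-1] != name:
                raise ValueError(f"unmatched Leave {name!r} on location {location}")
            stack.pop()
            depth = len(stack)  # same depth as the matching Enter
            parent = stack[-1] if stack else None
        else:
            raise ValueError(f"unknown event kind {kind!r}")
        yield {"Timestamp (ns)": time, "Event Type": kind, "Name": name,
               "Process": location, "depth": depth, "parent": parent}

# Toy two-rank trace (assumption), already merged in timestamp order
toy_events = [
    (0, "Enter", "main", 0.0),
    (1, "Enter", "main", 1.0),
    (0, "Enter", "MPI_Send", 2.0),
    (1, "Enter", "MPI_Recv", 3.0),
    (0, "Leave", "MPI_Send", 4.0),
    (1, "Leave", "MPI_Recv", 5.0),
    (0, "Leave", "main", 6.0),
    (1, "Leave", "main", 7.0),
]
annotated = list(annotate_call_stack(toy_events))
for row in annotated:
    print(row["Process"], row["Event Type"], row["Name"], row["depth"], row["parent"])
```

A message queue (for matching sends and receives) could be tracked the same way by adding another per-location structure alongside `stacks`.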

# Modified reading procedure

Let's modify the OTF2 reader so that we can immediately post-process an event after it has been read from the disk.

In [31]:
def read_trace(trace_name="../../pipit/tests/data/ping-pong-otf2/traces.otf2"):
    """Stream events from an OTF2 trace, printing each Enter event as it is read."""
    with otf2.reader.open(trace_name) as trace:
        # trace.events yields (location, event) pairs one at a time,
        # so each event can be post-processed right after it is read
        for location, event in trace.events:
            # Debugging aids: uncomment to dump every raw event
            # print(type(event))
            # print(event)
            if isinstance(event, Enter):
                print(f"Enter {event.region.name} {location}")
            # elif isinstance(event, Leave):
            #     print(f"Leave {event.region.name} {location} at {event.time}")

read_trace()

Enter int main(int, char**) Location [1]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [1] 'MPI Rank 1'
Enter MPI_Init Location [1]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [1] 'MPI Rank 1'
Enter int main(int, char**) Location [0]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [0] 'MPI Rank 0'
Enter MPI_Init Location [0]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [0] 'MPI Rank 0'
Enter MPI_Comm_size Location [0]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [0] 'MPI Rank 0'
Enter MPI_Comm_size Location [1]: 'name': 'Master thread', 'type': LocationType.CPU_THREAD, 'number_of_events': 60, 'group': LocationGroup [1] 'MPI Rank 1'
Enter MPI_Comm_rank Location [0]: 'name': 'Master thread',

<class 'otf2.events.ProgramBegin'>
{'time': 7397466976977800, 'attributes': {Attribute [2] 'ProcessId': 26602}, 'program_name': '/g/g92/bhatele1/umd/traces/score-p/ping-pong.otf2', 'program_arguments': []}
<class 'otf2.events.Enter'>
{'time': 7397466977040830, 'attributes': None, 'region': Region [3] 'int main(int, char**)'}
<class 'otf2.events.Enter'>
{'time': 7397466977062212, 'attributes': None, 'region': Region [148] 'MPI_Init'}
<class 'otf2.events.ProgramBegin'>
{'time': 7397466977622557, 'attributes': {Attribute [2] 'ProcessId': 26601}, 'program_name': '/g/g92/bhatele1/umd/traces/score-p/ping-pong.otf2', 'program_arguments': []}
<class 'otf2.events.Enter'>
{'time': 7397466977683839, 'attributes': None, 'region': Region [3] 'int main(int, char**)'}
<class 'otf2.events.Enter'>
{'time': 7397466977702853, 'attributes': None, 'region': Region [148] 'MPI_Init'}
<class 'otf2.events.Leave'>
{'time': 7397467382698364, 'attributes': None, 'region': Region [148] 'MPI_Init'}
<class 'otf2.eve