# Using Dask for Trace Analysis

Now that you know how Dask works, let's see how we can use it to analyze traces.

Currently, Pipit parses traces from various file formats, such as OTF2 and HPCToolkit, into an in-memory Pandas DataFrame. At the very least, then, we need enough memory to hold all of the data in the DataFrame. In practice, parsing the data and calling Pipit's analysis functions require additional memory on top of that.
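To get a rough sense of how much memory a DataFrame occupies, pandas can report its per-column footprint. The sketch below uses a small made-up frame (the `events` name and its values are illustrative, not Pipit output); for a real trace you would call the same method on the DataFrame Pipit produces.

```python
import pandas as pd

# Toy stand-in for a trace DataFrame (hypothetical values).
events = pd.DataFrame({
    "Timestamp (s)": [0, 1, 3, 5],
    "Event Type": ["Enter", "Enter", "Enter", "Leave"],
    "Name": ["main()", "foo()", "MPI_Send", "MPI_Send"],
    "Process": [0, 0, 0, 0],
})

# deep=True also counts the Python string objects held in object columns,
# which a shallow measurement would miss.
per_column = events.memory_usage(deep=True)
total_bytes = int(per_column.sum())

print(per_column)
print(f"total: {total_bytes} bytes")
```

This only measures the finished DataFrame; it does not capture the temporary memory used while parsing or analyzing.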

So, the question is, how can we work with datasets that need more space than what is available in memory?

Recall from the first notebook that Dask performs its computations *lazily*: it puts off doing *any* work until the result of that work is actually needed. This differs from the typical Pandas/NumPy workflow, in which every computation runs immediately.

For instance, when we call `read_csv` with Pandas, it loads the entire dataset into memory as a Pandas DataFrame right away.

In [6]:
import pandas as pd

df = pd.read_csv("./foo-bar.csv")
df

Unnamed: 0,Timestamp (s),Event Type,Name,Process
0,0,Enter,main(),0
1,1,Enter,foo(),0
2,3,Enter,MPI_Send,0
3,5,Leave,MPI_Send,0
4,8,Enter,baz(),0
5,18,Leave,baz(),0
6,25,Leave,foo(),0
7,100,Leave,main(),0
8,0,Enter,main(),1
9,1,Enter,bar(),1


However, Dask's lazy mindset also applies to reading data from disk into a DataFrame. When we call `read_csv` with Dask, it doesn't *actually* read the whole dataset (it only samples the start of the file to infer column names and dtypes) -- it just sets up the task to read it, which it puts off until the data is actually needed.

In [10]:
import dask.dataframe as dd

ddf = dd.read_csv("./foo-bar.csv")
ddf

,Timestamp (s),Event Type,Name,Process
npartitions=1,,,,
,int64,object,object,int64
,...,...,...,...


Let's again see how Pandas and Dask operations differ:

In [25]:
df[" Process"].unique()

array([0, 1])

In [24]:
ddf[" Process"].unique()

Dask Series Structure:
npartitions=1
    int64
      ...
Name:  Process, dtype: int64
Dask Name: unique-agg, 4 graph layers

In [27]:
ddf[" Process"].unique().compute()

0    0
1    1
Name:  Process, dtype: int64

Now that we have a feel for how Dask operations work, let's do some actual analysis.

Say we would like to analyze the AMG-16 OTF2 trace. Using Pipit, my laptop currently struggles (100% CPU usage, ~40% memory usage) to read the entire AMG-16 DataFrame and keep it in memory.

For Dask demonstration purposes, I have used Pipit's `OTF2Reader` to read the AMG-16 trace into a Pandas DataFrame, and then saved this Pandas DataFrame to disk in the Parquet format using the `DataFrame.to_parquet` function.

In [30]:
# DON'T EXECUTE THIS CODE! Exported parquet file is already available!
# import sys
# sys.path.append('/home/rakrish/pipit/')
# import pipit as pp
# amg16 = pp.Trace.from_otf2('/home/rakrish/Score-P-traces/amg/16/')
# amg16.events.to_parquet('amg_16.parquet')

So, assuming that we have done some preprocessing to get the trace in the desired Parquet format, let's see how Dask lets us perform operations on this trace easily (even when it doesn't fit in memory):

In [36]:
ddf = dd.read_parquet('amg_16.parquet')
ddf

,Timestamp (ns),Event Type,Name,Thread,Process,Attributes
npartitions=1,,,,,,
,float64,category[unknown],category[unknown],category[unknown],category[unknown],object
,...,...,...,...,...,...


We see that the Dask DataFrame object is initialized in *less than a second* -- wow! This is because Dask doesn't load the entire dataset into memory -- it puts off this work for as long as possible, until we actually need the result of an operation.

Before we can start doing analysis operations on the Dask DataFrame, we need to index and partition it.

In [59]:
ddf['Process'] = ddf['Process'].cat.as_ordered()

In [60]:
ddf = ddf.set_index('Process')
ddf = ddf.repartition(npartitions=16)

In [63]:
ddf.partitions[0].head()

TypeError: cannot do slice indexing on CategoricalIndex with these indexers [1] of type int

In [55]:
len(ddf['Process'].unique().compute())

16



Now that we have our Dask DataFrame set up, let's do some operations. First, let's try indexing by timestamp and repartitioning into chunks of about 5 MB, and then peek at the top of the Dask DataFrame to see some example values of the columns:

In [47]:
ddf.set_index('Timestamp (ns)')
ddf.repartition(partition_size='5MB')

,Timestamp (ns),Event Type,Name,Thread,Process,Attributes
npartitions=213,,,,,,
,float64,category[unknown],category[unknown],category[unknown],category[unknown],object
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [49]:
ddf.head()

Unnamed: 0,Timestamp (ns),Event Type,Name,Thread,Process,Attributes
0,0.0,Instant,ProgramBegin,0,12,{'program_name': '/g/g92/bhowmik1/AMG/test/amg...
1,46.774703,Instant,ProgramBegin,0,13,{'program_name': '/g/g92/bhowmik1/AMG/test/amg...
2,71.593933,Instant,ProgramBegin,0,15,{'program_name': '/g/g92/bhowmik1/AMG/test/amg...
3,100.231506,Instant,ProgramBegin,0,14,{'program_name': '/g/g92/bhowmik1/AMG/test/amg...
4,24849.776813,Enter,MPI_Init,0,15,{'region': 'Region 36'}


In [40]:
ddf["Name"].unique().compute()

0            ProgramBegin
1                MPI_Init
2        MeasurementOnOff
3             TRACER_Loop
4     TRACER_WallTime_amg
5      MpiCollectiveBegin
6           MPI_Allreduce
7        MpiCollectiveEnd
8             MPI_Waitall
9               MPI_Irecv
10        MpiIrecvRequest
11              MPI_Isend
12               MpiIsend
13             MPI_Iprobe
14               MPI_Recv
15                MpiRecv
16               MPI_Send
17                MpiSend
18               MpiIrecv
19               MPI_Wait
20       MpiIsendComplete
21               MPI_Scan
22              MPI_Bcast
23          MPI_Recv_init
24          MPI_Send_init
25           MPI_Startall
26          MPI_Allgather
27         MPI_Allgatherv
28            MPI_Barrier
Name: Name, dtype: category
Categories (29, object): ['MPI_Allgather', 'MPI_Allgatherv', 'MPI_Allreduce', 'MPI_Barrier', ..., 'MpiSend', 'ProgramBegin', 'TRACER_Loop', 'TRACER_WallTime_amg']