In [2]:
from importlib import reload
import os, os.path
import shutil
import sys

import dask
import dask.dataframe as dd
from dask.distributed import Client, TimeoutError

sys.path.append(os.path.realpath(".."))
from eai_graph_tools.datasets.lanl.raw import DatasetLANLRaw

In [3]:
%load_ext autoreload
%autoreload 2
%load_ext line_profiler

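We expect an ad hoc Dask cluster to be already running, with its scheduler listening at `DASK_ADDR`. Failing that, the notebook starts a local cluster from its own kernel.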

In [4]:
DASK_ADDR = "localhost:8786"
PATH_LOS_ALAMOS = os.path.expanduser("~/los_alamos")
TABLES_LOS_ALAMOS = ["auth", "dns", "flows", "proc"]
WIDTH_INTERVAL = 5 * 60  # width of the time intervals, in seconds (5 minutes)

In [5]:
try:
    client = Client(DASK_ADDR, set_as_default=True)
except OSError:
    print("Setting up a local Dask cluster from the notebook kernel", file=sys.stderr)
    client = Client(set_as_default=True)

# Benchmarking uncompressed CSV against Parquet files

**Goal 1**: assess whether storing the Los Alamos files in the Parquet format, as opposed to the original CSV, brings any gain besides a ~20% smaller size.

**Goal 2**: assess whether either the `fastparquet` or the `pyarrow` Parquet reading engine has an advantage over the other.

Note that the smaller size compares Parquet stores to gzip-compressed CSV files. Dask cannot split the latter into partitions, and Pandas can at best stream them sequentially: a gzip stream can only be decompressed from its beginning, so each compressed file ends up as a single partition. Only uncompressed CSVs can be cut into chunks and partitioned by Dask. Parquet stores, being even smaller than compressed CSV, are a much more efficient way than uncompressed CSV to store data in chunkable form. The goal is thus to determine whether Parquet loading incurs a significant overhead compared to uncompressed CSV loading.
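A minimal, standard-library-only sketch of why only uncompressed CSV can be partitioned. Uncompressed bytes can be cut at arbitrary offsets and each cut realigned on the next line break, a simplified take on what Dask's `read_csv` does with its `blocksize` parameter; a gzip stream, by contrast, cannot be decoded from an arbitrary offset. `split_blocks` is a hypothetical helper written for illustration, not Dask's implementation.

```python
import gzip
import zlib

def split_blocks(data: bytes, blocksize: int) -> list:
    """Cut CSV bytes into blocks of roughly `blocksize` bytes, each ending on a newline."""
    blocks, start = [], 0
    while start < len(data):
        # Extend the block to the end of the line it would otherwise cut in two
        end = data.find(b"\n", start + blocksize)
        end = len(data) if end == -1 else end + 1
        blocks.append(data[start:end])
        start = end
    return blocks

csv_bytes = b"".join(f"{i},{i % 7},{i * 3}\n".encode() for i in range(10_000))

# Uncompressed: every block holds whole rows and could be parsed by a separate worker
blocks = split_blocks(csv_bytes, 4096)
print(f"{len(blocks)} independently parseable blocks")

# Compressed: a byte range taken from the middle of a gzip stream cannot be decoded
gz_bytes = gzip.compress(csv_bytes)
try:
    gzip.decompress(gz_bytes[len(gz_bytes) // 2:])
    middle_readable = True
except (OSError, EOFError, zlib.error):
    middle_readable = False
print("middle of gzip stream readable:", middle_readable)
```

This is why Dask reads each gzip-compressed CSV file as a single partition (`blocksize=None`), which must then fit in a worker's memory.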

**Benchmark task**

1. Load the four main Los Alamos tables as Dask dataframes, indexed by time and, in the case of the *auth* table, labeled using the `redteam` table.
1. Concatenate the four dataframes in a single monolith dataframe.
1. Group the monolith by time intervals (5 minutes).
1. Compute the number of events in each time interval.
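Steps 2 to 4 can be sketched in plain pandas on in-memory toy tables, skipping the loading step. The tables and their columns below are hypothetical; only the 5-minute interval width and the grouping lambda mirror the notebook.

```python
import pandas as pd

WIDTH_INTERVAL = 5 * 60  # seconds, as defined earlier in the notebook

# Hypothetical toy tables, indexed by event time in seconds
dns = pd.DataFrame({"computer": ["C1", "C2", "C3"]},
                   index=pd.Index([10, 320, 650], name="time"))
proc = pd.DataFrame({"process": ["P1", "P2"]},
                    index=pd.Index([20, 330], name="time"))

# Step 2: concatenate the tables into a single monolith
monolith = pd.concat([dns, proc])

# Steps 3 and 4: map each time index label to its interval number, count events
counts = monolith.groupby(lambda t: int(t / WIDTH_INTERVAL)).size()
print(counts)
```

Here `.size()` counts rows per interval; the `.count()` used in the benchmark instead counts non-null values column by column, so each column effectively counts the events of the table it comes from.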

## Uncompressed CSV loading

In [6]:
def select_elements(name_elements):
    # No table names given: default to the four main Los Alamos tables
    if len(name_elements) == 0:
        return TABLES_LOS_ALAMOS
    else:
        return list(name_elements)

def los_alamos_from_csv(*names):
    # Indexing by time and labeling the auth table are redone on every call
    ds = DatasetLANLRaw(PATH_LOS_ALAMOS)
    return [ds.element(name, labeled=True, indexed=True) for name in select_elements(names)]

## Parquet loading

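A quirk of Parquet loading is that, for categorical fields, the reader tries to assign a minimal integer dtype to the category codes. It gets the port fields wrong, assuming they take fewer than $2^{15}-1$ distinct values (the capacity of `int16` codes), whereas port numbers can take up to $2^{16}$ values. We work around this by passing the expected number of categories for each such column through the `categories` parameter.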
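The dependence of the code dtype on the number of categories can be observed in pandas itself, which picks the narrowest signed integer type able to index all categories. This sketch only shows pandas's own rule; it does not reproduce the Parquet reader's pre-allocation logic, and `code_dtype` is a hypothetical helper.

```python
import pandas as pd

def code_dtype(n_categories: int):
    """Return the integer dtype pandas uses for the codes of n_categories labels."""
    return pd.Categorical(range(n_categories)).codes.dtype

for n in (100, 1_000, 0xFFFF):
    print(n, code_dtype(n))
```

With 0xFFFF categories, as declared for the port fields, the codes no longer fit in `int16` and pandas moves to `int32`.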

In [7]:
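# Expected number of categories per categorical column, by table:
# port fields may take up to 2**16 - 1 distinct values
CATEGORIES = {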
    "flows": {
        "port_source": 0xffff,
        "port_destination": 0xffff
    }
}

def los_alamos_from_parquet(*names, engine="auto"):
    return [
        dd.read_parquet(
            os.path.join(PATH_LOS_ALAMOS, f"{name}.parquet"),
            engine=engine,
            categories=CATEGORIES.get(name, {})
        )
        for name in select_elements(names)
    ]

## Benchmark task

In [8]:
def benchmark(client, fn_load_los_alamos, name_elements=None, **kwargs):
    client.restart()
    ldf_tables = fn_load_los_alamos(*(name_elements or ()), **kwargs)
    df_monolith = dd.concat(ldf_tables)
    gdf_intervals = df_monolith.groupby(lambda x: int(x / WIDTH_INTERVAL))
    df_counts = gdf_intervals.count().compute()
    return df_counts

## Uncompressed CSV vs. Parquet

### Uncompressed CSV

Uncompressed CSV is singularly impractical for dataframe processing at scale through Dask. A key aspect is that the indexing and labeling of the *auth* table (using the *redteam* table) are expensive, and must be run **every single time** the dataset is loaded for processing. In contrast, these operations are done once and the result is stored in the Parquet stores for this dataset.
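The labeling step can be sketched in pandas as a left merge of the *auth* events against the red team events. The column names and join keys below are hypothetical, chosen for illustration; the actual logic of `DatasetLANLRaw` is not shown in this notebook.

```python
import pandas as pd

# Hypothetical join keys shared by the auth and redteam tables
KEYS = ["time", "user", "src", "dst"]

def label_auth(auth: pd.DataFrame, redteam: pd.DataFrame) -> pd.DataFrame:
    """Add a boolean is_attack column flagging auth events listed in redteam."""
    labeled = auth.merge(
        # Deduplicate so that the left merge never multiplies auth rows
        redteam[KEYS].drop_duplicates(), on=KEYS, how="left", indicator=True
    )
    labeled["is_attack"] = labeled["_merge"] == "both"
    return labeled.drop(columns="_merge")

auth = pd.DataFrame({
    "time": [1, 1, 2, 3],
    "user": ["U1", "U2", "U1", "U3"],
    "src": ["C1", "C2", "C1", "C3"],
    "dst": ["C9", "C9", "C8", "C7"],
})
redteam = pd.DataFrame({
    "time": [1, 3], "user": ["U2", "U3"], "src": ["C2", "C3"], "dst": ["C9", "C7"],
})
print(label_auth(auth, redteam))
```

With CSV input, an equivalent merge has to be rebuilt and run at every load; with Parquet, the labeled table can be written once (for instance with Dask's `DataFrame.to_parquet`) and read back as-is afterwards.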

Furthermore, Dask raises a runtime error when loading data from CSV. When concatenating the dataframes, Dask raises an exception regarding the mismatch between the column sets of the dataframes. This seems to be caused by the labeling of the *auth* table, which appends column *is_attack* to the dataframe. The column is added through a dataframe `merge`, which is performed in parallel with the concatenation, and this live appending of the column seems to clash with the concatenation. As fully computing the merge in memory prior to the concatenation is not an option, going through Parquet storage seems unavoidable. The following cell reproduces the error.
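Differing column sets are not, by themselves, a problem for concatenation: pandas performs an outer concatenation, taking the union of the columns and filling the gaps with NaN, as this sketch on hypothetical toy frames shows. Dask follows the same semantics, but it also relies on each lazy frame's metadata (an empty frame describing its columns and dtypes) matching what its partitions actually produce; a column appended lazily by a merge is one plausible place for such a mismatch to arise, although this is only a hypothesis.

```python
import pandas as pd

# Hypothetical toy frames: only the auth-like frame carries the label column
auth = pd.DataFrame({"user": ["U1", "U2"], "is_attack": [False, True]}, index=[10, 20])
flows = pd.DataFrame({"port_source": [443]}, index=[15])

# Outer concatenation: the result holds the union of both column sets
monolith = pd.concat([auth, flows])
print(list(monolith.columns))
print(monolith)
```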

In [8]:
benchmark(client, los_alamos_from_csv, ("auth", "flows"))



AttributeError: 'numpy.ndarray' object has no attribute 'categories'

If we leave table `auth` out of the computation, we can get a glimpse of how the runtime is distributed.

In [9]:
r = %lprun -f benchmark -r benchmark(client, los_alamos_from_csv, ("dns", "flows", "proc"))
r.print_stats(output_unit=1.0)



Timer unit: 1 s

Total time: 383.188 s
File: <ipython-input-7-6149e9801ddd>
Function: benchmark at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def benchmark(client, fn_load_los_alamos, name_elements=None, **kwargs):
     2         1          0.8      0.8      0.2      client.restart()
     3         1        139.1    139.1     36.3      ldf_tables = fn_load_los_alamos(*(name_elements or ()), **kwargs)
     4         1          0.1      0.1      0.0      df_monolith = dd.concat(ldf_tables)
     5         1          0.0      0.0      0.0      gdf_intervals = df_monolith.groupby(lambda x: int(x / WIDTH_INTERVAL))
     6         1        243.2    243.2     63.5      df_counts = gdf_intervals.count().compute()
     7         1          0.0      0.0      0.0      return df_counts



## Parquet

We use the default engine (`engine="auto"`). Let's compare the line-by-line profile with that of CSV loading over the three smaller tables.

In [11]:
r = %lprun -f benchmark -r benchmark(client, los_alamos_from_parquet, ("dns", "flows", "proc"))
r.print_stats(output_unit=1.0)

Timer unit: 1 s

Total time: 189.514 s
File: <ipython-input-7-6149e9801ddd>
Function: benchmark at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def benchmark(client, fn_load_los_alamos, name_elements=None, **kwargs):
     2         1          0.8      0.8      0.4      client.restart()
     3         1          0.1      0.1      0.1      ldf_tables = fn_load_los_alamos(*(name_elements or ()), **kwargs)
     4         1          0.0      0.0      0.0      df_monolith = dd.concat(ldf_tables)
     5         1          0.0      0.0      0.0      gdf_intervals = df_monolith.groupby(lambda x: int(x / WIDTH_INTERVAL))
     6         1        188.5    188.5     99.5      df_counts = gdf_intervals.count().compute()
     7         1          0.0      0.0      0.0      return df_counts



As an added advantage, the memory envelope of the last computation was 25-40% smaller than when processing the uncompressed CSV. On my machine, whereas the CSV processing grows up to about 9GB in memory, the Parquet-bound computations use less than 6GB of RAM.
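The size of the gain can be worked out from the figures reported above. The snippet only redoes that arithmetic; the memory figures are the approximate peaks quoted in the previous paragraph.

```python
# Figures copied from the two line profiles above (seconds)
csv_total, parquet_total = 383.188, 189.514
csv_compute, parquet_compute = 243.2, 188.5
# Approximate peak memory quoted in the text (GB)
csv_peak_gb, parquet_peak_gb = 9.0, 6.0

speedup_total = csv_total / parquet_total
speedup_compute = csv_compute / parquet_compute
memory_saving = 1 - parquet_peak_gb / csv_peak_gb

print(f"end-to-end speedup: {speedup_total:.2f}x")
print(f"compute-step speedup: {speedup_compute:.2f}x")
print(f"peak memory saving: at least {memory_saving:.0%}")
```

Most of the roughly twofold end-to-end gain comes from the 139 s spent in the CSV loading call, which all but vanishes with Parquet; the `compute()` step itself is only about 1.3 times faster.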

As a curiosity, let's see how much runtime processing the *auth* table adds.

In [12]:
r = %lprun -f benchmark -r benchmark(client, los_alamos_from_parquet, ("auth", "dns", "flows", "proc"))
r.print_stats(output_unit=1.0)

Timer unit: 1 s

Total time: 1042.86 s
File: <ipython-input-7-6149e9801ddd>
Function: benchmark at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def benchmark(client, fn_load_los_alamos, name_elements=None, **kwargs):
     2         1          0.7      0.7      0.1      client.restart()
     3         1          0.7      0.7      0.1      ldf_tables = fn_load_los_alamos(*(name_elements or ()), **kwargs)
     4         1          0.1      0.1      0.0      df_monolith = dd.concat(ldf_tables)
     5         1          0.0      0.0      0.0      gdf_intervals = df_monolith.groupby(lambda x: int(x / WIDTH_INTERVAL))
     6         1       1041.3   1041.3     99.9      df_counts = gdf_intervals.count().compute()
     7         1          0.0      0.0      0.0      return df_counts



## Parquet engine comparison

In [10]:
%timeit -r 3 -n 1 benchmark(client, los_alamos_from_parquet, engine="fastparquet")

12min 32s ± 1min 43s per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [9]:
%timeit -r 3 -n 1 benchmark(client, los_alamos_from_parquet, engine="pyarrow")

10min 26s ± 1.7 s per loop (mean ± std. dev. of 3 runs, 1 loop each)


The `pyarrow` engine exhibits unexpected memory problems when its benchmark runs for more than three trials, even though the actual benchmark computations are offloaded to distinct processes. However, the standard deviation of its runtime is very small compared to that of the `fastparquet` benchmark. Such consistent runtimes are surprising, so we take a comparative look at the timing of the Parquet read tasks, as the *Profile* tab of the Dask status dashboard allows.

We see that with the `fastparquet` engine, reading the store takes up more than 50% of the full runtime, and 40% of this reading time is spent decompressing the files. The picture is very different with the `pyarrow` engine: a mere 25-35% of the runtime is spent reading the Parquet store, and within it we see **no** calls to functions from the `gzip` library. This suggests that the `pyarrow` engine reads the files in the store more cleverly than `fastparquet`, decompressing only the relevant bits on demand. A caveat: `pyarrow` decompresses data in compiled C++ code, which the Dask profiler, sampling Python call stacks, cannot break down into individual functions, so the absence of `gzip` calls may also simply reflect where the decompression happens. Either way, although repeated full scans of the datastore seem to trip `pyarrow`, it seems to stand out as the better choice for sheer processing performance.

To further assess the distinction, let's run more trials over a subset of the dataset, which weighs less heavily on RAM usage.

In [14]:
%timeit -r 10 -n 1 benchmark(client, los_alamos_from_parquet, ("flows", "dns", "proc"), engine="fastparquet")

2min 56s ± 4.02 s per loop (mean ± std. dev. of 10 runs, 1 loop each)


In [15]:
%timeit -r 10 -n 1 benchmark(client, los_alamos_from_parquet, ("flows", "dns", "proc"), engine="pyarrow")

2min 35s ± 628 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


The two mean ± standard deviation intervals do not intersect, and the gap between their closest boundaries (about 16 s) is large: `pyarrow` is both faster and more consistent than `fastparquet` on this workload.
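The gap quoted above, and a rough Welch's t statistic for the difference of means, follow from the `%timeit` summaries. This is a back-of-the-envelope check only: the means are taken as displayed (rounded to the second), and `%timeit` reports a population standard deviation rather than a sample one.

```python
import math

# %timeit results for the three-table runs (seconds)
n = 10                                   # runs per benchmark
fast_mean, fast_std = 176.0, 4.02        # fastparquet: 2min 56s ± 4.02 s
arrow_mean, arrow_std = 155.0, 0.628     # pyarrow: 2min 35s ± 628 ms

# Gap between the closest boundaries of the two mean ± std intervals
gap = (fast_mean - fast_std) - (arrow_mean + arrow_std)

# Welch's t statistic for the difference between the two means
t = (fast_mean - arrow_mean) / math.sqrt(fast_std**2 / n + arrow_std**2 / n)

print(f"gap between intervals: {gap:.1f} s")
print(f"Welch t statistic: {t:.1f}")
```

A t statistic of this magnitude over ten runs each leaves little doubt that the difference in mean runtime is not noise.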

# Clean-up

In [17]:
client.close()