# Benchmarks

We create two files, each of which is larger than available RAM, and then join them. We try this using three methods:

* Tab-delimited text file
* numpy ndarray
* Pandas HDF5 table

## Setup

In [1]:
import os
import shutil
import tempfile

# Row count that drives the size of each generated benchmark file.
NUM_ROWS = 120 * 1000 * 1000

# Scratch directory for every file this notebook writes. Honour $TMPDIR when it
# is set; otherwise fall back to the platform default rather than raising
# KeyError on machines that do not export TMPDIR.
TEMP_DIR = os.path.join(os.environ.get('TMPDIR') or tempfile.gettempdir(),
                        'flatutils')

# makedirs(exist_ok=True) also creates missing parent directories and avoids
# the check-then-create race of `if not exists: mkdir`.
os.makedirs(TEMP_DIR, exist_ok=True)

def file_name(index):
    """Return the path of the benchmark file numbered ``index`` inside TEMP_DIR."""
    base_name = 'file{0}.txt'.format(index)
    return os.path.join(TEMP_DIR, base_name)

def make_file(index):
    """Generate the tab-delimited benchmark file numbered ``index``.

    Writes one line per ``i`` in ``range(NUM_ROWS + 1)``. Each line has four
    tab-separated columns built from ``NUM_ROWS - i``: the bare number,
    followed by three fixed alphabetic prefixes with the same number appended.
    The numeric value therefore counts down from ``NUM_ROWS`` to 0.

    A progress message is printed every 40,000,000 rows.

    Returns:
        The path of the file that was written.
    """
    fn = file_name(index)
    print("making {0}".format(fn))
    # Built once, outside the hot loop; every placeholder takes the same value.
    row_template = ("{0}\t"
                    "abcdefghijlmnopqrstuv{0}\t"
                    "abcdefghijlmnopqrs{0}\t"
                    "abcdefghijlmn{0}\n")
    with open(fn, 'w') as f:
        for i in range(NUM_ROWS + 1):
            if i > 0 and i % 40000000 == 0:
                print("At row {0}".format(i))
            f.write(row_template.format(NUM_ROWS - i))
    # Return the path that was written, not the `file_name` helper itself.
    return fn

In [2]:
%time make_file(0)
%time make_file(1)

os.stat(file_name(0)).st_size

making /var/pg_dumps/temp/flatutils/file0.txt
At row 40000000
At row 80000000
At row 120000000
CPU times: user 2min 40s, sys: 7.98 s, total: 2min 48s
Wall time: 2min 49s
making /var/pg_dumps/temp/flatutils/file1.txt
At row 40000000
At row 80000000
At row 120000000
CPU times: user 2min 40s, sys: 8.2 s, total: 2min 48s
Wall time: 2min 49s


10595555652

The goal for each approach is to join the files together by their third column.

## Tab-delimited text file approach

In [4]:
import sys

sys.path.append('/etc/etl/flatutils/flatutils')

from flatutils import Field, Schema, FlatFile, FIELD_INT, FIELD_STRING

schema = Schema([Field("col0", FIELD_INT, 0), 
                 Field("col1", FIELD_STRING, 1),
                 Field("col2", FIELD_STRING, 2),
                 Field("col3", FIELD_STRING, 3)])

file0 = FlatFile(file_name(0), schema)
file1 = FlatFile(file_name(0), schema)

%time sorted0 = file0.output_sorted(file_name(2), "col2", temp_dir=TEMP_DIR)

CPU times: user 17min 42s, sys: 57.3 s, total: 18min 39s
Wall time: 22min 30s


In [5]:
# Same call as for file0, applied to file1 with file_name(3) as the output path.
# presumably this produces a copy ordered by "col2" — confirm against flatutils.
%time sorted1 = file1.output_sorted(file_name(3), "col2", temp_dir=TEMP_DIR)

CPU times: user 17min 42s, sys: 56.9 s, total: 18min 39s
Wall time: 22min 31s


In [6]:
def join():
    """Merge-join ``sorted0`` and ``sorted1`` on ``col2`` and write the matches.

    Walks both row iterators in lockstep, advancing whichever side currently
    has the smaller ``col2`` value. For every pair of rows whose ``col2``
    values are equal, one tab-separated line is written to ``file_name(4)``
    containing col0, col1 and col2 from the left row and col3 from the right
    row.

    Both sides advance after a match, so keys are paired one-to-one; duplicate
    keys within a file are not expanded into a cross product.

    NOTE(review): assumes both inputs are ordered consistently with Python's
    ``<`` on the ``col2`` values — confirm against ``output_sorted``.
    """
    progress_every = 20000000
    rowiter0 = sorted0.iterate_rows()
    rowiter1 = sorted1.iterate_rows()
    row0 = next(rowiter0, None)
    row1 = next(rowiter1, None)
    total_count, join_count = 0, 0
    with open(file_name(4), 'w') as outf:
        while row0 is not None and row1 is not None:
            val0 = row0['col2']
            val1 = row1['col2']
            total_count += 1
            matched = val0 == val1
            if matched:
                join_count += 1
                # Terminate each record with a newline so the output holds one
                # joined row per line instead of a single run-on line.
                outf.write("{0}\t{1}\t{2}\t{3}\n".format(
                    row0['col0'], row0['col1'],
                    row0['col2'], row1['col3']))
                row0 = next(rowiter0, None)
                row1 = next(rowiter1, None)
            elif val0 < val1:
                row0 = next(rowiter0, None)
            else:
                row1 = next(rowiter1, None)
            if total_count % progress_every == 0:
                print("Processed {0} total rows".format(total_count))
            # Only report join progress on iterations where the count changed;
            # otherwise a count of 0 (or one stalled on a multiple) would be
            # re-printed on every non-matching iteration.
            if matched and join_count % progress_every == 0:
                print("Processed {0} joined rows".format(join_count))

# Run the merge join over the two sorted files and report how long it takes.
%time join()

Processed 20000000 total rows
Processed 20000000 joined rows
Processed 40000000 total rows
Processed 40000000 joined rows
Processed 60000000 total rows
Processed 60000000 joined rows
Processed 80000000 total rows
Processed 80000000 joined rows
Processed 100000000 total rows
Processed 100000000 joined rows
Processed 120000000 total rows
Processed 120000000 joined rows
CPU times: user 20min 13s, sys: 20.1 s, total: 20min 33s
Wall time: 20min 44s


## Pandas approach
In the Pandas approach, we treat each file as if it were partitioned by org, with 6 million rows per org. We then "join" one org at a time using Pandas DataFrames; the cell below times this for the first org only.

In [2]:
import numpy as np
import pandas as pd
import gc

columns = ['col0', 'col1', 'col2', 'col3']
kwargs = dict(
    header=None,
    names=columns,
    index_col=2
)
chunkiter0 = pd.read_table(file_name(0), **kwargs)
chunkiter1 = pd.read_table(file_name(1), **kwargs)
%time df0 = next(chunkiter0, None)
%time df1 = next(chunkiter1, None)
%time result_df = df0.join(df1, how='inner', lsuffix="_left", rsuffix="_right")
%time result_df.to_csv(os.path.join(TEMP_DIR, "joined.csv"))

CPU times: user 14.2 s, sys: 1.4 s, total: 15.6 s
Wall time: 15.6 s
CPU times: user 14.3 s, sys: 1.08 s, total: 15.4 s
Wall time: 15.4 s
CPU times: user 8 s, sys: 260 ms, total: 8.26 s
Wall time: 8.26 s
CPU times: user 31.1 s, sys: 764 ms, total: 31.9 s
Wall time: 32 s
