<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc" style="margin-top: 1em;"><ul class="toc-item"><li><span><a href="#System-Information" data-toc-modified-id="System-Information-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>System Information</a></span></li><li><span><a href="#Libraries" data-toc-modified-id="Libraries-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Libraries</a></span></li><li><span><a href="#Sequential-Solution" data-toc-modified-id="Sequential-Solution-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Sequential Solution</a></span></li><li><span><a href="#Parallel-Solution" data-toc-modified-id="Parallel-Solution-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Parallel Solution</a></span></li><li><span><a href="#Timings" data-toc-modified-id="Timings-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Timings</a></span></li><li><span><a href="#Function-Profiling" data-toc-modified-id="Function-Profiling-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Function Profiling</a></span></li><li><span><a href="#Line-Profiler" data-toc-modified-id="Line-Profiler-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Line Profiler</a></span></li></ul></div>

# System Information

In [1]:
import sys

In [2]:
sys.version

'3.7.3 (default, Apr 24 2019, 15:29:51) [MSC v.1915 64 bit (AMD64)]'

# Libraries

In [3]:
import pandas as pd
from collections import defaultdict

import string
import numpy as np

In [4]:
import dask.bag as db
from dask.distributed import Client

In [5]:
import line_profiler

In [6]:
%load_ext line_profiler

# Sequential Solution

In [7]:
col1 = ["A", "B", "A", "A", "C"]
col2 = ["a", "a", "a", "c", "z"]
col3 = [1, 1, 5, 4, 1]

df = pd.DataFrame({
    "col1": col1,
    "col2": col2,
    "col3": col3
})

df

  col1 col2  col3
0    A    a     1
1    B    a     1
2    A    a     5
3    A    c     4
4    C    z     1


In [8]:
total = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in df.iterrows():
    total[col1][col2] += col3

In [9]:
for key, subkeys in total.items():
    for subkey, value in subkeys.items():
        print(f"{key}, {subkey}: {value}")

A, a: 6
A, c: 4
B, a: 1
C, z: 1
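The nested loop above sums `col3` for each `(col1, col2)` pair. As a cross-check that is not part of the original comparison, the same totals can be obtained with pandas' vectorised `groupby`. Below is a minimal sketch that rebuilds the small example frame so it runs on its own:

```python
import pandas as pd

df = pd.DataFrame({
    "col1": ["A", "B", "A", "A", "C"],
    "col2": ["a", "a", "a", "c", "z"],
    "col3": [1, 1, 5, 4, 1],
})

# Sum col3 within each (col1, col2) group; the result is a Series
# indexed by a MultiIndex of (col1, col2) pairs.
grouped = df.groupby(["col1", "col2"])["col3"].sum()

# Flatten to a {(col1, col2): total} dict for easy comparison.
totals = grouped.to_dict()

for (key1, key2), value in sorted(totals.items()):
    print(f"{key1}, {key2}: {value}")
```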


# Parallel Solution

Start a local cluster and connect a client to it:

In [10]:
client = Client()
client

Client
Scheduler: tcp://127.0.0.1:52646
Dashboard: http://127.0.0.1:8787/status

Cluster
Workers: 4
Cores: 4
Memory: 17.05 GB


Convert the DataFrame to a list of named tuples, then to a Dask bag:

In [11]:
x = list(df.itertuples())
x = db.from_sequence(x)
x

dask.bag<from_se..., npartitions=5>

Define a function that maps each row to a single-entry dict, keyed by (col1, col2) and holding that row's col3 value:

In [12]:
def f(row):
    # Map one row (a named tuple from itertuples) to {(col1, col2): col3}.
    d = {}
    col1, col2, col3 = row.col1, row.col2, row.col3
    
    d[(col1, col2)] = col3
    
    return d

Define a function that merges two dicts by adding the values of the second into the first:

In [13]:
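def g(dict1, dict2):
    # Add dict2's values into dict1 in place and return it.
    # dict1 must be a defaultdict(int) (the fold's initial value) so that
    # unseen keys start at 0.
    for k, v in dict2.items():
        dict1[k] += v
        
    return dict1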
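The map/fold pipeline can be sanity-checked without Dask: it amounts to mapping `f` over the rows and reducing the resulting dicts with `g`, starting from an empty `defaultdict(int)`. Below is a minimal single-process sketch. `Row` is a hypothetical stand-in for the named tuples that `itertuples` yields, using the same field names:

```python
from collections import defaultdict, namedtuple
from functools import reduce

# Hypothetical stand-in for the rows yielded by DataFrame.itertuples().
Row = namedtuple("Row", ["Index", "col1", "col2", "col3"])

rows = [
    Row(0, "A", "a", 1),
    Row(1, "B", "a", 1),
    Row(2, "A", "a", 5),
    Row(3, "A", "c", 4),
    Row(4, "C", "z", 1),
]

def f(row):
    # One single-entry dict per row: {(col1, col2): col3}.
    return {(row.col1, row.col2): row.col3}

def g(dict1, dict2):
    # Add dict2's values into dict1 (a defaultdict(int)) and return it.
    for k, v in dict2.items():
        dict1[k] += v
    return dict1

# Single-process analogue of x.map(f).fold(g, initial=defaultdict(int)).
totals = reduce(g, map(f, rows), defaultdict(int))
print(dict(totals))
```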

Do the computation:

In [14]:
results = x \
    .map(f) \
    .fold(g, initial=defaultdict(int)) \
    .compute()
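Conceptually, `fold` is a two-stage reduction. Each partition is reduced with the binary operator starting from `initial`, and the per-partition results are then combined. When no separate `combine` function is given, as here, the same operator is used for both stages. The sketch below imitates that two-stage shape in a single process. It illustrates the idea only and is not Dask's scheduler code. The helper `partitioned_fold` is hypothetical. Because `g` mutates its first argument, the sketch gives every partition its own fresh `defaultdict(int)` so partial results cannot interfere:

```python
from collections import defaultdict
from functools import reduce

def f(row):
    # In this sketch a row is a plain (col1, col2, col3) tuple.
    col1, col2, col3 = row
    return {(col1, col2): col3}

def g(dict1, dict2):
    # Merge dict2 into dict1 in place by summing values.
    for k, v in dict2.items():
        dict1[k] += v
    return dict1

def partitioned_fold(rows, npartitions):
    # Split the input into contiguous partitions of (almost) equal size.
    size = -(-len(rows) // npartitions)  # ceiling division
    partitions = [rows[i:i + size] for i in range(0, len(rows), size)]

    # Stage 1: reduce each partition from its own fresh initial value.
    partials = [reduce(g, map(f, part), defaultdict(int)) for part in partitions]

    # Stage 2: combine the per-partition results with the same operator.
    return reduce(g, partials)

rows = [("A", "a", 1), ("B", "a", 1), ("A", "a", 5), ("A", "c", 4), ("C", "z", 1)]
totals = partitioned_fold(rows, npartitions=2)
```

Whatever the partitioning, the totals come out the same, which is what makes this combination of operator and initial value safe to parallelise.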

Print the results:

In [15]:
for keys in sorted(results.keys()):
    key1, key2 = keys
    print(f"{key1}, {key2}: {results[keys]}")

A, a: 6
A, c: 4
B, a: 1
C, z: 1


# Timings

Create some data:

In [16]:
n = 3000000

In [17]:
big_df = pd.DataFrame({
    "col1": np.random.choice(list(string.ascii_uppercase), n, replace=True),
    "col2": np.random.choice(list(string.ascii_lowercase), n, replace=True),
    "col3": np.random.randint(1, n, n)
})

Dask solution:

In [18]:
%%time

x = list(big_df.itertuples())
x = db.from_sequence(x)

results1 = x \
    .map(f) \
    .fold(g, initial=defaultdict(int)) \
    .compute()

Wall time: 2min 36s


Pandas solution with iterrows:

In [19]:
%%time

results2 = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in big_df.iterrows():
    results2[col1][col2] += col3

Wall time: 7min 47s


Pandas solution with itertuples:

In [20]:
%%time

results3 = defaultdict(lambda : defaultdict(int))

for (_, col1, col2, col3) in big_df.itertuples():
    results3[col1][col2] += col3

Wall time: 7.23 s
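One likely contributor to the large gap between the iterrows and itertuples timings is the object built for each row. `iterrows` yields an `(index, Series)` pair and so constructs a full `pd.Series` per row, whereas `itertuples` yields lightweight named tuples. The small check below inspects both. The two-row frame `small` is illustrative only:

```python
import pandas as pd

small = pd.DataFrame({"col1": ["A", "B"], "col2": ["a", "a"], "col3": [1, 5]})

# iterrows yields (index, Series) pairs: one new Series object per row.
index, row_series = next(small.iterrows())

# itertuples yields named tuples whose first field is the index.
row_tuple = next(small.itertuples())

print(type(row_series).__name__)  # Series
print(type(row_tuple).__name__, row_tuple._fields)
```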


Solution that zips the DataFrame columns into plain row tuples:

In [21]:
%%time

my_rows = zip(big_df["col1"], big_df["col2"], big_df["col3"])

results4 = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in enumerate(my_rows):
    results4[col1][col2] += col3

Wall time: 3.08 s


Check that all approaches give identical results:

In [22]:
for (key1, key2), value in results1.items():
    assert value == results2[key1][key2]

assert results2 == results3
assert results3 == results4
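The loop above checks that every key in `results1` matches `results2`. On its own it would not notice a key present in `results2` but absent from `results1`. A stricter check flattens the nested dicts into `(key1, key2)`-keyed dicts and compares whole dicts, for example `flatten(results2) == dict(results1)`. Below is a minimal sketch with a hypothetical `flatten` helper and toy data in the two shapes used above:

```python
from collections import defaultdict

def flatten(nested):
    # Turn {key1: {key2: value}} into {(key1, key2): value}.
    return {
        (key1, key2): value
        for key1, inner in nested.items()
        for key2, value in inner.items()
    }

# Toy data: a flat dict like results1 and a nested one like results2.
flat = {("A", "a"): 6, ("A", "c"): 4, ("B", "a"): 1}
nested = defaultdict(lambda: defaultdict(int))
nested["A"]["a"] = 6
nested["A"]["c"] = 4
nested["B"]["a"] = 1

# Whole-dict equality catches missing or extra keys in either direction.
assert flatten(nested) == dict(flat)
```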

# Function Profiling

Profile the 3 approaches with a smaller dataset:

In [23]:
n = 1000

big_df = pd.DataFrame({
    "col1": np.random.choice(list(string.ascii_uppercase), n, replace=True),
    "col2": np.random.choice(list(string.ascii_lowercase), n, replace=True),
    "col3": np.random.randint(1, n, n)
})
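`%%prun` is IPython's wrapper around the standard-library `cProfile` profiler. In the cells below, `-s tottime` sorts the report by the time spent inside each function itself, and `-l 10` limits it to ten entries. `-s ncalls` sorts by call count instead. Outside a notebook, a comparable report can be produced with `cProfile` and `pstats` directly. Below is a minimal sketch that profiles a hypothetical `accumulate(rows)` function over a toy list of tuples rather than `big_df`:

```python
import cProfile
import io
import pstats
from collections import defaultdict

def accumulate(rows):
    # Same nested-dict accumulation as the notebook's loops.
    totals = defaultdict(lambda: defaultdict(int))
    for col1, col2, col3 in rows:
        totals[col1][col2] += col3
    return totals

rows = [("A", "a", 1), ("B", "a", 1), ("A", "a", 5), ("A", "c", 4), ("C", "z", 1)] * 200

profiler = cProfile.Profile()
result = profiler.runcall(accumulate, rows)

# Roughly what `%%prun -s tottime -l 10` shows: sort by internal time, top 10.
buffer = io.StringIO()
pstats.Stats(profiler, stream=buffer).sort_stats("tottime").print_stats(10)
report = buffer.getvalue()
print(report)
```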

Profile the iterrows approach:

In [24]:
%%prun -s tottime -l 10

results2 = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in big_df.iterrows():
    results2[col1][col2] += col3

 

Profile the itertuples approach:

In [25]:
%%prun -s tottime -l 10

results3 = defaultdict(lambda : defaultdict(int))

for (_, col1, col2, col3) in big_df.itertuples():
    results3[col1][col2] += col3

 

Profile the custom approach:

In [26]:
%%prun -s tottime -l 10

my_rows = zip(big_df["col1"], big_df["col2"], big_df["col3"])

results4 = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in enumerate(my_rows):
    results4[col1][col2] += col3

 

In [None]:
%%prun -s ncalls -l 10

my_rows = zip(big_df["col1"], big_df["col2"], big_df["col3"])

results4 = defaultdict(lambda : defaultdict(int))

for i, (col1, col2, col3) in enumerate(my_rows):
    results4[col1][col2] += col3

# Line Profiler

Wrap the itertuples and custom approaches in functions so that %lprun can profile them line by line:

In [27]:
def itertuples_approach():
    results3 = defaultdict(lambda : defaultdict(int))
    
    for (_, col1, col2, col3) in big_df.itertuples():
        results3[col1][col2] += col3

def custom_approach():
    my_rows = zip(big_df["col1"], big_df["col2"], big_df["col3"])
    results4 = defaultdict(lambda : defaultdict(int))

    for i, (col1, col2, col3) in enumerate(my_rows):
        results4[col1][col2] += col3

In [28]:
%lprun -f itertuples_approach itertuples_approach()

In [29]:
%lprun -f custom_approach custom_approach()