# Data Wrangling Using Pandas: Group Data

In [1]:
import dask.dataframe as dd 
import dask.array as da
import pandas as pd
import numpy as np

# 1. Introduction

What does it mean to Group Data? 

* Split: Segment data based on criteria

* Apply: Aggregate - Transform - Filter
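
The split and apply steps, plus the implicit combine step that groupby performs at the end, can be sketched on a tiny pandas frame. The frame `toy` and its columns `key` and `val` are made up for illustration and are not part of the bureau_balance data.

```python
import pandas as pd

# Toy data; 'key' and 'val' are illustrative names only.
toy = pd.DataFrame({"key": ["a", "a", "b", "b", "b"],
                    "val": [1, 2, 3, 4, 5]})

# Split: one sub-frame per distinct key.
pieces = {name: part for name, part in toy.groupby("key")}

# Apply: reduce each piece to a single number.
sums = {name: int(part["val"].sum()) for name, part in pieces.items()}

# Combine: groupby performs split, apply and combine in a single call.
combined = toy.groupby("key")["val"].sum()

print(sums)
print(combined)
```

Doing the split by hand is only for illustration; the one-line groupby form is the one used in the rest of this notebook.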

Dask Dataframe:
http://dask.pydata.org/en/latest/dataframe.html

**Common Uses and Anti-Uses**
http://dask.pydata.org/en/latest/dataframe.html

In [2]:
# Load the data set. In Jupyter, press <Shift-Tab> inside the function's parentheses to see more arguments.
df_bal = dd.read_csv('/Users/stewarta/Documents/DATA/Home Data/bureau_balance.csv')
#pd_bal = pd.read_csv('/Users/stewarta/Documents/DATA/Home Data/bureau_balance.csv')
type(df_bal)

dask.dataframe.core.DataFrame

In [3]:
df_bal = df_bal.set_index('SK_ID_BUREAU').compute()

In [4]:
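# NOTE: a Series is aligned on index labels (here SK_ID_BUREAU), not on row position, so rows
# sharing an ID receive the same weight; assign np.random.rand(len(df_bal)) directly to avoid this.
df_bal['WEIGHT'] = pd.Series(np.random.rand(len(df_bal)))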
df_bal = dd.from_pandas(df_bal, npartitions=6)
df_bal.tail()

SK_ID_BUREAU,MONTHS_BALANCE,STATUS,WEIGHT
6842888,-17,C,0.720094
6842888,-18,C,0.720094
6842888,-19,C,0.720094
6842888,-21,C,0.720094
6842888,-33,C,0.720094


In [5]:
#df_bal[['MONTHS_BALANCE','WEIGHT']]
type(df_bal[['MONTHS_BALANCE','WEIGHT']])

dask.dataframe.core.DataFrame

# 2. Split a Dataframe into Groups

Split: Segment data based on criteria

Pandas objects can be split on any of their axes. Splits are created using the groupby() function.

We form groups by passing one or more columns and an axis to the groupby function. The default is axis = 0.
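
As a minimal sketch of grouping by one column versus several, here is a small pandas example. The frame `toy` and its columns `id`, `status` and `months` are invented stand-ins that only loosely mirror the real data.

```python
import pandas as pd

# Illustrative data, not taken from bureau_balance.csv.
toy = pd.DataFrame({
    "id": [1, 1, 2, 2, 2],
    "status": ["C", "X", "C", "C", "X"],
    "months": [0, -1, 0, -1, -2],
})

by_id = toy.groupby("id")                     # one grouping column
by_id_status = toy.groupby(["id", "status"])  # several grouping columns

n_id = by_id.ngroups                # number of distinct ids
n_id_status = by_id_status.ngroups  # number of distinct (id, status) pairs
sizes = by_id_status.size()         # rows per (id, status) pair

print(n_id, n_id_status)
print(sizes)
```

Passing a list of columns yields one group per distinct combination of values, and the result is indexed by a MultiIndex.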



In [6]:
group_bal = df_bal.groupby('SK_ID_BUREAU')
type(group_bal)

dask.dataframe.groupby.DataFrameGroupBy

## Inspecting Groups
A single group can be selected using get_group()

In [11]:
group_bal.get_group(5715448) # returns a dataframe!

SK_ID_BUREAU,MONTHS_BALANCE,STATUS
5715448,0,C
5715448,-1,C
5715448,-26,X
5715448,-25,X
5715448,-24,X
5715448,-23,X
5715448,-22,X
5715448,-21,X
5715448,-19,0
5715448,-18,0


## Iterating through Groups

Try going back to dask and changing the way I iterate over groups...
import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({'A':[1, 2, 3, 4, 5], 'B':['1','1','a','a','a']})
ddf = dd.from_pandas(pdf, npartitions = 3)
groups = ddf.groupby('B')

for group in pdf['B'].unique():
    print(groups.get_group(group).compute())

SK_ID_BUREAU,MONTHS_BALANCE,STATUS
5001709,0,C
5001709,-71,C
5001709,-70,C
5001709,-69,C
5001709,-68,C
5001709,-67,C
5001709,-66,C
5001709,-65,C
5001709,-64,C
5001709,-63,C


In [32]:
# iterating is slow over dask ...
count = 0
for name, group in group_bal:
    print('Group Name: ', name)
    print(group)
    count += 1
    if count == 1:
        break

Group Name:  5001709
              MONTHS_BALANCE STATUS
SK_ID_BUREAU                       
5001709                    0      C
5001709                   -1      C
5001709                   -2      C
5001709                   -3      C
5001709                   -4      C
5001709                   -5      C
5001709                   -6      C
5001709                   -7      C
5001709                   -8      C
5001709                   -9      C
5001709                  -10      C
5001709                  -11      C
5001709                  -12      C
5001709                  -13      C
5001709                  -14      C
5001709                  -15      C
5001709                  -16      C
5001709                  -17      C
5001709                  -18      C
5001709                  -19      C
5001709                  -20      C
5001709                  -21      C
5001709                  -22      C
5001709                  -23      C
5001709                  -24      C
5001709

# 3. Simple Operations on Groups

## Single Operations on Groups

Compute a statistic for each group: first, last, nth, mean, sum, std, var, min, max, size, count, describe, sem.

NOTE: Not every method is supported in dask. Unsupported here: describe, nth, sem.
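
A small pandas sketch of a few of these per-group statistics follows. The data and the column names `id` and `months` are made up. On a dask groupby the same method names return lazy objects, so a `.compute()` call is needed to get values.

```python
import pandas as pd

# Illustrative data only.
toy = pd.DataFrame({"id": [1, 1, 1, 2, 2],
                    "months": [0, -1, -2, 0, -4]})
grouped = toy.groupby("id")["months"]

first = grouped.first()  # first value in each group
total = grouped.sum()    # sum per group
var = grouped.var()      # sample variance per group (ddof=1)
size = grouped.size()    # number of rows per group

print(pd.DataFrame({"first": first, "sum": total, "var": var, "size": size}))
```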

### Variance for each Group

Dask:

21.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)



In [13]:
%%timeit -n 1 -r 1 -t x = range(10)
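# working to become stable: https://github.com/dask/dask/issues/3954
# NOTE: without .compute(), this times only building the lazy result, not the computation over the data.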
group_bal['MONTHS_BALANCE'].first()

191 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Sum for a selected group and column...

In [15]:
%%timeit -n 1 -r 1 -t x = range(10)
#res = group_bal.get_group(5001709)['MONTHS_BALANCE'].sum().compute() # ERROR: KeyError: 'S'
# NOTE: as written, the sum below is lazy; append .compute() to time the actual computation.
group_bal['MONTHS_BALANCE'].sum()

394 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


# Aggregate

## Multiple Operations on Groups

The aggregation API allows one to express possibly multiple aggregation operations in a single concise way.  Use: aggregate() or agg()

* pass multiple aggregation arguments as a list

* You can also pass **named methods** as strings. A single name, as in df.agg('sum'), returns a Series of the aggregated output; a list, as in df.agg(['sum', 'mean']), returns a DataFrame.

* pass NumPy mathematical functions: https://docs.scipy.org/doc/numpy-1.13.0/reference/routines.math.html 



* NOTE: Using a single function is equivalent to apply().
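
To make the list form concrete, here is a minimal pandas sketch that applies several named aggregations to one column. The frame `toy` and its columns are illustrative assumptions, not the real data.

```python
import pandas as pd

# Illustrative data only.
toy = pd.DataFrame({"id": [1, 1, 2, 2],
                    "months": [0, -2, -1, -3]})

# A list of names gives one output column per aggregation.
summary = toy.groupby("id")["months"].agg(["sum", "mean", "max"])

print(summary)
```

The result has one row per group and one column per function name, which is convenient when comparing statistics side by side.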

### Apply Multiple Operations to the Same Column

In [7]:
type(group_bal['MONTHS_BALANCE'])

dask.dataframe.groupby.SeriesGroupBy

In [8]:
%%timeit -n 1 -r 1 -t x = range(10)
# This takes a long time ... is the object I am working with really a dask dataframe?
group_bal['MONTHS_BALANCE'].agg(['sum', 'mean', 'std']).compute()

Dask: 3min 51s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

3min 51s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Apply Multiple Operations to Different Columns

In [10]:
# NOTE: despite its name, mymean returns the number of elements in the group, not the mean.
mymean = lambda x: np.size(x)

In [13]:
%%timeit -n 1 -r 1 -t x = range(10)
group_bal.agg({'MONTHS_BALANCE' : np.mean, 'STATUS' : mymean}).compute()
#group_bal.groupby(columns).aggregate(['sum', 'mean', 'max', 'min'])

# Pattern noted from another data set; df, customer, url and ts are not defined in this notebook.
gb = df.groupby(['customer', 'url', 'ts'])
gb.apply(lambda d: pd.DataFrame({'views': len(d), 
     'visitors': d.session_id.count(), 
     'referrers': [d.referer.tolist()]})).reset_index()


ValueError: unknown aggregate lambda
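
The ValueError above comes from passing a lambda to agg() on a dask groupby. One possible workaround is to name built-in reductions as strings instead of passing callables. The sketch below uses plain pandas on made-up data (`toy`, `id`, `months` and `status` are illustrative names). As far as I know, dask's agg() accepts the same string names, and dask also offers dd.Aggregation for custom reductions, which is not shown here.

```python
import pandas as pd

# Illustrative data only.
toy = pd.DataFrame({"id": [1, 1, 2, 2, 2],
                    "months": [0, -2, -1, -3, -5],
                    "status": ["C", "X", "C", "C", "X"]})

# A dict maps each column to its own aggregation, given by name.
per_column = toy.groupby("id").agg({"months": "mean", "status": "count"})

print(per_column)
```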

## Transformation Operations

The transform() method returns an object that is indexed the same (same size) as the original. This API allows you to provide multiple operations at the same time rather than one-by-one. Its API is quite similar to the .agg API.
Some examples:

* Standardize data (zscore) within a group.
* Filling NAs within groups with a value derived from each group.
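
Both examples can be sketched in pandas as follows. The data and the names `toy`, `id` and `val` are assumptions for illustration. The point to notice is that the output has the same index as the input.

```python
import numpy as np
import pandas as pd

# Illustrative data only; the last value is missing on purpose.
toy = pd.DataFrame({"id": [1, 1, 1, 2, 2],
                    "val": [1.0, 2.0, 3.0, 10.0, np.nan]})
grouped = toy.groupby("id")["val"]

# Standardize within each group.
zscores = grouped.transform(lambda x: (x - x.mean()) / x.std())

# Fill missing values with the mean of the group they belong to.
filled = grouped.transform(lambda x: x.fillna(x.mean()))

print(zscores)
print(filled)
```

A group with a single non-missing value has an undefined sample standard deviation, so its z-scores come out as NaN.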

WOMP: zscore timings
Dask: 8min 40s
pandas: 8min 48s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

==============================================================
Dask: computing the z-score for each group seems like a job that could run in parallel.

How do I set this up in Dask?


Computing one group at a time is likely to be slow. Instead, I recommend using groupby-apply:

df.groupby([...]).apply(func)
Like Pandas, the user-defined function func should expect a Pandas dataframe that has all rows corresponding to that group, and should return either a Pandas dataframe, a Pandas Series, or a scalar.
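
A minimal pandas sketch of the groupby-apply pattern, under the assumption that one column is selected first, so the function receives a Series rather than a full dataframe. The function name `month_range` and the data are hypothetical.

```python
import pandas as pd

# Illustrative data only.
toy = pd.DataFrame({"id": [1, 1, 2, 2, 2],
                    "months": [0, -2, -1, -3, -5]})


def month_range(months):
    """Receive all values of one group as a pandas Series; return a scalar."""
    return months.max() - months.min()


# The function is called once per group; results are combined into a Series.
ranges = toy.groupby("id")["months"].apply(month_range)

print(ranges)
```

In dask the same call is lazy and typically also takes a `meta` argument describing the output, as in the filter cells later in this notebook.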

=== Example of giving Dask a pd dataframe ====
import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({'A':[1, 2, 3, 4, 5], 'B':['1','1','a','a','a']})
ddf = dd.from_pandas(pdf, npartitions = 3)
groups = ddf.groupby('B')

for group in pdf['B'].unique():
    print(groups.get_group(group).compute())
    
===========
Getting one group at a time can be cheap if your data is indexed by the grouping column:

df = df.set_index('date')
part = df.loc['2018-05-01'].compute()

In [67]:
zscore = lambda x: (x - x.mean()) / x.std()

AttributeError: Cannot access callable attribute 'set_index' of 'DataFrameGroupBy' objects, try using the 'apply' method

In [56]:
%%timeit -n 1 -r 1 -t x = range(10)
group_bal['MONTHS_BALANCE'].transform(zscore).compute()

8min 40s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Filter Operations

Discard some groups, according to a group-wise computation that evaluates True or False. 

Some examples:

* Discard data that belongs to groups with only a few members.
* Filter out data based on the group sum or mean.   
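
Both examples can be sketched with the pandas filter() method, which keeps or drops whole groups depending on a function that returns True or False. The frame `toy` and its columns are made up for illustration.

```python
import pandas as pd

# Illustrative data only.
toy = pd.DataFrame({"id": [1, 1, 1, 2, 3, 3],
                    "val": [5, -1, 2, 7, -4, 1]})

# Keep only groups with at least two rows.
big_groups = toy.groupby("id").filter(lambda g: len(g) >= 2)

# Keep only groups whose sum is positive.
positive = toy.groupby("id").filter(lambda g: g["val"].sum() > 0)

print(big_groups)
print(positive)
```

The result contains the original rows of the surviving groups, not one row per group.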

DASK : 

apply(): 2min 46s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

sum(): 21.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Pandas:

### This takes a long time ...

In [59]:
%%timeit -n 1 -r 1 -t x = range(10)
group_bal['MONTHS_BALANCE'].apply(lambda x: x.sum() > 0 , meta=pd.Series(dtype='bool', name='MONTHS_BALANCE')).compute()

2min 45s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


### Same result, but reformulated based on documentation

In [59]:
%%timeit -n 1 -r 1 -t x = range(10)
group_bal['MONTHS_BALANCE'].sum().apply(lambda x : x > 0, meta=pd.Series(dtype='bool', name='MONTHS_BALANCE')).compute()

2min 45s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## How do I use the result?

In [None]:
# Some ops in dask are optimized. How long does the same op take in pandas?
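
One possible way to use a per-group boolean result is to turn it into a list of group labels and select the matching rows. This is a pandas sketch on made-up data, with the group id stored in the index as in df_bal. The names `toy`, `keep`, `kept_ids` and `kept_rows` are hypothetical.

```python
import pandas as pd

# Illustrative data only; the index plays the role of SK_ID_BUREAU.
toy = pd.DataFrame({"val": [5, -1, -4, 1, 7]},
                   index=pd.Index([1, 1, 3, 3, 2], name="id"))

# One boolean per group: is the group sum positive?
keep = toy.groupby("id")["val"].sum() > 0

# Labels of the groups that passed the test.
kept_ids = keep[keep].index

# Select the original rows that belong to those groups.
kept_rows = toy[toy.index.isin(kept_ids)]

print(kept_rows)
```

Computing the cheap built-in sum first and filtering afterwards avoids calling a Python function once per group.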