# Functional programming-style data analysis

This notebook demonstrates a realistic set of operations and a pipeline you might write using functions from `utilz` combined with the `toolz` package. It mimics the declarative, functional-programming style recently popularized by `R` libraries like `dplyr`.

In [1]:
import pandas as pd
from utilz import randdf, assign, rename, save
from toolz import pipe
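
As a rough mental model, `pipe(data, f, g)` is equivalent to `g(f(data))`: each function receives the return value of the one before it. Below is a minimal sketch of that behavior using only the standard library. `pipe_sketch` is a hypothetical stand-in written for illustration, not the `toolz` implementation.

```python
from functools import reduce


def pipe_sketch(data, *funcs):
    """Thread `data` through `funcs` from left to right."""
    return reduce(lambda value, func: func(value), funcs, data)


# Each step receives the output of the previous step
result = pipe_sketch(3, lambda x: x + 1, lambda x: x * 10, str)
```

Reading the steps top to bottom matches the order in which they execute, which is what makes the style feel declarative.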

## Verb-based dataframe manipulation

In [2]:
# Set up some random data
df = pipe(
    randdf((20, 3)),
    assign(D=list("abcde") * 4),
    rename({"A": "rt", "B": "score", "C": "speed", "D": "group"}),
    assign(rt_doubled="rt*2"),
    save("test"),
)
df.head()

|   | rt | score | speed | group | rt_doubled |
|---|---|---|---|---|---|
| 0 | 0.033274 | 0.497035 | 0.96303 | a | 0.066549 |
| 1 | 0.320243 | 0.449792 | 0.646103 | b | 0.640485 |
| 2 | 0.640274 | 0.432569 | 0.87154 | c | 1.280547 |
| 3 | 0.115659 | 0.852093 | 0.311823 | d | 0.231318 |
| 4 | 0.200713 | 0.460636 | 0.047897 | e | 0.401426 |


### Basic slicing/subsetting
`rows` and `cols` behave similarly and can index a DataFrame using queries, slices, or indices.

In [3]:
from utilz import rows, cols

pipe(df, 
    rows("group == 'c' or group == 'b'"), 
    cols("rt", "speed")
    )


|   | speed | rt |
|---|---|---|
| 0 | 0.646103 | 0.320243 |
| 1 | 0.87154 | 0.640274 |
| 2 | 0.327057 | 0.945022 |
| 3 | 0.106485 | 0.992307 |
| 4 | 0.642573 | 0.212407 |
| 5 | 0.035261 | 0.43766 |
| 6 | 0.08736 | 0.928069 |
| 7 | 0.541594 | 0.226964 |


In [4]:
pipe(df, 
    rows([1,9,14]), 
    cols((3,5))
    )

|   | group | rt_doubled |
|---|---|---|
| 1 | b | 0.640485 |
| 9 | e | 1.584978 |
| 14 | e | 0.629167 |


In [5]:
pipe(df, 
    rows((1,11,2))
    )

|   | rt | score | speed | group | rt_doubled |
|---|---|---|---|---|---|
| 1 | 0.320243 | 0.449792 | 0.646103 | b | 0.640485 |
| 3 | 0.115659 | 0.852093 | 0.311823 | d | 0.231318 |
| 5 | 0.094301 | 0.374459 | 0.38662 | a | 0.188603 |
| 7 | 0.992307 | 0.605558 | 0.106485 | c | 1.984614 |
| 9 | 0.792489 | 0.249192 | 0.152526 | e | 1.584978 |
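
For comparison, the three indexing styles above can be approximated in plain pandas. Judging from the outputs, a string appears to act as a query, a list as explicit positions, and a tuple as a slice; that reading is an assumption rather than documented `utilz` behavior. The `demo` frame and its values are made up for illustration.

```python
import pandas as pd

# Small stand-in frame; the values are invented for this example
demo = pd.DataFrame(
    {
        "rt": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
        "score": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        "group": list("abcabc"),
    }
)

# Query string for rows, then column names
by_query = demo.query("group == 'c' or group == 'b'")[["rt", "score"]]

# List of row positions, and a (start, stop) range of column positions
by_position = demo.iloc[[1, 4], 1:3]

# (start, stop, step) slice of row positions
by_slice = demo.iloc[1:6:2]
```

Unlike the `rows` output shown earlier, `query` keeps the original index labels rather than renumbering the rows.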


### Summarizing data
To perform an operation that results in a *scalar* output per column, use `summarize`. It always returns a *smaller* DataFrame or Series than the original.

Non-grouped inputs produce *Series* results:

In [8]:
from utilz import summarize

pipe(df, 
    rows("group == 'c' or group == 'b'"), 
    summarize(rt='mean',speed='mean')
    )

rt       0.587868
speed    0.407247
dtype: float64

Grouped inputs produce *DataFrame* results:

In [9]:
from utilz import groupby

pipe(df, 
    groupby('group'), 
    summarize(score = 'mean', rt = 'std')
    )

| group | score | rt |
|---|---|---|
| a | 0.272993 | 0.437812 |
| b | 0.387616 | 0.38951 |
| c | 0.46471 | 0.325779 |
| d | 0.343631 | 0.274298 |
| e | 0.551664 | 0.278721 |
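
The Series-versus-DataFrame distinction mirrors what plain pandas does with `agg`. The sketch below is a pandas analogue, not the `utilz` implementation, and uses a small invented frame.

```python
import pandas as pd

# Invented data for illustration
demo = pd.DataFrame(
    {
        "group": list("aabb"),
        "score": [1.0, 3.0, 2.0, 6.0],
        "rt": [0.5, 1.5, 1.0, 3.0],
    }
)

# Ungrouped: one scalar per column, collected into a Series
ungrouped = demo.agg({"rt": "mean", "score": "mean"})

# Grouped: one row per group, collected into a DataFrame
grouped = demo.groupby("group").agg(score=("score", "mean"), rt=("rt", "std"))
```

In both cases the result has fewer rows than the input, which is the defining property of a summary.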


### Transforming data
To perform an operation that results in a *non-scalar* output per column, or to ensure that an operation with *scalar* output retains the original shape of the input, use `assign`. It always returns a DataFrame the *same* size as the original, broadcasting scalar results where needed.

With non-grouped inputs, the mean score is broadcast across the *entire* DataFrame:

In [10]:
pipe(df, assign(score_centered="score.mean()"))


|   | rt | score | speed | group | rt_doubled | score_centered |
|---|---|---|---|---|---|---|
| 0 | 0.033274 | 0.497035 | 0.96303 | a | 0.066549 | 0.404123 |
| 1 | 0.320243 | 0.449792 | 0.646103 | b | 0.640485 | 0.404123 |
| 2 | 0.640274 | 0.432569 | 0.87154 | c | 1.280547 | 0.404123 |
| 3 | 0.115659 | 0.852093 | 0.311823 | d | 0.231318 | 0.404123 |
| 4 | 0.200713 | 0.460636 | 0.047897 | e | 0.401426 | 0.404123 |
| 5 | 0.094301 | 0.374459 | 0.38662 | a | 0.188603 | 0.404123 |
| 6 | 0.945022 | 0.622679 | 0.327057 | b | 1.890043 | 0.404123 |
| 7 | 0.992307 | 0.605558 | 0.106485 | c | 1.984614 | 0.404123 |
| 8 | 0.457019 | 0.192776 | 0.995795 | d | 0.914038 | 0.404123 |
| 9 | 0.792489 | 0.249192 | 0.152526 | e | 1.584978 | 0.404123 |


With grouped inputs, the mean speed is broadcast *within* each group:

In [11]:
pipe(df, 
    groupby('group'), 
    assign(speed_per_group='speed.mean()')
    )

|   | rt | score | speed | group | rt_doubled | speed_per_group |
|---|---|---|---|---|---|---|
| 0 | 0.033274 | 0.497035 | 0.96303 | a | 0.066549 | 0.592677 |
| 1 | 0.320243 | 0.449792 | 0.646103 | b | 0.640485 | 0.425773 |
| 2 | 0.640274 | 0.432569 | 0.87154 | c | 1.280547 | 0.38872 |
| 3 | 0.115659 | 0.852093 | 0.311823 | d | 0.231318 | 0.622129 |
| 4 | 0.200713 | 0.460636 | 0.047897 | e | 0.401426 | 0.27776 |
| 5 | 0.094301 | 0.374459 | 0.38662 | a | 0.188603 | 0.592677 |
| 6 | 0.945022 | 0.622679 | 0.327057 | b | 1.890043 | 0.425773 |
| 7 | 0.992307 | 0.605558 | 0.106485 | c | 1.984614 | 0.38872 |
| 8 | 0.457019 | 0.192776 | 0.995795 | d | 0.914038 | 0.622129 |
| 9 | 0.792489 | 0.249192 | 0.152526 | e | 1.584978 | 0.27776 |


Here no broadcasting occurs, because each expression already yields one value per row. The result is aligned back to the original rows while respecting groups, like `transform` in pandas.

In [12]:
pipe(df, 
    groupby('group'), 
    assign(
        score_centered='score - score.mean()', 
        score_norm = 'score/score.std()'
        )
        
    )

|   | rt | score | speed | group | rt_doubled | score_centered | score_norm |
|---|---|---|---|---|---|---|---|
| 0 | 0.033274 | 0.497035 | 0.96303 | a | 0.066549 | 0.224041 | 2.484073 |
| 1 | 0.320243 | 0.449792 | 0.646103 | b | 0.640485 | 0.062176 | 2.077163 |
| 2 | 0.640274 | 0.432569 | 0.87154 | c | 1.280547 | -0.032141 | 1.247083 |
| 3 | 0.115659 | 0.852093 | 0.311823 | d | 0.231318 | 0.508462 | 2.363717 |
| 4 | 0.200713 | 0.460636 | 0.047897 | e | 0.401426 | -0.091028 | 1.669169 |
| 5 | 0.094301 | 0.374459 | 0.38662 | a | 0.188603 | 0.101466 | 1.871467 |
| 6 | 0.945022 | 0.622679 | 0.327057 | b | 1.890043 | 0.235063 | 2.875564 |
| 7 | 0.992307 | 0.605558 | 0.106485 | c | 1.984614 | 0.140848 | 1.745804 |
| 8 | 0.457019 | 0.192776 | 0.995795 | d | 0.914038 | -0.150855 | 0.534764 |
| 9 | 0.792489 | 0.249192 | 0.152526 | e | 1.584978 | -0.302472 | 0.902976 |
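
The grouped transformations in this section correspond roughly to `groupby(...).transform(...)` in plain pandas. The sketch below shows that analogue on a small invented frame; it is not the `utilz` implementation.

```python
import pandas as pd

# Invented data for illustration
demo = pd.DataFrame(
    {
        "group": list("aabb"),
        "score": [1.0, 3.0, 2.0, 6.0],
    }
)

# transform returns one value per original row, repeated within each group
group_mean = demo.groupby("group")["score"].transform("mean")

result = demo.assign(
    score_mean=group_mean,                      # broadcast within group
    score_centered=demo["score"] - group_mean,  # already one value per row
)
```

The output keeps the same number of rows as the input, in the original order.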


## Efficient analyses using memoization, currying, and caching



It's convenient to combine these operations using `pipe`, but what if some operations take longer than others? It would be annoying to rerun expensive I/O or compute operations while you're debugging or adding another step to your pipeline. This section demonstrates three techniques that can dramatically improve efficiency.

### Memoization

**Memoize** a function to keep its results in memory (RAM), keyed by the arguments it was called with, so that a repeat call with the same arguments returns the stored result rather than re-executing a potentially long-running function. Memoized outputs *do not* persist across kernel restarts, and memoization isn't a good fit for functions with very large inputs and outputs or with inputs that can't be hashed. A nice use case is simply loading a file. Given the same filepath, you're loading the same file each time, so let's just memoize the result, especially if it takes a long time to reread the file:

In [16]:
from toolz import memoize
from time import sleep

@memoize
def load(path):
    "Simulate slow loading a file..."
    print("loading from disk")
    sleep(5)
    return pd.read_csv(path)
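
To see what memoization does under the hood, here is a minimal sketch built on a plain dictionary. It is an illustration only, not how `toolz.memoize` is implemented, and `memoize_sketch` and `load_sketch` are hypothetical names.

```python
from functools import wraps


def memoize_sketch(func):
    """Cache results in a dict keyed by the positional arguments."""
    cache = {}

    @wraps(func)
    def wrapper(*args):
        if args not in cache:  # requires hashable arguments
            cache[args] = func(*args)
        return cache[args]

    return wrapper


call_log = []


@memoize_sketch
def load_sketch(path):
    call_log.append(path)  # records each real execution
    return f"contents of {path}"


first = load_sketch("test.csv")
second = load_sketch("test.csv")  # served from the cache
```

After both calls, `call_log` holds a single entry, because the second call never reaches the function body.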

### Currying
**Currying**, a close relative of *partial function application*, is helpful when writing custom functions you want to add to a `pipe`. A curried function operates normally when it receives all of its required arguments, but returns a *partial* function when it receives fewer. This partial function behaves just like the original except that a subset of its arguments is "fixed". Because `pipe` passes the output of each function as the sole argument to the next function in the pipeline, a function's secondary arguments can't be set inside the pipeline without currying or an equivalent wrapper:

In [None]:
from toolz import curry

@curry
def calc_mean(df, normalize=False):
    pass

# Now this works, otherwise it would complain about the wrong number of arguments
# to calc_mean
pipe(df,
    calc_mean(normalize=True)
)

# Note: if you fix positional arguments instead of keyword arguments, the piped
# value has to be the last parameter, so the signature can read "backwards"
@curry
def calc_mean(norm_value, df):
    pass
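
To make the idea concrete, the sketch below uses `functools.partial` from the standard library in place of `toolz.curry`. `partial` fixes arguments explicitly rather than automatically, but the resulting one-argument function fits into a pipeline in the same way. The functions `scale` and `add_offset` are hypothetical examples.

```python
from functools import partial


def scale(values, factor=1.0):
    """Multiply every value by `factor`."""
    return [v * factor for v in values]


def add_offset(offset, values):
    """The piped value comes last so `offset` can be fixed positionally."""
    return [v + offset for v in values]


# Fix the secondary arguments, leaving one-argument functions
double = partial(scale, factor=2.0)
plus_ten = partial(add_offset, 10)

# Apply the steps in order, as a pipeline would
result = plus_ten(double([1, 2, 3]))
```

The second function shows why positional-only signatures end up "backwards": the argument being fixed must precede the data that the pipeline supplies.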


### Caching outputs to disk
**Cache** a function so its result is stored on disk in a file made unique by a hash of the function's args and kwargs. Decorate a function with `utilz.disk_cache` to enable this. It works similarly to `toolz.memoize`, but stores the result in a file (rather than in memory) and loads that file when the function is called with the same inputs. Essentially you're trading computation time for I/O time. This is usually preferable to `memoize` for large, memory-hungry inputs and outputs, and necessary if inputs cannot be hashed (e.g. DataFrames, arrays, deep objects, etc.). Setting the `threshold` argument to something like 1 essentially always caches the result.

In [19]:
from utilz import disk_cache

# We're also using the curry decorator so norm works in the pipeline below.
# curry is not required in order to use the disk_cache decorator.

@curry
@disk_cache(threshold=1)
def norm(df, num='', denom=''):
    "Simulate expensive function that takes args"
    print("computing...")
    sleep(5)
    return pd.DataFrame({"norm": df[num] / df[denom]})
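
The general mechanism (hash the arguments, use the hash as a filename, read the file if it exists) can be sketched with the standard library alone. This is not how `utilz.disk_cache` is implemented: it omits the `threshold` logic, stores JSON rather than .csv or .h5, and only handles JSON-serializable results. `disk_cache_sketch` and `slow_square` are hypothetical names.

```python
import hashlib
import json
import tempfile
from functools import wraps
from pathlib import Path


def disk_cache_sketch(cache_dir):
    """Cache JSON-serializable results in files named by an argument hash."""
    cache_dir = Path(cache_dir)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a stable key from the function name and its arguments
            key_source = json.dumps(
                [func.__name__, args, kwargs], sort_keys=True, default=repr
            )
            digest = hashlib.sha256(key_source.encode()).hexdigest()
            path = cache_dir / f"{func.__name__}_{digest}.json"
            if path.exists():
                return json.loads(path.read_text())
            result = func(*args, **kwargs)
            cache_dir.mkdir(parents=True, exist_ok=True)
            path.write_text(json.dumps(result))
            return result

        return wrapper

    return decorator


calls = []


@disk_cache_sketch(tempfile.mkdtemp())
def slow_square(x):
    calls.append(x)  # records each real execution
    return x * x


first = slow_square(4)
second = slow_square(4)  # read back from disk
```

Because the cache lives in files, a result like this would survive a kernel restart, provided the cache directory is a fixed location rather than a fresh temporary folder.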

## Putting it together
### Pipeline-based data analysis with caching and memoization

First you might want to clear any local cache, i.e. the `.utilz_cache` folder. This isn't necessary, but it helps ensure you don't accidentally load a disk-cached output without realizing it. `utilz` will always let you know when it's caching a result or loading a cached one.


In [21]:
from utilz import clear_cache

# Clear any existing cache
clear_cache()

Because we used the `memoize` decorator when defining `load` above, only the first run of this pipeline actually loads the data and incurs an I/O cost. Likewise, because `norm` is decorated with `disk_cache`, only the first run of `norm` (with the same prior pipeline outputs) incurs a compute cost.

In [22]:
summary = pipe(
    "test.csv",
    load,
    groupby("group"),
    assign(
        score_centered="score - score.mean()", 
        score_mean="score.mean()", 
        score_std="score.std()"),
    norm(num='rt',denom='score')
)
summary

loading from disk
computing...
Exceeded compute time. Result cached to .utilz_cache/norm___denom__score--df__9c405c0813bd657babdcd3583202ebea2a7a61819abdb234b93c59dab0d05fcc--num__rt.csv


|   | norm |
|---|---|
| 0 | 0.876757 |
| 4 | 0.219101 |
| 8 | 1.803562 |
| 12 | 0.080123 |
| 16 | 1.397509 |
| 1 | 0.013055 |
| 5 | 0.29379 |
| 9 | 0.67844 |
| 13 | 1.044078 |
| 17 | 0.495721 |


Notice how no data loading or expensive computation happens on a second run of this pipeline (there are no print messages aside from `utilz` telling you it's returning `norm`'s cached result):

In [24]:
summary = pipe(
    "test.csv",
    load,
    groupby("group"),
    assign(
        score_centered="score - score.mean()", 
        score_mean="score.mean()", 
        score_std="score.std()"),
    norm(num='rt',denom='score')
)
summary

Returning norm cached result


|   | norm |
|---|---|
| 0 | 0.876757 |
| 1 | 0.219101 |
| 2 | 1.803562 |
| 3 | 0.080123 |
| 4 | 1.397509 |
| 5 | 0.013055 |
| 6 | 0.29379 |
| 7 | 0.67844 |
| 8 | 1.044078 |
| 9 | 0.495721 |


## Summary


This setup is nice because it supports both interactive data analysis and reproducible scripts. Simply start writing the pipeline steps, and comment out the ones you want to skip or debug. In another notebook cell, edit the source code of a function in the pipeline and incrementally add to its body, rerunning the pipeline to see results as you build up your functions.

For functions that take a while to run, try decorating them with `memoize` or `disk_cache`. Memoizing is nice for loading csv/text files (so you don't need to re-read them from disk on each re-run of the pipeline). Caching is nice for expensive operations or operations on complex data structures like arrays and DataFrames. Plus, `utilz` saves them in standard, robust file types (.csv or .h5 rather than pickles), so you're also getting incremental backups of your work. No more need to rely on saved "state" in a Jupyter notebook.