**Functional programming in Python**

For Data people

2021-07-22, EdLambda #3

by Elias Mistler | Senior Data Engineer | [Previse](https://previ.se/)

In [None]:
import pandas as pd
import numpy as np
from toolz import *

# Contents

* Why FP in Python?
* An example: Cleaning email addresses
* Tips, tricks and the `toolz` library
* Think functional with `pandas`

# Why FP in Python/Jupyter?
- your notebooks are messy and you get lost in them
- when you make a mistake you need to reload your data
- your colleagues don't know how to use your code

## Why not?
- other Python users may struggle to read your code - **make sure they can!**
- ~explorability~ -- *not really a concern in Jupyter*

## FP thinking
* **Write pure, idempotent functions**: Avoid side effects. Instead ensure your functions behave exactly the same way when called twice with the same inputs.
* **Treat data as immutable**: Avoid setting values of existing data structures "in place". Instead create new "versions" of data when you need to modify.
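
As a small illustration of both points, here is a minimal sketch using only built-ins. The names `normalise_in_place` and `normalised` are made up for this example.

```python
def normalise_in_place(values: list) -> None:
    """Impure: sorts the caller's list and returns nothing."""
    values.sort()


def normalised(values: list) -> list:
    """Pure: leaves the input untouched and returns a new sorted list."""
    return sorted(values)


data = [3, 1, 2]

result = normalised(data)
# `data` is still in its original order, so the call can safely be repeated
same_again = normalised(data)

scratch = list(data)         # explicit copy, so the demo does not clobber `data`
normalise_in_place(scratch)  # the argument itself is changed by the call
```

In a notebook, the pure variant means a cell can be re-run without first reloading `data`.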

# Example: Cleaning email addresses

In [None]:
email_addresses = {'   yOu@YoU.com  ', 'me@ME.co.uk', 'not an email'}

## Why `for`-loops are bad
Easy to write, tedious to read:

In [None]:
emails_out = []
for email in email_addresses:
    email = email.strip()
    email = email.lower()
    if '@' in email:
        emails_out.append(email)
    
del email  # not necessary, but maybe a good idea
emails_out

In Python, loops do not have their own scope, i.e. variables will exist outside of the loop, and keep the value from the last iteration:

In [None]:
for x in range(100):
    if x == 65:
        break

x

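Be aware that a related problem affects functions created in a loop or comprehension: each `lambda` looks up `x` when it is called, not when it is defined, so every function returns the last value: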

In [None]:
functions = [lambda: x for x in range(5)]
[fn() for fn in functions]

Another reason to avoid loops is the inefficiency of building lists one element at a time:

In [None]:
%%timeit
out = []
for x in range(10000000):
    out.append(x)

In [None]:
%%timeit
out = [x for x in range(10000000)]

In [None]:
%%timeit
out = list(range(10000000))

## Why FP for the sake of it is bad

In [None]:
def isin(elem, seq) -> bool:
    return elem in seq

sorted(filter(partial(isin, '@'), map(str.lower, map(str.strip, email_addresses))))

Concise? Yes. But at what cost?

If your code is hard to read, it's not clever. It's clever if everyone can read it. Avoid deeply nested function calls.

## A nice middle ground: the comprehension syntax

In [None]:
sorted(email.strip().lower()
       for email in email_addresses
       if '@' in email)

Concise, clean, easy to read - this is ideal and very pythonic.

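A comprehension has its own scope, so its loop variable does not leak into the surrounding code. However, lambda functions still look up that variable only when they are called. The following two snippets therefore do different things: the first returns `12` five times, while the second binds each `x` immediately and returns `[0, 3, 6, 9, 12]`: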

In [None]:
functions = [lambda y: x * y for x in range(5)]
[fn(3) for fn in functions]

In [None]:
functions = [partial(x.__mul__) for x in range(5)]
[fn(3) for fn in functions]
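
The difference between the two snippets above comes down to when `x` is looked up. The following sketch shows two common ways to bind the value early. The helper `multiply` is a made-up name for illustration.

```python
from functools import partial

# Late binding: every lambda looks up `x` when it is called,
# by which time the comprehension has finished and x == 4.
late = [lambda y: x * y for x in range(5)]
late_results = [fn(3) for fn in late]

# Early binding via a default argument: `x=x` is evaluated at definition time.
bound_default = [lambda y, x=x: x * y for x in range(5)]
default_results = [fn(3) for fn in bound_default]


def multiply(x, y):
    return x * y


# Early binding via partial: the current value of `x` is stored in the partial object.
bound_partial = [partial(multiply, x) for x in range(5)]
partial_results = [fn(3) for fn in bound_partial]
```

Here `late_results` is `[12, 12, 12, 12, 12]`, while both early-binding variants give `[0, 3, 6, 9, 12]`.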

## Alternative: FP Pipe

In [None]:
thread_last(email_addresses,
            (map, str.strip),
            (map, str.lower),
            (filter, partial(isin, '@')),
            sorted)

The pipe is very clear, as you can read it top-down, left-to-right (as opposed to the inside-out of nested functions). It's less pythonic, but can be very useful when building logic that would not lend itself well to the comprehension syntax.

# Tips, tricks and the `toolz` library

## Pure functions
Nothing stops you from writing pure functions in Python, i.e. functions without side effects such as modifying their inputs or global variables.

Bad example:

In [None]:
def enrich(d: dict):
    d['new_key'] = ...

Use a pure function instead:

In [None]:
def enrich(d: dict) -> dict:
    return {**d, 'new_key': ...}  # new key last, so it wins over an existing 'new_key' in d

The `toolz` library can give you additional support to do this:

In [None]:
def enrich(d: dict) -> dict:
    return assoc(d, 'new_key', ...)

Also see `dissoc`, `merge` and `merge_with`
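
A quick sketch of those three helpers, with made-up example dicts. None of them modifies its inputs.

```python
from toolz import dissoc, merge, merge_with

base = {'a': 1, 'b': 2}
update = {'b': 20, 'c': 30}

without_b = dissoc(base, 'b')           # new dict without the key 'b'
combined = merge(base, update)          # on conflicts, later dicts win
summed = merge_with(sum, base, update)  # conflicting values are collected and passed to `sum`
```

This gives `{'a': 1}`, `{'a': 1, 'b': 20, 'c': 30}` and `{'a': 1, 'b': 22, 'c': 30}` respectively, while `base` and `update` stay as they were.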

## Pipes

In [None]:
d = {'a': 1, 'b': 2, 'c': 3}

thread_first(d,
             (assoc, 'd', 4),
             (dissoc, 'b'),
             (merge, {'x': 15, 'y': 16}))

The original dict is left untouched:

In [None]:
d

*Note*: `thread_first` puts the piped data into the first position of the function, `thread_last` into the last:
* `thread_first(d, (fn, a, b))` is equivalent to `fn(d, a, b)`
* `thread_last(d, (fn, a, b))` is equivalent to `fn(a, b, d)`
* For any other ways of passing arguments, use `partial` or `curry`. A lot of `toolz` functions come "pre-curried", e.g. `curried.map`
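
The equivalences above can be checked with a small sketch. `describe` is a hypothetical helper that only reports which argument landed in which position.

```python
from functools import partial
from toolz import thread_first, thread_last


def describe(a, b, c):
    """Hypothetical helper: report which value ended up in which parameter."""
    return f"a={a}, b={b}, c={c}"


# thread_first inserts the piped value as the first argument
in_first = thread_first("data", (describe, 1, 2))   # describe("data", 1, 2)

# thread_last inserts the piped value as the last argument
in_last = thread_last("data", (describe, 1, 2))     # describe(1, 2, "data")

# Any other position: fix the remaining arguments up front with partial
in_middle = thread_first("data", partial(describe, 1, c=2))  # describe(1, "data", c=2)
```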

Alternatively, you can use `compose` to combine functions before applying them (example from the `toolz` readme):

In [None]:
def stem(word):
    """ Stem word to primitive form """
    return word.lower().rstrip(",.!:;'-\"").lstrip("'\"")

wordcount = compose(frequencies, curried.map(stem), str.split)

sentence = "This cat jumped over this other cat!"
wordcount(sentence)
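
Note that `compose` applies its functions right to left, so in `wordcount` the split happens first and `frequencies` last. A minimal sketch of the ordering, using two made-up helpers and assuming a `toolz` version that ships `compose_left` (0.10 or later):

```python
from toolz import compose, compose_left, pipe


def add_one(x):
    return x + 1


def double(x):
    return x * 2


right_to_left = compose(double, add_one)(3)       # double(add_one(3))
left_to_right = compose_left(add_one, double)(3)  # same steps, written in reading order
piped = pipe(3, add_one, double)                  # apply immediately instead of building a function
swapped = compose(add_one, double)(3)             # add_one(double(3))
```

The first three all evaluate to `8`, while `swapped` is `7`.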

## Dunder-methods are your friends
Many classes in Python have so-called dunder-methods, which can be very useful in a functional context.

Because `self` is the first argument of these methods, you can call them either on the class or on an instance:

In [None]:
'abcdef'.__contains__('a')

In [None]:
str.__contains__('abcdef', 'a')

In [None]:
int.__divmod__(49, 7)

In [None]:
thread_last(range(50),
            (filter, (6).__lt__),  # 6 < x, i.e. keep values greater than 6
            (filter, (9).__ne__),  # 9 != x, i.e. drop 9
            (filter, (12).__ge__),  # 12 >= x, i.e. keep values up to 12
            set)
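
The operand order of bound dunder-methods is easy to misread, so here is a minimal sketch that spells it out. It also shows one caveat: called directly, a dunder-method can return `NotImplemented` for a mismatched type, whereas the functions in the standard `operator` module go through the normal comparison protocol.

```python
import operator
from functools import partial

greater_than_6 = (6).__lt__  # greater_than_6(x) evaluates 6 < x

# Same filters as in the pipe above, written as nested calls
kept = set(filter((12).__ge__, filter((9).__ne__, filter((6).__lt__, range(50)))))

# Caveat: int.__lt__ does not know how to compare with a float
mixed = (6).__lt__(7.5)

# operator.lt falls back to the float's comparison and returns a proper bool
safe = partial(operator.lt, 6)(7.5)  # 6 < 7.5
```

`kept` is `{7, 8, 10, 11, 12}`, `mixed` is `NotImplemented` rather than a bool, and `safe` is `True`.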

## Hide your mess
If a library requires you to make in-place changes, or you use existing code that does so, you can usually encapsulate this in a function that behaves like a pure function from the outside, typically by doing a `copy` first:

In [None]:
def my_fn(d: dict):
    d = d.copy()
    d['key1'] = ...
    d['key2'] = ...
    return d

d = {'a': 1, 'b': 2}
my_fn(d)

In [None]:
d
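
One caveat worth keeping in mind: `dict.copy()` is a shallow copy, so nested lists or dicts are still shared with the original. A minimal sketch with a made-up record, using `copy.deepcopy` from the standard library when nested values get modified:

```python
import copy


def add_tag_shallow(record: dict) -> dict:
    record = record.copy()         # copies only the top level
    record['tags'].append('new')   # the nested list is shared with the caller's dict
    return record


def add_tag_deep(record: dict) -> dict:
    record = copy.deepcopy(record)  # copies nested structures as well
    record['tags'].append('new')
    return record


original = {'id': 1, 'tags': ['a']}

deep_result = add_tag_deep(original)
tags_after_deep = list(original['tags'])     # unchanged

shallow_result = add_tag_shallow(original)
tags_after_shallow = list(original['tags'])  # the caller's list has been changed
```

After the deep variant the original tags are still `['a']`. After the shallow variant they are `['a', 'new']`, so the function only looked pure.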

## A few code examples

In [None]:
def parse_table_to_model(table, dataset_name):
    return thread_first(table,
                        parse_table,
                        partial(set_defaults, dataset_name),
                        partial(force_type, dataset_name),
                        curried.map(partial(to_model, dataset_name)),
                        list)

In [None]:
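from operator import attrgetter  # assumption: the undefined `attr` stood for operator.attrgetter

def deduplicate(items, keys):
    get_key = juxt(map(attrgetter, keys))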
    return thread_last(items,
                       (groupby, get_key),
                       (valmap, last),
                       dict.values,
                       list)
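
To see what `deduplicate` does, here is a self-contained sketch with toy data. It assumes the attribute getter is `operator.attrgetter`, and the `Invoice` type and its values are made up for illustration.

```python
from collections import namedtuple
from operator import attrgetter

from toolz import groupby, juxt, last, thread_last, valmap

# Hypothetical record type, only for this example
Invoice = namedtuple('Invoice', ['supplier', 'number', 'amount'])


def deduplicate(items, keys):
    # Build a function returning a tuple of the key attributes, e.g. ('acme', '001')
    get_key = juxt(map(attrgetter, keys))
    return thread_last(items,
                       (groupby, get_key),  # {key tuple: [items with that key]}
                       (valmap, last),      # keep the last item of every group
                       dict.values,
                       list)


invoices = [
    Invoice('acme', '001', 10),
    Invoice('acme', '002', 20),
    Invoice('acme', '001', 15),  # later version of the first invoice
]

unique = deduplicate(invoices, ['supplier', 'number'])
```

`unique` contains the `15` version of invoice `001` followed by invoice `002`. The input list is not modified.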

In [None]:
def apply_mapping_rules(data):
    return thread_first(data,
                        pre_clean,
                        validate_inputs,
                        apply_exclusions,
                        calculate_due_dates,
                        prepare_transaction_updates_from_clearing_lines,
                        generate_payments,
                        generate_document_deltas,
                        generate_clearing_deltas,
                        generate_supplier_deltas,
                        finalize)

# Think functional with `pandas`

In [None]:
df = pd.DataFrame(np.random.random((5,3)), columns=list('abc'))
df

Instead of something like:
```python
df['sum'] = df.sum(axis=1)
```
use `df.assign`:

In [None]:
df.assign(sum=df.sum(axis=1))

Like many methods in pandas, `assign` treats the DataFrame as immutable: it returns a new DataFrame with the new column and leaves the original unchanged:

In [None]:
df

Similar to pipes, you can chain methods on a DataFrame. Note how you can re-run this cell any number of times, as the input is not changed, and the output remains the same for the same input.

In [None]:
def my_func(row: pd.Series):
    return f"Hello {row['a'] ** 2:.2f}"

df_transformed = (df
    .assign(sum=lambda df: df.sum(axis=1))
    .assign(a_percent=lambda df: df['a'] / df['sum'],
            my_func_applied=partial(pd.DataFrame.apply, func=my_func, axis=1))
    .drop(index=[1,3])
    .sort_values('b')
    .reset_index())

df_transformed
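
When a step does not exist as a DataFrame method, `DataFrame.pipe` lets you keep your own functions in the same chain. A minimal sketch with toy data follows. The helpers `add_total` and `keep_above` are made up for this example.

```python
import pandas as pd


def add_total(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Return a new DataFrame with a row-wise total of the given columns."""
    return df.assign(total=df[columns].sum(axis=1))


def keep_above(df: pd.DataFrame, column: str, threshold: float) -> pd.DataFrame:
    """Return only the rows where `column` exceeds `threshold`."""
    return df[df[column] > threshold]


raw = pd.DataFrame({'a': [1, 2, 3], 'b': [10, 20, 30]})

result = (raw
    .pipe(add_total, columns=['a', 'b'])
    .pipe(keep_above, column='total', threshold=15)
    .reset_index(drop=True))
```

As with `assign`, `raw` keeps its two original columns, so the cell can be re-run any number of times.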

## A few code examples

In [None]:
def get_aggregates(df):
    aggregates = (df[lambda df: df['flow_type'] == 'instant-pay']
                  .sort_values('run_id')
                  .drop_duplicates(subset=['invoice_id'], keep='last')
                  .assign(count=1)
                  .assign(wa_accelerated_days=lambda df: df['accelerated_days'] * df['invoice_amount'])
                  .agg({'invoice_amount': 'sum', 'rebate_amount': 'sum', 'fee_amount': 'sum', 
                        'payable_amount': 'sum', 'deducted_amount': 'sum', 'wa_accelerated_days': 'sum', 'count': 'sum'})
                 ).to_dict()

    return assoc(aggregates, 'wa_accelerated_days', 
                 round(aggregates['wa_accelerated_days'] / aggregates['invoice_amount'], 2))

In [None]:
def get_updated_transactions(transactions, change_records):
    changes = (change_records
               .sort_values('POLL_ID')
               .drop_duplicates(keep='last', subset=['FIELD', 'REF_ID'])
               .pivot_table(index='REF_ID', columns='FIELD', values='NEW_VALUE',
                            aggfunc=lambda x: x.iloc[0] if len(x) == 1 else None))

    def apply_changes(col: str):
        def apply_(df):
            return df[col].fillna(df[col.upper()])

        return apply_

    return (transactions
            .merge(changes, how='left', left_on='REF_ID', right_index=True)
            .assign(**{col.upper(): apply_changes(col) for col in changes.columns})
            .drop(columns=changes.columns))

# Thank you
Any questions?