# Pandas Best Practices - parallelised df.pipe() with caching (WIP)

[![Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dylanhogg/jupyter-experiments/blob/master/notebooks/best-practices/pandas-pipe-parallel-with-caching.ipynb)  

# References
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.pipe.html  
https://github.com/Delgan/loguru  
https://github.com/nalepae/pandarallel  
https://github.com/joblib/joblib  

# Dependencies

In [None]:
%pip install loguru -q  # Logging services; https://github.com/Delgan/loguru

In [None]:
%pip install pandarallel -q  # Pandas parallel_apply(); https://github.com/nalepae/pandarallel

In [None]:
%pip install joblib -q  # Caching to disk; https://github.com/joblib/joblib

## Imports

In [None]:
import requests
import psutil
import numpy as np
import pandas as pd
from datetime import datetime
from functools import wraps
from pathlib import Path
from tqdm.notebook import tqdm
from IPython.display import display, HTML

In [None]:
from loguru import logger
from pandarallel import pandarallel
from joblib import Memory

In [None]:
# Notebook display settings: show every column, cap printed rows at 100.
pd.options.display.max_columns = None
pd.options.display.max_rows = 100

# On-disk cache (joblib) used by the cached pipeline step below.
memory = Memory(".joblib_cache", verbose=0)

# psutil.cpu_count() returns None when the CPU count cannot be determined;
# fall back to 1 so the multiplication below cannot raise a TypeError.
nb_workers = (psutil.cpu_count() or 1) * 2
pandarallel.initialize(nb_workers=nb_workers, progress_bar=True)  # Add use_memory_fs=False to avoid /dev/shm space issue in Docker

## Download data

In [None]:
# Remote location of the dataset; the download cell saves it under the
# final path segment of this URL.
url = "https://s3.amazonaws.com/publicdata.infocruncher.com/awesomepython.org/goodreads_archive_v1_gzip.parquet"

In [None]:
# Local file name is the last path segment of the URL.
fname = url[url.rindex("/")+1:]
if Path(fname).exists():
    # Already downloaded: skip the network entirely.
    logger.info(f"Loading {fname} from disk")
else:
    # Stream into a temporary ".part" file and rename only on success, so an
    # interrupted download is never mistaken for a complete file on the next run.
    part_path = Path(f"{fname}.part")
    received = 0
    with requests.get(url, stream=True) as resp:
        # Fail early on HTTP errors instead of saving an error page as the data file.
        resp.raise_for_status()
        # 0 means the server did not report a length.
        length = int(resp.headers.get('content-length', 0))
        with open(part_path, "wb") as f:
            with tqdm(total=length or None, unit='iB', unit_scale=True, desc=f"Downloading {fname}") as pbar:
                for data in resp.iter_content(chunk_size=1048576):
                    f.write(data)
                    received += len(data)
                    pbar.update(len(data))
    # Only validate the size when the server actually reported one.
    if length and received != length:
        part_path.unlink()
        raise IOError(f"Unexpected length: expected {length} bytes, received {received}")
    part_path.replace(fname)

In [None]:
# Load the local Parquet file into the raw (unprocessed) DataFrame.
df_raw = pd.read_parquet(fname)

In [None]:
# Number of rows in the raw dataset.
len(df_raw)

In [None]:
# Preview the first two rows of the raw dataset.
df_raw[0:2]

## Decorator helpers

In [None]:
def log_pipeline_step(func):
    """Decorate a pipeline step so its name, duration and shape change are logged.

    The wrapped function must receive the input frame as its first positional
    argument (as ``DataFrame.pipe`` does) and return an object with a
    two-element ``.shape``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs) -> pd.DataFrame:
        # Capture the input shape up front; the step may mutate its input.
        shape_before = args[0].shape
        logger.info(f"{func.__name__}")

        started = datetime.now()
        result = func(*args, **kwargs)
        shape_after = result.shape
        elapsed = datetime.now() - started

        row_delta = shape_after[0] - shape_before[0]
        col_delta = shape_after[1] - shape_before[1]
        logger.info(f" ╰╴{func.__name__} took {elapsed}s in: {shape_before} out: {shape_after} diff: ({row_delta}, {col_delta})")
        return result
    return wrapper

def log_columns(func):
    """Decorate a pipeline step so the columns of its result are logged.

    After calling ``func``, logs the function name, the number of columns in
    the returned frame and a comma-separated list of the column labels, then
    returns the result unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        df_result = func(*args, **kwargs)
        # Column labels are not guaranteed to be strings (e.g. ints or tuples);
        # convert explicitly so str.join cannot raise a TypeError.
        col_names = ', '.join(map(str, df_result.columns))
        logger.info(f"{func.__name__} cols ({len(df_result.columns)}): [{col_names}]")
        return df_result
    return wrapper

## Generic pipe functions

In [None]:
@log_pipeline_step
@log_columns
def start_pipeline(dataf):
    """Open a pipeline by handing back a copy of ``dataf``.

    Later steps then operate on the copy rather than on the caller's frame.
    """
    working_copy = dataf.copy()
    return working_copy

@log_columns
def end_pipeline(dataf):
    """Close a pipeline: return ``dataf`` unchanged.

    The step exists for its decorator, which logs the final column list.
    """
    return dataf

@log_pipeline_step
def filter_rows(dataf: pd.DataFrame, column, min_value) -> pd.DataFrame:
    """Return the rows of ``dataf`` whose ``column`` value is >= ``min_value``."""
    keep_mask = dataf[column] >= min_value
    return dataf[keep_mask]

@log_pipeline_step
def sort_values(dataf: pd.DataFrame, col_names, ascending=False) -> pd.DataFrame:
    """Return ``dataf`` sorted by ``col_names`` (descending unless ``ascending`` is set)."""
    ordered = dataf.sort_values(by=col_names, ascending=ascending)
    return ordered

@log_pipeline_step
def move_col(dataf, col_name, index=0):
    """Return ``dataf`` with column ``col_name`` moved to position ``index``.

    ``index`` follows ``list.insert`` semantics and is applied after the
    column has been removed from its current position.
    """
    ordered_cols = dataf.columns.tolist()
    current_pos = ordered_cols.index(col_name)
    moved = ordered_cols.pop(current_pos)
    ordered_cols.insert(index, moved)
    return dataf.loc[:, ordered_cols]

@log_pipeline_step
def calc_sum(dataf: pd.DataFrame, index_name="total") -> pd.DataFrame:
    """Append a row labelled ``index_name`` holding the sum of each numeric column.

    Non-numeric columns get ``None`` in the new row. The row is written into
    ``dataf`` in place and the same object is returned.
    """
    def _is_numeric(dtype) -> bool:
        # np.issubdtype raises TypeError on pandas extension dtypes (categorical,
        # nullable integers, string dtype), so only use it for real NumPy dtypes.
        if isinstance(dtype, np.dtype):
            return bool(np.issubdtype(dtype, np.number))
        # Extension dtypes: numeric but not boolean, matching the NumPy rule
        # above where bool is not a np.number.
        return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype)

    def _numeric_sum(col):
        return col.sum() if _is_numeric(col.dtype) else None

    dataf.loc[index_name] = dataf.apply(_numeric_sum, axis=0)  # over columns
    return dataf

## Custom pipe functions

In [None]:
@logger.catch  # Catch error information from threads
@log_pipeline_step
def expand_features(dataf: pd.DataFrame) -> pd.DataFrame:
    """Add the derived columns ``_len_desc`` and ``_perc_reviews`` to ``dataf``.

    ``_len_desc`` is the length of ``Description`` (0 when it is falsy) and
    ``_perc_reviews`` is the row's ``CountsOfReview`` as a percentage of the
    column total. Rows are processed with pandarallel's ``parallel_apply`` and
    the per-row computation is cached on disk through ``memory.cache``.
    The new columns are written into ``dataf`` in place and the same object
    is returned.
    """
    @memory.cache  # Somewhat contrived caching example here. Useful for expensive operations like API calls.
    def _apply_cached(description, total_reviews, counts_of_review):
        return {
            "_len_desc": len(description) if description else 0,
            "_perc_reviews": counts_of_review * 100 / total_reviews,
        }

    # Compute the column total once; previously it was recomputed for every
    # row inside _apply, which also made the closure capture the whole frame.
    total_reviews = dataf["CountsOfReview"].sum()

    def _apply(row):
        return _apply_cached(row["Description"], total_reviews, row["CountsOfReview"])

    res = dataf.parallel_apply(_apply, axis=1, result_type='expand')  # .parallel_apply() via pandarallel
    dataf[res.columns] = res
    return dataf

## Pipeline example

In [None]:
# Run the full pipeline on the raw data: copy, keep rows with Rating >= 4,
# add the derived feature columns, append a totals row, then log the final columns.
df = (df_raw
      .pipe(start_pipeline)
      .pipe(filter_rows, column="Rating", min_value=4)
      .pipe(expand_features)
      .pipe(calc_sum)
      .pipe(end_pipeline)
     )

In [None]:
# Display the pipeline result.
df