# Pandas Pipe

_________________________
## *Pipe*
_________________________

```python
import pandas as pd
df = pd.read_csv('https://calmcode.io/datasets/bigmac.csv')

# Intermediate dataframe: number of rows (n) per currency code
df2 = (df
    .assign(date=lambda d: pd.to_datetime(d['date']))
    .sort_values(['currency_code', 'date'])
    .groupby('currency_code')
    .agg(n=('date', 'count')))

# Keep the rows of df whose currency code has at least 32 rows in df2
df.loc[lambda d: d['currency_code'].isin(df2[df2['n'] >= 32].index)]
```

| | date | currency_code | name | local_price | dollar_ex | dollar_price |
|---|---|---|---|---|---|---|
| 0 | 2000-04-01 | ARS | Argentina | 2.50 | 1.0000 | 2.500000 |
| 1 | 2000-04-01 | AUD | Australia | 2.59 | 1.6800 | 1.541667 |
| 2 | 2000-04-01 | BRL | Brazil | 2.95 | 1.7900 | 1.648045 |
| 3 | 2000-04-01 | CAD | Canada | 2.85 | 1.4700 | 1.938776 |
| 4 | 2000-04-01 | CHF | Switzerland | 5.90 | 1.7000 | 3.470588 |
| ... | ... | ... | ... | ... | ... | ... |
| 1321 | 2020-01-14 | SEK | Sweden | 51.50 | 9.4591 | 5.444493 |
| 1322 | 2020-01-14 | THB | Thailand | 115.00 | 30.2775 | 3.798200 |
| 1324 | 2020-01-14 | TWD | Taiwan | 72.00 | 29.8845 | 2.409276 |
| 1327 | 2020-01-14 | USD | United States | 5.67 | 1.0000 | 5.670000 |


- the Big Mac dataset records the local price of a Big Mac in different countries over time, which may hint at economic differences between those countries
- while building `df2`, we converted the `date` column to datetime, sorted by currency code and date, and counted the rows (`n`) per currency code
- to keep only the currencies with at least 32 rows (column `n` above), we built a separate dataframe and then filtered `df` against its index; this works, but it is clunky and can be done more cleanly
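For comparison, pandas can also express this filter in a single expression: `groupby(...).transform('count')` broadcasts each group's row count back onto the rows of that group, so it can serve directly as a boolean mask. This is a sketch on made-up rows (not the Big Mac data) with the threshold lowered to 2; it is one alternative, not the approach this notebook goes on to use.

```python
import pandas as pd

# Made-up rows for illustration only.
toy = pd.DataFrame({
    "currency_code": ["ARS", "ARS", "AUD", "BRL"],
    "date": ["2000-04-01", "2001-04-01", "2000-04-01", "2000-04-01"],
})

min_rows = 2  # stands in for the notebook's threshold of 32

# For every row, the number of non-null dates in that row's currency group.
group_size = toy.groupby("currency_code")["date"].transform("count")

# Boolean mask: keep rows whose group is large enough.
kept = toy[group_size >= min_rows]
print(kept["currency_code"].tolist())  # ['ARS', 'ARS']
```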

Conceptually, our code should read as a sequence of steps:
- read dataframe
- set datatypes
- remove outliers
    - this is like grammar: the dataframe is the noun, and setting datatypes and removing outliers are the verbs

We can also look at this as a pipeline: df -> set dtypes -> remove outliers -> ...

- the code can be split into interpretable bits with *piping*

        df.pipe(set_dtypes).pipe(remove_outliers)

Let's adapt the code above to this piping style:

```python
def set_dtypes(dataf: pd.DataFrame) -> pd.DataFrame:
    # Parse dates and order rows by currency, then date
    return (dataf
        .assign(date=lambda d: pd.to_datetime(d['date']))
        .sort_values(['currency_code', 'date'])
    )

def remove_outliers(dataf: pd.DataFrame) -> pd.DataFrame:
    min_row_country = 32
    # Index of currency codes that have at least min_row_country rows
    countries = (dataf
        .groupby('currency_code')
        .agg(n=('name', 'count'))
        .loc[lambda d: d['n'] >= min_row_country]
        .index
    )
    # Keep only the rows belonging to those currency codes
    return (dataf
        .loc[lambda d: d['currency_code'].isin(countries)])

df.pipe(set_dtypes).pipe(remove_outliers).shape
```

(864, 6)

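- we no longer create an intermediate dataframe like `df2`; instead `remove_outliers` computes `countries`, the index of currency codes with at least `min_row_country` rows, and filters `dataf` on it
- separate, named functions will be very useful as the notebook grows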
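The notebook's version reads the data from calmcode.io. As a sketch that runs offline, the same two steps can be applied to a small hand-made dataframe; the rows below are invented for illustration (not taken from the Big Mac dataset), and the threshold is lowered to 2 so the filter has something to drop.

```python
import pandas as pd

# Invented toy rows; dates are strings, as they are in the CSV.
toy = pd.DataFrame({
    "date": ["2001-01-01", "2000-01-01", "2000-01-01", "2001-01-01", "2000-01-01"],
    "currency_code": ["ARS", "ARS", "AUD", "AUD", "BRL"],
    "name": ["Argentina", "Argentina", "Australia", "Australia", "Brazil"],
    "local_price": [2.5, 2.5, 2.59, 2.6, 2.95],
})

def set_dtypes(dataf: pd.DataFrame) -> pd.DataFrame:
    # Parse the date strings and order rows by currency, then date.
    return (dataf
            .assign(date=lambda d: pd.to_datetime(d["date"]))
            .sort_values(["currency_code", "date"]))

def remove_outliers(dataf: pd.DataFrame) -> pd.DataFrame:
    min_row_country = 2  # lowered from 32 so the toy data gets filtered
    # Index of currency codes with at least `min_row_country` rows.
    countries = (dataf
                 .groupby("currency_code")
                 .agg(n=("name", "count"))
                 .loc[lambda d: d["n"] >= min_row_country]
                 .index)
    # Keep only rows whose currency code is in that index.
    return dataf.loc[lambda d: d["currency_code"].isin(countries)]

result = toy.pipe(set_dtypes).pipe(remove_outliers)
print(result.shape)  # (4, 4): BRL has a single row and is dropped
```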

_________________________
## *Arguments*
_________________________

- what if we want to change `min_row_country`?
- if we make it a keyword argument of `remove_outliers`, we can set it from the pipe call, since `.pipe` passes any extra arguments on to the function

```python
def set_dtypes(dataf: pd.DataFrame) -> pd.DataFrame:
    return (dataf
        .assign(date=lambda d: pd.to_datetime(d['date']))
        .sort_values(['currency_code', 'date'])
    )

def remove_outliers(dataf: pd.DataFrame, min_row_country: int = 32) -> pd.DataFrame:
    countries = (dataf
        .groupby('currency_code')
        .agg(n=('name', 'count'))
        .loc[lambda d: d['n'] >= min_row_country]
        .index
    )
    return (dataf
        .loc[lambda d: d['currency_code'].isin(countries)])

df.pipe(set_dtypes).pipe(remove_outliers, min_row_country=20).shape
```

(1248, 6)

By introducing parameters, we can adjust the values we care about at the macro level (the pipe call) instead of the micro level (inside each function) as the pipeline grows.
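`DataFrame.pipe(func, *args, **kwargs)` forwards extra positional and keyword arguments to `func`, which is how `min_row_country=20` reaches `remove_outliers`. A minimal check on a hypothetical three-row frame (not the Big Mac data):

```python
import pandas as pd

# Hypothetical toy frame: ARS appears twice, BRL once.
toy = pd.DataFrame({
    "currency_code": ["ARS", "ARS", "BRL"],
    "name": ["Argentina", "Argentina", "Brazil"],
})

def remove_outliers(dataf: pd.DataFrame, min_row_country: int = 32) -> pd.DataFrame:
    # Keep rows whose currency code appears at least `min_row_country` times.
    countries = (dataf
                 .groupby("currency_code")
                 .agg(n=("name", "count"))
                 .loc[lambda d: d["n"] >= min_row_country]
                 .index)
    return dataf.loc[lambda d: d["currency_code"].isin(countries)]

# The keyword given to .pipe ends up as the function's argument.
print(len(toy.pipe(remove_outliers)))                     # 0 (default threshold 32)
print(len(toy.pipe(remove_outliers, min_row_country=2)))  # 2 (ARS only)
print(len(toy.pipe(remove_outliers, min_row_country=1)))  # 3 (every row)
```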


_________________________
## *Start*
_________________________

We can call either `set_dtypes(df)` or `df.pipe(set_dtypes)`; both return the same result. `.pipe` pays off once there are several steps, because a chain lists the steps in the order they run, while nested calls have to be read inside-out:

        df.pipe(set_dtypes).pipe(remove_outliers)

vs.

        remove_outliers(set_dtypes(df))
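To confirm that the two forms agree, here is a minimal sketch with two hypothetical steps (`sort_x`, `double_x`) on a tiny frame: `df.pipe(f).pipe(g)` evaluates to the same thing as `g(f(df))`.

```python
import pandas as pd

toy = pd.DataFrame({"x": [3, 1, 2]})

def sort_x(dataf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step: order rows by x.
    return dataf.sort_values("x")

def double_x(dataf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step: return a new frame with x doubled.
    return dataf.assign(x=lambda d: d["x"] * 2)

chained = toy.pipe(sort_x).pipe(double_x)  # reads left to right, in run order
nested = double_x(sort_x(toy))             # reads inside-out

print(chained.equals(nested))  # True
print(chained["x"].tolist())   # [2, 4, 6]
```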

```python
def set_dtypes(dataf: pd.DataFrame) -> pd.DataFrame:
    return (dataf
        .assign(date=lambda d: pd.to_datetime(d['date']))
        .sort_values(['currency_code', 'date'])
    )

df.pipe(set_dtypes).dtypes  # evaluated, but not displayed
df.dtypes                   # only the last expression in a cell is displayed
```

```
date              object
currency_code     object
name              object
local_price      float64
dollar_ex        float64
dollar_price     float64
dtype: object
```

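- in this version, `set_dtypes` uses `.assign`, which returns a new dataframe: `df` keeps its `object` date column, while the piped result has a datetime column
- a step that assigns in place instead (e.g. `dataf['date'] = pd.to_datetime(dataf['date'])`) would silently change the original `df`, which can be tough to debug
- to guard against that, it's best to start the pipeline by copying the dataframe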


```python
def start_pipeline(dataf):
    # Hand the rest of the pipeline a copy, never the caller's dataframe
    return dataf.copy()

df.pipe(start_pipeline).pipe(set_dtypes).dtypes['date'], df.dtypes['date']
```

(dtype('<M8[ns]'), dtype('O'))

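- `<M8[ns]` is NumPy's type code for `datetime64[ns]` and `O` stands for `object`: the piped copy has parsed dates while `df` keeps its original column, so any pipeline that begins with `start_pipeline` will not affect the original dataframe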
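The copy matters once a step assigns in place. The sketch below uses a hypothetical `set_dtypes_inplace` that writes `dataf['date'] = ...` directly instead of using `.assign`; without a copy at the start, the caller's dataframe changes too.

```python
import pandas as pd

def start_pipeline(dataf: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy so later steps cannot touch the caller's frame.
    return dataf.copy()

def set_dtypes_inplace(dataf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical step that mutates its input instead of using .assign.
    dataf["date"] = pd.to_datetime(dataf["date"])
    return dataf

raw = pd.DataFrame({"date": ["2000-04-01", "2001-04-01"]})

raw.pipe(start_pipeline).pipe(set_dtypes_inplace)
print(raw["date"].dtype)  # still strings: the copy absorbed the change

raw.pipe(set_dtypes_inplace)
print(raw["date"].dtype)  # now datetime64: the original was modified
```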

_________________________
## *Logs*
_________________________

```python
# applying pipes
(df
    .pipe(start_pipeline)
    .pipe(set_dtypes)
    .pipe(remove_outliers, min_row_country=32)
)
```

| | date | currency_code | name | local_price | dollar_ex | dollar_price |
|---|---|---|---|---|---|---|
| 0 | 2000-04-01 | ARS | Argentina | 2.50 | 1.00000 | 2.500000 |
| 28 | 2001-04-01 | ARS | Argentina | 2.50 | 1.00000 | 2.500000 |
| 56 | 2002-04-01 | ARS | Argentina | 2.50 | 3.13000 | 0.798722 |
| 88 | 2003-04-01 | ARS | Argentina | 4.10 | 2.88000 | 1.423611 |
| 119 | 2004-05-01 | ARS | Argentina | 4.36 | 2.95000 | 1.477966 |
| ... | ... | ... | ... | ... | ... | ... |
| 1105 | 2018-01-01 | ZAR | South Africa | 30.00 | 12.25815 | 2.447351 |
| 1161 | 2018-07-01 | ZAR | South Africa | 31.00 | 13.36190 | 2.320029 |
| 1217 | 2019-01-01 | ZAR | South Africa | 31.00 | 13.86750 | 2.235443 |
| 1273 | 2019-07-09 | ZAR | South Africa | 31.00 | 14.17500 | 2.186949 |


- this pipeline will grow as we add steps
- logging what each step did (output shape, time taken) helps us follow the pipeline as it runs
- since the pipeline steps are all functions, we can wrap each of them with a decorator



```python
# create our decorator function
from functools import wraps
import datetime as dt

def log_step(func):
    @wraps(func)  # keep the wrapped function's __name__ and docstring
    def wrapper(*args, **kwargs):
        tic = dt.datetime.now()
        result = func(*args, **kwargs)
        # timedelta rendered as H:MM:SS.ffffff
        time_taken = str(dt.datetime.now() - tic)
        print(f'just ran step {func.__name__}, shape={result.shape} which took {time_taken}s')
        return result
    return wrapper
```

```python
@log_step
def start_pipeline(dataf):
    return dataf.copy()

@log_step
def set_dtypes(dataf: pd.DataFrame) -> pd.DataFrame:
    return (dataf
        .assign(date=lambda d: pd.to_datetime(d['date']))
        .sort_values(['currency_code', 'date'])
    )

@log_step
def remove_outliers(dataf: pd.DataFrame, min_row_country: int = 32) -> pd.DataFrame:
    countries = (dataf
        .groupby('currency_code')
        .agg(n=('name', 'count'))
        .loc[lambda d: d['n'] >= min_row_country]
        .index
    )
    return (dataf
        .loc[lambda d: d['currency_code'].isin(countries)])

# applying pipes, now including logging
(df
    .pipe(start_pipeline)
    .pipe(set_dtypes)
    .pipe(remove_outliers, min_row_country=32)
)
```

```
just ran step start_pipeline, shape=(1330, 6) which took 0:00:00.000207s
just ran step set_dtypes, shape=(1330, 6) which took 0:00:00.006433s
just ran step remove_outliers, shape=(864, 6) which took 0:00:00.017203s
```


| | date | currency_code | name | local_price | dollar_ex | dollar_price |
|---|---|---|---|---|---|---|
| 0 | 2000-04-01 | ARS | Argentina | 2.50 | 1.00000 | 2.500000 |
| 28 | 2001-04-01 | ARS | Argentina | 2.50 | 1.00000 | 2.500000 |
| 56 | 2002-04-01 | ARS | Argentina | 2.50 | 3.13000 | 0.798722 |
| 88 | 2003-04-01 | ARS | Argentina | 4.10 | 2.88000 | 1.423611 |
| 119 | 2004-05-01 | ARS | Argentina | 4.36 | 2.95000 | 1.477966 |
| ... | ... | ... | ... | ... | ... | ... |
| 1105 | 2018-01-01 | ZAR | South Africa | 30.00 | 12.25815 | 2.447351 |
| 1161 | 2018-07-01 | ZAR | South Africa | 31.00 | 13.36190 | 2.320029 |
| 1217 | 2019-01-01 | ZAR | South Africa | 31.00 | 13.86750 | 2.235443 |
| 1273 | 2019-07-09 | ZAR | South Africa | 31.00 | 14.17500 | 2.186949 |
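The decorator in this notebook times each step with `datetime.now()` and prints a `timedelta` string. As one alternative (a sketch, not what the notebook uses), the standard library's `logging` module and `time.perf_counter` give a timer meant for measuring durations and output that can be silenced or redirected through log levels. The logger name `pipeline` is an arbitrary choice.

```python
import logging
import time
from functools import wraps

import pandas as pd

logger = logging.getLogger("pipeline")  # arbitrary logger name

def log_step(func):
    @wraps(func)  # keep func.__name__ so the log shows the real step name
    def wrapper(*args, **kwargs):
        tic = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - tic  # seconds, as a float
        logger.info("ran step %s, shape=%s, took %.6fs",
                    func.__name__, result.shape, elapsed)
        return result
    return wrapper

@log_step
def start_pipeline(dataf: pd.DataFrame) -> pd.DataFrame:
    return dataf.copy()

logging.basicConfig(level=logging.INFO, format="%(message)s")
out = pd.DataFrame({"x": [1, 2, 3]}).pipe(start_pipeline)
# logs a line like: ran step start_pipeline, shape=(3, 1), took ...s
```

`logging.basicConfig` does nothing if the root logger already has handlers, so in an environment that configures logging for you, the level may need to be set on `logger` directly.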

