# Using the output of our Luigi pipeline

In [1]:
import os
import dask.dataframe as dd
import datetime
import pandas as pd
import numpy as np

# The joined transaction data

In [2]:
# Glob pattern matching every part file of the enriched transactions.
ENRICHED_TRXN_DATA = (
    './DATA/PROCESSED/enriched_trxns/'
    'enriched_trxns-*.csv'
)

In [3]:
!ls -lsrt $ENRICHED_TRXN_DATA | head -5

40 -rw-r--r--  1 evanwelch  staff  19986 Mar 14 23:37 ./DATA/PROCESSED/enriched_trxns/enriched_trxns-32.csv
48 -rw-r--r--  1 evanwelch  staff  21867 Mar 14 23:37 ./DATA/PROCESSED/enriched_trxns/enriched_trxns-07.csv
40 -rw-r--r--  1 evanwelch  staff  19240 Mar 14 23:37 ./DATA/PROCESSED/enriched_trxns/enriched_trxns-00.csv
48 -rw-r--r--  1 evanwelch  staff  20600 Mar 14 23:37 ./DATA/PROCESSED/enriched_trxns/enriched_trxns-15.csv
48 -rw-r--r--  1 evanwelch  staff  22145 Mar 14 23:37 ./DATA/PROCESSED/enriched_trxns/enriched_trxns-03.csv


In [4]:
! cat ./DATA/PROCESSED/enriched_trxns/enriched_trxns-32.csv | head -1

_id|amount|cust_first_nm|cust_last_nm|cust_state|cust_zip|customer_id|medium|merchant_id|mrch_catgs|mrch_lat|mrch_lon|mrch_name|mrch_state|mrch_zip|payer_id|purchase_date|rewards|status|type


In [5]:
! cat ./DATA/PROCESSED/enriched_trxns/enriched_trxns-32.csv | head -2 | tail -1

5aa9edfff0cec56abfa3f178|314.73842349112346|Michelle|Sawyer|IA|08636|5aa9eacef0cec56abfa3deac|balance|593848ceceb8abe242510b33|['travel']|0.0|0.0|priceline|CA|90503|5aa9ebabf0cec56abfa3e3c5|2017-12-01|0|cancelled|merchant


# Read it in for exploration...

In [6]:
# using dask dataframes, very similar to pandas
# Both zip columns are forced to str so leading zeros (e.g. cust_zip '08636')
# survive the read; the file has no plain 'zip' column, only cust_zip / mrch_zip.
ddf = dd.read_csv(
    ENRICHED_TRXN_DATA,
    sep='|',
    dtype={'mrch_zip': str, 'cust_zip': str},
)

In [None]:
# Preview the first five rows.
ddf.head(n=5)

# Ok, so say we want to make some features.... 
* total, average, and count ...
* of credit card transactions...
* per customer
* for certain categories

# One way to do it...

In [None]:
# Per-customer total amount for a single day, computed eagerly and previewed.
(
    ddf[ddf.purchase_date == '2017-12-01']
    .dropna()                  # drops rows with a missing value in any column
    .groupby('customer_id')
    .amount
    .sum()
    .compute()
    .head()
)

# A longer, but more robust way ....

## filter certain rows (in or out)

In [None]:
def no_rewards_purchases(trxn):
    """Row filter: keep transactions whose 'medium' field is not 'reward'.

    `trxn` is any mapping-like row exposing a 'medium' field.
    """
    payment_medium = trxn['medium']
    return payment_medium != 'reward'

def only_this_catg(trxn, catg):
    """Row filter: does any merchant-category token contain `catg`?

    The 'mrch_catgs' field is split on whitespace and each token is checked
    for a case-insensitive substring match against `catg`.
    """
    for token in trxn['mrch_catgs'].split():
        if catg.lower() in token.lower():
            return True
    return False

def only_this_date(trxn, year=None, month=None, day=None):
    """Row filter: does the transaction's purchase date match the given parts?

    Args:
        trxn: mapping-like row with a 'purchase_date' field that
            `pd.to_datetime` can parse.
        year, month, day: optional date components to match. A falsy value
            (the default None) means "do not filter on this component".

    Returns:
        True when every supplied component matches the purchase date. A
        missing or unparseable date never matches a supplied component; with
        no components supplied the result is True.
    """
    try:
        trxn_dt = pd.to_datetime(trxn['purchase_date'])
    except Exception:
        # Best-effort parse: a bad or missing date is treated as "no date".
        # `Exception` (rather than a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        trxn_dt = pd.NaT
    if trxn_dt is None:
        # Some pandas versions hand None straight back; normalise it so the
        # attribute access below cannot fail.
        trxn_dt = pd.NaT

    result = True
    if year:
        result &= trxn_dt.year == year
    if month:
        result &= trxn_dt.month == month
    if day:
        result &= trxn_dt.day == day

    return result



## some new columns

In [None]:
def first_digit_zip_code(zip_str):
    """Return the leading digit of a zip code as an int, or NaN if unavailable.

    Args:
        zip_str: zip code, expected to be a string of at least 5 characters.

    Returns:
        int: the first character converted to an integer, when `zip_str` has
        at least 5 characters and starts with a digit.
        float('nan'): for short values, for values without a length (e.g. a
        missing value represented as NaN), or when the first character is not
        a digit.
    """
    try:
        if len(zip_str) >= 5:
            return int(zip_str[0])
    except (TypeError, ValueError):
        # TypeError: value has no len() (e.g. NaN for a missing zip).
        # ValueError: first character is not a digit.
        # Both mean "no usable zip", which this function reports as NaN.
        pass
    return np.nan

In [None]:
def is_amazon(trxn):
    """Flag transactions whose merchant name contains 'amazon' (case-insensitive)."""
    merchant_name = trxn['mrch_name'].lower()
    return 'amazon' in merchant_name

## some aggregations

In [None]:
def _trxn_cnt(df):
    return df.shape[0]

def _trxn_total(df):
    return df['amount'].sum()

def _trxn_mean(df):
    return df['amount'].mean()

# Short feature-name suffix -> aggregation applied to each customer's rows.
agg_operations = dict(
    cnt=_trxn_cnt,
    tot=_trxn_total,
    avg=_trxn_mean,
)


In [None]:
# where to save features
FEATURE_PATH = 'DATA/FEATURES/'

# exist_ok makes this cell safe to re-run and avoids the race between
# checking for the folder and creating it.
os.makedirs(FEATURE_PATH, exist_ok=True)

In [None]:
def write_feature(srs, feat_path):
    """Write one feature series to disk under FEATURE_PATH.

    Args:
        srs: object exposing `to_csv(path, sep=..., index_label=...)`; in this
            notebook it is the result of a dask groupby-apply.
        feat_path: sub-folder name (relative to FEATURE_PATH) for this feature.

    Returns:
        Whatever `srs.to_csv` returns.
    """
    feature_folder = os.path.join(FEATURE_PATH, feat_path)
    # exist_ok avoids failing when the folder already exists (re-runs) or is
    # created between a check and the create call.
    os.makedirs(feature_folder, exist_ok=True)

    # write the feature to disk; the '*' in the file name is left for to_csv to fill in
    return srs.to_csv(
        os.path.join(feature_folder, 'part-*.csv'),
        sep='|',
        index_label='customer_id',
    )

In [None]:
catgories_of_interest = ['grocery', 'food', 'bar']
dates = pd.date_range('2017-12-01', '2017-12-10')

# Folder name for each written feature: <yyyymmdd>_<category>_<aggregation>.
feat_path_template = '{dt}_{catg}_{op}'

# Result dtype of each aggregation, passed to dask as `meta` so it does not
# have to guess the output type (and warn) by running the function on fake data.
# Unknown aggregation names fall back to float.
agg_meta_dtypes = {'cnt': 'i8', 'tot': 'f8', 'avg': 'f8'}

# This filter depends on neither category nor date, so build it once.
not_reward = ddf.apply(no_rewards_purchases, axis=1, meta=('not_reward', 'bool'))

for catg in catgories_of_interest:

    # The category filter is the same for every date; build it once per category.
    # `catg=catg` binds the current value so the lazily-evaluated lambda cannot
    # pick up a later loop value.
    in_catg = ddf.apply(
        lambda row, catg=catg: only_this_catg(row, catg=catg),
        axis=1,
        meta=('in_catg', 'bool'))

    for dt in dates:

        on_date = ddf.apply(
            lambda row, dt=dt: only_this_date(row, year=dt.year, month=dt.month, day=dt.day),
            axis=1,
            meta=('on_date', 'bool'))

        # apply filters
        keep_rows = not_reward & in_catg & on_date

        # prepare for aggregate functions
        g = ddf.loc[keep_rows, :].groupby('customer_id')

        for agg_name, agg_func in agg_operations.items():
            feat_path = feat_path_template.format(
                dt=dt.strftime('%Y%m%d'),
                catg=catg,
                op=agg_name)

            # write the feature to disk
            feature = g.apply(
                agg_func,
                meta=(agg_name, agg_meta_dtypes.get(agg_name, 'f8')))
            write_feature(feature, feat_path)
        

In [None]:
! ls DATA/FEATURE

In [7]:
import exploration

In [None]:
# Run the feature generation from the local `exploration` module on the dask dataframe.
# NOTE(review): presumably a packaged version of the loop in the cells above — confirm against exploration.py.
exploration.generate_features(ddf)

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  write_feature(g.apply(agg_func), feat_path)


Wrote feature: 20171201/grocery_cnt
Wrote feature: 20171201/grocery_tot
Wrote feature: 20171201/grocery_avg
Wrote feature: 20171202/grocery_cnt
Wrote feature: 20171202/grocery_tot
Wrote feature: 20171202/grocery_avg
Wrote feature: 20171203/grocery_cnt
Wrote feature: 20171203/grocery_tot
Wrote feature: 20171203/grocery_avg
Wrote feature: 20171204/grocery_cnt
Wrote feature: 20171204/grocery_tot
Wrote feature: 20171204/grocery_avg
Wrote feature: 20171205/grocery_cnt
Wrote feature: 20171205/grocery_tot
Wrote feature: 20171205/grocery_avg
Wrote feature: 20171206/grocery_cnt
Wrote feature: 20171206/grocery_tot
Wrote feature: 20171206/grocery_avg
Wrote feature: 20171207/grocery_cnt
Wrote feature: 20171207/grocery_tot
Wrote feature: 20171207/grocery_avg
Wrote feature: 20171208/grocery_cnt
Wrote feature: 20171208/grocery_tot
Wrote feature: 20171208/grocery_avg
Wrote feature: 20171209/grocery_cnt
Wrote feature: 20171209/grocery_tot
Wrote feature: 20171209/grocery_avg
Wrote feature: 20171210/groc