In [5]:
%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import altair as alt
import warnings
from IPython.core.interactiveshell import InteractiveShell
import numpy as np
import dask.dataframe as dd
import os
import itertools
import matplotlib.ticker as ticker
alt.renderers.enable('default')
InteractiveShell.ast_node_interactivity = "all"
sns.set_theme(style="darkgrid")
warnings.filterwarnings('ignore')
sns.set_palette(sns.color_palette("Set3"))

In [6]:
uid = 'msno'

In [21]:
# This can be changed to the directory where the datasets are stored
data_dir = './data'

# Training data for january, contains two columns : user id and binary churn target variable
train = pd.read_csv(os.path.join(data_dir, 'train.csv'), dtype={'is_churn': str})

train.head()

Unnamed: 0,msno,is_churn
0,waLDQMmcOu2jLDaV1ddDkgCrB/jl6sD66Xzs0Vqax1Y=,1
1,QA7uiXy8vIbUSPOkCf9RwQ3FsT8jVq2OxDr8zqa7bRQ=,1
2,fGwBva6hikQmTJzrbz/2Ezjm5Cth5jZUNvXigKK2AFA=,1
3,mT5V8rEpa+8wuqi6x0DoVd3H5icMKkE9Prt49UlmK+4=,1
4,XaPhtGLk/5UvvOYHcONTwsnH97P4eGECeq+BARGItRw=,1


In [22]:
import vaex

In [23]:
df = vaex.open('./data/user_logs_split_*.hdf5')

In [24]:
names = ['msno','date','num_25','num_50','num_75','num_985','num_100','num_unq','total_secs']

for i, new_name in enumerate(names):
    df.rename(str(i), new_name)

'msno'

'date'

'num_25'

'num_50'

'num_75'

'num_985'

'num_100'

'num_unq'

'total_secs'

In [25]:
df['year'] = df['date'] // 10000
df['month'] = (df['date'] // 100) % 100
df['day'] = df['date'] % 100

df['year'] = df['year'].astype(str)
df['month'] = df['month'].astype(str).str.zfill(2)
df['day'] = df['day'].astype(str).str.zfill(2)

df['date_formatted'] = df['year'] + '-' + df['month'] + '-' + df['day']
df['date_formatted'] = df['date_formatted'].astype('datetime64')

df.drop(['year', 'month', 'day'], inplace=True)

#,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs,date_formatted
0,'rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=',20150513,0,0,0,0,1,1,280.335,Timestamp('2015-05-13 00:00:00')
1,'rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=',20150709,9,1,0,0,7,11,1658.9479999999999,Timestamp('2015-07-09 00:00:00')
2,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',20150105,3,3,0,0,68,36,17364.956000000002,Timestamp('2015-01-05 00:00:00')
3,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',20150306,1,0,1,1,97,27,24667.317000000003,Timestamp('2015-03-06 00:00:00')
4,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',20150501,3,0,0,0,38,38,9649.029,Timestamp('2015-05-01 00:00:00')
...,...,...,...,...,...,...,...,...,...,...
391865090,'ccyBVC3HXqPi7D8GoxT0u3jntamqtPubA9hKhRQUsu0=',20150925,0,1,0,0,1,2,342.207,Timestamp('2015-09-25 00:00:00')
391865091,'ccyBVC3HXqPi7D8GoxT0u3jntamqtPubA9hKhRQUsu0=',20160216,12,9,2,0,13,30,4220.209,Timestamp('2016-02-16 00:00:00')
391865092,'ccyBVC3HXqPi7D8GoxT0u3jntamqtPubA9hKhRQUsu0=',20160515,15,9,10,5,15,47,7300.25,Timestamp('2016-05-15 00:00:00')
391865093,'ccyBVC3HXqPi7D8GoxT0u3jntamqtPubA9hKhRQUsu0=',20160725,8,0,0,0,10,13,2498.097,Timestamp('2016-07-25 00:00:00')


In [26]:
df.shape

(391865095, 10)

In [10]:
sample = df[:10000000]

In [12]:
temp = sample.groupby(['msno', vaex.BinnerTime(sample['date_formatted'], resolution='D', every=60)]).agg(
    *[{'total_seconds_6_months': vaex.agg.sum(sample.total_secs),
    'avg_seconds_6_months': vaex.agg.mean(sample.total_secs)}])

temp

#,msno,date_formatted,total_seconds_6_months,avg_seconds_6_months
0,'rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=',2015-04-30,280.335,280.335
1,'rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=',2015-06-29,1658.9479999999999,1658.9479999999999
2,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',2014-12-31,17364.956000000002,17364.956000000002
3,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',2015-03-01,24667.317000000003,24667.317000000003
4,'yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=',2015-04-30,9649.029,9649.029
...,...,...,...,...
959,'GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=',2016-02-24,982.0,982.0
960,'GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=',2016-06-23,1993.35,1993.35
961,'GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=',2016-08-22,2236.254,2236.254
962,'GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=',2016-12-20,5719.019,5719.019


In [15]:
def compute_aggregations_history(df, date_filter=20160201, resolution="D", resolution_value=30, list_aggs=None):
    """
    Filter df by only taking history inferior to <date_filter> then generate aggregations each :
                                                                            <resolution_value> * <resolution>
    :param df: vaex dataframe
    :param date_filter: filter date, only take instances having a date inferior to this
    :param resolution: datetime frequency of aggregations
    :param resolution_value: number of units of <resolution> to take
    :param list_aggs: dictionary mapping the name of the output column to the aggregation to perform
    :return: pandas df containing aggregations performed
    """
    if list_aggs is None:
        list_aggs = []
    df_filtered = df[df.date <= date_filter]
    return df_filtered.groupby(['msno', vaex.BinnerTime(df_filtered['date_formatted'],
                                               resolution=resolution,
                                               every=resolution_value)]).agg(*list_aggs).to_pandas_df()


In [16]:
res = compute_aggregations_history(sample, 20160603, "M", 6, [{'total_seconds_6_months': vaex.agg.sum(sample.total_secs),
           'avg_seconds_6_months': vaex.agg.mean(sample.total_secs)}])

In [17]:
res

Unnamed: 0,msno,date_formatted,total_seconds_6_months,avg_seconds_6_months
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2014-12-01,280.335,280.335000
1,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-06-01,1658.948,1658.948000
2,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,2014-12-01,51681.302,17227.100667
3,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,2015-06-01,12079.097,4026.365667
4,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,2015-12-01,38025.935,12675.311667
...,...,...,...,...
311,sTJMA5qPerG7atP3EH4Cqu2OZprKCCnW1oE6fOFOJDs=,2015-12-01,24419.594,8139.864667
312,GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=,2014-12-01,5271.971,1757.323667
313,GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=,2015-06-01,2613.544,1306.772000
314,GsxqOgdbfqplwVMVVMu/0spt8ysfkSjexZqzyTb5T/E=,2015-12-01,4907.802,1635.934000


In [None]:
res = df.groupby(['msno', vaex.BinnerTime(df['date_formatted'],
                                               resolution="M",
                                               every=1)]).agg({'nbr_logins_monthly' : vaex.agg.count(df.date)}).to_pandas_df()

In [29]:
res.head()

Unnamed: 0,msno,date_formatted,nbr_logins_monthly
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-03-01,6
1,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-04-01,29
2,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-05-01,27
3,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-06-01,26
4,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-07-01,28


In [28]:
res.shape

(26758112, 3)

In [None]:
res.to_csv('nbr_logins_monthly.csv', index=False)

In [31]:
res.head()

Unnamed: 0,msno,date_formatted,nbr_logins_monthly
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-03-01,6
1,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-04-01,29
2,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-05-01,27
3,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-06-01,26
4,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,2015-07-01,28


In [33]:
res.msno.nunique() // 10 ** 6

5

In [29]:
import pandas as pd
from datetime import datetime

In [30]:
uid = "msno"

In [31]:
df = pd.read_csv('nbr_logins_monthly.csv')

df['date_formatted'] = pd.to_datetime(df['date_formatted'], format='%Y-%m-%d')

In [32]:
df.shape

(26758112, 3)

In [33]:
date = 201603

In [34]:
def months_last_login(unstacked_df, date):
    """
    Generates a dataframe that contains the number of months since last login, relative to a given date.
    """
    temp = unstacked_df[[x for x in unstacked_df.columns if x < pd.to_datetime(date, format='%Y%m')]]
    temp = temp.isnull()
    res = temp.apply(
        lambda x: x.where(x).last_valid_index(), axis=1).reset_index(name="date_last_login")
    res.date_last_login.fillna(datetime(2015, 1, 1), inplace=True)
    res['months_since_login'] = (pd.to_datetime(date, format='%Y%m') - res['date_last_login']).dt.days / 30
    res['date_pred'] = date
    return res[[uid, 'date_pred', "months_since_login"]]

In [41]:
months_last_login_dates = lambda unstacked_df, dates: [months_last_login(unstacked_df, date) for date in dates]

In [35]:
unstacked_df = df.groupby(
    [uid, "date_formatted"])[
    'nbr_logins_monthly'].mean().unstack('date_formatted')

In [8]:
from dask.distributed import Client
client = Client(n_workers=6)

from dask import delayed, compute

dates = [201603, 201608]

%%time
delayed_results = [delayed(months_last_login)(unstacked_df.head(1000), date) for date in dates]
results = compute(*delayed_results, scheduler="processes")

In [36]:
def stats_logins_monthly(df, date):
    df_filtered = df[df.date_formatted < datetime(date // 100, date % 100, 1)]
    
    stats_logins = df.groupby(uid)['nbr_logins_monthly'].agg(['mean', 'max', 'min', 'std', 'count'])
    stats_logins.columns = ['_'.join(['nbr_logins_monthly', x]) for x in stats_logins.columns]

    stats_logins = stats_logins.reset_index()
    stats_logins.fillna(0, inplace=True)
    stats_logins['date_pred'] = date
    return stats_logins

In [37]:
stats_logins_monthly(df, 201601)

Unnamed: 0,msno,nbr_logins_monthly_mean,nbr_logins_monthly_max,nbr_logins_monthly_min,nbr_logins_monthly_std,nbr_logins_monthly_count,date_pred
0,+++4vcS9aMH7KWdfh5git6nA5fC5jjisd5H/NcM++WM=,1.000000,1,1,0.000000,1,201601
1,+++EI4HgyhgcJHIPXk/VRP7bt17+2joG39T6oEfJ+tc=,1.000000,1,1,0.000000,1,201601
2,+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=,7.000000,7,7,0.000000,1,201601
3,+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=,23.923077,31,8,5.992816,26,201601
4,+++TipL0Kt3JvgNE9ahuJ8o+drJAnQINtxD4c5GePXI=,1.000000,1,1,0.000000,1,201601
...,...,...,...,...,...,...,...
5234029,zzzeSzWGUrQw+eP47oa1CXqL/im1Uq6/JYAJs8oGjI8=,1.000000,1,1,0.000000,1,201601
5234030,zzzqx+aMPSFYjW71JqJ6T/hita+iVemVWzJTE4yQRx8=,2.000000,2,2,0.000000,1,201601
5234031,zzztPAN9xjMytpZ0RN2gU9mScDULJnHQZK8eZb4uELU=,6.000000,10,2,3.265986,4,201601
5234032,zzztsqkufVj9DPVJDM3FxDkhlbCL5z4aiYxgPSGkIK4=,3.750000,9,1,3.593976,4,201601


In [4]:
from dask.distributed import Client
from dask import delayed, compute



client = Client(n_workers=6)


In [37]:
dates = [201603, 201608]

In [45]:
def distribute_processes(func, **kwargs):
    """
    Splits the process execution across multiple cores, useful for parallel execution of feature engineering
    relative to multiple dates
    """
    delayed_results = delayed(func)(**kwargs)
    results = compute(delayed_results, scheduler="processes")
    final_res = pd.concat(results[0])
    
    return final_res

In [46]:
distribute_processes(months_last_login_dates, unstacked_df=unstacked_df.head(500), dates=dates)

Unnamed: 0,msno,date_pred,months_since_login
0,+++4vcS9aMH7KWdfh5git6nA5fC5jjisd5H/NcM++WM=,201603,0.966667
1,+++EI4HgyhgcJHIPXk/VRP7bt17+2joG39T6oEfJ+tc=,201603,0.966667
2,+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=,201603,0.966667
3,+++IZseRRiQS9aaSkH6cMYU6bGDcxUieAi/tH67sC5s=,201603,14.166667
4,+++TipL0Kt3JvgNE9ahuJ8o+drJAnQINtxD4c5GePXI=,201603,0.966667
...,...,...,...
495,++OrO+qyw4KsyivgnrEzl5ALtN7gj7BNEodVdR5UIss=,201608,1.033333
496,++OtuWs8cKG2710CCnsZTeVKK9Co38gFamjS9nkus+8=,201608,1.033333
497,++OvJH5FmfZ5CRrYfmbQEk7tJwCZhsJnkWbxClRaUpw=,201608,6.066667
498,++OwfFddNO06garROtSdQUfkCwbKF+9pfndYHGRKDas=,201608,1.033333
