In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [16, 6]
plt.style.use('ggplot')

In [2]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from fastprogress import progress_bar

def parallel(func, job_list, n_jobs=12):
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(func, job) for job in job_list]
        for f in progress_bar(as_completed(futures), total=len(job_list)):
            pass
    return [f.result() for f in futures]

In [3]:
import gc
from warnings import filterwarnings
filterwarnings("ignore")

In [4]:
import datetime as dt
def to_dt(x):
    return dt.datetime.fromtimestamp(int(x/1000))

In [5]:
from multiprocessing import Pool
num_partitions = 100
num_cores = 12
def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

In [None]:
atr = pd.read_pickle('../attributes2.pkl')

In [None]:
atr.head()

In [None]:
atr.drop(['week_start'], inplace=True, axis=1)

In [None]:
events.dtypes

In [None]:
events.sort_values(['system_time', 'user_id_hash'], inplace=True)

In [None]:
{'ab': 97, 'message': 98, 'campaing': 99}

In [None]:
events.shape

In [12]:
events.event = events.event.cat.add_categories([97, 98, 99])

In [13]:
events.loc[events.event_type == 'ab', 'event'] = 97
events.loc[events.event_type == 'message', 'event'] = 98
events.loc[events.event_type == 'campaing', 'event'] = 99

In [14]:
events.event = events.event.cat.remove_unused_categories()

In [15]:
events.event.value_counts()

45    75493931
1      5363926
5      4887922
6      4068474
14     3491117
4      3435363
40     2775997
7      1970903
41     1969223
97     1855428
3      1828494
42     1598212
44      495590
98      463225
0       407118
63      347198
47      310038
57      271268
8       265034
55      194341
50       88641
9        85832
11       68328
64       67966
54       51687
43       23850
49       17955
48       11912
56        8267
52        6688
59        6651
60        6370
58        4167
10        2362
51        1446
61        1062
62         464
99          83
0/          35
1/          28
32           1
Name: event, dtype: int64

In [16]:
drop_events = ['10', '51', '61', '62', '99', '0/', '1/', '32']

In [17]:
for e in drop_events:
    even = events[events.event != e]

In [18]:
events.drop(['event_type'], inplace=True, axis=1)

In [34]:
events.to_pickle('../events2.pkl')

In [6]:
events = pd.read_pickle('../events2.pkl')

In [7]:
events.drop(['session_id', 'system_date', 'event_timestamp'], inplace=True, axis=1)

In [8]:
events.dtypes

event                 category
event_value            float64
user_id_hash          category
system_time     datetime64[ns]
anchor_date     datetime64[ns]
dtype: object

In [9]:
gc.collect()

7

In [10]:
def wrapper_get_anchor_date(i, df):
#     df['anchor_date'] = (df.system_time - \
#                             df.system_time.dt.weekday\
#                               .apply(lambda x: x + i)\
#                               .astype('timedelta64[D]')).dt.date

    df['anchor_date'] = (df.system_time - df.system_time.dt.weekday\
                                   .apply(lambda x: dt.timedelta(days=(x+i) % 7)))\
                                   .dt.date
    df.anchor_date = df.anchor_date.astype('datetime64[ns]')

    return df

In [11]:
from functools import partial

In [12]:
def get_mode(x):
    try:
        return x.value_counts().index[0]
    except:
        return None

def custom_aggregate(df):

    grp_cols = ['anchor_date', 'event', 'user_id_hash']
    grpby_obj = df.groupby(grp_cols)
    
    temp = grpby_obj.event_value.count().to_frame().reset_index()\
                .rename(columns={'event_value': 'event_count'})
    temp1 = grpby_obj.event_value.sum().to_frame().reset_index()\
                .rename(columns={'event_value': 'event_sum'})
    temp2 = grpby_obj.event_value.mean().to_frame().reset_index()\
                .rename(columns={'event_value': 'event_mean'})
    temp3 = grpby_obj.event_value.agg(lambda x: get_mode(x))\
                                 .to_frame().reset_index()\
                                 .rename(columns={'event_value': 'event_mode'})

    for i in range(1, 4):
        temp = temp.merge(locals()[f'temp{i}'], 
                          on=grp_cols)
    return temp

In [13]:
def wrapper_agg(wst):
    events_ss = events[events.anchor_date == wst].copy()
    return custom_aggregate(events_ss)

In [14]:
time_df = events.system_time.dt.date.unique()

In [15]:
time_df = pd.DataFrame({'system_time': time_df})

In [16]:
time_df.system_time = time_df.system_time.astype('datetime64[ns]')

In [17]:
events.reset_index(inplace=True, drop=True)

In [19]:
results = []
for i in range(7):
    temp = dict(zip(time_df.system_time.dt.date.values,
                    (time_df.system_time - time_df.system_time.dt.weekday\
                     .apply(lambda x: dt.timedelta(days=(x+i) % 7)))\
                    .dt.date.values))
    events['anchor_date'] = events.system_time.dt.date.apply(lambda x: temp[x])
    events.anchor_date = events.anchor_date.astype('datetime64[ns]')
    print(f'{i}')
    result = parallel(wrapper_agg, events.anchor_date.unique())
    result = pd.concat(result)
    results += [result]
    gc.collect()

0


1


2


3


4


5


6


In [20]:
result = pd.concat(results)

In [21]:
result.shape

(56566389, 7)

In [22]:
result.to_pickle('../events_roll_agg.pkl')