# FENL vs Ibis

## Create and load data

In [1]:
import pandas as pd

# This builds a pandas Dataframe with sample data that we will later load into
# Kaskada
input_data = pd.DataFrame.from_records([

    {'event_id': 'ev_00001', 'event_at': '2022-01-01 22:01:00+00:00', 'entity_id': 'user_001', 'event_name': 'login', 'revenue': 0},
    {'event_id': 'ev_00002', 'event_at': '2022-01-01 22:05:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00003', 'event_at': '2022-01-01 22:20:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00004', 'event_at': '2022-01-01 23:10:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00005', 'event_at': '2022-01-01 23:20:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00006', 'event_at': '2022-01-01 23:40:00+00:00', 'entity_id': 'user_001', 'event_name': 'purchase', 'revenue': 12.50},
    {'event_id': 'ev_00007', 'event_at': '2022-01-01 23:45:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00008', 'event_at': '2022-01-01 23:59:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},

    {'event_id': 'ev_00009', 'event_at': '2022-01-02 05:30:00+00:00', 'entity_id': 'user_001', 'event_name': 'login', 'revenue': 0},
    {'event_id': 'ev_00010', 'event_at': '2022-01-02 05:35:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00011', 'event_at': '2022-01-02 05:45:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00012', 'event_at': '2022-01-02 06:10:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00013', 'event_at': '2022-01-02 06:15:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00014', 'event_at': '2022-01-02 06:25:00+00:00', 'entity_id': 'user_001', 'event_name': 'purchase', 'revenue': 25},
    {'event_id': 'ev_00015', 'event_at': '2022-01-02 06:30:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},
    {'event_id': 'ev_00016', 'event_at': '2022-01-02 06:31:00+00:00', 'entity_id': 'user_001', 'event_name': 'purchase', 'revenue': 5.75},
    {'event_id': 'ev_00016', 'event_at': '2022-01-02 07:01:00+00:00', 'entity_id': 'user_001', 'event_name': 'view_item', 'revenue': 0},


    {'event_id': 'ev_00018', 'event_at': '2022-01-01 22:17:00+00:00', 'entity_id': 'user_002', 'event_name': 'view_item', 'revenue': 0},
])

# convert `event_at` column from string to datetime
input_data['event_at'] = pd.to_datetime(input_data['event_at'])
input_data['event_at_epoch'] = input_data['event_at'].apply(lambda x: x.timestamp())

print(input_data)

    event_id                  event_at entity_id event_name  revenue  \
0   ev_00001 2022-01-01 22:01:00+00:00  user_001      login     0.00   
1   ev_00002 2022-01-01 22:05:00+00:00  user_001  view_item     0.00   
2   ev_00003 2022-01-01 22:20:00+00:00  user_001  view_item     0.00   
3   ev_00004 2022-01-01 23:10:00+00:00  user_001  view_item     0.00   
4   ev_00005 2022-01-01 23:20:00+00:00  user_001  view_item     0.00   
5   ev_00006 2022-01-01 23:40:00+00:00  user_001   purchase    12.50   
6   ev_00007 2022-01-01 23:45:00+00:00  user_001  view_item     0.00   
7   ev_00008 2022-01-01 23:59:00+00:00  user_001  view_item     0.00   
8   ev_00009 2022-01-02 05:30:00+00:00  user_001      login     0.00   
9   ev_00010 2022-01-02 05:35:00+00:00  user_001  view_item     0.00   
10  ev_00011 2022-01-02 05:45:00+00:00  user_001  view_item     0.00   
11  ev_00012 2022-01-02 06:10:00+00:00  user_001  view_item     0.00   
12  ev_00013 2022-01-02 06:15:00+00:00  user_001  view_item     

## Load into SQLite

In [2]:
import sqlite3

In [3]:
dbconn = sqlite3.connect('kaskada_fenl_py_sql.db')


input_data.to_sql('code_comparison_events', dbconn,
                  if_exists='replace')
dbconn.close()

In [4]:
query_string = \
    '''
    with hour_added as (
        select
            *
            , datetime(strftime('%Y-%m-%d %H:00:00', event_at))
                as event_hour_start
            , datetime(strftime('%Y-%m-%d %H:00:00', event_at), '+1 hour')
                as event_hour_end
        from
            code_comparison_events
    )

    , epoch_added as (
        select
            *
            , strftime('%s', event_hour_end) as event_hour_end_epoch
        from
            hour_added
    )

    select *
    from
        epoch_added

    '''



dbconn = sqlite3.connect('kaskada_fenl_py_sql.db')

events_augmented = pd.read_sql(query_string, dbconn)

# replace the original table with the augmented one
events_augmented.to_sql('events_augmented', dbconn,
                        if_exists='replace')

dbconn.close()

# Q1: hourly aggregations with full-history features

## SQL

In [5]:
dbconn = sqlite3.connect('kaskada_fenl_py_sql.db')


query_string = \
    '''
    /* features based on hourly aggregations */
    with hourly_agg as (
        select
            entity_id
            , event_hour_start
            , event_hour_end
            , event_hour_end_epoch

            /* features from hourly aggregation */
            , count(*) as event_count_hourly
            , sum(revenue) as revenue_hourly
        from
            events_augmented
        group by
            1, 2, 3
    )

    /* features based on full event history up to that time */
    , history_joined as (
        select
            hourly_agg.entity_id
            , hourly_agg.event_hour_end
            , hourly_agg.event_hour_end_epoch
            , hourly_agg.event_count_hourly
            , hourly_agg.revenue_hourly

            /* features from all of history */
            , count(*) as event_count_total
            , sum(events_augmented.revenue) as revenue_total
            , min(events_augmented.event_at_epoch) as first_event_at_epoch
            , max(events_augmented.event_at_epoch) as last_event_at_epoch
        from
            hourly_agg
        left join events_augmented
            on hourly_agg.entity_id = events_augmented.entity_id
                and hourly_agg.event_hour_end >= events_augmented.event_at
        group by
            1, 2, 3, 4, 5
    )

    select
        entity_id
        , event_hour_end as timestamp

        /* previous features */
        , event_count_hourly
        , revenue_hourly
        , event_count_total
        , revenue_total
        , first_event_at_epoch
        , last_event_at_epoch

        /* add more derivative features */
        , event_hour_end_epoch - first_event_at_epoch as seconds_since_first_event
        , event_hour_end_epoch - last_event_at_epoch as seconds_since_last_event
        , last_event_at_epoch - first_event_at_epoch as seconds_first_to_last
    from
        history_joined

    '''


results_sql = pd.read_sql(query_string, dbconn)
dbconn.close()

results_sql

Unnamed: 0,entity_id,timestamp,event_count_hourly,revenue_hourly,event_count_total,revenue_total,first_event_at_epoch,last_event_at_epoch,seconds_since_first_event,seconds_since_last_event,seconds_first_to_last
0,user_001,2022-01-01 23:00:00,3,0.0,3,0.0,1641074000.0,1641076000.0,3540.0,2400.0,1140.0
1,user_001,2022-01-02 00:00:00,5,12.5,8,12.5,1641074000.0,1641082000.0,7140.0,60.0,7080.0
2,user_001,2022-01-02 06:00:00,3,0.0,11,12.5,1641074000.0,1641102000.0,28740.0,900.0,27840.0
3,user_001,2022-01-02 07:00:00,5,30.75,16,43.25,1641074000.0,1641105000.0,32340.0,1740.0,30600.0
4,user_001,2022-01-02 08:00:00,1,0.0,17,43.25,1641074000.0,1641107000.0,35940.0,3540.0,32400.0
5,user_002,2022-01-01 23:00:00,1,0.0,1,0.0,1641075000.0,1641075000.0,2580.0,2580.0,0.0


## pandas

In [3]:
data_py = input_data.copy()
data_py['hour_end'] = data_py['event_at'].apply(lambda x: x.ceil('H'))
data_py['hour_end_epoch'] = data_py['hour_end'].apply(lambda x: x.timestamp())

In [4]:
result_data = data_py\
    .groupby(by=[
        'entity_id',
        'hour_end',
        'hour_end_epoch',
        ],
             as_index=False)\
    .agg(
        event_count_hourly=('event_id', 'count'),
        revenue_hourly=('revenue', 'sum'),
        event_at_min=('event_at', 'min'),
        event_at_max=('event_at', 'max'),
        event_at_epoch_min=('event_at_epoch','min'),
        event_at_epoch_max=('event_at_epoch','max'),
    )

result_data['event_count_total'] = result_data['event_count_hourly'].expanding().sum()
result_data['revenue_total'] = result_data['revenue_hourly'].expanding().sum()

result_data['first_event_at_epoch'] = result_data['event_at_epoch_min'].expanding().min()
result_data['last_event_at_epoch'] = result_data['event_at_epoch_max'].expanding().max()

result_data['seconds_since_first_event'] = result_data['hour_end_epoch'] - result_data['first_event_at_epoch']
result_data['seconds_since_last_event'] = result_data['hour_end_epoch'] - result_data['last_event_at_epoch']
result_data['seconds_first_to_last'] = result_data['seconds_since_last_event'] - result_data['seconds_since_first_event']

## Ibis

In [6]:
import ibis
import pytz

In [28]:
data_ibis = ibis.pandas.connect({'code_comparison_events': input_data.copy()}).table('code_comparison_events')

In [29]:
data_ibis

In [30]:
import ibis.expr.datatypes as dt
from ibis.backends.pandas.udf import udf
import datetime as dtime

@udf.elementwise([dt.timestamp], dt.timestamp)
def hour_end(col, **kwargs):
    return col.dt.ceil('H')

@udf.elementwise([dt.timestamp], dt.float64)
def hour_end_epoch(col, **kwargs):
    return (col - dtime.datetime(1970,1,1).astimezone(pytz.timezone('UTC'))).dt.total_seconds()

data_ibis = data_ibis.mutate(hour_end(data_ibis.event_at).name('hour_end'))
data_ibis = data_ibis.mutate(hour_end_epoch(data_ibis.hour_end).name('hour_end_epoch'))

data_ibis.execute()

  return column.astype(out_dtype.to_pandas(), errors='ignore')


Unnamed: 0,event_id,event_at,entity_id,event_name,revenue,event_at_epoch,hour_end,hour_end_epoch
0,ev_00001,2022-01-01 22:01:00+00:00,user_001,login,0.0,1641074000.0,2022-01-01 23:00:00,1641078000.0
1,ev_00002,2022-01-01 22:05:00+00:00,user_001,view_item,0.0,1641075000.0,2022-01-01 23:00:00,1641078000.0
2,ev_00003,2022-01-01 22:20:00+00:00,user_001,view_item,0.0,1641076000.0,2022-01-01 23:00:00,1641078000.0
3,ev_00004,2022-01-01 23:10:00+00:00,user_001,view_item,0.0,1641079000.0,2022-01-02 00:00:00,1641082000.0
4,ev_00005,2022-01-01 23:20:00+00:00,user_001,view_item,0.0,1641079000.0,2022-01-02 00:00:00,1641082000.0
5,ev_00006,2022-01-01 23:40:00+00:00,user_001,purchase,12.5,1641080000.0,2022-01-02 00:00:00,1641082000.0
6,ev_00007,2022-01-01 23:45:00+00:00,user_001,view_item,0.0,1641081000.0,2022-01-02 00:00:00,1641082000.0
7,ev_00008,2022-01-01 23:59:00+00:00,user_001,view_item,0.0,1641082000.0,2022-01-02 00:00:00,1641082000.0
8,ev_00009,2022-01-02 05:30:00+00:00,user_001,login,0.0,1641101000.0,2022-01-02 06:00:00,1641103000.0
9,ev_00010,2022-01-02 05:35:00+00:00,user_001,view_item,0.0,1641102000.0,2022-01-02 06:00:00,1641103000.0


In [31]:
result_ibis = data_ibis.group_by([
    'entity_id',
    'hour_end',
    'hour_end_epoch'
]).aggregate([
    data_ibis.event_id.count().name('event_count_hourly')
    , data_ibis.revenue.sum().name('revenue_hourly')
    , data_ibis.event_at.min().name('event_at_min')
    , data_ibis.event_at.max().name('event_at_max')
    , data_ibis.event_at_epoch.min().name('event_at_epoch_min')
    , data_ibis.event_at_epoch.max().name('event_at_epoch_max')
])

result_ibis = result_ibis.mutate([
    result_ibis.event_count_hourly.cumsum().name('event_count_total')
    , result_ibis.revenue_hourly.cumsum().name('revenue_total')
    , result_ibis.event_at_epoch_min.cummin().name('first_event_at_epoch')
    , result_ibis.event_at_epoch_max.cummax().name('last_event_at_epoch')
])

result_ibis = result_ibis.mutate([
    (result_ibis.hour_end_epoch - result_ibis.first_event_at_epoch).name('seconds_since_first_event')
    , (result_ibis.hour_end_epoch - result_ibis.last_event_at_epoch).name('seconds_since_last_event')
])

result_ibis = result_ibis.mutate((result_ibis.seconds_since_last_event - result_ibis.seconds_since_first_event).name('seconds_first_to_last'))
result_ibis.execute()

  return column.astype(out_dtype.to_pandas(), errors='ignore')


Unnamed: 0,entity_id,hour_end,hour_end_epoch,event_count_hourly,revenue_hourly,event_at_min,event_at_max,event_at_epoch_min,event_at_epoch_max,event_count_total,revenue_total,first_event_at_epoch,last_event_at_epoch,seconds_since_first_event,seconds_since_last_event,seconds_first_to_last
0,user_001,2022-01-01 23:00:00,1641078000.0,3,0.0,2022-01-01 22:01:00+00:00,2022-01-01 22:20:00+00:00,1641074000.0,1641076000.0,3,0.0,1641074000.0,1641076000.0,3540.0,2400.0,-1140.0
1,user_001,2022-01-02 00:00:00,1641082000.0,5,12.5,2022-01-01 23:10:00+00:00,2022-01-01 23:59:00+00:00,1641079000.0,1641082000.0,8,12.5,1641074000.0,1641082000.0,7140.0,60.0,-7080.0
2,user_001,2022-01-02 06:00:00,1641103000.0,3,0.0,2022-01-02 05:30:00+00:00,2022-01-02 05:45:00+00:00,1641101000.0,1641102000.0,11,12.5,1641074000.0,1641102000.0,28740.0,900.0,-27840.0
3,user_001,2022-01-02 07:00:00,1641107000.0,5,30.75,2022-01-02 06:10:00+00:00,2022-01-02 06:31:00+00:00,1641104000.0,1641105000.0,16,43.25,1641074000.0,1641105000.0,32340.0,1740.0,-30600.0
4,user_001,2022-01-02 08:00:00,1641110000.0,1,0.0,2022-01-02 07:01:00+00:00,2022-01-02 07:01:00+00:00,1641107000.0,1641107000.0,17,43.25,1641074000.0,1641107000.0,35940.0,3540.0,-32400.0
5,user_002,2022-01-01 23:00:00,1641078000.0,1,0.0,2022-01-01 22:17:00+00:00,2022-01-01 22:17:00+00:00,1641075000.0,1641075000.0,18,43.25,1641074000.0,1641107000.0,3540.0,-28860.0,-32400.0


In [56]:
datetime_list = pd.DataFrame(data={
    'hour_end': pd.date_range(start=data_py['hour_end'].min(),
                              end=data_py['hour_end'].max(),
                              freq='H')
    })

datetime_list = datetime_list[datetime_list['hour_end'] <= data_py['event_at'].max()]

datetime_spine = datetime_list.merge(
         pd.DataFrame(data={
             'entity_id': data_py['entity_id'].unique()
             }),
         how='cross')

datetime_spine

Unnamed: 0,hour_end,entity_id
0,2022-01-01 23:00:00+00:00,user_001
1,2022-01-01 23:00:00+00:00,user_002
2,2022-01-02 00:00:00+00:00,user_001
3,2022-01-02 00:00:00+00:00,user_002
4,2022-01-02 01:00:00+00:00,user_001
5,2022-01-02 01:00:00+00:00,user_002
6,2022-01-02 02:00:00+00:00,user_001
7,2022-01-02 02:00:00+00:00,user_002
8,2022-01-02 03:00:00+00:00,user_001
9,2022-01-02 03:00:00+00:00,user_002


# datetime_spine

In [32]:
datetime_list = pd.DataFrame(data={
    'hour_end': pd.date_range(start=data_py['hour_end'].min(),
                              end=data_py['hour_end'].max(),
                              freq='H')
})

datetime_table = ibis.memtable(datetime_list)

In [33]:
max_event_at = data_ibis.event_at.max().execute()
datetime_table = datetime_table.filter(datetime_table.hour_end <= max_event_at)

In [34]:
entity_list = pd.DataFrame(data={
             'entity_id': data_py['entity_id'].unique()
             })
entity_table = ibis.memtable(entity_list)

In [58]:
datetime_spine_ibis = datetime_table.cross_join(entity_table)
datetime_spine_ibis.execute()

Unnamed: 0,hour_end,entity_id
0,2022-01-01 23:00:00+00:00,user_001
1,2022-01-02 00:00:00+00:00,user_001
2,2022-01-02 01:00:00+00:00,user_001
3,2022-01-02 02:00:00+00:00,user_001
4,2022-01-02 03:00:00+00:00,user_001
5,2022-01-02 04:00:00+00:00,user_001
6,2022-01-02 05:00:00+00:00,user_001
7,2022-01-02 06:00:00+00:00,user_001
8,2022-01-02 07:00:00+00:00,user_001
9,2022-01-01 23:00:00+00:00,user_002


In [64]:
result_ibis = data_ibis.group_by([
    'entity_id',
    'hour_end',
    'hour_end_epoch'
]).aggregate([
    data_ibis.event_id.count().name('event_count_hourly')
    , data_ibis.revenue.sum().name('revenue_hourly')
    , data_ibis.event_at.min().name('event_at_min')
    , data_ibis.event_at.max().name('event_at_max')
    , data_ibis.event_at_epoch.min().name('event_at_epoch_min')
    , data_ibis.event_at_epoch.max().name('event_at_epoch_max')
])

result_ibis = ibis.memtable(result_ibis.execute())

result_ibis = datetime_spine_ibis.left_join(result_ibis, [
    result_ibis.entity_id == datetime_spine_ibis.entity_id,
    result_ibis.hour_end == datetime_spine_ibis.hour_end
])

result_ibis = result_ibis.mutate([
    result_ibis.event_count_hourly.cumsum().name('event_count_total')
    , result_ibis.revenue_hourly.cumsum().name('revenue_total')
    , result_ibis.event_at_epoch_min.cummin().name('first_event_at_epoch')
    , result_ibis.event_at_epoch_max.cummax().name('last_event_at_epoch')
])

result_ibis = result_ibis.mutate([
    (result_ibis.hour_end_epoch - result_ibis.first_event_at_epoch).name('seconds_since_first_event')
    , (result_ibis.hour_end_epoch - result_ibis.last_event_at_epoch).name('seconds_since_last_event')
])

result_ibis = result_ibis.mutate((result_ibis.seconds_since_last_event - result_ibis.seconds_since_first_event).name('seconds_first_to_last'))
result_ibis = result_ibis.order_by(['hour_end_x', 'entity_id_x'])
result_ibis.execute()

  return column.astype(out_dtype.to_pandas(), errors='ignore')


Unnamed: 0,hour_end_x,entity_id_x,entity_id_y,hour_end_y,hour_end_epoch,event_count_hourly,revenue_hourly,event_at_min,event_at_max,event_at_epoch_min,event_at_epoch_max,event_count_total,revenue_total,first_event_at_epoch,last_event_at_epoch,seconds_since_first_event,seconds_since_last_event,seconds_first_to_last
0,2022-01-01 23:00:00+00:00,user_001,user_001,2022-01-01 23:00:00,1641078000.0,3.0,0.0,2022-01-01 22:01:00+00:00,2022-01-01 22:20:00+00:00,1641074000.0,1641076000.0,3,0.0,1641074000.0,1641076000.0,3540.0,2400.0,-1140.0
1,2022-01-01 23:00:00+00:00,user_002,user_002,2022-01-01 23:00:00,1641078000.0,1.0,0.0,2022-01-01 22:17:00+00:00,2022-01-01 22:17:00+00:00,1641075000.0,1641075000.0,17,43.25,1641074000.0,1641105000.0,3540.0,-27060.0,-30600.0
2,2022-01-02 00:00:00+00:00,user_001,user_001,2022-01-02 00:00:00,1641082000.0,5.0,12.5,2022-01-01 23:10:00+00:00,2022-01-01 23:59:00+00:00,1641079000.0,1641082000.0,8,12.5,1641074000.0,1641082000.0,7140.0,60.0,-7080.0
3,2022-01-02 00:00:00+00:00,user_002,,NaT,,,,NaT,NaT,,,17,43.25,1641074000.0,1641105000.0,,,
4,2022-01-02 01:00:00+00:00,user_001,,NaT,,,,NaT,NaT,,,16,43.25,1641074000.0,1641105000.0,,,
5,2022-01-02 01:00:00+00:00,user_002,,NaT,,,,NaT,NaT,,,17,43.25,1641074000.0,1641105000.0,,,
6,2022-01-02 02:00:00+00:00,user_001,,NaT,,,,NaT,NaT,,,16,43.25,1641074000.0,1641105000.0,,,
7,2022-01-02 02:00:00+00:00,user_002,,NaT,,,,NaT,NaT,,,17,43.25,1641074000.0,1641105000.0,,,
8,2022-01-02 03:00:00+00:00,user_001,,NaT,,,,NaT,NaT,,,16,43.25,1641074000.0,1641105000.0,,,
9,2022-01-02 03:00:00+00:00,user_002,,NaT,,,,NaT,NaT,,,17,43.25,1641074000.0,1641105000.0,,,
