# Data processors user guide

The full text of the [Data processors](https://doc.retentioneering.com/release3/doc/user_guides/dataprocessors.html) user guide is available on the Retentioneering website.

## Prerequisites

Run this cell to prepare the environment. This step is obligatory.

In [None]:
!pip install retentioneering

## Creating an eventstream

In [1]:
import pandas as pd
from retentioneering import datasets
from retentioneering.eventstream import Eventstream

# Load the bundled ``simple_shop`` dataset. ``stream`` is the source object
# that the examples below call the data processor helpers on.
stream = datasets.load_simple_shop()

## What is a data processor?


## Helpers and chaining usage

In [2]:
# Each helper is called on the result of the previous one, so several
# data processors can be applied in a single chained expression.
res = (
    stream
    .add_start_end_events()
    .split_sessions(timeout=(10, 'm'))
    .to_dataframe()
)
# First 15 events of one user: note the synthetic path_start,
# session_start and session_end rows next to the raw events.
res.loc[res['user_id'] == 219483890].head(15)

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id,session_id
0,ecec01ad-6f60-46b5-b125-69a7ca27a685,path_start,0,path_start,2019-11-01 17:59:13.273932,219483890,219483890_1
1,2619c384-1a08-460f-9e3b-f52fe46fc183,session_start,0,session_start,2019-11-01 17:59:13.273932,219483890,219483890_1
2,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890,219483890_1
3,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890,219483890_1
4,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890,219483890_1
5,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890,219483890_1
6,86f7a2b0-1475-4cc8-a83d-c2f99121ae17,session_end,3,session_end,2019-11-01 17:59:32.557029,219483890,219483890_1
3392,7498e18f-8762-440d-a5ac-f6165f8e2058,session_start,2096,session_start,2019-12-06 16:22:57.484842,219483890,219483890_2
3393,9735f8a7-baac-4a90-b390-9b378c0ed4e3,raw,2096,main,2019-12-06 16:22:57.484842,219483890,219483890_2
3394,d11c3f44-5472-46ba-83b9-b11d568be624,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890,219483890_2


## Data processors library

### Adding processors

#### AddStartEndEvents

In [3]:
# Apply AddStartEndEvents alone and export the result to a DataFrame.
res = (
    stream
    .add_start_end_events()
    .to_dataframe()
)
# The user's path now opens with a ``path_start`` event that shares the
# timestamp of the first raw event.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,ecec01ad-6f60-46b5-b125-69a7ca27a685,path_start,0,path_start,2019-11-01 17:59:13.273932,219483890
1,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
2,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890
3,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890
4,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890
2566,9735f8a7-baac-4a90-b390-9b378c0ed4e3,raw,2096,main,2019-12-06 16:22:57.484842,219483890
2567,d11c3f44-5472-46ba-83b9-b11d568be624,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890
2568,562884ac-4ad4-4773-b22d-852efb4d7961,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890
5427,6a095bf1-f34c-4ba9-b933-9a470e28d4f4,raw,4542,main,2020-01-06 22:10:13.635011,219483890
5428,11705e29-00ce-4da3-9c4d-6bc556a0acad,raw,4543,catalog,2020-01-06 22:10:15.228575,219483890


#### SplitSessions

##### timeout delimiter

In [4]:
# Split each user's path into sessions using a timeout of (10, 'm').
res = (
    stream
    .split_sessions(timeout=(10, 'm'))
    .to_dataframe()
)
# Every session is wrapped in session_start / session_end events and
# gets its own value in the ``session_id`` column.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id,session_id
0,11d2a4e8-a65a-4d8c-b6b1-1ef84207cd64,session_start,0,session_start,2019-11-01 17:59:13.273932,219483890,219483890_1
1,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890,219483890_1
2,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890,219483890_1
3,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890,219483890_1
4,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890,219483890_1
5,e263d4f9-f77f-457d-83a5-9a7636286ba2,session_end,3,session_end,2019-11-01 17:59:32.557029,219483890,219483890_1
2922,ab3529ff-447f-46d9-bc6c-4bd54512f3df,session_start,2096,session_start,2019-12-06 16:22:57.484842,219483890,219483890_2
2923,9735f8a7-baac-4a90-b390-9b378c0ed4e3,raw,2096,main,2019-12-06 16:22:57.484842,219483890,219483890_2
2924,d11c3f44-5472-46ba-83b9-b11d568be624,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890,219483890_2
2925,562884ac-4ad4-4773-b22d-852efb4d7961,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890,219483890_2


##### single delimiting event

In [5]:
# Toy event log in which ``session_delimiter`` marks the start of each session.
df = pd.DataFrame(
    [
        (111, "session_delimiter", "2023-01-01 00:00:00"),
        (111, "A", "2023-01-01 00:00:01"),
        (111, "B", "2023-01-01 00:00:02"),
        (111, "session_delimiter", "2023-01-01 00:00:04"),
        (111, "C", "2023-01-01 00:00:04"),
    ],
    columns=["user_id", "event", "timestamp"],
)
# In the output the delimiter events are replaced by session_start events,
# and a session_end event closes each session.
(
    Eventstream(df)
    .split_sessions(delimiter_events=['session_delimiter'])
    .to_dataframe()
    .sort_values(['user_id', 'event_index'])
    [['user_id', 'event', 'timestamp', 'session_id']]
)

Unnamed: 0,user_id,event,timestamp,session_id
0,111,session_start,2023-01-01 00:00:00,111_1
1,111,A,2023-01-01 00:00:01,111_1
2,111,B,2023-01-01 00:00:02,111_1
3,111,session_end,2023-01-01 00:00:02,111_1
4,111,session_start,2023-01-01 00:00:04,111_2
5,111,C,2023-01-01 00:00:04,111_2
6,111,session_end,2023-01-01 00:00:04,111_2


##### paired delimiting event

In [6]:
# Toy event log in which every session is bracketed by a
# ``custom_start`` / ``custom_end`` pair.
df = pd.DataFrame(
    [
        (111, "custom_start", "2023-01-01 00:00:00"),
        (111, "A", "2023-01-01 00:00:01"),
        (111, "B", "2023-01-01 00:00:02"),
        (111, "custom_end", "2023-01-01 00:00:02"),
        (111, "custom_start", "2023-01-01 00:00:04"),
        (111, "C", "2023-01-01 00:00:04"),
        (111, "custom_end", "2023-01-01 00:00:04"),
    ],
    columns=["user_id", "event", "timestamp"],
)
dummy_stream = Eventstream(df)
# In the output the custom delimiters appear as session_start / session_end.
(
    dummy_stream
    .split_sessions(delimiter_events=['custom_start', 'custom_end'])
    .to_dataframe()
    .sort_values(['user_id', 'event_index'])
    [['user_id', 'event', 'timestamp', 'session_id']]
)

Unnamed: 0,user_id,event,timestamp,session_id
0,111,session_start,2023-01-01 00:00:00,111_1
1,111,A,2023-01-01 00:00:01,111_1
2,111,B,2023-01-01 00:00:02,111_1
3,111,session_end,2023-01-01 00:00:02,111_1
4,111,session_start,2023-01-01 00:00:04,111_2
5,111,C,2023-01-01 00:00:04,111_2
6,111,session_end,2023-01-01 00:00:04,111_2


##### custom session column

In [7]:
# Toy event log that already carries a session identifier in ``custom_ses_id``.
df = pd.DataFrame(
    [
        (111, "A", "2023-01-01 00:00:01", "session_1"),
        (111, "B", "2023-01-01 00:00:02", "session_1"),
        (111, "C", "2023-01-01 00:00:03", "session_2"),
        (111, "D", "2023-01-01 00:00:04", "session_2"),
    ],
    columns=["user_id", "event", "timestamp", "custom_ses_id"],
)
# Register the extra column in the schema so it is kept in the eventstream.
raw_data_schema = {
    "custom_cols": [
        {"raw_data_col": "custom_ses_id", "custom_col": "custom_ses_id"},
    ],
}
dummy_stream = Eventstream(df, raw_data_schema=raw_data_schema)
# Sessions are delimited by the values of ``custom_ses_id``.
(
    dummy_stream
    .split_sessions(delimiter_col="custom_ses_id")
    .to_dataframe()
    .sort_values(["user_id", "event_index"])
    [["user_id", "event", "timestamp", "session_id", "custom_ses_id"]]
)


Unnamed: 0,user_id,event,timestamp,session_id,custom_ses_id
0,111,session_start,2023-01-01 00:00:01,111_1,session_1
1,111,A,2023-01-01 00:00:01,111_1,session_1
2,111,B,2023-01-01 00:00:02,111_1,session_1
3,111,session_end,2023-01-01 00:00:02,111_1,session_1
4,111,session_start,2023-01-01 00:00:03,111_2,session_2
5,111,C,2023-01-01 00:00:03,111_2,session_2
6,111,D,2023-01-01 00:00:04,111_2,session_2
7,111,session_end,2023-01-01 00:00:04,111_2,session_2


#### LabelNewUsers

In [8]:
# Users passed in ``new_users_list`` are the ones labelled as new.
new_users = [219483890, 964964743, 965024600]
res = (
    stream
    .label_new_users(new_users_list=new_users)
    .to_dataframe()
)
# A listed user: the path starts with a ``new_user`` event.
res.loc[res['user_id'] == 219483890].head()

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,ecec01ad-6f60-46b5-b125-69a7ca27a685,new_user,0,new_user,2019-11-01 17:59:13.273932,219483890
1,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
2,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890
3,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890
4,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890


In [9]:
# A user that is not in ``new_users``: the path starts with ``existing_user``.
res.loc[res['user_id'] == 501098384].head()

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
16240,9d3dcdb2-473f-42eb-a2a1-7665877c828c,existing_user,14768,existing_user,2020-04-02 05:36:04.896839,501098384
16241,9d3dcdb2-473f-42eb-a2a1-7665877c828c,raw,14768,main,2020-04-02 05:36:04.896839,501098384
16242,72459794-2d7b-465b-af08-30ee37b8f603,raw,14769,catalog,2020-04-02 05:36:05.371141,501098384
16243,93d419c1-9075-4714-bcf3-3e65a04ab84f,raw,14770,main,2020-04-02 05:36:40.814504,501098384
16244,dd5a3672-c675-4c96-b211-d0bdeb2b33aa,raw,14771,catalog,2020-04-02 05:36:41.190946,501098384


#### LabelLostUsers

In [10]:
# Explicitly enumerate the users that should be labelled as lost.
lost_users_list = [219483890, 964964743, 965024600]
res = (
    stream
    .label_lost_users(lost_users_list=lost_users_list)
    .to_dataframe()
)
# A listed user: the path ends with a ``lost_user`` event.
res.loc[res['user_id'] == 219483890].tail()

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
4880,d3c84c18-0465-4a37-b2f5-ca5529faa0ee,raw,4547,catalog,2020-01-06 22:11:02.899490,219483890
4881,e70ef300-f288-49b9-b079-f4c8f1e1216c,raw,4548,catalog,2020-01-06 22:11:28.271366,219483890
8808,1a349c5e-a8bd-4d99-8b5b-5095a3da809b,raw,8215,main,2020-02-14 21:04:49.450696,219483890
8809,f0983ad4-e199-4ced-bd98-acf161ef60b5,raw,8216,catalog,2020-02-14 21:04:51.717127,219483890
8810,f0983ad4-e199-4ced-bd98-acf161ef60b5,lost_user,8216,lost_user,2020-02-14 21:04:51.717127,219483890


In [11]:
# A user that is not in ``lost_users_list``: the path ends with ``absent_user``.
res.loc[res['user_id'] == 501098384].tail()

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
36029,bbd75140-bbfc-4fac-a76e-a24c30bcc2f9,raw,32279,catalog,2020-04-29 12:47:40.975732,501098384
36030,c9749644-decf-424c-b95d-8d1a47b7a0bf,raw,32280,catalog,2020-04-29 12:48:01.809577,501098384
36031,2d436d10-6f88-4e6c-9d03-e9ac9dd3b4db,raw,32281,main,2020-04-29 12:48:01.938488,501098384
36032,5b3b8ee9-226c-4759-8c21-2bf1f52fc2e3,raw,32282,catalog,2020-04-29 12:48:06.595390,501098384
36033,5b3b8ee9-226c-4759-8c21-2bf1f52fc2e3,absent_user,32282,absent_user,2020-04-29 12:48:06.595390,501098384


In [12]:
# Alternative mode: label users by a timeout of (30, 'D') instead of
# passing an explicit list.
res = (
    stream
    .label_lost_users(timeout=(30, 'D'))
    .to_dataframe()
)

In [13]:
# Latest timestamp in the eventstream, for comparison with the users' last events.
res.timestamp.max()

Timestamp('2020-04-29 12:48:06.595390')

In [14]:
# This user's last event (2019-11-02) is far more than 30 days before the
# end of the eventstream, so the path ends with a ``lost_user`` event.
res.loc[res['user_id'] == 495985018]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
47,2a324422-7e5d-4966-95c3-343145643789,raw,47,catalog,2019-11-02 01:14:08.664850,495985018
48,062a6cb1-c3ab-43d9-be9e-ab5ea865c0e8,raw,48,cart,2019-11-02 01:14:37.435643,495985018
49,062a6cb1-c3ab-43d9-be9e-ab5ea865c0e8,lost_user,48,lost_user,2019-11-02 01:14:37.435643,495985018


In [15]:
# This user was active on 2020-04-15, i.e. within 30 days of the end of the
# eventstream. NOTE(review): the displayed rows are truncated; check the last
# row of the full frame to see which label was appended.
res.loc[res['user_id'] == 819489198]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
24644,f1362fa9-b513-4823-86f2-9783ea53b2c3,raw,22394,main,2020-04-15 21:02:36.903678,819489198
24645,32f15a83-5cad-4e7a-90e5-1b1a77c71b63,raw,22395,catalog,2020-04-15 21:02:37.658557,819489198
24646,9964c30b-b4db-4ef8-896f-343da191b5bb,raw,22396,catalog,2020-04-15 21:02:48.699804,819489198
24647,019faafa-ed8b-4f5c-82a5-5af1143d7090,raw,22397,product2,2020-04-15 21:02:51.173118,819489198
24649,b066e86f-6c57-4ff2-b4e7-72b43fc1926c,raw,22399,catalog,2020-04-15 21:03:05.813046,819489198
24651,b96256ac-8cb1-4f1b-a027-85eaacdbdd8c,raw,22401,cart,2020-04-15 21:03:35.216033,819489198
24655,437b69b1-236b-489f-9ea3-f401b1650890,raw,22404,delivery_choice,2020-04-15 21:03:40.745520,819489198
24656,9679e83c-382a-47fc-bb67-3d96b226be53,raw,22405,delivery_pickup,2020-04-15 21:03:46.448349,819489198
24657,3e04a2b5-132a-45d0-b72a-719d047b2a2e,raw,22406,payment_choice,2020-04-15 21:03:46.575300,819489198
24658,dfeb4d3c-1011-4c8c-8879-f847deb08e28,raw,22407,payment_card,2020-04-15 21:03:46.862126,819489198


#### AddPositiveEvents

In [16]:
# Events treated as positive targets.
positive_events = ['cart', 'payment_done']
res = (
    stream
    .add_positive_events(targets=positive_events)
    .to_dataframe()
)

In [17]:
# A ``positive_target_cart`` event is inserted right after this user's
# ``cart`` event at event_index 2.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,680f3db3-8641-4cc2-bdb4-e559fb40c37b,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
1,9a64cf4a-6770-420c-82ff-1b9bb3e9d57b,raw,1,product1,2019-11-01 17:59:28.459271,219483890
2,09a12a74-32a0-49d9-8a87-5fff7041903f,raw,2,cart,2019-11-01 17:59:29.502214,219483890
3,09a12a74-32a0-49d9-8a87-5fff7041903f,positive_target,2,positive_target_cart,2019-11-01 17:59:29.502214,219483890
4,859420b5-48cd-4f47-ad85-b9faca15def4,raw,3,catalog,2019-11-01 17:59:32.557029,219483890
2244,ff073183-f2ed-48d0-833f-1b064823bc33,raw,2096,main,2019-12-06 16:22:57.484842,219483890
2245,6dd364d0-5cc7-450a-8a13-e8abba6a0ab5,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890
2246,45d691f3-9ea4-414a-8c51-c3b57f2ea328,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890
4820,1a317681-a863-4507-b840-810a7db308d0,raw,4542,main,2020-01-06 22:10:13.635011,219483890
4821,41618dc0-95ad-4c6f-ba1b-2ad577ebff47,raw,4543,catalog,2020-01-06 22:10:15.228575,219483890


In [18]:
# This user has neither ``cart`` nor ``payment_done`` events, so the path
# contains raw events only.
res.loc[res['user_id'] == 24427596]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
67,de63a200-508b-4b3b-851e-630fcbecd662,raw,60,main,2019-11-02 07:28:07.285541,24427596
68,a44c6b84-e6b5-410c-9dce-9db051322cf4,raw,61,catalog,2019-11-02 07:28:14.319850,24427596
69,f53be094-fc0f-4227-838d-d044862fb1fd,raw,62,catalog,2019-11-02 07:29:08.301333,24427596
70,07a9f5e4-bc43-4140-a0cb-fcf68b923fbb,raw,63,catalog,2019-11-02 07:29:41.848396,24427596


In [19]:
def custom_func(eventstream, targets) -> pd.DataFrame:
    """Select the rows of an eventstream whose event name is in ``targets``.

    Parameters
    ----------
    eventstream
        Object exposing ``schema.event_name`` (the name of the event column)
        and ``to_dataframe()``.
    targets
        Collection of event names to keep.

    Returns
    -------
    pd.DataFrame
        The rows of ``eventstream.to_dataframe()`` whose event is in ``targets``.
    """
    events = eventstream.to_dataframe()
    is_target = events[eventstream.schema.event_name].isin(targets)
    return events.loc[is_target]

# Same targets as before, but the target rows are chosen by ``custom_func``.
res = (
    stream
    .add_positive_events(targets=positive_events, func=custom_func)
    .to_dataframe()
)

In [20]:
# Inspect the same user as before, now with the custom function applied.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,39c5cb78-1ead-4f7a-9259-96a7e3748886,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
1,15101e9d-6a36-424c-b73a-419f77b1b3e9,raw,1,product1,2019-11-01 17:59:28.459271,219483890
2,56f93cf1-f4cc-4409-8d1a-ea88b9f959d3,raw,2,cart,2019-11-01 17:59:29.502214,219483890
3,56f93cf1-f4cc-4409-8d1a-ea88b9f959d3,positive_target,2,positive_target_cart,2019-11-01 17:59:29.502214,219483890
4,ec563472-5c99-485a-94fa-cab2763d7bd8,raw,3,catalog,2019-11-01 17:59:32.557029,219483890
2340,4f03a03f-10ed-46ec-b1c5-8951376db4ed,raw,2096,main,2019-12-06 16:22:57.484842,219483890
2341,0fbb9b88-dac5-4a9d-999e-c708380ee6b4,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890
2342,6a901e98-9db2-49e5-9b55-66fe0c800953,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890
5041,21e37e31-b349-4f1b-a447-eec89fd14a4c,raw,4542,main,2020-01-06 22:10:13.635011,219483890
5042,6901a99c-7766-4a1c-bb22-1b12b224f1a3,raw,4543,catalog,2020-01-06 22:10:15.228575,219483890


#### AddNegativeEvents

In [21]:
# Events treated as negative targets.
negative_events = ['delivery_courier']

res = (
    stream
    .add_negative_events(targets=negative_events)
    .to_dataframe()
)

In [22]:
# Label-based slice of one user's rows: a ``negative_target_delivery_courier``
# event follows the ``delivery_courier`` event.
res.loc[res['user_id'] == 629881394].loc[36:48]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
36,a1cc3d65-12d7-487b-919f-1fa0f47d33d8,raw,36,cart,2019-11-01 22:35:50.437706,629881394
38,a0097cba-bacc-40ee-9158-306c7a1bad4e,raw,38,delivery_choice,2019-11-01 22:35:57.649549,629881394
39,8715bf14-1ddd-4356-9c5d-eacd5f49af31,raw,39,delivery_courier,2019-11-01 22:36:02.009271,629881394
40,8715bf14-1ddd-4356-9c5d-eacd5f49af31,negative_target,39,negative_target_delivery_courier,2019-11-01 22:36:02.009271,629881394
44,bc254a86-4be3-4ffe-a763-3b3233eb133e,raw,42,payment_choice,2019-11-01 22:36:02.243274,629881394
46,e6e489f6-4736-4483-83b2-2fd693df0052,raw,44,payment_cash,2019-11-01 22:36:03.415201,629881394
47,3d30c9e4-9d75-43cf-ad08-392974455f17,raw,45,payment_done,2019-11-01 22:36:03.999697,629881394


#### LabelCroppedPaths

In [23]:
# Cutoffs applied at the left and right edges of the eventstream.
params = dict(
    left_cutoff=(4, 'D'),
    right_cutoff=(3, 'D'),
)

res = (
    stream
    .label_cropped_paths(**params)
    .to_dataframe()
)

In [24]:
# Print the time span of the eventstream to compare with the cutoffs.
print(f"Eventstream start: {res['timestamp'].min()}")
print(f"Eventstream end: {res['timestamp'].max()}")

Eventstream start: 2019-11-01 17:59:13.273932
Eventstream end: 2020-04-29 12:48:06.595390


In [25]:
# This user's events fall within the first days of the eventstream, so the
# path is marked with a ``cropped_left`` event.
res.loc[res['user_id'] == 495985018]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
47,2a324422-7e5d-4966-95c3-343145643789,cropped_left,47,cropped_left,2019-11-02 01:14:08.664850,495985018
48,2a324422-7e5d-4966-95c3-343145643789,raw,47,catalog,2019-11-02 01:14:08.664850,495985018
49,062a6cb1-c3ab-43d9-be9e-ab5ea865c0e8,raw,48,cart,2019-11-02 01:14:37.435643,495985018


In [26]:
# This user's events fall on the last day of the eventstream, so the path
# is marked with a ``cropped_right`` event.
res.loc[res['user_id'] == 831491833]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
32533,f7ab012f-239b-48be-bb14-ffd2c17f30ca,raw,32258,catalog,2020-04-29 12:24:21.538805,831491833
32534,f133a5e6-66fa-4dcc-8b22-07f16db424e1,raw,32259,catalog,2020-04-29 12:24:33.841264,831491833
32535,b30b756c-842b-4713-a020-353269638c66,raw,32260,product2,2020-04-29 12:24:39.415424,831491833
32536,9b61306b-dfd4-48ff-bb7f-417bc901eaae,raw,32261,cart,2020-04-29 12:24:59.928499,831491833
32537,e5e2e96b-9def-49c2-a118-68c7cb2d4176,raw,32262,catalog,2020-04-29 12:25:06.262205,831491833
32538,e5e2e96b-9def-49c2-a118-68c7cb2d4176,cropped_right,32262,cropped_right,2020-04-29 12:25:06.262205,831491833


### Removing processors

#### FilterEvents

In [27]:
def save_specific_users(df, schema):
    """Build a boolean mask that is True for rows of three hard-coded users.

    Parameters
    ----------
    df : pd.DataFrame
        Event data with a user id column.
    schema
        Object whose ``user_id`` attribute names that column.

    Returns
    -------
    pd.Series
        Boolean series aligned with ``df``.
    """
    kept_user_ids = [219483890, 964964743, 965024600]
    user_ids = df[schema.user_id]
    return user_ids.isin(kept_user_ids)

# Filter the eventstream with the mask produced by ``save_specific_users``.
res = (
    stream
    .filter_events(func=save_specific_users)
    .to_dataframe()
)

In [28]:
# Only the three selected users remain after filtering.
res.user_id.unique().astype(int)

array([219483890, 964964743, 965024600])

In [29]:
# Number of ``catalog`` and ``main`` events in the original eventstream.
(
    stream
    .to_dataframe()
    ['event']
    .value_counts()
    [lambda counts: counts.index.isin(['catalog', 'main'])]
)

catalog    14518
main        5635
Name: event, dtype: int64

In [30]:
def exclude_events(df, schema):
    """Build a boolean mask that is False for ``catalog`` and ``main`` events.

    Parameters
    ----------
    df : pd.DataFrame
        Event data with an event name column.
    schema
        Object whose ``event_name`` attribute names that column.

    Returns
    -------
    pd.Series
        Boolean series aligned with ``df``; True for every other event.
    """
    unwanted_events = ['catalog', 'main']
    is_unwanted = df[schema.event_name].isin(unwanted_events)
    return ~is_unwanted

# Filter the eventstream with the mask produced by ``exclude_events``.
res = (
    stream
    .filter_events(func=exclude_events)
    .to_dataframe()
)

In [31]:
# The same count on the filtered data is empty: both events are gone.
(
    res['event']
    .value_counts()
    [lambda counts: counts.index.isin(['catalog', 'main'])]
)

Series([], Name: event, dtype: int64)

#### DropPaths

In [32]:
# Drop paths using a length threshold of 25 steps.
res = (
    stream
    .drop_paths(min_steps=25)
    .to_dataframe()
)

In [33]:
# This user is still present after the ``min_steps`` filter.
res.loc[res['user_id'] == 629881394]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,e29fe364-56cd-43f4-a566-ca31e48abf0f,raw,7,main,2019-11-01 22:28:54.791683,629881394
1,1a1e9640-6bdd-477b-af37-b8dae119754a,raw,9,catalog,2019-11-01 22:29:01.049513,629881394
2,9b2faeaa-2b64-45ad-bed7-81238a85bb6d,raw,11,catalog,2019-11-01 22:29:32.322458,629881394
3,ac28a75d-48df-43b7-b7fd-e5109a7eb4d5,raw,13,catalog,2019-11-01 22:30:09.450839,629881394
4,778b1d6e-98c2-4fd4-aca5-eb688600c800,raw,14,catalog,2019-11-01 22:31:05.565762,629881394
5,60227d0e-3d93-4ada-9acd-473dede4a6be,raw,15,main,2019-11-01 22:31:08.333560,629881394
6,1d2cc619-6d03-442f-b686-1316e490af3c,raw,16,catalog,2019-11-01 22:31:09.010626,629881394
7,c6278b8b-1266-4c95-aeaf-886ffeb13625,raw,17,product1,2019-11-01 22:31:10.416231,629881394
8,3e136eff-7f2f-449b-885b-c1d80aa4acee,raw,18,catalog,2019-11-01 22:31:43.019527,629881394
9,e980bbd4-0457-462b-83a9-0057adbb1b8c,raw,19,catalog,2019-11-01 22:32:01.596163,629881394


In [34]:
# Drop paths using a duration threshold of (1, 'M').
res = (
    stream
    .drop_paths(min_time=(1, 'M'))
    .to_dataframe()
)

In [35]:
# This user's events span from 2019-11-01 to at least 2019-12-09, and the
# path is still present after the ``min_time`` filter.
res.loc[res['user_id'] == 964964743]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
4,63cfb2da-6a02-4046-954b-f3b3f8a8b829,raw,4,catalog,2019-11-01 21:38:19.283663,964964743
5,303b81dc-4bed-4586-93bb-318828dc048c,raw,5,cart,2019-11-01 21:38:36.761221,964964743
6,8a8305ae-6c9f-48db-b782-d0c58bf88291,raw,6,delivery_choice,2019-11-01 21:38:37.564693,964964743
1101,f35dfc72-f933-491b-84b2-5f0fbeea6c36,raw,2275,main,2019-12-09 01:42:22.801831,964964743
1102,442d0c8c-7c55-45e7-b1b2-2424dc8d784c,raw,2276,catalog,2019-12-09 01:42:23.617764,964964743
1103,025d2b66-381c-4b4d-98d5-573fbfe12cac,raw,2277,product2,2019-12-09 01:42:56.877340,964964743
1104,803089ca-c038-4fdd-9eda-ea7d8f6b1d4a,raw,2278,catalog,2019-12-09 01:43:05.436223,964964743
1105,2a9388e0-aef5-4f78-a001-5c40136900a0,raw,2279,catalog,2019-12-09 01:43:36.923178,964964743
1106,7529c9b6-5040-4c06-97ff-1f53d9824e2c,raw,2280,product2,2019-12-09 01:43:41.174195,964964743
1107,9a17e336-e3a6-442a-b13a-796da48ca7fe,raw,2281,cart,2019-12-09 01:43:57.325569,964964743


#### TruncatePaths

In [36]:
# Truncate the beginning of each path relative to the ``cart`` event,
# shifting the cutting point by -2.
res = (
    stream
    .truncate_paths(drop_before='cart', shift_before=-2)
    .to_dataframe()
)

In [37]:
# The user's first ``cart`` is at event_index 2; with a shift of -2 the
# path still starts at event_index 0.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
1,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890
2,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890
3,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890
1635,9735f8a7-baac-4a90-b390-9b378c0ed4e3,raw,2096,main,2019-12-06 16:22:57.484842,219483890
1636,d11c3f44-5472-46ba-83b9-b11d568be624,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890
1637,562884ac-4ad4-4773-b22d-852efb4d7961,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890
3553,6a095bf1-f34c-4ba9-b933-9a470e28d4f4,raw,4542,main,2020-01-06 22:10:13.635011,219483890
3554,11705e29-00ce-4da3-9c4d-6bc556a0acad,raw,4543,catalog,2020-01-06 22:10:15.228575,219483890
3555,3648646a-8797-4563-9d2f-6ff2dc8290fb,raw,4544,cart,2020-01-06 22:10:42.309028,219483890


In [38]:
# This user has no ``cart`` event; all four events of the path are present.
res.loc[res['user_id'] == 24427596]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
32,1656c2bc-d027-4e9e-9da1-a097cab08200,raw,60,main,2019-11-02 07:28:07.285541,24427596
33,f0252960-28da-488b-a9c6-49b0adc135e7,raw,61,catalog,2019-11-02 07:28:14.319850,24427596
34,fee2af15-23b8-4274-951c-3f75447a50c3,raw,62,catalog,2019-11-02 07:29:08.301333,24427596
35,499ca4b5-f897-4043-a9bb-676a633e066b,raw,63,catalog,2019-11-02 07:29:41.848396,24427596


In [39]:
# Truncate the end of each path relative to the last ``cart`` occurrence.
res = (
    stream
    .truncate_paths(drop_after='cart', occurrence_after="last")
    .to_dataframe()
)

In [40]:
# Inspect one user's path after truncation.
# NOTE(review): the displayed rows are truncated; use ``.tail()`` to check
# which event the path now ends with.
res.loc[res['user_id'] == 219483890]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
0,ecec01ad-6f60-46b5-b125-69a7ca27a685,raw,0,catalog,2019-11-01 17:59:13.273932,219483890
1,9ff63036-fac6-435a-85f4-b0d5a54c557f,raw,1,product1,2019-11-01 17:59:28.459271,219483890
2,a3b37935-dd30-4edb-9328-12e6f87ac8bc,raw,2,cart,2019-11-01 17:59:29.502214,219483890
3,ce686a0c-a329-4159-8e1e-5ed6e1f1f0d9,raw,3,catalog,2019-11-01 17:59:32.557029,219483890
1773,9735f8a7-baac-4a90-b390-9b378c0ed4e3,raw,2096,main,2019-12-06 16:22:57.484842,219483890
1774,d11c3f44-5472-46ba-83b9-b11d568be624,raw,2097,catalog,2019-12-06 16:23:01.331109,219483890
1775,562884ac-4ad4-4773-b22d-852efb4d7961,raw,2098,catalog,2019-12-06 16:23:48.116617,219483890
3862,6a095bf1-f34c-4ba9-b933-9a470e28d4f4,raw,4542,main,2020-01-06 22:10:13.635011,219483890
3863,11705e29-00ce-4da3-9c4d-6bc556a0acad,raw,4543,catalog,2020-01-06 22:10:15.228575,219483890
3864,3648646a-8797-4563-9d2f-6ff2dc8290fb,raw,4544,cart,2020-01-06 22:10:42.309028,219483890


### Editing processors

#### GroupEvents

With ``GroupEvents``, we can group events based on the event name. Suppose
we need to assign a common name ``product`` to events ``product1`` and
``product2``:

In [41]:
def group_events(df, schema):
    """Build a boolean mask that is True for ``product1`` and ``product2`` events.

    Parameters
    ----------
    df : pd.DataFrame
        Event data with an event name column.
    schema
        Object whose ``event_name`` attribute names that column.

    Returns
    -------
    pd.Series
        Boolean series aligned with ``df``.
    """
    product_events = ['product1', 'product2']
    event_names = df[schema.event_name]
    return event_names.isin(product_events)

# ``event_name`` is the common name given to the events selected by ``func``.
params = dict(
    event_name='product',
    func=group_events,
)

res = (
    stream
    .group_events(**params)
    .to_dataframe()
)

As we can see, user ``456870964`` now has two ``product`` events
(``event_index=132, 135``) with ``event_type='group_alias'``.

In [42]:
# Grouped result for one user.
res.loc[res['user_id'] == 456870964]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
129,d1b41a49-5c44-4d43-a31a-d0e25728728f,raw,129,catalog,2019-11-03 11:46:55.411714,456870964
130,dc441606-69cd-4e37-8094-200c05ac14a6,raw,130,catalog,2019-11-03 11:47:46.131302,456870964
131,923d1811-49b7-486b-87fd-f5a918e118bd,raw,131,catalog,2019-11-03 11:47:58.401143,456870964
132,f72ded4c-18c4-4bdb-9873-88e88a3f27b1,group_alias,132,product,2019-11-03 11:48:43.243587,456870964
133,5d2933e7-ade5-4e19-bc90-097e7f959856,raw,133,cart,2019-11-03 11:49:17.050519,456870964
134,aea2427f-ce2e-4046-8627-3705cfbc0d6b,raw,134,catalog,2019-11-03 11:49:17.516398,456870964
135,6c75865f-4a7c-4079-819e-829ec93e364b,group_alias,135,product,2019-11-03 11:49:28.927721,456870964
136,c51d9a7c-7e1c-4f30-bbd7-d99c9b4d581c,raw,136,catalog,2019-11-03 11:49:30.788195,456870964


Previously, these events were named ``product1`` and ``product2``,
respectively, and had the ``raw`` event type:

In [43]:
# The same user in the original eventstream, for comparison.
(
    stream
    .to_dataframe()
    .query('user_id == 456870964')
)

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
129,d1b41a49-5c44-4d43-a31a-d0e25728728f,raw,129,catalog,2019-11-03 11:46:55.411714,456870964
130,dc441606-69cd-4e37-8094-200c05ac14a6,raw,130,catalog,2019-11-03 11:47:46.131302,456870964
131,923d1811-49b7-486b-87fd-f5a918e118bd,raw,131,catalog,2019-11-03 11:47:58.401143,456870964
132,f72ded4c-18c4-4bdb-9873-88e88a3f27b1,raw,132,product1,2019-11-03 11:48:43.243587,456870964
133,5d2933e7-ade5-4e19-bc90-097e7f959856,raw,133,cart,2019-11-03 11:49:17.050519,456870964
134,aea2427f-ce2e-4046-8627-3705cfbc0d6b,raw,134,catalog,2019-11-03 11:49:17.516398,456870964
135,6c75865f-4a7c-4079-819e-829ec93e364b,raw,135,product2,2019-11-03 11:49:28.927721,456870964
136,c51d9a7c-7e1c-4f30-bbd7-d99c9b4d581c,raw,136,catalog,2019-11-03 11:49:30.788195,456870964


#### CollapseLoops

In [44]:
# Collapse runs of repeated events, using the ``loop`` suffix and the
# ``max`` timestamp aggregation.
res = (
    stream
    .collapse_loops(suffix='loop', time_agg='max')
    .to_dataframe()
)

In [45]:
# Original path of one user: three consecutive ``catalog`` events.
(
    stream
    .to_dataframe()
    .query('user_id == 2112338')
)

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
3327,a52bf0f9-7e72-4bb6-b983-39e5dfb45f04,raw,3327,main,2019-12-24 12:58:04.891249,2112338
3328,35ab1e14-c157-4a89-8a77-082849779902,raw,3328,catalog,2019-12-24 12:58:08.096923,2112338
3329,6ad16457-0d77-4799-a41a-b1f13d2ea2b2,raw,3329,catalog,2019-12-24 12:58:16.429552,2112338
3330,e4d56465-89c5-4baa-a5e5-949311f3b5da,raw,3330,catalog,2019-12-24 12:58:44.965104,2112338
3331,08d2acdf-bda7-443e-90b3-893c77d09f3c,raw,3331,main,2019-12-24 12:58:52.984853,2112338


In [46]:
# After collapsing, the three ``catalog`` events become one ``catalog_loop``
# event carrying the timestamp of the last of them.
res.loc[res['user_id'] == 2112338]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
2626,a52bf0f9-7e72-4bb6-b983-39e5dfb45f04,raw,3327,main,2019-12-24 12:58:04.891249,2112338
2627,ea1ae9c2-56eb-40f2-93c0-56c3c7221b8f,group_alias,3330,catalog_loop,2019-12-24 12:58:44.965104,2112338
2628,08d2acdf-bda7-443e-90b3-893c77d09f3c,raw,3331,main,2019-12-24 12:58:52.984853,2112338


In [47]:
# Same processor with the ``count`` suffix and the ``mean`` time aggregation.
params = dict(
    suffix='count',
    time_agg='mean',
)

res = (
    stream
    .collapse_loops(**params)
    .to_dataframe()
)
# The collapsed event is now named ``catalog_loop_3`` and its timestamp is
# the mean of the three original ``catalog`` timestamps.
res.loc[res['user_id'] == 2112338]

Unnamed: 0,event_id,event_type,event_index,event,timestamp,user_id
2636,a52bf0f9-7e72-4bb6-b983-39e5dfb45f04,raw,3327,main,2019-12-24 12:58:04.891249000,2112338
2637,6b3a8abc-fc3d-496c-8a1e-7b2c8fafc966,group_alias,3329,catalog_loop_3,2019-12-24 12:58:23.163859712,2112338
2638,08d2acdf-bda7-443e-90b3-893c77d09f3c,raw,3331,main,2019-12-24 12:58:52.984853000,2112338
