<img src="assets/dauphine_master_logo.png" width="480">

Instructor: Vincent Maladière

# Survival Analysis Tutorial Part 2


The goal of this notebook is to extract implicit failure events from a raw activity (or "heart-beat") log using [Ibis](https://ibis-project.org/) and [DuckDB](https://duckdb.org) or [Polars](https://www.pola.rs/).

We often have to deal with a raw **activity event log** for a pool of members, patients, customers or machines, where the event of interest (e.g. churn, death, hospital transfer, failure) only appears implicitly, through the lack of activity events for an extended period of time: **activity events are collected somewhat regularly as long as the "failure" event has not occurred**.

Our goal is to use a common data-wrangling technique named **sessionization** to infer implicit failure events and measure the duration from the start of activity recording until a failure event (or censoring).
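
To make the idea concrete before moving to Ibis and Polars, here is a minimal pure-Python sketch of sessionization. The toy log and the helper name `sessionize_toy` are made up for illustration and are not part of the dataset or of any library: a new session starts whenever the gap between two consecutive events of the same entity exceeds the threshold.

```python
from datetime import datetime, timedelta
from itertools import groupby

# Toy heart-beat log: (entity, timestamp) pairs, made up for illustration.
toy_log = [
    ("a", datetime(2008, 1, 1, 0, 0)),
    ("a", datetime(2008, 1, 1, 0, 10)),
    ("a", datetime(2008, 1, 1, 1, 30)),  # 80 minute gap: new session
    ("b", datetime(2008, 1, 1, 0, 5)),
    ("b", datetime(2008, 1, 1, 0, 20)),
]


def sessionize_toy(log, threshold):
    """Return (entity, session_id, start, end) tuples, one per session."""
    sessions = []
    # Sorting by (entity, timestamp) groups each entity's events in time order.
    for entity, events in groupby(sorted(log), key=lambda row: row[0]):
        session_id, start, previous = 0, None, None
        for _, ts in events:
            if previous is None:
                start = ts
            elif ts - previous > threshold:
                # Gap longer than the threshold: close the current session
                # and open a new one starting at this event.
                sessions.append((entity, session_id, start, previous))
                session_id += 1
                start = ts
            previous = ts
        # Close the last open session of this entity.
        sessions.append((entity, session_id, start, previous))
    return sessions


toy_sessions = sessionize_toy(toy_log, threshold=timedelta(minutes=30))
for row in toy_sessions:
    print(row)
```

Entity `"a"` ends up with two sessions and entity `"b"` with one. The Ibis and Polars versions in this notebook express the same logic with window functions instead of an explicit loop.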

We will also see how censoring naturally occurs when we extract time-slices of a sessionized dataset.
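
As a rough illustration of why time-slicing produces censoring, the sketch below applies a censoring date to a single session. The helper name `censor_toy` and the example timestamps are hypothetical. The rule it applies is the one used later in this notebook: a session is censored when the censoring date falls before its last activity plus the inactivity threshold, because at that point we cannot yet know whether the session is over.

```python
from datetime import datetime, timedelta


def censor_toy(start, end, censoring_date, threshold):
    """Return (observed_end, is_censored), or None if the session is unseen."""
    if start >= censoring_date:
        # The session starts after the censoring date: it is not observed.
        return None
    # Before `end + threshold`, we cannot tell that the session has ended.
    is_censored = censoring_date < end + threshold
    # We never observe anything past the censoring date.
    observed_end = min(end, censoring_date)
    return observed_end, is_censored


start = datetime(2008, 1, 1, 10, 0)
end = datetime(2008, 1, 1, 12, 0)
threshold = timedelta(minutes=30)

mid_session = censor_toy(start, end, datetime(2008, 1, 1, 11, 0), threshold)
just_after = censor_toy(start, end, datetime(2008, 1, 1, 12, 15), threshold)
long_after = censor_toy(start, end, datetime(2008, 1, 1, 13, 0), threshold)
before_start = censor_toy(start, end, datetime(2008, 1, 1, 9, 0), threshold)

print(mid_session)   # truncated at 11:00 and censored
print(just_after)    # ends at 12:00 but still censored
print(long_after)    # ends at 12:00, failure event observed
print(before_start)  # None
```

The same session is therefore censored or not depending only on where the observation window is cut.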

Links to the slides:

- https://docs.google.com/presentation/d/1pAFmAFiyTA0_-ZjWG1rImAX8lYJt_UnGgqXD-4H6Aqw/edit?usp=sharing

In [1]:
import ibis

ibis.options.interactive = True
ibis.__version__

'5.1.0'

In [2]:
import duckdb

duckdb.__version__

'0.7.1'

In [3]:
from urllib.request import urlretrieve
from pathlib import Path

data_filepath = Path("wowah_data_raw.parquet")
data_url = (
    "https://storage.googleapis.com/ibis-tutorial-data/wowah_data/"
    "wowah_data_raw.parquet"
)

if not data_filepath.exists():
    print(f"Downloading {data_url}...")
    urlretrieve(data_url, data_filepath)
else:
    print(f"Reusing downloaded {data_filepath}")

Reusing downloaded wowah_data_raw.parquet


In [4]:
conn = ibis.duckdb.connect()  # in-memory DuckDB
event_log = conn.read_parquet(data_filepath)
event_log

In [5]:
event_log.count()

10826734

In [6]:
from ibis import deferred as c


entity_window = ibis.cumulative_window(
    group_by=c.char, order_by=c.timestamp
)
threshold = ibis.interval(minutes=30)
deadline_date = c.timestamp.lag().over(entity_window) + threshold

(
    event_log
    .select([c.char, c.timestamp])
    .mutate(deadline_date=deadline_date)
)

In [7]:
(
    event_log
    .select([c.char, c.timestamp])
    .mutate(
        is_new_session=(c.timestamp > deadline_date).fillna(False)
    )
)

In [8]:
(
    event_log
    .select([c.char, c.timestamp])
    .mutate(
        is_new_session=(c.timestamp > deadline_date).fillna(False)
    )
    .mutate(session_id=c.is_new_session.sum().over(entity_window))
)

In [9]:
entity_window = ibis.cumulative_window(
    group_by=c.char, order_by=c.timestamp
)
threshold = ibis.interval(minutes=30)
deadline_date = c.timestamp.lag().over(entity_window) + threshold
is_new_session = (c.timestamp > deadline_date).fillna(False)

sessionized = (
    event_log
    .mutate(is_new_session=is_new_session)
    .mutate(session_id=c.is_new_session.sum().over(entity_window))
    .drop("is_new_session")
)
sessions = (
    sessionized
    .group_by([c.char, c.session_id])
    .aggregate(
        session_start_date=c.timestamp.min(),
        session_end_date=c.timestamp.max(),
    )
    .order_by([c.char, c.session_start_date])
)
sessions

In [10]:
# ibis.show_sql(sessions)

In [11]:
def sessionize(table, threshold, entity_col, date_col):
    entity_window = ibis.cumulative_window(
        group_by=entity_col, order_by=date_col
    )
    deadline_date = date_col.lag().over(entity_window) + threshold
    is_new_session = (date_col > deadline_date).fillna(False)

    return (
        table
        .mutate(is_new_session=is_new_session)
        .mutate(session_id=c.is_new_session.sum().over(entity_window))
        .drop("is_new_session")
    )


def extract_sessions(table, entity_col, date_col, session_col):
    return (
        table
        .group_by([entity_col, session_col])
        .aggregate(
            session_start_date=date_col.min(),
            session_end_date=date_col.max(),
        )
        # XXX: we would like to compute session duration here but
        # it seems broken with Ibis + DuckDB at the moment...
        .order_by([entity_col, c.session_start_date])
    )


def preprocess_event_log(event_log):
    return (
        event_log
        .pipe(
            sessionize,
            threshold=ibis.interval(minutes=30),
            entity_col=c.char,
            date_col=c.timestamp,
        )
        .pipe(
            extract_sessions,
            entity_col=c.char,
            date_col=c.timestamp,
            session_col=c.session_id,
        )
    )

In [12]:
%time sessions = preprocess_event_log(event_log).cache()

CPU times: user 13.2 s, sys: 297 ms, total: 13.5 s
Wall time: 2.06 s


In [13]:
%time sessions.count()

CPU times: user 72 µs, sys: 3 µs, total: 75 µs
Wall time: 78 µs


1142606

In [14]:
sessions

In [15]:
first_observed_date = event_log.timestamp.min().execute()
first_observed_date

Timestamp('2008-01-01 00:02:04')

In [16]:
last_observed_date = event_log.timestamp.max().execute()
last_observed_date

Timestamp('2008-12-31 23:50:18')

In [17]:
def censor(
    sessions,
    censoring_date,
    threshold=ibis.interval(minutes=30),
    observation_duration=None,
):
    if observation_duration is not None:
        # only keep sessions that started within the observation window
        sessions = sessions.filter(
            c.session_start_date > censoring_date - observation_duration
        )
    return (
        sessions
        .filter(c.session_start_date < censoring_date)
        .mutate(
            # a session is censored if we cannot yet tell that it ended,
            # i.e. less than `threshold` has elapsed since its last event
            is_censored=censoring_date < (c.session_end_date + threshold),
            session_end_date=ibis.ifelse(
                c.session_end_date < censoring_date,
                c.session_end_date,
                censoring_date,
            ),
        )
        # remove sessions that are too short
        .filter(c.session_end_date > c.session_start_date + ibis.interval(minutes=1))
        .order_by(c.session_start_date)
    )

censor(sessions, last_observed_date).is_censored.sum()

512

In [18]:
from datetime import timedelta

censor(sessions, last_observed_date - timedelta(days=54)).count()

751092

In [19]:
censor(sessions, last_observed_date - timedelta(days=54), observation_duration=timedelta(days=5)).to_pandas()

Unnamed: 0,char,session_id,session_start_date,session_end_date,is_censored
0,85273,0,2008-11-02 23:57:39,2008-11-03 00:16:44,False
1,83480,22,2008-11-02 23:58:21,2008-11-03 00:06:50,False
2,20835,135,2008-11-02 23:58:21,2008-11-03 00:06:50,False
3,23806,85,2008-11-02 23:58:26,2008-11-03 00:48:24,False
4,939,435,2008-11-02 23:58:36,2008-11-03 00:17:40,False
...,...,...,...,...,...
9708,68378,386,2008-11-07 23:38:02,2008-11-07 23:50:18,True
9709,65040,130,2008-11-07 23:38:07,2008-11-07 23:50:18,True
9710,57015,335,2008-11-07 23:38:07,2008-11-07 23:50:18,True
9711,76681,140,2008-11-07 23:38:07,2008-11-07 23:50:18,True


In [20]:
# ibis.show_sql(preprocess_event_log(event_log))

In [21]:
import polars as pl


pl.__version__

'0.17.11'

In [22]:
event_log_df = pl.read_parquet(data_filepath)
event_log_df.head(5)

char,level,race,charclass,zone,guild,timestamp
i32,i32,str,str,str,i32,datetime[μs]
59425,1,"""Orc""","""Rogue""","""Orgrimmar""",165,2008-01-01 00:02:04
65494,9,"""Orc""","""Hunter""","""Durotar""",-1,2008-01-01 00:02:04
65325,14,"""Orc""","""Warrior""","""Ghostlands""",-1,2008-01-01 00:02:04
65490,18,"""Orc""","""Hunter""","""Ghostlands""",-1,2008-01-01 00:02:04
2288,60,"""Orc""","""Hunter""","""Hellfire Penin…",-1,2008-01-01 00:02:09


In [23]:
event_log_lazy_df = pl.scan_parquet(data_filepath)
event_log_lazy_df.head(10)

In [24]:
event_log_lazy_df.head(10).collect()

char,level,race,charclass,zone,guild,timestamp
i32,i32,str,str,str,i32,datetime[μs]
59425,1,"""Orc""","""Rogue""","""Orgrimmar""",165,2008-01-01 00:02:04
65494,9,"""Orc""","""Hunter""","""Durotar""",-1,2008-01-01 00:02:04
65325,14,"""Orc""","""Warrior""","""Ghostlands""",-1,2008-01-01 00:02:04
65490,18,"""Orc""","""Hunter""","""Ghostlands""",-1,2008-01-01 00:02:04
2288,60,"""Orc""","""Hunter""","""Hellfire Penin…",-1,2008-01-01 00:02:09
2289,60,"""Orc""","""Hunter""","""Hellfire Penin…",-1,2008-01-01 00:02:09
61239,68,"""Orc""","""Hunter""","""Blade's Edge M…",243,2008-01-01 00:02:14
59772,69,"""Orc""","""Warrior""","""Shadowmoon Val…",35,2008-01-01 00:02:14
22937,69,"""Orc""","""Rogue""","""Warsong Gulch""",243,2008-01-01 00:02:14
23062,69,"""Orc""","""Shaman""","""Shattrath City…",103,2008-01-01 00:02:14


In [25]:
def sessionize_pl(df, entity_col, date_col, threshold_minutes):
    sessionized = (
        df.sort([entity_col, date_col])
        .with_columns(
            [
                (pl.col(date_col).diff().over(entity_col).dt.minutes() > threshold_minutes)
                .fill_null(False)
                .alias("is_new_session"),
            ]
        )
        .with_columns(
            [
                pl.col("is_new_session").cumsum().over(entity_col).alias("session_id"),
            ]
        )
        .drop(["is_new_session"])
    )
    return sessionized


def extract_sessions_pl(
    df,
    entity_col,
    date_col,
    session_col,
    metadata_cols=("race", "zone", "charclass", "guild"),
):
    sessions = (
        df
        .sort(date_col)
        .groupby([entity_col, session_col])
        .agg(
            [pl.col(mc).first().alias(mc) for mc in metadata_cols]
            + [
                pl.col(date_col).min().alias("session_start_date"),
                pl.col(date_col).max().alias("session_end_date"),
            ]
        )
        .with_columns(
            [
                (pl.col("session_end_date") - pl.col("session_start_date")).alias("session_duration"),
            ]
        )
        .sort([entity_col, "session_start_date"])
    )
    return sessions


def preprocess_event_log_pl(df):
    return (
        df
        .pipe(
            sessionize_pl,
            entity_col="char",
            date_col="timestamp",
            threshold_minutes=30,
        )
        .pipe(
            extract_sessions_pl,
            entity_col="char",
            date_col="timestamp",
            session_col="session_id",
        )
    )


%time sessions_collected = preprocess_event_log_pl(event_log_lazy_df).collect()
sessions_collected

CPU times: user 10 s, sys: 1.4 s, total: 11.4 s
Wall time: 2.43 s


char,session_id,race,zone,charclass,guild,session_start_date,session_end_date,session_duration
i32,u32,str,str,str,i32,datetime[μs],datetime[μs],duration[μs]
2,0,"""Orc""","""The Barrens""","""Shaman""",6,2008-12-03 10:41:47,2008-12-03 10:41:47,0µs
7,0,"""Orc""","""Feralas""","""Hunter""",-1,2008-01-15 21:47:09,2008-01-16 00:26:56,2h 39m 47s
7,1,"""Orc""","""Orgrimmar""","""Hunter""",282,2008-01-16 21:57:02,2008-01-17 01:16:49,3h 19m 47s
7,2,"""Orc""","""Stranglethorn …","""Hunter""",282,2008-01-17 18:47:07,2008-01-18 00:07:32,5h 20m 25s
7,3,"""Orc""","""Undercity""","""Hunter""",282,2008-01-18 23:17:13,2008-01-19 01:47:16,2h 30m 3s
7,4,"""Orc""","""Undercity""","""Hunter""",282,2008-01-19 02:37:29,2008-01-19 02:47:13,9m 44s
7,5,"""Orc""","""Western Plague…","""Hunter""",282,2008-01-19 20:36:15,2008-01-19 23:46:28,3h 10m 13s
7,6,"""Orc""","""Ashenvale""","""Hunter""",282,2008-01-20 00:56:02,2008-01-20 04:36:29,3h 40m 27s
7,7,"""Orc""","""Durotar""","""Hunter""",282,2008-01-20 13:26:12,2008-01-20 15:55:57,2h 29m 45s
7,8,"""Orc""","""Hellfire Penin…","""Hunter""",282,2008-01-21 19:36:13,2008-01-22 00:46:11,5h 9m 58s


In [26]:
first_observed_date = event_log_lazy_df.select("timestamp").min().collect().item()
first_observed_date

datetime.datetime(2008, 1, 1, 0, 2, 4)

In [27]:
last_observed_date = event_log_lazy_df.select("timestamp").max().collect().item()
last_observed_date

datetime.datetime(2008, 12, 31, 23, 50, 18)

In [28]:
def censor_pl(sessions, censoring_date, threshold_minutes=30, observation_days=None):
    if observation_days:
        start_date = censoring_date - timedelta(days=observation_days)
        sessions = sessions.filter(pl.col("session_start_date") > start_date)
    return (
        sessions
        .filter(pl.col("session_start_date") < censoring_date)
        .with_columns(
            [
                (((censoring_date - pl.col("session_end_date")).dt.minutes()) < threshold_minutes).alias("is_censored"),
                pl.min(pl.col("session_end_date"), censoring_date).alias("session_end_date"),
            ]
        )
        .with_columns(
            [
                (pl.col("session_end_date") - pl.col("session_start_date")).dt.minutes().alias("duration"),
                (~pl.col("is_censored")).alias("event"),
            ]
        )
        .filter(pl.col("duration") > 0)
        .sort("session_start_date")
    )


censor_pl(sessions_collected, last_observed_date)

char,session_id,race,zone,charclass,guild,session_start_date,session_end_date,session_duration,is_censored,duration,event
i32,u32,str,str,str,i32,datetime[μs],datetime[μs],duration[μs],bool,i64,bool
59425,0,"""Orc""","""Orgrimmar""","""Rogue""",165,2008-01-01 00:02:04,2008-01-01 00:11:52,9m 48s,false,9,true
65325,0,"""Orc""","""Ghostlands""","""Warrior""",-1,2008-01-01 00:02:04,2008-01-01 00:32:05,30m 1s,false,30,true
65490,0,"""Orc""","""Ghostlands""","""Hunter""",-1,2008-01-01 00:02:04,2008-01-01 03:22:00,3h 19m 56s,false,199,true
65494,0,"""Orc""","""Durotar""","""Hunter""",-1,2008-01-01 00:02:04,2008-01-01 01:32:23,1h 30m 19s,false,90,true
22937,0,"""Orc""","""Warsong Gulch""","""Rogue""",243,2008-01-01 00:02:14,2008-01-01 02:02:34,2h 20s,false,120,true
59772,0,"""Orc""","""Shadowmoon Val…","""Warrior""",35,2008-01-01 00:02:14,2008-01-01 01:12:10,1h 9m 56s,false,69,true
61239,0,"""Orc""","""Blade's Edge M…","""Hunter""",243,2008-01-01 00:02:14,2008-01-01 01:12:10,1h 9m 56s,false,69,true
62,0,"""Orc""","""Shattrath City…","""Warrior""",5,2008-01-01 00:02:19,2008-01-01 01:22:02,1h 19m 43s,false,79,true
582,0,"""Orc""","""Sethekk Halls""","""Warrior""",19,2008-01-01 00:02:19,2008-01-01 00:12:07,9m 48s,false,9,true
1003,0,"""Orc""","""Tirisfal Glade…","""Warrior""",204,2008-01-01 00:02:19,2008-01-01 02:02:39,2h 20s,false,120,true


In [29]:
censor_pl(sessions_collected, last_observed_date).select("is_censored").sum()

is_censored
u32
516


In [30]:
censor_pl(sessions_collected, last_observed_date - timedelta(days=42), observation_days=5)

char,session_id,race,zone,charclass,guild,session_start_date,session_end_date,session_duration,is_censored,duration,event
i32,u32,str,str,str,i32,datetime[μs],datetime[μs],duration[μs],bool,i64,bool
39214,48,"""Orc""","""Scarlet Monast…","""Warlock""",101,2008-11-14 23:54:31,2008-11-15 03:07:51,3h 13m 20s,false,193,true
59310,105,"""Orc""","""Durotar""","""Shaman""",403,2008-11-14 23:54:31,2008-11-15 00:53:13,58m 42s,false,58,true
71401,100,"""Orc""","""Desolace""","""Shaman""",358,2008-11-14 23:54:31,2008-11-15 00:12:50,18m 19s,false,18,true
78818,36,"""Orc""","""Stranglethorn …","""Shaman""",-1,2008-11-14 23:54:31,2008-11-15 06:09:34,6h 15m 3s,false,375,true
85222,37,"""Orc""","""Felwood""","""Hunter""",457,2008-11-14 23:54:31,2008-11-15 04:39:25,4h 44m 54s,false,284,true
2232,77,"""Orc""","""Undercity""","""Hunter""",171,2008-11-14 23:54:36,2008-11-15 00:43:03,48m 27s,false,48,true
27617,47,"""Orc""","""Western Plague…","""Hunter""",364,2008-11-14 23:54:36,2008-11-15 00:24:02,29m 26s,false,29,true
867,19,"""Orc""","""Netherstorm""","""Warlock""",459,2008-11-14 23:54:41,2008-11-15 02:56:47,3h 2m 6s,false,182,true
62363,161,"""Orc""","""Zangarmarsh""","""Warrior""",358,2008-11-14 23:54:41,2008-11-15 00:24:07,29m 26s,false,29,true
71698,122,"""Orc""","""Hellfire Penin…","""Warrior""",189,2008-11-14 23:54:41,2008-11-15 00:53:23,58m 42s,false,58,true


***Wrap-up exercise***

- Select 10 dates at random between the beginning of January and the end of November (for instance by adding a random number of days to the first observed date). For each sampled date, define an observation window of 5 days, extract the censored data, and concatenate those sessions into a training set;

- Estimate and plot the average survival function using a Kaplan-Meier estimator;

- Repeat the Kaplan-Meier estimation, this time stratified on the `race` or the `charclass` features;

- Fit a predictive survival model of your choice with adequate feature engineering on this training set;

- Extract censored data from the last month of the original dataset and use it to measure the performance of your estimator with the metrics of your choice. Compare this to the Kaplan-Meier baseline;

- Inspect which features are the most predictive, one way or another.
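
As a possible starting point for the Kaplan-Meier step, here is a minimal pure-Python sketch of the estimator. The function name `kaplan_meier` and the toy inputs are hypothetical. In practice, you would pass the `duration` and `event` columns produced by `censor_pl`, or use a dedicated survival analysis library instead. At each distinct event time, the survival estimate is multiplied by one minus the fraction of at-risk individuals that fail at that time.

```python
def kaplan_meier(durations, events):
    """Return (times, survival) lists evaluated at each distinct event time.

    `durations` are observed durations and `events` are booleans that are
    True for an observed failure and False for a censored observation.
    """
    event_times = sorted({d for d, e in zip(durations, events) if e})
    times, survival = [], []
    s = 1.0
    for t in event_times:
        # Individuals still under observation just before time t.
        at_risk = sum(1 for d in durations if d >= t)
        # Failures observed exactly at time t.
        failures = sum(1 for d, e in zip(durations, events) if e and d == t)
        s *= 1.0 - failures / at_risk
        times.append(t)
        survival.append(s)
    return times, survival


# Toy data, made up for illustration: durations in minutes.
toy_durations = [1, 2, 2, 3, 4]
toy_events = [True, True, False, True, False]

km_times, km_survival = kaplan_meier(toy_durations, toy_events)
for t, s in zip(km_times, km_survival):
    print(t, round(s, 3))
```

Censored observations never trigger a drop in the curve, but they still count in the at-risk set until their censoring time, which is how the estimator uses partial information.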

In [31]:
# TODO: write your code here