/
main.py
101 lines (89 loc) · 3.97 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""This module provides statistics for a Survival Analysis."""
import logging
from typing import List
import pandas as pd
import numpy as np
from lifelines import KaplanMeierFitter, NelsonAalenFitter
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.shared import utils
logger = logging.getLogger(__name__)
class SurvivalTask(AnalyticTask):
    """Survival Analysis Task implementing AnalyticTask.

    This class is a submittable celery task.
    """

    name = 'survival-analysis'

    # Maps the public estimator name to:
    #   (fitter class, fitter attribute holding the estimate,
    #    column label used for the estimate and its confidence bounds)
    _ESTIMATORS = {
        'NelsonAalen': (NelsonAalenFitter,
                        'cumulative_hazard_',
                        'NA_estimate'),
        'KaplanMeier': (KaplanMeierFitter,
                        'survival_function_',
                        'KM_estimate'),
    }

    def main(self, durations: List[pd.DataFrame],
             categories: List[pd.DataFrame],
             event_observed: List[pd.DataFrame],
             estimator: str,
             id_filter: List[str],
             subsets: List[List[str]]) -> dict:
        """Estimate a survival function per category/subset combination.

        The single duration frame is cleaned of missing values, passed
        through the shared id-filter, subset and category helpers, and then
        split into one group per (category, subset) pair. A group is fitted
        only if it contains more than three durations.

        :param durations: List holding exactly one data frame. Its 'value'
            column is used as the duration and its 'feature' column provides
            the label of the result.
        :param categories: Data frames forwarded to
            ``utils.apply_categories``.
        :param event_observed: Empty list, or a list with one data frame.
            When given, it is right-merged on 'id' with each group; a
            duration counts as an observed event if the merged frame has a
            non-null value from this frame for that row. When empty, nothing
            is treated as censored.
        :param estimator: Either 'NelsonAalen' or 'KaplanMeier'.
        :param id_filter: Ids forwarded to ``utils.apply_id_filter``.
        :param subsets: Subsets forwarded to ``utils.apply_subsets``.
        :return: Dict with the keys 'label', 'categories', 'subsets' and
            'stats'. 'stats' is nested as ``stats[category][subset]`` and
            holds the lists 'timeline', 'estimate', 'ci_lower' and
            'ci_upper'. 'categories' and 'subsets' list every value found in
            the data, including those of groups that were skipped for having
            too few durations.
        :raises ValueError: If the number of duration or event_observed
            frames is invalid, the estimator is unknown, no data is left
            after filtering, or the event flags do not line up with the
            durations.
        """
        # logger.error (not logger.exception) is used throughout: there is
        # no active exception here, so logger.exception would append a
        # meaningless 'NoneType: None' traceback to every message.
        if len(durations) != 1:
            error = 'Analysis requires exactly one array that specifies the ' \
                    'duration length.'
            logger.error(error)
            raise ValueError(error)
        if len(event_observed) > 1:
            error = 'Maximal one variable for "event_observed" allowed'
            logger.error(error)
            raise ValueError(error)
        # Validate the estimator before doing any work so that an unknown
        # name is rejected even when every group ends up being skipped.
        if estimator not in self._ESTIMATORS:
            error = 'Unknown estimator: {}'.format(estimator)
            logger.error(error)
            raise ValueError(error)
        fitter_cls, estimate_attr, label = self._ESTIMATORS[estimator]

        # dropna() returns a new frame; the caller's frame is left untouched.
        df = durations[0].dropna()
        df = utils.apply_id_filter(df=df, id_filter=id_filter)
        df = utils.apply_subsets(df=df, subsets=subsets)
        df = utils.apply_categories(df=df, categories=categories)
        if df.empty:
            # Without this guard the label lookup at the end of the method
            # would fail with an uninformative IndexError.
            error = 'No data left for survival analysis after applying ' \
                    'filters.'
            logger.error(error)
            raise ValueError(error)

        # NOTE(review): the 'category' and 'subset' columns are presumably
        # added by the utils helpers above - confirm against their
        # implementation.
        categories = df['category'].unique().tolist()
        subsets = df['subset'].unique().tolist()
        observed_df = event_observed[0] if event_observed else None
        ci_lower_col = '{}_lower_0.95'.format(label)
        ci_upper_col = '{}_upper_0.95'.format(label)

        stats = {}
        # for every category and subset combination estimate the survival fun.
        for category in categories:
            for subset in subsets:
                sub_df = df[(df['category'] == category) &
                            (df['subset'] == subset)]
                T = sub_df['value']
                if len(T) <= 3:
                    # too few durations in this group to fit an estimate
                    continue
                E = None  # default is nothing is censored
                if observed_df is not None:
                    # find observation boolean value for every duration
                    merged = observed_df.merge(sub_df, how='right', on='id')
                    E = merged['value_x'].notnull().tolist()
                    if len(E) != len(T):
                        error = 'Number of "event_observed" values ({}) ' \
                                'does not match number of durations ' \
                                '({}).'.format(len(E), len(T))
                        logger.error(error)
                        raise ValueError(error)
                fitter = fitter_cls()
                fitter.fit(durations=T, event_observed=E)
                confidence = fitter.confidence_interval_
                stats.setdefault(category, {})[subset] = {
                    'timeline': fitter.timeline.tolist(),
                    'estimate': getattr(fitter,
                                        estimate_attr)[label].tolist(),
                    'ci_lower': confidence[ci_lower_col].tolist(),
                    'ci_upper': confidence[ci_upper_col].tolist()
                }
        return {
            'label': df['feature'].tolist()[0],
            'categories': categories,
            'subsets': subsets,
            'stats': stats
        }